zl程序教程

您现在的位置是:首页 >  工具

当前栏目

Gateway网关-自定义实现动态路由信息存储记载

路由gateway存储 实现 信息 自定义 动态 网关
2023-09-11 14:16:28 时间

RouteDefinition实体类 记录了 PredicateDefinition 和 FilterDefinition

	// Predicate definitions of the route; @NotEmpty rejects an empty list and
	// @Valid cascades bean validation into each PredicateDefinition.
	@NotEmpty
	@Valid
	private List<PredicateDefinition> predicates = new ArrayList<>();

	// Filter definitions of the route; the list may stay empty, @Valid cascades
	// bean validation into each FilterDefinition.
	@Valid
	private List<FilterDefinition> filters = new ArrayList<>();

PredicateDefinition 记录 断言信息

/**
 * Holds one route predicate: its name and its arguments in insertion order.
 */
@Validated
public class PredicateDefinition {

	/** Name of the predicate; must not be null. */
	@NotNull
	private String name;

	/** Predicate arguments, keyed by generated argument name, in insertion order. */
	private Map<String, String> args = new LinkedHashMap<>();

	/** Creates an empty definition. */
	public PredicateDefinition() {
	}

	/**
	 * Parses the shortcut notation {@code name=value1,value2,...}.
	 *
	 * <p>The part before the first {@code '='} becomes the name; the remainder is
	 * tokenized on {@code ","} and every token is stored under the key produced by
	 * {@code NameUtils.generateName(index)}.
	 *
	 * @param text shortcut text to parse
	 * @throws ValidationException if {@code text} has no {@code '='} or starts with one
	 */
	public PredicateDefinition(String text) {
		final int separator = text.indexOf('=');
		if (separator < 1) {
			throw new ValidationException("Unable to parse PredicateDefinition text '"
					+ text + "'" + ", must be of the form name=value");
		}
		setName(text.substring(0, separator));

		int position = 0;
		for (String value : tokenizeToStringArray(text.substring(separator + 1), ",")) {
			this.args.put(NameUtils.generateName(position++), value);
		}
	}

}

FilterDefinition 记录 过滤拦截信息

/**
 * Holds one route filter: its name and its arguments in insertion order.
 */
@Validated
public class FilterDefinition {

	/** Name of the filter; must not be null. */
	@NotNull
	private String name;

	/** Filter arguments, keyed by generated argument name, in insertion order. */
	private Map<String, String> args = new LinkedHashMap<>();

	/** Creates an empty definition. */
	public FilterDefinition() {
	}

	/**
	 * Parses the shortcut notation {@code name} or {@code name=value1,value2,...}.
	 *
	 * <p>When the text has no {@code '='} (or starts with one) the whole text is
	 * used as the name and no arguments are recorded. Otherwise the part before the
	 * first {@code '='} becomes the name and the remainder is tokenized on
	 * {@code ","}, each token being stored under
	 * {@code NameUtils.generateName(index)}.
	 *
	 * @param text shortcut text to parse
	 */
	public FilterDefinition(String text) {
		final int separator = text.indexOf('=');
		if (separator > 0) {
			setName(text.substring(0, separator));

			int position = 0;
			for (String value : tokenizeToStringArray(text.substring(separator + 1), ",")) {
				this.args.put(NameUtils.generateName(position++), value);
			}
		}
		else {
			setName(text);
		}
	}
}

数据表创建

-- Route rule table: one row per gateway route (route id, target URI, load order).
CREATE TABLE `route_rule` (
  `id` bigint(20) NOT NULL AUTO_INCREMENT COMMENT '自增主键',
  `route_id` varchar(255) DEFAULT NULL COMMENT '路由ID ',
  `uri` varchar(255) DEFAULT NULL COMMENT '目标地址',
  `ordered` int(11) DEFAULT NULL COMMENT '加载顺序',
  `creator` varchar(30) DEFAULT  NULL COMMENT  '创建者',
  `created_time` datetime DEFAULT NULL COMMENT '创建时间',
  `updator` varchar(30) DEFAULT  NULL COMMENT  '修改人',
  -- The closing backtick after updated_time was missing, which made the statement unparsable.
  `updated_time` datetime DEFAULT NULL COMMENT '修改时间',
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
 
-- Route argument table: stores the predicates and filters of each route.
-- route_rule to route_args is a one-to-many relationship (joined on route_id).
CREATE TABLE `route_args` (
  `id` bigint(20) NOT NULL AUTO_INCREMENT,
  `type` int(11) DEFAULT NULL COMMENT '参数类型 0:断言器 1:过滤器',
  `name` varchar(255) DEFAULT NULL COMMENT '断言器名称 例如: Path RewritePath',
  `args_name` varchar(255) DEFAULT NULL COMMENT '参数名称',
  `args_value` varchar(255) DEFAULT NULL COMMENT '参数值',
  -- Column comment corrected: the referenced table is route_rule, not route_id.
  `route_id` varchar(255) DEFAULT NULL COMMENT '表route_rule中的字段route_id',
  `creator` varchar(30) DEFAULT  NULL COMMENT  '创建者',
  `created_time` datetime DEFAULT NULL COMMENT '创建时间',
  `updator` varchar(30) DEFAULT  NULL COMMENT  '修改人',
  -- The closing backtick after updated_time was missing, which made the statement unparsable.
  `updated_time` datetime DEFAULT NULL COMMENT '修改时间',
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;

使用 MySQL 存储路由信息:通过自定义 MySQLRouteDefinitionRepository 实现 RouteDefinitionRepository 接口,完成路由信息的动态加载。如果没有自定义实现 RouteDefinitionRepository 接口,则会默认使用内存存储形式。

  在GatewayAutoConfiguration 源码部分有说过,如果没有自定义注入的RouteDefinitionRepository会使用内存加载的形式
	/**
	 * Registers an {@link InMemoryRouteDefinitionRepository} bean.
	 *
	 * <p>{@code @ConditionalOnMissingBean} makes this a fallback: it only applies
	 * when no other {@link RouteDefinitionRepository} bean has been defined.
	 *
	 * @return a new in-memory repository
	 */
	@Bean
	@ConditionalOnMissingBean(RouteDefinitionRepository.class)
	public InMemoryRouteDefinitionRepository inMemoryRouteDefinitionRepository() {
		return new InMemoryRouteDefinitionRepository();
	}
/**
 * {@link RouteDefinitionRepository} that reads and writes route definitions through
 * {@code GatewayRouteDao} and {@code GatewayRouteArgsDao}.
 *
 * <p>NOTE(review): {@link #save} and {@link #delete} call {@code Mono.block()} and the
 * DAO calls are made synchronously. Reactor rejects {@code block()} on threads it
 * marks as non-blocking, so confirm which thread invokes these methods. Moving the
 * work into the reactive chain would take it out of the scope of the method-level
 * {@code @Transactional}, so that change needs an explicit transaction strategy.
 */
public class MySQLRouteDefinitionRepository implements RouteDefinitionRepository {

    // NOTE(review): not referenced anywhere in this class; confirm whether it can be removed.
    @Autowired
    private ReactiveStringRedisTemplate redisTemplate;
    @Autowired
    private GatewayRouteDao gatewayRouteDao;
    @Autowired
    private GatewayRouteArgsDao gatewayRouteArgsDao;

    /**
     * Loads the stored routes and exposes them as route definitions.
     *
     * <p>On gateway startup {@code RouteDefinitionRouteLocator.getRoutes} converts each
     * {@code RouteDefinition} into a {@code Route} via
     * {@code this.routeDefinitionLocator.getRouteDefinitions().map(this::convertToRoute)}
     * and keeps the result in memory.
     *
     * @return the route definitions converted from the rows returned by the DAO
     */
    @Override
    public Flux<RouteDefinition> getRouteDefinitions() {
        List<GatewayRouteDefinition> routePOS = gatewayRouteDao.getGatewayRoute();
        return Flux.fromIterable(GatewayRouteDefinition.toRouteDefinition(routePOS));
    }

    /**
     * Inserts the route, or updates it when a row with the same route id already exists.
     *
     * @param route publisher of the definition to persist
     * @return an empty {@code Mono} on success, or an error {@code Mono} carrying
     *         {@link IllegalArgumentException} when no definition is emitted or its id is empty
     */
    @Override
    @Transactional(rollbackFor = Exception.class)
    public Mono<Void> save(Mono<RouteDefinition> route) {
        RouteDefinition definition = route.block();
        // block() yields null when the Mono completes empty; treat that like a missing id
        // instead of failing with a NullPointerException.
        if (definition == null || ObjectUtils.isEmpty(definition.getId())) {
            return Mono.error(new IllegalArgumentException("id may not be empty"));
        }
        GatewayRoutePO routePO = GatewayRoutePO.toGatewayRoute(definition);
        List<GatewayRouteArgsPO> filter = GatewayRouteArgsPO.toGatewayRouteFilterArgs(definition.getFilters(),definition.getId());
        List<GatewayRouteArgsPO> predicate = GatewayRouteArgsPO.toGatewayRoutePredictArgs(definition.getPredicates(),definition.getId());
        // 1: look the route up first to decide between update and insert
        GatewayRoutePO gatewayRoutePOS = gatewayRouteDao.findGatewayRouteByRouteId(definition.getId());
        if (!Objects.isNull(gatewayRoutePOS)) {
            // update the existing route and its arguments
            gatewayRouteDao.updateGatewayRoute(routePO);
            gatewayRouteArgsDao.batchUpdateGatewayArgs(filter);
            gatewayRouteArgsDao.batchUpdateGatewayArgs(predicate);
        } else {
            // 2: insert the new route and its arguments
            gatewayRouteDao.saveGatewayRoute(routePO);
            gatewayRouteArgsDao.bathSaveGatewayArgs(filter);
            gatewayRouteArgsDao.bathSaveGatewayArgs(predicate);
        }

        return Mono.empty();
    }


    /**
     * Deletes the route identified by the emitted id.
     *
     * @param routeId publisher of the route id to delete
     * @return an empty {@code Mono} on success, or an error {@code Mono} carrying
     *         {@code NotFoundException} when the id is missing or empty
     */
    @Override
    @Transactional(rollbackFor = Exception.class)
    public Mono<Void> delete(Mono<String> routeId) {
        String routId = routeId.block();
        if (ObjectUtils.isEmpty(routId)) {
            // Report the resolved id; the original concatenated the Mono itself,
            // which printed the publisher's toString() instead of the id.
            return Mono.defer(() -> Mono.error(
                new NotFoundException("RouteDefinition not found: " + routId)));
        }
        gatewayRouteDao.deleteRoute(routId);
        return Mono.empty();
    }


}

在项目中添加actuator依赖,并配置可以监控gateway

        <!-- Spring Boot Actuator starter; the article uses its /actuator/gateway endpoints to inspect and refresh routes. -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-actuator</artifactId>
        </dependency>
# Actuator configuration.
management:
  endpoints:
    web:
      exposure:
        # Expose every actuator endpoint over HTTP.
        # NOTE(review): '*' is convenient for a demo; confirm the exposed set before using it outside development.
        include: '*'
  endpoint:
    health:
      # Always include details in the health endpoint response.
      show-details: always

可以通过接口查看加载的路由信息
请求路径:http://localhost:8085/actuator/gateway/routes 对应 GatewayControllerEndpoint 的routes 方法
在这里插入图片描述
响应结果:

[
	{
		"predicate": "Paths: [/admin/**], match trailing slash: true",
		"route_id": "authorication-admin",
		"filters": [],
		"uri": "lb://authorication-admin",
		"order": 0
	},
	{
		"predicate": "Paths: [/cloud-gateway/**], match trailing slash: true",
		"metadata": {
			"nacos.instanceId": "192.168.234.1#8085#DEFAULT#DEFAULT_GROUP@@cloud-gateway",
			"nacos.weight": "1.0",
			"nacos.cluster": "DEFAULT",
			"nacos.ephemeral": "true",
			"nacos.healthy": "true",
			"preserved.register.source": "SPRING_CLOUD"
		},
		"route_id": "ReactiveCompositeDiscoveryClient_cloud-gateway",
		"filters": [
			"[[RewritePath /cloud-gateway/(?<remaining>.*) = '/${remaining}'], order = 1]"
		],
		"uri": "lb://cloud-gateway",
		"order": 0
	},
	{
		"predicate": "Paths: [/authorication-admin/**], match trailing slash: true",
		"metadata": {
			"nacos.instanceId": "192.168.234.1#8084#DEFAULT#DEFAULT_GROUP@@authorication-admin",
			"nacos.weight": "1.0",
			"nacos.cluster": "DEFAULT",
			"nacos.ephemeral": "true",
			"nacos.healthy": "true",
			"preserved.register.source": "SPRING_CLOUD"
		},
		"route_id": "ReactiveCompositeDiscoveryClient_authorication-admin",
		"filters": [
			"[[RewritePath /authorication-admin/(?<remaining>.*) = '/${remaining}'], order = 1]"
		],
		"uri": "lb://authorication-admin",
		"order": 0
	}
]

其中下面这块是我通过实现RouteDefinitionRepository接口,自定义保存的动态路由数据

{
"predicate": "Paths: [/admin/**], match trailing slash: true",
"route_id": "authorication-admin",
"filters": [],
"uri": "lb://authorication-admin",
"order": 0
},

手动刷新路由: http://localhost:8085/actuator/gateway/refresh 这是post请求 对应AbstractGatewayControllerEndpoint 类的refresh 方法
在这里插入图片描述
AbstractGatewayControllerEndpoint refresh方法

/**
 * Handles {@code POST /refresh} by publishing a {@code RefreshRoutesEvent}.
 *
 * @return an empty {@code Mono}; the event is published before the method returns
 */
@PostMapping("/refresh")
	public Mono<Void> refresh() {
		// Listeners of RefreshRoutesEvent are responsible for reloading the routes.
		this.publisher.publishEvent(new RefreshRoutesEvent(this));
		return Mono.empty();
	}

动态路由信息加载过程如下所示:
在上一篇文章 Gateway网关-源码讲解从GatewayAutoConfiguration开始
已经从源码分析了gateway路由刷新事件的过程,首先会由RouteRefreshListener 监听到ContextRefreshedEvent事件

@Override
public void onApplicationEvent(ApplicationEvent event) {
	if (event instanceof ContextRefreshedEvent
			|| event instanceof RefreshScopeRefreshedEvent
			|| event instanceof InstanceRegisteredEvent) {
		reset();
	}
		
/** Publishes a {@code RefreshRoutesEvent} with this object as the event source. */
private void reset() {
		this.publisher.publishEvent(new RefreshRoutesEvent(this));
}

之后会发布一个路由刷新事件,也正是发布路由刷新事件,才会重新执行一遍RouteDefinitionLocator 的getRouteDefinitions 去获取我通过实现RouteDefinitionRepository接口写入的路由信息

/**
 * Combines {@link RouteDefinitionLocator} (reading route definitions) and
 * {@link RouteDefinitionWriter} (saving and deleting them) into one contract;
 * it declares no members of its own.
 */
public interface RouteDefinitionRepository
		extends RouteDefinitionLocator, RouteDefinitionWriter {

}
这个接口继承了 RouteDefinitionLocator 接口和 RouteDefinitionWriter 接口
其中RouteDefinitionWriter  接口实现动态路由信息的save和delete操作 ,而RouteDefinitionLocator 接口可以获取刷新的路由信息
/**
 * Write-side contract for route definitions.
 */
public interface RouteDefinitionWriter {

	/**
	 * Persists the route definition emitted by {@code route}.
	 *
	 * @param route publisher of the definition to save
	 * @return a {@code Mono} signalling completion or failure
	 */
	Mono<Void> save(Mono<RouteDefinition> route);

	/**
	 * Removes the route definition whose id is emitted by {@code routeId}.
	 *
	 * @param routeId publisher of the id to delete
	 * @return a {@code Mono} signalling completion or failure
	 */
	Mono<Void> delete(Mono<String> routeId);

}

/**
 * Read-side contract for route definitions.
 */
public interface RouteDefinitionLocator {

	/**
	 * Supplies the available route definitions.
	 *
	 * @return a {@code Flux} of route definitions
	 */
	Flux<RouteDefinition> getRouteDefinitions();

}

在CachingRouteDefinitionLocator里会去监听收到的RefreshRoutesEvent事件,会去获取RouteDefinition

/**
 * {@link RouteDefinitionLocator} that re-reads the delegate's route definitions
 * whenever a {@code RefreshRoutesEvent} arrives and stores them in a cache.
 * (Abridged excerpt: the {@code cache}, {@code CACHE_KEY} and {@code delegate}
 * members are declared outside the lines shown here.)
 */
public class CachingRouteDefinitionLocator
		implements RouteDefinitionLocator, ApplicationListener<RefreshRoutesEvent> {
		
   	/**
	 * Fetches the definitions, materializes them into signals, collects the signals
	 * into a list and puts that list into the cache under {@code CACHE_KEY}.
	 *
	 * @param event the refresh event that triggered the reload
	 */
   	@Override
	public void onApplicationEvent(RefreshRoutesEvent event) {
		fetch().materialize().collect(Collectors.toList())
				.doOnNext(routes -> cache.put(CACHE_KEY, routes)).subscribe();
	}
	/** Reads the route definitions from the delegate locator. */
	private Flux<RouteDefinition> fetch() {
		return this.delegate.getRouteDefinitions();
	}
}

这个类重点关注对象delegate,默认是CompositeRouteDefinitionLocator 会执行 他的getRouteDefinitions方法

	// Locator whose getRouteDefinitions() result is fetched on refresh.
	private final RouteDefinitionLocator delegate;

	/**
	 * Concatenates the route definitions of every delegate locator, preserving the
	 * delegates' order, and assigns a generated id to any definition that has none.
	 *
	 * @return the combined route definitions, each carrying a non-null id
	 */
	@Override
	public Flux<RouteDefinition> getRouteDefinitions() {
		return this.delegates
				.flatMapSequential(RouteDefinitionLocator::getRouteDefinitions)
				.flatMap(definition -> {
					// Definitions that already carry an id pass through untouched.
					if (definition.getId() != null) {
						return Mono.just(definition);
					}
					return randomId().map(generatedId -> {
						definition.setId(generatedId);
						if (log.isDebugEnabled()) {
							log.debug(
									"Id set on route definition: " + definition);
						}
						return definition;
					});
				});
	}

自己可以 debug 看一下:CompositeRouteDefinitionLocator 在执行 getRouteDefinitions 的时候,会依次调用其他实现了 RouteDefinitionLocator 接口的类的 getRouteDefinitions 方法,其中就包含我们自定义实现 RouteDefinitionRepository 接口的类

其中 CachingRouteLocator类也监听了 RefreshRoutesEvent事件,实现的是RouteLocator 接口,这里可以对比下RouteLocator 接口和RouteDefinitionLocator接口,他们都是去获取路由信息,只不过是返回的封装对象不同,一个是Route 路由信息用于接口封装返回路由信息,一个是 RouteDefinition 用于定义获取路由信息

/**
 * Contract for components that supply {@code Route} objects.
 */
public interface RouteLocator {
	/**
	 * Supplies the available routes.
	 *
	 * @return a {@code Flux} of routes
	 */
	Flux<Route> getRoutes();
}

/**
 * Contract for components that supply {@code RouteDefinition} objects.
 */
public interface RouteDefinitionLocator {
	/**
	 * Supplies the available route definitions.
	 *
	 * @return a {@code Flux} of route definitions
	 */
	Flux<RouteDefinition> getRouteDefinitions();
}
/**
 * {@link RouteLocator} that reloads the delegate's routes on every
 * {@code RefreshRoutesEvent} and stores them in a cache.
 * (Abridged excerpt: {@code cache}, {@code CACHE_KEY} and
 * {@code handleRefreshError} are declared outside the lines shown here.)
 */
public class CachingRouteLocator implements Ordered, RouteLocator,
		ApplicationListener<RefreshRoutesEvent>, ApplicationEventPublisherAware {
	
	// Locator the routes are fetched from.
	private final RouteLocator delegate;
	// Publisher used to announce the refresh result.
	private ApplicationEventPublisher applicationEventPublisher;
	/** Reads the delegate's routes, sorted with {@code AnnotationAwareOrderComparator}. */
	private Flux<Route> fetch() {
		return this.delegate.getRoutes().sort(AnnotationAwareOrderComparator.INSTANCE);
	}

   /**
	 * Reloads the routes: collects the fetched routes into a list, materializes that
	 * list into signals, publishes a {@code RefreshRoutesResultEvent} and stores the
	 * signals in the cache. Errors are passed to {@code handleRefreshError}.
	 *
	 * @param event the refresh event that triggered the reload
	 */
   @Override
	public void onApplicationEvent(RefreshRoutesEvent event) {
		try {
			fetch().collect(Collectors.toList()).subscribe(list -> Flux.fromIterable(list)
					.materialize().collect(Collectors.toList()).subscribe(signals -> {
					// Publish RefreshRoutesResultEvent so the outcome of the refresh can be observed;
					// a custom listener for RefreshRoutesResultEvent can capture the refresh result.
						applicationEventPublisher
								.publishEvent(new RefreshRoutesResultEvent(this));
						cache.put(CACHE_KEY, signals);
					}, throwable -> handleRefreshError(throwable)));
		}
		catch (Throwable e) {
			// Failures thrown synchronously while assembling or subscribing are handled the same way.
			handleRefreshError(e);
		}
	}
}

分享个人一个开源的项目,里面有具体使用到网关gateway统一处理异常,动态路由加载,网关限流处理,网关统一处理请求,请求认证处理等,并且集成了日志追踪,日志分析,RPC调用,动态限流规则加载,分布式事务,高并发限流处理等
项目地址:GITEE项目链接地址