Gateway Source Code Walkthrough: Starting from GatewayAutoConfiguration

Published: 2023-09-11 14:16:28

The spring.factories file in the spring-cloud-gateway-core package:

# Auto Configure
org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
org.springframework.cloud.gateway.config.GatewayClassPathWarningAutoConfiguration,\
org.springframework.cloud.gateway.config.GatewayAutoConfiguration,\
org.springframework.cloud.gateway.config.GatewayHystrixCircuitBreakerAutoConfiguration,\
org.springframework.cloud.gateway.config.GatewayResilience4JCircuitBreakerAutoConfiguration,\
org.springframework.cloud.gateway.config.GatewayLoadBalancerClientAutoConfiguration,\
org.springframework.cloud.gateway.config.GatewayNoLoadBalancerClientAutoConfiguration,\
org.springframework.cloud.gateway.config.GatewayMetricsAutoConfiguration,\
org.springframework.cloud.gateway.config.GatewayRedisAutoConfiguration,\
org.springframework.cloud.gateway.discovery.GatewayDiscoveryClientAutoConfiguration,\
org.springframework.cloud.gateway.config.SimpleUrlHandlerMappingGlobalCorsAutoConfiguration,\
org.springframework.cloud.gateway.config.GatewayReactiveLoadBalancerClientAutoConfiguration

org.springframework.boot.env.EnvironmentPostProcessor=\
org.springframework.cloud.gateway.config.GatewayEnvironmentPostProcessor

GatewayAutoConfiguration

@Configuration(proxyBeanMethods = false)
@ConditionalOnProperty(name = "spring.cloud.gateway.enabled", matchIfMissing = true)
@EnableConfigurationProperties
@AutoConfigureBefore({ HttpHandlerAutoConfiguration.class,
		WebFluxAutoConfiguration.class })
@AutoConfigureAfter({ GatewayLoadBalancerClientAutoConfiguration.class,
		GatewayClassPathWarningAutoConfiguration.class })
@ConditionalOnClass(DispatcherHandler.class)
public class GatewayAutoConfiguration {

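GatewayAutoConfiguration is loaded before HttpHandlerAutoConfiguration and WebFluxAutoConfiguration, and after GatewayLoadBalancerClientAutoConfiguration and GatewayClassPathWarningAutoConfiguration.

@ConditionalOnClass(DispatcherHandler.class): GatewayAutoConfiguration is loaded only when the DispatcherHandler class is present on the classpath.

@ConditionalOnBean: the bean is created only when a bean of the given type already exists in the current context
@ConditionalOnClass: the bean is created only when the given class is on the classpath
@ConditionalOnExpression: the bean is created only when the expression evaluates to true
@ConditionalOnMissingBean: the bean is created only when no bean of the given type exists in the current context
@ConditionalOnMissingClass: the bean is created only when the given class is not on the classpath
@ConditionalOnNotWebApplication: the condition matches only when the application is not a web application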
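
As a rough illustration of what a classpath condition such as @ConditionalOnClass boils down to, the sketch below checks whether a class can be located by name without initializing it. ClassPresenceSketch and isPresent are hypothetical names chosen for this example; this is not Spring Boot's condition implementation, only the underlying idea.

```java
// Minimal sketch (hypothetical helper, not Spring's implementation):
// decide whether a class is on the classpath by trying to load it by name.
final class ClassPresenceSketch {

    static boolean isPresent(String className) {
        try {
            // initialize=false: only locate the class, do not run its static initializers
            Class.forName(className, false, ClassPresenceSketch.class.getClassLoader());
            return true;
        } catch (ClassNotFoundException | LinkageError e) {
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(isPresent("java.util.List"));
        // Result depends on whether spring-webflux is on the classpath
        System.out.println(isPresent("org.springframework.web.reactive.DispatcherHandler"));
    }
}
```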

Loading the route rules configured in application.properties

@Bean
@ConditionalOnMissingBean
public PropertiesRouteDefinitionLocator propertiesRouteDefinitionLocator(
		GatewayProperties properties) {
	return new PropertiesRouteDefinitionLocator(properties);
}

@ConfigurationProperties("spring.cloud.gateway")
@Validated
public class GatewayProperties {
@NotNull
	@Valid
	private List<RouteDefinition> routes = new ArrayList<>();

	/**
	 * List of filter definitions that are applied to every route.
	 */
	private List<FilterDefinition> defaultFilters = new ArrayList<>();
}

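If the project does not define its own RouteDefinitionRepository, InMemoryRouteDefinitionRepository is loaded.
By default, route rules are kept in memory: PropertiesRouteDefinitionLocator loads the rules configured in application.properties, while InMemoryRouteDefinitionRepository serves the rules stored in memory.
When the application starts, the in-memory repository holds no route rules, and any rules saved in memory are lost when the application shuts down.
We can replace InMemoryRouteDefinitionRepository with a custom RouteDefinitionRepository class
so that route rules are persisted to a database.
The database table design can be derived by analyzing the PropertiesRouteDefinitionLocator class and abstracting a model from it.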

@Bean
@ConditionalOnMissingBean(RouteDefinitionRepository.class)
public InMemoryRouteDefinitionRepository inMemoryRouteDefinitionRepository() {
	return new InMemoryRouteDefinitionRepository();
}
@Bean
@Primary
public RouteDefinitionLocator routeDefinitionLocator(
		List<RouteDefinitionLocator> routeDefinitionLocators) {
	return new CompositeRouteDefinitionLocator(
			Flux.fromIterable(routeDefinitionLocators));
}
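
To make the persistence idea above more concrete, here is a minimal plain-Java sketch of a route store. RouteRow and RouteStoreSketch are hypothetical names, the map merely stands in for a database table, and the example values "Path=/api/**" and "StripPrefix=1" are assumptions for illustration. A real replacement would implement RouteDefinitionRepository and its reactive (Flux/Mono) methods, which this sketch deliberately leaves out.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical row model: one row per route, mirroring the route fields
// (id, uri, predicates, filters, order).
final class RouteRow {
    final String id;
    final String uri;
    final List<String> predicates; // assumed text form, e.g. "Path=/api/**"
    final List<String> filters;    // assumed text form, e.g. "StripPrefix=1"
    final int order;

    RouteRow(String id, String uri, List<String> predicates, List<String> filters, int order) {
        this.id = id;
        this.uri = uri;
        this.predicates = List.copyOf(predicates);
        this.filters = List.copyOf(filters);
        this.order = order;
    }
}

// Hypothetical store: the map stands in for a database table keyed by route id.
final class RouteStoreSketch {
    private final Map<String, RouteRow> table = new LinkedHashMap<>();

    synchronized void save(RouteRow row) {
        table.put(row.id, row); // insert or overwrite by id
    }

    synchronized boolean delete(String id) {
        return table.remove(id) != null;
    }

    synchronized List<RouteRow> findAll() {
        return new ArrayList<>(table.values()); // defensive copy, insertion order
    }
}
```

Keying rows by route id keeps save idempotent, so re-saving a route with the same id updates it instead of duplicating it.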

The default RouteLocator implementation is CachingRouteLocator.

	@Bean
	@Primary
	@ConditionalOnMissingBean(name = "cachedCompositeRouteLocator")
	// TODO: property to disable composite?
	public RouteLocator cachedCompositeRouteLocator(List<RouteLocator> routeLocators) {
		return new CachingRouteLocator(
				new CompositeRouteLocator(Flux.fromIterable(routeLocators)));
	}
	
	@Bean
	public RouteLocator routeDefinitionRouteLocator(GatewayProperties properties,
			List<GatewayFilterFactory> gatewayFilters,
			List<RoutePredicateFactory> predicates,
			RouteDefinitionLocator routeDefinitionLocator,
			ConfigurationService configurationService) {
		return new RouteDefinitionRouteLocator(routeDefinitionLocator, predicates,
				gatewayFilters, properties, configurationService);
	}

GatewayControllerEndpoint

@RestControllerEndpoint(id = "gateway")
public class GatewayControllerEndpoint extends AbstractGatewayControllerEndpoint {

	public GatewayControllerEndpoint(List<GlobalFilter> globalFilters,
			List<GatewayFilterFactory> gatewayFilters,
			List<RoutePredicateFactory> routePredicates,
			RouteDefinitionWriter routeDefinitionWriter, RouteLocator routeLocator) {
		super(null, globalFilters, gatewayFilters, routePredicates, routeDefinitionWriter,
				routeLocator);
	}

It exposes several endpoints:

@GetMapping("/routes")
public Flux<Map<String, Object>> routes() {
	/* GatewayAutoConfiguration, the gateway's core configuration class, shows which
	   RouteLocator instance is injected into the Spring container:
	   this.routeLocator is an instance of CachingRouteLocator. */
	return this.routeLocator.getRoutes().map(this::serialize);
}

@GetMapping("/routes/{id}")
public Mono<ResponseEntity<Map<String, Object>>> route(@PathVariable String id) {
	// @formatter:off
	return this.routeLocator.getRoutes()
			.filter(route -> route.getId().equals(id))
			.singleOrEmpty()
			.map(this::serialize)
			.map(ResponseEntity::ok)
			.switchIfEmpty(Mono.just(ResponseEntity.notFound().build()));
	// @formatter:on
}
http://localhost:8085/actuator/gateway/routefilters
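
As a small usage sketch, the two GET mappings above can be addressed with the JDK's HTTP client types. GatewayEndpointRequests is a hypothetical helper; the host and port are taken from the URL above, and the sketch assumes the gateway actuator endpoint is exposed over HTTP in the running application.

```java
import java.net.URI;
import java.net.http.HttpRequest;

// Hypothetical helper that only builds the requests; nothing is sent here.
final class GatewayEndpointRequests {
    // Assumption: actuator is served on localhost:8085, as in the URL above.
    static final String BASE = "http://localhost:8085/actuator/gateway";

    // GET /actuator/gateway/routes: lists all routes
    static HttpRequest listRoutes() {
        return HttpRequest.newBuilder(URI.create(BASE + "/routes")).GET().build();
    }

    // GET /actuator/gateway/routes/{id}: one route by id
    // (the id must not contain characters that are illegal in a URI)
    static HttpRequest getRoute(String id) {
        return HttpRequest.newBuilder(URI.create(BASE + "/routes/" + id)).GET().build();
    }
}
```

With a gateway running, a request could be sent with `HttpClient.newHttpClient().send(GatewayEndpointRequests.listRoutes(), HttpResponse.BodyHandlers.ofString())`.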

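The core of Spring Boot auto-configuration is that, at startup, it reads the values listed under EnableAutoConfiguration in META-INF/spring.factories on the classpath and imports them into the container as auto-configuration classes, which then take effect and do the configuration work for us.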
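
To illustrate the file format only, the sketch below parses spring.factories-style text with java.util.Properties and extracts the class names listed under the EnableAutoConfiguration key. FactoriesFormatSketch is a hypothetical name, and this is not how Spring Boot itself loads the file; it merely shows that the entry is one comma-separated property value with line continuations.

```java
import java.io.IOException;
import java.io.StringReader;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

// Hypothetical illustration of the spring.factories format (not Spring's loader).
final class FactoriesFormatSketch {

    static final String KEY = "org.springframework.boot.autoconfigure.EnableAutoConfiguration";

    // Returns the class names listed under KEY in the given spring.factories-style text.
    static List<String> autoConfigurationNames(String factoriesText) throws IOException {
        Properties props = new Properties();
        props.load(new StringReader(factoriesText)); // joins lines that end with "\"
        List<String> names = new ArrayList<>();
        for (String name : props.getProperty(KEY, "").split(",")) {
            if (!name.isBlank()) {
                names.add(name.trim());
            }
        }
        return names;
    }

    public static void main(String[] args) throws IOException {
        String text = String.join("\n",
                "# Auto Configure",
                KEY + "=\\",
                "org.springframework.cloud.gateway.config.GatewayClassPathWarningAutoConfiguration,\\",
                "org.springframework.cloud.gateway.config.GatewayAutoConfiguration");
        autoConfigurationNames(text).forEach(System.out::println);
    }
}
```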

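Classes and interfaces that deserve particular attention:
RouteDefinition (class), RouteLocator (interface), Route (class), RouteDefinitionLocator (interface), RouteDefinitionRepository (interface)

A RouteDefinition object is the entity that holds the gateway's routing information. Whether route rules come from a properties configuration file or are loaded from MySQL through a custom RouteDefinitionRepository implementation, they are ultimately converted into RouteDefinition objects.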

@Validated
public class RouteDefinition {

	private String id;

	@NotEmpty
	@Valid
	private List<PredicateDefinition> predicates = new ArrayList<>();

	@Valid
	private List<FilterDefinition> filters = new ArrayList<>();

	@NotNull
	private URI uri;

	private Map<String, Object> metadata = new HashMap<>();

	private int order = 0;

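By implementing the RouteDefinitionLocator interface in a custom class, route rules can be stored in a MySQL database or in Redis and, together with the actuator URL /actuator/gateway/refresh, refreshed dynamically without restarting the Gateway.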

The RouteLocator interface has three implementation classes, each obtaining routes from a different source.

public interface RouteLocator {

	Flux<Route> getRoutes();

}

The CachingRouteLocator implementation class

public class CachingRouteLocator implements Ordered, RouteLocator,
		ApplicationListener<RefreshRoutesEvent>, ApplicationEventPublisherAware {
	private final RouteLocator delegate;

	private final Flux<Route> routes;

	private final Map<String, List> cache = new ConcurrentHashMap<>();
}

The CompositeRouteLocator implementation class

public class CompositeRouteLocator implements RouteLocator {

	private final Flux<RouteLocator> delegates;

	public CompositeRouteLocator(Flux<RouteLocator> delegates) {
		this.delegates = delegates;
	}

	@Override
	public Flux<Route> getRoutes() {
		return this.delegates.flatMapSequential(RouteLocator::getRoutes);
	}

}
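
The composite pattern used by CompositeRouteLocator can be sketched without Reactor. In the example below, SimpleLocator and CompositeLocatorSketch are hypothetical, non-reactive stand-ins in which a route is just a string; like flatMapSequential above, the result keeps the routes in delegate order.

```java
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical, non-reactive stand-in for RouteLocator: routes are plain strings.
interface SimpleLocator {
    List<String> getRoutes();
}

// Concatenates the routes of every delegate, preserving delegate order.
final class CompositeLocatorSketch implements SimpleLocator {
    private final List<SimpleLocator> delegates;

    CompositeLocatorSketch(List<SimpleLocator> delegates) {
        this.delegates = List.copyOf(delegates);
    }

    @Override
    public List<String> getRoutes() {
        return delegates.stream()
                .flatMap(delegate -> delegate.getRoutes().stream())
                .collect(Collectors.toList());
    }
}
```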

The RouteDefinitionLocator interface, which returns the loaded RouteDefinition objects

public interface RouteDefinitionLocator {

	Flux<RouteDefinition> getRouteDefinitions();

}

The PropertiesRouteDefinitionLocator class:

public class PropertiesRouteDefinitionLocator implements RouteDefinitionLocator {

	private final GatewayProperties properties;

	public PropertiesRouteDefinitionLocator(GatewayProperties properties) {
		this.properties = properties;
	}

	@Override
	public Flux<RouteDefinition> getRouteDefinitions() {
		return Flux.fromIterable(this.properties.getRoutes());
	}

}

Gateway event publishing

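In the Spring source code, at the end of refresh() during startup, AbstractApplicationContext calls its finishRefresh method to publish an event. Other components can publish and listen for events by extending ApplicationEvent and implementing ApplicationListener.
The relevant publishEvent code from AbstractApplicationContext is roughly as follows: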

protected void finishRefresh() {
	......
	// Propagate refresh to lifecycle processor first.
	getLifecycleProcessor().onRefresh();

	// Publish the final event.
	publishEvent(new ContextRefreshedEvent(this));

	// Participate in LiveBeansView MBean, if active.
	LiveBeansView.registerApplicationContext(this);
	....
}
protected void publishEvent(Object event, @Nullable ResolvableType eventType) {
	Assert.notNull(event, "Event must not be null");
		// Decorate event as an ApplicationEvent if necessary
		ApplicationEvent applicationEvent;
		if (event instanceof ApplicationEvent) {
			applicationEvent = (ApplicationEvent) event;
		}
		else {
			applicationEvent = new PayloadApplicationEvent<>(this, event);
			if (eventType == null) {
				eventType = ((PayloadApplicationEvent<?>) applicationEvent).getResolvableType();
			}
		}
  // Multicast right now if possible - or lazily once the multicaster is initialized
		if (this.earlyApplicationEvents != null) {
			this.earlyApplicationEvents.add(applicationEvent);
		}
		else {
			// Execution reaches this branch and calls SimpleApplicationEventMulticaster.multicastEvent
			getApplicationEventMulticaster().multicastEvent(applicationEvent, eventType);
		}
}

The multicastEvent method of SimpleApplicationEventMulticaster:
	@Override
	public void multicastEvent(final ApplicationEvent event, @Nullable ResolvableType eventType) {
		ResolvableType type = (eventType != null ? eventType : resolveDefaultEventType(event));
		Executor executor = getTaskExecutor();
		for (ApplicationListener<?> listener : getApplicationListeners(event, type)) {
			if (executor != null) {
				// executor is null by default; you have to configure and inject one yourself
				executor.execute(() -> invokeListener(listener, event));
			}
			else {
				// Execution enters this branch and invokes the listener on the calling thread
				invokeListener(listener, event);
			}
		}
	}
The key code:
private void doInvokeListener(ApplicationListener listener, ApplicationEvent event) {
	try {
		// Calls the handler method of each class that implements ApplicationListener
		listener.onApplicationEvent(event);
	}
		......
}
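
The executor branch in multicastEvent can be reproduced in a few lines of plain Java. MulticasterSketch below is a hypothetical, stripped-down model in which events are plain objects and listeners are Consumers; it only mirrors the choice between synchronous and executor-based dispatch, not Spring's listener resolution or error handling.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Executor;
import java.util.function.Consumer;

// Hypothetical, stripped-down multicaster (not Spring's class).
final class MulticasterSketch {
    private final List<Consumer<Object>> listeners = new CopyOnWriteArrayList<>();
    private final Executor executor; // may be null, meaning "invoke on the caller's thread"

    MulticasterSketch(Executor executor) {
        this.executor = executor;
    }

    void addListener(Consumer<Object> listener) {
        listeners.add(listener);
    }

    void multicast(Object event) {
        for (Consumer<Object> listener : listeners) {
            if (executor != null) {
                executor.execute(() -> listener.accept(event)); // dispatch via the executor
            } else {
                listener.accept(event); // synchronous dispatch
            }
        }
    }
}
```

With a null executor, every listener has finished by the time multicast returns, which matches the default path described above.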

Now we move on to RouteRefreshListener, the Gateway class that listens for route-related events.

public class RouteRefreshListener implements ApplicationListener<ApplicationEvent> {
   // Two important fields
    private final ApplicationEventPublisher publisher;

   // Heartbeat monitor; tracks the latest heartbeat value, which increases over time
	private HeartbeatMonitor monitor = new HeartbeatMonitor();
}

Its implementation of the onApplicationEvent method of the ApplicationListener interface:

@Override
public void onApplicationEvent(ApplicationEvent event) {
	if (event instanceof ContextRefreshedEvent
			|| event instanceof RefreshScopeRefreshedEvent
			|| event instanceof InstanceRegisteredEvent) {
			// For any of the three event types above, call reset(); the first event received at startup is ContextRefreshedEvent
		reset();
	}
	else if (event instanceof ParentHeartbeatEvent) {
		ParentHeartbeatEvent e = (ParentHeartbeatEvent) event;
		resetIfNeeded(e.getValue());
	}
	else if (event instanceof HeartbeatEvent) {
		HeartbeatEvent e = (HeartbeatEvent) event;
		resetIfNeeded(e.getValue());
	}
}
//AnnotationConfigReactiveWebServerApplicationContext 
private void reset() {
  // Publishes a route refresh event; routes from properties and custom-injected routes are then loaded
	this.publisher.publishEvent(new RefreshRoutesEvent(this));
}
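// Compares the incoming heartbeat value with the previous one and, if it changed, calls reset() to publish a route refresh event.
// To refresh routes dynamically, call the /refresh endpoint or publish the route refresh event manually.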
private void resetIfNeeded(Object value) {
	if (this.monitor.update(value)) {
		reset();
	}
}

public class HeartbeatMonitor { 
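   // Atomic reference holding the latest heartbeat value; compareAndSet makes the update atomic.
   // Note: AtomicReference alone does not guard against the ABA problem (AtomicStampedReference does).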
    private AtomicReference<Object> latestHeartbeat = new AtomicReference();

    public HeartbeatMonitor() {
    }

    public boolean update(Object value) {
        Object last = this.latestHeartbeat.get();
        return value != null && !value.equals(last) ? this.latestHeartbeat.compareAndSet(last, value) : false;
    }
}
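
To see when update returns true, the logic above can be exercised in isolation. HeartbeatMonitorSketch below is a self-contained copy of the same comparison written for experimentation (the class name is hypothetical); the heartbeat values 1L and 2L are made up for the demonstration.

```java
import java.util.concurrent.atomic.AtomicReference;

// Self-contained copy of the update logic shown above, for experimentation.
final class HeartbeatMonitorSketch {
    private final AtomicReference<Object> latestHeartbeat = new AtomicReference<>();

    boolean update(Object value) {
        Object last = latestHeartbeat.get();
        // true only when the value is non-null, differs from the last one,
        // and this thread wins the compare-and-set
        return value != null && !value.equals(last) && latestHeartbeat.compareAndSet(last, value);
    }

    public static void main(String[] args) {
        HeartbeatMonitorSketch monitor = new HeartbeatMonitorSketch();
        System.out.println(monitor.update(1L));   // true: first heartbeat
        System.out.println(monitor.update(1L));   // false: unchanged value
        System.out.println(monitor.update(2L));   // true: value changed
        System.out.println(monitor.update(null)); // false: null is ignored
    }
}
```

Only a changed heartbeat value leads to reset(), so repeated heartbeats carrying the same value do not trigger repeated route refreshes.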

Next comes the following listener implementation, CachingRouteLocator.

public class CachingRouteLocator implements Ordered, RouteLocator,
		ApplicationListener<RefreshRoutesEvent>, ApplicationEventPublisherAware {
		
	// The delegate is a CompositeRouteLocator, a combined RouteLocator that loads the routes injected from all sources
	private final RouteLocator delegate;
	// routes: the route information
	private final Flux<Route> routes;
	// cache: the in-memory cache
	private final Map<String, List> cache = new ConcurrentHashMap<>();

    // Receives the RefreshRoutesEvent published above
    public void onApplicationEvent(RefreshRoutesEvent event) {
		try {
			fetch().collect(Collectors.toList()).subscribe(list -> Flux.fromIterable(list)
					.materialize().collect(Collectors.toList()).subscribe(signals -> {
						applicationEventPublisher
								.publishEvent(new RefreshRoutesResultEvent(this));
						cache.put(CACHE_KEY, signals);
					}, throwable -> handleRefreshError(throwable)));
		}
		catch (Throwable e) {
			handleRefreshError(e);
		}
	}

//fetch 
private Flux<Route> fetch() {
   // Calls, in turn, the getRoutes method of RouteDefinitionRouteLocator
	return this.delegate.getRoutes().sort(AnnotationAwareOrderComparator.INSTANCE);
}

}
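
The cache-then-refresh behaviour can be modelled without Reactor or Spring events. In this hypothetical sketch, CachingLocatorSketch serves routes (plain strings here) from a cached snapshot, and refresh() plays the role of handling a RefreshRoutesEvent by fetching from the delegate again; sorting and error handling are omitted.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;

// Hypothetical, non-reactive model of the caching behaviour (not the real class).
final class CachingLocatorSketch {
    private final Supplier<List<String>> delegate;
    private final AtomicReference<List<String>> cache = new AtomicReference<>();

    CachingLocatorSketch(Supplier<List<String>> delegate) {
        this.delegate = delegate;
    }

    // Serves the cached snapshot; fetches from the delegate only when the cache is empty.
    List<String> getRoutes() {
        return cache.updateAndGet(cached -> cached != null ? cached : List.copyOf(delegate.get()));
    }

    // Stand-in for onApplicationEvent(RefreshRoutesEvent): replace the cached snapshot.
    void refresh() {
        cache.set(List.copyOf(delegate.get()));
    }
}
```

Requests between refreshes never touch the delegate, which is why a changed route source only becomes visible after a refresh event.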

The RouteDefinitionRouteLocator class

public class RouteDefinitionRouteLocator
		implements RouteLocator, BeanFactoryAware, ApplicationEventPublisherAware {
	
		private final RouteDefinitionLocator routeDefinitionLocator;


	// Returns the route information
    @Override
	public Flux<Route> getRoutes() {
	   // routeDefinitionLocator here is the CompositeRouteDefinitionLocator, so its getRouteDefinitions method is executed,
	   // which in turn calls getRouteDefinitions on DiscoveryClientRouteDefinitionLocator, PropertiesRouteDefinitionLocator and any class implementing RouteDefinitionRepository
		Flux<Route> routes = this.routeDefinitionLocator.getRouteDefinitions()
				.map(this::convertToRoute);

		if (!gatewayProperties.isFailOnRouteDefinitionError()) {
			// instead of letting error bubble up, continue
			routes = routes.onErrorContinue((error, obj) -> {
				if (logger.isWarnEnabled()) {
					logger.warn("RouteDefinition id " + ((RouteDefinition) obj).getId()
							+ " will be ignored. Definition has invalid configs, "
							+ error.getMessage());
				}
			});
		}

		return routes.map(route -> {
			if (logger.isDebugEnabled()) {
				logger.debug("RouteDefinition matched: " + route.getId());
			}
			return route;
		});
	}

}
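
The effect of the isFailOnRouteDefinitionError switch can be illustrated with a plain-Java analogue. In this hypothetical sketch, each "definition" is just a URI string and conversion means parsing it; with failOnError set to false an invalid definition is logged and skipped, similar in spirit to the onErrorContinue branch above.

```java
import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.List;

// Hypothetical, non-reactive analogue of the conversion step (not the real class).
final class RouteConversionSketch {

    static List<URI> convertAll(List<String> definitions, boolean failOnError) {
        List<URI> routes = new ArrayList<>();
        for (String definition : definitions) {
            try {
                routes.add(new URI(definition));
            } catch (URISyntaxException e) {
                if (failOnError) {
                    // let the error bubble up and abort the whole conversion
                    throw new IllegalArgumentException("Invalid route definition: " + definition, e);
                }
                // log and continue with the remaining definitions
                System.err.println("Definition '" + definition + "' will be ignored: " + e.getMessage());
            }
        }
        return routes;
    }
}
```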

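I would like to share one of my open-source projects. It uses the gateway for unified exception handling, dynamic route loading, gateway rate limiting, unified request handling, and request authentication, and it also integrates log tracing, log analysis, RPC calls, dynamic loading of rate-limiting rules, distributed transactions, and high-concurrency rate limiting.
Project address: Gitee repository (clone URL below)

Git usage:

Clone the project locally:
git clone https://gitee.com/ITLULU/fairy-alibaba-cloud.git
Set up your own environment and build the project:
mvn clean install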