zl程序教程

您现在的位置是:首页 >  其他

当前栏目

Eurkea,Ribbon和RestTemplate是如何结合到一起完成服务注册与发现功能的? --上

2023-03-07 09:10:11 时间

Eurkea,Ribbon和RestTemplate是如何结合到一起完成服务注册与发现功能的? --上


引言

ResrTemplate组件是用来完成远程调用功能的,而Ribbon组件负责完成客户端负载均衡功能的,Eurkea服务端负责保存服务名和真实服务器地址的映射关系的,如果我们想要这三者结合起来完成服务发现与注册功能,有一个很简单的思路如下:

  • 拦截RestTemplate发出的请求,Ribbon负责解析出请求中的服务名,然后通过该服务名去Eurkea上拉取获得该服务名下的真实服务列表,Ribbon随机相关负载均衡算法,选择出一个服务返回,然后将该服务真实ip和port替换原来的服务名,最终将请求移交给ResrtTemplate发出。

本文我们来具体看一下这个实现过程。


如何进行远程调用

java如何想要进行http远程调用,可以使用别人封装号的工具库,具体有:

  • JDK自带的URLConnection
  • HttpClient
  • OKHttp

这三个工具库中,jdk自带的性能是最差的,而第三方提供的两个相对较好,但是使用需要引入额外的依赖才可以。

RestTemplate是Spring提供的一个访问Http服务的客户端,该类是针对RESTFUL风格API进行的设计,底层具体请求的发送默认依靠的是jdk自带的URLConnection,那么如果我们想让RestTemplate底层使用HttpClient或者OkHttp进行请求发送,我们应该如何实现切换呢?

别急,我们先来大概看一下RestTemplate发送请求的一个过程:

    //RestTemplate的getForObject还是其他方法,最终调用的都是该方法
	protected <T> T doExecute(URI url, @Nullable HttpMethod method, @Nullable RequestCallback requestCallback,
			@Nullable ResponseExtractor<T> responseExtractor) throws RestClientException {
	    ...		
		ClientHttpResponse response = null;
		try {
		   //创建请求对象--ClientHttpRequest只提供了一个execute接口,该接口完成具体请求执行过程
		   //该接口分别有UrlConnection,HttpClient,OkHttp相关支持的实现类
			ClientHttpRequest request = createRequest(url, method);
			 ...
			 //执行请求
			response = request.execute();
			//处理返回结果的过程
			handleResponse(url, method, response);
			return (responseExtractor != null ? responseExtractor.extractData(response) : null);
		}
		... 
	}

由于UrlConnection,OkHttp,HttpClient三个组件库提供的相关api都不一样,因此Spring为了实现能够切换底层实现的效果,就采用了适配器的思想,通过一个顶层ClientHttpRequest接口规定好执行请求调用的execute方法,然后为了适配相关组件库分别给出了相关的实现类,分别实现了execute方法完成具体的请求转发。

那么如何知道当前应该使用何种类型的ClientHttpRequest实现呢?

RestTemplate将该工作交给ClientHttpRequestFactory负责处理,createRequest方法中我们也可以看到:

	protected ClientHttpRequest createRequest(URI url, HttpMethod method) throws IOException {
	    //获取ClientHttpRequestFactory来创建具体的请求对象
		ClientHttpRequest request = getRequestFactory().createRequest(url, method);
		initialize(request);
		if (logger.isDebugEnabled()) {
			logger.debug("HTTP " + method.name() + " " + url);
		}
		return request;
	}

Spring这里并没有将ClientHttpRequestFactory设计为简单工厂,而是采用了抽象工厂实现:

对于请求对象的创建,不是简单的new就完事了,还需要针对特定组件做一些特定参数设置,因此不适合采用简单工厂实现,通过抽象工厂实现,可以将相关逻辑放到具体工厂中完成。

RestTemplate默认采用SimpleClientHttpRequestFactory,也就是说RestTemplate底层默认使用URLConnection发起请求:

我们可以通过RestTemplate构造函数传入或者手动调用setter方法设置具体请求工厂实现。


如何拦截RestTemplate请求执行

拦截请求执行,首先想到的就是拦截器,RestTemplate也是采用拦截器实现的请求拦截:

那么如何将拦截器与请求执行关联起来呢?

RestTemplate采用InterceptingClientHttpRequestFactory创建出对应的InterceptingClientHttpRequest请求对象,该请求对象是对原请求对象的包装,主要是在执行InterceptingClientHttpRequest对象的execute方法时,先执行拦截器链,再调用目标请求对象的ececute方法来执行请求。

InterceptingHttpAccessor覆写了父类返回RequestFactory的getter方法:

	@Override
	public ClientHttpRequestFactory getRequestFactory() {
	    //如果我们设置了请求拦截器--那么就使用interceptingRequestFactory对原始请求工厂和请求对象进行包装
		List<ClientHttpRequestInterceptor> interceptors = getInterceptors();
		if (!CollectionUtils.isEmpty(interceptors)) {
			ClientHttpRequestFactory factory = this.interceptingRequestFactory;
			if (factory == null) {
				factory = new InterceptingClientHttpRequestFactory(super.getRequestFactory(), interceptors);
				this.interceptingRequestFactory = factory;
			}
			return factory;
		}
		//如果没有设置拦截器,那么返回原始的请求工厂
		else {
			return super.getRequestFactory();
		}
	}

InterceptingClientHttpRequestFactory所谓对原始工厂的包装行为实现也非常简单:

public class InterceptingClientHttpRequestFactory extends AbstractClientHttpRequestFactoryWrapper {
	private final List<ClientHttpRequestInterceptor> interceptors;
	public InterceptingClientHttpRequestFactory(ClientHttpRequestFactory requestFactory,
			@Nullable List<ClientHttpRequestInterceptor> interceptors) {
		super(requestFactory);
		this.interceptors = (interceptors != null ? interceptors : Collections.emptyList());
	}

	@Override
	protected ClientHttpRequest createRequest(URI uri, HttpMethod httpMethod, ClientHttpRequestFactory requestFactory) {
	    //最终包装行为体现在InterceptingClientHttpRequest中
		return new InterceptingClientHttpRequest(requestFactory, this.interceptors, uri, httpMethod);
	}
}

InterceptingClientHttpRequest类实现如下;

class InterceptingClientHttpRequest extends AbstractBufferingClientHttpRequest {
    //原始的请求工厂和请求相关信息
	private final ClientHttpRequestFactory requestFactory;
	private HttpMethod method;
	private URI uri;
	//请求拦截器链
	private final List<ClientHttpRequestInterceptor> interceptors;
	...
	@Override
	protected final ClientHttpResponse executeInternal(HttpHeaders headers, byte[] bufferedOutput) throws IOException {
	    //InterceptingRequestExecution是拦截器链模式的体现
		InterceptingRequestExecution requestExecution = new InterceptingRequestExecution();
		return requestExecution.execute(this, bufferedOutput);
	}


	private class InterceptingRequestExecution implements ClientHttpRequestExecution {
		private final Iterator<ClientHttpRequestInterceptor> iterator;
		public InterceptingRequestExecution() {
			this.iterator = interceptors.iterator();
		}
        //注意: 我们可以在拦截器中调用InterceptingRequestExecution的execute方法回到拦截器链继续往下执行,也可以
        //不调用,终止拦截器链的执行
		@Override
		public ClientHttpResponse execute(HttpRequest request, byte[] body) throws IOException {
		    //先将拦截器执行完毕
			if (this.iterator.hasNext()) {
				ClientHttpRequestInterceptor nextInterceptor = this.iterator.next();
				return nextInterceptor.intercept(request, body, this);
			}
			//执行完拦截器再利用原始的请求工厂创建出对应的请求对象,然后执行请求,返回结果
			else {
			     ... 
				ClientHttpRequest delegate = requestFactory.createRequest(request.getURI(), method);
				...
				return delegate.execute();
			}
		}
	}
}

如何实现负载均衡

实现负载均衡的第一步首先我们需要准备一个ClientHttpRequestInterceptor用于拦截RestTemplate的请求执行,Spring Cloud提供了对应的LoadBalancerInterceptor:

public class LoadBalancerInterceptor implements ClientHttpRequestInterceptor {

	private LoadBalancerClient loadBalancer;

	private LoadBalancerRequestFactory requestFactory;

	public LoadBalancerInterceptor(LoadBalancerClient loadBalancer,
			LoadBalancerRequestFactory requestFactory) {
		this.loadBalancer = loadBalancer;
		this.requestFactory = requestFactory;
	}

	public LoadBalancerInterceptor(LoadBalancerClient loadBalancer) {
		this(loadBalancer, new LoadBalancerRequestFactory(loadBalancer));
	}

	@Override
	public ClientHttpResponse intercept(final HttpRequest request, final byte[] body,
			final ClientHttpRequestExecution execution) throws IOException {
	    //获取到请求的URI		
		final URI originalUri = request.getURI();
		//获取主机名: 这里主机名可能是服务名,我们需要将其转换为对应服务的真实地址
		String serviceName = originalUri.getHost();
		//由客户端负载均衡器LoadBalancerClient根据服务名转换为真实服务地址,然后执行请求
		return this.loadBalancer.execute(serviceName,
				this.requestFactory.createRequest(request, body, execution));
	}
}

LoadBalancerInterceptor的intercept方法中,最终调用了客户端负载均衡器的execute方法,并传入了服务名及对应的请求对象,我们可以猜想一下客户端负载均衡器的execute实现逻辑是啥:

  • 首先根据服务名去服务注册中心获取到对应的服务实例列表
  • 然后采用负载均衡算法从拉取到的服务实例列表中挑选出一个可用实例
  • 然后将原始请求URL中的服务名转换为真实的ip地址
  • 最终让传入的请求对象执行请求并返回结果

LoadBalancerRequestFactory并没有实现ClientHttpRequestFactory接口,因为他的createRequest方法想要返回的是LoadBalancerRequest,而不是ClientRequest:

public class LoadBalancerRequestFactory {
	private LoadBalancerClient loadBalancer;
	private List<LoadBalancerRequestTransformer> transformers;
     ...
	public LoadBalancerRequest<ClientHttpResponse> createRequest(
			final HttpRequest request, final byte[] body,
			final ClientHttpRequestExecution execution) {
	    //返回一个LoadBalancerRequest实例对象,注意LoadBalancerRequest是函数式接口	
		return instance -> {
		    //ServiceRequestWrapper对象主要负责重新请求对象的getUri方法,返回服务名被替换为服务真实ip地址后的URI
			HttpRequest serviceRequest = new ServiceRequestWrapper(request, instance,
					this.loadBalancer);
		    //回调接口,可以对请求进行修改			
			if (this.transformers != null) {
				for (LoadBalancerRequestTransformer transformer : this.transformers) {
					serviceRequest = transformer.transformRequest(serviceRequest,
							instance);
				}
			}
			//URI替换完成后,继续交由原始请求对象执行请求,当然拦截器链如果还存在其他拦截器还需要继续执行
			return execution.execute(serviceRequest, body);
		};
	}
}

LoadBalancerRequest本身是一个函数式接口,只提供一个apply方法:

public interface LoadBalancerRequest<T> {
	T apply(ServiceInstance instance) throws Exception;
}
public class ServiceRequestWrapper extends HttpRequestWrapper {
	private final ServiceInstance instance;
	private final LoadBalancerClient loadBalancer;
     ...
     //核心是重写getURI,完成服务名到服务真实IP地址的替换
	@Override
	public URI getURI() {
		URI uri = this.loadBalancer.reconstructURI(this.instance, getRequest().getURI());
		return uri;
	}
}

客户端负载器的具体实现

springCloud规定了客户端负载器需要实现的接口:

//serviceId这里就是服务名--application-name
public interface LoadBalancerClient extends ServiceInstanceChooser {
	<T> T execute(String serviceId, LoadBalancerRequest<T> request) throws IOException;
	<T> T execute(String serviceId, ServiceInstance serviceInstance,
			LoadBalancerRequest<T> request) throws IOException;
	URI reconstructURI(ServiceInstance instance, URI original);
}

public interface ServiceInstanceChooser {
	ServiceInstance choose(String serviceId);
}

//客户端负载均衡器会根据服务名寻找到一批具体的服务实例列表
//服务实例列表中封装了当前服务的具体ip,port,是否采用https进行通信等信息
public interface ServiceInstance {
	default String getInstanceId() {
		return null;
	}
	String getServiceId();
	String getHost();
	int getPort();
	boolean isSecure();
	URI getUri();
	Map<String, String> getMetadata();
	default String getScheme() {
		return null;
	}
}

springCloud默认只提供了一个BlockingLoadBalancerClient的客户端负载均衡器的实现,但是我们一般采用Ribbon提供的客户端负载均衡器实现:


Ribbon提供的客户端负载均衡器实现

客户端负载均衡器的核心实现是execute方法,我们一起来看一下:

	@Override
	public <T> T execute(String serviceId, LoadBalancerRequest<T> request)
			throws IOException {
		return execute(serviceId, request, null);
	}
	
	public <T> T execute(String serviceId, LoadBalancerRequest<T> request, Object hint)
			throws IOException {
		//1.根据服务名获取到对应的服务实例列表--ILoadBalancer负责管理服务实例列表
		ILoadBalancer loadBalancer = getLoadBalancer(serviceId);
		//2.利用负载均衡算法从服务实例列表取出一个服务实例--server是ribbon提供的对服务实例信息封装的类
		Server server = getServer(loadBalancer, hint);
		if (server == null) {
			throw new IllegalStateException("No instances available for " + serviceId);
		}
		//3.RibbonServer继承至ServerInstance接口--将server信息适配到springcloud规定获取服务实例信息的接口上
		RibbonServer ribbonServer = new RibbonServer(serviceId, server,
				isSecure(server, serviceId),
				serverIntrospector(serviceId).getMetadata(server));
        //4.执行请求
		return execute(serviceId, ribbonServer, request);
	}
//ribbon提供的对于服务实例进行管理的类
public interface ILoadBalancer {
    //添加服务实例
	public void addServers(List<Server> newServers);
	//负责根据负责均衡算法选择一个合适的服务实例
	public Server chooseServer(Object key);
	//标记某个实例服务下线
	public void markServerDown(Server server);
	@Deprecated
	public List<Server> getServerList(boolean availableOnly);
	//只获取未下线的服务实例
    public List<Server> getReachableServers();
	public List<Server> getAllServers();
}

根据服务名获取掌握服务实例列表的ILoadBalancer:

	protected ILoadBalancer getLoadBalancer(String serviceId) {
	    //clientFactory即SpringClientFactory负责根据服务名寻找到服务实例列表
	    //SpringClientFactory是ribbon提供的继承NamedContextFactory
		return this.clientFactory.getLoadBalancer(serviceId);
	}

如何根据服务名找到服务实例列表

SpringCloud团队定义了LoadBalancerClient负载均衡客户端接口,那么负载均衡客户端如何根据服务名去寻找对应的服务实例列表呢?

大家第一反应肯定是负载均衡客户端去问注册中心Eurkea要,但是注册中心实现有很多,如何以统一的方式让注册中心将相关信息告知客户端负载均衡器呢?

SpringCloud团队已经为我们定义好了相关的交互过程:

该交互过程的核心类是NamedContextFactory,该类是客户端负载均衡器与各种类型的注册中心交互的核心,由于此部分设计较为复杂,将放到下篇文章进行介绍。


小结

本文我们对如何拦截ResTemplate请求执行进行了详细分析,并且简单介绍了一些客户端负载均衡器接口实现,下篇文章中我们将来详细看看客户端负载均衡器如何从注册中心获取到服务列表实例、