zl程序教程

您现在的位置是:首页 >  其他

当前栏目

System|分布式|Dubbo是如何实现集群化的?

2023-03-15 22:01:31 时间

多个被调用者在调用时虚拟化为一个被调用者,这就是Dubbo的集群化,本文将从一次碰壁经历出发,讲一讲Dubbo是如何进行集群化的。

(不要吐槽虚拟化,按照我们上课讲的广义定义,抽象就是以不同接口隐藏细节[e.g. 设备->文件],虚拟化就是以相同接口隐藏细节[e.g. 物理设备->虚拟设备])

场景

我需要为所有的Invoker维护他们的metadata,因此在LoadBalance中添加了代码来维护,然而,奇怪的事情发现了,当Invoker数目为1时,LoadBalance居然不发生了。

这可不行,虽然现在不需要这些数据,万一动态增加了Invoker呢?那就存在问题了。我想要让一个被调用者也能实现集群化,行不行?毕竟,集群和单独的Invoker是有本质区别的,而大小为100的集群和大小为1的集群却只有量变。

我希望大小为1的集群依旧能够执行负载均衡的代码,于是想把边界检测给删了。

服务发现源码分析

读Dubbo框架源码,Dubbo在单invoker情况下进行了优化。

public abstract class AbstractLoadBalance implements LoadBalance {
     public <T> Invoker<T> select(List<Invoker<T>> invokers, URL url, Invocation invocation) {
        if (CollectionUtils.isEmpty(invokers)) {
            return null;
        } else {
            return invokers.size() == 1 ? (Invoker)invokers.get(0) : this.doSelect(invokers, url, invocation);
        }
    }

我们在自定义LoadBalance实现中进行重载,结果还是不行

    @Override
    public <T> Invoker<T> select(List<Invoker<T>> invokers, URL url, Invocation invocation) {
        if (CollectionUtils.isEmpty(invokers)) {
            return null;
        } else {
            return this.doSelect(invokers, url, invocation);
        }
    }

这里就要讲Dubbo服务发现的原理了,Directory对应的是zookeeper中注册的服务列表,Router是按照条件筛选服务,LoadBalance是按照算法选择服务。

这里的Cluster就是把一组服务伪装成一个服务,也就是按照我们上面定义的Directory、Router、LoadBalance等规则选出Invoker,作为集群被调用的Invoker。

集群架构

我们来看一看Cluster是怎样通过这些规则伪装成一个Invoker的。(省略无关代码)

Directory:根据URL从Router Chain筛选Invoker

public abstract class AbstractDirectory<T> implements Directory<T> {
    protected RouterChain<T> routerChain;
}
public class RegistryDirectory<T> extends AbstractDirectory<T> implements NotifyListener {
    private void refreshInvoker(List<URL> invokerUrls) {
            List<Invoker<T>> newInvokers = Collections.unmodifiableList(new ArrayList(newUrlInvokerMap.values()));
            this.routerChain.setInvokers(newInvokers);
            this.invokers = this.multiGroup ? this.toMergeInvokerList(newInvokers) : newInvokers;
            this.urlInvokerMap = newUrlInvokerMap;
        }
    }

RouterChain: 每个路由依次进行筛选,取交集

    public List<Invoker<T>> route(URL url, Invocation invocation) {
        List<Invoker<T>> finalInvokers = this.invokers;
        Router router;
        for(Iterator var4 = this.routers.iterator(); var4.hasNext(); finalInvokers = router.route(finalInvokers, url, invocation)) {
            router = (Router)var4.next();
        }
        return finalInvokers;
    }

Router:根据配置项进行筛选,如果match则加入结果中

    public <T> List<Invoker<T>> route(List<Invoker<T>> invokers, URL url, Invocation invocation) throws RpcException {
                Iterator var5 = invokers.iterator();
                while(var5.hasNext()) {
                    Invoker<T> invoker = (Invoker)var5.next();
                    if (this.matchThen(invoker.getUrl(), url)) {
                        result.add(invoker);
                    }

Cluster : 根据Directory创建集群Invoker,通过directory/router获取invokers

public class FailoverCluster extends AbstractCluster { 
    public <T> AbstractClusterInvoker<T> doJoin(Directory<T> directory) throws RpcException {
        return new FailoverClusterInvoker(directory);
    }

ClusterInvoker:doInvoke->AbstractInvoker的select

public class FailoverClusterInvoker<T> extends AbstractClusterInvoker<T> {
   
    public Result doInvoke(Invocation invocation, final List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
      Invoker<T> invoker = this.select(loadbalance, invocation, copyInvokers, invoked);
    }
}

AbstractInvoker:select->doselect->loadBalance的select

public abstract class AbstractClusterInvoker<T> implements Invoker<T> {

    protected Invoker<T> select(LoadBalance loadbalance, Invocation invocation, List<Invoker<T>> invokers, List<Invoker<T>> selected) throws RpcException {
                Invoker<T> invoker = this.doSelect(loadbalance, invocation, invokers, selected);
                if (sticky) {
                    this.stickyInvoker = invoker;
                }
           }
    private Invoker<T> doSelect(LoadBalance loadbalance, Invocation invocation, List<Invoker<T>> invokers, List<Invoker<T>> selected) throws RpcException {
        if (CollectionUtils.isEmpty(invokers)) {
            return null;
        } else if (invokers.size() == 1) {
            return (Invoker)invokers.get(0);
        } else {
            Invoker<T> invoker = loadbalance.select(invokers, this.getUrl(), invocation);

可以看出,关键在于AbstractInvoker的doSelect这一步,将单个服务的集群直接跳过了。我们需要重写Abstract Invoker,Dubbo是开源的,如果直接fork源码那这一步轻而易举,问题是maven项目里这些都是只读的。

然而,艰难地把这里也给重写之后,现实依然残酷,我只能继续向前追溯。

ReferenceConfig: CreateProxy->Cluster的doJoin

经过了化身恶魔之后,我继续看哪里还有边界检测,答案是在引用设置里面。一开始的时候URL只有一条就没有集群。

public class ReferenceConfig<T> extends ReferenceConfigBase<T> {
           @SuppressWarnings({"unchecked", "rawtypes", "deprecation"})
                  private T createProxy(Map<String, String> map) {
           if (urls.size() == 1) {
                invoker = REF_PROTOCOL.refer(interfaceClass, urls.get(0));
            }  
           else {
                List<Invoker<?>> invokers = new ArrayList<Invoker<?>>();
                URL registryURL = null;
                for (URL url : urls) {
                    invokers.add(REF_PROTOCOL.refer(interfaceClass, url));
                    if (UrlUtils.isRegistry(url)) {
                        registryURL = url; // use last registry url
                    }
                }
                if (registryURL != null) { // registry url is available
                    // for multi-subscription scenario, use 'zone-aware' policy by default
                    String cluster = registryURL.getParameter(CLUSTER_KEY, ZoneAwareCluster.NAME);
                    // The invoker wrap sequence would be: ZoneAwareClusterInvoker(StaticDirectory) -> FailoverClusterInvoker(RegistryDirectory, routing happens here) -> Invoker
                    invoker = Cluster.getCluster(cluster, false).join(new StaticDirectory(registryURL, invokers));
                }

WDNMD!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!

我选择死亡。我不走这条路了。我宣布,需要维护数据的集群节点不得小于2。

没法改maven项目的源码好气啊,这就是框架的痛么。

总结

Dubbo实现集群化分为这么几个步骤

准备阶段 (doJoin)

  1. 监听注册中心的URL并创建Invoker,如果URL数目大于1则将所有Invoker组成集群
  2. 集群根据URL目录,将invokers根据路由规则进行依次筛选,直到选出符合规则的Invoker
  3. 返回虚拟化的Invoker,和单Invoker同接口,封装集群容错细节。

调用阶段(doInvoke)

  1. 满足路由规则的Invoker根据负载均衡选择出对应的invoker
  2. 选择的invoker按照集群容错策略进行可多次执行的调用,收集到结果再返回

和单独Invoker的区别在于集群容错的时候,可以分别选择不同的Invoker,实际调用多次。甚至可以进行广播,收集所有invoker的结果并返回Qurom。