zl程序教程

您现在的位置是:首页 >  后端

当前栏目

Elastic-Job2.1.5源码-图解分片算法动画

图解动画算法源码 分片 Elastic Job2.1
2023-06-13 09:15:54 时间

大家好,本文给大家介绍一下Elastic-Job 中作业分片算法和分片之后使用Zookeeper事务来提交分片节点

图解分片算法动画

文 | 宋小生

7.4.6 分片算法

先来回顾下分片的过程分为哪几步,先来看下调用代码:

JobShardingStrategy jobShardingStrategy = JobShardingStrategyFactory.getStrategy(liteJobConfig.getJobShardingStrategyClass());
jobNodeStorage.executeInTransaction(new PersistShardingInfoTransactionExecutionCallback(jobShardingStrategy.sharding(availableJobInstances, jobName, shardingTotalCount)));

这里虽然只有两行代码,其实可以分为3步:

  • 使用策略模式+反射来获取分片策略对象。
  • 使用分片算法进行分片。
  • 使用Zookeeper事务提交分片。

前面文章我们已经介绍了如何获取分片策略对象,这里我们来看使用分片策略对象进行分片的过程,调用代码如下;

jobShardingStrategy.sharding(availableJobInstances, jobName, shardingTotalCount))

先来看下参数说明:

  • availableJobInstances:当前可用作业实例。
  • jobName:作业名称。
  • shardingTotalCount:当前作业配置的分片总数。

对于分片算法可以用如下类图包含:

图7.4 分片算法继承关系

JobShardingStrategyFactory工厂类型通过策略模式+反射创建具体的分片策略对象,同Java的SPI机制类似通过反射的形式加载类型创建对象,有效的解决了分片策略扩展问题,在这里我们只需要在运行时通过修改作业的分片策略类型的全路径,配置类型中的jobShardingStrategyClass配置,就可以实现动态的调整分片策略来达到作业分片执行的均衡,目前系统一共提供来3个策略类型:

  • AverageAllocationJobShardingStrategy: 基于平均分配算法的分片策略,也是默认的分片策略 。
  • OdevitySortByNameJobShardingStrategy :作业名的哈希值奇偶数决定IP升降序算法的分片策略。
  • RotateServerByNameJobShardingStrategy:作业名的哈希值对服务器列表进行轮转的分片策略。

接下来我们就来通过源码详细看下每种分片算法是如何进行分片的:

7.4.6.1 AverageAllocationJobShardingStrategy平均分配分片算法

全路径:

com.dangdang.ddframe.job.lite.api.strategy.impl.AverageAllocationJobShardingStrategy

策略说明:

基于平均分配算法的分片策略,也是默认的分片策略。

如果分片不能整除,则不能整除的多余分片将依次追加到序号小的服务器。如:

如果有3台服务器,分成9片,则每台服务器分到的分片是:

服务器1=[0,1,2], 服务器2=[3,4,5], 服务器3=[6,7,8]

如果有3台服务器,分成8片,则每台服务器分到的分片是:

服务器1=[0,1,6], 服务器2=[2,3,7], 服务器3=[4,5]

如果有3台服务器,分成10片,则每台服务器分到的分片是:

服务器1=[0,1,2,9], 服务器2=[3,4,5], 服务器3=[6,7,8]

可以参考如下动画视频:

http://mpvideo.qpic.cn/0bf2cuaaoaaa2qaojdjj7rqvafoda4kqabya.f10002.mp4?dis_k=99a35507f7c4867c81e7d58db08e7e12&dis_t=1671014344&vid=wxv_2022375316224081921&format_id=10002&support_redirect=0&mmversion=false

具体代码如下:

public final class AverageAllocationJobShardingStrategy implements JobShardingStrategy {
    
    @Override
    public Map<JobInstance, List<Integer>> sharding(final List<JobInstance> jobInstances, final String jobName, final int shardingTotalCount) {
        if (jobInstances.isEmpty()) {
            return Collections.emptyMap();
        }
        Map<JobInstance, List<Integer>> result = shardingAliquot(jobInstances, shardingTotalCount);
        addAliquant(jobInstances, shardingTotalCount, result);
        return result;
    }
/**
*加入3台机器 分8片 这个方法是对平均数量的分配,不包含余数那部分
*/
    private Map<JobInstance, List<Integer>> shardingAliquot(final List<JobInstance> shardingUnits, final int shardingTotalCount) {
        Map<JobInstance, List<Integer>> result = new LinkedHashMap<>(shardingTotalCount, 1);
        //计算出平均每个作业实例应该获取到的分片数量    2=8/3  这个计算结果不会包含余数,在这里余数将得不到分配,这个例子中会有两个分片得不到分配
        int itemCountPerSharding = shardingTotalCount / shardingUnits.size();
          int count = 0;
        //遍历作业实例为每个作业实例分配对应分片,这个循环过程中只会分配平均应得分片数量,余数对应的分片后面会有方法处理
        for (JobInstance each : shardingUnits) {
            List<Integer> shardingItems = new ArrayList<>(itemCountPerSharding + 1);
            for (int i = count * itemCountPerSharding; i < (count + 1) * itemCountPerSharding; i++) {
                shardingItems.add(i);
            }
            result.put(each, shardingItems);
            count++;
        }
        return result;
    }
   //这里是对余数分配,余数小于实例总数,遍历实例逐个取余数
    private void addAliquant(final List<JobInstance> shardingUnits, final int shardingTotalCount, final Map<JobInstance, List<Integer>> shardingResults) {
        int aliquant = shardingTotalCount % shardingUnits.size();
        int count = 0;
        for (Map.Entry<JobInstance, List<Integer>> entry : shardingResults.entrySet()) {
            if (count < aliquant) {
                entry.getValue().add(shardingTotalCount / shardingUnits.size() * shardingUnits.size() + count);
            }
            count++;
        }
    }
}

上面我们看了平均分配策略的实现过程分为两部分:

  • 第一部分对可以整除的平均数量进行分配。
  • 第二部分是对余数进行再分配。

如果分片数量与实例数量是整倍数的关系就可以实现平均每台实例获取的分片数量一样,如果是不能整除就会导致前面的机器实例获取到更多的分片导致负载压力过大(前面获取可用机器实例的时候会对IP进行降序排序)。

7.4.6.2 OdevitySortByNameJobShardingStrategy 作业名的哈希值奇偶数决定IP升降序算法

配置全路径:

com.dangdang.ddframe.job.lite.api.strategy.impl.OdevitySortByNameJobShardingStrategy

策略说明:

根据作业名的哈希值奇偶数决定IP升降序算法的分片策略。

作业名的哈希值为奇数则IP升序. 作业名的哈希值为偶数则IP降序. 用于不同的作业平均分配负载至不同的服务器。

如果有3台服务器, 分成2片, 作业名称的哈希值为奇数, 则每台服务器分到的分片是:

服务器1=[0], 服务器2=[1], 服务器3=[]。

如果有3台服务器, 分成2片, 作业名称的哈希值为偶数, 则每台服务器分到的分片是:

服务器3=[0], 服务器2=[1], 服务器1=[]。

可以参考如下动画视频:

http://mpvideo.qpic.cn/0b78z4aacaaatmaoimrj5zqvbt6dahhqaaia.f10002.mp4?dis_k=05f83edf935ba79237c79889e7848022&dis_t=1671014344&vid=wxv_2022382083179937794&format_id=10002&support_redirect=0&mmversion=false

具体代码如下:


/**
* 根据作业名的哈希值奇偶数决定IP升降序算法的分片策略.
* 
* <p>
* 作业名的哈希值为奇数则IP升序.
* 作业名的哈希值为偶数则IP降序.
* 用于不同的作业平均分配负载至不同的服务器.
* 如: 
* 1. 如果有3台服务器, 分成2片, 作业名称的哈希值为奇数, 则每台服务器分到的分片是: 1=[0], 2=[1], 3=[].
* 2. 如果有3台服务器, 分成2片, 作业名称的哈希值为偶数, 则每台服务器分到的分片是: 3=[0], 2=[1], 1=[].
* </p>
* 
* @author zhangliang
*/
public final class OdevitySortByNameJobShardingStrategy implements JobShardingStrategy {
    
    private AverageAllocationJobShardingStrategy averageAllocationJobShardingStrategy = new AverageAllocationJobShardingStrategy();
    
    @Override
    public Map<JobInstance, List<Integer>> sharding(final List<JobInstance> jobInstances, final String jobName, final int shardingTotalCount) {
        long jobNameHash = jobName.hashCode();
        if (0 == jobNameHash % 2) { 
           //如果计算得到作业名字hash值是偶数则将作业实例顺序反转
            Collections.reverse(jobInstances);
        }
    //处理过顺序的作业实例列表进行平均分配
        return averageAllocationJobShardingStrategy.sharding(jobInstances, jobName, shardingTotalCount);
    }
}

哈希奇偶数其实也是一种平均分片策略,唯一与平均分片策略不同的是在平均分片之前先计算作业的Hash值,如果Hash值是奇数则按平均分片策略进行分配,如果Hash值是偶数则反转一下实例列表反向来进行平均分配,这种分配分配的可能出现的问题如下:

如果单个进程内所有作业Hash值各位奇偶,分片总数小于实例数量一半,就会导致实例列表两边的实例分配更多的分片,负载过高。

如果单个进程内所有作业Hash值各位奇偶,分片总数大于实例数量的一半,就会导致列表中间的实例负载过高。

如果Hash值计算结果都为奇偶就会导致出现与平均分片分配一样的问题。

7.4.6.3 RotateServerByNameJobShardingStrategy 作业名的哈希值对服务器列表进行轮转的分片策略

配置全路径:

com.dangdang.ddframe.job.lite.api.strategy.impl.OdevitySortByNameJobShardingStrategy

策略说明:

根据作业名计算hash值然后hash值对实例数量取模获取到偏移量,然后从这个偏移量位置开始对作业进行平均分片:

如果有3台服务器,计算hash值为1 分成9片,则每台服务器分到的分片是:

服务器1=[1,2,3], 服务器2=[4,5,6], 服务器3=[7,8,0]。

如果有3台服务器,hash值为2 分成9片,则每台服务器分到的分片是:

服务器1=[2,3,4], 服务器2=[5,6,7], 服务器3=[8,0,1]。

可以参考如下动画视频:

http://mpvideo.qpic.cn/0bf2uiaamaaaeeaog5bj7rqvbiwda2raabqa.f10002.mp4?dis_k=c6f6157925c9dd63e653a7685ee0f9ff&dis_t=1671014344&vid=wxv_2022382872715722754&format_id=10002&support_redirect=0&mmversion=false

具体代码如下:


/**
* 根据作业名的哈希值对服务器列表进行轮转的分片策略.
* 
* @author weishubin
*/
public final class RotateServerByNameJobShardingStrategy implements JobShardingStrategy {
    
    private AverageAllocationJobShardingStrategy averageAllocationJobShardingStrategy = new AverageAllocationJobShardingStrategy();
    
    @Override
    public Map<JobInstance, List<Integer>> sharding(final List<JobInstance> jobInstances, final String jobName, final int shardingTotalCount) {
        return averageAllocationJobShardingStrategy.sharding(rotateServerList(jobInstances, jobName), jobName, shardingTotalCount);
    }
   //对作业实例根据hash反转
    private List<JobInstance> rotateServerList(final List<JobInstance> shardingUnits, final String jobName) {
        int shardingUnitsSize = shardingUnits.size();
        //取余数 计算一个反转偏移量
        int offset = Math.abs(jobName.hashCode()) % shardingUnitsSize;
        //没有余数则可以均分
        if (0 == offset) {
            return shardingUnits;
        }
        //通过偏移量 将元素重新插入集合 如果便宜量为1则每个元素向前移动1个 最前面的元素向尾部移动
        List<JobInstance> result = new ArrayList<>(shardingUnitsSize);
        for (int i = 0; i < shardingUnitsSize; i++) {
            
            int index = (i + offset) % shardingUnitsSize;
            result.add(shardingUnits.get(index));
        }
        return result;
    }

哈希值轮转分片也是一种特殊的平均分配分片策略,哈希值轮转分片先计算作业名字的哈希值然后对可用实例数量进行取模运算得到一个偏移量,与平均分配分片不同的是平均分片的起始位置是从这个取模位置开始的,这种分配策略可能出现的问题如下:

如果单个实例上所有作业哈希值对服务器实例数量取模之后比较分散的话就可以相对均衡一点,但是如果哈希值计算的比较一致,也会出现与平均分配分片一样的问题。

以上就是分片算法执行过程,分片算法的执行其实就是将一次执行的任务分成多个分片来执行,每个拿到分片项的机器都可以执行这个作业。

最终我们可以得到一个作业实例对应多个分配列表的的映射关系:Map<JobInstance, List<Integer>> 作业实例映射分片列表接下来就是要看下如何将分片写入Zookeeper,想一下现在有多个分片写入假如在写的过程中发生了异常,异常之后的数据将无法存储到Zookeeper而异常之前的数据写入到了Zookeeper,这就导致了本次过程丢失分片,如果作业在这种场景下执行了,并且我们作业需要足够的分片项来执行作业则就会出现缺失执行次数的情况。那该如何规避这种情况呢,我们只要能保证所有要么全部成功,要么全部失败就可以,这样就需要引入事物,接下来可以看下如果使用Zookeeper进行事务操作。

7.4.6.4 使用Zookeeper事务来提交多个请求

接下来我们看下Curator怎么来操作事物,ZooKeeper 从3.4.0开始提供了multiop的特性,支持原子性地执行多个操作,基于multiop封装了事务对象 Transaction,使用形式更加灵活方便 ,Curator使用CuratorTransactionFinal来提交事物,使用TransactionExecutionCallback回调执行类型类操作事物内容。


/**
* 在事务中执行操作.
* 
* @param callback 执行操作的回调
*/
public void executeInTransaction(final TransactionExecutionCallback callback) {
    try {
        CuratorTransactionFinal curatorTransactionFinal = getClient().inTransaction().check().forPath("/").and();
        callback.execute(curatorTransactionFinal);
        curatorTransactionFinal.commit();
    //CHECKSTYLE:OFF
    } catch (final Exception ex) {
    //CHECKSTYLE:ON
        RegExceptionHandler.handleException(ex);
    }
}



class PersistShardingInfoTransactionExecutionCallback implements TransactionExecutionCallback {
    
    private final Map<JobInstance, List<Integer>> shardingResults;
    
    @Override
    public void execute(final CuratorTransactionFinal curatorTransactionFinal) throws Exception {
//这里我们遍历实例分片列表
        for (Map.Entry<JobInstance, List<Integer>> entry : shardingResults.entrySet()) {
    //写入分片节点值就是当前的key(作业实例)的id,对应节点/作业名字/sharding/分片项变量/instance 
            for (int shardingItem : entry.getValue()) {
                curatorTransactionFinal.create().forPath(jobNodePath.getFullPath(ShardingNode.getInstanceNode(shardingItem)), entry.getKey().getJobInstanceId().getBytes()).and();
            }
        }
        //所有写入操作处理完毕开始删除 需要分片标示和分片中的标示
curatorTransactionFinal.delete().forPath(jobNodePath.getFullPath(ShardingNode.NECESSARY)).and();
        curatorTransactionFinal.delete().forPath(jobNodePath.getFullPath(ShardingNode.PROCESSING)).and();
    }
}

事务的操作一共分为3步:

  • 获取Curator的事务操作对象CuratorTransactionFinal。
  • 使用CuratorTransactionFinal创建事务执行操作。
  • 提交事务。

事务最终的操作写入了分片节点到Zookeeper上,同时删除分片处理中的状态保证一致性,我们可以看下在Zookeeper上分片节点的是怎么样的:

图7.8 分片节点

在Zookeeper上会在{jobName}/sharding节点下创建分片节点,而分片节点下创建子节点instance来存储当前分片所属实例,如果想要查看当前作业分片结果,可以用Zookeeper客户端工具获取对应节点即可。

- END -