zl程序教程

您现在的位置是:首页 >  工具

当前栏目

【项目实战】分布式定时任务框架XXL-JOB核心源码分析 - 作业调度和执行 (路由策略介绍)

2023-09-14 09:14:14 时间

一、 XXL-JOB的作业组

在XXL-JOB中,作业被组织成组,每个组可以有多个作业实例。
当提交作业时,它会根据作业组名称分配到特定的组中。

二、XXL-JOB的路由策略介绍

路由策略决定了组中哪个作业实例将执行该作业。

2.1 如何设置XXL-JOB的路由策略?

2.1.1 在XXL-JOB管理界面中设置

可以在XXL-JOB管理界面创建作业时设置路由策略

2.1.2 通过调用API在代码中设置

也可以通过调用XxlJobExecutor API在代码中设置路由策略。

要在代码中设置路由策略,可以使用XxlJobExecutor.setExecutorRouteStrategy方法。
例如,要将路由策略设置为“Last”,可以使用以下代码:

XxlJobExecutor.setExecutorRouteStrategy(ExecutorRouteStrategyEnum.LAST);

三、路由策略源码分析

调度中心触发任务之后,他的调用链如下
RemoteHttpJobBean > executeInternal > XxlJobTrigger > trigger ,

xxl-job路由策略主要发生在trigger这个方法中

public static void trigger(int jobId) {
    // 通过JobId从数据库中查询该任务的具体信息
    XxlJobInfo jobInfo = XxlJobDynamicScheduler.xxlJobInfoDao.loadById(jobId);              // job info
    if (jobInfo == null) {
        logger.warn(">>>>>>>>>>>> trigger fail, jobId invalid,jobId={}", jobId);
        return;
    }
    // 获取该类型的执行器信息
    XxlJobGroup group = XxlJobDynamicScheduler.xxlJobGroupDao.load(jobInfo.getJobGroup());  // group info

    // 匹配运行模式
    ExecutorBlockStrategyEnum blockStrategy = ExecutorBlockStrategyEnum.match(jobInfo.getExecutorBlockStrategy(), ExecutorBlockStrategyEnum.SERIAL_EXECUTION);  // block strategy
    // 匹配失败后的处理模式
    ExecutorFailStrategyEnum failStrategy = ExecutorFailStrategyEnum.match(jobInfo.getExecutorFailStrategy(), ExecutorFailStrategyEnum.FAIL_ALARM);    // fail strategy
    //  获取路由策略
    ExecutorRouteStrategyEnum executorRouteStrategyEnum = ExecutorRouteStrategyEnum.match(jobInfo.getExecutorRouteStrategy(), null);    // route strategy
    // 获取该执行器的集群机器列表
    ArrayList<String> addressList = (ArrayList<String>) group.getRegistryList();
    // 判断路由策略  是否为  分片广播模式
    if (ExecutorRouteStrategyEnum.SHARDING_BROADCAST == executorRouteStrategyEnum && CollectionUtils.isNotEmpty(addressList)) {
        for (int i = 0; i < addressList.size(); i++) {
            String address = addressList.get(i);
            //定义日志信息
            XxlJobLog jobLog = new XxlJobLog();
            // .....省略
            ReturnT<String> triggerResult = new ReturnT<String>(null);
            if (triggerResult.getCode() == ReturnT.SUCCESS_CODE) {
                // 4.1、trigger-param
                TriggerParam triggerParam = new TriggerParam();
                triggerParam.setJobId(jobInfo.getId());
                triggerParam.setBroadcastIndex(i); // 设置分片标记
                triggerParam.setBroadcastIndex(addressList.size());// 设置分片总数
                // ......省略组装参数的过程
                // 根据参数以及 机器地址,向执行器发送执行信息 , 此处将会详细讲解runExecutor 这个方法
                triggerResult = runExecutor(triggerParam, address);
            }
            // 将日志ID,放入队列,便于日志监控线程来监控任务的执行状态
            JobFailMonitorHelper.monitor(jobLog.getId());
            logger.debug(">>>>>>>>>>> xxl-job trigger end, jobId:{}", jobLog.getId());
        }
    } else {
        //除分片广播模式外,其他的路由策略均走这里
        //定义日志信息
        XxlJobLog jobLog = new XxlJobLog();
        jobLog.setJobGroup(jobInfo.getJobGroup());
        // .....省略
        ReturnT<String> triggerResult = new ReturnT<String>(null);
        if (triggerResult.getCode() == ReturnT.SUCCESS_CODE) {
            // 4.1、trigger-param
            TriggerParam triggerParam = new TriggerParam();
            triggerParam.setJobId(jobInfo.getId());
            triggerParam.setExecutorHandler(jobInfo.getExecutorHandler());
            triggerParam.setBroadcastIndex(0); // 默认分片标记为0
            triggerParam.setBroadcastTotal(1);  // 默认分片总数为1
            // .... 省略组装参数的过程
            // 此处使用了策略模式, 根据不同的策略 使用不同的实现类,下面将会详细讲解
            triggerResult = executorRouteStrategyEnum.getRouter().routeRun(triggerParam, addressList);
        }
        JobFailMonitorHelper.monitor(jobLog.getId());
        logger.debug(">>>>>>>>>>> xxl-job trigger end, jobId:{}", jobLog.getId());
    }
}

上面的代码主要讲了分片广播这个策略的实现以及xxl-job的其他路由策略的调用位置在哪里。

四、ExecutorRouteStrategyEnum枚举类

ExecutorRouteStrategyEnum枚举类是xxl-job路由策略非常重要的一个类
在这里插入图片描述
该类通过枚举的方式,把路由key, 和策略实现类进行了一个聚合、

4.1 ExecutorRouteStrategyEnum

public enum ExecutorRouteStrategyEnum {
    FIRST("第一个", new ExecutorRouteFirst()),
    LAST("最后一个", new ExecutorRouteLast()),
    ROUND("轮循", new ExecutorRouteRound()),
    RANDOM("随机", new ExecutorRouteRandom()),
    CONSISTENT_HASH("一致性哈希", new ExecutorRouteConsistentHash()),
    LEAST_FREQUENTLY_USED("最不经常使用", new ExecutorRouteLFU()),
    LEAST_RECENTLY_USED("最近最久未使用", new ExecutorRouteLRU()),
    FAILOVER("故障转移", new ExecutorRouteFailover()),
    BUSYOVER("忙碌转移", new ExecutorRouteBusyover()),
    SHARDING_BROADCAST("分片广播", null);

    ExecutorRouteStrategyEnum(String title, ExecutorRouter router) {
        this.title = title;
        this.router = router;
    }
    private String title;
    private ExecutorRouter router;
    public String getTitle() {
        return title;
    }
    public ExecutorRouter getRouter() {
        return router;
    }
    // 数据库中存的是枚举的名称,此处通过名称的对比,找到路由策略对应的枚举信息
    public static ExecutorRouteStrategyEnum match(String name, ExecutorRouteStrategyEnum defaultItem){
        if (name != null) {
            for (ExecutorRouteStrategyEnum item: ExecutorRouteStrategyEnum.values()) {
                if (item.name().equals(name)) {
                    return item;
                }
            }
        }
        return defaultItem;
    }
}

4.2 XXL-JOB支持的路由策略

在这里插入图片描述

XXL-JOB支持的路由策略,分别是:

序号支持的路由策略策略说明
1First第一个,组中第一个可用的实例将执行作业。
2Last最后一个,组中最后一个可用的实例将执行作业。
3Round轮循,轮询选择一个可用的实例执行作业。
4Random随机,随机选择一个可用的实例执行作业。
5ConsistentHash一致性哈希,基于一致性哈希算法选择一个可用的实例执行作业。
6Lfu基于最不经常使用算法选择一个可用的实例执行作业。
7Lru基于最近最少使用(最近最久未使用)算法选择一个可用的实例执行作业。
8FAILOVER故障转移
9BUSYOVER忙碌转移
10SHARDING_BROADCAST分片广播,组中的所有实例都将执行作业。
11FirstByOrder按照实例注册顺序选择第一个可用的实例执行作业。
12LastByOrder按照实例注册顺序选择最后一个可用的实例执行作业。
13PollingByOrder按照实例注册顺序轮询选择一个可用的实例执行作业。

五、XXL-JOB支持的路由策略详解

5.1 第一个ExecutorRouteFirst

由上面对ExecutorRouteStrategyEnum的分析,该策略对应的是 这个ExecutorRouteFirst执行策略类。
主要看routeRun 这个方法

public String route(int jobId, ArrayList<String> addressList) {
    return addressList.get(0);
}
@Override
public ReturnT<String> routeRun(TriggerParam triggerParam, ArrayList<String> addressList) {
    // 直接取集群地址列表里面的第一台机器来进行执行
    String address = route(triggerParam.getJobId(), addressList);
    // run executor
    ReturnT<String> runResult = XxlJobTrigger.runExecutor(triggerParam, address);
    // 将执行该任务的执行器地址,放入到结果里面返回,最后会记录到日志里面取
    runResult.setContent(address);
    return runResult;
}

5.2 最后一个ExecutorRouteLast

直接 从执行机集群列表的list里面取最后一个,源码如下

public String route(int jobId, ArrayList<String> addressList) {
    return addressList.get(addressList.size()-1);
}

@Override
public ReturnT<String> routeRun(TriggerParam triggerParam, ArrayList<String> addressList) {
    // 通过看上面的route方法,可以看到直接取得是list最后一个数据
    String address = route(triggerParam.getJobId(), addressList);
    // run executor
    ReturnT<String> runResult = XxlJobTrigger.runExecutor(triggerParam, address);
    runResult.setContent(address);
    return runResult;
}

5.3 轮循ExecutorRouteRound

主要看ExecutorRouteRound这个类里面的代码

private static ConcurrentHashMap<Integer, Integer> routeCountEachJob = new ConcurrentHashMap<Integer, Integer>();
// 缓存过期时间戳
private static long CACHE_VALID_TIME = 0;
private static int count(int jobId) {
    // 如果当前的时间,大于缓存的时间,那么说明需要刷新了
    if (System.currentTimeMillis() > CACHE_VALID_TIME) {
        routeCountEachJob.clear();
        // 设置缓存时间戳,默认缓存一天,一天之后会从新开始
        CACHE_VALID_TIME = System.currentTimeMillis() + 1000*60*60*24;
    }
    // count++
    Integer count = routeCountEachJob.get(jobId);
    // 当第一次执行轮循这个策略的时候,routeCountEachJob这个Map里面肯定是没有这个地址的, count==null ,
    // 当 count==null或者count大于100万的时候,系统会默认在100之间随机一个数字 , 放入hashMap, 然后返回该数字
    // 当系统第二次进来的时候,count!=null 并且小于100万, 那么把count加1 之后返回出去。 
    count = (count==null || count>1000000)?(new Random().nextInt(100)):++count;  // 初始化时主动Random一次,缓解首次压力
    // 为啥首次需要随机一次,而不是指定第一台呢?
    // 因为如果默认指定第一台的话,那么所有任务的首次加载全部会到第一台执行器上面去,这样会导致第一台机器刚开始的时候压力很大。
    routeCountEachJob.put(jobId, count);
    return count;
}
public String route(int jobId, ArrayList<String> addressList) {
    // 在执行器地址列表,获取相应的地址,  通过count(jobid) 这个方法来实现,主要逻辑在这个方法
    // 通过count(jobId)拿到数字之后, 通过求于的方式,拿到执行器地址
    // 例: count=2 , addresslist.size = 3
    // 2%3 = 2 ,  则拿list中下表为2的地址
    return addressList.get(count(jobId)%addressList.size());
}
@Override
public ReturnT<String> routeRun(TriggerParam triggerParam, ArrayList<String> addressList) {
    // 通过route方法获取执行器地址
    String address = route(triggerParam.getJobId(), addressList);
    // run executor
    ReturnT<String> runResult = XxlJobTrigger.runExecutor(triggerParam, address);
    runResult.setContent(address);
    return runResult;
}

5.4 随机 ExecutorRouteRandom

随机这个策略比较简单,通过在集群列表的大小内随机拿出一台机器来执行,比较简单

private static Random localRandom = new Random();

public String route(int jobId, ArrayList<String> addressList) {
    // Collections.shuffle(addressList);
    return addressList.get(localRandom.nextInt(addressList.size()));
}
@Override
public ReturnT<String> routeRun(TriggerParam triggerParam, ArrayList<String> addressList) {
    // address
    String address = route(triggerParam.getJobId(), addressList);
    // run executor
    ReturnT<String> runResult = XxlJobTrigger.runExecutor(triggerParam, address);
    runResult.setContent(address);
    return runResult;
}

5.5 一致性Hash ExecutorRouteConsistentHash

5.5.1 一致性Hash算法

一致性哈希算法是一种路由策略,它基于哈希值将作业分配给可用的作业实例。这种算法的优点是,当作业实例的数量发生变化时,只有一小部分作业需要重新分配,而不是所有作业。这可以减少系统的负载和网络流量。

先构造一个长度为2^32的整数环(这个环被称为一致性Hash环),根据节点名称的Hash值(其分布为[0, 2^32-1])将服务器节点放置在这个Hash环上,然后根据数据的Key值计算得到其Hash值(其分布也为[0, 2^32-1]),接着在Hash环上顺时针查找距离这个Key值的Hash值最近的服务器节点,完成Key到服务器的映射查找。

5.5.2 一致性哈希算法的核心实现

可以参考xxl-job-core模块中的com.xxl.job.core.util.ShardingUtil类。
该类中的consistentHash方法实现了一致性哈希算法的核心逻辑。
以下是该方法的代码

public static String consistentHash(String value, List<String> shards) {
    if (value == null || value.trim().length() == 0 || shards == null || shards.size() == 0) {
        return null;
    }
    int shardsCount = shards.size();
    if (shardsCount == 1) {
        return shards.get(0);
    }
    TreeMap<Long, String> nodes = new TreeMap<Long, String>();
    for (String shard : shards) {
        for (int i = 0; i < VIRTUAL_NODES; i++) {
            nodes.put(hash("SHARD-" + shard + "-NODE-" + i), shard);
        }
    }
    long hash = hash(value);
    SortedMap<Long, String> tailMap = nodes.tailMap(hash);
    if (tailMap.isEmpty()) {
        return nodes.get(nodes.firstKey());
    }
    return tailMap.get(tailMap.firstKey());
}

private static long hash(String key) {
    if (key == null || key.trim().length() == 0) {
        return 0;
    }
    byte[] digest = md5(key);
    return ((long) (digest[3] & 0xFF) << 24)
            | ((long) (digest[2] & 0xFF) << 16)
            | ((long) (digest[1] & 0xFF) << 8)
            | (digest[0] & 0xFF);
}

private static byte[] md5(String key) {
    MessageDigest md5;
    try {
        md5 = MessageDigest.getInstance("MD5");
    } catch (NoSuchAlgorithmException e) {
        throw new IllegalStateException("no md5 algorythm found");
    }
    md5.reset();
    md5.update(key.getBytes());
    return md5.digest();
}

该方法接受一个字符串和一个字符串列表作为参数,其中字符串列表表示可用的作业实例。该方法将字符串列表中的每个作业实例映射到一个或多个虚拟节点上,并将虚拟节点按照哈希值排序。

然后,该方法将输入字符串的哈希值与虚拟节点的哈希值进行比较,找到最近的虚拟节点,并将作业分配给该虚拟节点所对应的作业实例。

5.5.2 一致性Hash路由策略源码

分组下机器地址相同,不同JOB均匀散列在不同机器上,保证分组下机器分配JOB平均;且每个JOB固定调度其中一台机器;

这个地方使用的Hash方法是作者自己写的,因为String的hashCode可能重复,需要进一步扩大hashCode的取值范围

private static int VIRTUAL_NODE_NUM = 5;

/**
 * get hash code on 2^32 ring (md5散列的方式计算hash值)
 */
private static long hash(String key) {
    // md5 byte
   MessageDigest md5;
    try {
        md5 = MessageDigest.getInstance("MD5");
    } catch (NoSuchAlgorithmException e) {
        throw new RuntimeException("MD5 not supported", e);
    }
    md5.reset();
    byte[] keyBytes = null;
    try {
        keyBytes = key.getBytes("UTF-8");
    } catch (UnsupportedEncodingException e) {
        throw new RuntimeException("Unknown string :" + key, e);
    }
    md5.update(keyBytes);
    byte[] digest = md5.digest();
    // hash code, Truncate to 32-bits
    long hashCode = ((long) (digest[3] & 0xFF) << 24)
            | ((long) (digest[2] & 0xFF) << 16)
            | ((long) (digest[1] & 0xFF) << 8)
            | (digest[0] & 0xFF);
    long truncateHashCode = hashCode & 0xffffffffL;
    return truncateHashCode;
}

public String route(int jobId, ArrayList<String> addressList) {
    TreeMap<Long, String> addressRing = new TreeMap<Long, String>();
    for (String address: addressList) {
        for (int i = 0; i < VIRTUAL_NODE_NUM; i++) {
            // 通过自定义的Hash方法,得到服务节点的Hash值,同时放入treeMap
            long addressHash = hash("SHARD-" + address + "-NODE-" + i);
            addressRing.put(addressHash, address);
        }
    }
    // 得到JobId的Hash值
    long jobHash = hash(String.valueOf(jobId));
    // 调用treeMap的tailMap方法,拿到map中键大于jobHash的值列表
    SortedMap<Long, String> lastRing = addressRing.tailMap(jobHash);
    // 如果addressRing中有比jobHash的那么直接取lastRing 的第一个
    if (!lastRing.isEmpty()) {
        return lastRing.get(lastRing.firstKey());
    }
    // 如果没有,则直接取addresRing的第一个
    // 反正最终的效果是在Hash环上,顺时针拿离jobHash最近的一个值
    return addressRing.firstEntry().getValue();
}

@Override
public ReturnT<String> routeRun(TriggerParam triggerParam, ArrayList<String> addressList) {
    // address
    String address = route(triggerParam.getJobId(), addressList);
    // run executor
    ReturnT<String> runResult = XxlJobTrigger.runExecutor(triggerParam, address);
    runResult.setContent(address);
    return runResult;
}

5.6 最不经常使用 ExecutorRouteLFU

单个JOB对应的每个执行器,使用频率最低的优先被选举

// 定义个静态的MAP, 用来存储任务ID对应的执行信息
private static ConcurrentHashMap<Integer, HashMap<String, Integer>> jobLfuMap = new ConcurrentHashMap<Integer, HashMap<String, Integer>>();
// 定义过期时间戳
private static long CACHE_VALID_TIME = 0;

public String route(int jobId, ArrayList<String> addressList) {
    // 如果当前系统时间大于过期时间
    if (System.currentTimeMillis() > CACHE_VALID_TIME) {
        jobLfuMap.clear(); //清空
       //重新设置过期时间,默认为一天
        CACHE_VALID_TIME = System.currentTimeMillis() + 1000*60*60*24;
    }
    // 从MAP中获取执行信息
    //lfuItemMap中放的是执行器地址以及执行次数
    HashMap<String, Integer> lfuItemMap = jobLfuMap.get(jobId);     // Key排序可以用TreeMap+构造入参Compare;Value排序暂时只能通过ArrayList;
    if (lfuItemMap == null) {
        lfuItemMap = new HashMap<String, Integer>();
        jobLfuMap.put(jobId, lfuItemMap);
    }
    for (String address: addressList) {
        // map中不包含,并且值大于一万的时候,需要重新初始化执行器地址对应的执行次数
        // 初始化的规则是在机器地址列表size里面进行随机
        // 当运行一段时间后,有新机器加入的时候,此时,新机器初始化的执行次数较小,所以一开始,新机器的压力会比较大,后期慢慢趋于平衡
        if (!lfuItemMap.containsKey(address) || lfuItemMap.get(address) >1000000 ) {
            lfuItemMap.put(address, new Random().nextInt(addressList.size()));  // 初始化时主动Random一次,缓解首次压力
        }
    }

    // 将lfuItemMap中的key.value, 取出来,然后使用Comparator进行排序,value小的靠前。
    List<Map.Entry<String, Integer>> lfuItemList = new ArrayList<Map.Entry<String, Integer>>(lfuItemMap.entrySet());
    Collections.sort(lfuItemList, new Comparator<Map.Entry<String, Integer>>() {
        @Override
        public int compare(Map.Entry<String, Integer> o1, Map.Entry<String, Integer> o2) {
           return o1.getValue().compareTo(o2.getValue());
        }
    });
    //取第一个,也就是最小的一个,将address返回,同时对该address对应的值加1 。
    Map.Entry<String, Integer> addressItem = lfuItemList.get(0);
    String minAddress = addressItem.getKey();
    addressItem.setValue(addressItem.getValue() + 1);
    return addressItem.getKey();
}

@Override
public ReturnT<String> routeRun(TriggerParam triggerParam, ArrayList<String> addressList) {
    // address
    String address = route(triggerParam.getJobId(), addressList);
    // run executor
    ReturnT<String> runResult = XxlJobTrigger.runExecutor(triggerParam, address);
    runResult.setContent(address);
    return runResult;
}

5.7 最近最久未使用 ExecutorRouteLRU

单个JOB对应的每个执行器,最久为使用的优先被选举
此处使用的是LinkedHashMap来实现LRU算法的。通过LinkedHashMap的每次get/put的时候会进行排序,最新操作的数据会在最后面。 从而取第一个数据就代表是最久没有被使用的

// 定义个静态的MAP, 用来存储任务ID对应的执行信息
private static ConcurrentHashMap<Integer, LinkedHashMap<String, String>> jobLRUMap = new ConcurrentHashMap<Integer, LinkedHashMap<String, String>>();

// 定义过期时间戳
private static long CACHE_VALID_TIME = 0;
public String route(int jobId, ArrayList<String> addressList) {
    // cache clear
    if (System.currentTimeMillis() > CACHE_VALID_TIME) {
        jobLRUMap.clear();
        //重新设置过期时间,默认为一天
        CACHE_VALID_TIME = System.currentTimeMillis() + 1000*60*60*24;
    }
    // init lru
    LinkedHashMap<String, String> lruItem = jobLRUMap.get(jobId);
    if (lruItem == null) {
        /**
         * LinkedHashMap
         *      a、accessOrder:ture=访问顺序排序(get/put时排序);false=插入顺序排期;
         *      b、removeEldestEntry:新增元素时将会调用,返回true时会删除最老元素;可封装LinkedHashMap并重写该方法,比如定义最大容量,超出是返回true即可实现固定长度的LRU算法;
         */
        lruItem = new LinkedHashMap<>(16, 0.75f, true);
        jobLRUMap.put(jobId, lruItem);
    }

    // 如果地址列表里面有地址不在map中,此处是可以再次放入,防止添加机器的问题
    for (String address: addressList) {
        if (!lruItem.containsKey(address)) {
            lruItem.put(address, address);
        }
    }
   // 取头部的一个元素,也就是最久操作过的数据
    String eldestKey = lruItem.entrySet().iterator().next().getKey();
    String eldestValue = lruItem.get(eldestKey);
    return eldestValue;

}

5.8 故障转移 ExecutorRouteFailover

这个策略比较简单,遍历集群地址列表,
如果失败,则继续调用下一台机器
如果成功,则跳出循环,返回成功信息

public ReturnT<String> routeRun(TriggerParam triggerParam, ArrayList<String> addressList) {
    StringBuffer beatResultSB = new StringBuffer();
    //循环集群地址
    for (String address : addressList) {
        // beat
        ReturnT<String> beatResult = null;
        try {
            // 向执行器发送 执行beat信息  , 试探该机器是否可以正常工作
            ExecutorBiz executorBiz = XxlJobDynamicScheduler.getExecutorBiz(address);
            beatResult = executorBiz.beat();
        } catch (Exception e) {
            logger.error(e.getMessage(), e);
            beatResult = new ReturnT<String>(ReturnT.FAIL_CODE, ""+e );
        }
        // 拼接日志 , 收集日志信息,后期一起返回
        beatResultSB.append( (beatResultSB.length()>0)?"<br><br>":"")
                .append(I18nUtil.getString("jobconf_beat") + ":")
                .append("<br>address:").append(address)
                .append("<br>code:").append(beatResult.getCode())
                .append("<br>msg:").append(beatResult.getMsg());

        // 返回状态为成功
        if (beatResult.getCode() == ReturnT.SUCCESS_CODE) {
            // 执行任务
            ReturnT<String> runResult = XxlJobTrigger.runExecutor(triggerParam, address);
            beatResultSB.append("<br><br>").append(runResult.getMsg());
            // result
            runResult.setMsg(beatResultSB.toString());
            runResult.setContent(address);
            return runResult;
        }
    }
    return new ReturnT<String>(ReturnT.FAIL_CODE, beatResultSB.toString());
}

5.9 忙碌转移 ExecutorRouteBusyover

这个策略更上面那个故障转移的原理一致,只不过不同的是,故障转移是判断机器是否存活, 二忙碌转移是想执行器发送消息判断该任务

对应的线程是否处于执行状态。

@Override
public ReturnT<String> routeRun(TriggerParam triggerParam, ArrayList<String> addressList) {
    StringBuffer idleBeatResultSB = new StringBuffer();
    // 循环集群地址
    for (String address : addressList) {
        // beat
        ReturnT<String> idleBeatResult = null;
        try {
            // 向执行服务器发送消息,判断当前jobId对应的线程是否忙碌,接下来可以看一下idleBeat这个方法
            ExecutorBiz executorBiz = XxlJobDynamicScheduler.getExecutorBiz(address);
            idleBeatResult = executorBiz.idleBeat(triggerParam.getJobId());
        } catch (Exception e) {
            logger.error(e.getMessage(), e);
            idleBeatResult = new ReturnT<String>(ReturnT.FAIL_CODE, ""+e );
        }
        idleBeatResultSB.append( (idleBeatResultSB.length()>0)?"<br><br>":"")
                .append(I18nUtil.getString("jobconf_idleBeat") + ":")
                .append("<br>address:").append(address)
                .append("<br>code:").append(idleBeatResult.getCode())
                .append("<br>msg:").append(idleBeatResult.getMsg());

        // 返回成功,代表这台执行服务器对应的线程处于空闲状态
        if (idleBeatResult.getCode() == ReturnT.SUCCESS_CODE) {
            // 执行人呢无
            ReturnT<String> runResult = XxlJobTrigger.runExecutor(triggerParam, address);
            idleBeatResultSB.append("<br><br>").append(runResult.getMsg());
            // result
            runResult.setMsg(idleBeatResultSB.toString());
            runResult.setContent(address);
            return runResult;
        }
    }
    return new ReturnT<String>(ReturnT.FAIL_CODE, idleBeatResultSB.toString());
}

看一下执行器那边的idleBeat代码实现
ExecutorBizImpl
@Override
public ReturnT<String> idleBeat(int jobId) {
    // isRunningOrHasQueue
    boolean isRunningOrHasQueue = false;
    // 从线程池里面获取当前任务对应的线程
    JobThread jobThread = XxlJobExecutor.loadJobThread(jobId);
    if (jobThread != null && jobThread.isRunningOrHasQueue()) {
        // 线程处于运行中
        isRunningOrHasQueue = true;
    }
     if (isRunningOrHasQueue) {
        // 线程运行中,则返回fasle
        return new ReturnT<String>(ReturnT.FAIL_CODE, "job thread is running or has trigger queue.");
    }
    // 线程空闲,返回success
    return ReturnT.SUCCESS;
}

5.10 分片广播 SHARDING_BROADCAST

当系统判断当前任务的路由策略是分片广播时, 就会遍历执行器的集群机器列表,
给每一台机器都发送执行消息,分片总数为集群机器数量,分片标记从0开始
上面的代码已经非常清楚了,此处不再赘述。

// 判断路由策略  是否为  分片广播模式
if (ExecutorRouteStrategyEnum.SHARDING_BROADCAST == executorRouteStrategyEnum && CollectionUtils.isNotEmpty(addressList)) {
     for (int i = 0; i < addressList.size(); i++) {
         String address = addressList.get(i);
         //定义日志信息
         XxlJobLog jobLog = new XxlJobLog();
         // .....省略
         ReturnT<String> triggerResult = new ReturnT<String>(null);
         if (triggerResult.getCode() == ReturnT.SUCCESS_CODE) {
             // 4.1、trigger-param
             TriggerParam triggerParam = new TriggerParam();
             triggerParam.setJobId(jobInfo.getId());
             triggerParam.setBroadcastIndex(i); // 设置分片标记
             triggerParam.setBroadcastIndex(addressList.size());// 设置分片总数
             // ......省略组装参数的过程
             // 根据参数以及 机器地址,向执行器发送执行信息 , 此处将会详细讲解runExecutor 这个方法
             triggerResult = runExecutor(triggerParam, address);
         }
         // 将日志ID,放入队列,便于日志监控线程来监控任务的执行状态
         JobFailMonitorHelper.monitor(jobLog.getId());
         logger.debug(">>>>>>>>>>> xxl-job trigger end, jobId:{}", jobLog.getId());
     }
 }else{
  // 除分片广播模式外,其他的路由策略均走这里
}