Elastic-Job2.1.5源码-图解错过作业重新触发执行功能
大家好,本文给大家介绍一下Elastic-Job 中错过作业重新触发的概念,配置与原理
错过作业重新触发执行功能
文 | 宋小生
7.5 错过重触发功能
7.5.1 错过执行作业概念
错过作业重触发是什么意思呢:
要弄清楚作业的misfire,首先需要了解几个重要的概念:
触发器超时
举个例子说明这个概念。比如调度引擎中有5个工作线程,然后在某天的下午2点 有6个任务需要执行,那么由于调度引擎中只有5个工作线程,所以在2点的时候会有5个任务会按照之前设定的时间正常执行,有1个任务因为没有线程资源而被延迟执行,这个就叫触发器超时。下面这些情况会造成触发器超时:
- 系统因为某些原因被重启。在系统关闭到重新启动之间的一段时间里,可能有些任务会被 misfire。
- Trigger 被暂停(suspend)的一段时间里,有些任务可能会被 misfire。
- 线程池中所有线程都被占用,导致任务无法被触发执行,造成 misfire。
- 有状态任务在下次触发时间到达时,上次执行还没有结束。
接下来我们可以看下几种执行的场景:
正常执行
举例来说,现在有个作业每个小时执行一次,在12:00,13:00,14:00:
执行时长都在1个小时之内,则每个时间点都可以正常执行,示例图如下:
图 7.5.1 正常执行作业
错过执行作业
12:00的执行时长过长(可能是处理业务数据过大,也可能其他原因),执行了1个多小时,当未开启错过作业重新触发功能则在13:00的时候作业是无法执行将被错过,执行示例图如下:
图 7.5.2 错过执行作业
错过作业执行重新触发
12:00的执行时长过长(可能是处理业务数据过大,也可能其他原因),执行了1个多小时,当开启错过作业重新触发功能后在12:00执行之后为13:00错过的执行补偿一次执行,执行示例图如下:
图 7.5.3 错过执行重新触发作业
7.5.2 错过执行作业配置
在Quartz内部具有个属性为作业的misfire的阈值,单位是秒,
org.quartz.jobStore.misfireThreshold
回顾下我们初始化StdSchedulerFactory的时候将这个参数设置为了1S也就是说存在作业触发时间超过了这个时间则被认为是错过作业执行,有个参数org.quartz.threadPool.threadCount配置了工作线程数也就是同时并发执行作业数,如下代码:
private Properties getBaseQuartzProperties() {
Properties result = new Properties();
result.put("org.quartz.threadPool.class", org.quartz.simpl.SimpleThreadPool.class.getName());
//作业线程数设置为1无法并行执行作业,容易错过执行
result.put("org.quartz.threadPool.threadCount", "1");
result.put("org.quartz.scheduler.instanceName", liteJobConfig.getJobName());
//错过作业重新触发时间配置
result.put("org.quartz.jobStore.misfireThreshold", "1");
result.put("org.quartz.plugin.shutdownhook.class", JobShutdownHookPlugin.class.getName());
result.put("org.quartz.plugin.shutdownhook.cleanShutdown", Boolean.TRUE.toString());
return result;
}
7.5.3 错过执行作业监听器写入
如果作业错过执行Quartz定时线程则会调用我们JobTriggerListener类中重写的triggerMisfired方法,回到triggerMisfired方法对于错过执行的作业我们会写入对应节点 sharding/当前实例对应分片项/misfire来标记哪个分片被错过执行。
作业错过执行监听器代码如下:
@RequiredArgsConstructor
public final class JobTriggerListener extends TriggerListenerSupport {
private final ExecutionService executionService;
private final ShardingService shardingService;
@Override
public String getName() {
return "JobTriggerListener";
}
/**
* 超过阈值,错过作业重新触发
*/
@Override
public void triggerMisfired(final Trigger trigger) {
if (null != trigger.getPreviousFireTime()) {
executionService.setMisfire(shardingService.getLocalShardingItems());
}
}
}
写入错过执行节点sharding/当前实例对应分片项/misfire,代码如下:
public void setMisfire(final Collection<Integer> items) {
for (int each : items) {
jobNodeStorage.createJobNodeIfNeeded(ShardingNode.getMisfireNode(each));
}
}
7.5.4 作业运行时幂等导致的错过执行
接下来看AbstractElasticJobExecutor中是如何对幂等作业进行错过补偿的,这里如果幂等配置开启则会进行幂等控制,幂等配置开启后作业执行的时候发现存在了running节点则说明上次对应作业分片仍旧未执行结束,幂等需要保证上次作业分片执行结束时候本次才开始执行,不能在同一个分片上并行执行,这里判断了上次作业分片下如果存在running节点也就是还有作业分片在执行,那本次执行所对应作业分片则无法执行,这种情况是Elastic-Job对作业分片的幂等的处理,如果出现这样本次无法执行的作业分片也是要为当前分片设置错过作业重触发标记,设置完毕之后则返回。
if (jobFacade.misfireIfRunning(shardingContexts.getShardingItemParameters().keySet())) {
if (shardingContexts.isAllowSendJobEvent()) {
jobFacade.postJobStatusTraceEvent(shardingContexts.getTaskId(), State.TASK_FINISHED, String.format(
"Previous job '%s' - shardingItems '%s' is still running, misfired job will start after previous job completed.", jobName,
shardingContexts.getShardingItemParameters().keySet()));
}
return;
}
ExecutionService类型中错过执行节点写入逻辑:
public boolean misfireIfHasRunningItems(final Collection<Integer> items) {
//当前作业正在运行
if (!hasRunningItems(items)) {
return false;
}
//写入错过执行标记
setMisfire(items);
return true;
}
7.5.5 错过作业重新补偿执行
在后面作业正常执行业务执行完毕之后,将会执行错过执行作业:
//如果当前分片项中存在错过执行作业
while (jobFacade.isExecuteMisfired(shardingContexts.getShardingItemParameters().keySet())) {
//删除错过执行节点
jobFacade.clearMisfire(shardingContexts.getShardingItemParameters().keySet());
//执行作业
execute(shardingContexts, JobExecutionEvent.ExecutionSource.MISFIRE);
}
错过作业重触发,这个就是如果我们作业在执行的时候执行时间过长导致到了下次执行的时候,第一次作业执行还未结束,在这种单线程情况下,导致下次作业不能触发,容易错过执行,这里在作业第一次正常执行完毕之后查看是否有错过执行节点存在,如果存在错过执行节点则先清理错过执行节点然后立即补偿触发一次。
具体什么节点满足补偿执行呢:
- 作业开启了错过作业重触发配置,
- 当前分片项存在sharding/%s/misfire节点
- END -
相关文章
- 详解JAVA线程问题诊断工具Thread Dump
- 还在手动发早安吗?教你用java实现每日给女友微信发送早安
- Java开发如何通过IoT边缘ModuleSDK进行进程应用的开发?
- 解读Java内存模型中Happens-Before的8个原则
- Java7提供的Fork/Join框架实现高并发程序,你会使用吗?
- Java开发如何通过IoT边缘ModuleSDK进行协议转换?
- 0停机迁移Nacos?Java字节码技术来帮忙
- 视频 | ZYNQ开发板深度评测:高性能FPGA和双核ARM的强强联合!
- I²C协议官方标准文档2021最新版本下载
- 国产FPGA开发板上手体验:不足百元,集成ARM硬核处理器!
- 业内首发!感芯MC3172硬实时RISC-V芯片,还用啥RTOS!
- 从汇率转换通用解决方案到可复用设计思想
- 顶流选手专访 - 最具推广价值作品 - 背后故事
- 顶流选手专访 - 最佳展现创意作品 - 背后故事
- 【精华】顶流选手专访-最佳可视化展现作品(冠军)
- 冠军作品背后的故事会是什么呢
- IBM 人力资源综合分析案例
- 世界五百强财务高管数字化战群雄经典案例
- ZebraBI 6.0 发布,更高更快更强大
- DAX 引擎之父揭秘 DAX 引擎内部细节