zl程序教程

您现在的位置是:首页 >  后端

当前栏目

通过源码理解Spring中@Scheduled的实现原理并且实现调度任务动态装载(下)

Spring源码原理 实现 理解 通过 动态 任务
2023-09-27 14:25:56 时间
最近的新项目和数据同步相关,有定时调度的需求。之前一直有使用过Quartz、XXL-Job、Easy Scheduler等调度框架,后来越发觉得这些框架太重量级了,于是想到了Spring内置的Scheduling模块。而原生的Scheduling模块只是内存态的调度模块,不支持任务的持久化或者配置(配置任务通过@Scheduled注解进行硬编码,不能抽离到类之外),因此考虑理解Scheduling模块的底层原理,并且基于此造一个简单的轮子,使之支持调度任务配置:通过配置文件或者JDBC数据源。
调度任务动态装载


Scheduling模块本身已经支持基于NamespaceHandler支持通过XML文件配置调度任务 但是笔者一直认为XML给人的感觉太 重 使用起来显得太笨重 这里打算扩展出JSON文件配置和基于JDBC数据源配置 也就是持久化任务 这里选用MySQL 。根据前文的源码分析 需要用到SchedulingConfigurer接口的实现 用于在所有调度任务触发之前从外部添加自定义的调度任务。先定义调度任务的一些配置属性类


// 调度任务类型枚举

 Getter

 RequiredArgsConstructor

public enum ScheduleTaskType {

 CRON( CRON ),

 FIXED_DELAY( FIXED_DELAY ),

 FIXED_RATE( FIXED_RATE ),

 private final String type;

// 调度任务配置,enable属性为全局开关

 Data

public class ScheduleTaskProperties {

 private Long version;

 private Boolean enable;

 private List ScheduleTasks tasks;

// 调度任务集合,笔者设计的时候采用一个宿主类中每个独立方法都是一个任务实例的模式

 Data

public class ScheduleTasks {

 // 这里故意叫Klass代表Class,避免关键字冲突

 private String taskHostKlass;

 private Boolean enable;

 private List ScheduleTaskMethod taskMethods;

// 调度任务方法 - enable为任务开关 没有配置会被ScheduleTaskProperties或者ScheduleTasks中的enable覆盖

 Data

public class ScheduleTaskMethod {

 private Boolean enable;

 private String taskDescription;

 private String taskMethod;

 // 时区,cron的计算需要用到

 private String timeZone;

 private String cronExpression;

 private String intervalMilliseconds;

 private String initialDelayMilliseconds;

复制代码


设计的时候 考虑到多个任务执行方法可以放在同一个宿主类 这样可以方便同一种类的任务进行统一管理 如


public class TaskHostClass {

 public void task1() {

 public void task2() {

 ......

 public void taskN() {

复制代码


细节方面 intervalMilliseconds和initialDelayMilliseconds的单位设计为毫秒 使用字符串形式 方便可以基于StringValueResolver解析配置文件中的属性配置。添加一个抽象的SchedulingConfigurer


 Slf4j

public abstract class AbstractSchedulingConfigurer implements SchedulingConfigurer, InitializingBean, BeanFactoryAware,

 EmbeddedValueResolverAware {

 Getter

 private StringValueResolver embeddedValueResolver;

 private ConfigurableBeanFactory configurableBeanFactory;

 private final List InternalTaskProperties internalTasks Lists.newLinkedList();

 private final Set String tasksLoaded Sets.newHashSet();

 Override

 public void setBeanFactory(BeanFactory beanFactory) throws BeansException {

 configurableBeanFactory (ConfigurableBeanFactory) beanFactory;

 Override

 public void afterPropertiesSet() throws Exception {

 internalTasks.clear();

 internalTasks.addAll(loadTaskProperties());

 Override

 public void setEmbeddedValueResolver(StringValueResolver resolver) {

 embeddedValueResolver resolver;

 Override

 public void configureTasks(ScheduledTaskRegistrar taskRegistrar) {

 for (InternalTaskProperties task : internalTasks) {

 try {

 synchronized (tasksLoaded) {

 String key task.taskHostKlass() # task.taskMethod();

 // 避免重复加载

 if (!tasksLoaded.contains(key)) {

 if (task instanceof CronTaskProperties) {

 loadCronTask((CronTaskProperties) task, taskRegistrar);

 if (task instanceof FixedDelayTaskProperties) {

 loadFixedDelayTask((FixedDelayTaskProperties) task, taskRegistrar);

 if (task instanceof FixedRateTaskProperties) {

 loadFixedRateTask((FixedRateTaskProperties) task, taskRegistrar);

 tasksLoaded.add(key);

 } else {

 log.info( 调度任务已经装载,任务宿主类:{},任务执行方法:{} , task.taskHostKlass(), task.taskMethod());

 } catch (Exception e) {

 throw new IllegalStateException(String.format( 加载调度任务异常,任务宿主类:%s,任务执行方法:%s ,

 task.taskHostKlass(), task.taskMethod()), e);

 private ScheduledMethodRunnable loadScheduledMethodRunnable(String taskHostKlass, String taskMethod) throws Exception {

 Class ? klass ClassUtils.forName(taskHostKlass, null);

 Object target configurableBeanFactory.getBean(klass);

 Method method ReflectionUtils.findMethod(klass, taskMethod);

 if (null method) {

 throw new IllegalArgumentException(String.format( 找不到目标方法,任务宿主类:%s,任务执行方法:%s , taskHostKlass, taskMethod));

 Method invocableMethod AopUtils.selectInvocableMethod(method, target.getClass());

 return new ScheduledMethodRunnable(target, invocableMethod);

 private void loadCronTask(CronTaskProperties pops, ScheduledTaskRegistrar taskRegistrar) throws Exception {

 ScheduledMethodRunnable runnable loadScheduledMethodRunnable(pops.taskHostKlass(), pops.taskMethod());

 String cronExpression embeddedValueResolver.resolveStringValue(pops.cronExpression());

 if (null ! cronExpression) {

 String timeZoneString embeddedValueResolver.resolveStringValue(pops.timeZone());

 TimeZone timeZone;

 if (null ! timeZoneString) {

 timeZone TimeZone.getTimeZone(timeZoneString);

 } else {

 timeZone TimeZone.getDefault();

 CronTask cronTask new CronTask(runnable, new CronTrigger(cronExpression, timeZone));

 taskRegistrar.addCronTask(cronTask);

 log.info( 装载CronTask[{}#{}()]成功,cron表达式:{},任务描述:{} , cronExpression, pops.taskMethod(),

 pops.cronExpression(), pops.taskDescription());

 private void loadFixedDelayTask(FixedDelayTaskProperties pops, ScheduledTaskRegistrar taskRegistrar) throws Exception {

 ScheduledMethodRunnable runnable loadScheduledMethodRunnable(pops.taskHostKlass(), pops.taskMethod());

 long fixedDelayMilliseconds parseDelayAsLong(embeddedValueResolver.resolveStringValue(pops.intervalMilliseconds()));

 long initialDelayMilliseconds parseDelayAsLong(embeddedValueResolver.resolveStringValue(pops.initialDelayMilliseconds()));

 FixedDelayTask fixedDelayTask new FixedDelayTask(runnable, fixedDelayMilliseconds, initialDelayMilliseconds);

 taskRegistrar.addFixedDelayTask(fixedDelayTask);

 log.info( 装载FixedDelayTask[{}#{}()]成功,固定延迟间隔:{} ms,初始延迟执行时间:{} ms,任务描述:{} , pops.taskHostKlass(),

 pops.taskMethod(), fixedDelayMilliseconds, initialDelayMilliseconds, pops.taskDescription());

 private void loadFixedRateTask(FixedRateTaskProperties pops, ScheduledTaskRegistrar taskRegistrar) throws Exception {

 ScheduledMethodRunnable runnable loadScheduledMethodRunnable(pops.taskHostKlass(), pops.taskMethod());

 long fixedRateMilliseconds parseDelayAsLong(embeddedValueResolver.resolveStringValue(pops.intervalMilliseconds()));

 long initialDelayMilliseconds parseDelayAsLong(embeddedValueResolver.resolveStringValue(pops.initialDelayMilliseconds()));

 FixedRateTask fixedRateTask new FixedRateTask(runnable, fixedRateMilliseconds, initialDelayMilliseconds);

 taskRegistrar.addFixedRateTask(fixedRateTask);

 log.info( 装载FixedRateTask[{}#{}()]成功,固定执行频率:{} ms,初始延迟执行时间:{} ms,任务描述:{} , pops.taskHostKlass(),

 pops.taskMethod(), fixedRateMilliseconds, initialDelayMilliseconds, pops.taskDescription());

 private long parseDelayAsLong(String value) {

 if (null value) {

 return 0L;

 if (value.length() 1 (isP(value.charAt(0)) || isP(value.charAt(1)))) {

 return Duration.parse(value).toMillis();

 return Long.parseLong(value);

 private boolean isP(char ch) {

 return (ch P || ch p 

 * 加载任务配置,预留给子类实现

 protected abstract List InternalTaskProperties loadTaskProperties() throws Exception;

 interface InternalTaskProperties {

 String taskHostKlass();

 String taskMethod();

 String taskDescription();

 Builder

 protected static class CronTaskProperties implements InternalTaskProperties {

 private String taskHostKlass;

 private String taskMethod;

 private String cronExpression;

 private String taskDescription;

 private String timeZone;

 Override

 public String taskDescription() {

 return taskDescription;

 public String cronExpression() {

 return cronExpression;

 public String timeZone() {

 return timeZone;

 Override

 public String taskHostKlass() {

 return taskHostKlass;

 Override

 public String taskMethod() {

 return taskMethod;

 Builder

 protected static class FixedDelayTaskProperties implements InternalTaskProperties {

 private String taskHostKlass;

 private String taskMethod;

 private String intervalMilliseconds;

 private String initialDelayMilliseconds;

 private String taskDescription;

 Override

 public String taskDescription() {

 return taskDescription;

 public String initialDelayMilliseconds() {

 return initialDelayMilliseconds;

 public String intervalMilliseconds() {

 return intervalMilliseconds;

 Override

 public String taskHostKlass() {

 return taskHostKlass;

 Override

 public String taskMethod() {

 return taskMethod;

 Builder

 protected static class FixedRateTaskProperties implements InternalTaskProperties {

 private String taskHostKlass;

 private String taskMethod;

 private String intervalMilliseconds;

 private String initialDelayMilliseconds;

 private String taskDescription;

 Override

 public String taskDescription() {

 return taskDescription;

 public String initialDelayMilliseconds() {

 return initialDelayMilliseconds;

 public String intervalMilliseconds() {

 return intervalMilliseconds;

 Override

 public String taskHostKlass() {

 return taskHostKlass;

 Override

 public String taskMethod() {

 return taskMethod;

复制代码


loadTaskProperties()方法用于加载任务配置 留给子类实现。


JSON配置


JSON配置文件的格式如下 类路径下的scheduling/tasks.json文件


{

 version : 1,

 tasks : [

 taskKlass : club.throwable.schedule.Tasks ,

 taskMethods : [

 taskType : FIXED_DELAY ,

 taskDescription : processTask1任务 ,

 taskMethod : processTask1 ,

 intervalMilliseconds : 5000 

复制代码


每个层级都有一个enable属性 默认为true 只有强制指定为false的时候才不会装载对应的任务调度方法。这里就是简单继承AbstractSchedulingConfigurer 实现从类路径加载配置的逻辑 定义JsonSchedulingConfigurer


public class JsonSchedulingConfigurer extends AbstractSchedulingConfigurer {

 // 这里把默认的任务配置JSON文件放在CLASSPATH下的scheduling/tasks.json,可以通过配置项scheduling.json.config.location进行覆盖

 Value( ${scheduling.json.config.location:scheduling/tasks.json} )

 private String location;

 Autowired

 private ObjectMapper objectMapper;

 Override

 protected List InternalTaskProperties loadTaskProperties() throws Exception {

 ClassPathResource resource new ClassPathResource(location);

 String content StreamUtils.copyToString(resource.getInputStream(), StandardCharsets.UTF_8);

 ScheduleTaskProperties properties objectMapper.readValue(content, ScheduleTaskProperties.class);

 if (Boolean.FALSE.equals(properties.getEnable()) || null properties.getTasks()) {

 return Lists.newArrayList();

 List InternalTaskProperties target Lists.newArrayList();

 for (ScheduleTasks tasks : properties.getTasks()) {

 if (null ! tasks) {

 List ScheduleTaskMethod taskMethods tasks.getTaskMethods();

 if (null ! taskMethods) {

 for (ScheduleTaskMethod taskMethod : taskMethods) {

 if (!Boolean.FALSE.equals(taskMethod.getEnable())) {

 if (ScheduleTaskType.CRON taskMethod.getTaskType()) {

 target.add(CronTaskProperties.builder()

 .taskMethod(taskMethod.getTaskMethod())

 .cronExpression(taskMethod.getCronExpression())

 .timeZone(taskMethod.getTimeZone())

 .taskDescription(taskMethod.getTaskDescription())

 .taskHostKlass(tasks.getTaskKlass())

 .build());

 if (ScheduleTaskType.FIXED_DELAY taskMethod.getTaskType()) {

 target.add(FixedDelayTaskProperties.builder()

 .taskMethod(taskMethod.getTaskMethod())

 .intervalMilliseconds(taskMethod.getIntervalMilliseconds())

 .initialDelayMilliseconds(taskMethod.getInitialDelayMilliseconds())

 .taskDescription(taskMethod.getTaskDescription())

 .taskHostKlass(tasks.getTaskKlass())

 .build());

 if (ScheduleTaskType.FIXED_RATE taskMethod.getTaskType()) {

 target.add(FixedRateTaskProperties.builder()

 .taskMethod(taskMethod.getTaskMethod())

 .intervalMilliseconds(taskMethod.getIntervalMilliseconds())

 .initialDelayMilliseconds(taskMethod.getInitialDelayMilliseconds())

 .taskDescription(taskMethod.getTaskDescription())

 .taskHostKlass(tasks.getTaskKlass())

 .build());

 return target;

复制代码


添加一个配置类和任务类


 Configuration

public class SchedulingAutoConfiguration {

 Bean

 public JsonSchedulingConfigurer jsonSchedulingConfigurer(){

 return new JsonSchedulingConfigurer();

// club.throwable.schedule.Tasks

 Slf4j

 Component

public class Tasks {

 public void processTask1() {

 log.info( processTask1触发.......... 

复制代码


启动SpringBoot应用 某次执行的部分日志如下


2020-03-22 16:24:17.248 INFO 22836 --- [ main] c.t.s.AbstractSchedulingConfigurer : 装载FixedDelayTask[club.throwable.schedule.Tasks#processTask1()]成功,固定延迟间隔:5000 ms,初始延迟执行时间:0 ms,任务描述:processTask1任务

2020-03-22 16:24:22.275 INFO 22836 --- [pool-1-thread-1] club.throwable.schedule.Tasks : processTask1触发..........

2020-03-22 16:24:27.277 INFO 22836 --- [pool-1-thread-1] club.throwable.schedule.Tasks : processTask1触发..........

2020-03-22 16:24:32.279 INFO 22836 --- [pool-1-thread-1] club.throwable.schedule.Tasks : processTask1触发..........

......

复制代码


这里有些细节没有完善 例如配置文件参数的一些非空判断、配置值是否合法等等校验逻辑没有做 如果要设计成一个工业级的类库 这些方面必须要考虑。


JDBC数据源配置


JDBC数据源这里用MySQL举例说明 先建一个调度任务配置表schedule_task


CREATE TABLE schedule_task 

 id BIGINT UNSIGNED PRIMARY KEY AUTO_INCREMENT COMMENT 主键 ,

 edit_time DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 更新时间 ,

 create_time DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT 创建时间 ,

 editor VARCHAR(32) NOT NULL DEFAULT admin COMMENT 修改者 ,

 creator VARCHAR(32) NOT NULL DEFAULT admin COMMENT 创建者 ,

 deleted BIGINT UNSIGNED NOT NULL DEFAULT 0 COMMENT 软删除标识 ,

 task_host_class VARCHAR(256) NOT NULL COMMENT 任务宿主类全类名 ,

 task_method VARCHAR(128) NOT NULL COMMENT 任务执行方法名 ,

 task_type VARCHAR(16) NOT NULL COMMENT 任务类型 ,

 task_description VARCHAR(64) NOT NULL COMMENT 任务描述 ,

 cron_expression VARCHAR(128) COMMENT cron表达式 ,

 time_zone VARCHAR(32) COMMENT 时区 ,

 interval_milliseconds BIGINT UNSIGNED NOT NULL DEFAULT 0 COMMENT 执行间隔时间 ,

 initial_delay_milliseconds BIGINT UNSIGNED NOT NULL DEFAULT 0 COMMENT 初始延迟执行时间 ,

 UNIQUE uniq_class_method (task_host_class, task_method)

) COMMENT 调度任务配置表 

复制代码


其实具体的做法和JSON配置差不多 先引入spring-boot-starter-jdbc 接着编写MysqlSchedulingConfigurer


// DAO

 RequiredArgsConstructor

public class MysqlScheduleTaskDao {

 private final JdbcTemplate jdbcTemplate;

 private static final ResultSetExtractor List ScheduleTask MULTI r - {

 List ScheduleTask tasks Lists.newArrayList();

 while (r.next()) {

 ScheduleTask task new ScheduleTask();

 tasks.add(task);

 task.setId(r.getLong( id 

 task.setCronExpression(r.getString( cron_expression 

 task.setInitialDelayMilliseconds(r.getLong( initial_delay_milliseconds 

 task.setIntervalMilliseconds(r.getLong( interval_milliseconds 

 task.setTimeZone(r.getString( time_zone 

 task.setTaskDescription(r.getString( task_description 

 task.setTaskHostClass(r.getString( task_host_class 

 task.setTaskMethod(r.getString( task_method 

 task.setTaskType(r.getString( task_type 

 return tasks;

 public List ScheduleTask selectAllTasks() {

 return jdbcTemplate.query( SELECT * FROM schedule_task WHERE deleted 0 , MULTI);

// MysqlSchedulingConfigurer

 RequiredArgsConstructor

public class MysqlSchedulingConfigurer extends AbstractSchedulingConfigurer {

 private final MysqlScheduleTaskDao mysqlScheduleTaskDao;

 Override

 protected List InternalTaskProperties loadTaskProperties() throws Exception {

 List InternalTaskProperties target Lists.newArrayList();

 List ScheduleTask tasks mysqlScheduleTaskDao.selectAllTasks();

 if (!tasks.isEmpty()) {

 for (ScheduleTask task : tasks) {

 ScheduleTaskType scheduleTaskType ScheduleTaskType.fromType(task.getTaskType());

 if (ScheduleTaskType.CRON scheduleTaskType) {

 target.add(CronTaskProperties.builder()

 .taskMethod(task.getTaskMethod())

 .cronExpression(task.getCronExpression())

 .timeZone(task.getTimeZone())

 .taskDescription(task.getTaskDescription())

 .taskHostKlass(task.getTaskHostClass())

 .build());

 if (ScheduleTaskType.FIXED_DELAY scheduleTaskType) {

 target.add(FixedDelayTaskProperties.builder()

 .taskMethod(task.getTaskMethod())

 .intervalMilliseconds(String.valueOf(task.getIntervalMilliseconds()))

 .initialDelayMilliseconds(String.valueOf(task.getInitialDelayMilliseconds()))

 .taskDescription(task.getTaskDescription())

 .taskHostKlass(task.getTaskHostClass())

 .build());

 if (ScheduleTaskType.FIXED_RATE scheduleTaskType) {

 target.add(FixedRateTaskProperties.builder()

 .taskMethod(task.getTaskMethod())

 .intervalMilliseconds(String.valueOf(task.getIntervalMilliseconds()))

 .initialDelayMilliseconds(String.valueOf(task.getInitialDelayMilliseconds()))

 .taskDescription(task.getTaskDescription())

 .taskHostKlass(task.getTaskHostClass())

 .build());

 return target;

复制代码


记得引入spring-boot-starter-jdbc和mysql-connector-java并且激活MysqlSchedulingConfigurer配置。插入一条记录


INSERT INTO schedule_task ( id , edit_time , create_time , editor , creator , deleted , task_host_class , task_method , task_type , task_description , cron_expression , time_zone , interval_milliseconds , initial_delay_milliseconds ) VALUES (1, 2020-03-30 23:46:10 , 2020-03-30 23:46:10 , admin , admin , 0, club.throwable.schedule.Tasks , processTask1 , FIXED_DELAY , 测试任务 , NULL, NULL, 10000, 5000);

复制代码


然后启动服务 某次执行的输出


2020-03-30 23:47:27.376 INFO 53120 --- [pool-1-thread-1] club.throwable.schedule.Tasks : processTask1触发..........

2020-03-30 23:47:37.378 INFO 53120 --- [pool-1-thread-1] club.throwable.schedule.Tasks : processTask1触发..........

复制代码


混合配置


有些时候我们希望可以JSON配置和JDBC数据源配置进行混合配置 或者动态二选一以便灵活应对多环境的场景 例如要在开发环境使用JSON配置而测试和生产环境使用JDBC数据源配置 甚至可以将JDBC数据源配置覆盖JSON配置 这样能保证总是倾向于使用JDBC数据源配置 这样需要对前面两小节的实现加多一层抽象。这里的设计可以参考SpringMVC中的控制器参数解析器的设计 具体是HandlerMethodArgumentResolverComposite 其实道理是相同的。


其他注意事项


在生产实践中 暂时不考虑生成任务执行日志和细粒度的监控 着重做了两件事

并发控制 多服务节点下 禁止任务并发执行。跟踪任务的日志轨迹。


解决并发执行问题


一般情况下 我们需要禁止任务并发执行 考虑引入Redisson提供的分布式锁


// 引入依赖

 dependency 

 groupId org.redisson /groupId 

 artifactId redisson /artifactId 

 version 最新版本 /version 

 /dependency 

// 配置类

 Configuration

 AutoConfigureAfter(RedisAutoConfiguration.class)

public class RedissonAutoConfiguration {

 Autowired

 private RedisProperties redisProperties;

 Bean(destroyMethod shutdown )

 public RedissonClient redissonClient() {

 Config config new Config();

 SingleServerConfig singleServerConfig config.useSingleServer();

 singleServerConfig.setAddress(String.format( redis://%s:%d , redisProperties.getHost(), redisProperties.getPort()));

 if (redisProperties.getDatabase() 0) {

 singleServerConfig.setDatabase(redisProperties.getDatabase());

 if (null ! redisProperties.getPassword()) {

 singleServerConfig.setPassword(redisProperties.getPassword());

 return Redisson.create(config);

// 分布式锁工厂

 Component

public class DistributedLockFactory {

 private static final String DISTRIBUTED_LOCK_PATH_PREFIX dl: 

 Autowired

 private RedissonClient redissonClient;

 public DistributedLock provideDistributedLock(String lockKey) {

 String lockPath DISTRIBUTED_LOCK_PATH_PREFIX lockKey;

 return new RedissonDistributedLock(redissonClient, lockPath);

复制代码


这里考虑到项目依赖了spring-boot-starter-redis 直接复用了它的配置属性类 RedissonDistributedLock是RLock的轻量级封装 见附录 。使用方式如下


 Autowired

private DistributedLockFactory distributedLockFactory;

public void task1() {

 DistributedLock lock distributedLockFactory.provideDistributedLock(lockKey);

 // 等待时间为20秒,持有锁的最大时间为60秒

 boolean tryLock lock.tryLock(20L, 60, TimeUnit.SECONDS);

 if (tryLock) {

 try {

 // 业务逻辑

 }finally {

 lock.unlock();

复制代码


引入MDC跟踪任务的Trace


MDC其实是Mapped Diagnostic Context的缩写 也就是映射诊断上下文 一般用于日志框架里面同一个线程执行过程的跟踪 例如一个线程跑过了多个方法 各个方法里面都打印了日志 那么通过MDC可以对整个调用链通过一个唯一标识关联起来 例如这里选用slf4j提供的org.slf4j.MDC


 Component

public class MappedDiagnosticContextAssistant {

 * 在MDC中执行

 * param runnable runnable

 public void processInMappedDiagnosticContext(Runnable runnable) {

 String uuid UUID.randomUUID().toString();

 MDC.put( TRACE_ID , uuid);

 try {

 runnable.run();

 } finally {

 MDC.remove( TRACE_ID 

复制代码


任务执行的时候需要包裹成一个Runnale实例


public void task1() {

 mappedDiagnosticContextAssistant.processInMappedDiagnosticContext(() - {

 StopWatch watch new StopWatch();

 watch.start();

 log.info( 开始执行...... 

 // 业务逻辑

 watch.stop();

 log.info( 执行完毕,耗时:{} ms...... , watch.getTotalTimeMillis());

复制代码


结合前面一节提到的并发控制 那么最终执行的任务方法如下


public void task1() {

 mappedDiagnosticContextAssistant.processInMappedDiagnosticContext(() - {

 StopWatch watch new StopWatch();

 watch.start();

 log.info( 开始执行...... 

 scheduleTaskAssistant.executeInDistributedLock( 任务分布式锁KEY , () - {

 // 真实的业务逻辑

 watch.stop();

 log.info( 执行完毕,耗时:{} ms...... , watch.getTotalTimeMillis());

复制代码


这里的方法看起来比较别扭 其实可以直接在任务装载的时候基于分布式锁和MDC进行封装 方式类似于ScheduledMethodRunnable 这里不做展开 因为要详细展开篇幅可能比较大 ScheduleTaskAssistant见附录 。


小结


其实spring-context整个调度模块完全依赖于TaskScheduler实现 更底层的是JUC调度线程池ScheduledThreadPoolExecutor。如果想要从底层原理理解整个调度模块的运行原理 那么就一定要分析ScheduledThreadPoolExecutor的实现。整篇文章大致介绍了spring-context调度模块的加载调度任务的流程 并且基于扩展接口SchedulingConfigurer扩展出多种自定义配置调度任务的方式 但是考虑到需要在生产环境中运行 那么免不了需要考虑监控、并发控制、日志跟踪等等的功能 但是这样就会使得整个调度模块变重 慢慢地就会发现 这个轮子越造越大 越有主流调度框架Quartz或者Easy Scheduler的影子。笔者认为 软件工程 有些时候要权衡取舍 该抛弃的就应该果断抛弃 否则总是负重而行 还能走多远


参考资料

SpringBoot源码


附录


ScheduleTaskAssistant


 RequiredArgsConstructor

 Component

public class ScheduleTaskAssistant {

 * 5秒

 public static final long DEFAULT_WAIT_TIME 5L;

 * 30秒

 public static final long DEFAULT_LEAVE_TIME 30L;

 private final DistributedLockFactory distributedLockFactory;

 * 在分布式锁中执行

 * param waitTime 锁等着时间

 * param leaveTime 锁持有时间

 * param timeUnit 时间单位

 * param lockKey 锁的key

 * param task 任务对象

 public void executeInDistributedLock(long waitTime, long leaveTime, TimeUnit timeUnit, String lockKey, Runnable task) {

 DistributedLock lock distributedLockFactory.dl(lockKey);

 boolean tryLock lock.tryLock(waitTime, leaveTime, timeUnit);

 if (tryLock) {

 try {

 long waitTimeMillis timeUnit.toMillis(waitTime);

 long start System.currentTimeMillis();

 task.run();

 long end System.currentTimeMillis();

 long cost end - start;

 // 预防锁过早释放

 if (cost waitTimeMillis) {

 Sleeper.X.sleep(waitTimeMillis - cost);

 } finally {

 lock.unlock();

 * 在分布式锁中执行 - 使用默认时间

 * param lockKey 锁的key

 * param task 任务对象

 public void executeInDistributedLock(String lockKey, Runnable task) {

 executeInDistributedLock(DEFAULT_WAIT_TIME, DEFAULT_LEAVE_TIME, TimeUnit.SECONDS, lockKey, task);

复制代码


RedissonDistributedLock


 Slf4j

public class RedissonDistributedLock implements DistributedLock {

 private final RedissonClient redissonClient;

 private final String lockPath;

 private final RLock internalLock;

 RedissonDistributedLock(RedissonClient redissonClient, String lockPath) {

 this.redissonClient redissonClient;

 this.lockPath lockPath;

 this.internalLock initInternalLock();

 private RLock initInternalLock() {

 return redissonClient.getLock(lockPath);

 Override

 public boolean isLock() {

 return internalLock.isLocked();

 Override

 public boolean isHeldByCurrentThread() {

 return internalLock.isHeldByCurrentThread();

 Override

 public void lock(long leaseTime, TimeUnit unit) {

 internalLock.lock(leaseTime, unit);

 Override

 public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) {

 try {

 return internalLock.tryLock(waitTime, leaseTime, unit);

 } catch (InterruptedException e) {

 Thread.currentThread().interrupt();

 throw new IllegalStateException(String.format( Acquire lock fail by thread interrupted,path:%s , lockPath), e);

 Override

 public void unlock() {

 try {

 internalLock.unlock();

 } catch (IllegalMonitorStateException ex) {

 log.warn( Unlock path:{} error for thread status change in concurrency , lockPath, ex);

}



两种方式实现Spring 业务验证 验证在任何时候都非常关键。考虑将数据验证作为业务逻辑开发有利也有弊,Spring 认为,验证不应该只在Web 端进行处理,在服务端也要进行相应的处理,可以防止脏数据存入数据库中,从而避免为运维同学和测试同学造成更大的困扰,因为数据造成的bug会更加难以发现,而且开发人员关注点也不会放在数据本身的问题上,所以做服务端的验证也是非常有必要的。考虑到上面这些问题,Spring 提供了两种主要类型的验证:
阿里特邀专家徐雷Java Spring Boot开发实战系列课程(第18讲):制作Java Docker镜像与推送到DockerHub和阿里云Docker仓库 立即下载