zl程序教程

您现在的位置是:首页 >  Python

当前栏目

Spring Batch多步骤任务、并行执行、任务决策器、任务嵌套

2023-03-07 09:41:06 时间

文章目录

企业中经常会有需要批处理才能完成的业务操作,比如:自动化地处理大批量复杂的数据,如月结计算;重复性地处理大批量数据,如费率计算;充当内部系统和外部系统的数据纽带,中间需要对数据进行格式化,校验,转换处理等。

Spring Batch是一个轻量级但功能又十分全面的批处理框架,旨在支持开发对企业系统的日常运营至关重要的批处理应用程序。

Spring Batch 提供了在处理大量记录时必不可少的可重用功能,包括日志记录/跟踪、事务管理、作业处理统计、作业重新启动、跳过和资源管理。它还提供更先进的技术服务和功能,通过优化和分区技术实现极高容量和高性能的批处理作业。简单和复杂的大批量批处理作业都可以以高度可扩展的方式利用该框架来处理大量信息。

1、框架搭建

新建一个Spring Boot项目,版本为2.5.9

<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-batch</artifactId>
    </dependency>
    <dependency>
        <groupId>mysql</groupId>
        <artifactId>mysql-connector-java</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-jdbc</artifactId>
    </dependency>
</dependencies>

在编写代码之前,我们先来简单了解下Spring Batch的组成:

Spring Batch里最基本的单元就是任务Job,一个Job由若干个步骤Step组成。任务启动器Job Launcher负责运行Job,任务存储仓库Job Repository存储着Job的执行状态,参数和日志等信息。Job处理任务又可以分为三大类:数据读取Item Reader、数据中间处理Item Processor和数据输出Item Writer

任务存储仓库可以是关系型数据库MySQL,非关系型数据库MongoDB或者直接存储在内存中,本篇使用的是MySQL作为任务存储仓库。

新建一个MySql数据库,导入 org.springframework.batch:spring-batch-core目录下的schema-mysql.sql文件

导入后,库表如下图所示:

然后在项目的配置文件application.yml里添加MySQL相关配置:

spring:
  datasource:
    driver-class-name: com.mysql.cj.jdbc.Driver
    url: jdbc:mysql://127.0.0.1:3301/springbatch
    username: root
    password: 123456
  batch:
    jdbc:
      initialize-schema: always

接着在Spring Boot的入口类上添加@EnableBatchProcessing注解,表示开启Spring Batch批处理功能:

@SpringBootApplication
@EnableBatchProcessing //开启Spring Batch批处理功能
public class SpringBatchApplication {
    public static void main(String[] args) {
        SpringApplication.run(SpringBatchApplication.class, args);
    }
}

至此,基本框架搭建好了,下面开始配置一个简单的任务。

2、编写第一个任务

新建job包,然后在该包下新建一个FirstJobDemo类,代码如下所示:

@Component
public class FirstJobDemo {

    @Autowired
    private JobBuilderFactory jobBuilderFactory;
    @Autowired
    private StepBuilderFactory stepBuilderFactory;

    @Bean
    public Job firstJob() {
        return jobBuilderFactory.get("firstJob")
                .start(step())
                .build();
    }

    private Step step() {
        return stepBuilderFactory.get("step")
                .tasklet((contribution, chunkContext) -> {
                    System.out.println("执行步骤....");
                    return RepeatStatus.FINISHED;
                }).build();
    }
}

上面代码中,我们注入了JobBuilderFactory任务创建工厂和StepBuilderFactory步骤创建工厂,分别用于创建任务Job和步骤Step

JobBuilderFactoryget方法用于创建一个指定名称的任务,start方法指定任务的开始步骤,步骤通过StepBuilderFactory构建。

步骤Step由若干个小任务Tasklet组成,所以我们通过tasklet方法创建。tasklet方法接收一个Tasklet类型参数,Tasklet是一个函数是接口,源码如下:

public interface Tasklet {
    @Nullable
    RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception;
}

所以我们可以使用lambda表达式创建一个匿名实现:

(contribution, chunkContext) -> {
    System.out.println("执行步骤....");
    return RepeatStatus.FINISHED;
}

该匿名实现必须返回一个明确的执行状态,这里返回RepeatStatus.FINISHED表示该小任务执行成功,正常结束。

此外,需要注意的是,我们配置的任务Job必须注册到Spring IOC容器中,并且任务的名称和步骤的名称组成唯一。比如上面的例子,我们的任务名称为firstJob,步骤的名称为step,如果存在别的任务和步骤组合也叫这个名称的话,则会执行失败。

启动项目,控制台打印日志如下

可以看到,任务成功执行了,数据库的库表也将记录相关运行日志。

重新启动项目,控制台并不会再次打印出任务执行日志,因为Job名称和 Step名称组成唯一,执行完的不可重复的任务,不会再次执行。

3、多步骤任务

一个复杂的任务一般包含多个步骤,下面举个多步骤任务的例子。在job包下新建MultiStepJobDemo类:

@Component
public class MultiStepJobDemo {

    @Autowired
    private JobBuilderFactory jobBuilderFactory;
    @Autowired
    private StepBuilderFactory stepBuilderFactory;

    @Bean
    public Job multiStepJob() {
        return jobBuilderFactory.get("multiStepJob")
                .start(step1())
                .next(step2())
                .next(step3())
                .build();
    }

    private Step step1() {
        return stepBuilderFactory.get("step1")
                .tasklet((stepContribution, chunkContext) -> {
                    System.out.println("执行步骤一操作。。。");
                    return RepeatStatus.FINISHED;
                }).build();
    }

    private Step step2() {
        return stepBuilderFactory.get("step2")
                .tasklet((stepContribution, chunkContext) -> {
                    System.out.println("执行步骤二操作。。。");
                    return RepeatStatus.FINISHED;
                }).build();
    }

    private Step step3() {
        return stepBuilderFactory.get("step3")
                .tasklet((stepContribution, chunkContext) -> {
                    System.out.println("执行步骤三操作。。。");
                    return RepeatStatus.FINISHED;
                }).build();
    }
}

上面代码中,我们通过step1()step2()step3()三个方法创建了三个步骤。Job里要使用这些步骤,只需要通过JobBuilderFactorystart方法指定第一个步骤,然后通过next方法不断地指定下一个步骤即可。

多个步骤在执行过程中也可以通过上一个步骤的执行状态来决定是否执行下一个步骤,修改上面的代码:

@Component
public class MultiStepJobDemo {

    @Autowired
    private JobBuilderFactory jobBuilderFactory;
    @Autowired
    private StepBuilderFactory stepBuilderFactory;

    @Bean
    public Job multiStepJob() {
        // todo 这里执行了前两步,待调试(可能是版本问题)
        return jobBuilderFactory.get("multiStepJob2")
                .start(step1())
                .on(ExitStatus.COMPLETED.getExitCode()).to(step2())
                .from(step2())
                .on(ExitStatus.COMPLETED.getExitCode()).to(step3())
                .from(step3()).end()
                .build();
    }

    private Step step1() {
        return stepBuilderFactory.get("step1")
                .tasklet((stepContribution, chunkContext) -> {
                    System.out.println("执行步骤一操作。。。");
                    return RepeatStatus.FINISHED;
                }).build();
    }

    private Step step2() {
        return stepBuilderFactory.get("step2")
                .tasklet((stepContribution, chunkContext) -> {
                    System.out.println("执行步骤二操作。。。");
                    return RepeatStatus.FINISHED;
                }).build();
    }

    private Step step3() {
        return stepBuilderFactory.get("step3")
                .tasklet((stepContribution, chunkContext) -> {
                    System.out.println("执行步骤三操作。。。");
                    return RepeatStatus.FINISHED;
                }).build();
    }
}

multiStepJob()方法的含义是:multiStepJob2任务先执行step1,当step1状态为完成时,接着执行step2,当step2的状态为完成时,接着执行step3ExitStatus.COMPLETED常量表示任务顺利执行完毕,正常退出,该类还包含以下几种退出状态:

public class ExitStatus implements Serializable, Comparable<ExitStatus> {

	/**
	 * Convenient constant value representing unknown state - assumed not
	 * continuable.
	 */
	public static final ExitStatus UNKNOWN = new ExitStatus("UNKNOWN");

	/**
	 * Convenient constant value representing continuable state where processing
	 * is still taking place, so no further action is required. Used for
	 * asynchronous execution scenarios where the processing is happening in
	 * another thread or process and the caller is not required to wait for the
	 * result.
	 */
	public static final ExitStatus EXECUTING = new ExitStatus("EXECUTING");

	/**
	 * Convenient constant value representing finished processing.
	 */
	public static final ExitStatus COMPLETED = new ExitStatus("COMPLETED");

	/**
	 * Convenient constant value representing job that did no processing
	 * (e.g. because it was already complete).
	 */
	public static final ExitStatus NOOP = new ExitStatus("NOOP");

	/**
	 * Convenient constant value representing finished processing with an error.
	 */
	public static final ExitStatus FAILED = new ExitStatus("FAILED");

	/**
	 * Convenient constant value representing finished processing with
	 * interrupted status.
	 */
	public static final ExitStatus STOPPED = new ExitStatus("STOPPED");

    ...
}

4、Flow的用法

Flow的作用就是可以将多个步骤Step组合在一起然后再组装到任务Job中。举个Flow的例子,在job包下新建FlowJobDemo类:

@Component
public class FlowJobDemo {

    @Autowired
    private JobBuilderFactory jobBuilderFactory;
    @Autowired
    private StepBuilderFactory stepBuilderFactory;

    @Bean
    public Job flowJob() {
        return jobBuilderFactory.get("flowJob")
                .start(flow())
                .next(step3())
                .end()
                .build();
    }

    private Step step1() {
        return stepBuilderFactory.get("step1")
                .tasklet((stepContribution, chunkContext) -> {
                    System.out.println("执行步骤一操作。。。");
                    return RepeatStatus.FINISHED;
                }).build();
    }

    private Step step2() {
        return stepBuilderFactory.get("step2")
                .tasklet((stepContribution, chunkContext) -> {
                    System.out.println("执行步骤二操作。。。");
                    return RepeatStatus.FINISHED;
                }).build();
    }

    private Step step3() {
        return stepBuilderFactory.get("step3")
                .tasklet((stepContribution, chunkContext) -> {
                    System.out.println("执行步骤三操作。。。");
                    return RepeatStatus.FINISHED;
                }).build();
    }

    // 创建一个flow对象,包含若干个step
    private Flow flow() {
        return new FlowBuilder<Flow>("flow")
                .start(step1())
                .next(step2())
                .build();
    }
}

上面代码中,我们通过FlowBuilderstep1step2组合在一起,创建了一个名为flowFlow,然后再将其赋给任务Job。使用FlowStep构建Job的区别是,Job流程中包含Flow类型的时候需要在build()方法前调用end()方法。

5、并行执行

任务中的步骤除了可以串行执行(一个接着一个执行)外,还可以并行执行,并行执行在特定的业务需求下可以提供任务执行效率。

任务并行化只需两个简单步骤

1、将步骤Step转换为Flow; 2、任务Job中指定并行Flow

举个例子,在job包下新建SplitJobDemo类:

@Component
public class SplitJobDemo {

    @Autowired
    private JobBuilderFactory jobBuilderFactory;
    @Autowired
    private StepBuilderFactory stepBuilderFactory;

    @Bean
    public Job splitJob() {
        return jobBuilderFactory.get("splitJob")
                .start(flow1())
                .split(new SimpleAsyncTaskExecutor()).add(flow2())
                .end()
                .build();

    }

    private Step step1() {
        return stepBuilderFactory.get("step1")
                .tasklet((stepContribution, chunkContext) -> {
                    System.out.println("执行步骤一操作。。。");
                    return RepeatStatus.FINISHED;
                }).build();
    }

    private Step step2() {
        return stepBuilderFactory.get("step2")
                .tasklet((stepContribution, chunkContext) -> {
                    System.out.println("执行步骤二操作。。。");
                    return RepeatStatus.FINISHED;
                }).build();
    }

    private Step step3() {
        return stepBuilderFactory.get("step3")
                .tasklet((stepContribution, chunkContext) -> {
                    System.out.println("执行步骤三操作。。。");
                    return RepeatStatus.FINISHED;
                }).build();
    }

    private Flow flow1() {
        return new FlowBuilder<Flow>("flow1")
                .start(step1())
                .next(step2())
                .build();
    }

    private Flow flow2() {
        return new FlowBuilder<Flow>("flow2")
                .start(step3())
                .build();
    }
}

上面例子中,我们创建了两个Flowflow1(包含step1step2)和flow2(包含step3)。然后通过JobBuilderFactorysplit方法,指定一个异步执行器,将flow1flow2异步执行(也就是并行)。

注意: 开启并行化后,并行的步骤执行顺序并不能100%确定,因为线程调度具有不确定性。

6、任务决策器

决策器的作用就是可以指定程序在不同的情况下运行不同的任务流程,比如今天是周末,则让任务执行step1step2,如果是工作日,则之心step1step3

使用决策器前,我们需要自定义一个决策器的实现。新建decider包,然后创建MyDecider类,实现JobExecutionDecider接口:

@Component
public class MyDecider implements JobExecutionDecider {
    @Override
    public FlowExecutionStatus decide(JobExecution jobExecution, StepExecution stepExecution) {
        LocalDate now = LocalDate.now();
        DayOfWeek dayOfWeek = now.getDayOfWeek();

        if (dayOfWeek == DayOfWeek.SATURDAY || dayOfWeek == DayOfWeek.SUNDAY) {
            // 今天是周末
            return new FlowExecutionStatus("weekend");
        } else {
            // 工作日
            return new FlowExecutionStatus("workingDay");
        }
    }
}

MyDecider实现JobExecutionDecider接口的decide方法,该方法返回FlowExecutionStatus。上面的逻辑是:判断今天是否是周末,如果是,返回FlowExecutionStatus("weekend")状态,否则返回FlowExecutionStatus("workingDay")状态。

下面演示如何在任务Job里使用决策器。在job包下新建DeciderJobDemo

@Component
public class DeciderJobDemo {

    @Autowired
    private JobBuilderFactory jobBuilderFactory;
    @Autowired
    private StepBuilderFactory stepBuilderFactory;
    @Autowired
    private MyDecider myDecider;

    @Bean
    public Job deciderJob() {
        return jobBuilderFactory.get("deciderJob")
                .start(step1())
                .next(myDecider)
                .from(myDecider).on("weekend").to(step2())
                .from(myDecider).on("workingDay").to(step3())
                .from(step3()).on("*").to(step4())
                .end()
                .build();
    }

    private Step step1() {
        return stepBuilderFactory.get("step1")
                .tasklet((stepContribution, chunkContext) -> {
                    System.out.println("执行步骤一操作。。。");
                    return RepeatStatus.FINISHED;
                }).build();
    }

    private Step step2() {
        return stepBuilderFactory.get("step2")
                .tasklet((stepContribution, chunkContext) -> {
                    System.out.println("执行步骤二操作。。。");
                    return RepeatStatus.FINISHED;
                }).build();
    }

    private Step step3() {
        return stepBuilderFactory.get("step3")
                .tasklet((stepContribution, chunkContext) -> {
                    System.out.println("执行步骤三操作。。。");
                    return RepeatStatus.FINISHED;
                }).build();
    }


    private Step step4() {
        return stepBuilderFactory.get("step4")
                .tasklet((stepContribution, chunkContext) -> {
                    System.out.println("执行步骤四操作。。。");
                    return RepeatStatus.FINISHED;
                }).build();
    }
}

上面代码中,我们注入了自定义决策器MyDecider,然后在jobDecider()方法里使用了该决策器:

@Bean
public Job deciderJob() {
    return jobBuilderFactory.get("deciderJob")
            .start(step1())
            .next(myDecider)
            .from(myDecider).on("weekend").to(step2())
            .from(myDecider).on("workingDay").to(step3())
            .from(step3()).on("*").to(step4())
            .end()
            .build();
}

这段代码的含义是:任务deciderJob首先执行step1,然后指定自定义决策器,如果决策器返回weekend,那么执行step2,如果决策器返回workingDay,那么执行step3。如果执行了step3,那么无论step3的结果是什么,都将执行step4

7、任务嵌套

任务Job除了可以由Step或者Flow构成外,我们还可以将多个任务Job转换为特殊的Step,然后再赋给另一个任务Job,这就是任务的嵌套。

举个例子,在job包下新建NestedJobDemo类:

@Component
public class NestedJobDemo {

    @Autowired
    private JobBuilderFactory jobBuilderFactory;
    @Autowired
    private StepBuilderFactory stepBuilderFactory;
    @Autowired
    private JobLauncher jobLauncher;
    @Autowired
    private JobRepository jobRepository;
    @Autowired
    private PlatformTransactionManager platformTransactionManager;

    // 父任务
    @Bean
    public Job parentJob() {
        return jobBuilderFactory.get("parentJob")
                .start(childJobOneStep())
                .next(childJobTwoStep())
                .build();
    }


    // 将任务转换为特殊的步骤
    private Step childJobOneStep() {
        return new JobStepBuilder(new StepBuilder("childJobOneStep"))
                .job(childJobOne())
                .launcher(jobLauncher)
                .repository(jobRepository)
                .transactionManager(platformTransactionManager)
                .build();
    }

    // 将任务转换为特殊的步骤
    private Step childJobTwoStep() {
        return new JobStepBuilder(new StepBuilder("childJobTwoStep"))
                .job(childJobTwo())
                .launcher(jobLauncher)
                .repository(jobRepository)
                .transactionManager(platformTransactionManager)
                .build();
    }

    // 子任务一
    private Job childJobOne() {
        return jobBuilderFactory.get("childJobOne")
                .start(
                    stepBuilderFactory.get("childJobOneStep")
                            .tasklet((stepContribution, chunkContext) -> {
                                System.out.println("子任务一执行步骤。。。");
                                return RepeatStatus.FINISHED;
                            }).build()
                ).build();
    }

    // 子任务二
    private Job childJobTwo() {
        return jobBuilderFactory.get("childJobTwo")
                .start(
                    stepBuilderFactory.get("childJobTwoStep")
                            .tasklet((stepContribution, chunkContext) -> {
                                System.out.println("子任务二执行步骤。。。");
                                return RepeatStatus.FINISHED;
                            }).build()
                ).build();
    }
}

上面代码中,我们通过childJobOne()childJobTwo()方法创建了两个任务Job,这里没什么好说的,前面都介绍过。关键在于childJobOneStep()方法和childJobTwoStep()方法。在childJobOneStep()方法中,我们通过JobStepBuilder构建了一个名称为childJobOneStepStep,顾名思义,它是一个任务型Step的构造工厂,可以将任务转换为“特殊”的步骤。在构建过程中,我们还需要传入任务执行器JobLauncher、任务仓库JobRepository和事务管理器PlatformTransactionManager

将任务转换为特殊的步骤后,将其赋给父任务parentJob即可,流程和前面介绍的一致。

配置好后,启动项目,控制台输出如下所示:

文章出处

完整代码已上传 Gitee

到此,本章内容就介绍完啦