关于版本

依赖版本
springboot2.4.0
spring batch2.4.0

代码地址

因为每个例子涉及代码较多,且包含测试用例,如果都贴到文章中内容过多,所以只贴出了部分代码。全部的代码在这里: https://gitee.com/daifyutils/springboot-samples

此篇文章所属模块为:base-batch-2.4.0

目录地址

目录测试目录内容
dai.samples.batch.allowstartdai.samples.allow测试任务可以重复执行
dai.samples.batch.basedai.samples.base基础任务配置
dai.samples.batch.skipdai.samples.skip跳过操作
dai.samples.batch.listenerdai.samples.listener任务监听器
dai.samples.batch.processdai.samples.process流程控制的代码
dai.samples.batch.adddai.samples.add任务流程切割
dai.samples.batch.retrydai.samples.retry任务重试
dai.samples.batch.rollbackdai.samples.rollback任务回滚
dai.samples.batch.rwdai.samples.rw数据的读取和输出

Spring Batch其他内容

Spring Boot 整合——Spring batch基本使用

Spring Boot 整合——Spring batch的监听器

Spring Boot 整合——Spring batch任务流程控制以及流程分割

Spring Boot 整合——Spring batch通过不同方式读取数据(ItemReader)

Spring Boot 整合——Spring batch通过不同方式输出数据(ItemWriter)

Spring Boot 整合——Spring batch重试和回滚

Spring batch

spring batch 是spring提供的一个批数据处理的框架。提供了大量信息的自动化和定时处理的操作。其是一个相对轻量级的批处理操作

batch涉及的核心概念

job

表示是Spring batch一次批处理的过程。其定义了批处理执行的具体逻辑,封装整个批处理过程的实体。

Step

批处理任务中的某一步骤,每个处理job由一个或者多个步骤组成。

ExecutionContext

批处理执行上下文,其能够将所需的参数在处理任务的过程中进行传递。

JobRepository

提供对处理任务的持久化操作。

JobLauncher

其主要是为处理任务job提供了一个启动器,同时在启动job的时候我们可以传递自定义的参数。

Item Reader

其可以给Step提供数据的输入,当数据读取结束后, 其会返回null来告知内容已经结束。

Item Writer

作为数据的输出,spring batch提供了一个chunk参数,任务一次会写入一个chunk的数据,而chunk的数量取决于任务启动时候的配置。

Item Processor

此内容复制在数据读入Step到写出之间对数据处理的逻辑。

step的流程

Reader
Processor
Writer

一个简单的Spring batch使用

引入依赖

除了常规的Spring boot依赖,只需要额外引入此内容

    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-batch</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.batch</groupId>
        <artifactId>spring-batch-test</artifactId>
        <scope>test</scope>
    </dependency>

application

在使用内存作为job的持久化策略的时候可以无需配置任何额外参数,而如果希望Job信息被保存到数据库中的时候需要以下配置

spring:
  application:
    name: batch
  datasource:
    url: jdbc:mysql://localhost:3306/batch?charactorEncoding=utf-8&useSSL=false&serverTimezone=Asia/Shanghai
    username: root
    password: root
    driver-class-name: com.mysql.jdbc.Driver
  jpa:
    hibernate:
      use-new-id-generator-mappings: true
      ddl-auto: none
      naming:
        physical-strategy: org.springframework.boot.orm.jpa.hibernate.SpringPhysicalNamingStrategy
    properties: 
      hibernate:
        dialect: org.hibernate.dialect.MySQL5InnoDBDialect
        hbm2ddl:
          auto: update
          # 使用create-drop存在问题,jpa会执行一些外键SQL但是此时表不存在而报错
          # auto: create-drop
    show-sql: true
  batch:
    # 使用JDBC的时候为了避免不存在表结果需要配置此内容
    initialize-schema: always
server:
  port: 8000

上面配置中关于datasource的配置需要结合自己项目实际情况。而spring.batch.initialize-schema是因为Spring Batch需要将job相关内容保存到不同的数据表中,此时需要保证数据库中存在此内容,设置always来创建对应表结构

表名作用
batch_job_execution批处理任务处理的相关记录
batch_job_execution_context任务处理的上下文操作
batch_job_execution_params处理任务相关参数
batch_job_execution_seq任务执行的序号表
batch_job_instance处理任务实例
batch_job_seq任务处理的序号表
batch_step_execution任务步骤处理的相关记录
batch_step_execution_context任务步骤处理的上下文操作
batch_step_execution_seq任务步骤处理的序号表

java配置

需要在java中初始化下面组件:
transactionManager:事务管理器

JobRepository:任务持久化操作

JobBuilderFactory:任务创建工厂

StepBuilderFactory:任务步创建工厂

JobLauncher:任务启动器

@Configuration
public class BatchConfig extends DefaultBatchConfigurer {

    public static final String PATH = "target/test-outputs/";

    @Autowired
    private DataSource dataSource;


    /**
     * 初始化资源型事务控制
     * @return
     */
    @Bean
    PlatformTransactionManager transactionManager() {
        return new ResourcelessTransactionManager();
    }


    /**
     * 任务持久化 使用JDBC
     * @return
     * @throws Exception
     */
    @Bean
    protected JobRepository jobRepository() throws Exception {
        setDataSource(dataSource);
        JobRepository jobRepository = createJobRepository();
        return jobRepository;
    }
    
    /**
     * 任务持久化 使用内存的时候JDBC设置
     * @return
     * @throws Exception
     */
    /*@Override
    public void setDataSource(DataSource dataSource) {

    }*/
    
    /**
     * 任务持久化 使用内存
     * @return
     * @throws Exception
     */
    /*@Bean
    protected JobRepository jobRepository() throws Exception {
        MapJobRepositoryFactoryBean factory = new MapJobRepositoryFactoryBean();
        factory.setTransactionManager(transactionManager());
        return factory.getObject();
    }*/

    @Bean
    JobBuilderFactory jobBuilderFactory() throws Exception {
        return new JobBuilderFactory(jobRepository());
    }

    @Bean
    StepBuilderFactory stepBuilderFactory() throws Exception {
        return new StepBuilderFactory(jobRepository(),transactionManager());
    }

    /**
     * 任务启动器
     * @return
     * @throws Exception
     */
    @Bean
    public JobLauncher jobLauncher() throws Exception {
        SimpleJobLauncher simpleJobLauncher = new SimpleJobLauncher();
        simpleJobLauncher.setJobRepository(jobRepository());
        return simpleJobLauncher;
    }

    @Bean
    JobExplorer jobExplorer() throws Exception {
        JobExplorerFactoryBean jobExplorerFactoryBean = new JobExplorerFactoryBean();
        jobExplorerFactoryBean.setDataSource(dataSource);
        jobExplorerFactoryBean.afterPropertiesSet();
        return jobExplorerFactoryBean.getObject();
    }
    
    @Bean
    public ListableJobLocator listableJobLocator() {
        return new MapJobRegistry();
    }


    @Bean
    public SimpleJobOperator jobOperator(@Autowired ObjectProvider<JobParametersConverter> jobParametersConverter) {
        SimpleJobOperator factory = new SimpleJobOperator();
        factory.setJobExplorer(getJobExplorer());
        factory.setJobLauncher(getJobLauncher());
        factory.setJobRegistry(listableJobLocator());
        factory.setJobRepository(getJobRepository());
        jobParametersConverter.ifAvailable(factory::setJobParametersConverter);
        return factory;
    }


}

配置一个简单的处理任务

下面是一个简单的任务处理,一个简单的JOB包含了一个step以及从读-处理-写的流程。模拟读取目标文件夹下json文件然后进行处理后写出到指定目录。

// 创建任务的配置
@Slf4j
@Component
public class BaseJobConfig {

    @Autowired
    JobBuilderFactory jobBuilderFactory;

    @Autowired
    StepBuilderFactory stepBuilderFactory;

    @Autowired
    PlatformTransactionManager transactionManager;

    /**
     * 任务
     * @param jobRepository
     * @return
     */
    @Bean("sampleJob")
    public Job sampleJob(JobRepository jobRepository) {
        return this.jobBuilderFactory.get("sampleJob")
                .repository(jobRepository)
                .start(sampleStep())
                .build();
    }

    /**
     * 步骤处理
     * @return
     */
    @Bean(value = "sampleStep")
    public Step sampleStep() {
        return this.stepBuilderFactory.get("sampleStep")
                .transactionManager(transactionManager)
                .<BatchEntity, BatchEntity>chunk(1)
                .reader(itemReader())
                .processor(getProcessor())
                .writer(itemWriter())
                .build();
    }


    /**
     * JSON写数据
     * 在target/test-outputs目录下写相关结果
     * @return
     */
    public ItemWriter<BatchEntity> itemWriter() {
        long time = System.currentTimeMillis();
        String name = "sampleJob-" + time;
        String patch = "target/test-outputs/" + name + ".json";
        return new JsonFileItemWriterBuilder<BatchEntity>()
                .jsonObjectMarshaller(new JacksonJsonObjectMarshaller<>())
                .resource(new FileSystemResource(patch))
                .name("batchJobWriter")
                .build();
    }

    /**
     * JSON读数据
     * 读取resource/data下的JSON文件
     * @return
     */
    public ItemReader<BatchEntity> itemReader() {
        return new JsonItemReaderBuilder<BatchEntity>()
                .jsonObjectReader(new JacksonJsonObjectReader<>(BatchEntity.class))
                .resource(new ClassPathResource("data/batchJob.json"))
                .name("batchJobReader")
                .build();
    }

    /**
     * 新建任务处理器
     * @return
     */
    public ItemProcessor<? super BatchEntity, ? extends BatchEntity> getProcessor() {
        return new ChangeNameProcessor();
    }


    /**
     * 任务处理器
     */
    class ChangeNameProcessor implements ItemProcessor<BatchEntity, BatchEntity> {

        @Override
        public BatchEntity process(BatchEntity person) {
            String fullName = "sampleJob-" + person.getFirstName() + " " +
                    person.getLastName();
            person.setFullName(fullName);
            log.info(fullName);
            return person;
        }
    }

}

BatchEntityJOSN内容解析的对象


@Data
@NoArgsConstructor
@AllArgsConstructor
public class BatchEntity {

    /**
     * 初始名称
     */
    private String firstName;

    private String lastName;
    /**
     * 初始年龄
     */
    private int age;

    private String fullName;

    private boolean isAdult;

    private String helloMessage;
}

测试用例

通过JobLauncherTestUtils的测试工具,就能测试上述内容是否正常

@RunWith(SpringRunner.class)
@SpringBootTest(classes = {BatchApplication.class})
public class BaseJobConfigTests {

    @Autowired
    @Qualifier(value = "sampleJob")
    private Job sampleJob;

    @Autowired
    private JobLauncher jobLauncher;

    /**
     * 基础任务测试
     * @throws Exception
     */
    @Test
    public void testBaseJob() throws Exception {
        JobLauncherTestUtils jobLauncherTestUtils = new JobLauncherTestUtils();
        jobLauncherTestUtils.setJobLauncher(jobLauncher);
        jobLauncherTestUtils.setJob(sampleJob);

        File file = new File(BatchConfig.PATH);
        Assert.isTrue(file.exists(),"地址错误");
        Assert.isTrue(file.isDirectory(),"目录错误");
        File[] files = file.listFiles();
        Arrays.stream(files).forEach(item -> item.delete());
        JobExecution jobExecution = jobLauncherTestUtils.launchJob();
        Assert.isTrue(BatchStatus.COMPLETED.equals(jobExecution.getStatus()),"返回状态失败");
        File[] newFiles = file.listFiles();
        Arrays.stream(newFiles).forEach(item -> System.out.println(item.getName()));
    }


}

设置忽略的异常

在进行任务处理的时候有些时候我们希望任务能够忽略一些异常或者是关注一些异常这里可以使用skipLimitskipnoSkip三个方法。

skipLimit 设置需要忽略异常的次数,当忽略的异常大于此数字任务依旧会返回FAILED的结果
skip 设置需要忽略的异常
noSkip 设置不能忽略的异常

java代码

使用上面大致的逻辑,在ChangeNameProcessor中添加产生异常的逻辑,使用下面的step执行任务

    /**
     * 遇见
     * RuntimeException异常进行跳过,但是错误超过5次任务执行中断
     * FileNotFoundException异常不进行跳过,直接中断任务
     * @return
     */
    @Bean("skipStep")
    public Step skipStep() {
        return this.stepBuilderFactory.get("skipStep")
                .<BatchEntity, BatchEntity>chunk(5)
                .reader(itemReader())
                .processor(getExceptionProcessor())
                .writer(itemWriter())
                .faultTolerant()
                .skipLimit(5)
                .skip(RuntimeException.class)
                .noSkip(FileNotFoundException.class)
                .build();
    }

测试

使用下面测试用例在异常没有超过规定的次数上限时任务依旧会成功

@RunWith(SpringRunner.class)
@SpringBootTest(classes = {BatchApplication.class})
@Slf4j
public class SkipJobTests {

    @Autowired
    @Qualifier(value = "skipJob")
    private Job skipJob;

    @Autowired
    @Qualifier(value = "skipJob10")
    private Job skipJob10;

    @Autowired
    private JobLauncher jobLauncher;

    /**
     * 因为存在10条数据,没有达到限制数值,所以此时任务不会被
     * @throws Exception
     */
    @Test
    public void testAllowStartJob10() throws Exception {
        JobLauncherTestUtils jobLauncherTestUtils = new JobLauncherTestUtils();
        jobLauncherTestUtils.setJobLauncher(jobLauncher);
        jobLauncherTestUtils.setJob(skipJob10);

        File file = new File(BatchConfig.PATH);
        Assert.isTrue(file.exists(),"地址错误");
        Assert.isTrue(file.isDirectory(),"目录错误");
        File[] files = file.listFiles();
        Arrays.stream(files).forEach(item -> item.delete());
        JobExecution jobExecution = jobLauncherTestUtils.launchJob();
        Assert.isTrue(BatchStatus.COMPLETED.equals(jobExecution.getStatus()),"返回状态失败");
        File[] newFiles = file.listFiles();
        Arrays.stream(newFiles).forEach(item -> System.out.println(item.getName()));
    }
}

设置任务重复执行以及执行次数

在上面的测试用例中使用的都是JobLauncherTestUtils方法,此方法是为了避免任务在JDBC中被设置为已完成而导致的任务无法重复执行,但是在生产中我们并不想拥有一个只能运行一次的任务,所以我们可以通过allowStartIfComplete来调整此配置

使用上面大致的逻辑,只需要修改Step内参数就可以实现任务的重新执行。

    /**
     * 设置任务可以重启,但是只能重启2次
     * @return
     */
    @Bean(value = "allowLimitStartStep")
    public Step allowLimitStartStep() {
        return this.stepBuilderFactory.get("allowLimitStartStep")
                .transactionManager(transactionManager)
                .<BatchEntity, BatchEntity>chunk(20)
                .reader(itemReader())
                .processor(getProcessor())
                .writer(itemWriter())
                .startLimit(2)
                .allowStartIfComplete(true)
                .build();
    }

此时需要注意startLimit用来控制任务可以重复执行但是不能超过这个值。如果超过会抛出下面异常。当然如果需要创建一个可以无限制的任务,只需要不设置startLimit即可

org.springframework.batch.core.StartLimitExceededException: Maximum start limit exceeded for step: allowLimitStartStepStartMax: 2

个人水平有限,上面的内容可能存在没有描述清楚或者错误的地方,因为每一个例子都提供了测试代码,一般来说不会有问题,但是因为这几篇内容断断续续用了一个半月可能会出现之后的代码影响了之前的例子,假如开发同学发现了,请及时告知,我会第一时间修改相关内容,也希望大家看在这个新春佳节只能宅到家中埋头苦逼的码代码的情况下,能给我点一个赞。你的点赞就是我前进的动力。

Logo

旨在为数千万中国开发者提供一个无缝且高效的云端环境,以支持学习、使用和贡献开源项目。

更多推荐