具体代码见: https://gitee.com/hongdu_dudua1/spring_quartz_mq/tree/master/batch_quartz

spring batch 主要是任务批处理的框架, 对每个任务分成三个步骤: 

1: reader

2: processor

3: writer

spring对quartz的集成主要在 spring-context-support这个包中

quartz的任务调度集成要点:

1: QuartzJobBean  

  这个是集成Quartz的job类: 使用springbatch进行集成的时候,在这个类的方法中:

protected void executeInternal(JobExecutionContext context) throws JobExecutionException {
   //todo spring batch的job执行
}

上面这个部分涉及到很重要的一个类: 

SchedulerContext:是调度器 Scheduler的一个属性, 可以存储spring管理的bean, 因此我们在配置的时候可以将springbatch的任务启动器及任务名字还有job定位器JobLocator存放在这个调度器的上下文里面: 参考如下:
Scheduler scheduler = context.getScheduler();
SchedulerContext schedulerContext = scheduler.getContext();
String jobName = schedulerContext.getString("jobName");
JobLauncher jobLauncher = (JobLauncher) schedulerContext.get("jobLauncher");
JobLocator jobLocator = (JobLocator) schedulerContext.get("jobLocator");
System.out.println("jobName : " + jobName);
System.out.println("jobLauncher : " + jobLauncher);
System.out.println("jobLocator : " + jobLocator);
SimpleDateFormat sf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
String date = sf.format(new Date());
System.out.println("Current Time : " + date);
Job job = jobLocator.getJob(jobName);
if (job != null) {
    /*启动spring batch的批处理作业*/
    JobExecution jobExecution = jobLauncher.run(job,
            new JobParametersBuilder().addDate("date", new Date()).toJobParameters());
}

---------------------------------springbatch的配置:

1: job配置

  参考: 源码

2: springbatch的事务隔离级别配置

package com.batch.quartz.batch;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.batch.core.configuration.BatchConfigurationException;
import org.springframework.batch.core.configuration.annotation.BatchConfigurer;
import org.springframework.batch.core.explore.JobExplorer;
import org.springframework.batch.core.explore.support.JobExplorerFactoryBean;
import org.springframework.batch.core.explore.support.MapJobExplorerFactoryBean;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.batch.core.launch.support.SimpleJobLauncher;
import org.springframework.batch.core.repository.JobRepository;
import org.springframework.batch.core.repository.support.JobRepositoryFactoryBean;
import org.springframework.batch.core.repository.support.MapJobRepositoryFactoryBean;
import org.springframework.batch.support.transaction.ResourcelessTransactionManager;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Configuration;
import org.springframework.jdbc.datasource.DataSourceTransactionManager;
import org.springframework.transaction.PlatformTransactionManager;

import javax.annotation.PostConstruct;
import javax.sql.DataSource;

/**
 * <h3>partitionjob</h3>
 * <p>myDefaultBatchConfigurer 重写 : 修改事务隔离级别</p>
 *
 * @author : hduong
 * @version : 1.0
 * @date : 2019-12-10 17:00
 **/
@Configuration
public class MyDefaultBatchConfigurer //extends DefaultBatchConfigurer
    implements BatchConfigurer {

    private static final Log logger = LogFactory.getLog(MyDefaultBatchConfigurer.class);

    private DataSource dataSource;
    private PlatformTransactionManager transactionManager;
    private JobRepository jobRepository;
    private JobLauncher jobLauncher;
    private JobExplorer jobExplorer;

    @Autowired(
            required = false
    )
    public void setDataSource(DataSource dataSource) {
        if(this.dataSource == null) {
            this.dataSource = dataSource;
        }

        if(this.transactionManager == null) {
            this.transactionManager = new DataSourceTransactionManager(this.dataSource);
        }

    }

    protected MyDefaultBatchConfigurer() {
    }

    public MyDefaultBatchConfigurer(DataSource dataSource) {
        this.setDataSource(dataSource);
    }

    @Override
    public JobRepository getJobRepository() {
        return this.jobRepository;
    }

    @Override
    public PlatformTransactionManager getTransactionManager() {
        return this.transactionManager;
    }

    @Override
    public JobLauncher getJobLauncher() {
        return this.jobLauncher;
    }

    @Override
    public JobExplorer getJobExplorer() {
        return this.jobExplorer;
    }

    @PostConstruct
    public void initialize() {
        try {
            if(this.dataSource == null) {
                logger.warn("No datasource was provided...using a Map based JobRepository");
                if(this.transactionManager == null) {
                    this.transactionManager = new ResourcelessTransactionManager();
                }

                MapJobRepositoryFactoryBean jobRepositoryFactory = new MapJobRepositoryFactoryBean(this.transactionManager);
                jobRepositoryFactory.afterPropertiesSet();
                this.jobRepository = jobRepositoryFactory.getObject();
                MapJobExplorerFactoryBean jobExplorerFactory = new MapJobExplorerFactoryBean(jobRepositoryFactory);
                jobExplorerFactory.afterPropertiesSet();
                this.jobExplorer = jobExplorerFactory.getObject();
            } else {
                this.jobRepository = this.createJobRepository();
                this.jobExplorer = this.createJobExplorer();
            }

            this.jobLauncher = this.createJobLauncher();
        } catch (Exception var3) {
            throw new BatchConfigurationException(var3);
        }
    }

    protected JobLauncher createJobLauncher() throws Exception {
        SimpleJobLauncher jobLauncher = new SimpleJobLauncher();
        jobLauncher.setJobRepository(this.jobRepository);
        jobLauncher.afterPropertiesSet();
        return jobLauncher;
    }

    protected JobRepository createJobRepository() throws Exception {
        JobRepositoryFactoryBean factory = new JobRepositoryFactoryBean();
        factory.setDataSource(this.dataSource);
        factory.setTransactionManager(this.transactionManager);
        factory.setIsolationLevelForCreate("ISOLATION_READ_COMMITTED");
        factory.setDatabaseType("mysql");
        factory.afterPropertiesSet();
        return factory.getObject();
    }

    protected JobExplorer createJobExplorer() throws Exception {
        JobExplorerFactoryBean jobExplorerFactoryBean = new JobExplorerFactoryBean();
        jobExplorerFactoryBean.setDataSource(this.dataSource);
        jobExplorerFactoryBean.afterPropertiesSet();
        return jobExplorerFactoryBean.getObject();
    }
}
------------------------------------springbatch集成quartz的重要配置
package com.batch.quartz.config;

import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Configuration;

/**
 * <h3>partitionjob</h3>
 * <p>通用配置</p>
 *
 * @author : hduong
 * @version : 1.0
 * @date : 2019-12-20 11:29
 **/
@Configuration
public class CommonConfig {

    /**
     * 在被容器管理后: 直接自动注入就好了
     */
    protected JobBuilderFactory jobBuilderFactory;

    /**
     * 在被容器管理后: 直接自动注入就好了
     */
    protected StepBuilderFactory stepBuilderFactory;

    @Autowired
    public CommonConfig(JobBuilderFactory jobBuilderFactory, StepBuilderFactory stepBuilderFactory) {
        this.jobBuilderFactory = jobBuilderFactory;
        this.stepBuilderFactory = stepBuilderFactory;
    }
}
package com.batch.quartz.config;

import com.batch.quartz.batch.listener.JobCompletionNotificationListener;
import com.batch.quartz.entity.Person;
import com.batch.quartz.job.ImportUserQuartzJob;
import com.batch.quartz.reader.PersonItemProcessor;
import org.quartz.JobDataMap;
import org.quartz.Scheduler;
import org.quartz.SchedulerContext;
import org.quartz.spi.JobFactory;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.JobLocator;
import org.springframework.batch.core.configuration.JobRegistry;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.configuration.support.JobRegistryBeanPostProcessor;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.batch.core.launch.support.RunIdIncrementer;
import org.springframework.batch.core.repository.JobRepository;
import org.springframework.batch.item.database.BeanPropertyItemSqlParameterSourceProvider;
import org.springframework.batch.item.database.JdbcBatchItemWriter;
import org.springframework.batch.item.database.builder.JdbcBatchItemWriterBuilder;
import org.springframework.batch.item.file.FlatFileItemReader;
import org.springframework.batch.item.file.builder.FlatFileItemReaderBuilder;
import org.springframework.batch.item.file.mapping.BeanWrapperFieldSetMapper;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.config.PropertiesFactoryBean;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.ClassPathResource;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.scheduling.quartz.CronTriggerFactoryBean;
import org.springframework.scheduling.quartz.JobDetailFactoryBean;
import org.springframework.scheduling.quartz.SchedulerFactoryBean;

import javax.sql.DataSource;
import java.io.IOException;
import java.util.Properties;
import java.util.concurrent.Executor;

/**
 * <h3>partitionjob</h3>
 * <p>quartz 配置</p>
 *
 * @author : hduong
 * @version : 1.0
 * @date : 2019-12-20 11:25
 **/
@Configuration
public class QuartzConfig extends CommonConfig {
    public QuartzConfig(JobBuilderFactory jobBuilderFactory, StepBuilderFactory stepBuilderFactory) {
        super(jobBuilderFactory, stepBuilderFactory);
    }
    //1: quartz.properties配置
    //2: 任务bean jobDetail配置
    //3: 调度器 schedulerBean配置
    //4: 注册 启动配置
    //5: 线程池配置
    //6: autowiringJobFactory() 配置

    @Autowired
    private JobLauncher jobLauncher;

    @Autowired
    private JobLocator jobLocator;

    @Bean
    public JobRegistryBeanPostProcessor jobRegistryBeanPostProcessor(JobRegistry jobRegistry) {
        JobRegistryBeanPostProcessor jobRegistryBeanPostProcessor = new JobRegistryBeanPostProcessor();
        jobRegistryBeanPostProcessor.setJobRegistry(jobRegistry);
        return jobRegistryBeanPostProcessor;
    }

    @Bean
    public SchedulerFactoryBean schedulerFactoryBean() throws IOException {
        SchedulerFactoryBean factory = new SchedulerFactoryBean();

        factory.setSchedulerName("Cluster_Scheduler");
        factory.setDataSource(dataSource);
        factory.setApplicationContextSchedulerContextKey("applicationContext");
        //设置任务执行器
        factory.setTaskExecutor(schedulerThreadPool());
        //配置这个 支持 序列化bean
        SchedulerContext schedulerContext = new SchedulerContext();
        schedulerContext.put("jobName", "importUserJob");
        schedulerContext.put("jobLauncher", jobLauncher);
        schedulerContext.put("jobLocator", jobLocator);
        factory.setSchedulerContextAsMap(schedulerContext);

        //设置触发器 : 绑定了 jobDetail 任务定义
        factory.setTriggers(quartzTrigger().getObject());
        //设置quartz属性
        factory.setQuartzProperties(quartzProperties());
        //job工厂
        factory.setJobFactory(autowiringJobFactory());
        factory.setOverwriteExistingJobs(true);
        factory.setStartupDelay(15);
        return factory;
    }

    /**
     * 启动任务配置 不能少
     *
     * @param
     * @return : org.quartz.Scheduler
     * @date : 2019/12/20 14:43
     * @author : hduong
     */
    @Bean
    public Scheduler scheduler() throws Exception {
        Scheduler scheduler = schedulerFactoryBean().getScheduler();
        scheduler.start();
        return scheduler;
    }

    @Bean
    public CronTriggerFactoryBean quartzTrigger() {
        CronTriggerFactoryBean cronTriggerFactoryBean = new CronTriggerFactoryBean();

        cronTriggerFactoryBean.setJobDetail(importUserJob1().getObject());
        cronTriggerFactoryBean.setCronExpression("0/15 * * * * ?");
        return cronTriggerFactoryBean;
    }

    @Bean
    public JobDetailFactoryBean importUserJob1() {
        JobDetailFactoryBean jobDetailFactoryBean = new JobDetailFactoryBean();

        jobDetailFactoryBean.setJobClass(ImportUserQuartzJob.class);
        jobDetailFactoryBean.setDurability(true);
        jobDetailFactoryBean.setGroup("Import_User_Group");
        jobDetailFactoryBean.setName("Import_User_Name");
        //使用 schedulerContext 可以序列化:
//        SchedulerContext schedulerContext = new SchedulerContext();
//        schedulerContext.put("jobName", "importUserJob");
//        schedulerContext.put("jobLauncher", jobLauncher);
//        schedulerContext.put("jobLocator", jobRepository);
//        jobDetailFactoryBean.setJobDataAsMap(schedulerContext);
//        jobDetailFactoryBean.set
        //jobDetailFactoryBean.setJobDataMap(jobDataMap);//配置这个无法序列化 locator
        //jobDetailFactoryBean.setJobDataAsMap(jobDataMap);
        jobDetailFactoryBean.setRequestsRecovery(true);
        return jobDetailFactoryBean;
    }

    /**
     * 未配置启动的任务 : 可以手动启动 TODO
     *
     * @param
     * @return : org.springframework.scheduling.quartz.JobDetailFactoryBean
     * @date : 2019/12/20 15:04
     * @author : hduong
     */
//    @Bean
//    public JobDetailFactoryBean quartzJob() {
//        JobDetailFactoryBean jobDetailFactoryBean = new JobDetailFactoryBean();
//
//        jobDetailFactoryBean.setJobClass(QuartzJob.class);
//        jobDetailFactoryBean.setDurability(true);
//        jobDetailFactoryBean.setGroup("piLiang");
//        jobDetailFactoryBean.setName("piLiang_job");
//        jobDetailFactoryBean.setRequestsRecovery(true);
//        return jobDetailFactoryBean;
//    }
    @Bean
    public Properties quartzProperties() throws IOException {
        PropertiesFactoryBean propertiesFactoryBean = new PropertiesFactoryBean();
        propertiesFactoryBean.setLocation(new ClassPathResource("/quartz.properties"));

        //在quartz.properties中的属性被读取并注入后再初始化对象
        propertiesFactoryBean.afterPropertiesSet();
        return propertiesFactoryBean.getObject();
    }


    @Bean
    public JobFactory autowiringJobFactory() {
        return new AutowiringJobFactory();
    }

    @Bean
    public Executor schedulerThreadPool() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();

        executor.setCorePoolSize(15);
        executor.setMaxPoolSize(25);
        executor.setQueueCapacity(100);
        return executor;
    }

    @Autowired
    private JobCompletionNotificationListener listener;

    @Bean(name = "importUserJob")
    public Job importUserJob() {
        return jobBuilderFactory.get("importUserJob")
                .incrementer(new RunIdIncrementer())
                .listener(listener)
                .flow(step1())
                .end()
                .build();
    }

    @Bean
    public Step step1() {
        return stepBuilderFactory.get("step1")
                .<Person, Person>chunk(10)
                .reader(reader())
                .processor(processor())
                .writer(writer())
                .build();
    }


//    @Bean
//    @StepScope
//    public CommonFileReader reader(@Value("#{jobParameters[fileName]}") String fileName) {
//        return new CommonFileReader(fileName, Person.class, ",",null);
//    }

    @Bean
    public FlatFileItemReader<Person> reader() {
        return new FlatFileItemReaderBuilder<Person>()
                .name("personItemReader")
                .resource(new ClassPathResource("person.txt"))
                .delimited()
                .names(new String[]{"firstName", "lastName"})
                .fieldSetMapper(new BeanWrapperFieldSetMapper<Person>() {{
                    setTargetType(Person.class);
                }})
                .build();
    }

    @Bean
    public PersonItemProcessor processor() {
        return new PersonItemProcessor();
    }

    @Autowired
    DataSource dataSource;

    @Bean
    public JdbcBatchItemWriter<Person> writer() {
        return new JdbcBatchItemWriterBuilder<Person>()
                .itemSqlParameterSourceProvider(new BeanPropertyItemSqlParameterSourceProvider<>())
                .sql("INSERT INTO people (first_name, last_name) VALUES (:firstName, :lastName)")
                .dataSource(dataSource)
                .build();
    }


}

 

以上作为笔记记录

 

 

 

 

Logo

瓜分20万奖金 获得内推名额 丰厚实物奖励 易参与易上手

更多推荐