spring batch集成mybatis及quartz
具体代码见: https://gitee.com/hongdu_dudua1/spring_quartz_mq/tree/master/batch_quartz 。spring batch 主要是任务批处理的框架, 对每个任务分成三个步骤: 1: reader; 2: processor; 3: writer。spring对quartz的集成主要在 spring-context-support 这个包中。
·
具体代码见: https://gitee.com/hongdu_dudua1/spring_quartz_mq/tree/master/batch_quartz
spring batch 主要是任务批处理的框架, 对每个任务分成三个步骤:
1: reader
2: processor
3: writer
spring对quartz的集成主要在 spring-context-support这个包中
quartz的任务调度集成要点:
1: QuartzJobBean
这个是集成Quartz的job类: 使用springbatch进行集成的时候,在这个类的方法中:
protected void executeInternal(JobExecutionContext context) throws JobExecutionException { //todo spring batch的job执行 }
上面这个部分涉及到很重要的一个类:
SchedulerContext:是调度器 Scheduler的一个属性, 可以存储spring管理的bean, 因此我们在配置的时候可以将springbatch的任务启动器及任务名字还有job定位器JobLocator存放在这个调度器的上下文里面: 参考如下:
// Everything the batch launch needs was stored in the scheduler context by the configuration.
SchedulerContext schedulerContext = context.getScheduler().getContext();
String jobName = schedulerContext.getString("jobName");
JobLauncher jobLauncher = (JobLauncher) schedulerContext.get("jobLauncher");
JobLocator jobLocator = (JobLocator) schedulerContext.get("jobLocator");

System.out.println("jobName : " + jobName);
System.out.println("jobLauncher : " + jobLauncher);
System.out.println("jobLocator : " + jobLocator);

SimpleDateFormat timestampFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
System.out.println("Current Time : " + timestampFormat.format(new Date()));

// Resolve the Spring Batch job by its registered name.
Job job = jobLocator.getJob(jobName);
if (job != null) {
    // Launch the Spring Batch job; the "date" parameter is set to the current time on every run.
    JobExecution jobExecution = jobLauncher.run(
            job,
            new JobParametersBuilder()
                    .addDate("date", new Date())
                    .toJobParameters());
}
---------------------------------springbatch的配置:
1: job配置
参考: 源码
2: springbatch的事务隔离级别配置
package com.batch.quartz.batch; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.springframework.batch.core.configuration.BatchConfigurationException; import org.springframework.batch.core.configuration.annotation.BatchConfigurer; import org.springframework.batch.core.explore.JobExplorer; import org.springframework.batch.core.explore.support.JobExplorerFactoryBean; import org.springframework.batch.core.explore.support.MapJobExplorerFactoryBean; import org.springframework.batch.core.launch.JobLauncher; import org.springframework.batch.core.launch.support.SimpleJobLauncher; import org.springframework.batch.core.repository.JobRepository; import org.springframework.batch.core.repository.support.JobRepositoryFactoryBean; import org.springframework.batch.core.repository.support.MapJobRepositoryFactoryBean; import org.springframework.batch.support.transaction.ResourcelessTransactionManager; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Configuration; import org.springframework.jdbc.datasource.DataSourceTransactionManager; import org.springframework.transaction.PlatformTransactionManager; import javax.annotation.PostConstruct; import javax.sql.DataSource; /** * <h3>partitionjob</h3> * <p>myDefaultBatchConfigurer 重写 : 修改事务隔离级别</p> * * @author : hduong * @version : 1.0 * @date : 2019-12-10 17:00 **/ @Configuration public class MyDefaultBatchConfigurer //extends DefaultBatchConfigurer implements BatchConfigurer { private static final Log logger = LogFactory.getLog(MyDefaultBatchConfigurer.class); private DataSource dataSource; private PlatformTransactionManager transactionManager; private JobRepository jobRepository; private JobLauncher jobLauncher; private JobExplorer jobExplorer; @Autowired( required = false ) public void setDataSource(DataSource dataSource) { if(this.dataSource == null) { this.dataSource = dataSource; } if(this.transactionManager == null) { 
this.transactionManager = new DataSourceTransactionManager(this.dataSource); } } protected MyDefaultBatchConfigurer() { } public MyDefaultBatchConfigurer(DataSource dataSource) { this.setDataSource(dataSource); } @Override public JobRepository getJobRepository() { return this.jobRepository; } @Override public PlatformTransactionManager getTransactionManager() { return this.transactionManager; } @Override public JobLauncher getJobLauncher() { return this.jobLauncher; } @Override public JobExplorer getJobExplorer() { return this.jobExplorer; } @PostConstruct public void initialize() { try { if(this.dataSource == null) { logger.warn("No datasource was provided...using a Map based JobRepository"); if(this.transactionManager == null) { this.transactionManager = new ResourcelessTransactionManager(); } MapJobRepositoryFactoryBean jobRepositoryFactory = new MapJobRepositoryFactoryBean(this.transactionManager); jobRepositoryFactory.afterPropertiesSet(); this.jobRepository = jobRepositoryFactory.getObject(); MapJobExplorerFactoryBean jobExplorerFactory = new MapJobExplorerFactoryBean(jobRepositoryFactory); jobExplorerFactory.afterPropertiesSet(); this.jobExplorer = jobExplorerFactory.getObject(); } else { this.jobRepository = this.createJobRepository(); this.jobExplorer = this.createJobExplorer(); } this.jobLauncher = this.createJobLauncher(); } catch (Exception var3) { throw new BatchConfigurationException(var3); } } protected JobLauncher createJobLauncher() throws Exception { SimpleJobLauncher jobLauncher = new SimpleJobLauncher(); jobLauncher.setJobRepository(this.jobRepository); jobLauncher.afterPropertiesSet(); return jobLauncher; } protected JobRepository createJobRepository() throws Exception { JobRepositoryFactoryBean factory = new JobRepositoryFactoryBean(); factory.setDataSource(this.dataSource); factory.setTransactionManager(this.transactionManager); factory.setIsolationLevelForCreate("ISOLATION_READ_COMMITTED"); factory.setDatabaseType("mysql"); 
factory.afterPropertiesSet(); return factory.getObject(); } protected JobExplorer createJobExplorer() throws Exception { JobExplorerFactoryBean jobExplorerFactoryBean = new JobExplorerFactoryBean(); jobExplorerFactoryBean.setDataSource(this.dataSource); jobExplorerFactoryBean.afterPropertiesSet(); return jobExplorerFactoryBean.getObject(); } } ------------------------------------springbatch集成quartz的重要配置
package com.batch.quartz.config;

import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Configuration;

/**
 * <h3>partitionjob</h3>
 * <p>Common configuration: holds the builder factories that subclasses use to
 * define jobs and steps.</p>
 *
 * @author : hduong
 * @version : 1.0
 * @date : 2019-12-20 11:29
 **/
@Configuration
public class CommonConfig {

    /**
     * Factory for building jobs; supplied by the container through the constructor.
     */
    protected JobBuilderFactory jobBuilderFactory;

    /**
     * Factory for building steps; supplied by the container through the constructor.
     */
    protected StepBuilderFactory stepBuilderFactory;

    /**
     * Receives both builder factories from the container.
     *
     * @param jobBuilderFactory  factory used to build jobs
     * @param stepBuilderFactory factory used to build steps
     */
    @Autowired
    public CommonConfig(JobBuilderFactory jobBuilderFactory,
                        StepBuilderFactory stepBuilderFactory) {
        this.jobBuilderFactory = jobBuilderFactory;
        this.stepBuilderFactory = stepBuilderFactory;
    }
}
package com.batch.quartz.config; import com.batch.quartz.batch.listener.JobCompletionNotificationListener; import com.batch.quartz.entity.Person; import com.batch.quartz.job.ImportUserQuartzJob; import com.batch.quartz.reader.PersonItemProcessor; import org.quartz.JobDataMap; import org.quartz.Scheduler; import org.quartz.SchedulerContext; import org.quartz.spi.JobFactory; import org.springframework.batch.core.Job; import org.springframework.batch.core.Step; import org.springframework.batch.core.configuration.JobLocator; import org.springframework.batch.core.configuration.JobRegistry; import org.springframework.batch.core.configuration.annotation.JobBuilderFactory; import org.springframework.batch.core.configuration.annotation.StepBuilderFactory; import org.springframework.batch.core.configuration.support.JobRegistryBeanPostProcessor; import org.springframework.batch.core.launch.JobLauncher; import org.springframework.batch.core.launch.support.RunIdIncrementer; import org.springframework.batch.core.repository.JobRepository; import org.springframework.batch.item.database.BeanPropertyItemSqlParameterSourceProvider; import org.springframework.batch.item.database.JdbcBatchItemWriter; import org.springframework.batch.item.database.builder.JdbcBatchItemWriterBuilder; import org.springframework.batch.item.file.FlatFileItemReader; import org.springframework.batch.item.file.builder.FlatFileItemReaderBuilder; import org.springframework.batch.item.file.mapping.BeanWrapperFieldSetMapper; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.config.PropertiesFactoryBean; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.core.io.ClassPathResource; import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; import org.springframework.scheduling.quartz.CronTriggerFactoryBean; import 
org.springframework.scheduling.quartz.JobDetailFactoryBean; import org.springframework.scheduling.quartz.SchedulerFactoryBean; import javax.sql.DataSource; import java.io.IOException; import java.util.Properties; import java.util.concurrent.Executor; /** * <h3>partitionjob</h3> * <p>quartz 配置</p> * * @author : hduong * @version : 1.0 * @date : 2019-12-20 11:25 **/ @Configuration public class QuartzConfig extends CommonConfig { public QuartzConfig(JobBuilderFactory jobBuilderFactory, StepBuilderFactory stepBuilderFactory) { super(jobBuilderFactory, stepBuilderFactory); } //1: quartz.properties配置 //2: 任务bean jobDetail配置 //3: 调度器 schedulerBean配置 //4: 注册 启动配置 //5: 线程池配置 //6: autowiringJobFactory() 配置 @Autowired private JobLauncher jobLauncher; @Autowired private JobLocator jobLocator; @Bean public JobRegistryBeanPostProcessor jobRegistryBeanPostProcessor(JobRegistry jobRegistry) { JobRegistryBeanPostProcessor jobRegistryBeanPostProcessor = new JobRegistryBeanPostProcessor(); jobRegistryBeanPostProcessor.setJobRegistry(jobRegistry); return jobRegistryBeanPostProcessor; } @Bean public SchedulerFactoryBean schedulerFactoryBean() throws IOException { SchedulerFactoryBean factory = new SchedulerFactoryBean(); factory.setSchedulerName("Cluster_Scheduler"); factory.setDataSource(dataSource); factory.setApplicationContextSchedulerContextKey("applicationContext"); //设置任务执行器 factory.setTaskExecutor(schedulerThreadPool()); //配置这个 支持 序列化bean SchedulerContext schedulerContext = new SchedulerContext(); schedulerContext.put("jobName", "importUserJob"); schedulerContext.put("jobLauncher", jobLauncher); schedulerContext.put("jobLocator", jobLocator); factory.setSchedulerContextAsMap(schedulerContext); //设置触发器 : 绑定了 jobDetail 任务定义 factory.setTriggers(quartzTrigger().getObject()); //设置quartz属性 factory.setQuartzProperties(quartzProperties()); //job工厂 factory.setJobFactory(autowiringJobFactory()); factory.setOverwriteExistingJobs(true); factory.setStartupDelay(15); return factory; } /** * 
启动任务配置 不能少 * * @param * @return : org.quartz.Scheduler * @date : 2019/12/20 14:43 * @author : hduong */ @Bean public Scheduler scheduler() throws Exception { Scheduler scheduler = schedulerFactoryBean().getScheduler(); scheduler.start(); return scheduler; } @Bean public CronTriggerFactoryBean quartzTrigger() { CronTriggerFactoryBean cronTriggerFactoryBean = new CronTriggerFactoryBean(); cronTriggerFactoryBean.setJobDetail(importUserJob1().getObject()); cronTriggerFactoryBean.setCronExpression("0/15 * * * * ?"); return cronTriggerFactoryBean; } @Bean public JobDetailFactoryBean importUserJob1() { JobDetailFactoryBean jobDetailFactoryBean = new JobDetailFactoryBean(); jobDetailFactoryBean.setJobClass(ImportUserQuartzJob.class); jobDetailFactoryBean.setDurability(true); jobDetailFactoryBean.setGroup("Import_User_Group"); jobDetailFactoryBean.setName("Import_User_Name"); //使用 schedulerContext 可以序列化: // SchedulerContext schedulerContext = new SchedulerContext(); // schedulerContext.put("jobName", "importUserJob"); // schedulerContext.put("jobLauncher", jobLauncher); // schedulerContext.put("jobLocator", jobRepository); // jobDetailFactoryBean.setJobDataAsMap(schedulerContext); // jobDetailFactoryBean.set //jobDetailFactoryBean.setJobDataMap(jobDataMap);//配置这个无法序列化 locator //jobDetailFactoryBean.setJobDataAsMap(jobDataMap); jobDetailFactoryBean.setRequestsRecovery(true); return jobDetailFactoryBean; } /** * 未配置启动的任务 : 可以手动启动 TODO * * @param * @return : org.springframework.scheduling.quartz.JobDetailFactoryBean * @date : 2019/12/20 15:04 * @author : hduong */ // @Bean // public JobDetailFactoryBean quartzJob() { // JobDetailFactoryBean jobDetailFactoryBean = new JobDetailFactoryBean(); // // jobDetailFactoryBean.setJobClass(QuartzJob.class); // jobDetailFactoryBean.setDurability(true); // jobDetailFactoryBean.setGroup("piLiang"); // jobDetailFactoryBean.setName("piLiang_job"); // jobDetailFactoryBean.setRequestsRecovery(true); // return jobDetailFactoryBean; // } @Bean 
public Properties quartzProperties() throws IOException { PropertiesFactoryBean propertiesFactoryBean = new PropertiesFactoryBean(); propertiesFactoryBean.setLocation(new ClassPathResource("/quartz.properties")); //在quartz.properties中的属性被读取并注入后再初始化对象 propertiesFactoryBean.afterPropertiesSet(); return propertiesFactoryBean.getObject(); } @Bean public JobFactory autowiringJobFactory() { return new AutowiringJobFactory(); } @Bean public Executor schedulerThreadPool() { ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); executor.setCorePoolSize(15); executor.setMaxPoolSize(25); executor.setQueueCapacity(100); return executor; } @Autowired private JobCompletionNotificationListener listener; @Bean(name = "importUserJob") public Job importUserJob() { return jobBuilderFactory.get("importUserJob") .incrementer(new RunIdIncrementer()) .listener(listener) .flow(step1()) .end() .build(); } @Bean public Step step1() { return stepBuilderFactory.get("step1") .<Person, Person>chunk(10) .reader(reader()) .processor(processor()) .writer(writer()) .build(); } // @Bean // @StepScope // public CommonFileReader reader(@Value("#{jobParameters[fileName]}") String fileName) { // return new CommonFileReader(fileName, Person.class, ",",null); // } @Bean public FlatFileItemReader<Person> reader() { return new FlatFileItemReaderBuilder<Person>() .name("personItemReader") .resource(new ClassPathResource("person.txt")) .delimited() .names(new String[]{"firstName", "lastName"}) .fieldSetMapper(new BeanWrapperFieldSetMapper<Person>() {{ setTargetType(Person.class); }}) .build(); } @Bean public PersonItemProcessor processor() { return new PersonItemProcessor(); } @Autowired DataSource dataSource; @Bean public JdbcBatchItemWriter<Person> writer() { return new JdbcBatchItemWriterBuilder<Person>() .itemSqlParameterSourceProvider(new BeanPropertyItemSqlParameterSourceProvider<>()) .sql("INSERT INTO people (first_name, last_name) VALUES (:firstName, :lastName)") .dataSource(dataSource) 
.build(); } }
以上作为笔记记录
更多推荐
已为社区贡献1条内容
所有评论(0)