《Spring batch》实战之实现数据批处理
🍎 微服务系列推荐spring cloud Alibaba实战🍎 消息中间件系列推荐Apache Pulsar云原生消息中间件📅 最近更新 :2022年4月13日🍎 点赞👍 收藏收藏✨留言📑 都是我最大的动力!!spring batch简介spring batch是spring提供的一个数据处理框架。企业域中的许多应用程序需要批量处理才能在关键任务环境中执行业务操作。 这些业务运营包括
🍎 微服务系列推荐 spring cloud Alibaba实战
🍎 消息中间件系列推荐 Apache Pulsar云原生消息中间件
📅 最近更新 :2022年4月13日
🍎 点赞👍 收藏✨ 留言📑 都是我最大的动力!!
spring batch简介
spring batch是spring提供的一个数据处理框架。企业域中的许多应用程序需要批量处理才能在关键任务环境中执行业务操作。 这些业务运营包括:
无需用户交互即可最有效地处理大量信息的自动化,复杂处理。 这些操作通常包括基于时间的事件(例如月末计算,通知或通信)。
在非常大的数据集中重复处理复杂业务规则的定期应用(例如,保险利益确定或费率调整)。
集成从内部和外部系统接收的信息,这些信息通常需要以事务方式格式化,验证和处理到记录系统中。 批处理用于每天为企业处理数十亿的交易。
Spring Batch是一个轻量级、全面的批处理框架,旨在开发对企业系统日常运营至关重要的强大批处理应用程序。 Spring Batch构建在人们所期望的Spring Framework特性(生产力、基于POJO的开发方法和一般易用性)之上,同时使开发人员可以在必要时轻松访问和利用更高级的企业服务。 Spring Batch不是一个调度(scheduling)框架。
Spring Batch提供了可重用的功能,这些功能对于处理大量的数据至关重要,包括记录/跟踪,事务管理,作业处理统计,作业重启,跳过和资源管理。 它还提供更高级的技术服务和功能,通过优化和分区技术实现极高容量和高性能的批处理作业。 Spring Batch既可用于简单的用例(例如将文件读入数据库或运行存储过程),也可用于复杂的大数据量用例(例如在数据库之间移动大量数据、对其进行转换等)。 大批量批处理作业可以以高度可扩展的方式利用该框架来处理大量信息。
一、使用flow包装step,一个flow可以包装多个step
package com.example.dzx.springbatch666.config;
import lombok.extern.slf4j.Slf4j;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.StepContribution;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.job.builder.FlowBuilder;
import org.springframework.batch.core.job.flow.Flow;
import org.springframework.batch.core.scope.context.ChunkContext;
import org.springframework.batch.core.step.tasklet.Tasklet;
import org.springframework.batch.repeat.RepeatStatus;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
 * Batch configuration that wires three logging tasklet steps into a single job and
 * additionally exposes two {@link Flow} beans; one flow may wrap several steps.
 *
 * <p>The flows declared here are not referenced by {@link #helloWorldJob()} in this class.
 *
 * @author 500007
 */
@Slf4j
@EnableBatchProcessing
@Configuration
public class JobConfig {

    @Autowired
    private JobBuilderFactory jobBuilderFactory;

    @Autowired
    private StepBuilderFactory stepBuilderFactory;

    /**
     * Builds the job using conditional transitions: step2 is entered when step1 exits
     * with {@code COMPLETED}, and step3 is entered when step2 exits with {@code COMPLETED}.
     *
     * @return the {@code helloWorldJob} job
     */
    @Bean
    public Job helloWorldJob() {
        return this.jobBuilderFactory.get("helloWorldJob")
                .start(step1())
                .on("COMPLETED").to(step2())
                .from(step2()).on("COMPLETED").to(step3())
                .from(step3()).end()
                .build();
    }

    /** @return the step named {@code step3}, which logs its own name once */
    @Bean
    public Step step3() {
        return loggingStep("step3");
    }

    /** @return the step named {@code step2}, which logs its own name once */
    @Bean
    public Step step2() {
        return loggingStep("step2");
    }

    /** @return the step named {@code step1}, which logs its own name once */
    @Bean
    public Step step1() {
        return loggingStep("step1");
    }

    /** @return a flow that contains only step1 */
    @Bean
    public Flow splitFlow1() {
        FlowBuilder<Flow> builder = new FlowBuilder<>("splitFlow1");
        return builder.start(step1()).build();
    }

    /** @return a flow that runs step2 followed by step3 */
    @Bean
    public Flow splitFlow2() {
        FlowBuilder<Flow> builder = new FlowBuilder<>("splitFlow2");
        return builder.start(step2()).next(step3()).build();
    }

    /**
     * Creates a tasklet step that writes its name to the log and finishes immediately.
     *
     * @param name step name, also used as the logged message
     * @return the configured step
     */
    private Step loggingStep(String name) {
        Tasklet tasklet = (contribution, chunkContext) -> {
            log.info(name);
            return RepeatStatus.FINISHED;
        };
        return this.stepBuilderFactory.get(name).tasklet(tasklet).build();
    }
}
二、使用split实现step的并行处理
package com.example.dzx.springbatch666.config;
import lombok.extern.slf4j.Slf4j;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.StepContribution;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.job.builder.FlowBuilder;
import org.springframework.batch.core.job.flow.Flow;
import org.springframework.batch.core.scope.context.ChunkContext;
import org.springframework.batch.core.step.tasklet.Tasklet;
import org.springframework.batch.repeat.RepeatStatus;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.task.SimpleAsyncTaskExecutor;
/**
 * Batch configuration demonstrating {@code split}: two flows are handed to a task
 * executor so that their steps can be processed in parallel.
 *
 * @author 500007
 */
@Slf4j
@EnableBatchProcessing
@Configuration
public class JobFlowConfig {

    @Autowired
    private JobBuilderFactory jobBuilderFactory;

    @Autowired
    private StepBuilderFactory stepBuilderFactory;

    /** @return the step named {@code step3}, which logs its own name once */
    @Bean
    public Step step3() {
        return singleLogStep("step3");
    }

    /** @return the step named {@code step2}, which logs its own name once */
    @Bean
    public Step step2() {
        return singleLogStep("step2");
    }

    /** @return the step named {@code step1}, which logs its own name once */
    @Bean
    public Step step1() {
        return singleLogStep("step1");
    }

    /** @return a flow that contains only step1 */
    @Bean
    public Flow splitFlow1() {
        FlowBuilder<Flow> flowBuilder = new FlowBuilder<>("splitFlow1");
        flowBuilder.start(step1());
        return flowBuilder.build();
    }

    /** @return a flow that runs step2 followed by step3 */
    @Bean
    public Flow splitFlow2() {
        FlowBuilder<Flow> flowBuilder = new FlowBuilder<>("splitFlow2");
        flowBuilder.start(step2()).next(step3());
        return flowBuilder.build();
    }

    /**
     * Builds the job: starts with {@code splitFlow1} and adds {@code splitFlow2} to a
     * split that is driven by a {@link SimpleAsyncTaskExecutor}.
     *
     * @return the {@code helloWorldJob} job
     */
    @Bean
    public Job helloWorldJob() {
        SimpleAsyncTaskExecutor executor = new SimpleAsyncTaskExecutor();
        return this.jobBuilderFactory.get("helloWorldJob")
                .start(splitFlow1())
                .split(executor)
                .add(splitFlow2())
                .end()
                .build();
    }

    /**
     * Creates a tasklet step that writes its name to the log and finishes immediately.
     *
     * @param stepName step name, also used as the logged message
     * @return the configured step
     */
    private Step singleLogStep(String stepName) {
        return this.stepBuilderFactory.get(stepName)
                .tasklet((stepContribution, chunkContext) -> {
                    log.info(stepName);
                    return RepeatStatus.FINISHED;
                })
                .build();
    }
}
三、自定义reader类读取数据并输出到控制台

package com.example.dzx.springbatch666.config;
import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.NonTransientResourceException;
import org.springframework.batch.item.ParseException;
import org.springframework.batch.item.UnexpectedInputException;
import java.util.Iterator;
import java.util.List;
/**
 * Custom {@link ItemReader} that hands out the elements of a caller-supplied
 * iterator one at a time.
 *
 * <p>The iterator is consumed as items are read, so an instance yields each element
 * at most once; after the last element every call to {@link #read()} returns
 * {@code null}.
 *
 * @author 500007
 */
public class MyReader implements ItemReader<String> {

    /** Source of the items; never {@code null}. */
    private final Iterator<String> iterator;

    /**
     * @param iterator source of the items to read; must not be {@code null}
     * @throws NullPointerException if {@code iterator} is {@code null}
     */
    public MyReader(Iterator<String> iterator) {
        // Fail at construction time instead of on the first read() deep inside a step.
        this.iterator = java.util.Objects.requireNonNull(iterator, "iterator");
    }

    /**
     * Returns the next element of the iterator.
     *
     * @return the next item, or {@code null} when the iterator has no more elements
     */
    @Override
    public String read() throws Exception, UnexpectedInputException, ParseException, NonTransientResourceException {
        return iterator.hasNext() ? iterator.next() : null;
    }
}
package com.example.dzx.springbatch666.config;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.job.flow.Flow;
import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.support.ListItemReader;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
/**
 * Reads strings from an in-memory list through {@link MyReader} and prints them to
 * the console.
 *
 * @author 500007
 */
@Configuration
@EnableBatchProcessing
public class ItemReadJobConfig {

    /** Factory used to create job builders. */
    @Autowired
    private JobBuilderFactory jobBuilderFactory;

    /** Factory used to create step builders. */
    @Autowired
    private StepBuilderFactory stepBuilderFactory;

    /**
     * Declares a job that consists of {@link #step1()} only.
     *
     * @return the {@code itemReaderDemoJob} job
     */
    @Bean
    public Job itemReaderDemoJob() {
        return this.jobBuilderFactory.get("itemReaderDemoJob").start(step1()).build();
    }

    /**
     * Declares a chunk-oriented step with a chunk size of 2.
     *
     * <p>The chunk is typed as {@code <String, String>} so the reader and writer are
     * checked by the compiler instead of being treated as {@code Object} items.
     *
     * @return the {@code step1} step
     */
    @Bean
    public Step step1() {
        return this.stepBuilderFactory.get("step1")
                .<String, String>chunk(2)
                .reader(itemReaderDemoRead())
                // Print every item handed over by the custom reader.
                .writer(items -> items.forEach(System.out::println))
                .build();
    }

    /**
     * Declares the custom reader over a fixed list of four strings.
     *
     * <p>The reader wraps a single iterator that is created here, so this bean
     * delivers the four items only once.
     *
     * @return the reader bean
     */
    @Bean
    public MyReader itemReaderDemoRead() {
        List<String> list = Arrays.asList("cat", "dog", "pig", "duck");
        return new MyReader(list.iterator());
    }
}
四、使用JdbcPagingItemReader 从数据库中读取数据
(1)编写User实体类
package com.example.dzx.springbatch666.jdbc;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.ToString;
/**
* @author 500007
* @ClassName:
* @Description:
* @date 2022年04月13日 11:22:51
*/
@Data
@AllArgsConstructor
@NoArgsConstructor
@ToString
public class User {
private Integer id;
private String username;
private String password;
private Integer age;
}
(2)从数据库中读取用户信息并输出到控制台
package com.example.dzx.springbatch666.jdbc;
import com.example.dzx.springbatch666.config.MyReader;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepScope;
import org.springframework.batch.item.ItemWriter;
import org.springframework.batch.item.database.JdbcPagingItemReader;
import org.springframework.batch.item.database.Order;
import org.springframework.batch.item.database.support.MySqlPagingQueryProvider;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jdbc.core.RowMapper;
import javax.jws.soap.SOAPBinding;
import javax.sql.DataSource;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/**
 * Reads user rows from the database with a {@link JdbcPagingItemReader} and passes
 * them to the {@code userWriter} bean.
 *
 * @author 500007
 */
@Configuration
@EnableBatchProcessing
public class ItemReadJobConfig {

    /** Factory used to create job builders. */
    @Autowired
    private JobBuilderFactory jobBuilderFactory;

    /** Factory used to create step builders. */
    @Autowired
    private StepBuilderFactory stepBuilderFactory;

    @Autowired
    private DataSource dataSource;

    /** Writer bean registered under the name {@code userWriter}; typed to avoid a raw ItemWriter. */
    @Autowired
    @Qualifier("userWriter")
    private ItemWriter<User> userWriter;

    /**
     * Declares a job that consists of {@link #step1()} only.
     *
     * @return the {@code itemReaderDemoJob} job
     */
    @Bean
    public Job itemReaderDemoJob() {
        return this.jobBuilderFactory.get("itemReaderDemoJob").start(step1()).build();
    }

    /**
     * Declares a chunk-oriented step (chunk size 2) that reads and writes {@link User} items.
     *
     * @return the {@code step1} step
     */
    @Bean
    public Step step1() {
        return this.stepBuilderFactory.get("step1")
                .<User, User>chunk(2)
                .reader(itemReaderDemoRead())
                .writer(userWriter)
                .build();
    }

    /**
     * Declares the step-scoped paging reader that selects
     * {@code id,username,password,age} from table {@code user}, ordered by {@code id} ascending.
     *
     * @return the configured reader
     */
    @Bean
    @StepScope
    public JdbcPagingItemReader<User> itemReaderDemoRead() {
        JdbcPagingItemReader<User> reader = new JdbcPagingItemReader<>();
        reader.setDataSource(dataSource);
        reader.setFetchSize(2);

        // Map the columns by position; the order follows the select clause below.
        reader.setRowMapper((resultSet, rowNum) -> {
            User user = new User();
            user.setId(resultSet.getInt(1));
            user.setUsername(resultSet.getString(2));
            user.setPassword(resultSet.getString(3));
            user.setAge(resultSet.getInt(4));
            return user;
        });

        // Build the paging query.
        MySqlPagingQueryProvider queryProvider = new MySqlPagingQueryProvider();
        queryProvider.setSelectClause("id,username,password,age");
        queryProvider.setFromClause("from user");

        // Sort by id ascending.
        Map<String, Order> sortKeys = new HashMap<>(1);
        sortKeys.put("id", Order.ASCENDING);
        queryProvider.setSortKeys(sortKeys);

        reader.setQueryProvider(queryProvider);
        return reader;
    }
}
(3)编写自定义writer
package com.example.dzx.springbatch666.jdbc;
import org.springframework.batch.item.ItemWriter;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.stereotype.Component;
import java.util.List;
import java.util.Objects;
/**
 * Custom writer, registered as bean {@code userWriter}, that prints each user of a
 * chunk to standard output.
 *
 * @author 500007
 */
@Component("userWriter")
public class UserWriter implements ItemWriter<User> {

    /**
     * Prints one line per user in the given chunk.
     *
     * @param list the users of the current chunk
     */
    @Override
    public void write(List<? extends User> list) throws Exception {
        list.forEach(user -> System.out.println("输出用户信息:" + user));
    }
}
启动项目看到 控制台中打印出用户信息如下:

五、使用FlatFileItemReader从文件中读取客户信息数据
(1)现有如下图所示的txt文本文件,里面有1000条客户信息

(2)编写客户Customer实体类
package com.example.dzx.springbatch666.file;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.ToString;
/**
 * Customer record with an id, first name, last name and birthday (kept as text).
 *
 * <p>Accessors, {@code equals}/{@code hashCode} and {@code toString} come from
 * Lombok's {@code @Data}; both a no-argument and an all-arguments constructor are generated.
 *
 * @author 500007
 */
@NoArgsConstructor
@AllArgsConstructor
@Data
public class Customer {

    private Long id;

    private String firstName;

    private String lastName;

    private String birthday;
}
(3)编写从txt文件读取客户信息数据的逻辑
package com.example.dzx.springbatch666.file;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepScope;
import org.springframework.batch.item.ItemWriter;
import org.springframework.batch.item.file.FlatFileItemReader;
import org.springframework.batch.item.file.LineMapper;
import org.springframework.batch.item.file.builder.FlatFileItemReaderBuilder;
import org.springframework.batch.item.file.mapping.DefaultLineMapper;
import org.springframework.batch.item.file.mapping.FieldSetMapper;
import org.springframework.batch.item.file.transform.DelimitedLineTokenizer;
import org.springframework.batch.item.file.transform.FieldSet;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.ClassPathResource;
import org.springframework.core.io.FileSystemResource;
import org.springframework.validation.BindException;
import java.util.StringTokenizer;
/**
 * Reads customer records from the classpath text file {@code customer.txt} and
 * passes them to the {@code customerWriter} bean.
 *
 * @author 500007
 */
@EnableBatchProcessing
@Configuration
public class FileReadConfig {

    @Autowired
    private JobBuilderFactory jobBuilderFactory;

    @Autowired
    private StepBuilderFactory stepBuilderFactory;

    @Autowired
    @Qualifier("customerWriter")
    private CustomerWriter customerWriter;

    /** @return the {@code fileReadJob} job, consisting of {@link #fileReadStep()} only */
    @Bean
    public Job fileReadJob() {
        Step onlyStep = fileReadStep();
        return this.jobBuilderFactory.get("fileReadJob").start(onlyStep).build();
    }

    /** @return a chunk-oriented step (chunk size 10) reading and writing {@link Customer} items */
    @Bean
    public Step fileReadStep() {
        return this.stepBuilderFactory.get("fileReadStep")
                .<Customer, Customer>chunk(10)
                .reader(fileItemReader())
                .writer(customerWriter)
                .build();
    }

    /**
     * Declares the step-scoped flat-file reader.
     *
     * <p>The first line of the file is skipped as a header; every other line is split
     * by the tokenizer into the fields {@code id}, {@code firstName}, {@code lastName}
     * and {@code birthday} and mapped to a {@link Customer}.
     *
     * @return the configured reader
     */
    @Bean
    @StepScope
    public FlatFileItemReader<Customer> fileItemReader() {
        // Tokenizer: names the columns of one line.
        DelimitedLineTokenizer tokenizer = new DelimitedLineTokenizer();
        tokenizer.setNames("id", "firstName", "lastName", "birthday");

        // Line mapper: turns one tokenized line into a Customer.
        DefaultLineMapper<Customer> lineMapper = new DefaultLineMapper<>();
        lineMapper.setLineTokenizer(tokenizer);
        lineMapper.setFieldSetMapper(fieldSet -> new Customer(
                fieldSet.readLong("id"),
                fieldSet.readString("firstName"),
                fieldSet.readString("lastName"),
                fieldSet.readString("birthday")));
        lineMapper.afterPropertiesSet();

        FlatFileItemReader<Customer> reader = new FlatFileItemReader<>();
        reader.setResource(new ClassPathResource("customer.txt"));
        // Skip the header line.
        reader.setLinesToSkip(1);
        reader.setLineMapper(lineMapper);
        return reader;
    }
}
(4)自定义客户信息writer类
package com.example.dzx.springbatch666.file;
import org.springframework.batch.item.ItemWriter;
import org.springframework.stereotype.Component;
import java.util.List;
/**
 * Writer, registered as bean {@code customerWriter}, that prints each customer of a
 * chunk to standard output.
 *
 * @author 500007
 */
@Component("customerWriter")
public class CustomerWriter implements ItemWriter<Customer> {

    /**
     * Prints one line per customer in the given chunk.
     *
     * @param list the customers of the current chunk
     */
    @Override
    public void write(List<? extends Customer> list) throws Exception {
        list.forEach(customer -> System.out.println("客户信息:" + customer));
    }
}
(5)启动项目,看到控制台读取成功,输出1000条客户信息如下

六、使用 StaxEventItemReader从 xml中读取数据
(1) 编写客户实体类
package com.example.dzx.springbatch666.xml;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.ToString;
/**
 * Customer record used as the target type when reading {@code customer} XML fragments:
 * id, first name, last name and birthday (kept as text).
 *
 * <p>Accessors, {@code equals}/{@code hashCode} and {@code toString} come from
 * Lombok's {@code @Data}; both a no-argument and an all-arguments constructor are generated.
 *
 * @author 500007
 */
@NoArgsConstructor
@AllArgsConstructor
@Data
public class Customer {

    private Long id;

    private String firstName;

    private String lastName;

    private String birthday;
}
(2)自定义writer
package com.example.dzx.springbatch666.xml;
import org.springframework.batch.item.ItemWriter;
import org.springframework.stereotype.Component;
import java.util.List;
/**
 * Writer component that prints each customer of a chunk to standard output.
 *
 * @author 500007
 */
@Component
public class CustomerWriter implements ItemWriter<Customer> {

    /**
     * Prints one line per customer in the given chunk.
     *
     * @param list the customers of the current chunk
     */
    @Override
    public void write(List<? extends Customer> list) throws Exception {
        for (Customer customer : list) {
            System.out.println("输出客户信息:" + customer);
        }
    }
}
(3)编写xmlReaderConfig 主配置类
package com.example.dzx.springbatch666.xml;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.item.xml.StaxEventItemReader;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.ClassPathResource;
import org.springframework.oxm.xstream.XStreamMarshaller;
import java.util.HashMap;
import java.util.Map;
/**
 * Reads {@code customer} fragments from the classpath file {@code customer.xml}
 * with a {@link StaxEventItemReader} and passes them to {@link CustomerWriter}.
 *
 * @author 500007
 */
@Configuration
@EnableBatchProcessing
public class XmlReaderConfig {

    @Autowired
    private JobBuilderFactory jobBuilderFactory;

    @Autowired
    private StepBuilderFactory stepBuilderFactory;

    @Autowired
    private CustomerWriter customerWriter;

    /**
     * @return the {@code xmlReadJob} job, consisting of {@link #xmlReadStep()} only
     * @throws Exception if the reader cannot be initialised
     */
    @Bean
    public Job xmlReadJob() throws Exception {
        return this.jobBuilderFactory.get("xmlReadJob")
                .start(xmlReadStep())
                .build();
    }

    /**
     * @return a chunk-oriented step (chunk size 10) reading and writing {@link Customer} items
     * @throws Exception if the reader cannot be initialised
     */
    @Bean
    public Step xmlReadStep() throws Exception {
        return this.stepBuilderFactory.get("xmlReadStep")
                .<Customer, Customer>chunk(10)
                .reader(staxEventItemReader())
                .writer(customerWriter)
                .build();
    }

    /**
     * Declares the XML reader. Each {@code customer} element is unmarshalled by
     * XStream into a {@link Customer}.
     *
     * @return the configured, typed reader
     * @throws Exception if {@code afterPropertiesSet()} rejects the configuration
     */
    @Bean
    public StaxEventItemReader<Customer> staxEventItemReader() throws Exception {
        // Map the XML element name to the target class.
        Map<String, Class<?>> aliases = new HashMap<>();
        aliases.put("customer", Customer.class);

        XStreamMarshaller xStreamMarshaller = new XStreamMarshaller();
        xStreamMarshaller.setAliases(aliases);
        // Recent XStream versions refuse to instantiate types that are not explicitly
        // allowed; whitelist only the class this reader is meant to produce.
        xStreamMarshaller.getXStream().allowTypes(new Class<?>[] {Customer.class});

        StaxEventItemReader<Customer> staxEventItemReader = new StaxEventItemReader<>();
        staxEventItemReader.setResource(new ClassPathResource("customer.xml"));
        // Name of the element that forms one item.
        staxEventItemReader.setFragmentRootElementName("customer");
        staxEventItemReader.setUnmarshaller(xStreamMarshaller);
        staxEventItemReader.afterPropertiesSet();
        return staxEventItemReader;
    }
}
(4)启动项目看到控制台,输出从xml中读取到的客户信息如下

七、使用MultiResourceItemReader同时读取多个文件数据
package com.example.dzx.springbatch666.multifile;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepScope;
import org.springframework.batch.item.ItemWriter;
import org.springframework.batch.item.database.JdbcPagingItemReader;
import org.springframework.batch.item.database.Order;
import org.springframework.batch.item.database.support.MySqlPagingQueryProvider;
import org.springframework.batch.item.file.FlatFileItemReader;
import org.springframework.batch.item.file.MultiResourceItemReader;
import org.springframework.batch.item.file.mapping.DefaultLineMapper;
import org.springframework.batch.item.file.mapping.FieldSetMapper;
import org.springframework.batch.item.file.transform.DelimitedLineTokenizer;
import org.springframework.batch.item.file.transform.FieldSet;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.ClassPathResource;
import org.springframework.core.io.Resource;
import org.springframework.jdbc.core.RowMapper;
import org.springframework.validation.BindException;
import javax.sql.DataSource;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.HashMap;
import java.util.Map;
/**
 * Reads customer records from several text files at once: a
 * {@link MultiResourceItemReader} iterates over all classpath resources matching
 * {@code file*.txt} and delegates the parsing of each file to a {@link FlatFileItemReader}.
 *
 * @author 500007
 */
@Configuration
@EnableBatchProcessing
public class ItemReadJobConfig {

    /** Factory used to create job builders. */
    @Autowired
    private JobBuilderFactory jobBuilderFactory;

    /** Factory used to create step builders. */
    @Autowired
    private StepBuilderFactory stepBuilderFactory;

    @Autowired
    private CustomerWriter customerWriter;

    /** Input files, resolved from the classpath pattern. */
    @Value("classpath:/file*.txt")
    private Resource[] resources;

    /**
     * Declares a job that consists of {@link #step1()} only.
     *
     * @return the {@code itemReaderDemoJob} job
     */
    @Bean
    public Job itemReaderDemoJob() {
        return this.jobBuilderFactory.get("itemReaderDemoJob").start(step1()).build();
    }

    /**
     * Declares a chunk-oriented step (chunk size 2) that reads and writes {@link Customer} items.
     *
     * @return the {@code step1} step
     */
    @Bean
    public Step step1() {
        return this.stepBuilderFactory.get("step1")
                .<Customer, Customer>chunk(2)
                .reader(multiResourceItemReader())
                .writer(customerWriter)
                .build();
    }

    /**
     * Declares the step-scoped reader that walks over all injected resources.
     *
     * @return the multi-resource reader with {@link #fileItemReader()} as its delegate
     */
    @Bean
    @StepScope
    public MultiResourceItemReader<Customer> multiResourceItemReader() {
        MultiResourceItemReader<Customer> reader = new MultiResourceItemReader<>();
        reader.setResources(resources);
        reader.setDelegate(fileItemReader());
        return reader;
    }

    /**
     * Creates the delegate that parses a single file. No resource is set here; this
     * method only configures header skipping and line mapping.
     *
     * <p>The first line of a file is skipped as a header; every other line is split
     * into the fields {@code id}, {@code firstName}, {@code lastName} and
     * {@code birthday} and mapped to a {@link Customer}.
     *
     * @return the configured delegate reader
     */
    public FlatFileItemReader<Customer> fileItemReader() {
        FlatFileItemReader<Customer> flatFileItemReader = new FlatFileItemReader<>();
        // Skip the header line.
        flatFileItemReader.setLinesToSkip(1);

        // Tokenizer: names the columns of one line.
        DelimitedLineTokenizer delimitedLineTokenizer = new DelimitedLineTokenizer();
        delimitedLineTokenizer.setNames(new String[]{"id", "firstName", "lastName", "birthday"});

        // Line mapper: turns one tokenized line into a Customer.
        DefaultLineMapper<Customer> mapper = new DefaultLineMapper<>();
        mapper.setLineTokenizer(delimitedLineTokenizer);
        mapper.setFieldSetMapper(fieldSet -> new Customer(
                fieldSet.readLong("id"),
                fieldSet.readString("firstName"),
                fieldSet.readString("lastName"),
                fieldSet.readString("birthday")));
        mapper.afterPropertiesSet();

        flatFileItemReader.setLineMapper(mapper);
        return flatFileItemReader;
    }
}
八、使用JobLauncher控制job启动时机
#关闭springbatch的job在项目启动时候自动执行
spring:
  batch:
    job:
      enabled: false
package com.example.dzx.springbatch666.launch;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.JobParametersBuilder;
import org.springframework.batch.core.JobParametersInvalidException;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.batch.core.repository.JobExecutionAlreadyRunningException;
import org.springframework.batch.core.repository.JobInstanceAlreadyCompleteException;
import org.springframework.batch.core.repository.JobRestartException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RestController;
/**
 * REST endpoint that starts the {@code itemReaderDemoJob} job on demand through the
 * {@link JobLauncher}.
 *
 * @author 500007
 */
@RestController
public class LaunchController {

    @Autowired
    private JobLauncher jobLauncher;

    @Autowired
    @Qualifier("itemReaderDemoJob")
    private Job itemReaderDemoJob;

    /**
     * Launches the job with the path segment passed on as job parameter {@code msg}.
     *
     * @param msg value of the {@code msg} job parameter
     * @return {@code "job success"} unless the returned execution reports an
     *         unsuccessful status, in which case {@code "job failed"}
     * @throws JobInstanceAlreadyCompleteException if a job instance with the same
     *         parameters has already completed
     * @throws JobExecutionAlreadyRunningException if an execution of this job instance
     *         is already running
     * @throws JobParametersInvalidException if the parameters are rejected by the job
     * @throws JobRestartException if the job cannot be restarted
     */
    @GetMapping("/job/{msg}")
    public String processJob(@PathVariable("msg") String msg) throws JobInstanceAlreadyCompleteException, JobExecutionAlreadyRunningException, JobParametersInvalidException, JobRestartException {
        // Hand the request value over to the job.
        JobParameters jobParameters = new JobParametersBuilder()
                .addString("msg", msg)
                .toJobParameters();

        // A failed step does not make run() throw; the outcome must be read from the
        // returned execution, otherwise failures would be reported as success.
        boolean unsuccessful = jobLauncher.run(itemReaderDemoJob, jobParameters)
                .getStatus()
                .isUnsuccessful();
        return unsuccessful ? "job failed" : "job success";
    }
}
package com.example.dzx.springbatch666.jdbc;
import com.example.dzx.springbatch666.config.MyReader;
import org.springframework.batch.core.*;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepScope;
import org.springframework.batch.item.ItemWriter;
import org.springframework.batch.item.database.JdbcPagingItemReader;
import org.springframework.batch.item.database.Order;
import org.springframework.batch.item.database.support.MySqlPagingQueryProvider;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jdbc.core.RowMapper;
import javax.jws.soap.SOAPBinding;
import javax.sql.DataSource;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/**
 * Reads user rows from the database and passes them to the {@code userWriter} bean.
 * The class also registers itself as a {@link StepExecutionListener} on the step so
 * it can pick up the job parameters before the step runs.
 *
 * @author 500007
 */
@Configuration
@EnableBatchProcessing
public class ItemReadJobConfig implements StepExecutionListener {

    /** Factory used to create job builders. */
    @Autowired
    private JobBuilderFactory jobBuilderFactory;

    /** Factory used to create step builders. */
    @Autowired
    private StepBuilderFactory stepBuilderFactory;

    @Autowired
    private DataSource dataSource;

    /** Writer bean registered under the name {@code userWriter}; typed to avoid a raw ItemWriter. */
    @Autowired
    @Qualifier("userWriter")
    private ItemWriter<User> userWriter;

    /** Parameters of the most recently started step execution, set in {@link #beforeStep}. */
    private JobParameters jobParameters;

    /**
     * Declares a job that consists of {@link #step1()} only.
     *
     * @return the {@code itemReaderDemoJob} job
     */
    @Bean
    public Job itemReaderDemoJob() {
        return this.jobBuilderFactory.get("itemReaderDemoJob").start(step1()).build();
    }

    /**
     * Declares a chunk-oriented step (chunk size 2) that reads and writes {@link User}
     * items and notifies this class before and after the step.
     *
     * @return the {@code step1} step
     */
    @Bean
    public Step step1() {
        return this.stepBuilderFactory.get("step1")
                // Register this configuration as the step listener.
                .listener(this)
                .<User, User>chunk(2)
                .reader(itemReaderDemoRead())
                .writer(userWriter)
                .build();
    }

    /**
     * Declares the step-scoped paging reader that selects
     * {@code id,username,password,age} from table {@code user}, ordered by {@code id} ascending.
     *
     * @return the configured reader
     */
    @Bean
    @StepScope
    public JdbcPagingItemReader<User> itemReaderDemoRead() {
        JdbcPagingItemReader<User> reader = new JdbcPagingItemReader<>();
        reader.setDataSource(dataSource);
        reader.setFetchSize(2);

        // Map the columns by position; the order follows the select clause below.
        reader.setRowMapper((resultSet, rowNum) -> {
            User user = new User();
            user.setId(resultSet.getInt(1));
            user.setUsername(resultSet.getString(2));
            user.setPassword(resultSet.getString(3));
            user.setAge(resultSet.getInt(4));
            return user;
        });

        // Build the paging query.
        MySqlPagingQueryProvider queryProvider = new MySqlPagingQueryProvider();
        queryProvider.setSelectClause("id,username,password,age");
        queryProvider.setFromClause("from user");

        // Sort by id ascending.
        Map<String, Order> sortKeys = new HashMap<>(1);
        sortKeys.put("id", Order.ASCENDING);
        queryProvider.setSortKeys(sortKeys);

        reader.setQueryProvider(queryProvider);
        return reader;
    }

    /**
     * Stores the job parameters of the starting step execution and prints the value
     * of the {@code msg} parameter.
     *
     * @param stepExecution the execution that is about to start
     */
    @Override
    public void beforeStep(StepExecution stepExecution) {
        jobParameters = stepExecution.getJobParameters();
        System.out.println(jobParameters.getString("msg"));
    }

    /**
     * No post-processing is done.
     *
     * @param stepExecution the execution that has finished
     * @return {@code null}, i.e. this listener contributes no exit status of its own
     */
    @Override
    public ExitStatus afterStep(StepExecution stepExecution) {
        return null;
    }
}
更多推荐



所有评论(0)