Spring Boot分段处理List集合多线程批量插入数据

项目场景:
大数据量的List集合,需要把List集合中的数据批量插入数据库中。
解决方案:
先拆分List集合,再使用多线程批量插入数据库。
1.实体类
package com.test.entity;
import lombok.Data;
/**
 * Row model for the {@code test} table written by {@code TestMapper}.
 *
 * <p>Lombok's {@code @Data} generates getters, setters, {@code equals}/{@code hashCode} and
 * {@code toString} for the fields below.
 */
@Data
public class TestEntity {
// Bound to the "id" column as VARCHAR by TestMapper's insert statements.
private String id;
// Bound to the "name" column as VARCHAR by TestMapper's insert statements.
private String name;
}
2.Mapper
如果数据量不大,用foreach标签就足够了;如果数据量很大,建议使用batch模式。注意:在@Insert等注解中使用foreach等动态SQL标签时,必须用<script>标签将整条SQL包裹起来,否则这些标签不会被MyBatis解析。
package com.test.mapper;
import java.util.List;
import org.apache.ibatis.annotations.Insert;
import org.apache.ibatis.annotations.Param;
import com.test.entity.TestEntity;
public interface TestMapper {

    /**
     * Inserts a single row into {@code test}.
     *
     * <p>Intended for a {@code SqlSession} opened with {@code ExecutorType.BATCH}: each call only
     * queues the statement, and the queue is sent to the database on
     * {@code flushStatements()}/{@code commit()}. This is the recommended path for large data sets.
     *
     * @param testEntity row to insert; {@code id} and {@code name} are bound as VARCHAR
     */
    @Insert("insert into test(id, name) "
            + " values"
            + " (#{id,jdbcType=VARCHAR}, #{name,jdbcType=VARCHAR})")
    void testInsert(TestEntity testEntity);

    /**
     * Inserts all rows of {@code list} with one multi-row {@code INSERT ... VALUES (...), (...)}
     * statement. Suitable for small data sets only, because the statement size grows with the list.
     *
     * <p>The SQL is wrapped in {@code <script>} because MyBatis only parses dynamic-SQL tags such as
     * {@code <foreach>} inside annotation SQL when the statement is a script.
     *
     * <p>The caller must not pass an empty list: the statement would end in a bare {@code values}
     * and fail with a SQL syntax error.
     *
     * @param list rows to insert; must be non-empty
     */
    @Insert("<script>"
            + "insert into test(id, name) "
            + " values"
            + " <foreach collection='list' item='item' index='index' separator=','>"
            + " (#{item.id,jdbcType=VARCHAR}, #{item.name,jdbcType=VARCHAR})"
            + " </foreach>"
            + "</script>")
    void testBatchInsert(@Param("list") List<TestEntity> list);
}
3.向Spring容器注入线程池Bean对象
package com.test.config;
import java.util.concurrent.Executor;
import java.util.concurrent.ThreadPoolExecutor;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
@Configuration
@EnableAsync
public class ExecutorConfig {

    /**
     * Thread pool backing every {@code @Async("asyncServiceExecutor")} method.
     *
     * <p>Sizing follows {@link ThreadPoolExecutor} rules:
     * <ul>
     *   <li>Up to 50 core threads are started first.</li>
     *   <li>Further tasks wait in a queue of up to 300.</li>
     *   <li>Only when that queue is full does the pool grow, up to 500 threads.</li>
     * </ul>
     * A task that arrives when both the queue and the pool are full is run by the submitting thread
     * ({@link ThreadPoolExecutor.CallerRunsPolicy}). No work is dropped, and the submitter is
     * naturally throttled.
     *
     * <p>On shutdown the pool waits up to 60 seconds for in-flight tasks to finish.
     *
     * @return the configured executor; Spring initializes it as part of bean creation
     */
    @Bean(name = "asyncServiceExecutor")
    public Executor asyncServiceExecutor() {
        final ThreadPoolTaskExecutor pool = new ThreadPoolTaskExecutor();

        // Recognizable thread names in logs and thread dumps.
        pool.setThreadNamePrefix("testExecutor-");

        // Capacity: core threads -> bounded queue -> extra threads up to the maximum.
        pool.setCorePoolSize(50);
        pool.setQueueCapacity(300);
        pool.setMaxPoolSize(500);

        // Saturation: run the task on the caller's thread instead of rejecting it.
        pool.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());

        // Graceful shutdown: let queued/running tasks complete, but give up after 60 seconds.
        pool.setWaitForTasksToCompleteOnShutdown(true);
        pool.setAwaitTerminationSeconds(60);

        return pool;
    }
}
4.创建异步线程业务类
package com.test.service;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import org.apache.ibatis.session.ExecutorType;
import org.apache.ibatis.session.SqlSession;
import org.apache.ibatis.session.SqlSessionFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;
import com.test.entity.TestEntity;
import com.test.mapper.TestMapper;
@Service
public class AsyncService {

    /** Number of queued batch statements after which they are sent to the database. */
    private static final int FLUSH_SIZE = 1000;

    @Autowired
    private SqlSessionFactory sqlSessionFactory;

    /**
     * Inserts one chunk of records on a thread from the {@code asyncServiceExecutor} pool.
     *
     * <p>Each invocation opens its own {@link SqlSession} in {@link ExecutorType#BATCH} mode, so
     * every chunk is committed independently of the other chunks. The session is always closed; if an
     * insert fails before {@code commit()}, the uncommitted work is discarded on close, unless the
     * underlying connection is in auto-commit mode.
     *
     * <p>{@code countDownLatch} is decremented exactly once per call, even when opening the session
     * or inserting fails, so the caller blocked in {@code await()} is always released.
     *
     * @param logOutputResults records to insert in this task
     * @param countDownLatch latch shared with the caller that waits for all chunks
     */
    @Async("asyncServiceExecutor")
    public void executeAsync(List<String> logOutputResults, CountDownLatch countDownLatch) {
        // openSession() is inside the try so that a failure to open a session still reaches the
        // finally block and releases the latch.
        try (SqlSession session = sqlSessionFactory.openSession(ExecutorType.BATCH)) {
            TestMapper mapper = session.getMapper(TestMapper.class);
            for (int i = 0; i < logOutputResults.size(); i++) {
                System.out.println(Thread.currentThread().getName() + "线程:" + logOutputResults.get(i));
                TestEntity test = new TestEntity();
                // TODO populate the entity from logOutputResults.get(i), e.g. test.setName(...)
                mapper.testInsert(test);
                // Send the queued statements every FLUSH_SIZE records so the batch held in memory
                // stays bounded. (i + 1) avoids flushing right after the very first record.
                if ((i + 1) % FLUSH_SIZE == 0) {
                    session.flushStatements();
                }
            }
            // Execute whatever is still queued, then commit. Without commit(), a non-auto-commit
            // session discards the inserted rows when it is closed.
            session.flushStatements();
            session.commit();
        } finally {
            // Must run no matter what happened above, otherwise await() in the caller never returns.
            countDownLatch.countDown();
        }
    }
}
5.拆分list调用异步的业务方法
package com.test.service;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import javax.annotation.Resource;
import org.springframework.stereotype.Service;
@Service
public class TestService {

    /** Default number of records handed to each async insert task. */
    private static final int DEFAULT_CHUNK_SIZE = 500;

    @Resource
    private AsyncService asyncService;

    /**
     * Splits the test data into chunks and inserts every chunk concurrently via
     * {@link AsyncService#executeAsync}, then blocks until all chunks have finished.
     *
     * @return number of records that were submitted for insertion
     * @throws IllegalStateException if the calling thread is interrupted while waiting for the chunks;
     *     the thread's interrupt status is restored before throwing
     */
    public int testMultiThread() {
        List<String> logOutputResults = getTestData();
        // One sub-list per async task.
        List<List<String>> lists = splitList(logOutputResults);
        CountDownLatch countDownLatch = new CountDownLatch(lists.size());
        for (List<String> listSub : lists) {
            asyncService.executeAsync(listSub, countDownLatch);
        }
        try {
            // Ensures every task has finished before continuing, so results of all tasks are
            // available below.
            countDownLatch.await();
        } catch (InterruptedException e) {
            // Restore the interrupt flag and fail loudly: continuing would wrongly report completion
            // while tasks may still be running.
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting for batch insert tasks", e);
        }
        return logOutputResults.size();
    }

    /**
     * Builds 3000 sample records ("测试数据0" .. "测试数据2999").
     *
     * @return a new mutable list of sample records
     */
    public List<String> getTestData() {
        List<String> logOutputResults = new ArrayList<String>();
        for (int i = 0; i < 3000; i++) {
            logOutputResults.add("测试数据" + i);
        }
        return logOutputResults;
    }

    /**
     * Splits {@code logOutputResults} into consecutive chunks of {@value #DEFAULT_CHUNK_SIZE}
     * records. Each chunk is processed by its own async task.
     *
     * @param logOutputResults list to split
     * @return the chunks, in order; empty if the input is empty
     * @see #splitList(List, int)
     */
    public List<List<String>> splitList(List<String> logOutputResults) {
        return splitList(logOutputResults, DEFAULT_CHUNK_SIZE);
    }

    /**
     * Splits {@code logOutputResults} into consecutive chunks of at most {@code chunkSize} records.
     * Only the last chunk may be shorter.
     *
     * <p>To use a fixed number of worker tasks {@code n} instead of a fixed chunk size, pass
     * {@code (size + n - 1) / n} as {@code chunkSize}.
     *
     * <p>The chunks are {@link List#subList} views of the input, so the input must not be
     * structurally modified while the chunks are in use.
     *
     * @param logOutputResults list to split
     * @param chunkSize maximum number of records per chunk; must be positive
     * @return the chunks, in order; empty if the input is empty
     * @throws IllegalArgumentException if {@code chunkSize <= 0}
     */
    public List<List<String>> splitList(List<String> logOutputResults, int chunkSize) {
        if (chunkSize <= 0) {
            throw new IllegalArgumentException("chunkSize must be positive: " + chunkSize);
        }
        int dataSize = logOutputResults.size();
        List<List<String>> results = new ArrayList<List<String>>(dataSize / chunkSize + 1);
        int from = 0;
        while (from < dataSize) {
            // Compare against the remaining count rather than computing from + chunkSize, which
            // could overflow for very large chunk sizes.
            int to = from + Math.min(chunkSize, dataSize - from);
            results.add(logOutputResults.subList(from, to));
            from = to;
        }
        return results;
    }
}
6.Controller测试
@RestController
public class TestController {

    /** Service that runs the chunked, multi-threaded insert. */
    @Resource
    private TestService testService;

    /**
     * GET {@code /log}: triggers {@link TestService#testMultiThread()} and blocks until it returns.
     *
     * @return always {@code "success"} once the service call returns normally
     */
    @ApiOperation(value = "测试")
    @RequestMapping(value = "/log", method = RequestMethod.GET)
    public String test() {
        final String reply = "success";
        testService.testMultiThread();
        return reply;
    }
}
总结:
注意这里执行插入的数据是无序的。另外,每个线程使用各自独立的SqlSession提交,各批次之间不在同一个事务中:某一批失败不会回滚其他批次已写入的数据。
更多推荐
已为社区贡献1条内容
所有评论(0)