MyBatis MySQL流式传输实战:解决大数据量查询的内存溢出问题
·
背景痛点
最近在做一个数据分析项目时,遇到了一个典型的大数据量查询问题:当查询结果集超过50万条时,应用服务器的内存直接飙升到90%以上,频繁触发Full GC。这让我开始反思传统的JDBC全量加载机制——它像是一次性把整个仓库的货物全搬进内存,显然不适合海量数据处理场景。

尝试过常规的分页查询(limit offset),但发现两个致命缺陷:
- 深度分页性能极差(比如第100万页的offset计算)
- 无法保证数据一致性(分页期间数据变更会导致重复或遗漏)
技术方案
流式传输三剑客
通过调研发现了黄金组合:
- MyBatis的ResultHandler:像流水线工人一样逐条处理数据
- fetchSize参数:控制每次从网络缓冲区读取的批量大小
- MySQL游标:服务端维持结果集指针
关键配置示例:
<select id="streamQuery" fetchSize="1000" resultSetType="FORWARD_ONLY" >
SELECT * FROM large_table WHERE create_time > #{startTime}
</select>
Spring事务特别处理
发现一个坑:流式读取需要保持连接存活,但Spring默认会在方法结束后关闭连接。解决方案:
@Transactional(readOnly = true)
public void processLargeData() {
// 必须加这个配置!!!
TransactionSynchronizationManager.setActualTransactionActive(true);
sqlSession.select("streamQuery", paramMap, new MyResultHandler());
}
代码实战
完整处理器实现(含阿里规范建议):
public class UserStreamHandler implements ResultHandler<User> {
private static final Logger log = LoggerFactory.getLogger(UserStreamHandler.class);
private final Consumer<User> consumer;
public UserStreamHandler(Consumer<User> consumer) {
this.consumer = consumer;
}
@Override
public void handleResult(ResultContext<? extends User> context) {
try {
User user = context.getResultObject();
// 业务处理(建议控制单条处理时间<100ms)
consumer.accept(user);
// 每1000条日志打印
if(context.getResultCount() % 1000 == 0) {
log.info("Processing count: {}", context.getResultCount());
}
} catch (Exception e) {
context.stop(); // 遇到异常终止流
throw new RuntimeException("Process failed at count: " + context.getResultCount(), e);
}
}
}
性能对比
用JMH做的压测数据(单位:MB):
| 数据量 | 传统方式 | 流式传输 | |--------|----------|----------| | 10万 | 285 | 32 | | 50万 | 内存溢出 | 45 | | 100万 | - | 58 |

避坑指南
- 连接泄漏防护:
- 务必使用try-with-resources包裹SqlSession
-
监控连接池:
select * from information_schema.processlist where time > 300 -
超时陷阱:
- 设置合理的transactionTimeout(建议>30分钟)
-
对于特别耗时的流:
@Transactional(timeout = 3600) -
线程安全:
- ResultHandler本身非线程安全
- 推荐方案:每个线程独立处理器实例 + ThreadLocal变量
延伸思考
最近在探索与Flink的集成方案——将MyBatis流作为Flink的SourceFunction实现。一个伪代码示例:
public class MyBatisSource implements SourceFunction<User> {
@Override
public void run(SourceContext<User> ctx) {
sqlSession.select("streamQuery", params, resultContext -> {
ctx.collect(resultContext.getResultObject());
});
}
//...
}
对于千万级数据,下一步准备尝试:
- 结合MySQL的并行查询(8.0.14+)
- 分区键预计算优化
- 结果集直接写入OSS等外部存储
思考题:当流式处理遇到网络闪断,如何实现断点续传?欢迎在评论区分享你的方案。
更多推荐


所有评论(0)