限时福利领取


背景痛点

最近在做一个数据分析项目时,遇到了一个典型的大数据量查询问题:当查询结果集超过50万条时,应用服务器的内存直接飙升到90%以上,频繁触发Full GC。这让我开始反思传统的JDBC全量加载机制——它像是一次性把整个仓库的货物全搬进内存,显然不适合海量数据处理场景。

内存溢出示意图

尝试过常规的分页查询(limit offset),但发现两个致命缺陷:

  1. 深度分页性能极差(比如第100万页的offset计算)
  2. 无法保证数据一致性(分页期间数据变更会导致重复或遗漏)

技术方案

流式传输三剑客

通过调研发现了黄金组合:

  1. MyBatis的ResultHandler:像流水线工人一样逐条处理数据
  2. fetchSize参数:控制每次从网络缓冲区读取的批量大小
  3. MySQL游标:服务端维持结果集指针

关键配置示例:

<select id="streamQuery" fetchSize="1000" resultSetType="FORWARD_ONLY" >
  SELECT * FROM large_table WHERE create_time > #{startTime}
</select>

Spring事务特别处理

发现一个坑:流式读取需要保持连接存活,但Spring默认会在方法结束后关闭连接。解决方案:

@Transactional(readOnly = true)
public void processLargeData() {
    // 必须加这个配置!!!
    TransactionSynchronizationManager.setActualTransactionActive(true);
    sqlSession.select("streamQuery", paramMap, new MyResultHandler());
}

代码实战

完整处理器实现(含阿里规范建议):

public class UserStreamHandler implements ResultHandler<User> {
    private static final Logger log = LoggerFactory.getLogger(UserStreamHandler.class);
    private final Consumer<User> consumer;

    public UserStreamHandler(Consumer<User> consumer) {
        this.consumer = consumer;
    }

    @Override
    public void handleResult(ResultContext<? extends User> context) {
        try {
            User user = context.getResultObject();
            // 业务处理(建议控制单条处理时间<100ms)
            consumer.accept(user);

            // 每1000条日志打印
            if(context.getResultCount() % 1000 == 0) {
                log.info("Processing count: {}", context.getResultCount());
            }
        } catch (Exception e) {
            context.stop(); // 遇到异常终止流
            throw new RuntimeException("Process failed at count: " + context.getResultCount(), e);
        }
    }
}

性能对比

用JMH做的压测数据(单位:MB):

| 数据量 | 传统方式 | 流式传输 | |--------|----------|----------| | 10万 | 285 | 32 | | 50万 | 内存溢出 | 45 | | 100万 | - | 58 |

性能对比图

避坑指南

  1. 连接泄漏防护
  2. 务必使用try-with-resources包裹SqlSession
  3. 监控连接池:select * from information_schema.processlist where time > 300

  4. 超时陷阱

  5. 设置合理的transactionTimeout(建议>30分钟)
  6. 对于特别耗时的流:@Transactional(timeout = 3600)

  7. 线程安全

  8. ResultHandler本身非线程安全
  9. 推荐方案:每个线程独立处理器实例 + ThreadLocal变量

延伸思考

最近在探索与Flink的集成方案——将MyBatis流作为Flink的SourceFunction实现。一个伪代码示例:

public class MyBatisSource implements SourceFunction<User> {
    @Override
    public void run(SourceContext<User> ctx) {
        sqlSession.select("streamQuery", params, resultContext -> {
            ctx.collect(resultContext.getResultObject());
        });
    }
    //...
}

对于千万级数据,下一步准备尝试:

  1. 结合MySQL的并行查询(8.0.14+)
  2. 分区键预计算优化
  3. 结果集直接写入OSS等外部存储

思考题:当流式处理遇到网络闪断,如何实现断点续传?欢迎在评论区分享你的方案。

Logo

音视频技术社区,一个全球开发者共同探讨、分享、学习音视频技术的平台,加入我们,与全球开发者一起创造更加优秀的音视频产品!

更多推荐