【Java项目技术亮点】冷热数据分离架构:MySQL表数据过亿后的终极解决方案
·
订单表数据突破5000万,查询从毫秒级降到秒级,DBA天天催你分表——这就是单表数据量过大的噩梦。本文将揭秘美团、饿了么等大厂如何通过冷热数据分离架构,在不改变业务代码的前提下,让MySQL轻松应对亿级数据量,查询性能提升10倍以上。

文章目录
一、场景引入:一张订单表引发的血案
1.1 真实案例
某电商平台,订单表数据量持续增长:
时间线:
第1个月:订单表100万条,查询平均50ms,一切正常
第6个月:订单表1000万条,查询平均200ms,开始变慢
第12个月:订单表5000万条,查询平均2s,DBA告警
第18个月:订单表1亿条,查询平均5-10s,偶尔超时
第24个月:订单表2亿条,主从延迟严重,夜间归档任务跑不动
症状:
- 用户查看"我的订单"需要3-5秒
- 商家查询"今日订单"超时
- 运营后台导出报表直接把库打挂
- 主从延迟从毫秒级飙升到分钟级
问题根源:所有数据(包括几年前的历史订单)都存在同一张表中,MySQL在执行查询时需要扫描大量无用数据。
1.2 传统方案的痛点
| 方案 | 问题 | 后果 |
|---|---|---|
| 加索引 | 索引树过大,写入变慢 | 写入性能下降50%+ |
| 分库分表 | 业务代码侵入性强 | 改造周期长,风险大 |
| 读写分离 | 从库数据量同样大 | 从库查询也慢 |
| 定期删除 | 历史数据丢失 | 运营分析、财务对账无法进行 |
| 数据归档到Hive | 查询延迟高 | 实时性差,不适合在线查询 |
二、解决方案:冷热数据分离架构
2.1 什么是冷热数据分离?
核心思想:根据数据访问频率,将数据分布到不同的存储介质上。
冷热数据分离架构:
┌─────────────────────────────────────────────────────────────────────┐
│ 应用层(业务代码无需改动) │
├─────────────────────────────────────────────────────────────────────┤
│ │
│ ┌─────────────────────────────────────────────────────────────┐ │
│ │ 数据路由层(中间件) │ │
│ │ │ │
│ │ 根据查询条件自动路由到对应数据源 │ │
│ │ - 近7天 → Redis + MySQL主库 │ │
│ │ - 近3个月 → MySQL从库 │ │
│ │ - 3个月以前 → Elasticsearch │ │
│ │ │ │
│ └─────────────────────────────────────────────────────────────┘ │
│ │ │
│ ┌───────────────────┼───────────────────┐ │
│ ▼ ▼ ▼ │
│ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │
│ │ 热数据层 │ │ 温数据层 │ │ 冷数据层 │ │
│ │ │ │ │ │ │ │
│ │ Redis缓存 │ │ MySQL从库 │ │ Elasticsear │ │
│ │ MySQL主库 │ │ (近3个月) │ │ ch/Hive │ │
│ │ (近7天) │ │ │ │ (3个月以前) │ │
│ │ │ │ │ │ │ │
│ │ 数据量: │ │ 数据量: │ │ 数据量: │ │
│ │ ~500万 │ │ ~5000万 │ │ ~5亿+ │ │
│ └─────────────┘ └─────────────┘ └─────────────┘ │
│ │
└─────────────────────────────────────────────────────────────────────┘
2.2 冷热数据划分标准
| 数据层级 | 时间范围 | 存储介质 | 查询方式 | 典型场景 |
|---|---|---|---|---|
| 热数据 | 近7天 | Redis + MySQL主库 | 实时查询 | 用户查看近期订单、商家接单 |
| 温数据 | 近3个月 | MySQL从库 | 准实时查询 | 用户查看历史订单、运营统计 |
| 冷数据 | 3个月以前 | Elasticsearch/Hive | 离线/搜索查询 | 财务对账、数据分析、历史查询 |
经验之谈:冷热数据的划分标准不是固定的,要根据业务场景调整。比如外卖平台,7天前的订单基本不会再被用户频繁查看;但电商平台的退款周期可能是15天,所以热数据可能需要保留15天。
三、实战代码:从零实现冷热数据分离
3.1 数据源配置
/**
* 冷热数据源配置
*/
@Configuration
public class DataSourceConfig {
/**
* 热数据源(MySQL主库)
*/
@Bean
@ConfigurationProperties(prefix = "spring.datasource.hot")
public DataSource hotDataSource() {
return DataSourceBuilder.create().build();
}
/**
* 温数据源(MySQL从库)
*/
@Bean
@ConfigurationProperties(prefix = "spring.datasource.warm")
public DataSource warmDataSource() {
return DataSourceBuilder.create().build();
}
/**
* 冷数据源(Elasticsearch)
*/
@Bean
public RestHighLevelClient coldDataSource() {
return new RestHighLevelClient(
RestClient.builder(
new HttpHost("es-host", 9200, "http")
)
);
}
}
3.2 数据路由注解
/**
* 冷热数据路由注解
* 标注在查询方法上,自动根据时间范围路由到对应数据源
*/
@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
public @interface HotColdRoute {
/**
* 时间字段名
*/
String timeField() default "create_time";
/**
* 热数据阈值(天)
*/
int hotDays() default 7;
/**
* 温数据阈值(天)
*/
int warmDays() default 90;
}
3.3 数据路由切面
/**
* 冷热数据路由切面
* 拦截查询请求,根据时间条件自动路由
*/
@Aspect
@Component
@Slf4j
public class HotColdRouteAspect {
@Autowired
private HotDataSourceRouter dataSourceRouter;
@Autowired
private OrderHotMapper orderHotMapper;
@Autowired
private OrderWarmMapper orderWarmMapper;
@Autowired
private OrderColdService orderColdService;
@Around("@annotation(route)")
public Object route(ProceedingJoinPoint point, HotColdRoute route) throws Throwable {
// 1. 解析查询参数中的时间范围
TimeRange timeRange = parseTimeRange(point, route);
if (timeRange == null) {
// 没有时间条件,默认查热数据
return point.proceed();
}
// 2. 根据时间范围路由到对应数据源
if (timeRange.isHot(route.getHotDays())) {
// 热数据:MySQL主库
log.debug("路由到热数据源: {}", timeRange);
dataSourceRouter.setDataSource(DataSourceType.HOT);
return point.proceed();
} else if (timeRange.isWarm(route.getWarmDays())) {
// 温数据:MySQL从库
log.debug("路由到温数据源: {}", timeRange);
dataSourceRouter.setDataSource(DataSourceType.WARM);
return point.proceed();
} else {
// 冷数据:Elasticsearch
log.debug("路由到冷数据源: {}", timeRange);
return routeToColdData(point, timeRange);
}
}
/**
* 解析查询参数中的时间范围
*/
private TimeRange parseTimeRange(ProceedingJoinPoint point, HotColdRoute route) {
Object[] args = point.getArgs();
for (Object arg : args) {
if (arg instanceof Map) {
Map<?, ?> params = (Map<?, ?>) arg;
Object startTime = params.get("startTime");
Object endTime = params.get("endTime");
if (startTime != null && endTime != null) {
return new TimeRange(
parseDate(startTime),
parseDate(endTime)
);
}
}
}
return null;
}
/**
* 路由到冷数据(Elasticsearch)
*/
private Object routeToColdData(ProceedingJoinPoint point, TimeRange timeRange) {
// 调用ES查询服务
return orderColdService.searchFromES(timeRange);
}
}
3.4 数据迁移服务
/**
* 冷热数据迁移服务
* 定时将热数据迁移到温数据,温数据迁移到冷数据
*/
@Component
@Slf4j
public class DataMigrationService {
@Autowired
private OrderHotMapper orderHotMapper;
@Autowired
private OrderWarmMapper orderWarmMapper;
@Autowired
private RestHighLevelClient esClient;
@Autowired
private StringRedisTemplate redisTemplate;
private static final String MIGRATION_LOCK_KEY = "lock:data:migration";
private static final int BATCH_SIZE = 1000;
/**
* 热数据 → 温数据(每天凌晨执行)
* 将7天前的数据从主库迁移到从库
*/
@Scheduled(cron = "0 2 * * *")
public void migrateHotToWarm() {
RLock lock = redissonClient.getLock(MIGRATION_LOCK_KEY + ":hot2warm");
try {
if (!lock.tryLock(0, 60, TimeUnit.SECONDS)) {
return;
}
log.info("🔄 开始热数据→温数据迁移...");
LocalDateTime threshold = LocalDateTime.now().minusDays(7);
int totalMigrated = 0;
while (true) {
// 分批查询需要迁移的数据
List<Order> orders = orderHotMapper.selectByCreateTimeBefore(
threshold, BATCH_SIZE);
if (CollUtil.isEmpty(orders)) {
break;
}
// 写入温数据(从库)
orderWarmMapper.batchInsert(orders);
// 从热数据(主库)删除
List<Long> ids = orders.stream()
.map(Order::getId)
.collect(Collectors.toList());
orderHotMapper.deleteByIds(ids);
totalMigrated += orders.size();
log.info("已迁移 {} 条数据", totalMigrated);
}
log.info("✅ 热数据→温数据迁移完成,共迁移 {} 条", totalMigrated);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} finally {
lock.unlock();
}
}
/**
* 温数据 → 冷数据(每周执行)
* 将3个月前的数据从MySQL迁移到Elasticsearch
*/
@Scheduled(cron = "0 3 * * 0")
public void migrateWarmToCold() {
RLock lock = redissonClient.getLock(MIGRATION_LOCK_KEY + ":warm2cold");
try {
if (!lock.tryLock(0, 120, TimeUnit.SECONDS)) {
return;
}
log.info("🔄 开始温数据→冷数据迁移...");
LocalDateTime threshold = LocalDateTime.now().minusMonths(3);
int totalMigrated = 0;
while (true) {
List<Order> orders = orderWarmMapper.selectByCreateTimeBefore(
threshold, BATCH_SIZE);
if (CollUtil.isEmpty(orders)) {
break;
}
// 批量写入Elasticsearch
BulkRequest bulkRequest = new BulkRequest();
for (Order order : orders) {
IndexRequest request = new IndexRequest("order_archive")
.id(String.valueOf(order.getId()))
.source(JSON.toJSONString(order), XContentType.JSON);
bulkRequest.add(request);
}
BulkResponse response = esClient.bulk(bulkRequest,
RequestOptions.DEFAULT);
if (response.hasFailures()) {
log.error("❌ ES写入部分失败: {}", response.buildFailureMessage());
}
// 从温数据删除
List<Long> ids = orders.stream()
.map(Order::getId)
.collect(Collectors.toList());
orderWarmMapper.deleteByIds(ids);
totalMigrated += orders.size();
log.info("已迁移 {} 条数据到ES", totalMigrated);
}
log.info("✅ 温数据→冷数据迁移完成,共迁移 {} 条", totalMigrated);
} catch (Exception e) {
log.error("❌ 温数据→冷数据迁移异常", e);
} finally {
lock.unlock();
}
}
}
3.5 冷数据查询服务
/**
* 冷数据查询服务(Elasticsearch)
*/
@Service
@Slf4j
public class OrderColdService {
@Autowired
private RestHighLevelClient esClient;
/**
* 从ES查询历史订单
*/
public PageResult<Order> searchFromES(TimeRange timeRange) {
SearchRequest searchRequest = new SearchRequest("order_archive");
// 构建查询条件
BoolQueryBuilder boolQuery = QueryBuilders.boolQuery();
boolQuery.filter(QueryBuilders.rangeQuery("create_time")
.gte(timeRange.getStartTime())
.lte(timeRange.getEndTime()));
SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
sourceBuilder.query(boolQuery);
sourceBuilder.from(timeRange.getOffset());
sourceBuilder.size(timeRange.getPageSize());
sourceBuilder.sort("create_time", SortOrder.DESC);
searchRequest.source(sourceBuilder);
try {
SearchResponse response = esClient.search(searchRequest,
RequestOptions.DEFAULT);
SearchHit[] hits = response.getHits().getHits();
List<Order> orders = new ArrayList<>();
for (SearchHit hit : hits) {
Order order = JSON.parseObject(hit.getSourceAsString(), Order.class);
orders.add(order);
}
return new PageResult<>(
orders,
response.getHits().getTotalHits().value
);
} catch (IOException e) {
log.error("❌ ES查询失败", e);
throw new BizException("查询历史订单失败");
}
}
}
四、高级进阶:数据一致性保障
4.1 双写一致性方案
/**
* 数据写入策略
* 确保热数据和缓存的一致性
*/
@Service
@Slf4j
public class OrderWriteService {
@Autowired
private OrderHotMapper orderHotMapper;
@Autowired
private RedisTemplate<String, Object> redisTemplate;
@Autowired
private ApplicationEventPublisher eventPublisher;
/**
* 创建订单(写热数据 + 缓存)
*/
@Transactional
public Order createOrder(CreateOrderDTO dto) {
// 1. 写入MySQL主库(热数据)
Order order = new Order();
BeanUtils.copyProperties(dto, order);
order.setCreateTime(LocalDateTime.now());
order.setStatus(OrderStatus.PENDING_PAYMENT);
orderHotMapper.insert(order);
// 2. 写入Redis缓存
String cacheKey = "order:" + order.getId();
redisTemplate.opsForValue().set(cacheKey, order, 7, TimeUnit.DAYS);
// 3. 发送事件(异步通知其他系统)
eventPublisher.publishEvent(new OrderCreatedEvent(order));
return order;
}
/**
* 更新订单状态
*/
@Transactional
public void updateOrderStatus(Long orderId, OrderStatus newStatus) {
// 1. 更新MySQL
orderHotMapper.updateStatus(orderId, newStatus);
// 2. 更新Redis
String cacheKey = "order:" + orderId;
Order cached = (Order) redisTemplate.opsForValue().get(cacheKey);
if (cached != null) {
cached.setStatus(newStatus);
redisTemplate.opsForValue().set(cacheKey, cached, 7, TimeUnit.DAYS);
}
}
}
4.2 迁移监控与告警
/**
* 数据迁移监控
*/
@Component
@Slf4j
public class MigrationMonitor {
@Autowired
private StringRedisTemplate redisTemplate;
private static final String MIGRATION_STATUS_KEY = "migration:status";
/**
* 记录迁移状态
*/
public void recordMigration(String type, int count, long durationMs) {
Map<String, String> status = new HashMap<>();
status.put("type", type);
status.put("count", String.valueOf(count));
status.put("duration", String.valueOf(durationMs));
status.put("timestamp", LocalDateTime.now().toString());
redisTemplate.opsForHash().putAll(MIGRATION_STATUS_KEY, status);
// 告警检查
if (durationMs > 300000) { // 超过5分钟
log.warn("⚠️ 数据迁移耗时过长: type={}, duration={}ms", type, durationMs);
// 发送告警通知
}
}
/**
* 获取迁移进度
*/
public MigrationProgress getProgress() {
Map<Object, Object> status = redisTemplate.opsForHash()
.entries(MIGRATION_STATUS_KEY);
MigrationProgress progress = new MigrationProgress();
progress.setType(String.valueOf(status.get("type")));
progress.setCount(Integer.parseInt(String.valueOf(status.get("count"))));
progress.setTimestamp(String.valueOf(status.get("timestamp")));
return progress;
}
}
五、预判问题与解答
Q1:数据迁移过程中,如果有业务查询怎么办?
A:需要保证迁移的原子性:
策略:
1. 分批迁移,每批1000条
2. 每批迁移在一个事务中:
a. 先写入目标数据源(温库或ES)
b. 确认写入成功后,再删除源数据
3. 如果目标写入失败,不删除源数据,下次重试
4. 查询时,路由层会根据时间自动选择数据源
关键:迁移过程中,同一批数据可能短暂存在于两个数据源中,
但路由层只会路由到一个数据源,不会出现数据重复。
Q2:冷数据查询性能如何保证?
A:
Elasticsearch的优势:
1. 全文检索能力强,支持模糊搜索
2. 聚合查询性能优秀(适合运营报表)
3. 水平扩展简单,数据量无上限
4. 近实时查询(延迟通常在100ms以内)
优化策略:
1. 合理设计ES索引mapping
2. 使用分页查询(search_after替代from/size)
3. 冷数据可以压缩存储(减少磁盘占用)
4. 对于离线分析,可以导出到Hive/ClickHouse
Q3:迁移任务失败怎么办?
A:需要多层保障:
1. 幂等性设计:
- 迁移前检查目标是否已存在
- 已存在则跳过,避免重复迁移
2. 断点续传:
- 记录上次迁移的最大ID/时间
- 下次从断点继续
3. 重试机制:
- 单次失败自动重试3次
- 指数退避(1s, 2s, 4s)
4. 死信处理:
- 超过重试次数的数据记录到死信表
- 人工介入处理
5. 监控告警:
- 迁移耗时超过阈值告警
- 迁移失败率超过阈值告警
Q4:如何确定冷热数据的划分标准?
A:
确定标准的方法:
1. 分析访问日志:
- 统计不同时间段数据的访问频率
- 绘制访问频率随时间变化的曲线
- 找到访问频率的"拐点"
2. 业务需求分析:
- 退款周期(如15天)→ 热数据至少保留15天
- 财务结算周期(如月结)→ 温数据保留3个月
- 合规要求(如数据保留2年)→ 冷数据至少保留2年
3. 容量规划:
- 热数据量 = 日均新增 × 热数据天数
- 确保热数据表不超过5000万条
4. 动态调整:
- 定期评估划分标准
- 根据业务变化调整阈值
Q5:冷热分离和分库分表怎么选?
A:
| 对比项 | 冷热分离 | 分库分表 |
|---|---|---|
| 解决的问题 | 历史数据拖慢查询 | 单表数据量过大 |
| 业务侵入 | 低(路由层自动处理) | 高(需要改SQL和代码) |
| 实现复杂度 | 中等 | 高 |
| 适用场景 | 数据有明显的时间冷热特征 | 数据量大且无明显时间特征 |
| 组合使用 | 可以先冷热分离,再对热数据分表 | ✅ 推荐组合使用 |
六、面试高频考点
考点1:冷热数据分离的核心思想是什么?
参考答案:
核心思想:根据数据的访问频率(温度),将数据分布到不同的存储介质上。
热数据(高频访问)→ Redis + MySQL主库,保证读写性能
温数据(低频访问)→ MySQL从库,保证数据可用
冷数据(极少访问)→ Elasticsearch/Hive,保证存储成本和查询能力
优势:
1. MySQL主库数据量可控(只存热数据)
2. 查询性能提升(热数据表小,索引效率高)
3. 存储成本降低(冷数据用更便宜的存储)
4. 业务代码无感知(通过路由层自动切换)
考点2:数据迁移过程中如何保证一致性?
参考答案:
保证一致性的策略:
1. 分批迁移 + 事务保证:
- 每批迁移在一个事务中
- 先写目标,后删源
- 目标写入失败则不删源
2. 路由层保证查询正确性:
- 迁移过程中,路由层根据时间范围选择数据源
- 同一数据可能短暂存在于两个源,但查询只会命中一个
3. 幂等性保证:
- 迁移前检查目标是否已存在
- 避免重复迁移导致数据不一致
4. 对账机制:
- 定期比对各数据源的数据量
- 发现不一致及时修复
考点3:冷数据为什么选Elasticsearch而不是MySQL?
参考答案:
选择ES的原因:
1. 查询能力:
- ES支持全文检索,MySQL不支持
- ES聚合查询性能优秀
- ES水平扩展简单
2. 存储成本:
- ES可以压缩存储
- ES集群可以使用廉价磁盘
- MySQL存储同样数据量成本更高
3. 数据特征:
- 冷数据以查询为主,很少修改
- ES适合读多写少的场景
- MySQL适合频繁修改的场景
4. 运维成本:
- ES集群扩容比MySQL分库分表简单
- ES不需要复杂的分片路由逻辑
考点4:如何设计数据路由层?
参考答案:
数据路由层设计要点:
1. 路由规则:
- 根据查询条件中的时间字段判断数据温度
- 热数据 → 主库,温数据 → 从库,冷数据 → ES
- 没有时间条件 → 默认查热数据
2. 实现方式:
- AOP切面拦截查询方法
- 解析方法参数中的时间范围
- 动态切换数据源(AbstractRoutingDataSource)
3. 降级策略:
- ES不可用时降级到MySQL从库
- 从库延迟过大时降级到主库
- 主库不可用时返回缓存数据
4. 监控:
- 记录每次路由的数据源和耗时
- 统计各数据源的访问比例
七、总结与最佳实践
7.1 核心要点回顾
冷热数据分离核心流程:
┌─────────────────────────────────────────────────────────────┐
│ 1. 数据分层 │
│ ├── 热数据(近7天)→ Redis + MySQL主库 │
│ ├── 温数据(近3个月)→ MySQL从库 │
│ └── 冷数据(3个月以前)→ Elasticsearch │
│ │
│ 2. 数据路由 │
│ ├── AOP切面拦截查询方法 │
│ ├── 根据时间条件自动路由 │
│ └── 降级策略(ES不可用→从库) │
│ │
│ 3. 数据迁移 │
│ ├── 热数据→温数据(每天凌晨) │
│ ├── 温数据→冷数据(每周执行) │
│ └── 分批迁移+事务保证+断点续传 │
│ │
│ 4. 监控告警 │
│ ├── 迁移进度监控 │
│ ├── 迁移耗时告警 │
│ └── 数据量对账 │
└─────────────────────────────────────────────────────────────┘
7.2 性能提升数据
某电商平台实测数据:
| 指标 | 优化前(单表2亿) | 优化后(冷热分离) | 提升 |
|---|---|---|---|
| 热数据查询 | 2-5秒 | 50-100ms | 50倍↑ |
| 历史数据查询 | 5-10秒 | 200-500ms | 20倍↑ |
| 主库数据量 | 2亿 | 500万 | 97%↓ |
| 主从延迟 | 分钟级 | 毫秒级 | 显著改善 |
| 存储成本 | 高(全用MySQL) | 低(冷数据用ES) | 60%↓ |
八、参考与拓展
互动讨论:你们公司的订单表数据量有多大了?有没有做过冷热数据分离?遇到了什么问题?欢迎在评论区分享!
如果本文对你有帮助,欢迎点赞👍、收藏⭐、关注🔔,持续获取更多Java后端技术干货!
更多推荐
所有评论(0)