Kafka真的会丢数据吗
文章摘要: 作者分享了一个因Kafka配置调整导致数据丢失的案例。在迁移数据库表时,优化了Kafka消费者配置(如增大fetch.min.bytes和fetch.max.wait.ms),导致相同msgId的多条消息被批量拉取,而代码逻辑中因taskId为空导致数据未正确更新。通过日志和流程分析,定位到问题源于配置变更后消息处理时序的变化。经验教训是:优化需充分理解参数影响,避免看似无害的改动引发
一、聊聊天
- 回顾自己工作这五年的时间,能学到知识的时间都是自己意识到确实某一个点比较薄弱,只是停留在会用的阶段,看到很多问题和告警都是束手无策,才下定决心死磕一下。但是为什么平时不学呢,就是因为目标不明确,一边想着通过卷工时就能拿到好的绩效,一边想着每天这么辛苦总归会有成长的。只有到了你去面试的时候才发现,屁都没学会。抱着混年限或者犹豫不决的心态,绝对是自欺欺人,其实你很知道你是什么水平,你只是不敢承认或者不希望有人来戳穿你。所以呢,要明白,工作只给你提供了实践场景,怎么为我所用才是你成长的机会,不要瞻前顾后,也不要怀疑自己。你就去学,每天都花时间去总结。我觉得人还是得有一点盲目自信的那股劲儿,只有你相信了,你才能做到,不管别人相不相信,有想法有计划就勇敢的去做。
二、进入正题
-
问题描述:
最近在做的一个需求是,表迁移,从A库迁到B库,目的是减少数据库/服务的压力。并且发现原来的代码也要相应的迁移到serviceB服务会合适(从服务职能和责任划分来说,不应该放在之前的serivceA)。本来很简单的cv处理。但是我发现三方回调的时候qps会很高,所以想着在迁移代码的时候做了优化,修改一下kafka的配置。改了之后第一天用的好好的,第二天发现明明已经发出去3条消息,但是数据库里却只有1条或2条。 -
流程图:
-
问题分析:
因为前面刚好学过了一些Kafka数据丢失的几种可能的原因,想着正好实践一下,简单说一下kafka可能丢失数据的情况- producer端发送到broker的时候,因为acks设置的为0,可能出现发送消息失败;
- broker端leader和follower副本之间数据同步的时候,leader挂了导致数据丢失;
- 副本之间HW同步有延迟,leader挂了导致HW没有更新,造成数据丢失;
- 操作系统的pageCache写入到磁盘有问题,造成数据丢失;
-
问题定位:
-
查看了kafka平台的监测工具,发现写入消息没问题,也没有出现消息积压和消费延迟,排除写入的问题;
-
消费者端增加日志,发现也都拉取到了,在这里基本上就可以断定kafka没有丢数据;
-
那么问题来了,为什么kafka没丢数据,我写入到数据库里就没有了。这时候去看了一下具体的代码逻辑,大致如下
consume(List<Record>) { 1.根据Record中的messageId分组得到Map<String, Record> msgMap; 注:Collectors.toMap(msgId, record -> record, (o1, o2) -> o2) 2.循环msgMap 3.如果msgId和Record.taskId都不为空,查询msgId和Record.taskId是否已经在数据库中,存在则更新,不存在则新增: 3.如果taskId为空,说明是三方回调,则直接更新; } -
看了上面这段代码,在功能上基本没问题。可能丢数据的情况就是如果taskId为空,但是数据库中不存在,则既不会新增也不会更新;
-
有了这个思路之后,又去增加了日志,发现msgMap中的数据有三条,但是两个Record下的taskId为空,定位到问题所在;
-
又去平台上看了kafka的消息,发现同一个msgId,发送和回调的明细数据插入时间只相差不到1s,这时候也就相应的得到结论,后面taskId为空的数据,覆盖了前面taskId不为空的,就出现了既没新增也没更新的问题;
-
代码没有改动,那就看看是否为配置改动导致的问题;
-
-
相关配置:
改动前: consumerBefore: groupId: *** autoOffsetReset: latest maxPollRecords: 100 autoCommitInterval: 100 enableAutoCommit: false sessionTimeoutMs: 120000 requestTimeoutMs: 180000 改动后: consumerAfter: groupId: *** autoOffsetReset: latest maxPollRecords: 50 autoCommitInterval: 240000 enableAutoCommit: false sessionTimeoutMs: 120000 requestTimeoutMs: 180000 fetch.min.bytes: 64 * 1024 // 64KB最小拉取,减少内存占用 fetch.max.wait.ms: 5000 // 5秒等待时间,减缓拉取频率 receive.buffer.bytes: 128 * 1024 // 128KB接收缓冲区 send.buffer.bytes: 128 * 1024 -
结果分析:
- 主要就是增加了这两个配置fetch.min.bytes: 64 * 1024 和 fetch.max.wait.ms: 5000,这两个配置的作用是每次拉取超过64KB或者间隔时间超过5s,之前没有配,默认是1b和500ms。这也就是为什么之前不会出现一次可以同时拉取到多条msgId相同的数据。给我的教训就是有优化的想法很好,但是一定要对每个参数很熟悉,不然看似没什么改动,实际就会影响整个功能。
- 刚刚忽然又想了一个点,现在的解决方式只是还原了历史逻辑,保证功能没问题。但是如果想做性能上的优化,还是要在批量拉取消息的基础上实现顺序消费。你可以思考一下,就算是我按顺序把同一个msgId的数据写到同一个分区了,但这样就能保证顺序消费吗?这个后面会单独写一篇文章来讨论。
三、结尾
我深怕自己本非美玉,故而不敢加以刻苦琢磨,却又半信自己是块美玉,故又不肯庸庸碌碌,与瓦砾为伍。
更多推荐


所有评论(0)