一、聊聊天
  • ​ 回顾自己工作这五年的时间,能学到知识的时间都是自己意识到确实某一个点比较薄弱,只是停留在会用的阶段,看到很多问题和告警都是束手无策,才下定决心死磕一下。但是为什么平时不学呢,就是因为目标不明确,一边想着通过卷工时就能拿到好的绩效,一边想着每天这么辛苦总归会有成长的。只有到了你去面试的时候才发现,屁都没学会。抱着混年限或者犹豫不决的心态,绝对是自欺欺人,其实你很知道你是什么水平,你只是不敢承认或者不希望有人来戳穿你。所以呢,要明白,工作只给你提供了实践场景,怎么为我所用才是你成长的机会,不要瞻前顾后,也不要怀疑自己。你就去学,每天都花时间去总结。我觉得人还是得有一点盲目自信的那股劲儿,只有你相信了,你才能做到,不管别人相不相信,有想法有计划就勇敢的去做。
二、进入正题
  1. 问题描述:
    最近在做的一个需求是,表迁移,从A库迁到B库,目的是减少数据库/服务的压力。并且发现原来的代码也要相应的迁移到serviceB服务会合适(从服务职能和责任划分来说,不应该放在之前的serivceA)。本来很简单的cv处理。但是我发现三方回调的时候qps会很高,所以想着在迁移代码的时候做了优化,修改一下kafka的配置。改了之后第一天用的好好的,第二天发现明明已经发出去3条消息,但是数据库里却只有1条或2条。

  2. 流程图:

    ServiceA (消息发送服务) 三方服务API Kafka TopicA (消息明细队列) ServiceB (数据持久化服务) Database (消息明细表) ServiceC (回调接收服务) 阶段1: 消息发送与初始化 1. 调用三方API发送消息 2. 返回发送结果(成功) 3. 发布消息发送明细(初始状态) 消息内容: - messageId - userId - content - status: SENT - timestamp 4. 消费消息 5. 插入发送明细记录 初始状态: status = SENT 6. 确认消费(commit offset) 阶段2: 三方回调与状态更新 7. 异步回调(消息投递结果) 回调数据: - messageId - deliveryStatus - deliveryTime - errorCode(如有) 8. 数据校验与组装 组装更新数据: - 批量处理 - 数据校验 - 状态映射 9. 批量发布更新消息到TopicA 更新消息: - messageId - status: DELIVERED/FAILED - deliveryTime - errorInfo 10. 消费更新消息 11. 更新消息状态 更新字段: status = DELIVERED delivery_time update_time 12. 确认消费(commit offset) 流程完成 ServiceA (消息发送服务) 三方服务API Kafka TopicA (消息明细队列) ServiceB (数据持久化服务) Database (消息明细表) ServiceC (回调接收服务)
  3. 问题分析:
    因为前面刚好学过了一些Kafka数据丢失的几种可能的原因,想着正好实践一下,简单说一下kafka可能丢失数据的情况

    1. producer端发送到broker的时候,因为acks设置的为0,可能出现发送消息失败;
    2. broker端leader和follower副本之间数据同步的时候,leader挂了导致数据丢失;
    3. 副本之间HW同步有延迟,leader挂了导致HW没有更新,造成数据丢失;
    4. 操作系统的pageCache写入到磁盘有问题,造成数据丢失;
  4. 问题定位:

    1. 查看了kafka平台的监测工具,发现写入消息没问题,也没有出现消息积压和消费延迟,排除写入的问题;

    2. 消费者端增加日志,发现也都拉取到了,在这里基本上就可以断定kafka没有丢数据;

    3. 那么问题来了,为什么kafka没丢数据,我写入到数据库里就没有了。这时候去看了一下具体的代码逻辑,大致如下

      consume(List<Record>) {
        1.根据Record中的messageId分组得到Map<String, Record> msgMap;
        注:Collectors.toMap(msgId, record -> record, (o1, o2) -> o2)
        2.循环msgMap
        3.如果msgId和Record.taskId都不为空,查询msgId和Record.taskId是否已经在数据库中,存在则更新,不存在则新增:
        3.如果taskId为空,说明是三方回调,则直接更新;
      }
      
    4. 看了上面这段代码,在功能上基本没问题。可能丢数据的情况就是如果taskId为空,但是数据库中不存在,则既不会新增也不会更新;

    5. 有了这个思路之后,又去增加了日志,发现msgMap中的数据有三条,但是两个Record下的taskId为空,定位到问题所在;

    6. 又去平台上看了kafka的消息,发现同一个msgId,发送和回调的明细数据插入时间只相差不到1s,这时候也就相应的得到结论,后面taskId为空的数据,覆盖了前面taskId不为空的,就出现了既没新增也没更新的问题;

    7. 代码没有改动,那就看看是否为配置改动导致的问题;

  5. 相关配置:

    改动前:
      consumerBefore:
        groupId: ***
        autoOffsetReset: latest
        maxPollRecords: 100
        autoCommitInterval: 100
        enableAutoCommit: false
        sessionTimeoutMs: 120000
        requestTimeoutMs: 180000
        
    改动后:
    	consumerAfter:
        groupId: ***
        autoOffsetReset: latest
        maxPollRecords: 50
        autoCommitInterval: 240000
        enableAutoCommit: false
        sessionTimeoutMs: 120000
        requestTimeoutMs: 180000
        fetch.min.bytes: 64 * 1024 // 64KB最小拉取,减少内存占用
        fetch.max.wait.ms: 5000 // 5秒等待时间,减缓拉取频率
        receive.buffer.bytes: 128 * 1024 // 128KB接收缓冲区
        send.buffer.bytes: 128 * 1024
    
  6. 结果分析:

    1. 主要就是增加了这两个配置fetch.min.bytes: 64 * 1024 和 fetch.max.wait.ms: 5000,这两个配置的作用是每次拉取超过64KB或者间隔时间超过5s,之前没有配,默认是1b和500ms。这也就是为什么之前不会出现一次可以同时拉取到多条msgId相同的数据。给我的教训就是有优化的想法很好,但是一定要对每个参数很熟悉,不然看似没什么改动,实际就会影响整个功能。
    2. 刚刚忽然又想了一个点,现在的解决方式只是还原了历史逻辑,保证功能没问题。但是如果想做性能上的优化,还是要在批量拉取消息的基础上实现顺序消费。你可以思考一下,就算是我按顺序把同一个msgId的数据写到同一个分区了,但这样就能保证顺序消费吗?这个后面会单独写一篇文章来讨论。

三、结尾

​ 我深怕自己本非美玉,故而不敢加以刻苦琢磨,却又半信自己是块美玉,故又不肯庸庸碌碌,与瓦砾为伍。

Logo

一座年轻的奋斗人之城,一个温馨的开发者之家。在这里,代码改变人生,开发创造未来!

更多推荐