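Consumer load balancing: at any given moment, each partition is consumed by only one machine in the consumer group (in the code below, one consumer thread). That consumer is called the partition's owner. The owner-assignment logic is: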
1. For each topic T that Ci subscribes to 
2.   let PT be all partitions producing topic T
3.   let CG be all consumers in the same group as Ci that consume topic T
4.   sort PT (so partitions on the same broker are clustered together)
5.   sort CG
6.   let i be the index position of Ci in CG and let N = size(PT)/size(CG)
7.   assign partitions from i*N to (i+1)*N - 1 to consumer Ci // i*N to i*N+N-1
8.   remove current entries owned by Ci from the partition owner registry
9.   add newly assigned partitions to the partition owner registry
     (we may need to re-try this until the original partition owner releases its ownership)
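Steps 4-7 can be transcribed literally for a single topic in a minimal Scala sketch. Partitions are modeled as hypothetical (brokerId, partitionId) pairs and consumers as plain strings. `NaiveRangeAssign` and `assign` are illustrative names, not Kafka APIs. The sketch shows one weakness of the pseudocode as written: N = size(PT)/size(CG) uses integer division, so any leftover partitions never get an owner.

```scala
// Literal transcription of steps 4-7 for one topic (illustrative sketch only).
// Partitions are hypothetical (brokerId, partitionId) pairs; consumers are strings.
object NaiveRangeAssign {
  // Returns the partitions consumer `me` would own under step 7,
  // with N = size(PT) / size(CG) computed by integer division.
  def assign(pt: Seq[(Int, Int)], cg: Seq[String], me: String): Seq[(Int, Int)] = {
    val sortedPT = pt.sorted           // step 4: tuples sort by brokerId first, so same-broker partitions cluster
    val sortedCG = cg.sorted           // step 5
    val i = sortedCG.indexOf(me)       // step 6
    val n = sortedPT.size / sortedCG.size
    sortedPT.slice(i * n, (i + 1) * n) // step 7: indices i*N .. (i+1)*N - 1
  }
}
```

With 5 partitions and 2 consumers, N = 2. Consumer c1 gets (0,0) and (0,1), c2 gets (0,2) and (1,0), and partition (1,1) is left without an owner. The rebalance implementation avoids this by giving the first few consumers one extra partition each.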


The actual implementation is in ZookeeperConsumerConnector#rebalance:

    // Computed earlier in rebalance, once per topic:
    //   nPartsPerConsumer       = curPartitions.size / curConsumers.size
    //   nConsumersWithExtraPart = curPartitions.size % curConsumers.size
    for (consumerThreadId <- consumerThreadIdSet) {
      val myConsumerPosition = curConsumers.indexOf(consumerThreadId)
      assert(myConsumerPosition >= 0)
      val startPart = nPartsPerConsumer*myConsumerPosition + myConsumerPosition.min(nConsumersWithExtraPart)
      val nParts = nPartsPerConsumer + (if (myConsumerPosition + 1 > nConsumersWithExtraPart) 0 else 1)

      /**
       * Range-partition the sorted partitions to consumers for better locality.
       * The first few consumers pick up an extra partition, if any.
       */
      if (nParts <= 0)
        warn("No broker partitions consumed by consumer thread " + consumerThreadId + " for topic " + topic)
      else {
        for (i <- startPart until startPart + nParts) {
          val partition = curPartitions(i)
          info(consumerThreadId + " attempting to claim partition " + partition)

          // Each partition is consumed by exactly one thread, so there is no concurrency issue here
          addPartitionTopicInfo(currentTopicRegistry, topicDirs, partition, topic, consumerThreadId)

          // record the partition ownership decision
          partitionOwnershipDecision += ((topic, partition) -> consumerThreadId)
        }
      }
    }
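The startPart and nParts formulas can be checked in isolation. The sketch below uses a hypothetical helper, `RangeAssign.range`, which is not Kafka code. It assumes nPartsPerConsumer and nConsumersWithExtraPart are the quotient and remainder of the partition count divided by the consumer count. It returns the index range into the sorted partition list for the consumer at a given position.

```scala
// Isolated sketch of the startPart / nParts arithmetic (hypothetical helper, not Kafka code).
object RangeAssign {
  // Index range [startPart, startPart + nParts) into the sorted partition list
  // for the consumer at `position` in the sorted consumer list.
  def range(nPartitions: Int, nConsumers: Int, position: Int): Range = {
    val nPartsPerConsumer = nPartitions / nConsumers       // assumed: quotient
    val nConsumersWithExtraPart = nPartitions % nConsumers // assumed: remainder
    // Each earlier consumer that took an extra partition shifts the start by one.
    val startPart = nPartsPerConsumer * position + position.min(nConsumersWithExtraPart)
    // Only the first nConsumersWithExtraPart consumers get one extra partition.
    val nParts = nPartsPerConsumer + (if (position + 1 > nConsumersWithExtraPart) 0 else 1)
    startPart until startPart + nParts
  }
}
```

For 10 partitions and 3 consumer threads this yields indices 0-3, 4-6 and 7-9, so the first thread takes the single extra partition and every partition is covered. With 2 partitions and 3 threads, the third range is empty, which corresponds to the `nParts <= 0` warning branch above.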




Finally, the ownership decision is, of course, also stored in ZooKeeper:

/consumers/[group_id]/owners/[topic]/[broker_id-partition_id] --> consumer_node_id (ephemeral node)
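The owner node acts like a lock. The consumer that creates the ephemeral node first owns the partition, and a consumer taking over after a rebalance may have to retry (step 9 of the algorithm) until the previous owner's node is gone. Below is a minimal sketch of that behavior, assuming an in-memory `TrieMap` as a stand-in for ZooKeeper. `OwnerRegistry`, `ownerPath`, `tryClaim`, `claimWithRetry` and `release` are hypothetical names, not the Kafka or ZooKeeper API.

```scala
import scala.collection.concurrent.TrieMap

// Hypothetical in-memory stand-in for the ZooKeeper owner registry. A real consumer
// would create an ephemeral znode and treat "node already exists" as "taken".
object OwnerRegistry {
  private val nodes = TrieMap.empty[String, String] // path -> owning consumer id

  // Builds /consumers/[group_id]/owners/[topic]/[broker_id-partition_id].
  def ownerPath(group: String, topic: String, brokerId: Int, partitionId: Int): String =
    s"/consumers/$group/owners/$topic/$brokerId-$partitionId"

  // Succeeds if the node was free, or if this consumer already owns it.
  def tryClaim(path: String, consumerId: String): Boolean =
    nodes.putIfAbsent(path, consumerId).forall(_ == consumerId)

  // Retries the claim, sleeping between attempts, until the previous owner lets go
  // or the attempts run out.
  def claimWithRetry(path: String, consumerId: String, maxAttempts: Int, backoffMs: Long): Boolean =
    (1 to maxAttempts).exists { attempt =>
      val claimed = tryClaim(path, consumerId)
      if (!claimed && attempt < maxAttempts) Thread.sleep(backoffMs)
      claimed
    }

  // Models the ephemeral node disappearing when its owner releases it.
  def release(path: String, consumerId: String): Unit = {
    nodes.remove(path, consumerId)
    ()
  }
}
```

If the old owner never releases its node within the retry budget, `claimWithRetry` returns false. In the sketch, the caller then has to give up or start the whole assignment over.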


Excerpted from: http://www.sjsjw.com/104/000355MYM018196/


While using it, I ran into an error at the spot highlighted in red, so I looked into the cause.
