Zookeeper学习8_关于ZookeeperConsumerConnector#rebalance
consumer的负载均衡,每一个partition只会同时被consumer group里面的一台机器消费,称为owner,owner分配的逻辑,1. For each topic T that Ci subscribes to 2. let PT be all partitions producing topic T3. let CG be all consumers in
·
consumer的负载均衡,每一个partition只会同时被consumer group里面的一台机器消费,称为owner,owner分配的逻辑,
1. For each topic T that Ci subscribes to
2. let PT be all partitions producing topic T
3. let CG be all consumers in the same group as Ci that consume topic T
4. sort PT (so partitions on the same broker are clustered together)
5. sort CG
6. let i be the index position of Ci in CG and let N = size(PT)/size(CG)
7. assign partitions from i*N to (i+1)*N - 1 to consumer Ci // i*N to i*N+N-1
8. remove current entries owned by Ci from the partition owner registry
9. add newly assigned partitions to the partition owner registry
(we may need to re-try this until the original partition owner releases its ownership)
具体实现在ZookeeperConsumerConnector#rebalance,
最后当然也是存储在ZK上了,
1. For each topic T that Ci subscribes to
2. let PT be all partitions producing topic T
3. let CG be all consumers in the same group as Ci that consume topic T
4. sort PT (so partitions on the same broker are clustered together)
5. sort CG
6. let i be the index position of Ci in CG and let N = size(PT)/size(CG)
7. assign partitions from i*N to (i+1)*N - 1 to consumer Ci // i*N to i*N+N-1
8. remove current entries owned by Ci from the partition owner registry
9. add newly assigned partitions to the partition owner registry
(we may need to re-try this until the original partition owner releases its ownership)
具体实现在ZookeeperConsumerConnector#rebalance,
for (consumerThreadId <- consumerThreadIdSet) {
val myConsumerPosition = curConsumers.indexOf(consumerThreadId)
assert(myConsumerPosition >= 0)
val startPart = nPartsPerConsumer*myConsumerPosition + myConsumerPosition.min(nConsumersWithExtraPart)
val nParts = nPartsPerConsumer + (if (myConsumerPosition + 1 > nConsumersWithExtraPart) 0 else 1)
/**
* Range-partition the sorted partitions to consumers for better locality.
* The first few consumers pick up an extra partition, if any.
*/
if (nParts <= 0)
warn("No broker partitions consumed by consumer thread " + consumerThreadId + " for topic " + topic)
else {
for (i <- startPart until startPart + nParts) {
val partition = curPartitions(i)
info(consumerThreadId + " attempting to claim partition " + partition)
// 每个partition只会由一条thread消费,不会有并发的问题
addPartitionTopicInfo(currentTopicRegistry, topicDirs, partition, topic, consumerThreadId)
// record the partition ownership decision
partitionOwnershipDecision += ((topic, partition) -> consumerThreadId)
}
}
}
最后当然也是存储在ZK上了,
/consumers/[group_id]/owners/[topic]/[broker_id-partition_id] --> consumer_node_id (ephemeral node)
资料节选自:http://www.sjsjw.com/104/000355MYM018196/
在使用过程中,遇到了一个错误,如标红的地方所示,故查找一下原因。
更多推荐
已为社区贡献3条内容
所有评论(0)