kafka不通过consumer group来消费某个topic
这是偶然发现的不使用consumer group来消费应用程序自动水平扩展的时候(比如k8s下某个应用程序的pod负载大的时候), 每个节点都能消费指定topic的所有partition的数据下面这段go代码其实脱胎于sarama的官方例子, 消费最新的数据, 因为一个应用程序节点拉起来的时候也只需要消费最新的数据package mainimport ("fmt""log""sync""time"
·
-
这是偶然发现的
-
不使用
consumer group
来消费,见stackoverflow, 《kafka权威指南》里也有提及 -
应用程序自动水平扩展的时候(比如k8s下某个应用程序的pod负载大的时候), 每个节点都能消费指定topic的所有partition的数据
-
下面这段go代码其实脱胎于sarama的官方例子, 消费最新的数据, 因为一个应用程序节点拉起来的时候也只需要消费最新的数据
package main import ( "fmt" "log" "sync" "time" ) import "github.com/Shopify/sarama" //var kafkaConsumer sarama.Consumer func main(){ config := sarama.NewConfig() kafkaConsumer, err := sarama.NewConsumer([]string{"192.168.198.130:9092"}, config) if err != nil { log.Printf("创建消费者失败! 错误: %v", err) return } partitions, err := kafkaConsumer.Partitions("main") if err != nil { log.Printf("获取消费者partitions列表失败! 错误: %v", err) return } log.Printf("Kafka 正在等待消息...共:%v个分区", len(partitions)) wg := sync.WaitGroup{} wg.Add(len(partitions)) for _, partition := range partitions { pc, err := kafkaConsumer.ConsumePartition("main", partition, sarama.OffsetNewest) if err != nil { log.Printf("绑定消费者到分区:%v失败! 错误: %v", err, partition) continue } //defer pc.AsyncClose() go func(_pc sarama.PartitionConsumer) { for msg := range pc.Messages() { fmt.Printf("接收时间:%s, 消息:%s, 分区:%d, 消息时间: `%s`, 消息的topic:`%s` \n",time.Now().Format("2006-01-02 15:04:05"), string(msg.Value), msg.Partition,msg.Timestamp,msg.Topic) time.Sleep(time.Second*2) // 故意延迟 } defer wg.Done() }(pc) } wg.Wait() }
-
验证
-
producer
端指定的topic有多个分区root@root:/opt/kafka# bin/kafka-topics.sh --describe --topic main --bootstrap-server 192.168.198.130:9092 Topic: main PartitionCount: 3 ReplicationFactor: 1 Configs: segment.bytes=1073741824 Topic: main Partition: 0 Leader: 0 Replicas: 0 Isr: 0 Topic: main Partition: 1 Leader: 0 Replicas: 0 Isr: 0 Topic: main Partition: 2 Leader: 0 Replicas: 0 Isr: 0
-
consumer
端起多个进程 -
某个进程故意每次消费后
time.Sleep(time.Second*2)
,其他进程没有sleep -
不会漏掉消息,但是那个
sleep
的进程消费的顺序好像有些不一致- 进程1(
sleep 2
的进程) - 进程2
- 进程3
- 进程1(
-
更多推荐
所有评论(0)