• 这是偶然发现的

  • 不使用consumer group来消费,见stackoverflow, 《kafka权威指南》里也有提及
    在这里插入图片描述

  • 应用程序自动水平扩展的时候(比如k8s下某个应用程序的pod负载大的时候), 每个节点都能消费指定topic的所有partition的数据

  • 下面这段go代码其实脱胎于sarama的官方例子, 消费最新的数据, 因为一个应用程序节点拉起来的时候也只需要消费最新的数据

    package main
    
    import (
    	"fmt"
    	"log"
    	"sync"
    	"time"
    )
    import "github.com/Shopify/sarama"
    //var kafkaConsumer sarama.Consumer
    
    func main(){
    	config := sarama.NewConfig()
    	kafkaConsumer, err := sarama.NewConsumer([]string{"192.168.198.130:9092"}, config)
    	if err != nil {
    		log.Printf("创建消费者失败! 错误: %v", err)
    		return
    	}
    	partitions, err := kafkaConsumer.Partitions("main")
    	if err != nil {
    		log.Printf("获取消费者partitions列表失败! 错误: %v", err)
    		return
    	}
    	log.Printf("Kafka 正在等待消息...共:%v个分区", len(partitions))
    	wg := sync.WaitGroup{}
    	wg.Add(len(partitions))
    	for _, partition := range partitions {
    		pc, err := kafkaConsumer.ConsumePartition("main", partition, sarama.OffsetNewest)
    		if err != nil {
    			log.Printf("绑定消费者到分区:%v失败! 错误: %v", err, partition)
    			continue
    		}
    
    		//defer pc.AsyncClose()
    		go func(_pc sarama.PartitionConsumer) {
    			for msg := range pc.Messages() {
    				fmt.Printf("接收时间:%s, 消息:%s, 分区:%d, 消息时间: `%s`, 消息的topic:`%s` \n",time.Now().Format("2006-01-02 15:04:05"), string(msg.Value), msg.Partition,msg.Timestamp,msg.Topic)
    				time.Sleep(time.Second*2) // 故意延迟
    			}
    			defer wg.Done()
    		}(pc)
    
    	}
    	wg.Wait()
    }
    
  • 验证

    • producer端指定的topic有多个分区

      root@root:/opt/kafka# bin/kafka-topics.sh --describe --topic main --bootstrap-server 192.168.198.130:9092
      Topic: main	PartitionCount: 3	ReplicationFactor: 1	Configs: segment.bytes=1073741824
      Topic: main	Partition: 0	Leader: 0	Replicas: 0	Isr: 0
      Topic: main	Partition: 1	Leader: 0	Replicas: 0	Isr: 0
      Topic: main	Partition: 2	Leader: 0	Replicas: 0	Isr: 0
      
    • consumer端起多个进程

    • 某个进程故意每次消费后time.Sleep(time.Second*2),其他进程没有sleep

    • 不会漏掉消息,但是那个sleep的进程消费的顺序好像有些不一致

      • 进程1(sleep 2的进程)
        在这里插入图片描述
      • 进程2
        在这里插入图片描述
      • 进程3
        在这里插入图片描述
Logo

K8S/Kubernetes社区为您提供最前沿的新闻资讯和知识内容

更多推荐