kafka创建topic_使用Spring Boot开发Kafka的生产者和消费者,并理解Kafka Consumer Rebalance...
前面我们在k8s上部署了一个kafka集群,同时也部署了kafka-manager对kafka集群进行监控管理。本篇将先基于spring boot框架编写一个简单的kafka的生产者和消费者的程序,并在此基础上理解kafka的相关组件。1、使用Spring Boot开发Kafka的Producer和Consumer从https://start.spring.io/创建一个名称为kafk...
·
前面我们在k8s上部署了一个kafka集群,同时也部署了kafka-manager对kafka集群进行监控管理。
本篇将先基于spring boot框架编写一个简单的kafka的生产者和消费者的程序,并在此基础上理解kafka的相关组件。
1、使用Spring Boot开发Kafka的Producer和Consumer
从 https://start.spring.io/ 创建一个名称为kafka-demo的spring boot工程,这里基于的是Spring Boot 2.1.6,依赖中选择添加Spring for Apache Kafka。 项目创建完毕后,首先需要在spring boot的配置文件application.yml通过配置spring.kafka.bootstrap-servers指定kafka代理地址。spring: kafka: bootstrap-servers: kafka.kafka.svc.cluster.local:9092 consumer.group-id: myGroup
接下来在Spring boot配置文件application.yml通过配置spring.kafka.bootstrap-servers指定访问kafa在k8s中Service的地址:
上面的配置指定了bootstrap-servers为kafka.kafka.svc.cluster.local:9092,这就要求如果是本地调试的话需要本地开发网络和k8s集群内部网络互通,否则只能将Spring Boot程序构建成docker镜像并部署到k8s集群内才可进行测试。
上面的配置同时还配置了Consumer的group-id为myGroup。
Spring Boot已经对Kafka的支持做了开箱即用,从生成的项目中可以可以看到引入的依赖如下:
dependencies { implementation 'org.springframework.boot:spring-boot-starter' implementation 'org.springframework.kafka:spring-kafka' testImplementation 'org.springframework.boot:spring-boot-starter-test' testImplementation 'org.springframework.kafka:spring-kafka-test'}
使用kafka-manager在Kafka上创建一个名称为testTopic的Topic,分区数(Partitions)为3,副本数(Replication Factor)为3。
简单起见我们直接使用创建项目时生成的KafkademoApplicationTests作为Producer,加入Producer代码:
package com.example.kafkademo;import org.junit.Test;import org.junit.runner.RunWith;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.boot.test.context.SpringBootTest;import org.springframework.kafka.core.KafkaTemplate;import org.springframework.test.context.junit4.SpringRunner;@RunWith(SpringRunner.class)@SpringBootTestpublic class KafkademoApplicationTests { @Autowired private KafkaTemplate kafkaTemplate; @Test public void producer() { kafkaTemplate.send("testTopic", "msgValue"); }}
运行上面的单元测试,如果前面配置没有问题的话,就可以向testTopic中发送了一条消息。
KafkaTemplate是Spring Boot自动配置的,直接使用即可。
简单起见,在KafkademoApplication中添加Consumer代码:
package com.example.kafkademo;import org.springframework.boot.SpringApplication;import org.springframework.boot.autoconfigure.SpringBootApplication;import org.springframework.kafka.annotation.KafkaListener;@SpringBootApplicationpublic class KafkademoApplication { public static void main(String[] args) { SpringApplication.run(KafkademoApplication.class, args); } @KafkaListener(topics = "testTopic") public void processMessage(String content) { System.out.println("Recieved msg :" + content); }}
消费者程序KafkademoApplication启动后,会立即消费之前生产者投递的消息,并打印Recieved msg : msgValue。
2、理解Kafka集群的组成
前面我们在Kubernetes上部署了一个3节点的Kafka集群和监控和管理工具kafka-manager,并使用Spring Boot编写了简单的生产者和消费者程序。 下面一起来理解一下目前出现的一些与kafka相关的组件:2.1 Broker
Broker: Broker是kafa的服务端进程。 一个kafka的服务端由多个Broker组成。 前面使用helm部署的kafka集群,Broker以Statefulset的形式运行:kubectl get pod -n kafka -l app=kafkaNAME READY STATUS RESTARTS kafka-0 1/1 Running 0 kafka-1 1/1 Running 0 kafka-2 1/1 Running 0
即一个Kafka集群由多个Broker组成,多个Broker会被部署到不同的机器上(这里通过k8s StatefulSet实现)以实现高可用。
Broker作为Kafka的服务端处理客户端发送的请求,同时对消息进行持久化。
2.2 Topic
Topic是发布订阅的主题,可以针对具体业务创建一个或多个Topic。 Topic相对于Kafka集群可以理解为一个”逻辑整体“,Topic会被分区(Partition),即数据会被分成多份,不同的分区可分布在不同的Broker上,每个分区(Partition)还可被创建副本(Replication)。 例如前面我们使用kafka-managerKafka上创建一个名称为testTopic的Topic,分区数(Partitions)为3,副本数(Replication Factor)为3。2.3 客户端 - Producer
Producer是向Topic发送消息的客户端程序。 前面例子中我们使用KafkademoApplicationTests作为生成者,向testTopic发送了值为msgValue的消息: kafkaTemplate.send("testTopic", "msgValue");。 Producer向Topic发送的消息只会被发送到Topic的一个分区中,而每个分区即成为一组有序的消息,所以分区里需要有一个编号,从0开始,被成为分区位移。2.4 客户端 - 生产者
Consumer是订阅Topic,并消费消息的客户端程序。 前面例子中的Producer程序如下:@KafkaListener(topics = "testTopic")public void processMessage(String content) { System.out.println("Recieved msg :" + content);}
2.5 Consumer Group和Consumer Rebalance
在Kafka中多个Consumer可以组成一个组来消费多个主题。 这些主题中的每个分区只会被Consumer Group内的一个Consumer实例消费。 但是一个Consumer可以从多个分区中消费消息。 前面例子中我们配置了consumer.group-id为myGroup,所以每启动一个kafka-demo这个spring boot应用的实例都将作为一个Consumer实例被加入到myGroup这个Consumer Group中。 前面我们在启动kafka-demo的时候,Spring Boot的日志中会打印partitions assigned: [testTopic-0, testTopic-1, testTopic-2],可以看出只启动一个Consumer实例的时候,testTopic的3个分区都被分配给了这个Consumer实例。 下面来做个试验:- 启动第一个kafka-demo实例,testTopic的分区都被分配给了这个实例,partitions assigned: [testTopic-0, testTopic-1, testTopic-2]。
- 继续启动第二个kafka-demo实例,此时testTopic的3个分区会被2个Consumer实例重新分配,第二个实例被分配了0和1两个分区,partitions assigned: [testTopic-0, testTopic-1],前面启动的第一个实例被分配了2这个分区partitions assigned: [testTopic-2]
- 继续启动第三个kafka-demo实例,此时testTopic的3个分区被3个Consumer实例重新分配,每个Consumer实例个分配到一个分区。
更多推荐
已为社区贡献1条内容
所有评论(0)