kafka控制器,复制与存储小结
【README】1,本文主要总结kafka复制,存储细节;2,本文的kafka集群版本是3.0.0, 有3个broker,分别是 centos201, centos202, centos203 对应的brokerid为 1, 2, 3 ;【1】kafka内部原理【1.1】broker-消息中心点1)broker:一个独立的kafka服务器节点;也称为发送消息的中心点;kafka使用zk维护集群成员
【README】
- 1,本文主要总结kafka复制,存储细节;
- 2,本文的kafka集群版本是3.0.0, 有3个broker,分别是 centos201, centos202, centos203 对应的brokerid为 1, 2, 3 ;
【1】kafka内部原理
【1.1】broker-消息中心点
1)broker:一个独立的kafka服务器节点;也称为发送消息的中心点;
- kafka使用zk维护集群成员关系;
- 每个broker都有自己的id存储在zk;broker启动时,创建zk节点把自己id注册到zk;
2)zk存储的kafka集群信息的节点列表
# zk存储的kafka集群信息的节点
[zk: localhost:2181(CONNECTED) 1] ls /
[cluster,
controller_epoch,
controller,
brokers,
zookeeper,
feature,
admin,
isr_change_notification,
consumers,
log_dir_event_notification,
latest_producer_id_block,
config]
查看zk中的 broker id
# 查看kafka brokerid 和 topic
[zk: localhost:2181(CONNECTED) 2] ls /brokers
[ids, topics, seqid]
[zk: localhost:2181(CONNECTED) 3] ls /brokers/ids
[1, 2, 3]
[zk: localhost:2181(CONNECTED) 4] ls /brokers/topics
[hello04, hello05, hello02, hello03, hello01, hello10, __consumer_offsets]
【1.2】控制器
1)控制器定义:集群里第一个启动的broker通过在zk创建临时节点 /controller 让自己成为控制器;
其他broker也尝试创建 controller 节点,若已存在,则报错;其他 broker 会在控制器节点上创建 zk watch 对象,这样非控制器节点可以收到控制器节点状态变更的通知;(干货——这种方式可以确保一个集群只能有一个控制器存在,防止脑裂问题)
2)控制器选举策略:一旦控制器被关闭或与zk断开,其他broker通过watch对象就会收到控制器消失的通知,这些 非控制器broker 会竞争在 zk 上创建 controller节点,谁最先创建成功,谁就是集群控制器; 然后其他broker在控制器节点上创建 zk watch对象;
- 2.1)每次控制器选举后: 控制器纪元值(时代值)controller_epoch 都会递增;其他broker若收到控制器发出的包含旧 epoch 的消息,就会忽略;
3)控制器实验
step1) 查看 控制器和控制器纪元
[zk: localhost:2181(CONNECTED) 5] get /controller_epoch
6
[zk: localhost:2181(CONNECTED) 6] get /controller
{"version":1,"brokerid":1,"timestamp":"1638692039821"}
显然, epoch是6,控制器是broker1;
step2)停止掉 broker1;
这个时候,broker2,3 会竞争选举为控制器;我们再次查看控制器,发现控制器现在是broker2了;且 epoch自增为7;
[zk: localhost:2181(CONNECTED) 7] get /controller_epoch
7
[zk: localhost:2181(CONNECTED) 8] get /controller
{"version":1,"brokerid":2,"timestamp":"1638733315396"}
4)控制器作用
- 控制器负责在broker加入或离开时进行分区首领选举;
- 控制器使用 epoch 避免脑裂问题;
【补充】脑裂指两个节点同时认为自己是集群控制器;
5)zk的作用
- kafka使用zk的临时节点来选举控制器;
- zk在broker加入或退出集群时通知控制器;
【1.3】复制
复制功能是kafka架构的核心;在kafka 文档里,kafka把自己描述为 一个分布式的,可分区的,可复制的提交日志服务;(kakfa的日志就是数据或消息);
【1.3.1】副本
1)数据存储
kafka使用主题来组织数据(逻辑);使用分区为单位来读写数据(物理);
为什么说kakfa以分区为单位读写? 是因为我们创建带有分区数和副本数的主题后, kakfa会创建以这个分区命名的文件夹,分区文件夹下存储消息内容,索引文件等;
2)主题,分区,副本关系
- 1个主题对应多个分区;
- 1个分区对应多个副本;
- 1个副本对应多个分段文件;(分段存储)
3)副本类型
- 3.1)首领副本:每个分区都有一个首领副本,消息读写首先会操作首领副本;
- 3.2)跟随者副本:首领副本以外的副本;它们不处理读写请求,唯一任务是从首领副本复制消息,与首领保持数据同步;如果首领发生崩溃,其中一个同步的跟随者副本被提升为首领副本;
补充1:跟随者副本在成为不同步副本前的时间是通过 replica.lag.time.max.ms 来配置;
补充2:跟随者从首领副本复制消息时的请求,与消费者从首领副本消费消息时发出的请求是一样的;
【1.4】处理请求
1)broker处理请求过程
- step1)broker会在监听端口上运行一个 Acceptor线程(可以理解为服务器套接字 ServerSocket),这个线程会创建一个连接(类似ServerSocket.accept() 方法),把请求交给 Processor线程(网络线程)去处理;
- step2)Processor线程从客户端获取请求消息,把它放进请求队列,然后从响应队列获取响应结果并发送给客户端;
- step3) 在请求被放入请求队列后, IO线程会处理它们,并把处理结果放入 响应队列;
2)常见请求类型
- 生产请求:生产者发送的请求,包含要写入的消息;
- 获取请求:消费者或跟随者副本所在broker需要从首领副本所在broker获取消息而发送的请求;
【注意】
- 生产请求和获取请求都必须发送给分区的首领副本,跟随者副本不参与消息读写,仅做备份和支持集群高可用;
- kafka客户端要自己负责把生产请求和获取请求发送到正确的broker上;否则broker会返回错误响应;
3)客户端怎么知道请求发送到哪里呢?
3.1)客户端在发送请求前,先发送元数据请求;
- 这种请求的响应结果包括 主题,主题分区,分区副本以及首领副本;
3.2)客户端会缓存这些元数据信息;
- 获取元数据信息后,会直接往对应的 broker发送请求和获取请求;
- 当然,客户端需要定时刷新元数据缓存; 刷新时间间隔通过 metadata.max.age.ms 来配置;
【1.4.1】生产请求
1)生产者acks有3个值;
- acks=0 ; 生产者在发送消息后,默认发送成功;而不会等待服务器响应;
- acks=1 ; 只要集群的首领节点收到消息,生产者就会收到发送成功的响应;而不管副本节点是否收到消息;
- acks=all; 需要集群的首领节点和跟随节点(副本节点)都收到消息后,生产者才会收到发送成功的响应;
2)首领副本所在broker收到生产请求后,会对请求做一些验证:
- 发送数据的用户是否有写入权限;
- acks的值是否合法; (只允许出现 0, 1, all);
- 根据acks的值,进行副本复制策略;
【1.4.2】获取请求
1)首领副本所在broker收到获取请求后,会根据客户端指定的请求偏移量从分区里读取消息;
2)kafka使用 零复制技术 向客户端发送消息,即kafka直接把消息从文件发送到网络通道,而不经过任何中间缓冲区;(干货——这是kakfa与大部分数据库不一样的地方,其他数据库在把数据发送到客户端前,会把数据保存到本地缓存)
- 零复制技术优点:避免了字节复制,也不需要管理内存缓冲区,从而获取更好性能;
3)消费者客户端只能读取已经被写入所有同步副本的消息,而不是所有消息
- 因为还没有被足够多副本复制的消息被认为是不安全的;如果首领副本所在broker发送崩溃,另一副本成为新首领,那这些不安全的消息就会丢失;
4)扩展 ISR, HW高水位
- ISR, In-Sync-Replica set, 同步副本集合,即所有与首领副本保持同步的副本集合;
- LEO,log end offset,日志末端偏移量 ,即副本写入下一条消息的位移值;
- HW高水位,High Watermark, 所有副本最小的LEO;
小结: 消费者只能看到已经复制所有副本的消息;
5)在 Kafka 中,高水位的作用主要有 2 个。
- 定义消息可见性,即用来标识分区下的哪些消息是可以被消费者消费的。
- 帮助 Kafka 完成副本同步。
6)下面这张图展示了多个与高水位相关的 Kafka 术语 。
我们假设这是某个分区 Leader副本的高水位图。
1)首先,请你注意图中的“已提交消息”和“未提交消息”。在分区高水位以下的消息被认为是已提交消息,反之就是未提交消息。消费者只能消费已提交消息,即图中位移小于 8 的所有消息。另外,需要关注的是,位移值等于高水位的消息也属于未提交消息。也就是说,高水位上的消息是不能被消费者消费的。
2)图中还有一个日志末端位移的概念,即 Log End Offset,简写是 LEO。
它表示副本写入下一条消息的位移值。注意,数字 15 所在的方框是虚线,这就说明,这个副本当前只有 15 条消息,位移值是从 0 到 14,下一条新消息的位移是 15。显然,介于高水位和 LEO 之间的消息就属于未提交消息。这也从侧面告诉了我们一个重要的事实,那就是:同一个副本对象,其高水位值不会大于 LEO 值。
【高水位小结】高水位和 LEO 是副本对象的两个重要属性
- Kafka 所有副本都有对应的高水位和 LEO 值,而不仅仅是 Leader 副本。
- Kafka 使用 Leader 副本的高水位来定义所在分区的高水位。
【1.4.3】其他请求
- OffsetCommitRequest, 偏移量提交请求;
- OffsetFetchRequest;
- ListOffsetsRequest;
【1.5】物理存储
1)kafka的基本存储单元是分区; 分区会在所属broker上的kafka数据根目录下新建名为分区名的文件夹,如 hello04-2(主题为hello04的2号分区文件夹),kafka数据根目录由 server.properties 中的 log.dirs 来指定;
2)主题,分区,副本关系
- 1个主题对应多个分区;
- 1个分区对应多个副本;
- 1个副本对应多个分段文件;(分段存储)
【1.5.1】分区分配
1)创建指定分区和副本数的topic来做实验
# 创建分区数3副本数2的主题
kafka-topics.sh --bootstrap-server centos201:9092
--create --topic hello11 --partitions 3 --replication-factor 2
# 副本数量必须小于等于broker数量,但分区数没有这个限制;
查看分区详情
[root@centos201 hello04-1]# kafka-topics.sh --bootstrap-server centos201:9092 \
--describe --topic hello11
Topic: hello11 TopicId: IliU_BDeS8ycreLufxCMMw PartitionCount: 3 ReplicationFactor: 2 Configs: segment.bytes=1024
Topic: hello11 Partition: 0 Leader: 2 Replicas: 2,3 Isr: 2,3
Topic: hello11 Partition: 1 Leader: 3 Replicas: 3,1 Isr: 3,1
Topic: hello11 Partition: 2 Leader: 1 Replicas: 1,2 Isr: 1,2
查看具体存储数据的文件夹,以broker1为例;
根据topic详情,我们知道 broker1 存储了topic hello11的1号和2号分区; 且它是2号分区首领所在的broker ;
进入 broker1的kafka数据根目录,
进入其中一个分区文件夹查看 hello11-1 ,如下:
再查看分区文件夹前,我们先写入10条消息; 指定topic hello11, 1号分区;
for (int i = 0; i < 10; i++) {
Future<RecordMetadata> future = producer.send(
new ProducerRecord<String, String>("hello11", 1,"", String.format("[%s] ", order++) + now + " > " + DataFactory.INSTANCE.genOneHundred()));
try {
System.out.println("[生产者] " + future.get().partition() + "-" + future.get().offset());
} catch (Exception e) {
e.printStackTrace();
}
}
查看分区文件夹下的文件 ;
2) kafka的分段存储;
因为在一个大文件里查找和删除消息很耗时;所以把一个分区分成若干片段进行存储;默认情况下,一个片段存储1g数据,为了实验,这里我修改为 1k,可以在 server.properties文件中设置 log.segment.bytes=1024 来实现;
3)kafka的稀疏索引
- kafka并没有对每条消息建立索引,那样太大了,而是采用稀疏索引(稀疏存储)的方式,即一条索引记录指向一个消息范围;
例如: 索引值 1~100 指向 数据文件1.log中的消息1到消息100的消息范围的起始地址;
refer2 Apache Kafka ;
当消费者指定消费某个offset记录时, kafka集群通过二分查找从索引文件找出包含offset的索引值,通过索引值找到对应数据文件的起始地址,然后从起始地址开始顺序读取对应offset的消息;
【1.5.2】文件格式
1)kafka 使用零复制技术给消费者发送消息,避免了对生产者已经压缩过的消息进行解压和再压缩;
2)普通消息与压缩消息格式
可以看出,多个压缩消息共用同一个消息头,从而减少消息大小;
【References】
- kafka权威指南;
- Apache Kafka
更多推荐
所有评论(0)