kafka入门到精通
kafka,java,zk
文章整理自:尚硅谷kafka教程
一、kafka概述?
1.定义
kafka是一个分布式的基于发布/订阅模式的消息队列,主要应用于大数据实时处理领域。
1.2消息队列
1.2.1 传统消息队列的使用场景
1.2.2 消息队列好处
1.解耦
允许程序独立的扩展或两边的处理过程,只要确保它们遵守相同的接口约束
2.可恢复性
*系统一部分组件失效时,不会影响整个系统。消息队列降低了进程间的耦合度,所以即使一个消息处理进程挂掉,加入队列中的消息仍然可以在系统恢复后被处理
3.缓冲
有助于控制和优化数据流经过系统的速度,解决生产消息远大于消费消息处理速度不一致的问题
4.灵活性(kafka服务扩缩容)&削峰处理
在访问量突增的情况下,应用仍然要继续发挥作用,但这样的突发流量并不常见。
如果因为以处理这类峰值的标准来部署应用则会造成巨大的资源浪费。使用消息队列能够使关键应用顶住突发访问压力,而不会因为突发的超负荷请求使整个服务崩溃。
5.异步通信
很多情况,用户也不需要立即处理消息。消息队列提供了异步处理机制,允许程序把一个消息放入队列,但并不需要立即处理。向消息队列中放入大量消息,在需要的时候程序再去进行处理
1.2.3 消息队列两种模式
点对点模式(一对一,消费者主动拉取数据,消息收到后清除消息)
消息生产者发送消息到queue中,消息消费者从queue中取出消息并消费消息。
消息被消费后,queue中不再存储该消息,所以消费者不可能消费到已经被消费的消息。
queue支持存在多个消费者,但对于一个消息,只会有一个消费者进行消费。
发布/订阅模式(一对多,消费者消费数据后不会清除消息)
消息生产者(发布)将消息发布到topic中,同时有多个消费者(订阅)消费该消息。
和点对点模式不同,发布到topic的消息会被所有订阅者消费
发布订阅模式也有两种:
1.消费者主动拉取;
缺点:消费者不知道topic中是否有消息,需要定时去轮询,比较浪费资源,所以出现了主动推送的模式
2.消息队列主动推送(如微信公众号)
1.3 kafka基础架构
topic:主题,对消息进行分类
partition:分区,主要提高kafka集群负载均衡,同时也提高了并发量
leader:针对于分区,消费者连接kafa只会连接leader
follower:针对于分区,仅仅数据备份,同一个partition的leader和follower一定不在同一台kafka服务上
comsumer goup:消费组,提高消费能力。多个消费者在同一个group时,消费消息时,一个分区的消息只能被同一个消费组的同一个消费者消费;消费者group中消费者的个数等于topic的partition数时,消费能力最合理,group中消费者数量大于partition分区数时,会造成资源浪费,多余消费者依然无法消费到消息。
zookeeper:管理kafa集群信息,存储消费位置信息(0.9版本前);
如:消费者A需要消费topic-partition0的10条消息,在消费到第5条时挂了,这时候消费进度的信息保存在zk里和内存中;
0.9版本后,offset存储在kafka集群的系统级topic中(默认存储磁盘7天),有kafka集群维护,主动拉取时高并发情况下对zk访问压力较大。
二、kafka快速入门
1.1使用docker-compose安装kafka
version: '3'
services:
zookeeper:
image: wurstmeister/zookeeper
ports:
- "2181:2181"
kafka:
image: wurstmeister/kafka
ports:
- "9092:9092"
environment:
KAFKA_ADVERTISED_HOST_NAME: 192.168.58.100 # 不能通过hostname来配置(消费者和生产者识别不到)
KAFKA_CREATE_TOPICS: "test:1:1"
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
volumes:
- /var/run/docker.sock:/var/run/docker.sock
kafka-manager:
image: sheepkiller/kafka-manager
environment:
ZK_HOSTS: zookeeper:2181
ports:
- "19000:9000"
1.2测试访问kafka-manager
192.168.253.100:19000
出现以上问题 一般是docker-compose.yml文件中的kafka配置ip地址有误
添加kafka节点
1.3 查看kafka版本号
docker exec kafka_compose-kafka-1 find / -name *kafka_* | head -1 | grep -o ‘\kafka[^\n]*’
1.4 查看zookeeper版本号
docker exec kafka_compose-zookeeper-1 pwd
1.5 扩展kafka的broker
启动时指定kafka的broker数量
#端口一致所以只能启动一个broker,可能需要配置swarm网络。未亲测
docker-compose up --scale kafka=3 -d
查看kafka扩展后的容器名称
docker ps;
CONTAINER ID IMAGE COMMAND CREATED STATUS PORTS NAMES
0b32ee5a3744 wurstmeister/kafka "start-kafka.sh" 9 minutes ago Up 9 minutes 0.0.0.0:32777->9092/tcp, :::32777->9092/tcp kafka_compose-kafka-4
0a655887b672 wurstmeister/kafka "start-kafka.sh" 9 minutes ago Up 9 minutes 0.0.0.0:32778->9092/tcp, :::32778->9092/tcp kafka_compose-kafka-1
e8fcd6cafa2c sheepkiller/kafka-manager "./start-kafka-manag…" 9 minutes ago Up 9 minutes 0.0.0.0:19000->9000/tcp, :::19000->9000/tcp kafka_compose-kafka-manager-1
402829709dc9 wurstmeister/zookeeper "/bin/sh -c '/usr/sb…" 9 minutes ago Up 9 minutes 22/tcp, 2888/tcp, 3888/tcp, 0.0.0.0:2181->2181/tcp, :::2181->2181/tcp kafka_compose-zookeeper-1
fbee5d80662b wurstmeister/kafka "start-kafka.sh" 9 minutes ago Up 9 minutes 0.0.0.0:32779->9092/tcp, :::32779->9092/tcp kafka_compose-kafka-2
8c78ffde4538 wurstmeister/kafka "start-kafka.sh" 9 minutes ago Up 9 minutes 0.0.0.0:32780->9092/tcp, :::32780->9092/tcp kafka_compose-kafka-3
1.6 使用kafka
需要进入一个容器
docker exec -it kafka_compose-kafka-1 /bin/sh
创建topic:
kafka-topics.sh --create --topic topic001 --partitions 4 --zookeeper zookeeper:2181 --replication-factor 2
查看topic
#查看一个topic
kafka-topics.sh --list --zookeeper zookeeper:2181 topic001
#查看所有topic
kafka-topics.sh --list --zookeeper zookeeper:2181
删除topic
# 需要配置server.properties的delete.topic.enable=true
kafka-topics.sh --delete --topic topic002 --zookeeper zookeeper:2181
查看刚刚创建的topic的情况,borker和副本情况
kafka-topics.sh --describe --zookeeper zookeeper:2181 topic001
1.7 测试生产者和消费者
(1) 启动消费者客户端
kafka-console-consumer.sh --topic topic001 --bootstrap-server kafka_compose-kafka-1:9092,kafka_compose-kafka-2:9092,kafka_compose-kafka-3:9092,kafka_compose-kafka-4:9092
# --from-beginning 参数会从topic开始的位置消费,如果不指定,不会消费当前时刻之前的信息
启动后控制台不会打印消息,因为没有生产者生产消息。
(2) 启动生产者并且发送消息客户端
kafka-console-producer.sh --topic topic001 --broker-list kafka_compose-kafka-1:9092,kafka_compose-kafka-2:9092,kafka_compose-kafka-3:9092,kafka_compose-kafka-4:9092
现在已经进入了生产消息的命令行模式,输入一些字符串然后回车,再去消费消息的控制台窗口看看,已经有消息打印出来,说明消息的生产和消费都成功了。
创建kafka集群时,需要勾选启动JMX PORT,broker通讯需要用到
1.8 到zk 中查看节点信息,如下
1.安装prettyZoo可视化软件
prettyZoo可视化安装
2.连接并查看zookeeper情况
1.9 kafka群起脚本
#/bin/bash
case #1 in
"start"){
for i in hadoop102 hadoop103 hadoop104
do
echo "***********************$1***************************"
ssh $1 "/opt/module/kafka/bin/kafka-server-start.sh -daemon /opt/mudole/kafka/config/server.properties"
done
};;
"stop"){
for i in hadoop102 hadoop103 hadoop104
do
echo "***********************$1***************************"
ssh $1 "/opt/module/kafka/bin/kafka-server-stop.sh "
done
};;
esac
kafka无法启动时,主要是看server.log日志
三、kafka架构深入
生产者生产消息时,如主题不存在,默认会创建一个分区和一个副本。在server.properties中进行修改
kafka集群可以保证分区有序,不能保证全局数据有序
1.kafka数据文件的存储
.log文件和.index文件
kafka分区的两个重要文件
0000000000000000.log只存储数据
0000000000000000.index记录消费数据offet
1.log文件默认存储大小1g,超过1个g了,生成新文件讲如何命名?
.index 和 .log 文件的命名规则就是当前文件的最小offset值(偏移量值)
2.怎么快速定位到想要消费的位置?
引入分片和索引机制。
分片规则:log文件1g后会新增分片,每片段都包含一个对应的.log和.index文件
索引机制:index文件中,存储了每条消息的id、起始偏移量和消息的大小。
文件索引原理:index文件通过二分查找找到是哪个消息,通过消息去log文件中找到消息内容
2.kafka生产者
2.1 分区策略
1.分区原因
方便在集群扩展
提高并发,可以以partition为单位进行读写
2.分区原则(3种)
1).在指定partition;直接将数据存放在指定的分区中
2).未指定partition分区时,hash;根据key的值hash后取余topic的分区数,得到存放数据的partition
3).未指定partition分区和key时,随机;第一次调用时随机生成一个整数(后面如果没有指定partition和key时,会在这个整数的基础上自增),根据这个值与topic的partion数量取余得到存放数据的partition。
这是round-robin算法。
2.2 数据可靠性保证
topic的每个partition收到producer发送的数据后,都会向producer发送akc(acknowledgment确认收到),如果producer收到ack,就会发送下一轮数据,否则重复发送
kafka何时发送akc方案:
2.2.1.副本数据同步策略
全同步策略:
5台机器最多容忍4台机器故障,topicA分区下的5个副本(leader+follower)分别位于5台机器上且都是全量数据,5个副本中只要保证1个正常,那么kafka数据就是稳定的。则需要n+1个副本
超半数同步策略:
同样如果要容忍4台机器故障且kafka可用,则至少有一个副本在4台故障后依然是有topicA的全量数据,最坏情况下,这4台故障机器都是半数已经同步的机器,那么为了保证kafka数据稳定且可用需要4*2+1个副本。
kafka最终选择的方案:所有fowoller同步完成后发送ack
1.优点:
解决数据冗余:同样为了容忍n台节点故障,超半数同步策略需要2n+1个副本,而全同步策略只需要n+1个副本,针对kafka的使用场景,每个分区都有大量的数据,第一种方案会造成大量数据冗余
2.缺点:
网络延迟:虽然全量同步方案网络延迟高,但对kafka的影响较小
2.2.2.ISR队列
针对kafka选择全量同步方案时,网络延迟较高的问题,kafka做了优化
ISR队列(in-sync replicats):作用1在生产消息时快速响应、2在leader选举时做了优化
在0.9版本以前:ISR队列中存储的副本取决于两个因素,即同步数据快的follower、同步数据多的follower将会优先存储在ISR队列中,以备leader选举时进行挑选,那些同步慢、同步数据小的副本将会被剔除,不被考虑作为下一任leader
在0.9版本及以后:ISR队列只考同步快的副本进入队列
- 为什么0.9版本后ISR队列取消了follower同步数量的条件?
producer批量发送消息时,如果这个batch数量大于ISR同步的数量,那么久会造成延迟且数据不在范围内,可能就会从ISR中批量剔除副本,会造成频繁的ISR队列进出和zk的写入
现任版本:Leader维护了一个动态的ISR队列,意为和leader保持同步的follower集合。当ISR中的leader存储完producer的消息后,leader会给follower发送ack,如果follower长时间没有从leader同步数据,将会被剔除ISR队列(该时间阈值由replicat.lag.time.max.ms参数设定),一旦收到了ISR队列中所有follower的ACK,该消息就被确认commit了,leader将增加HW并且想producer发送ACK。
leader故障后,会从ISR队列中重新选区leader。
2.2.3 ack应答机制
对于某些不太重要的数据,对数据可靠性不是很高,能够容忍少量数据丢失,所以没有必要等ISR中所有follower全部接收成功。
kafka提供了3中可靠性的级别,用户根据可靠性和延迟的要求进行权衡进行选择
acks参数配置:
acks=0时,producer不等待broker的ack,这种操作延迟最低,broker接收到消息还没写入磁盘就返回,当broker发生故障时有可能丢失数据
acks=1时,producer等待broker的ack,partition的leader落盘成功后返回ack,如果follower在同步成功前leader发生故障有可能丢失数据
acks=-1(all)时,producer等待broker的ack,partition的leader和follower全部落盘成功后才会返回ack。但是如果follower同步完成后,broker返回producer的ack之前,leader发生故障,那么会造成数据重复;当ISR中没有可用的follower时,leader返回ack后此时leader宕机并未将消息同步给其他ISR之外的follower时,就会丢失数据
2.2.4 故障处理细节
LEO:每个副本最大的offset
HW:高水位,是指消费者能见到的最大offset,ISR队列中最小的LEO;保证消费者获取数据一致性;保证副本之间的数据一致性
1.follower故障
follower发生故障后会被临时剔除ISR,等待follower恢复后,follower会读取本地磁盘记录的上次的HW,并将log文件中高于HW的部分截取掉,从HW开始想leader同步。等该follower的LEO大于等于该topic的HW,即follower追上leader后,就可以重新加入ISR
2.leader故障
leader发生故障后,会从ISR中选举一个新的leader,之后为保证副本之前的一致性,其余的follower会先将各自的log文件中高于HW部分截取掉,然后重新想leader同步数据
这里只能保证副本之间数据的一致性。并不能保证数据不丢失或重复(ack决定)。
2.3Exactly Once语义(精准一次)
将kafkaserver的ack级别设置为-1,可以保证producer到server之间不会丢失数据,即是at least once语义;
相对的将ack级别设置为0,可以保证生产者每条消息只会发送一次,即at most once语义;
at least once可以保证数据不丢失,但是不能保证数据不重复;
at most once可以保证数据不重复,但是不能保证数据不丢失;
但对于一些非常重要的消息,比如说交易数据,下游的消费者要求数据即不能重复也不能丢失,即是Exactly Once语义;
kafka在0.11以前,对此是无能为力,只能保证数据不丢失,在消费者对数据做全局去重。对于多个下游应用的情况下,每个应用都要单独去重,这对性能造成了很大影响。
0.11之后,引入了幂等性;指的是producer无论向server发送多少次重复数据,server端都只会持久化一条。幂等性+at least once = exactly once
3.kafka消费者
3.1消费方式
1.comsumer采用pull的方式从broker拉去数据;pull可以根据消费能力调整消费速率。
2.broker向comsumer push的方式很难适应comsumer,因为push的频率是由broker决定的。push频率过快会comsumer来不及消费,表现为拒绝服务或网络阻塞。
pull不足:kafka没有消息被消费时,会陷入循环中,一直返回空数据,会造成资源浪费。kafka引入了timeout参数,comsumer pull返回空后,在timeout时间之后再去拉去。
3.2分区分配策略(针对group)
一个消费组有多个comsumer,一个topic有多个partition,消费时会涉及到partiion分配问题。
1.round robin
按照消费组来进行轮训分配
使用前提:消费组消费的是一个topic
2.range(默认分区分配策略)
根据topic来进行范围分配,多个主题被消费时,消费者消费数据不对等问题
当comsumer个数发生变化时,都会触发分配策略,重新分配消费分区。
当comsumer个数大于partition个数时,也会触发重新分配,会有多余的comsumer分配不到partition。
3.3offet的维护
comsumer group+topic+partition确定一个offset
3.4消费者组案例
4.kafka高效读写数据
1.os页缓存
2.磁盘顺序写
3.零拷贝
5.zk在kafka中的作用
controller:broker抢占zk中的controller,谁先来谁就是controller,controller是哪个broker无所谓,kafka的数据都是共享的,只是由这个身份为controller的broker来维护与zk的信息
6.kafka事务
kafka从0.11版本开始支持事务。事务可以保证kafka在exactly once语义的基础上,生产者和消费者可以跨分区和会话,要么全部成功,要么全部失败。
6.1producer事务
为了实现跨分区会话的事务,需要引入一个全局的transactionID(producer客户端自己生成),并且producer的pid和tid绑定,这样当producer重启后就可以通过正在进行的tid来获得原来的pid;
为了管理transaction,kafka引入了一个新的组件transaction coordinator。producer就是通过transaction coordinator交互获得TID对应的任务状态;coordinator还负责将事务所有写入kafka内部的一个topic,这样即使整个服务重启,由于事务状态可以保存,进行中的事务状态可以恢复,从而继续运行。
6.2comsumer事务(很少聊)
上述的事务机制主要是从producer方面去考虑,对于comsumer而言,事务的保证相对较弱,尤其是无法保证commit的信息被精确消费。这是由于comsumer可以通过offset访问任何信息,而且segmentFile的生命周期不同,同一事务的消息可能会出现重启后被删除的情况。(如segmnetFile01刚好7天过期,consumer批量消费时,恰好跨了两个segmentFile,重启后发现segmentFile01刚好过期,则无法读取到过期的数据)
四、kafka的API
1.producer API
1.1消息发送流程
kafka的producer消息时异步发送的。在消息发送过程中涉及到了两个线程main和sender,以及线程共享的一个变量RecordAccumulator。main线程将消息发送给RecordAccumulator,sender线程不断从RecordAccumulator中拉去并发送到broker。
相关参数:
batch.size: 只有数据累计到这个值的时候,sender才会发送数据
linger.ms:当batch.size迟迟没有累计到这个值,到了linger.ms时间是,sender也会发送
1.2.异步发送api
1.导入依赖
2.编写代码
需要用到的类:
KafkaProducer:创建一个生产者对象用来发送数据
ProducerConfig:获取所需的一系列参数
ProducerRecord:每一条消息封装成为一个对象
1.3.同步发送api
2.consumer API
3.自定义interceptor
五、kafka监控
kafka eagle
六、flume对接kafka
七、kafka面试题
1.Kafka中的ISR(InSyncRepli)、OSR(OutSyncRepli)、AR(AllRepli)代表什么?
ISR:副本同步队列,速率和leader相差小于10秒的follower集合
OSR:非副本同步队列,速率和leader相差大于10秒的follower集合
AR:所有分区的follower,AR=ISR+OSR
2.Kafka 中的 HW、LEO 等分别代表什么?
HW:高水位,根据同一分区中,最低的LEO所决定,是消费者能见可消费的数据
LEO: 每个副本最高的offset
在leader、follower节点故障后,partition会对HW和每个副本的LEO进行调整
3.Kafka的用途有哪些?使用场景如何?
1.用户追踪:根据用户在web或app上的操作,将这些操作记录到topic中,消费者订阅这些消息做实时的分析和数据挖掘
2.日志收集:通过kafka对各个服务的日志进行收集,再开放给comsumer
3.系统消息:缓存消息
运营指标:记录运营监控数据,搜集操作应用数据的集中反馈,如报错和报告
4.Kafka中是怎么体现消息顺序性的?
每个分区内,每条消息都有offset,所以只能在同一分区内有序;不同的分区无法做到消息的顺序性
5.“消费组中的消费者个数如果超过topic的分区,那么就会有消费者消费不到数据”这句话是否正确?
是。超过分区数的消费者是接受不到数据的,只要有消费者接入 就会触发分区分配策略
6.有哪些情形会造成重复消费?或丢失信息?
重复消费:先处理业务后提交offset,可能会造成重复消费
丢失信息:先提交offset再处理业务,可能会造成信息丢失
7.Kafka 分区的目的?
对kafka集群来说,分区做到负载均衡;对于消费者来说,可以提高并发度,提高读取效率
8.Kafka 的高可靠性是怎么实现的?
为了实现高可靠性,kafka使用了订阅模式,并且使用ISR和ack应答机制
能进入ISR中的follower和leadeer之间同步速率相差小于10秒
当ack=0时, producr不等待broker的ack,不管数据有没有写成,都不再重发这个数据
当ack=1时,broker会等leader写完数据后想producer发送ack,但不会等follower同步数据;在follower数据未同步前,leader挂掉,producer会再次发送新的消息到新的leader中,old的leader未同步的消息就会丢失
当ack=all(-1)时,broker会等到leader和isr中的所有follower都同步完成后再想producer发送ack;当follower数据同步完成返回producer ack前,leader挂掉,producer数据重发就会造成数据重复。
9.topic的分区数可不可以增加?如果可以怎么增加?如果不可以,那又是为什么?
可以增加
bin/kafka-topics.sh --zookeeper localhost:2181/kafka --alter --topic topic-config --partitions 3
10.topic的分区数可不可以减少?如果可以怎么减少?如果不可以,那又是为什么?
不可以,现有的分区数据难以处理
11.简述Kafka的日志目录结构?
每一个分区对应一个文件夹,命名为topic-0或topic-1,,每个文件夹内有.index文件和.log文件;
.log文件存储数据
.index文件存储.log文件的数据id,起始偏移量和数据大小;通过index
12.如何解决消费者速率低的问题?
增加topic分区的数量、增加消费者个数
13.Kafka的那些设计让它有如此高的性能??
1.kafka是分布式的消息队列
2.对log文件进行了segment,并对segment文件进行索引
3.对于单节点使用了顺序读写,速度可达600M/S
4.引入了零拷贝,在os系统上就能完成读写操作,无需进入kafka应用(用户态)
14.kafka启动不起来的原因?
在关闭kafka时,先关闭了zk,zk中保留了kafka的id信息,会导致kafka下一次启动时报节点已经存在
把zk中的zkdata/version-2的文件删除就可以了
15.聊一聊Kafka Controller的作用?
负责kafka集群上下线工作,所有topic的副本分区分配和leader选举
16.Kafka中有那些地方需要选举?这些地方的选举策略又有哪些?
1.kafka的controller选举,先到先得
2.partitioin的leader选举,从isr中随机选取
17.失效副本是指什么?有那些应对措施?
失效副本:同步速率比leader相差大于10秒的副本
将失效的副本剔除ISR队列,进入OSR队列
失效副本等于leader同步速率小于10秒后从新进入ISR队列
18.Kafka消息是采用Pull模式,还是Push模式?
在producer阶段,采用的是push模式
在comsumer阶段,采用的是pull模式
comsumer在pull模式下:
优点:
comsumer可以根据自己消费能力调整消费速率,避免了broker push到comsumer时消费不及时而导致的崩溃问题;
缺点:consumer要时不时的去询问broker是否有新数据,容易发生死循环,内存溢出
解决办法:拉去不到数据时,增加下次拉去的时间,有api
19.Kafka创建Topic时如何将分区放置到不同的Broker中?
1.副本数不能超过broker数量
2.第一个分区是controller从broker中随机选取一个,然后其他分区相对0号分区依次向后移,第一个分区是用nextReplicatShift决定,而这个数也是随机产生
20.Kafka中的事务是怎么实现的?☆☆☆☆☆
kafka有两种事务:producer事务和consumer事务
producer事务是为了解决kafka跨分区跨会话的问题
kafka早起版本不能跨分区是因为producer的pid是kafka server根据producer生成的
为了解决这个问题,在java代码中给producer指定id,也就是transaction id,简称TID
我们将TID和PID进行绑定,在producer带着TID和PID第一次想broker注册时,broker会记录TID,并生成一个新的组件_transaction_state用来保存TID的事务状态信息
当producer重启后,就会带着TID和新的PID想broker发起请求,当发现TID一致时,producer就会获取之前的PID,将新的PID覆盖掉,并且去上一次事务的状态信息,从而继续上次的工作;
consumer事务相对于producer的事务相对弱一点,需要先确保consumer的消费和提交位置为一致且具有事务功能,才能保证数据的完成,不然就会造成数据的丢失或重复
21.Kafka中的分区器、序列化器、拦截器是否了解?它们之间的处理顺序是什么?
拦截器>序列化器>分区器
拦截器拦截处理无效信息
序列化加密数据
分区分配原则
22.Kafka生产者客户端的整体结构是什么样子的?使用了几个线程来处理?分别是什么?
使用了2个线程:main线程和sender线程
main线程会依次经过拦截器、序列化器、分区器、将数据发送到RecordAccumlator(x线程共享变量),再有sender线程从RecordAccumlate中拉取数据并发送到kafka broker,
batch.size:只有数据累计到batch.size后,sender才会发送数据。
linger.ms:如果数据迟迟未达到batch.size,sender线程等待linger.ms之后发送数据
23.消费者提交消费位移时提交的是当前消费到的最新消息的offset还是offset+1?
答案是:offset+1;测试证明:
生产者发送数据offset是从0开始的:如下
消费者消费的数据offset是从offset+1开始的:如下
24.当你使用 kafka-topics.sh 创建(删除)了一个 topic 之后,Kafka 背后会执行什么逻辑?
1)会在 zookeeper 中的/brokers/topics 节点下创建一个新的 topic 节点,如:/brokers/topics/first
2)触发 Controller 的监听程序
3)kafka Controller 负责 topic 的创建工作,并更新 metadata cache
25.Kafka 有内部的 topic 吗?如果有是什么?有什么所用?
有 __consumer_offsets,保存消费者offset
26.kafka如何保证数据不丢失?3个维度去回答
1.生产者发送数据不丢失?
2.broker存储数据不丢失?
3.消费者消费数据不丢失?
25.kafka如何保证高可用?可以从以下4个维度去回答
1.zk中kafka的controller选举
2.副本选举机制
3.选举后HW,LEO水位恢复
4.副本同步队列的概念
更多推荐
所有评论(0)