Kafka笔记:Kafka CLI命令
这些部分包含运行 Kafka 的有用命令。请注意,我在使用命令时遇到了一些错误。我已经在底部包含了错误以及我如何修复它们。本节内容如下: 卡夫卡主题 kafka-console-producer 卡夫卡控制台消费者 从特定分区和偏移量读取消息 运行多个消费者 消费群 卡夫卡消费者组 复位偏移 故障排除 其他 CLI 选项 备忘单 资源 如需快速阅读备忘单,您也可以跳到本页底部,到Cheat She
这些部分包含运行 Kafka 的有用命令。请注意,我在使用命令时遇到了一些错误。我已经在底部包含了错误以及我如何修复它们。本节内容如下:
-
卡夫卡主题
-
kafka-console-producer
-
卡夫卡控制台消费者
-
从特定分区和偏移量读取消息
-
运行多个消费者
-
消费群
-
卡夫卡消费者组
-
复位偏移
-
故障排除
-
其他 CLI 选项
-
备忘单
-
资源
如需快速阅读备忘单,您也可以跳到本页底部,到Cheat Sheet部分。
卡夫卡主题
当我们讨论在 EC2 上安装 Kafka 时,我们最初在我们的机器上安装了v2.7.0
,但是如果您要使用其他人构建的 Kafka 基础架构,以下命令会很有帮助。截至撰写本文时,最新版本和当前稳定版本为v2.8.0
。
检查版本
[root@hcptstkafka1 kafka]# kafka-topics.sh --version
2.7.0 (Commit:448719dc99a19793)
进入全屏模式 退出全屏模式
请注意,在创建主题之前,您必须同时运行 Zookeeper 和 Kafka。您可以查看之前关于单节点(单代理)Kafka 集群Kafka on EC2的文章。我强烈建议您阅读上一篇,因为本文建立在上一篇文章的知识基础之上。
同样,您可以打开三个终端。 首先在第一个终端启动 Zookeeper,然后使用以下命令在第二个终端启动 Kafka。然后您可以在第三个终端上进行的所有其他操作。
# On terminal 1
zookeeper-server-start.sh /$KAFKA-HOME/zookeeper.properties
# On terminal 2
kafka-server-start.sh /$KAFKA-HOME/server.properties
进入全屏模式 退出全屏模式
要创建主题,我们需要指定:
-
kafka-topics.sh
-
--create --topic
,后跟主题名 -
--partition
,后面是你想要多少个分区, -
--replication-factor
,后跟一个应该等于或小于经纪人数量的数字。 -
--zookeeper
,后跟本地机器和端口 2181 -
从 v2.2 开始,我们使用
--bootstrap-server
代替,它在端口9092
上运行。
在下面的示例中,我们使用不推荐的方式和新的方式创建了两个主题。请注意,localhost
和127.0.0.1
可以互换使用。
[root@hcptstkafka1 ~]# kafka-topics.sh \
--topic test-topic-1 --create \
--partitions 3 --replication-factor 1 \
--zookeeper 127.0.0.1:2181
Created topic test-topic-1.
[root@hcptstkafka1 ~]# kafka-topics.sh \
--create --topic test-topic-2 \
--partitions 4 --replication-factor 1 \
--bootstrap-server 127.0.0.1:9092
Created topic test-topic-2.
[root@hcptstkafka1 ~]# kafka-topics.sh \
--create --topic test-topic-3 \
--partitions 5 --replication-factor 1 \
--zookeeper localhost:2181
Created topic test-topic-3.
进入全屏模式 退出全屏模式
要列出主题,
[root@hcptstkafka1 ~]# kafka-topics.sh --list \
--bootstrap-server localhost:9092
test-topic-1
test-topic-2
test-topic-3
[root@hcptstkafka1 ~]# kafka-topics.sh --list \
--zookeeper localhost:2181
test-topic-1
test-topic-2
test-topic-3
进入全屏模式 退出全屏模式
要删除主题,请运行以下命令。请注意,当您使用新方法运行--delete
时,不会返回任何输出。
另一方面,对--delete
使用不推荐使用的方法将返回一条消息,指出该主题已标记为删除。
当您再次尝试列出主题时,任何一种方法都将起作用,并且已删除的主题将不再出现。让我们尝试删除 test-topic-2 和 test-topic-3
[root@hcptstkafka1 ~]# kafka-topics.sh \
--delete --topic test-topic-2 \
--zookeeper localhost:2181
Topic test-topic-2 is marked for deletion.
Note: This will have no impact if delete.topic.enable is not set to true.
[root@hcptstkafka1 ~]#
[root@hcptstkafka1 ~]# kafka-topics.sh \
--delete --topic test-topic-3 \
--bootstrap-server localhost:9092
[root@hcptstkafka1 ~]#
[root@hcptstkafka1 ~]# kafka-topics.sh --list \
--bootstrap-server localhost:9092
test-topic-1
进入全屏模式 退出全屏模式
要获取有关某个主题的更多详细信息,
[root@hcptstkafka1 ~]# kafka-topics.sh \
--describe --topic test-topic-1 \
--bootstrap-server localhost:9092
Topic: test-topic-1 PartitionCount: 3 ReplicationFactor: 1 Configs: segment.bytes=1073741824
Topic: test-topic-1 Partition: 0 Leader: 0 Replicas: 0 Isr: 0
Topic: test-topic-1 Partition: 1 Leader: 0 Replicas: 0 Isr: 0
Topic: test-topic-1 Partition: 2 Leader: 0 Replicas: 0 Isr: 0
进入全屏模式 退出全屏模式
让我们再创建两个示例主题。
[root@hcptstkafka1 kafka]# kafka-topics.sh \
--create --topic test-eden-1 \
--partitions 3 --replication-factor 1 \
--bootstrap-server localhost:9092
Created topic test-eden-1.
[root@hcptstkafka1 kafka]# kafka-topics.sh \
--create --topic test-tina-1 \
--partitions 3 --replication-factor 1 \
--bootstrap-server localhost:9092
Created topic test-tina-1.
进入全屏模式 退出全屏模式
回到我们的 $KAFKA-HOME 文件夹,这是之前安装 Kafka 包的位置,我们可以检查在 data/kafka 文件夹中创建的新日志。
在这里,我们可以看到为第一个主题 test-topic-1 和两个新主题创建了日志。 test-eden-1 和 test-tina-1。请注意,每个分区也会有一个日志。
由于我们为每个主题设置了三个分区,总共有三个主题,我们应该会看到 9 个新日志。
[root@hcptstkafka1 ~]# cd /usr/local/bin/kafka/data/kafka/
[root@hcptstkafka1 kafka]# ll
total 20
-rw-r--r-- 1 root root 4 Aug 15 07:25 cleaner-offset-checkpoint
-rw-r--r-- 1 root root 4 Aug 15 07:38 log-start-offset-checkpoint
-rw-r--r-- 1 root root 88 Aug 15 06:13 meta.properties
-rw-r--r-- 1 root root 151 Aug 15 07:38 recovery-point-offset-checkpoint
-rw-r--r-- 1 root root 151 Aug 15 07:39 replication-offset-checkpoint
drwxr-xr-x 2 root root 141 Aug 15 07:34 test-eden-1-0
drwxr-xr-x 2 root root 141 Aug 15 07:34 test-eden-1-1
drwxr-xr-x 2 root root 141 Aug 15 07:34 test-eden-1-2
drwxr-xr-x 2 root root 141 Aug 15 07:34 test-tina-1-0
drwxr-xr-x 2 root root 141 Aug 15 07:34 test-tina-1-1
drwxr-xr-x 2 root root 141 Aug 15 07:34 test-tina-1-2
drwxr-xr-x 2 root root 141 Aug 15 06:15 test-topic-1-0
drwxr-xr-x 2 root root 141 Aug 15 06:15 test-topic-1-1
drwxr-xr-x 2 root root 141 Aug 15 06:15 test-topic-1-2
[root@hcptstkafka1 kafka]#
[root@hcptstkafka1 kafka]# kafka-topics.sh --list --bootstrap-server localhost:9092
test-eden-1
test-tina-1
test-topic-1
进入全屏模式 退出全屏模式
kafka-console-producer
要运行此命令,需要以下内容
-
使用
broker-list
指定 brokers - 已弃用 -
作为 brokers-list 的替换,指定
bootstrap-server
-
使用
topic
指定主题
当您使用所需参数运行命令时,它应该返回一个 caret (>)。您现在可以向该主题发送消息。要退出,请按 Ctrl-c。
[root@hcptstkafka1 ~] kafka-console-producer.sh \
--broker-list localhost:9092 \
--topic test-topic-1
>Hello world!
>This is the first message that we sent to a topic! D D
>^Z
[root@hcptstkafka1 ~] kafka-console-producer.sh \
--bootstrap-server localhost:9092
>Awesome job on this!
>Alright!
>^Z
进入全屏模式 退出全屏模式
如果我向不存在的主题发送消息怎么办
当您生成一条消息并指定一个仍未创建的主题时,它将返回一条错误消息“LEADER_NOT_AVAILABLE”。在退出之前,您仍然可以输入消息。
这里发生的是生产者尝试向不存在的主题发送消息,但收到警告。但是,生产者足够聪明,可以恢复并等待创建 Kafka 主题。创建主题后,您可以继续向现在创建的主题发送消息。
[root@hcptstkafka1 ~] kafka-console-producer.sh \
--broker-list localhost:9092 --topic test-topic-10
> Third set of messages!
[2021-06-29 19:57:39,480] WARN [Producer clientId=console-producer] Error while fetching metadata with correlation id 3 : {test-topic-10=LEADER_NOT_AVAILABLE} (org.apache.kafka.clients.NetworkClient)
>Wait there's error but I'm still able to type in a message
>^Z
# the new topic should now appear when you try to list the topics
[root@hcptstkafka1 ~] kafka-topics.sh \
--bootstrap-server localhost:9092 \
--list
test-topic-1
test-topic-10
进入全屏模式 退出全屏模式
主题是使用默认值创建的。当您尝试运行describe
时,您可以看到。
[root@hcptstkafka1 ~] kafka-topics.sh \
--describe --topic test-topic-10 \
--bootstrap-server localhost:9092
Topic: test-topic-10 PartitionCount: 1 ReplicationFactor: 1 Configs: segment.bytes=1073741824
Topic: test-topic-10 Partition: 0 Leader: 0 Replicas: 0 Isr: 0
进入全屏模式 退出全屏模式
该主题将默认为 1 个分区,复制因子为 1。这不是最佳实践,因此建议确保创建目标主题。
您还可以通过编辑config/server.properties
来更改此默认值
[root@hcptstkafka1 ~] pwd
/usr/local/bin/kafka
[root@hcptstkafka1 kafka]
[root@hcptstkafka1 kafka] vi config/server.properties
## Some of the output omitted ##
############################# Log Basics #############################
# A comma separated list of directories under which to store log files
log.dirs=/usr/local/bin/kafka/data/kafka
# The default number of log partitions per topic. More partitions allow greater
# parallelism for consumption, but this will also result in more files across
# the brokers.
num.partitions=4
进入全屏模式 退出全屏模式
每当我们编辑属性文件时,我们都需要停止 Kafka 服务,然后再次运行它以使更改生效。在运行kafka-server-start.sh
的终端上,按 Ctrl-C 然后再次运行该命令。
现在,如果我们尝试再次向不存在的消息发送消息,则会使用新的默认值创建该主题。
# Send the message
[root@hcptstkafka1 ~] kafka-console-producer.sh \
--broker-list localhost:9092 \
--topic test-topic-100
>Another message sent!
[2021-06-29 20:12:57,142] WARN [Producer clientId=console-producer] Error while fetching metadata with correlation id 3 : {test-topic-100=LEADER_NOT_AVAILABLE} (org.apache.kafka.clients.NetworkClient)
>Oops! But hey, still sending message!
>^Z
# List the topics and get the details of the newly-created topic.
[root@hcptstkafka1 ~] kafka-topics.sh \
--bootstrap-server localhost:9092 \
--list
test-topic-1
test-topic-10
test-topic-100
[root@hcptstkafka1 ~] kafka-topics.sh \
--describe --topic test-topic-100 \
--bootstrap-server localhost:9092
Topic: test-topic-100 PartitionCount: 4 ReplicationFactor: 1 Configs: segment.bytes=1073741824
Topic: test-topic-100 Partition: 0 Leader: 0 Replicas: 0 Isr: 0
Topic: test-topic-100 Partition: 1 Leader: 0 Replicas: 0 Isr: 0
Topic: test-topic-100 Partition: 2 Leader: 0 Replicas: 0 Isr: 0
Topic: test-topic-100 Partition: 3 Leader: 0 Replicas: 0 Isr: 0
进入全屏模式 退出全屏模式
回想一下,生产者可以选择接受来自它已收到消息的主题的确认。这是通过 acks 完成的。
我们将再次发送一条消息,但这次我们指定我们需要来自主题的确认。
[root@hcptstkafka1 ~] kafka-console-producer.sh --broker-list localhost:9092 --topic test-topic-1 --producer-property acks=all
> This is my second batch of message sent to the topic.
>I should get confirmation from the topic
进入全屏模式 退出全屏模式
kafka-console-consumer
在这里,我们将验证发送到主题的消息。与生产者类似,您还需要指定一些参数:
-
bootstrap-server
-
topic
但是,它只会读取 AFTERkafka-console-consumer
运行后发送的实时消息。
如果我们尝试运行kafka-console-consumer
然后在另一个窗口上运行kafka-console-producer
并再次向主题发送消息,控制台使用者现在将能够实时看到它。
# In producer's window
[root@hcptstkafka1 kafka]# kafka-console-producer.sh \
--bootstrap-server localhost:9092 \
--topic test-topic-10
>Sending a message to the second topic - test-topic-10!
>yeahboii!
>over and out!
# In consumer's window.
# Note that each line of message arrives in real-time.
[root@hcptstkafka1 kafka]# kafka-console-consumer.sh \
--bootstrap-server localhost:90192 \
--topic test-topic-10 sr: 0
Sending a message to the second topic - test-topic-10! sr: 0
yeahboii!
over and out!
进入全屏模式 退出全屏模式
要读取发送到主题的总消息,我们可以附加--from-beginning
。要查看我们之前发送到 test-topic-1 的消息,
[root@hcptstkafka1 kafka]# kafka-console-consumer.sh \
--bootstrap-server localhost:9092 \
--topic test-topic-1 \
--from-beginning
Alright!
I should get confirmation from the topic
This is the first message that we sent to a topic!
This is my second batch of message sent to the topic.
进入全屏模式 退出全屏模式
注意当您收到消息时,它不是按顺序检索的。如果您有多个分区,则无法保证顺序。这是因为当生产者向主题发送消息时,消息会分布在分区中。
如果您有一个单分区主题,则可以保证检索到的消息的顺序。
从特定分区和偏移量读取消息
我们还可以通过在分区号后面添加--partition
参数来指定要读取的分区。
让我们首先生成一些消息到 test-eden-1 主题
[root@hcptstkafka1 ~]# kafka-console-producer.sh \
--topic test-eden-1 \
--bootstrap-server localhost:9092
>This topic will contain a list of names
>Barney
>Marshall
>Ted
>Robin
>Lily
>Sheldon
>Rajesh
>Penny
>Leonard
>Howard
>Bernadette
>Amy
进入全屏模式 退出全屏模式
现在让我们打开第二个终端并从头开始阅读消息。同样,消息的显示顺序与发送顺序不同。
[root@hcptstkafka1 ~]# kafka-console-consumer.sh \
--topic test-eden-1 \
--from-beginning \
--bootstrap-server localhost:9092
This topic will contain a list of names
Marshall
Lily
Rajesh
Leonard
Amy
Barney
Ted
Sheldon
Penny
Bernadette
Robin
Howard
进入全屏模式 退出全屏模式
这个主题是用 3 个分区创建的。如果我们只想查看去往分区 1 的消息,我们可以执行以下操作:
[root@hcptstkafka1 ~]# kafka-console-consumer.sh --topic test-eden-1 \
--partition 1 \
--from-beginning \
--bootstrap-server localhost:9092
This topic will contain a list of names
Marshall
Lily
Rajesh
Leonard
Amy
进入全屏模式 退出全屏模式
类似地,我们可以读取同一分区中的消息,但从特定偏移量开始。请注意,如果要指定偏移量,请删除--from-beginning
参数。
[root@hcptstkafka1 ~]# kafka-console-consumer.sh --topic test-eden-1 \
--partition 1 \
--offset 2\
--bootstrap-server localhost:9092
Lily
Rajesh
Leonard
Amy
进入全屏模式 退出全屏模式
问题:我可以从偏移量开始读取消息并在不同的分区上执行此操作吗?
如果我们尝试查询所有分区并显示从特定偏移量开始的所有消息,我们将得到一个错误。
[root@hcptstkafka1 ~]# kafka-console-consumer.sh --topic test-eden-1 \
--offset 2\
--bootstrap-server localhost:9092
The partition is required when offset is specified.
进入全屏模式 退出全屏模式
运行多个消费者
现在让我们尝试运行一个生产者和两个消费者。让我们为此示例创建 test-topic-10。
# Create new topic
[root@hcptstkafka1 kafka]# kafka-topics.sh \
--create \
--topic test-topic-10 \
--partitions 3 \
--replication-factor 1 \
--bootstrap-server localhost:9092
Created topic test-tina-1.
进入全屏模式 退出全屏模式
在第一个和第二个终端上,运行消费者 1 和消费者 2:
# Runs consumer 1 on terminal 1
[root@hcptstkafka1 ~]# kafka-console-consumer.sh \
--topic test-topic-10 \
--bootstrap-server localhost:9092
进入全屏模式 退出全屏模式
# Runs consumer 2 on terminal 2
[root@hcptstkafka1 ~]# kafka-console-consumer.sh \
--topic test-topic-10 \
--bootstrap-server localhost:9092
进入全屏模式 退出全屏模式
在第三个终端上,运行生产者。然后开始发送消息。
[root@hcptstkafka1 kafka]# kafka-console-producer.sh \
--topic test-topic-10 \
--bootstrap-server localhost:9092
>10
>20
>30
>40
>50
>60
进入全屏模式 退出全屏模式
回到前两个终端。你会看到进来的消息。
[](https://res.cloudinary.com/practicaldev/image/fetch/s--rNIuSwMQ--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_880/https://dev- to-uploads.s3.amazonaws.com/uploads/articles/gdtxjuu5tcen404pdxdm.jpg)
消费群
如果我们有多个生产者向单个消费者发送消息,则传入消息的速度可能会压倒消费者,消费者可能无法跟上并落后。为了解决这个问题,我们可以在两个订阅相同主题的消费者之间拆分数据。
我们之前已经这样做了,但这次我们将使用消费者组。但是,使用多个消费者与使用消费者组之间究竟有什么区别?
使用多个消费者
正如我们之前看到的,两个消费者订阅了同一个主题,我们将收到生产者发送的相同的消息。它并没有真正进行任何并行消费,因为每个消费者都被视为完全不同的应用程序。
使用消费者组
正如我们将看到的,到达消费者组的消息在消费者之间是负载平衡的。注册到同一消费者组的消费者将具有相同的组 ID。
理想情况下,一个消费者组中的消费者数量应该等于一个主题拥有的分区数量。
[](https://res.cloudinary.com/practicaldev/image/fetch/s--LU74E-0r--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_880/https:// dev-to-uploads.s3.amazonaws.com/uploads/articles/4n6taftvds416kf73oxp.JPG)
照片取自卡夫卡:权威指南
如果消费者比现有分区多,多余的消费者就会闲置,只会浪费资源。
[](https://res.cloudinary.com/practicaldev/image/fetch/s--U-vIk9Hn--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_880/https:// dev-to-uploads.s3.amazonaws.com/uploads/articles/aexri9ju8aj06l9s4fwo.JPG)
照片取自卡夫卡:权威指南
Kafka如何在消费者之间分配分区?
一开始,消费者组订阅主题,然后 Kafka 将主题“映射”到消费者组的 group-id。 Kafka 然后检查是否有任何现有消费者具有映射的 group-id。
如果有两个消费者具有相同的 group-id,这意味着它们都是同一个消费者组的一部分,那么主题的分区在两者之间是平衡的。请注意,这些消费者不能拥有相同的分配分区。
[](https://res.cloudinary.com/practicaldev/image/fetch/s--d4siLKF---/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_880/https://dev -to -uploads.s3.amazonaws.com/uploads/articles/jqrposuq3mnn574lkhy1.JPG)
照片取自Kafka 消费者如何并行化超出分区数量
也可以有多个消费者组订阅同一个主题。如果第二个消费者组正在执行与第一个消费者组不同的任务,就会出现这种情况。
[](https://res.cloudinary.com/practicaldev/image/fetch/s--dvL6zzMe--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_880/https://dev- to-uploads.s3.amazonaws.com/uploads/articles/qj3231dscoyae5sgv4jx.JPG)
照片取自Kafka 消费者如何并行化超出分区数量
要将消费者注册到消费者组,我们可以在运行消费者时使用参数--group
。
如前所述,如果我们在一个消费者组中有多个消费者从一个主题中检索消息,则检索到的消息将在两个消费者之间进行负载平衡。这意味着一些消息将发送给消费者 1,而一些消息将发送给消费者 2。
为了证明这一点,打开三个终端:一个用于生产者,两个用于消费者。首先,在两个消费者上运行kafka-console-consumer
和--group
。
# Run command in two terminals for the consumers
[root@hcptstkafka1 kafka]# kafka-console-consumer.sh \
--bootstrap-server localhost:9092 \
--topic test-topic-1 \
--group my-awesome-app-group
# Run command in terminal for producer
[root@hcptstkafka1 kafka]# kafka-console-producer.sh \
--bootstrap-server localhost:9092 \
--topic test-topic-1
进入全屏模式 退出全屏模式
然后在生产者终端上,运行kafka-console-producer
并开始发送消息。您将看到消息将在两个消费者之间分发。
[](https://res.cloudinary.com/practicaldev/image/fetch/s--R1JBmsLL--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_880/https://dev- to-uploads.s3.amazonaws.com/uploads/articles/n7q2jjd92cygg18l25vc.JPG)
现在,回想一下,当消费者从主题中检索消息时,它会提交偏移量。当您将--from-beginning
与kafka-console-consumer
一起使用并指定--group
时,您将能够看到发送到该主题的所有消息。
如果您尝试使用--from-beginning
再次运行相同的命令,它将不会返回任何内容。这是因为第一次运行此命令时,从一开始的所有消息都已提交。
[root@hcptstkafka1 kafka]# kafka-console-consumer.sh \
--bootstrap-server localhost:9092 \
--topic test-topic-1 \
--group my-awesome-app-group \
--from-beginning
let try sending messages! which consumer will receive this color?
consumer will receive this color?
red
orange
yellow
green
blue
# When you rerun the command again, it returns nothing.
[root@hcptstkafka1 kafka]# kafka-console-consumer.sh \
--bootstrap-server localhost:9092 \
--topic test-topic-1 \
--group my-awesome-app-group \
--from-beginning
进入全屏模式 退出全屏模式
kafka-consumer-groups
这些命令可用于列出、创建和基本上对消费者组做很多事情。要列出消费者组,
[root@hcptstkafka1 kafka]# kafka-consumer-groups.sh \
--bootstrap-server localhost:9092 --list
my-awesome-app-group
进入全屏模式 退出全屏模式
要获取消费者组的详细信息,请附加--describe
并指定组。
[root@hcptstkafka1 kafka]# kafka-consumer-groups.sh \
--bootstrap-server localhost:9092 \
--group my-awesome-app-group \
--describe
Consumer group 'my-awesome-app-group' has no active members.
GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
my-awesome-app-group test-topic-1 2 9 11 2 - - -
my-awesome-app-group test-topic-1 1 10 10 0 - - -
my-awesome-app-group test-topic-1 0 19 20 1 - - -
进入全屏模式 退出全屏模式
请注意 lag 列。这指定了特定分区中有多少消息仍未被消费者组检索和处理。要确保消费者组赶上所有消息,请再次运行kafka-console-consumer
。
[root@hcptstkafka1 kafka] kafka-console-consumer.sh \
--bootstrap-server localhost:9092 \
--topic test-topic-1
violet
white
black
进入全屏模式 退出全屏模式
现在,当您尝试在消费者组上运行describe
时,滞后现在将为零。
[root@hcptstkafka1 kafka]# kafka-consumer-groups.sh \
--bootstrap-server localhost:9092 \
--group my-awesome-app-group \
--describe
Consumer group 'my-awesome-app-group' has no active members.
GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
my-awesome-app-group test-topic-1 2 11 11 0 - - -
my-awesome-app-group test-topic-1 1 10 10 0 - - -
my-awesome-app-group test-topic-1 0 20 20 0 - - -
进入全屏模式 退出全屏模式
重置偏移
如前所述,运行kafka-console-consumer.sh
并指定组将从偏移量中读取 - 这是它停止的分区中的最后一个位置。
实际上有一个选项可以重置偏移量并指定我们希望在分区中的哪个位置开始读取数据,这在某种程度上是重放数据。
当我们只运行没有任何参数的kafka-consumer-group
命令时,它不会推送,而是会返回您可以附加到命令的可用参数。
[root@hcptstkafka1 kafka]# kafka-consumer-groups.sh
# Some parts of the output omitted
--reset-offsets Reset offsets of consumer group.
Supports one consumer group at the
time, and instances should be
inactive
Has 2 execution options: --dry-run
(the default) to plan which offsets
to reset, and --execute to update
the offsets. Additionally, the --
export option is used to export the
results to a CSV format.
You must choose one of the following
reset specifications: --to-datetime,
--by-period, --to-earliest, --to-
latest, --shift-by, --from-file, --
to-current.
To define the scope use --all-topics
or --topic. One scope must be
specified unless you use '--from-
file'.
进入全屏模式 退出全屏模式
让我们尝试使用--reset-offsets
将偏移量重置为开头,并指定我们要从开头--to-earliest
开始。我们还需要定义是否要为一个主题或所有主题重置偏移量。
最后,我们只想要预览/试运行而不是实际重置偏移量,因此我们还将附加--execute
[root@hcptstkafka1 kafka]# kafka-consumer-groups.sh \
--bootstrap-server localhost:9092 \
--group my-awesome-app-group \
--reset-offsets --to-earliest \
--topic test-topic-1 \
--execute
GROUP TOPIC PARTITION NEW-OFFSET
my-awesome-app-group test-topic-1 0 0
my-awesome-app-group test-topic-1 1 0
my-awesome-app-group test-topic-1 2 0
进入全屏模式 退出全屏模式
现在,如果我们尝试从test-topic-1
检索消息,我们将能够从头看到所有消息。
[root@hcptstkafka1 kafka]# kafka-console-consumer.sh \
--bootstrap-server localhost:9092 \
--topic test-topic-1 \
--group my-awesome-app-group
Alright!
I should get confirmation from the topic
the first
ahh right,
orange
orange
green
This is the first message thatwe sent to a topic!
This is my second batch of message sent to the topic.
Let's see where this will go
now lets try colors
yellow
red
violet
Hello world!
Awesome job on this!
which consumer
or second
its distributes
red\
green
blue
let try sending messages! which consumer will receive this color?
yellow
blue
white
进入全屏模式 退出全屏模式
我们还可以使用--shift-by
向前或向后移动偏移量,并附加一个正数以向前移动步数,或者附加一个负数以向后移动。
# Moving the offset three steps forward
[root@hcptstkafka1 kafka]# kafka-consumer-groups.sh \
--bootstrap-server localhost:9092 \
--group my-awesome-app-group \
--reset-offsets \
--shift-by 3 \
--topic test-topic-1 \
--execute
GROUP TOPIC PARTITION NEW-OFFSET
my-awesome-app-group test-topic-1 0 3
my-awesome-app-group test-topic-1 1 3
my-awesome-app-group test-topic-1 2 3
# Moving the offset another step.
[root@hcptstkafka1 kafka]# kafka-consumer-groups.sh \
--bootstrap-server localhost:9092 \
--group my-awesome-app-group \
--reset-offsets \
--shift-by 1 \
--topic test-topic-1 \
--execute
GROUP TOPIC PARTITION NEW-OFFSET
my-awesome-app-group test-topic-1 0 4
my-awesome-app-group test-topic-1 1 4
my-awesome-app-group test-topic-1 2 4
# Moving the offset three steps backward.
[root@hcptstkafka1 kafka]# kafka-consumer-groups.sh \
--bootstrap-server localhost:9092 \
--group my-awesome-app-group \
--reset-offsets \
--shift-by -3 \
--topic test-topic-1 \
--execute
GROUP TOPIC PARTITION NEW-OFFSET
my-awesome-app-group test-topic-1 0 1
my-awesome-app-group test-topic-1 1 1
my-awesome-app-group test-topic-1 2 1
进入全屏模式 退出全屏模式
故障排除
Kafka Socket 服务器失败 - 绑定异常
这意味着端口冲突问题。
ERROR [KafkaServer id=0] Fatal error during KafkaServer startup. Prepare to shutdown (kafka.server.KafkaServer)
org.apache.kafka.common.KafkaException: Socket server failed to bind to 0.0.0.0:9092: Address already in use
进入全屏模式 退出全屏模式
要解决此问题,您可以在不同的端口(如 9093)上运行 KafkaServer idu003d0,或者找到侦听该端口的处理并将其终止(最简单)。
要查找 PID,您可以使用netstat
或lsof
。
[root@hcptstkafka1 kafka]# netstat -tulpn |grep 9092
tcp6 0 0 :::9092 :::* LISTEN 23999/java
[root@hcptstkafka1 kafka]# lsof -n -i :9092 | grep LISTEN
java 23999 root 134u IPv6 52852 0t0 TCP *:XmlIpcRegSvc (LISTEN)
# send a kill signal
[root@hcptstkafka1 kafka]# kill -9 23999
# process should now disappear
[root@hcptstkafka1 kafka]# lsof -n -i :9092 | grep LISTEN
[root@hcptstkafka1 kafka]# netstat -tulpn |grep 9092
# retry running kafka
[root@hcptstkafka1 kafka]# kafka-server-start.sh config/server.properties
进入全屏模式 退出全屏模式
其他 CLI 选项
我们已经讨论过消息将在生产者发送消息时随机分发到分区。一种通过使用消息密钥确保消息始终到达特定分区的方法。消费者也可以利用它从特定分区中检索消息。
带钥匙的生产者
kafka-console-producer \
--broker-list 127.0.0.1:9092 \
--topic first_topic \
--property parse.key=true \
--property key.separator=,
> key,value
> another key,another value
进入全屏模式 退出全屏模式
带钥匙的消费者
kafka-console-consumer \
--bootstrap-server 127.0.0.1:9092 \
--topic first_topic --from-beginning \
--property print.key=true \
--property key.separator=,
进入全屏模式 退出全屏模式
备忘单
这个简单易懂的命令备忘单来自 Bogdan Stashchuk 的Github 页面,用于他的 Apache Kafka 课程。
Basic KAFKA Commands
START ZOOKEEPER
bin/zookeeper-server-start.sh config/zookeeper.properties
START KAFKA BROKER
bin/kafka-server-start.sh config/server.properties
CREATE TOPIC
bin/kafka-topics.sh \
--bootstrap-server localhost:9092 \
--create \
--replication-factor 1 \
--partitions 3 \
--topic test
LIST TOPICS
bin/kafka-topics.sh \
--bootstrap-server localhost:9092 \
--list
TOPIC DETAILS
bin/kafka-topics.sh \
--bootstrap-server localhost:9092 \
--describe \
--topic test
START CONSOLE PRODUCER
bin/kafka-console-producer.sh \
--broker-list localhost:9092 \
--topic test
START CONSOLE CONSUMER
bin/kafka-console-consumer.sh \
--bootstrap-server localhost:9092 \
--topic test
START CONSOLE CONSUMER AND READ MESSAGES FROM BEGINNING
bin/kafka-console-consumer.sh \
--bootstrap-server localhost:9092 \
--topic test \
--from-beginning
START CONSOLE CONSUMER WITH SPECIFIC CONSUMER GROUP
bin/kafka-console-consumer.sh \
--bootstrap-server localhost:9092 \
--topic test \
--group test \
--from-beginning
LIST CONSUMER GROUPS
bin/kafka-consumer-groups.sh \
--bootstrap-server localhost:9092 \
--list
CONSUMER GROUP DETAILS
bin/kafka-consumer-groups.sh \
--bootstrap-server localhost:9092 \
--group test \
--describe
进入全屏模式 退出全屏模式
资源
我知道这篇文章很长,我可能会继续回来再读一遍,但如果你有兴趣了解更多,我所有的笔记都是基于以下课程。
-
Apache Kafka 系列 - 为初学者学习 Apache Kafka v2,作者 Stephane Maarek
-
Apache Kafka 入门由 Ryan Plant
-
Apache Kafka A-Z 手把手学习由 Learnkart Technology Private Limited
-
完整的 Apache Kafka 实用指南由 Bogdan Stashchuk
除此之外,您还可以尝试另一个 Kafka CLI 的开源替代方案,KafkaCat。完成课程后,我可能会尝试这个。
-
KafkaCat-github
-
使用 KafkaCat 调试
如果您喜欢我的笔记,或者它们带来了一些价值,我很高兴在Twitter 上与您联系!。 😃
更多推荐
所有评论(0)