这些部分包含运行 Kafka 的有用命令。请注意,我在使用命令时遇到了一些错误。我已经在底部包含了错误以及我如何修复它们。本节内容如下:

  • 卡夫卡主题

  • kafka-console-producer

  • 卡夫卡控制台消费者

  • 从特定分区和偏移量读取消息

  • 运行多个消费者

  • 消费群

  • 卡夫卡消费者组

  • 复位偏移

  • 故障排除

  • 其他 CLI 选项

  • 备忘单

  • 资源

如需快速阅读备忘单,您也可以跳到本页底部,到Cheat Sheet部分。


卡夫卡主题

当我们讨论在 EC2 上安装 Kafka 时,我们最初在我们的机器上安装了v2.7.0,但是如果您要使用其他人构建的 Kafka 基础架构,以下命令会很有帮助。截至撰写本文时,最新版本和当前稳定版本为v2.8.0

检查版本

[root@hcptstkafka1 kafka]# kafka-topics.sh --version
2.7.0 (Commit:448719dc99a19793)

进入全屏模式 退出全屏模式

请注意,在创建主题之前,您必须同时运行 Zookeeper 和 Kafka。您可以查看之前关于单节点(单代理)Kafka 集群Kafka on EC2的文章。我强烈建议您阅读上一篇,因为本文建立在上一篇文章的知识基础之上。

同样,您可以打开三个终端。 首先在第一个终端启动 Zookeeper,然后使用以下命令在第二个终端启动 Kafka。然后您可以在第三个终端上进行的所有其他操作。

# On terminal 1
zookeeper-server-start.sh /$KAFKA-HOME/zookeeper.properties

# On terminal 2
kafka-server-start.sh /$KAFKA-HOME/server.properties

进入全屏模式 退出全屏模式

要创建主题,我们需要指定:

  • kafka-topics.sh

  • --create --topic,后跟主题名

  • --partition,后面是你想要多少个分区,

  • --replication-factor,后跟一个应该等于或小于经纪人数量的数字。

  • --zookeeper,后跟本地机器和端口 2181

  • 从 v2.2 开始,我们使用--bootstrap-server代替,它在端口9092上运行。

在下面的示例中,我们使用不推荐的方式和新的方式创建了两个主题。请注意,localhost127.0.0.1可以互换使用。

[root@hcptstkafka1 ~]# kafka-topics.sh \
--topic test-topic-1 --create \
--partitions 3 --replication-factor 1 \
--zookeeper 127.0.0.1:2181

Created topic test-topic-1.

[root@hcptstkafka1 ~]# kafka-topics.sh \
--create --topic test-topic-2 \
--partitions 4 --replication-factor 1 \
--bootstrap-server 127.0.0.1:9092

Created topic test-topic-2.

[root@hcptstkafka1 ~]# kafka-topics.sh \
--create --topic test-topic-3 \
--partitions 5 --replication-factor 1 \
--zookeeper localhost:2181

Created topic test-topic-3.

进入全屏模式 退出全屏模式

要列出主题,

[root@hcptstkafka1 ~]# kafka-topics.sh --list \
--bootstrap-server localhost:9092

test-topic-1
test-topic-2
test-topic-3

[root@hcptstkafka1 ~]# kafka-topics.sh --list \
--zookeeper localhost:2181

test-topic-1
test-topic-2
test-topic-3

进入全屏模式 退出全屏模式

要删除主题,请运行以下命令。请注意,当您使用新方法运行--delete时,不会返回任何输出。

另一方面,对--delete使用不推荐使用的方法将返回一条消息,指出该主题已标记为删除。

当您再次尝试列出主题时,任何一种方法都将起作用,并且已删除的主题将不再出现。让我们尝试删除 test-topic-2test-topic-3

[root@hcptstkafka1 ~]# kafka-topics.sh \
--delete --topic test-topic-2 \
--zookeeper localhost:2181

Topic test-topic-2 is marked for deletion.
Note: This will have no impact if delete.topic.enable is not set to true.
[root@hcptstkafka1 ~]# 
[root@hcptstkafka1 ~]# kafka-topics.sh \
--delete --topic test-topic-3 \
--bootstrap-server localhost:9092
[root@hcptstkafka1 ~]# 
[root@hcptstkafka1 ~]# kafka-topics.sh --list \
--bootstrap-server localhost:9092

test-topic-1

进入全屏模式 退出全屏模式

要获取有关某个主题的更多详细信息,

[root@hcptstkafka1 ~]# kafka-topics.sh \
--describe --topic test-topic-1 \
--bootstrap-server localhost:9092

Topic: test-topic-1     PartitionCount: 3       ReplicationFactor: 1    Configs: segment.bytes=1073741824
        Topic: test-topic-1     Partition: 0    Leader: 0       Replicas: 0     Isr: 0
        Topic: test-topic-1     Partition: 1    Leader: 0       Replicas: 0     Isr: 0
        Topic: test-topic-1     Partition: 2    Leader: 0       Replicas: 0     Isr: 0

进入全屏模式 退出全屏模式

让我们再创建两个示例主题。

[root@hcptstkafka1 kafka]# kafka-topics.sh \
--create --topic test-eden-1 \
--partitions 3 --replication-factor 1 \
--bootstrap-server localhost:9092

Created topic test-eden-1.

[root@hcptstkafka1 kafka]# kafka-topics.sh \
--create --topic test-tina-1 \
--partitions 3 --replication-factor 1 \
--bootstrap-server localhost:9092

Created topic test-tina-1.

进入全屏模式 退出全屏模式

回到我们的 $KAFKA-HOME 文件夹,这是之前安装 Kafka 包的位置,我们可以检查在 data/kafka 文件夹中创建的新日志。

在这里,我们可以看到为第一个主题 test-topic-1 和两个新主题创建了日志。 test-eden-1test-tina-1。请注意,每个分区也会有一个日志。

由于我们为每个主题设置了三个分区,总共有三个主题,我们应该会看到 9 个新日志。

[root@hcptstkafka1 ~]# cd /usr/local/bin/kafka/data/kafka/
[root@hcptstkafka1 kafka]# ll
total 20
-rw-r--r-- 1 root root   4 Aug 15 07:25 cleaner-offset-checkpoint       
-rw-r--r-- 1 root root   4 Aug 15 07:38 log-start-offset-checkpoint     
-rw-r--r-- 1 root root  88 Aug 15 06:13 meta.properties
-rw-r--r-- 1 root root 151 Aug 15 07:38 recovery-point-offset-checkpoint
-rw-r--r-- 1 root root 151 Aug 15 07:39 replication-offset-checkpoint   
drwxr-xr-x 2 root root 141 Aug 15 07:34 test-eden-1-0
drwxr-xr-x 2 root root 141 Aug 15 07:34 test-eden-1-1
drwxr-xr-x 2 root root 141 Aug 15 07:34 test-eden-1-2
drwxr-xr-x 2 root root 141 Aug 15 07:34 test-tina-1-0
drwxr-xr-x 2 root root 141 Aug 15 07:34 test-tina-1-1
drwxr-xr-x 2 root root 141 Aug 15 07:34 test-tina-1-2
drwxr-xr-x 2 root root 141 Aug 15 06:15 test-topic-1-0
drwxr-xr-x 2 root root 141 Aug 15 06:15 test-topic-1-1
drwxr-xr-x 2 root root 141 Aug 15 06:15 test-topic-1-2
[root@hcptstkafka1 kafka]# 
[root@hcptstkafka1 kafka]# kafka-topics.sh --list --bootstrap-server localhost:9092
test-eden-1
test-tina-1
test-topic-1

进入全屏模式 退出全屏模式


kafka-console-producer

要运行此命令,需要以下内容

  • 使用broker-list指定 brokers - 已弃用

  • 作为 brokers-list 的替换,指定bootstrap-server

  • 使用topic指定主题

当您使用所需参数运行命令时,它应该返回一个 caret (>)。您现在可以向该主题发送消息。要退出,请按 Ctrl-c。


[root@hcptstkafka1 ~] kafka-console-producer.sh \
--broker-list localhost:9092 \
--topic test-topic-1
>Hello world!
>This is the first message that we sent to a topic!                               D                   D           
>^Z

[root@hcptstkafka1 ~] kafka-console-producer.sh \
--bootstrap-server localhost:9092
>Awesome job on this!
>Alright!
>^Z

进入全屏模式 退出全屏模式

如果我向不存在的主题发送消息怎么办

当您生成一条消息并指定一个仍未创建的主题时,它将返回一条错误消息“LEADER_NOT_AVAILABLE”。在退出之前,您仍然可以输入消息。

这里发生的是生产者尝试向不存在的主题发送消息,但收到警告。但是,生产者足够聪明,可以恢复并等待创建 Kafka 主题。创建主题后,您可以继续向现在创建的主题发送消息。

[root@hcptstkafka1 ~] kafka-console-producer.sh \
--broker-list localhost:9092 --topic test-topic-10
> Third set of messages!
[2021-06-29 19:57:39,480] WARN [Producer clientId=console-producer] Error while fetching metadata with correlation id 3 : {test-topic-10=LEADER_NOT_AVAILABLE} (org.apache.kafka.clients.NetworkClient)
>Wait there's error but I'm still able to type in a message
>^Z

# the new topic should now appear when you try to list the topics
[root@hcptstkafka1 ~] kafka-topics.sh \
--bootstrap-server localhost:9092 \
--list

test-topic-1
test-topic-10

进入全屏模式 退出全屏模式

主题是使用默认值创建的。当您尝试运行describe时,您可以看到。

[root@hcptstkafka1 ~] kafka-topics.sh \
--describe --topic test-topic-10 \
--bootstrap-server localhost:9092

Topic: test-topic-10    PartitionCount: 1       ReplicationFactor: 1    Configs: segment.bytes=1073741824
        Topic: test-topic-10    Partition: 0    Leader: 0       Replicas: 0     Isr: 0

进入全屏模式 退出全屏模式

该主题将默认为 1 个分区,复制因子为 1。这不是最佳实践,因此建议确保创建目标主题。

您还可以通过编辑config/server.properties来更改此默认值

[root@hcptstkafka1 ~] pwd
/usr/local/bin/kafka
[root@hcptstkafka1 kafka]
[root@hcptstkafka1 kafka] vi config/server.properties 

## Some of the output omitted ##

############################# Log Basics #############################

# A comma separated list of directories under which to store log files
log.dirs=/usr/local/bin/kafka/data/kafka

# The default number of log partitions per topic. More partitions allow greater
# parallelism for consumption, but this will also result in more files across
# the brokers.
num.partitions=4

进入全屏模式 退出全屏模式

每当我们编辑属性文件时,我们都需要停止 Kafka 服务,然后再次运行它以使更改生效。在运行kafka-server-start.sh的终端上,按 Ctrl-C 然后再次运行该命令。

现在,如果我们尝试再次向不存在的消息发送消息,则会使用新的默认值创建该主题。

# Send the message
[root@hcptstkafka1 ~] kafka-console-producer.sh \
--broker-list localhost:9092 \
--topic test-topic-100
>Another message sent!
[2021-06-29 20:12:57,142] WARN [Producer clientId=console-producer] Error while fetching metadata with correlation id 3 : {test-topic-100=LEADER_NOT_AVAILABLE} (org.apache.kafka.clients.NetworkClient)
>Oops! But hey, still sending message!
>^Z

# List the topics and get the details of the newly-created topic.
[root@hcptstkafka1 ~] kafka-topics.sh \
--bootstrap-server localhost:9092 \
--list
test-topic-1
test-topic-10
test-topic-100
[root@hcptstkafka1 ~] kafka-topics.sh \
--describe --topic test-topic-100 \
--bootstrap-server localhost:9092 

Topic: test-topic-100   PartitionCount: 4       ReplicationFactor: 1    Configs: segment.bytes=1073741824
        Topic: test-topic-100   Partition: 0    Leader: 0       Replicas: 0     Isr: 0
        Topic: test-topic-100   Partition: 1    Leader: 0       Replicas: 0     Isr: 0
        Topic: test-topic-100   Partition: 2    Leader: 0       Replicas: 0     Isr: 0
        Topic: test-topic-100   Partition: 3    Leader: 0       Replicas: 0     Isr: 0

进入全屏模式 退出全屏模式

回想一下,生产者可以选择接受来自它已收到消息的主题的确认。这是通过 acks 完成的。

我们将再次发送一条消息,但这次我们指定我们需要来自主题的确认。

[root@hcptstkafka1 ~] kafka-console-producer.sh --broker-list localhost:9092 --topic test-topic-1 --producer-property acks=all
> This is my second batch of message sent to the topic.
>I should get confirmation from the topic

进入全屏模式 退出全屏模式


kafka-console-consumer

在这里,我们将验证发送到主题的消息。与生产者类似,您还需要指定一些参数:

  • bootstrap-server

  • topic

但是,它只会读取 AFTERkafka-console-consumer运行后发送的实时消息。

如果我们尝试运行kafka-console-consumer然后在另一个窗口上运行kafka-console-producer并再次向主题发送消息,控制台使用者现在将能够实时看到它。

# In producer's window
[root@hcptstkafka1 kafka]# kafka-console-producer.sh \
--bootstrap-server localhost:9092 \
--topic test-topic-10
>Sending a message to the second topic - test-topic-10!
>yeahboii!
>over and out!

# In consumer's window.
# Note that each line of message arrives in real-time.
[root@hcptstkafka1 kafka]# kafka-console-consumer.sh \
--bootstrap-server localhost:90192 \
--topic test-topic-10                                                         sr: 0
Sending a message to the second topic - test-topic-10!                           sr: 0
yeahboii!
over and out!

进入全屏模式 退出全屏模式

要读取发送到主题的总消息,我们可以附加--from-beginning。要查看我们之前发送到 test-topic-1 的消息,

[root@hcptstkafka1 kafka]# kafka-console-consumer.sh \
--bootstrap-server localhost:9092 \
--topic test-topic-1 \
--from-beginning

Alright!
I should get confirmation from the topic
This is the first message that we sent to a topic!
This is my second batch of message sent to the topic.

进入全屏模式 退出全屏模式

注意当您收到消息时,它不是按顺序检索的。如果您有多个分区,则无法保证顺序。这是因为当生产者向主题发送消息时,消息会分布在分区中。

如果您有一个单分区主题,则可以保证检索到的消息的顺序。


从特定分区和偏移量读取消息

我们还可以通过在分区号后面添加--partition参数来指定要读取的分区。

让我们首先生成一些消息到 test-eden-1 主题

[root@hcptstkafka1 ~]# kafka-console-producer.sh \
--topic test-eden-1 \
--bootstrap-server localhost:9092

>This topic will contain a list of names
>Barney
>Marshall
>Ted
>Robin
>Lily
>Sheldon
>Rajesh
>Penny
>Leonard
>Howard
>Bernadette
>Amy

进入全屏模式 退出全屏模式

现在让我们打开第二个终端并从头开始阅读消息。同样,消息的显示顺序与发送顺序不同。

[root@hcptstkafka1 ~]# kafka-console-consumer.sh \
--topic test-eden-1 \
--from-beginning \
--bootstrap-server localhost:9092

This topic will contain a list of names
Marshall
Lily
Rajesh
Leonard
Amy

Barney
Ted
Sheldon
Penny
Bernadette
Robin
Howard

进入全屏模式 退出全屏模式

这个主题是用 3 个分区创建的。如果我们只想查看去往分区 1 的消息,我们可以执行以下操作:

[root@hcptstkafka1 ~]# kafka-console-consumer.sh --topic test-eden-1 \
--partition 1 \
--from-beginning \
--bootstrap-server localhost:9092

This topic will contain a list of names
Marshall
Lily
Rajesh
Leonard
Amy

进入全屏模式 退出全屏模式

类似地,我们可以读取同一分区中的消息,但从特定偏移量开始。请注意,如果要指定偏移量,请删除--from-beginning参数。

[root@hcptstkafka1 ~]# kafka-console-consumer.sh --topic test-eden-1 \
--partition 1 \
--offset 2\
--bootstrap-server localhost:9092

Lily
Rajesh
Leonard
Amy

进入全屏模式 退出全屏模式

问题:我可以从偏移量开始读取消息并在不同的分区上执行此操作吗?

如果我们尝试查询所有分区并显示从特定偏移量开始的所有消息,我们将得到一个错误。

[root@hcptstkafka1 ~]# kafka-console-consumer.sh --topic test-eden-1 \
--offset 2\
--bootstrap-server localhost:9092

The partition is required when offset is specified.

进入全屏模式 退出全屏模式


运行多个消费者

现在让我们尝试运行一个生产者和两个消费者。让我们为此示例创建 test-topic-10

# Create new topic
[root@hcptstkafka1 kafka]# kafka-topics.sh \
--create \
--topic test-topic-10 \
--partitions 3 \
--replication-factor 1 \
--bootstrap-server localhost:9092
Created topic test-tina-1.

进入全屏模式 退出全屏模式

在第一个和第二个终端上,运行消费者 1 和消费者 2:

# Runs consumer 1 on terminal 1
[root@hcptstkafka1 ~]# kafka-console-consumer.sh \
--topic test-topic-10 \
--bootstrap-server localhost:9092

进入全屏模式 退出全屏模式

# Runs consumer 2 on terminal 2
[root@hcptstkafka1 ~]# kafka-console-consumer.sh \
--topic test-topic-10 \
--bootstrap-server localhost:9092

进入全屏模式 退出全屏模式

在第三个终端上,运行生产者。然后开始发送消息。

[root@hcptstkafka1 kafka]# kafka-console-producer.sh \
--topic test-topic-10 \
--bootstrap-server localhost:9092
>10
>20
>30
>40
>50
>60

进入全屏模式 退出全屏模式

回到前两个终端。你会看到进来的消息。

[Alt](https://res.cloudinary.com/practicaldev/image/fetch/s--rNIuSwMQ--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_880/https://dev- to-uploads.s3.amazonaws.com/uploads/articles/gdtxjuu5tcen404pdxdm.jpg)


消费群

如果我们有多个生产者向单个消费者发送消息,则传入消息的速度可能会压倒消费者,消费者可能无法跟上并落后。为了解决这个问题,我们可以在两个订阅相同主题的消费者之间拆分数据。

我们之前已经这样做了,但这次我们将使用消费者组。但是,使用多个消费者与使用消费者组之间究竟有什么区别?

使用多个消费者

正如我们之前看到的,两个消费者订阅了同一个主题,我们将收到生产者发送的相同的消息。它并没有真正进行任何并行消费,因为每个消费者都被视为完全不同的应用程序。

使用消费者组

正如我们将看到的,到达消费者组的消息在消费者之间是负载平衡的。注册到同一消费者组的消费者将具有相同的组 ID

理想情况下,一个消费者组中的消费者数量应该等于一个主题拥有的分区数量。

[Alt](https://res.cloudinary.com/practicaldev/image/fetch/s--LU74E-0r--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_880/https:// dev-to-uploads.s3.amazonaws.com/uploads/articles/4n6taftvds416kf73oxp.JPG)

照片取自卡夫卡:权威指南

如果消费者比现有分区多,多余的消费者就会闲置,只会浪费资源。

[Alt](https://res.cloudinary.com/practicaldev/image/fetch/s--U-vIk9Hn--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_880/https:// dev-to-uploads.s3.amazonaws.com/uploads/articles/aexri9ju8aj06l9s4fwo.JPG)

照片取自卡夫卡:权威指南

Kafka如何在消费者之间分配分区?

一开始,消费者组订阅主题,然后 Kafka 将主题“映射”到消费者组的 group-id。 Kafka 然后检查是否有任何现有消费者具有映射的 group-id。

如果有两个消费者具有相同的 group-id,这意味着它们都是同一个消费者组的一部分,那么主题的分区在两者之间是平衡的。请注意,这些消费者不能拥有相同的分配分区。

[Alt](https://res.cloudinary.com/practicaldev/image/fetch/s--d4siLKF---/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_880/https://dev -to -uploads.s3.amazonaws.com/uploads/articles/jqrposuq3mnn574lkhy1.JPG)

照片取自Kafka 消费者如何并行化超出分区数量

也可以有多个消费者组订阅同一个主题。如果第二个消费者组正在执行与第一个消费者组不同的任务,就会出现这种情况。

[Alt](https://res.cloudinary.com/practicaldev/image/fetch/s--dvL6zzMe--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_880/https://dev- to-uploads.s3.amazonaws.com/uploads/articles/qj3231dscoyae5sgv4jx.JPG)

照片取自Kafka 消费者如何并行化超出分区数量

要将消费者注册到消费者组,我们可以在运行消费者时使用参数--group

如前所述,如果我们在一个消费者组中有多个消费者从一个主题中检索消息,则检索到的消息将在两个消费者之间进行负载平衡。这意味着一些消息将发送给消费者 1,而一些消息将发送给消费者 2。

为了证明这一点,打开三个终端:一个用于生产者,两个用于消费者。首先,在两个消费者上运行kafka-console-consumer--group

# Run  command in two terminals for the consumers
[root@hcptstkafka1 kafka]# kafka-console-consumer.sh \
--bootstrap-server localhost:9092 \
--topic test-topic-1 \
--group my-awesome-app-group

# Run command in terminal for producer
[root@hcptstkafka1 kafka]# kafka-console-producer.sh \
--bootstrap-server localhost:9092 \
--topic test-topic-1 

进入全屏模式 退出全屏模式

然后在生产者终端上,运行kafka-console-producer并开始发送消息。您将看到消息将在两个消费者之间分发。

[Alt](https://res.cloudinary.com/practicaldev/image/fetch/s--R1JBmsLL--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_880/https://dev- to-uploads.s3.amazonaws.com/uploads/articles/n7q2jjd92cygg18l25vc.JPG)

现在,回想一下,当消费者从主题中检索消息时,它会提交偏移量。当您将--from-beginningkafka-console-consumer一起使用并指定--group时,您将能够看到发送到该主题的所有消息。

如果您尝试使用--from-beginning再次运行相同的命令,它将不会返回任何内容。这是因为第一次运行此命令时,从一开始的所有消息都已提交。

[root@hcptstkafka1 kafka]# kafka-console-consumer.sh \
--bootstrap-server localhost:9092 \
--topic test-topic-1  \
--group my-awesome-app-group \
--from-beginning

let try sending messages! which consumer will receive this color?
consumer will receive this color?
red
orange
yellow
green 
blue  

# When you rerun the command again, it returns nothing.
[root@hcptstkafka1 kafka]# kafka-console-consumer.sh \
--bootstrap-server localhost:9092 \
--topic test-topic-1  \
--group my-awesome-app-group \
--from-beginning


进入全屏模式 退出全屏模式


kafka-consumer-groups

这些命令可用于列出、创建和基本上对消费者组做很多事情。要列出消费者组,

[root@hcptstkafka1 kafka]# kafka-consumer-groups.sh \
--bootstrap-server localhost:9092 --list

my-awesome-app-group

进入全屏模式 退出全屏模式

要获取消费者组的详细信息,请附加--describe并指定组。

[root@hcptstkafka1 kafka]# kafka-consumer-groups.sh \
--bootstrap-server localhost:9092 \
--group my-awesome-app-group \
--describe 

Consumer group 'my-awesome-app-group' has no active members.

GROUP                TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID     HOST            CLIENT-ID
my-awesome-app-group test-topic-1    2          9               11              2               -               -               -
my-awesome-app-group test-topic-1    1          10              10              0               -               -               -
my-awesome-app-group test-topic-1    0          19              20              1               -               -               -

进入全屏模式 退出全屏模式

请注意 lag 列。这指定了特定分区中有多少消息仍未被消费者组检索和处理。要确保消费者组赶上所有消息,请再次运行kafka-console-consumer


[root@hcptstkafka1 kafka] kafka-console-consumer.sh \
--bootstrap-server localhost:9092 \
--topic test-topic-1

violet
white 
black

进入全屏模式 退出全屏模式

现在,当您尝试在消费者组上运行describe时,滞后现在将为零。

[root@hcptstkafka1 kafka]# kafka-consumer-groups.sh \
--bootstrap-server localhost:9092 \
--group my-awesome-app-group \
--describe 

Consumer group 'my-awesome-app-group' has no active members.

GROUP                TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID     HOST            CLIENT-ID
my-awesome-app-group test-topic-1    2          11              11              0               -               -               -
my-awesome-app-group test-topic-1    1          10              10              0               -               -               -
my-awesome-app-group test-topic-1    0          20              20              0               -               -               -

进入全屏模式 退出全屏模式


重置偏移

如前所述,运行kafka-console-consumer.sh并指定组将从偏移量中读取 - 这是它停止的分区中的最后一个位置。

实际上有一个选项可以重置偏移量并指定我们希望在分区中的哪个位置开始读取数据,这在某种程度上是重放数据。

当我们只运行没有任何参数的kafka-consumer-group命令时,它不会推送,而是会返回您可以附加到命令的可用参数。

[root@hcptstkafka1 kafka]# kafka-consumer-groups.sh 

# Some parts of the output omitted
--reset-offsets                         Reset offsets of consumer group.
                                          Supports one consumer group at the
                                          time, and instances should be
                                          inactive
                                        Has 2 execution options: --dry-run
                                          (the default) to plan which offsets
                                          to reset, and --execute to update
                                          the offsets. Additionally, the --
                                          export option is used to export the
                                          results to a CSV format.
                                        You must choose one of the following
                                          reset specifications: --to-datetime,
                                          --by-period, --to-earliest, --to-
                                          latest, --shift-by, --from-file, --
                                          to-current.
                                        To define the scope use --all-topics
                                          or --topic. One scope must be
                                          specified unless you use '--from-
                                          file'.

进入全屏模式 退出全屏模式

让我们尝试使用--reset-offsets将偏移量重置为开头,并指定我们要从开头--to-earliest开始。我们还需要定义是否要为一个主题或所有主题重置偏移量。

最后,我们只想要预览/试运行而不是实际重置偏移量,因此我们还将附加--execute

[root@hcptstkafka1 kafka]# kafka-consumer-groups.sh \
--bootstrap-server localhost:9092 \
--group my-awesome-app-group \
--reset-offsets --to-earliest \
--topic test-topic-1 \
--execute

GROUP                          TOPIC                          PARTITION  NEW-OFFSET
my-awesome-app-group           test-topic-1                   0          0
my-awesome-app-group           test-topic-1                   1          0
my-awesome-app-group           test-topic-1                   2          0

进入全屏模式 退出全屏模式

现在,如果我们尝试从test-topic-1检索消息,我们将能够从头看到所有消息。

[root@hcptstkafka1 kafka]# kafka-console-consumer.sh \
--bootstrap-server localhost:9092 \
--topic test-topic-1 \
--group my-awesome-app-group

Alright!
I should get confirmation from the topic
the first
ahh right,
orange
orange
green

This is the first message thatwe sent to a topic!
 This is my second batch of message sent to the topic.
Let's see where this will go
now lets try colors
yellow
red
violet

Hello world!
Awesome job on this!
which consumer
or second
its distributes
red\
green
blue
let try sending messages! which consumer will receive this color?
yellow
blue
white

进入全屏模式 退出全屏模式

我们还可以使用--shift-by向前或向后移动偏移量,并附加一个正数以向前移动步数,或者附加一个负数以向后移动。

# Moving the offset three steps forward
[root@hcptstkafka1 kafka]# kafka-consumer-groups.sh \
--bootstrap-server localhost:9092 \
--group my-awesome-app-group \
--reset-offsets \
--shift-by 3 \
--topic test-topic-1 \
--execute

GROUP                          TOPIC                          PARTITION  NEW-OFFSET     
my-awesome-app-group           test-topic-1                   0          3
my-awesome-app-group           test-topic-1                   1          3
my-awesome-app-group           test-topic-1                   2          3

# Moving the offset another step.
[root@hcptstkafka1 kafka]# kafka-consumer-groups.sh \
--bootstrap-server localhost:9092 \
--group my-awesome-app-group \
--reset-offsets \
--shift-by 1 \
--topic test-topic-1 \
--execute

GROUP                          TOPIC                          PARTITION  NEW-OFFSET     
my-awesome-app-group           test-topic-1                   0          4
my-awesome-app-group           test-topic-1                   1          4
my-awesome-app-group           test-topic-1                   2          4

# Moving the offset three steps backward.
[root@hcptstkafka1 kafka]# kafka-consumer-groups.sh \
--bootstrap-server localhost:9092 \
--group my-awesome-app-group \
--reset-offsets \
--shift-by -3 \
--topic test-topic-1 \
--execute

GROUP                          TOPIC                          PARTITION  NEW-OFFSET
my-awesome-app-group           test-topic-1                   0          1
my-awesome-app-group           test-topic-1                   1          1
my-awesome-app-group           test-topic-1                   2          1

进入全屏模式 退出全屏模式


故障排除

Kafka Socket 服务器失败 - 绑定异常

这意味着端口冲突问题。

ERROR [KafkaServer id=0] Fatal error during KafkaServer startup. Prepare to shutdown (kafka.server.KafkaServer)
org.apache.kafka.common.KafkaException: Socket server failed to bind to 0.0.0.0:9092: Address already in use

进入全屏模式 退出全屏模式

要解决此问题,您可以在不同的端口(如 9093)上运行 KafkaServer idu003d0,或者找到侦听该端口的处理并将其终止(最简单)。

要查找 PID,您可以使用netstatlsof

[root@hcptstkafka1 kafka]# netstat -tulpn |grep 9092
tcp6       0      0 :::9092                 :::*                    LISTEN      23999/java
[root@hcptstkafka1 kafka]# lsof -n -i :9092 | grep LISTEN
java    23999 root  134u  IPv6  52852      0t0  TCP *:XmlIpcRegSvc (LISTEN)

# send a kill signal
[root@hcptstkafka1 kafka]# kill -9 23999

# process should now disappear
[root@hcptstkafka1 kafka]# lsof -n -i :9092 | grep LISTEN
[root@hcptstkafka1 kafka]# netstat -tulpn |grep 9092

# retry running kafka
[root@hcptstkafka1 kafka]# kafka-server-start.sh config/server.properties 

进入全屏模式 退出全屏模式


其他 CLI 选项

我们已经讨论过消息将在生产者发送消息时随机分发到分区。一种通过使用消息密钥确保消息始终到达特定分区的方法。消费者也可以利用它从特定分区中检索消息。

带钥匙的生产者

kafka-console-producer  \
--broker-list 127.0.0.1:9092  \
--topic first_topic  \
--property parse.key=true  \
--property key.separator=,
> key,value
> another key,another value

进入全屏模式 退出全屏模式

带钥匙的消费者

kafka-console-consumer \
--bootstrap-server 127.0.0.1:9092 \
--topic first_topic --from-beginning  \
--property print.key=true  \
--property key.separator=,

进入全屏模式 退出全屏模式


备忘单

这个简单易懂的命令备忘单来自 Bogdan Stashchuk 的Github 页面,用于他的 Apache Kafka 课程。

Basic KAFKA Commands

START ZOOKEEPER
bin/zookeeper-server-start.sh config/zookeeper.properties

START KAFKA BROKER
bin/kafka-server-start.sh config/server.properties

CREATE TOPIC
bin/kafka-topics.sh \
--bootstrap-server localhost:9092 \
--create \
--replication-factor 1 \
--partitions 3 \
--topic test

LIST TOPICS
bin/kafka-topics.sh \
--bootstrap-server localhost:9092 \
--list

TOPIC DETAILS
bin/kafka-topics.sh \
--bootstrap-server localhost:9092 \
--describe \
--topic test

START CONSOLE PRODUCER
bin/kafka-console-producer.sh \
--broker-list localhost:9092 \
--topic test

START CONSOLE CONSUMER
bin/kafka-console-consumer.sh \
--bootstrap-server localhost:9092 \
--topic test

START CONSOLE CONSUMER AND READ MESSAGES FROM BEGINNING
bin/kafka-console-consumer.sh \
--bootstrap-server localhost:9092 \
--topic test \
--from-beginning

START CONSOLE CONSUMER WITH SPECIFIC CONSUMER GROUP
bin/kafka-console-consumer.sh \
--bootstrap-server localhost:9092 \
--topic test \
--group test \
--from-beginning

LIST CONSUMER GROUPS
bin/kafka-consumer-groups.sh \
--bootstrap-server localhost:9092 \
--list

CONSUMER GROUP DETAILS
bin/kafka-consumer-groups.sh \
--bootstrap-server localhost:9092 \
--group test \
--describe

进入全屏模式 退出全屏模式


资源

我知道这篇文章很长,我可能会继续回来再读一遍,但如果你有兴趣了解更多,我所有的笔记都是基于以下课程。

  • Apache Kafka 系列 - 为初学者学习 Apache Kafka v2,作者 Stephane Maarek

  • Apache Kafka 入门由 Ryan Plant

  • Apache Kafka A-Z 手把手学习由 Learnkart Technology Private Limited

  • 完整的 Apache Kafka 实用指南由 Bogdan Stashchuk

除此之外,您还可以尝试另一个 Kafka CLI 的开源替代方案,KafkaCat。完成课程后,我可能会尝试这个。

  • KafkaCat-github

  • 使用 KafkaCat 调试


如果您喜欢我的笔记,或者它们带来了一些价值,我很高兴在Twitter 上与您联系!。 😃


Logo

权威|前沿|技术|干货|国内首个API全生命周期开发者社区

更多推荐