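I. Kafka Overview

1.1 Definition

Kafka is a distributed message queue based on the publish/subscribe model, used mainly for real-time processing of big data.

It is a distributed messaging system that supports partitions and multiple replicas and, in the setup described here, is coordinated by ZooKeeper. Its defining feature is processing large volumes of data in real time across a wide range of scenarios: Hadoop-based batch systems, low-latency real-time systems, Storm/Spark stream-processing engines, web/nginx logs, access logs, messaging services, and so on. It is written in Scala and Java. LinkedIn open-sourced it in 2011 and donated it to the Apache Software Foundation, where it later became a top-level project.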

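Terminology
Broker: a message-broker node. Each Kafka node is one broker, and one or more brokers form a Kafka cluster.
Topic: Kafka groups messages by topic; every message published to the cluster must specify a topic.
Producer: a client that sends messages to brokers.
Consumer: a client that reads messages from brokers.
ConsumerGroup: every consumer belongs to a consumer group. A message can be consumed by several different consumer groups, but within one group only one consumer receives it.
Partition: a physical concept. A topic can be split into multiple partitions, and messages within each partition are ordered.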

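2 Kafka & ZooKeeper installation/configuration/startup

2.1 Downloading and installing Kafka

2.1.1 Downloading the package
This guide installs Kafka 3.5.0, which ships with ZooKeeper.
https://archive.apache.org/dist/kafka/3.5.0/kafka_2.12-3.5.0.tgz

# 1 Download the package into /opt
# 2 Extract it into /opt/kafka (the paths used later in this guide are under /opt/kafka/kafka_2.12-3.5.0)
mkdir -p /opt/kafka
tar -xzvf kafka_2.12-3.5.0.tgz -C /opt/kafka

2.1.2 Preparing the cluster nodes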
192.168.190.147 node1
192.168.190.145 node2
192.168.190.142 node3

2.2 Kafka & ZooKeeper configuration

The configuration below uses node1 as the example.

2.2.1 server.properties configuration

# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

#
# This configuration file is intended for use in ZK-based mode, where Apache ZooKeeper is required.
# See kafka.server.KafkaConfig for additional details and defaults
#

############################# Server Basics #############################

# The id of the broker. This must be set to a unique integer for each broker.
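# Each Kafka server is one broker; in a cluster, every node needs a different value here.
# (Comments must sit on their own line: a properties parser treats a trailing "# ..." as part of the value.)
broker.id=0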

############################# Socket Server Settings #############################

# The address the socket server listens on. If not configured, the host name will be equal to the value of
# java.net.InetAddress.getCanonicalHostName(), with PLAINTEXT listener name, and port 9092.
#   FORMAT:
#     listeners = listener_name://host_name:port
#   EXAMPLE:
#     listeners = PLAINTEXT://your.host.name:9092
# Address and port this node's Kafka service listens on.
listeners=PLAINTEXT://node1:9092

# Listener name, hostname and port the broker will advertise to clients.
# If not set, it uses the value for "listeners".
#advertised.listeners=PLAINTEXT://your.host.name:9092

# Maps listener names to security protocols, the default is for them to be the same. See the config documentation for more details
#listener.security.protocol.map=PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL

# The number of threads that the server uses for receiving requests from the network and sending responses to the network
num.network.threads=3

# The number of threads that the server uses for processing requests, which may include disk I/O
num.io.threads=8

# The send buffer (SO_SNDBUF) used by the socket server
socket.send.buffer.bytes=102400

# The receive buffer (SO_RCVBUF) used by the socket server
socket.receive.buffer.bytes=102400

# The maximum size of a request that the socket server will accept (protection against OOM)
socket.request.max.bytes=104857600


############################# Log Basics #############################

# A comma separated list of directories under which to store log files
# Kafka's "log" files are the data files that hold the messages of each topic.
log.dirs=/opt/kafka/kafka-logs

# The default number of log partitions per topic. More partitions allow greater
# parallelism for consumption, but this will also result in more files across
# the brokers.
num.partitions=1

# The number of threads per data directory to be used for log recovery at startup and flushing at shutdown.
# This value is recommended to be increased for installations with data dirs located in RAID array.
num.recovery.threads.per.data.dir=1

############################# Internal Topic Settings  #############################
# The replication factor for the group metadata internal topics "__consumer_offsets" and "__transaction_state"
# For anything other than development testing, a value greater than 1 is recommended to ensure availability such as 3.
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1

############################# Log Flush Policy #############################

# Messages are immediately written to the filesystem but by default we only fsync() to sync
# the OS cache lazily. The following configurations control the flush of data to disk.
# There are a few important trade-offs here:
#    1. Durability: Unflushed data may be lost if you are not using replication.
#    2. Latency: Very large flush intervals may lead to latency spikes when the flush does occur as there will be a lot of data to flush.
#    3. Throughput: The flush is generally the most expensive operation, and a small flush interval may lead to excessive seeks.
# The settings below allow one to configure the flush policy to flush data after a period of time or
# every N messages (or both). This can be done globally and overridden on a per-topic basis.

# The number of messages to accept before forcing a flush of data to disk
#log.flush.interval.messages=10000

# The maximum amount of time a message can sit in a log before we force a flush
#log.flush.interval.ms=1000

############################# Log Retention Policy #############################

# The following configurations control the disposal of log segments. The policy can
# be set to delete segments after a period of time, or after a given size has accumulated.
# A segment will be deleted whenever *either* of these criteria are met. Deletion always happens
# from the end of the log.

# The minimum age of a log file to be eligible for deletion due to age
log.retention.hours=168

# A size-based retention policy for logs. Segments are pruned from the log unless the remaining
# segments drop below log.retention.bytes. Functions independently of log.retention.hours.
#log.retention.bytes=1073741824

# The maximum size of a log segment file. When this size is reached a new log segment will be created.
#log.segment.bytes=1073741824

# The interval at which log segments are checked to see if they can be deleted according
# to the retention policies
log.retention.check.interval.ms=300000

############################# Zookeeper #############################

# Zookeeper connection string (see zookeeper docs for details).
# This is a comma separated host:port pairs, each corresponding to a zk
# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002".
# You can also append an optional chroot string to the urls to specify the
# root directory for all kafka znodes.
# Address and port of ZooKeeper; for a ZooKeeper cluster, list every host:port pair.
zookeeper.connect=node1:2181,node2:2181,node3:2181

# Timeout in ms for connecting to zookeeper
zookeeper.connection.timeout.ms=18000


############################# Group Coordinator Settings #############################

# The following configuration specifies the time, in milliseconds, that the GroupCoordinator will delay the initial consumer rebalance.
# The rebalance will be further delayed by the value of group.initial.rebalance.delay.ms as new members join the group, up to a maximum of max.poll.interval.ms.
# The default value for this is 3 seconds.
# We override this to 0 here as it makes for a better out-of-the-box experience for development and testing.
# However, in production environments the default value of 3 seconds is more suitable as this will help to avoid unnecessary, and potentially expensive, rebalances during application startup.
group.initial.rebalance.delay.ms=0
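
Only broker.id and listeners differ from node to node in the file above. As a rough sketch, the per-node copies could be generated with sed. The function name render_server_properties and the mapping of node2 to broker id 1 are assumptions made for illustration; they are not part of the original setup.

```shell
# Hypothetical helper: print a copy of a server.properties template with the
# two node-specific keys rewritten. Port 9092 matches the listener used above.
# Usage: render_server_properties <template file> <broker id> <host name>
render_server_properties() {
  sed -e "s|^broker\.id=.*|broker.id=$2|" \
      -e "s|^listeners=.*|listeners=PLAINTEXT://$3:9092|" \
      "$1"
}

# Example (assumed mapping): write the file for node2 with broker id 1.
# render_server_properties ../config/server.properties 1 node2 > /tmp/server-node2.properties
```

All other keys pass through unchanged, so the template stays the single place to edit shared settings.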

2.2.2 zookeeper.properties configuration
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
# 
#    http://www.apache.org/licenses/LICENSE-2.0
# 
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# the directory where the snapshot is stored.
# ZooKeeper data directory; create it manually with mkdir.
dataDir=/opt/kafka/kafka_2.12-3.5.0/zookeeper/data
# ZooKeeper transaction log directory; create it manually with mkdir.
dataLogDir=/opt/kafka/kafka_2.12-3.5.0/zookeeper/log
# the port at which the clients will connect
clientPort=2181
# disable the per-ip limit on the number of connections since this is a non-production config
maxClientCnxns=0
# Disable the adminserver by default to avoid port conflicts.
# Set the port to something non-conflicting if choosing to enable this
admin.enableServer=false
# admin.serverPort=8080

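# Connection settings; add the following entries.
# Basic ZooKeeper time unit, in milliseconds
tickTime=2000
# Time limit for a follower's initial connection and sync with the leader: tickTime * 10
initLimit=10
# Time limit for leader-follower sync during normal operation: tickTime * 5
syncLimit=5

# ZooKeeper ensemble members: server.<myid>=<host>:<peer port>:<leader election port>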
server.1=node1:2888:3888
server.2=node2:2888:3888
server.3=node3:2888:3888
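
2.2.3 Creating the myid file
In the data directory configured in zookeeper.properties (/opt/kafka/kafka_2.12-3.5.0/zookeeper/data), create a text file named myid whose content is the ID of that ZooKeeper node. The Kafka cluster has three nodes, so the ZooKeeper cluster also has three nodes, numbered 1, 2, and 3 to match the server.N entries.

These three steps complete the configuration of node1. Configure node2 and node3 the same way, giving each node its own broker.id, its own host name in listeners, and its own myid.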
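
The myid step can be scripted. The sketch below is one possible way to do it; the function name write_myid is hypothetical, and the paths in the usage comments are the ones from zookeeper.properties above.

```shell
# Hypothetical helper: create the ZooKeeper data directory if needed and
# write the node's id into <data dir>/myid.
# Usage: write_myid <data dir> <id>
write_myid() {
  mkdir -p "$1" && printf '%s\n' "$2" > "$1/myid"
}

# On node1 (use 2 on node2 and 3 on node3):
# write_myid /opt/kafka/kafka_2.12-3.5.0/zookeeper/data 1
# mkdir -p /opt/kafka/kafka_2.12-3.5.0/zookeeper/log
```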

2.3 Starting Kafka & ZooKeeper

# Run the scripts from the bin directory. Start ZooKeeper before Kafka.

# Start ZooKeeper
./zookeeper-server-start.sh -daemon ../config/zookeeper.properties

# Start Kafka
./kafka-server-start.sh -daemon ../config/server.properties


# Stop in the reverse order: Kafka first, then ZooKeeper

# Stop Kafka
./kafka-server-stop.sh

# Stop ZooKeeper
./zookeeper-server-stop.sh
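
Because Kafka must not start before ZooKeeper is reachable, a small retry helper can enforce the start order in a script. This is only a sketch: wait_for is a hypothetical name, the nc -z port probe assumes netcat is installed, and an open port is a rough readiness signal rather than proof that the ZooKeeper ensemble has formed a quorum.

```shell
# Hypothetical helper: retry a command once per second until it succeeds or
# the attempt budget runs out. Returns 0 on success, 1 on timeout.
# Usage: wait_for <attempts> <command> [args...]
wait_for() {
  attempts=$1
  shift
  while [ "$attempts" -gt 0 ]; do
    if "$@"; then
      return 0
    fi
    attempts=$((attempts - 1))
    sleep 1
  done
  return 1
}

# Example, run from the bin directory (assumes nc is available):
# ./zookeeper-server-start.sh -daemon ../config/zookeeper.properties
# wait_for 30 nc -z node1 2181 && ./kafka-server-start.sh -daemon ../config/server.properties
```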

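3 Kafka commands

3.1 Creating a topic

./kafka-topics.sh --create --bootstrap-server node1:9092 --replication-factor 1 --partitions 1 --topic test
# 1 Notes:
# --bootstrap-server node1:9092: address and port of a Kafka broker
# --zookeeper: address of the ZooKeeper servers, comma-separated if there are several (one is usually enough). Deprecated since Kafka 2.2 and removed in 3.0; use --bootstrap-server instead.
# --replication-factor: number of replicas per partition; 1 means a single replica, 2 or more is recommended
# --partitions: number of partitions
# --topic: topic name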

3.2 Listing topics

./kafka-topics.sh --list --bootstrap-server node1:9092
# 2 Notes:
# --bootstrap-server node1:9092: address and port of a Kafka broker

3.3 Describing a topic

./kafka-topics.sh --describe --bootstrap-server node1:9092 --topic test

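3.4 Starting a Kafka producer

Kafka ships with a command-line producer client. It can read content from a local file, or you can type content directly on the command line, and it sends that content to the Kafka cluster as messages. By default, each line is treated as a separate message. When starting the client, specify the Kafka server address and the topic to send to.

# Start a console producer; anything typed into the console is sent to the given topic
# (--bootstrap-server replaces the deprecated --broker-list option)
./kafka-console-producer.sh --bootstrap-server node1:9092 --topic test

3.5 Starting a Kafka consumer

# Start a console consumer; it reads the data of the given topic and prints it
./kafka-console-consumer.sh --bootstrap-server node1:9092 --topic test --from-beginning

# --from-beginning: read all existing data in the topic from the start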

3.6 Listing consumer groups

./kafka-consumer-groups.sh --bootstrap-server node1:9092 --list

3.7 Checking the consumption status of a consumer group

./kafka-consumer-groups.sh --bootstrap-server node1:9092 --group console-consumer-6920 --describe

# console-consumer-6920: the consumer group
# Sample output:
GROUP                  TOPIC  PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG  CONSUMER-ID                                            HOST              CLIENT-ID
console-consumer-6920  test   0          -               12              -    console-consumer-e162ef6d-600f-45d1-8198-af7e230642a7  /192.168.190.136  console-consumer
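
When a group reads many partitions, the LAG column is usually the number of interest. The sketch below sums it with awk, assuming the column layout shown in the output above (LAG is the sixth field); total_lag is a hypothetical name. Rows whose LAG is "-", meaning no offset has been committed yet, are skipped.

```shell
# Hypothetical helper: read `kafka-consumer-groups.sh --describe` output on
# stdin and print the sum of the numeric values in the LAG column.
total_lag() {
  awk '$1 != "GROUP" && $6 ~ /^[0-9]+$/ { sum += $6 } END { print sum + 0 }'
}

# Example:
# ./kafka-consumer-groups.sh --bootstrap-server node1:9092 --group console-consumer-6920 --describe | total_lag
```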

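4 ZooKeeper commands

4.1 Connecting to ZooKeeper

# ./zookeeper-shell.sh zookeeper_server:port --> connect to ZooKeeper
./zookeeper-shell.sh node1:2181

# Run a single command without entering the interactive shell
./zookeeper-shell.sh node1:2181 ls /                  # list the top-level znodes in ZooKeeper
./zookeeper-shell.sh node1:2181 ls -s /               # list the root znode's children together with its status fields
./zookeeper-shell.sh node1:2181 ls /brokers/ids       # list the IDs of the registered brokers
./zookeeper-shell.sh node1:2181 ls /brokers/topics    # list the topics
./zookeeper-shell.sh node1:2181 get /brokers/ids/0    # show the registration data of broker 0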

4.2 Listing the contents of ZooKeeper

# List the top-level znodes
ls /

4.3 Viewing a znode's status

ls -s /
# Sample output:
[admin, brokers, cluster, config, consumers, controller, controller_epoch, feature, isr_change_notification, latest_producer_id_block, log_dir_event_notification, zookeeper]
cZxid = 0x0                               # transaction ID that created the znode
ctime = Wed Dec 31 16:00:00 PST 1969      # time the znode was created
mZxid = 0x0                               # transaction ID of the last update to the znode
mtime = Wed Dec 31 16:00:00 PST 1969      # time of the last update to the znode
pZxid = 0x200000057                       # transaction ID of the last change to the znode's children
cversion = 18                             # number of changes to the children
dataVersion = 0                           # number of changes to the znode's data
aclVersion = 0                            # number of changes to the znode's ACL
ephemeralOwner = 0x0                      # session ID of the creator if the znode is ephemeral; 0 for a persistent znode
dataLength = 0                            # length of the data
numChildren = 12                          # current number of children

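4.4 Creating znodes

# Create a persistent sequential znode:
create -s /testnode test
# Create an ephemeral znode:
create -e /testnode-temp testtemp
# Create a persistent znode:
create /testnode-p testp

4.5 Reading znodes

# Syntax in ZooKeeper 3.5+ (the older "ls path [watch]" form is deprecated); -w sets a watch
ls [-w] path
get [-w] path
ls -s [-w] path

4.6 Updating a znode

ls -s /
get -s /testnode-temp
set /testnode-temp 123
get -s /testnode-temp

4.7 Watching a znode

get -w /testnode-temp
set /testnode-temp testwatch
# The watch fires and the shell prints the resulting event

4.8 Deleting znodes

# Delete a single znode (it must have no children)
delete [-v version] path
# Delete a znode and all of its children
deleteall path
# rmr path is the older recursive delete; it is deprecated in favor of deleteall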

II. Key Concepts

1 Topic

A topic is a logical category used to group messages by type.

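2 Partition

When a topic holds a very large number of messages, its Kafka log files (which store the messages themselves) would grow very large. Splitting the topic into partitions avoids this.

2.1 What partitions are for

  • Distributed storage
  • Parallel writes

2.2 Example 1

  • Create several partitions for one topic
# Create a topic named mytest with 2 partitions
./kafka-topics.sh --create --bootstrap-server node1:9092 --replication-factor 1 --partitions 2 --topic mytest
  • View the topic's partition information
./kafka-topics.sh --describe --bootstrap-server node1:9092 --topic mytest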


# Output
root@ubuntu:/kafka/bin# ./kafka-topics.sh --describe --bootstrap-server node1:9092 --topic mytest
Topic: mytest   TopicId: ulEszEwcT0i3Iu5TyfIglg PartitionCount: 2       ReplicationFactor: 1    Configs: 
        Topic: mytest   Partition: 0    Leader: 2       Replicas: 2     Isr: 2
        Topic: mytest   Partition: 1    Leader: 1       Replicas: 1     Isr: 1

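  • Inspect the Kafka log directories
With 2 partitions, mytest gets a directory mytest-1 on node1 and a directory mytest-0 on node2. Because Kafka runs as a cluster, the partitions of mytest are distributed across the nodes of the cluster when the topic is created.
The listings also show Kafka's internal topic __consumer_offsets, which has 50 partitions by default, spread across node1 and node2.

# node1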
drwxr-xr-x 29 root root 4096 Aug 10 02:21 ./
drwxr-xr-x  4 root root 4096 Aug  7 21:18 ../
-rw-r--r--  1 root root    0 Aug  7 21:18 cleaner-offset-checkpoint
drwxr-xr-x  2 root root 4096 Aug 10 02:18 __consumer_offsets-1/
drwxr-xr-x  2 root root 4096 Aug 10 02:18 __consumer_offsets-11/
drwxr-xr-x  2 root root 4096 Aug 10 02:18 __consumer_offsets-13/
drwxr-xr-x  2 root root 4096 Aug 10 02:18 __consumer_offsets-15/
drwxr-xr-x  2 root root 4096 Aug 10 02:18 __consumer_offsets-17/
drwxr-xr-x  2 root root 4096 Aug 10 02:18 __consumer_offsets-19/
drwxr-xr-x  2 root root 4096 Aug 10 02:18 __consumer_offsets-21/
drwxr-xr-x  2 root root 4096 Aug 10 02:18 __consumer_offsets-23/
drwxr-xr-x  2 root root 4096 Aug 10 02:18 __consumer_offsets-25/
drwxr-xr-x  2 root root 4096 Aug 10 02:18 __consumer_offsets-27/
drwxr-xr-x  2 root root 4096 Aug 10 02:18 __consumer_offsets-29/
drwxr-xr-x  2 root root 4096 Aug 10 02:18 __consumer_offsets-3/
drwxr-xr-x  2 root root 4096 Aug 10 02:18 __consumer_offsets-31/
drwxr-xr-x  2 root root 4096 Aug 10 02:18 __consumer_offsets-33/
drwxr-xr-x  2 root root 4096 Aug 10 02:18 __consumer_offsets-35/
drwxr-xr-x  2 root root 4096 Aug 10 02:18 __consumer_offsets-37/
drwxr-xr-x  2 root root 4096 Aug 10 02:18 __consumer_offsets-39/
drwxr-xr-x  2 root root 4096 Aug 10 02:18 __consumer_offsets-41/
drwxr-xr-x  2 root root 4096 Aug 10 02:18 __consumer_offsets-43/
drwxr-xr-x  2 root root 4096 Aug 10 02:18 __consumer_offsets-45/
drwxr-xr-x  2 root root 4096 Aug 10 02:18 __consumer_offsets-47/
drwxr-xr-x  2 root root 4096 Aug 10 02:18 __consumer_offsets-49/
drwxr-xr-x  2 root root 4096 Aug 10 02:18 __consumer_offsets-5/
drwxr-xr-x  2 root root 4096 Aug 10 02:18 __consumer_offsets-7/
drwxr-xr-x  2 root root 4096 Aug 10 02:18 __consumer_offsets-9/
drwxr-xr-x  2 root root 4096 Aug 10 02:18 faust-group-01-__assignor-__leader-0/
-rw-r--r--  1 root root    0 Aug  7 21:18 .lock
-rw-r--r--  1 root root    4 Aug 10 02:21 log-start-offset-checkpoint
-rw-r--r--  1 root root   88 Aug 10 02:16 meta.properties
drwxr-xr-x  2 root root 4096 Aug 10 02:18 mytest-1/
-rw-r--r--  1 root root  657 Aug 10 02:21 recovery-point-offset-checkpoint
-rw-r--r--  1 root root  657 Aug 10 02:21 replication-offset-checkpoint


# node2
root@ubuntu:/opt/kafka/kafka_2.12-3.5.0/bin# ll /opt/kafka/kafka-logs/
total 144
drwxr-xr-x 32 root root 4096 Aug 10 02:21 ./
drwxr-xr-x  4 root root 4096 Aug  7 21:18 ../
-rw-r--r--  1 root root    0 Aug  7 21:18 cleaner-offset-checkpoint
drwxr-xr-x  2 root root 4096 Aug 10 02:18 __consumer_offsets-0/
drwxr-xr-x  2 root root 4096 Aug 10 02:18 __consumer_offsets-10/
drwxr-xr-x  2 root root 4096 Aug 10 02:18 __consumer_offsets-12/
drwxr-xr-x  2 root root 4096 Aug 10 02:18 __consumer_offsets-14/
drwxr-xr-x  2 root root 4096 Aug 10 02:18 __consumer_offsets-16/
drwxr-xr-x  2 root root 4096 Aug 10 02:18 __consumer_offsets-18/
drwxr-xr-x  2 root root 4096 Aug 10 02:18 __consumer_offsets-2/
drwxr-xr-x  2 root root 4096 Aug 10 02:18 __consumer_offsets-20/
drwxr-xr-x  2 root root 4096 Aug 10 02:18 __consumer_offsets-22/
drwxr-xr-x  2 root root 4096 Aug 10 02:18 __consumer_offsets-24/
drwxr-xr-x  2 root root 4096 Aug 10 02:18 __consumer_offsets-26/
drwxr-xr-x  2 root root 4096 Aug 10 02:18 __consumer_offsets-28/
drwxr-xr-x  2 root root 4096 Aug 10 02:18 __consumer_offsets-30/
drwxr-xr-x  2 root root 4096 Aug 10 02:18 __consumer_offsets-32/
drwxr-xr-x  2 root root 4096 Aug 10 02:18 __consumer_offsets-34/
drwxr-xr-x  2 root root 4096 Aug 10 02:18 __consumer_offsets-36/
drwxr-xr-x  2 root root 4096 Aug 10 02:18 __consumer_offsets-38/
drwxr-xr-x  2 root root 4096 Aug 10 02:18 __consumer_offsets-4/
drwxr-xr-x  2 root root 4096 Aug 10 02:18 __consumer_offsets-40/
drwxr-xr-x  2 root root 4096 Aug 10 02:18 __consumer_offsets-42/
drwxr-xr-x  2 root root 4096 Aug 10 02:18 __consumer_offsets-44/
drwxr-xr-x  2 root root 4096 Aug 10 02:18 __consumer_offsets-46/
drwxr-xr-x  2 root root 4096 Aug 10 02:18 __consumer_offsets-48/
drwxr-xr-x  2 root root 4096 Aug 10 02:18 __consumer_offsets-6/
drwxr-xr-x  2 root root 4096 Aug 10 02:18 __consumer_offsets-8/
drwxr-xr-x  2 root root 4096 Aug 10 02:18 example-0/
drwxr-xr-x  2 root root 4096 Aug 10 02:18 faust-group-02-__assignor-__leader-0/
-rw-r--r--  1 root root    0 Aug  7 21:18 .lock
-rw-r--r--  1 root root    4 Aug 10 02:21 log-start-offset-checkpoint
-rw-r--r--  1 root root   88 Aug 10 02:18 meta.properties
drwxr-xr-x  2 root root 4096 Aug 10 02:18 myid-__assignor-__leader-0/
drwxr-xr-x  2 root root 4096 Aug 10 02:18 mytest-0/
-rw-r--r--  1 root root  703 Aug 10 02:21 recovery-point-offset-checkpoint
-rw-r--r--  1 root root  703 Aug 10 02:21 replication-offset-checkpoint
drwxr-xr-x  2 root root 4096 Aug 10 02:21 wzp-0/
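
Counting the partition directories by hand is tedious with listings this long. As a sketch, the helper below summarizes a log directory as one line per topic; partitions_per_topic is a hypothetical name, and it relies only on the <topic>-<partition number> directory naming visible in the listings above.

```shell
# Hypothetical helper: print "<topic> <partition directory count>" for every
# topic that has partition directories under the given Kafka log dir.
# Usage: partitions_per_topic <log dir>
partitions_per_topic() {
  for dir in "$1"/*-[0-9]*/; do
    [ -d "$dir" ] || continue
    name=$(basename "$dir")
    # Keep only names that end in "-<digits>".
    case "${name##*-}" in
      ''|*[!0-9]*) continue ;;
    esac
    # Strip the trailing "-<partition number>" to get the topic name.
    printf '%s\n' "${name%-*}"
  done | sort | uniq -c | awk '{ print $2, $1 }'
}

# Example:
# partitions_per_topic /opt/kafka/kafka-logs
```

Run on each node, this makes it easy to see how the partitions of a topic such as __consumer_offsets are split between brokers.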

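2.3 Example 2

Each Kafka consumer tracks the offset it has reached in the topics it consumes and commits that offset to Kafka's internal topic __consumer_offsets. The key of a committed record is consumerGroupId + topic + partition number, and the value is the current offset. Kafka periodically compacts this topic, so in the end only the latest record for each key is kept.

Because __consumer_offsets may receive a high rate of concurrent requests, Kafka gives it 50 partitions by default (configurable through offsets.topic.num.partitions), so throughput can be raised by adding machines.

The following formula determines which partition of __consumer_offsets receives the offsets committed by a consumer.

Formula: hash(consumerGroupId) % (number of partitions of the __consumer_offsets topic)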
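
The formula can be tried out in the shell. The sketch below assumes that hash is Java's String.hashCode and that the broker takes its absolute value with Integer.MIN_VALUE mapped to 0; both are my understanding of the broker's behavior, not something stated in this article. The function names are hypothetical, and only ASCII group ids are handled.

```shell
# Java's String.hashCode for an ASCII string: h = 31 * h + c for each
# character, wrapped to a signed 32-bit integer.
java_string_hash() {
  HASH_INPUT="$1" awk 'BEGIN {
    s = ENVIRON["HASH_INPUT"]
    for (i = 1; i < 128; i++) { ord[sprintf("%c", i)] = i }
    h = 0
    for (i = 1; i <= length(s); i++) {
      h = (31 * h + ord[substr(s, i, 1)]) % 4294967296
    }
    if (h >= 2147483648) { h -= 4294967296 }
    printf "%d\n", h
  }'
}

# abs(hash(groupId)) % partition count, with Integer.MIN_VALUE mapped to 0
# (assumption, see the note above).
# Usage: offsets_partition_for <group id> <partition count>
offsets_partition_for() (
  h=$(java_string_hash "$1")
  if [ "$h" -lt 0 ]; then
    if [ "$h" -eq -2147483648 ]; then h=0; else h=$((0 - h)); fi
  fi
  echo $((h % $2))
)

# Example with the default 50 partitions:
# offsets_partition_for console-consumer-6920 50
```

The printed number corresponds to the suffix of one of the __consumer_offsets-N directories shown in the listings above.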

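3 Replication

A replica is a backup of a partition. In a cluster, the replicas of a partition are placed on different brokers. A partition can have several replicas: one is the leader and the others are followers. Producers and consumers interact only with the leader; after a producer's message reaches the leader, the followers replicate it from the leader. When the leader fails, one of the followers is elected as the new leader.

What are the benefits of Kafka's multi-partition and multi-replica mechanisms?

A topic can be given multiple partitions, and those partitions can live on different brokers, which provides good concurrency (load balancing).

Each partition can be given a number of replicas, which greatly improves the safety of stored messages and the ability to recover from failures, at the cost of additional storage space.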

# Create 1 topic with 2 partitions and 2 replicas (matching the ReplicationFactor: 2 shown in the output below)
./kafka-topics.sh --create --bootstrap-server node1:9092 --replication-factor 2 --partitions 2 --topic mytest01


# Output
root@ubuntu:/kafka/bin# ./kafka-topics.sh --describe --bootstrap-server node1:9092 --topic mytest01
Topic: mytest01 TopicId: MSmYL0CXStu0g_29irzH3A PartitionCount: 2       ReplicationFactor: 2    Configs: 
        Topic: mytest01 Partition: 0    Leader: 2       Replicas: 2,1   Isr: 2,1
        Topic: mytest01 Partition: 1    Leader: 1       Replicas: 1,2   Isr: 1,2

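# Notes
The leader of partition 0 of mytest01 is on node2
The leader of partition 1 of mytest01 is on node1


# Leader: a per-partition role among the replicas
Every partition has one broker acting as its leader
Producer writes and consumer reads are served by the partition's leader replica, not by the followers

# Follower: the leader handles all reads and writes for the partition; followers passively replicate the leader and do not serve reads or writes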

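10 How message consumption order is guaranteed

Kafka guarantees consumption order through the following mechanisms:

  1. Partitions: Each topic in Kafka can be split into multiple partitions, and within a consumer group each partition is consumed by only one consumer. The partition is Kafka's basic unit of horizontal scaling and parallel processing. Messages within one partition are ordered, so ensuring that each partition is handled by a single consumer preserves the order of that partition's messages.
  2. Offsets: Every message in a partition has a unique identifier called its offset, which gives the message's position within the partition. A consumer can read a specific message by specifying its offset. Kafka uses offsets to record each consumer's progress, and after a restart or recovery the consumer resumes from the last committed offset, which keeps consumption in order.
  3. Consumer groups: Several consumers can form a consumer group and consume different partitions of the same topic at the same time. When multiple consumers start, Kafka automatically assigns partitions to them so that each partition is handled by exactly one consumer in the group. Each consumer reads only its assigned partitions, so no two consumers in the group read the same partition, which preserves the order of consumption.

Note that Kafka guarantees order only within a single partition; order across partitions is not guaranteed. If messages from several partitions must be processed in order, either merge the data into a single partition or use a stream-processing tool such as Apache Flink or Apache Spark Streaming.

In summary, Kafka relies on partitions, offsets, and consumer groups to guarantee consumption order. Messages within each partition are ordered, and the consumer group ensures that each partition is handled by only one consumer, so order is preserved within every partition.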
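
The one-consumer-per-partition rule can be illustrated with a toy model. The sketch below hands out partitions round-robin; it is a simplified illustration with a hypothetical function name, not Kafka's actual assignment algorithm.

```shell
# Simplified illustration, not Kafka's real assignor: hand out partitions
# 0..N-1 to the consumers of one group round-robin and print
# "partition <p> -> <consumer>" for each partition.
# Usage: assign_partitions <partition count> <consumer name>...
assign_partitions() (
  partitions=$1
  shift
  [ "$#" -gt 0 ] || { echo "need at least one consumer" >&2; exit 1; }
  p=0
  while [ "$p" -lt "$partitions" ]; do
    idx=$((p % $# + 1))
    # Look up the idx-th consumer name among the positional parameters.
    eval "owner=\${$idx}"
    echo "partition $p -> $owner"
    p=$((p + 1))
  done
)

# Example: 4 partitions shared by two consumers
# assign_partitions 4 c1 c2
```

Every partition appears exactly once in the output, so no partition has two readers in the group. With more consumers than partitions, the extra consumers receive nothing and stay idle.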

11 The role of ZooKeeper in Kafka

ZooKeeper is used to register brokers and topics.
