Apache Kafka 官方文档

1. 介绍

Apache Kafka 是一个分布式事件流平台,用于构建高性能的数据管道、流式分析和关键任务应用程序。它由 Apache 软件基金会开发,使用 Java 和 Scala 编写,旨在提供高吞吐量、低延迟的实时数据处理能力。截至2025年4月29日,最新版本为 Kafka 4.0.0,引入了多项新功能,如默认使用 KRaft 协议替代 ZooKeeper 进行元数据管理。

1.1 关键概念

  • 主题(Topics):数据流的分组,类似于数据库中的表,用于组织消息。
  • 分区(Partitions):主题的子集,用于并行处理和扩展,每个分区是一个有序的不可变记录序列。
  • 生产者(Producers):向主题发送数据的应用程序。
  • 消费者(Consumers):从主题读取数据的应用程序。
  • 代理(Brokers):运行 Kafka 的服务器,组成集群,负责存储和管理分区。
  • ZooKeeper:用于协调和管理 Kafka 集群(在 Kafka 4.0 中,KRaft 协议成为默认选择)。
  • 消费者组(Consumer Groups):一组消费者共同处理主题中的数据,每个消费者处理部分分区。
  • 副本(Replicas):分区的备份,确保数据的高可用性。

1.2 用例

Kafka 的灵活性使其适用于多种场景,包括:

  • 消息队列:替代传统消息代理(如 ActiveMQ 或 RabbitMQ),提供更高的吞吐量。
  • 网站活动跟踪:实时捕获用户行为数据,用于分析和推荐系统。
  • 日志聚合:从分布式系统收集日志,集中存储和处理。
  • 流处理:结合 Kafka Streams 或其他框架进行实时数据处理。
  • 事件溯源:记录应用程序状态变化,支持回溯和审计。
  • 指标监控:收集和处理系统性能指标。

1.3 核心特性

  • 高吞吐量:每秒处理数百万条消息。
  • 低延迟:消息传递延迟低至 2 毫秒。
  • 容错性:通过分区副本确保数据可靠性。
  • 可扩展性:通过增加代理和分区实现水平扩展。

2. 架构

Kafka 的架构基于分布式日志设计,数据存储在主题的分区中,分区分布在多个代理上以实现高可用性和可扩展性。以下是详细的架构说明。

2.1 组件

Kafka 的核心组件包括:

  • 生产者(Producers):负责将消息发布到主题,可以选择特定的分区。
  • 消费者(Consumers):订阅主题并从分区读取消息,通常以消费者组的形式工作。
  • 主题(Topics):消息的逻辑分类,每个主题包含多个分区。
  • 分区(Partitions):主题的物理存储单元,分布在代理上,支持并行处理。
  • 代理(Brokers):Kafka 集群中的服务器,存储分区并处理生产者和消费者的请求。
  • ZooKeeper:传统上用于管理集群元数据和协调代理(Kafka 4.0 默认使用 KRaft)。
  • 消费者组(Consumer Groups):允许多个消费者协同处理主题的消息,每个分区仅由组内一个消费者处理。
  • 副本(Replicas):分区的备份,包括一个领导者(Leader)和多个跟随者(Followers),确保容错。

2.2 关系

Kafka 组件之间的关系可以通过以下类图表示,展示了生产者、消费者、主题、分区、代理和 ZooKeeper 的交互:

sends to
contains
leader
followers
reads from
manages
Producer
+sendMessage(topic, partition, message)
Topic
+name
+partitions : List
Partition
+id
+leader : Broker
+followers : List
Broker
+id
+topics : List
Consumer
+readMessage(topic, partition)
ZooKeeper
+manageBrokers()

2.3 数据流

Kafka 的数据流涉及生产者将消息发送到主题(存储在代理的分区中),消费者从分区读取消息。以下是详细的流程。

生产者消息流
  1. 生产者连接到 Kafka 代理。
  2. 生产者向特定主题(可选指定分区)发送消息。
  3. 代理将消息存储在对应的分区中。
  4. 代理向生产者确认消息存储成功。

生产者序列图

Producer Broker Partition connect send message to topic store message acknowledge success Producer Broker Partition
消费者消息流
  1. 消费者连接到 Kafka 代理。
  2. 消费者订阅一个或多个主题。
  3. 代理根据消费者组分配分区。
  4. 消费者从分配的分区中获取消息。
  5. 代理将消息发送给消费者。

消费者序列图

Consumer Broker Partition connect subscribe to topic assign partitions fetch messages from partition return messages Consumer Broker Partition

2.4 可扩展性和容错性

  • 可扩展性:Kafka 通过将主题分区分布在多个代理上实现并行处理。增加代理或分区可以轻松扩展集群。
  • 容错性:每个分区有多个副本(一个领导者和多个跟随者)。如果领导者代理失败,ZooKeeper(或 KRaft)会选举新的领导者,确保数据可用性。

2.5 KRaft 协议

从 Kafka 2.8 开始,Kafka 引入了 KRaft(Kafka Raft)协议,用于替代 ZooKeeper 进行元数据管理。KRaft 使用 Raft 一致性算法,提供以下优势:

  • 简化架构:无需单独的 ZooKeeper 集群。
  • 提高性能:元数据操作更快。
  • 增强可扩展性:支持更大的集群规模。

Kafka 4.0 默认使用 KRaft,推荐新部署采用此模式。

2.6 集群架构

Kafka 集群由多个代理组成,每个代理存储部分主题的分区。以下是集群的简化架构图:

Topic1 Partitions
Kafka Cluster
ZooKeeper
Partition 1
Partition 2
Broker 1
Broker 2
Broker 3
ZooKeeper
Producer
Consumer

3. 安装与设置

3.1 系统要求

  • Java 版本
    • Kafka 代理、Connect 和工具:Java 17 或更高版本。
    • Kafka 客户端和 Streams:Java 11 或更高版本。
  • 操作系统:Linux、Windows 或 macOS。
  • 硬件:至少 4GB 内存,建议多核 CPU 和 SSD 存储。

3.2 下载

Kafka 4.0.0 可从以下链接获取:

3.3 安装步骤

  1. 下载并解压 Kafka 二进制包。
  2. 配置环境变量(如 JAVA_HOME)。
  3. 修改 config/server.properties 文件,设置代理 ID 和监听地址。

3.4 快速入门

以下是快速设置和运行 Kafka 的步骤:

  1. 启动 ZooKeeper(如果不使用 KRaft):
    bin/zookeeper-server-start.sh config/zookeeper.properties
    
  2. 启动 Kafka 服务器
    bin/kafka-server-start.sh config/server.properties
    
  3. 创建主题
    bin/kafka-topics.sh --create --topic test --bootstrap-server localhost:9092 --partitions 1 --replication-factor 1
    
  4. 发送消息
    bin/kafka-console-producer.sh --topic test --bootstrap-server localhost:9092
    
  5. 消费消息
    bin/kafka-console-consumer.sh --topic test --from-beginning --bootstrap-server localhost:9092
    

4. 配置

Kafka 的配置通过属性文件或程序化方式完成。以下是关键配置参数。

4.1 服务器配置

config/server.properties 中配置:

参数 描述 默认值
broker.id 代理的唯一 ID 0
listeners 客户端连接的监听器 PLAINTEXT://:9092
num.partitions 新主题的默认分区数 1
log.retention.hours 日志保留时间 168
zookeeper.connect ZooKeeper 连接地址(KRaft 模式无需配置) localhost:2181

4.2 客户端配置

生产者和消费者的配置包括:

  • 生产者
    • bootstrap.servers:代理地址列表。
    • key.serializer:键序列化器。
    • value.serializer:值序列化器。
  • 消费者
    • group.id:消费者组 ID。
    • key.deserializer:键反序列化器。
    • value.deserializer:值反序列化器。

4.3 最佳实践

  • 分区数:根据吞吐量需求设置,通常为 CPU 核心数的倍数。
  • 副本因子:至少设置为 2,以确保高可用性。
  • 日志保留:根据数据量和存储容量调整保留时间。

5. 操作 Kafka

5.1 启动与停止

  • 启动代理
    bin/kafka-server-start.sh config/server.properties
    
  • 停止代理
    bin/kafka-server-stop.sh
    

5.2 主题管理

  • 创建主题
    bin/kafka-topics.sh --create --topic <topic> --bootstrap-server <server> --partitions <num> --replication-factor <num>
    
  • 列出主题
    bin/kafka-topics.sh --list --bootstrap-server <server>
    
  • 描述主题
    bin/kafka-topics.sh --describe --topic <topic> --bootstrap-server <server>
    

5.3 监控

Kafka 提供 JMX 指标,用于监控集群健康状态。推荐工具包括:

  • Kafka Manager:开源的集群管理工具。
  • Confluent Control Center:商业化的监控解决方案。

5.4 扩容与故障恢复

  • 添加代理:更新 server.properties 并启动新代理。
  • 故障恢复:依赖副本机制,自动选举新的分区领导者。

6. 开发者指南

6.1 编写生产者

以下是使用 Java 编写生产者的示例:

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.Properties;

public class SimpleProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        KafkaProducer<String, String> producer = new KafkaProducer<>(props);
        producer.send(new ProducerRecord<>("test", "key", "value"));
        producer.close();
    }
}

6.2 编写消费者

以下是使用 Java 编写消费者的示例:

import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import java.util.Collections;
import java.util.Properties;

public class SimpleConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "test-group");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Collections.singletonList("test"));
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(100);
            for (ConsumerRecord<String, String> record : records) {
                System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
            }
        }
    }
}

6.3 Kafka Streams

Kafka Streams 是一个用于流处理的客户端库,支持状态管理和窗口化操作。以下是一个简单的示例:

import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KStream;

public class SimpleStream {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("application.id", "stream-app");
        props.put("bootstrap.servers", "localhost:9092");

        StreamsBuilder builder = new StreamsBuilder();
        KStream<String, String> stream = builder.stream("input-topic");
        stream.to("output-topic");

        KafkaStreams streams = new KafkaStreams(builder.build(), props);
        streams.start();
    }
}

6.4 Kafka Connect

Kafka Connect 是一个用于连接外部系统的数据导入/导出框架,支持多种连接器(如 JDBC、HDFS)。

7. API 文档

详细的 API 参考请查看 Kafka Clients Javadoc。支持的客户端语言包括 Java、Python(kafka-python)、C++(librdkafka)等。

8. 教程与示例

8.1 实时日志处理

使用 Kafka 收集分布式系统日志并进行实时分析。

8.2 流处理示例

使用 Kafka Streams 处理实时数据流,示例包括单词计数和数据过滤。

官方教程请参考 Kafka Tutorials

9. 常见问题解答

  • 如何优化 Kafka 性能?
    • 增加分区数以提高并行性。
    • 使用压缩(如 gzip)减少网络开销。
    • 调整生产者的批量大小和消费者拉取间隔。
  • 如何处理消息丢失?
    • 确保生产者使用 acks=all
    • 正确管理消费者偏移量提交。

更多问题解答请参考 Kafka FAQ

10. 发布说明

Kafka 4.0.0(2025年3月发布)引入了以下主要变化:

  • 默认使用 KRaft 协议,移除 ZooKeeper 依赖。
  • 消费者重新平衡协议改进(KIP-848)。
  • 升级要求:Java 11(客户端/Streams),Java 17(代理/工具)。

详细升级指南请查看 Kafka Release Notes

11. 社区与生态系统

12. 性能调优

12.1 生产者调优

  • 批量大小:增大 batch.size 以提高吞吐量,适合高频消息场景。
  • 延迟时间:调整 linger.ms,较长的延迟适合更多消息批处理。

12.2 消费者调优

  • 分区分配:消费者数量应小于或等于分区数,避免空闲消费者。
  • 拉取间隔:调整 max.poll.interval.ms 以平衡延迟和吞吐量。

12.3 代理调优

  • 负载均衡:监控代理负载,确保分区均匀分布。
  • 日志清理:优化 log.retention.byteslog.retention.hours

13. 局限性

  • 消息调整:频繁修改消息可能导致性能问题。
  • 主题选择:不支持通配符主题选择,需精确匹配。
  • 大消息:大消息会因压缩/解压缩影响性能。

14. 真实案例

  • CDC(美国疾控中心):使用 Kafka Streams 和 Connect 处理 COVID-19 数据。
  • Netflix:Keystone 数据管道每天处理超过 5000 亿事件。
  • LinkedIn:用于新闻动态和实时推荐。
  • Spotify:日志传递系统。
Logo

本社区面向用户介绍CSDN开发云部门内部产品使用和产品迭代功能,产品功能迭代和产品建议更透明和便捷

更多推荐