一、简介

  1. Kafka 是一个分布式流处理平台由 LinkedIn 公司开发的,遵循 Apache 开源协议。
  2. Kafka 主要是用来处理实时数据流,可以发布、订阅、存储和处理数据。

应用场景:

  1. 日志收集:用于分布式日志系统,例如 ELK。
  2. 消息系统:可以将 Kafka 作为消息队列使用。
  3. 流处理:将 Kafka 与 Flink 或 Spark 等流处理引擎配合使用。

二、架构介绍

1. 组件

  • Producer:发送数据到 Kafka 集群。
  • Consumer:从 Kafka 集群消费数据。
  • Broker:Kafka 集群中的每个服务器就叫做 Broker。
  • Topic:物理上不同的消息类别;逻辑上一个 Topic 包含多个 Partition。
  • Partition:物理上的概念,每个 Partition 对应一个文件夹,该文件夹下存储着该 Partition 的所有消息。
  • Offset:Kafka 采用了分布式的提交日志机制,消费者消费数据时会记录已经消费的位置,即 Offset。
  • ZooKeeper:Kafka 使用 ZooKeeper 来存储集群的配置信息,及 Broker、Producer、Consumer 等各种节点的状态信息。

2. 集群

  • Kafka 集群由多个 Broker 组成,每个 Broker 在集群中都有一个唯一的编号。
  • 一个 Broker 可以容纳多个 Partition,同一个 Topic 的不同 Partition 分散到不同的 Broker 上组成分布式集群。
  • Kafka 集群会自动调整 Partition 的数量,并将 Partition 均匀分配给各个 Broker。

3. 数据存储结构

  • Kafka 消息被保存在 Partition 中,每个 Partition 对应一个目录,里面是多个 Segment,Segment 文件的大小和发送速率有关系。一个 Partition 有多个 Segments 是因为 Kafka 采取了文件系统批量读写机制

代码示例:

public class KafkaDemo {

    public static void main(String[] args) {

        //1.创建生产者
        KafkaProducer<String, String> producer = new KafkaProducer<>(getProperties());

        //2.创建消息
        ProducerRecord<String, String> record = new ProducerRecord<>("myTopic", "key", "value");

        try {
            //3.发送消息
            producer.send(record).get();
            System.out.println("Sent message successfully");
        } catch (InterruptedException | ExecutionException e) {
            e.printStackTrace();
        } finally {
            //4.关闭连接
            producer.close();
        }

    }

    /**
     * 获取Kafka配置信息
     *
     * @return 配置信息
     */
    private static Properties getProperties() {
        Properties props = new Properties();

        //设置Kafka地址
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

        //设置消息Key和Value的序列化方式
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        return props;
    }
}

三、Kafka消息传递原理

1. 消息生产者

Kafka生产者将数据以消息的形式发送到Kafka集群。生产者可以将消息发送到一个指定的主题(topic),也可以选择在发送时指定分区(partition)。当生产者需要发送消息时,它先与Kafka集群上的一个Broker建立TCP连接,然后将消息发送到该Broker。

import org.apache.kafka.clients.producer.*;
import java.util.Properties;

public class KafkaProducerExample {
    public static void main(String[] args) throws InterruptedException {

        // 配置Kafka生产者属性
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("acks", "all");
        props.put("retries", 0);
        props.put("batch.size", 16384);
        props.put("linger.ms", 1);
        props.put("buffer.memory", 33554432);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        // 创建Kafka生产者实例
        Producer<String, String> producer = new KafkaProducer<>(props);

        // 发送消息到指定主题
        for (int i = 0; i < 10; i++) {
            producer.send(new ProducerRecord<>("my-topic", Integer.toString(i), "Hello World-" + i));
            Thread.sleep(1000); // 每秒发送一条消息
        }

        producer.close(); // 关闭Kafka生产者实例
    }
}

2. 消息消费者

Kafka消费者从Kafka集群中的一个或多个分区中消费消息。消费者可以随时订阅一个或多个主题,并在每个主题中定位到特定分区。

import org.apache.kafka.clients.consumer.*;
import java.util.Collections;
import java.util.Properties;

public class KafkaConsumerExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "my-group"); // 与生产者所在的组相同
        props.put("enable.auto.commit", "true"); // 自动提交偏移量
        props.put("auto.commit.interval.ms", "1000"); // 自动提交偏移量的时间间隔
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        // 创建Kafka消费者实例
        Consumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Collections.singletonList("my-topic")); // 订阅指定主题

        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(100);
            for (ConsumerRecord<String, String> record : records) {
                // 处理消息
                System.out.printf("offset=%d, key=%s, value=%s%n", record.offset(), record.key(), record.value());
            }
        }

        // consumer.close();
    }
}

3. 主题与分区

Kafka的主题(topic)是Kafka用于区分消息类型和类别的单位。每个主题都由一个或多个分区(partition)组成,分区是存储在Kafka集群中不同节点上的数据容器。每个主题的消息可以分布在不同的分区中。

4. 副本机制

Kafka的副本机制是为了保证消息的高可用性和数据的持久性。当一个分区的消息被发送到Kafka集群后,它会被复制到多个副本(replica)中。每个分区都有一个或多个副本,其中有且仅有一个被标记为“首领副本”(leader replica),负责读写该分区的数据。其他副本被称为“追随者副本”(follower replica),它们只能复制首领副本的数据,并借此提高系统的可靠性和容错性。

四、消息传递过程

1. 消息发送流程

消息发送者将消息发送到Kafka主题(topic),然后由Kafka Producer将消息分区并写入到Broker中的指定分区中。在发送消息之前,Producer需要从Zookeeper中获取集群元数据信息,包括Broker列表和主题分区的信息。具体流程如下:

  1. 消息发送者通过Producer API将消息发送到指定topic中。

    String topic = "test_topic";
    String message = "Hello, Kafka!";
    ProducerRecord<String, String> record = new ProducerRecord<>(topic, message);
    producer.send(record);
    
  2. Producer根据消息的key值使用Partitioner算法将同一个key的消息发送到同一个分区里,保证消息的有序性。

    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes,
                Cluster cluster) {
        int numPartitions = cluster.partitionCountForTopic(topic);
        if (keyBytes == null) {
            Random random = new Random();
            return random.nextInt(numPartitions);
        }
        int hash = Utils.murmur2(keyBytes);
        return hash % numPartitions;
    }
    
  3. Producer将记录存储在缓冲区中,如果缓冲区已满,则会调用send方法将缓冲区中的内容批量发送到Broker中。

2. 消息存储流程

消息被存储在Broker的一个或多个分区中,分区中的每条消息都有一个唯一的偏移量(offset),并按照其他参数(如消息的时间戳)进行排序存储。当Consumer消费分区中的消息时,可以根据偏移量来读取消息,保证消息的顺序性。

在Broker上保存的消息是以一种高效而紧凑的格式进行编码的,称为RecordBatch,它可以将多个Producer的相关记录分组在一起,以便有效地压缩提交到Broker的数据传输量。

3. 消息消费流程

Consumer订阅并读取特定主题(topic)的消息。消费者(Consumer)从Broker中拉取特定分区的消息,并对其进行处理。具体流程如下:

  1. 消费者向Kafka集群发送Fetch请求,获取数据。

  2. Broker收到Fetch请求后,从指定的分区和偏移量(offset)开始读取消息,然后将数据返回给Consumer。

  3. Consumer获取到数据后,进行处理和消费,同时记录每个Partition的下一个可拉取的偏移量,并定期将其提交到Zookeeper中,用于在Consumer发生故障或重启后重新读取未被处理的消息。

String topicName = "test_topic";
String groupId = "test_group";
Properties props = new Properties();
props.setProperty("bootstrap.servers", "localhost:9092");
props.setProperty("group.id", groupId);
props.setProperty("auto.commit.enable", "false");
props.setProperty("auto.offset.reset", "earliest");
props.setProperty("key.deserializer", StringDeserializer.class.getName());
props.setProperty("value.deserializer", StringDeserializer.class.getName());
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList(topicName));
while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
    for (ConsumerRecord<String, String> record : records) {
        System.out.printf("offset = %d, key = %s, value = %s%n", 
          record.offset(), record.key(), record.value());
    }
    consumer.commitSync();
}

五、性能优化

1.硬件方面的优化

磁盘

  • 在使用Kafka时,建议使用SSD磁盘,因为SSD的I/O性能比HDD磁盘更好。
  • 另外,可以使用多块磁盘,将Kafka数据分散到不同的磁盘上,以减少单个磁盘的负担。

内存

  • 将足够的内存分配给Kafka Broker进程,以便它可以缓存更多的消息。
  • 在Kafka Producer写入消息时,可以开启压缩功能,减少传输的数据量,节省内存空间。

CPU

  • Kafka Broker对CPU的要求通常不高,但在高负载下还是需要注意CPU的使用率。
  • 在多核CPU的机器上,可以通过增加Broker实例数或者增加分区数来充分利用CPU资源。

2.Kafka配置优化

Producer端

  • acks参数:设置消息确认的级别。acks=0表示不等待服务器确认;acks=1表示只需得到Kafka集群中一台服务器的确认;acks=all表示需要得到Kafka集群中全部服务器的确认。确认级别越高,消息的耗时就会增加,但是可以提供更好的数据安全性。
  • batch.size参数:设置批处理大小。较小的批处理大小可以降低延迟,但也会增加CPU开销。建议根据实际情况调整该参数。
  • compression.type参数:设置压缩方式。可选的压缩方式包括none(默认)、gzip、snappy和lz4。生产者使用压缩功能可以减少传输的数据量,提高传输效率。

Broker端

  • message.max.bytes参数:设置单个消息的最大大小。如果消息大小超过了该限制,Kafka将拒绝该消息。
  • num.io.threads参数:设置Broker处理I/O请求的线程数。增加该参数的值可以提高Broker的并发能力,但是也会增加CPU的使用率。

3. 消费者优化

  • Group ID:消费者组是Kafka中消费者的逻辑分组,对于同一组内的消费者来说,每个分区只会被其中一台消费者消费。因此,合理设置Group ID可以提高消费者的有效性。
  • Fetch Size:每次从Kafka Broker读取的消息数量。过大的Fetch Size会导致消费端的延迟增加,过小的Fetch Size则会增加网络开销。可以通过调节该参数来达到最佳的消费效率。
  • 处理策略:消费者有两种处理消息的方式,即poll()和push()。其中,poll()需要由应用主动调用,而push()则是在后台由Kafka的Consumer线程自动触发。通常情况下,使用poll()比push()更为灵活,适用于大部分场景。

4. 代码示例

以下是Kafka Producer端实现批量写入消息的Java代码示例:

import org.apache.kafka.clients.producer.*;

import java.util.Properties;

public class KafkaProducerSample {

    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("batch.size", 16384);

        Producer<String, String> producer = new KafkaProducer<>(props);
        String topic = "myTopic";
        for (int i = 0; i < 10000; i++) {
            String msg = "My message No." + i;
            ProducerRecord<String, String> record = new ProducerRecord<String, String>(topic, msg);
            producer.send(record);
        }
        producer.close();
    }
}

六、Kafka的优缺点

1. Kafka的优点

  • 高吞吐量、低延迟: Kafka通过partition和consumer group的概念来实现负载均衡,支持分布式部署,能够实现高吞吐量和低延迟。
  • 高扩展性: Kafka集群内所有节点都是对等的,新的节点可以很容易地加入到集群中,扩展集群的容量,并且不会中断已经运行的服务。
  • 持久化存储: Kafka数据以文件的形式保留在磁盘中,可靠性较高,即使一些节点失效,数据依然不会丢失,非常适合大规模数据的持续存储和离线分析处理。
  • 可靠性高: Kafka支持数据备份和副本机制,通过数据的复制和备份来提高其稳定性,保障数据不会丢失。
  • 消息传输具有多样性: Kafka支持多种协议的传输,可以与不同类型的应用程序集成,例如支持HTTP RESTful API,各种编程语言的客户端和其他一些补充工具等。

2. Kafka的缺点

  • 部署和配置较为复杂: Kafka的集群需要进行配置和部署,需要一定的技术力量,对于较小的企业来说,可能需要投入大量的精力和时间才能完成部署和配置。
  • 需要对数据进行处理: Kafka只是一个消息传递平台,不直接对数据进行处理,需要用户自己编写代码进行数据处理,因此需求较高的技术人员才能使用。
  • 没有自动管理: Kafka集群需要手动做一些管理,例如,当某个节点失效后,需要重新平衡partition的负载等。

七、Kafka的应用案例

Kafka是一个开源的分布式消息系统,在大数据领域有着广泛的应用。下面介绍Kafka的三个应用案例。

1. 网络爬虫

网络爬虫的核心功能是从互联网上抓取数据并进行分析或保存。Kafka可以作为网络爬虫的消息队列,负责将被爬取数据传输给爬虫程序。当网络爬虫处理完数据后,将数据发送到Kafka中,供后续处理程序使用。

具体实现时,需要先创建一个名为spider的Kafka主题,然后在爬虫程序中编写生产者代码,将爬取到的数据发送至该主题。以下是Java示例代码:

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.Properties;

public class SpiderProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("acks", "all");
        props.put("retries", 0);
        props.put("batch.size", 16384);
        props.put("linger.ms", 1);
        props.put("buffer.memory", 33554432);
        props.put("key.serializer",
            "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer",
            "org.apache.kafka.common.serialization.StringSerializer");

        KafkaProducer<String, String> producer =
            new KafkaProducer<String, String>(props);

        for(int i = 0; i < 100; i++)
            producer.send(new ProducerRecord<String, String>(
                "spider", Integer.toString(i), "data-" + Integer.toString(i)));

        producer.close();
    }
}

2. 数据统计

Kafka除了可以作为消息队列,还可以作为数据缓存,可以处理大量的数据流。在数据统计过程中,Kafka既可以作为生产者将收集到的数据发送到主题中,也可以作为消费者从主题中获取数据并进行分析、统计等操作。具体实现时,需要先创建一个名为data的Kafka主题,然后在收集数据的程序中编写生产者代码,将数据发送至该主题。再在处理数据的程序中编写消费者代码,从该主题中获取数据以完成数据统计。

以下是Java示例代码:

import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.util.Arrays;
import java.util.Properties;

public class DataConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "test");
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", "1000");
        props.put("key.deserializer",
            "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer",
            "org.apache.kafka.common.serialization.StringDeserializer");

        KafkaConsumer<String, String> consumer =
            new KafkaConsumer<String, String>(props);

        consumer.subscribe(Arrays.asList("data"));

        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(100);
            for (ConsumerRecord<String, String> record : records)
                System.out.printf("offset = %d, key = %s, value = %s\n",
                    record.offset(), record.key(), record.value());
        }
    }
}

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.Properties;

public class DataProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("acks", "all");
        props.put("retries", 0);
        props.put("batch.size", 16384);
        props.put("linger.ms", 1);
        props.put("buffer.memory", 33554432);
        props.put("key.serializer",
            "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer",
            "org.apache.kafka.common.serialization.StringSerializer");

        KafkaProducer<String, String> producer =
            new KafkaProducer<String, String>(props);

        for(int i = 0; i < 100; i++)
            producer.send(new ProducerRecord<String, String>(
                "data", Integer.toString(i), "data-" + Integer.toString(i)));

        producer.close();
    }
}

3. 实时监控

Kafka可以在实时监控中作为传输媒介,将源数据流发送到消费者,以满足分布式的需求。具体实现时,需要先创建一个名为metrics的Kafka主题,然后在生产者程序中将监控数据发送至该主题。再在监控中心中编写消费者代码,从该主题中获取数据并进行分析、监控等操作。

以下是Java示例代码:

import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.util.Arrays;
import java.util.Properties;

public class MetricsConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "test");
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", "1000");
        props.put("key.deserializer",
            "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer",
            "org.apache.kafka.common.serialization.StringDeserializer");

        KafkaConsumer<String, String> consumer =
            new KafkaConsumer<String, String>(props);

        consumer.subscribe(Arrays.asList("metrics"));

        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(100);
            for (ConsumerRecord<String, String> record : records)
                System.out.printf("offset = %d, key = %s, value = %s\n",
                    record.offset(), record.key(), record.value());
        }
    }
}

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.Properties;

public class MetricsProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("acks", "all");
        props.put("retries", 0);
        props.put("batch.size", 16384);
        props.put("linger.ms", 1);
        props.put("buffer.memory", 33554432);
        props.put("key.serializer",
            "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer",
            "org.apache.kafka.common.serialization.StringSerializer");

        KafkaProducer<String, String> producer =
            new KafkaProducer<String, String>(props);

        for(int i = 0; i < 100; i++)
            producer.send(new ProducerRecord<String, String>(
                "metrics", Integer.toString(i), "metrics-" + Integer.toString(i)));

        producer.close();
    }
}
Logo

旨在为数千万中国开发者提供一个无缝且高效的云端环境,以支持学习、使用和贡献开源项目。

更多推荐