简介

本教程介绍了 Apache Kafka、Kafka Producer 和 Kafka Consumer,按照本文的思路,我们将构建两个简单的应用程序;一个作为生产者,在 Kafka 主题上生成消息,另一个作为消费者,使用来自同一主题的消息。

首先,让我们定义一些关键术语。

什么是阿帕奇卡夫卡?

Kafka 是一个分布式事件流平台,可跨多个渠道(移动、Web、IoT 设备等)提供对数据的高可用性和低端到端延迟访问。 Kafka 使用发布者-订阅者模式,其中消息的发送者,通常称为生产者,向主题发送消息,所有订阅者立即收到消息。在 Kafka 中,我们可以有多个生产者生产消息和多个消费者消费消息。

在当今世界,数百万台设备进行实时通信。例如,在物联网世界中,信号在不同设备之间实时发送。 Kafka 提供了一个平台,可以将信号以消息的形式从一个设备(生产者)发送到另一个设备(消费者)在其中消费消息的主题,这会以非常低的延迟发生。 Kafka有几种实现,下面列出了一些 -

  • 实时处理支付和金融交易,例如在证券交易所、银行和保险中。

  • 实时跟踪和监控汽车、卡车、车队和货运,例如物流和汽车行业。

  • 持续捕获和分析来自物联网设备或其他设备的传感器数据,例如工厂和风电场。

  • 收集并立即响应客户交互和订单,例如零售、酒店和旅游行业以及移动应用程序。

  • 对住院病人进行监护,预测病情变化,确保在紧急情况下及时救治。

  • 连接、存储和提供公司不同部门产生的数据。作为数据平台、事件驱动架构和微服务的基础。

资料来源:Apache Kafka 文档。

术语

  • Producer 是能够产生消息的任何设备或客户端应用程序 - 将其视为发射器;它也可能是物联网设备。

  • Consumer 是消费消息的设备或客户端应用程序。

  • Kafka中的一条消息叫做Record,记录是不可变的——记录一旦创建或推送就不能改变。

  • 生产者产生的消息存储在一个Topic中。主题类似于数据库中的表。

  • 主题被分区复制跨代理,即消息存储在不同代理的多个位置。

  • 分区中的每条消息都会获得一个名为 Offset 的增量 id。

  • 每条消息都有一个跨分区的唯一标识符,即一条消息可以通过分区 ID 和偏移量 id 来标识,例如partition-0, offset-1 表示分区号为 0 且偏移量为 1 的消息。

  • 消息在每个分区中按照偏移的顺序读取,即消费者A按照偏移id的自然顺序从partition-0读取。值得注意的是,消息是由消费者并行读取的。

  • 每个消费者都属于一个特定的消费者组,并且只从分区中读取消息。

  • Broker 类似于包含多个主题的服务器。每个代理都用一个 id 标识。

  • zookeeper 编排和管理 Kafka 集群中的不同代理。

截图 2022-06-09 at 21.30.00.pngkafka 集群架构

先决条件

在本教程中,您需要对以下内容有一个大致的了解

  • 弹簧靴

  • 码头工人

搭建Kafka环境

我们将使用 Docker 和 docker-compose 来设置我们的 Kafka 环境并构建应用程序。您可以按照官方文档了解如何在此处安装 docker并在此处查看 docker-compose。您还可以在此处克隆本教程中使用的 docker-compose 文件。克隆 repo 后,让我们看看 docker-compose .yaml 文件里面有什么。

version: '3'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:6.2.1
    hostname: zookeeper
    container_name: zookeeper
    ports:
      - "2181:2181"
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000

  broker:
    image: confluentinc/cp-server:6.2.1
    hostname: broker
    container_name: broker
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
      - "9101:9101"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:29092,PLAINTEXT_HOST://localhost:9092
      KAFKA_METRIC_REPORTERS: io.confluent.metrics.reporter.ConfluentMetricsReporter
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
      KAFKA_CONFLUENT_LICENSE_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_CONFLUENT_BALANCER_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
      KAFKA_JMX_PORT: 9101
      KAFKA_JMX_HOSTNAME: localhost
      KAFKA_CONFLUENT_SCHEMA_REGISTRY_URL: http://schema-registry:8081
      CONFLUENT_METRICS_REPORTER_BOOTSTRAP_SERVERS: broker:29092
      CONFLUENT_METRICS_REPORTER_TOPIC_REPLICAS: 1
      CONFLUENT_METRICS_ENABLE: 'true'
      CONFLUENT_SUPPORT_CUSTOMER_ID: 'anonymous'

该文件使用一些默认配置为 zookeeper 和代理服务器定义图像。我不会深入探讨 docker,只提及文件的一些关键组件。

zookeeper:
    image: confluentinc/cp-zookeeper:6.2.1
    hostname: zookeeper
    container_name: zookeeper
    ports:
      - "2181:2181"
  • image:这定义了 docker 要拉取的图像。

  • container_name:定义容器的名称。

  • ports:这定义了容器端口和主机端口。容器端口是容器运行的端口,而主机端口是主机上要绑定的端口。

broker:
    image: confluentinc/cp-server:6.2.1
    hostname: broker
    container_name: broker
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
      - "9101:9101"
  • image:这定义了 docker 要拉取的镜像。

  • container_name:定义容器的名称。

  • ports:这定义了容器端口和主机端口。容器端口是容器运行的端口,而主机端口是主机上要绑定的端口。

  • depends_on:这告诉 docker broker 依赖于 zookeeper,即 zookeeper 必须在 broker 之前运行。

创建生产者项目

接下来,我们将创建一个简单的生产者项目。让我们继续使用弹簧初始化程序来创建我们的项目。

截图来自 2022-06-09 22-58-03.png

从元数据部分。设置以下内容;

  • Group:出于本教程的目的,将其保留为默认的“com.example”。

  • Artifact:demo 名称:producer,这应该是项目名称,但对于教程,我们将其保留为 producer。

  • 包名:将为我们自动生成。包装:将其保留为 Jar

  • Java:选择 11,注意:你应该在你的机器上安装了这个。

在右侧窗格中添加 Lombok、Spring web 和 Spring for Apache Kafka。之后,单击生成您的项目,解压缩并在您喜欢的 IDE 上打开。如果你打开 pom.xml 文件,你应该有以下依赖。一个

<dependencies>
   <dependency>
       <groupId>org.springframework.boot</groupId>
       <artifactId>spring-boot-starter-web</artifactId>
   </dependency>
   <dependency>
       <groupId>org.springframework.kafka</groupId>
       <artifactId>spring-kafka</artifactId>
   </dependency>

   <dependency>
       <groupId>org.projectlombok</groupId>
       <artifactId>lombok</artifactId>
       <optional>true</optional>
   </dependency>
   <dependency>
       <groupId>org.springframework.boot</groupId>
       <artifactId>spring-boot-starter-test</artifactId>
       <scope>test</scope>
   </dependency>
   <dependency>
       <groupId>org.springframework.kafka</groupId>
       <artifactId>spring-kafka-test</artifactId>
       <scope>test</scope>
   </dependency>
</dependencies>

创建消息对象

现在,让我们创建我们将推送到该主题的模型。我们将创建一个 java 类来表示我们的消息,这将在传输时转换为 JSON。我们将使用 Lombok 来减少我们课程中的样板代码。

src/main/java/com/example/producer/model/Message.java

@Data
@AllArgsConstructor
@NoArgsConstructor
public class Message {
   private String user;
   private String msg;
}

Kafka生产者配置

接下来,我们创建配置文件来定义 Kafka 生产者的配置。

src/main/java/com/example/producer/config/KafkaConfig.java

@Configuration
public class KafkaConfig {
   @Value("${kafka.broker}")
   private String brokerServer;

   // create a topic called chat
   @Bean
   public NewTopic chatTopic(){
       return TopicBuilder.name("chat")
               .build();
   }


   @Bean
    public ProducerFactory<String, Message> chatProducerFactory() {
       Map<String, Object> configs = new HashMap<>();
       configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG , brokerServer);
       configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
       configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);

        return new DefaultKafkaProducerFactory<>(configs);
     }

   @Bean
   public KafkaTemplate<String, Message> chatKafkaTemplate(){
       return new KafkaTemplate<>(chatProducerFactory());
   }
}

让我们看看 Kafkaconfig.java 中有什么。

我们使用@Configuration注释该类以在 spring 中创建一个构造型配置。

@Bean
public NewTopic chatTopic(){
    return TopicBuilder.name("chat")
        .build();
}

我们使用 TopicBuilder 构建器类命名和创建我们的主题。

@Bean
 public ProducerFactory<String, Message> chatProducerFactory() {
       Map<String, Object> configs = new HashMap<>();
       configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG , brokerServer);
       configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
       configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
       return new DefaultKafkaProducerFactory<>(configs);
   }

这里需要注意的最重要的配置是代理服务器,它将在我们的 application.yaml 文件中定义为kafka.broker: localhost:9092。这指定了我们要连接的代理服务器。接下来,我们指定KEY_SERIALIZER_CLASS_CONFIG,它定义了密钥的序列化程序。我们使用 StringSerializer.class 因为我们的键是一个字符串。根据用例,您可以指定一些其他类,例如整数键的 IntegerSerializer 和VALUE_SERIALIZER_CLASS_CONFIG定义值的序列化程序,我们使用 JsonSerializer.class 因为我们将传递 JSON 作为我们的值,其他类型也可能基于用例,例如用于字符串值的 StringSerializer。然后我们使用配置来构造 DefaultKafkaProducerFactory。

src/main/java/com/example/producer/api/ChatController.java

@RestController
@RequestMapping("/api/v1/chats")
public class ChatController {

   @Autowired
   private KafkaTemplate<String, Message> kafkaTemplate;

   @PostMapping("send")
   public ResponseEntity<String> sendMessage(@RequestBody Message message){
       kafkaTemplate.send("chat", UUID.randomUUID().toString(),  message);
       return ResponseEntity.ok("Message sent");
   }
}

演示:推送消息到我们的主题

接下来,我们创建我们的控制器。在这里,我们将 KafkaTemplate 自动装配到类中。KafkaTemplate<String, Message>转换为KafkaTemplate <Key, Value>,Key 表示您在序列化程序中定义的类,Value 表示您打算转换为 JSON 的类。我们将从 kafkaTemplate 调用 send() 方法来发送消息。send()方法有不同的实现。kafkaTemplate.send("chat", UUID.randomUUID().toString(), message);这里,第一个参数是topicName,后跟一个key(唯一标识消息),然后是我们发送的消息对象。KafkaTemplate

我们将运行应用程序并将 URL 粘贴到邮递员上,如下面的屏幕截图所示。

2022-06-19 19-41-31.png 截图

设置消费者

现在,我们完成了我们的制作人。让我们开始创建我们的消费者,就像我们为生产者所做的那样。我们回到spring initializer,我们只需将项目名称更改为 consumer 并选择以前的配置。

截图来自 2022-06-09 23-02-08.png

创建消费者 JSON 对象

在其他消费来自主题的消息时,我们将创建在生产者服务中创建的相同消息类。

src/main/java/com/example/consumer/model/Message.java

@AllArgsConstructor
@NoArgsConstructor
public class Message {
   private String user;
   private String msg;
}

Kafka Consumer的配置

接下来,我们转到消费者配置

src/main/java/com/example/consumer/config/KafkaConfig.java

@Configuration
public class KafkaConfig {

    @Value("${kafka.broker}")
    private String brokerServer;

    @Bean
    public Map<String, Object> chatProducerFactory() {
        Map<String, Object> configs = new HashMap<>();
        configs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerServer);
        configs.put( ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
        return configs;
    }

    @Bean
    public ConsumerFactory<String, Message> chatKafkaTemplate(){
        JsonDeserializer<Message> deserializer = new JsonDeserializer<>(Message.class);
        deserializer.setRemoveTypeHeaders(false);
        deserializer.addTrustedPackages("*");
        deserializer.setUseTypeMapperForKey(true);
        return new DefaultKafkaConsumerFactory<>(chatProducerFactory(), new StringDeserializer(), deserializer);
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, Message> chatFactory() {
        ConcurrentKafkaListenerContainerFactory<String, Message> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(chatKafkaTemplate());
        return factory;
    }
}

正如我们在生产者的配置中看到的,我们需要指定要连接的代理服务器。需要注意的是 JsonDeserializer。

JsonDeserializer<Message> deserializer = new JsonDeserializer<>(Message.class);
        deserializer.setRemoveTypeHeaders(false);
        deserializer.addTrustedPackages("*");
        deserializer.setUseTypeMapperForKey(true);

与我们在生产者中声明为简单的 JsonSerializer 不同,我们将声明一个 Deserializer 和一些其他配置,我现在将解释这些配置。由于生产者和消费者在将 JSON 对象反序列化为 java 对象时存在于不同的应用程序中,因此 Kafka 会抛出错误“ErrorHandlingDeserializer”,这是因为反序列化器类不信任消费者中的 java 对象。另外,为了解决这个问题,我们告诉序列化程序信任所有的包(“*”),如果我们想指定一个特定的包或对象,我们只需指定包名,如下所示:

deserializer.addTrustedPackages("com.example.consumer.model.Message");

创建消费者服务

然后,我们为类创建服务以使用消息。

src/main/java/com/example/consumer/service/ConsumerService.java

@Service
public class ConsumerService {

    private final Logger logger = LoggerFactory.getLogger(getClass());
    private final List<Message> messages = new ArrayList<>();

    @KafkaListener(topics = "chat", groupId = "chat-consumer", containerFactory = "chatFactory")
    private void chatConsumer(Message message){
        messages.add(message);
        logger.info("Received : {}", messages);
    }

}

现在,我们可以再次访问我们的生产者端点,以查看消费者从主题中获取消息。

2022-06-19 21-04-29.png 截图

消费者如何知道已读的消息?

如前所述,消费者组中的每个消费者都只从分区中读取;每条消息都由偏移 id 标识。每当来自消费者组的消费者开始消费来自特定主题的消息时,偏移量 id 都会由 Kafka 在内部提交给另一个主题**__consumer_offsets**,以记录消费者正在从哪里读取。在失败的情况下,消费者可以从中断的地方继续消费消息。

结论

在本教程中,我们了解了 Kafka、一些术语及其用途。我们还创建了一个生产者,对其进行配置,并向一个主题发送消息,并创建一个消费者,对其进行配置,并实时接收/消费从生产者推送到主题的消息。稍后,我们讨论了消费者在从失败中恢复时如何记住它停止的地方。

您可以在此存储库中找到本教程中使用的代码以进行后续操作。

Logo

云原生社区为您提供最前沿的新闻资讯和知识内容

更多推荐