kafka–基础–4.1–集成springboot–简单案例


1、代码

1.1、结构

在这里插入图片描述

1.2、代码

KafkaConsumer

package feizhou.kafka.demo1.business;

import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Service;

import java.util.List;

/**
 * @ClassName: KafkaConsumer
 * @Description 请描述下该类是做什么的
 * @Author feizhou
 * @Date 2025/10/7  16:04
 * @Verson 1.0
 **/
@Service
public class KafkaConsumer {

    // 监听test主题的消息

    @KafkaListener(topics = "test", groupId = "my-consumer-group")
    public void consumeMessage(String message, Acknowledgment ack) {
        System.out.println("收到消息 in group my-group: " + message);
        // 手动提交偏移量 - 消息处理成功后提交
        ack.acknowledge();
    }
//    // 批量消费手动提交
//    @KafkaListener(topics = "test", groupId = "my-consumer-group")
//    public void consumeBatchMessage(List<String> messages, Acknowledgment ack) {
//        try {
//            System.out.println("收到批量消息,数量: " + messages.size());
//
//            for (String message : messages) {
//                System.out.println("处理消息: " + message);
//                // 批量处理逻辑
//            }
//
//            // 批量处理完成后一次性提交
//            ack.acknowledge();
//            System.out.println("批量偏移量已提交");
//
//        } catch (Exception e) {
//            System.err.println("批量处理失败: " + e.getMessage());
//            // 不提交,整个批次会重新投递
//        }
//    }
}

application-kafka.yml

spring:
  kafka:
    bootstrap-servers: 192.168.187.89:9092   # Kafka集群地址
    # 生产者配置
    producer:
      compression-type: gzip   # 默认不压缩,可选值:"gzip", "snappy", "lz4", "zstd"
      acks: all   # 确认机制
      retries: 3   # 发送失败后重试次数
      retry-backoff-ms: 500    # 每次重试之间的间隔时间(单位:毫秒)
      enable-idempotence: true   # 启用幂等性
      linger-ms: 5   # 延迟发送的时间,单位毫秒,默认 0ms(立即发送)
      batch-size: 32768   # 生产者每个批次发送的最大字节数,默认 16KB
      buffer-memory: 67108864   # 生产者缓冲区内存大小,单位字节,默认32MB
      key-serializer: org.apache.kafka.common.serialization.StringSerializer   # 键的序列化器
      value-serializer: org.apache.kafka.common.serialization.StringSerializer   # 值的序列化器
      transactional-id-prefix: tx-   # 启用事务时的前缀,用于事务标识
      # 生产者拦截器,用于记录日志和指标收集
      interceptors:
        - org.apache.kafka.clients.producer.internals.LogAndMetricsProducerInterceptor
    # 消费者配置
    consumer:
      group-id: my-consumer-group   # 消费者组ID
      concurrency: 10   # 设置消费者并发数
      enable-auto-commit: false   # 禁用自动提交,手动提交偏移量
      auto-offset-reset: earliest   # 如果偏移量无效则从最早的消息开始消费。可选"latest"
      session-timeout-ms: 10000   # session会话超时时间(与心跳有关)
      max-poll-records: 1000   # 每次poll的最大记录数
      max-poll-interval-ms: 300000   # 最大轮询间隔 ,确保消费者不会在空闲期间超时
      fetch-max-wait-ms: 300   # 拉取消息时的最大等待时间
      fetch-max-bytes: 104857600   # 拉取的最大字节数,单位字节
      fetch-min-size: 5000   # 拉取消息时的最小字节数,单位字节
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer   # 键的反序列化器
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer   # 值的反序列化器
      isolation-level: read_committed   # 事务隔离级别,确保读取已提交的数据。可选 "read_uncommitted"
    # Admin配置,用于创建和管理Kafka的主题
    admin:
      bootstrap-servers: localhost:9092   # Kafka集群地址
      default-topic:
        partitions: 5   # 默认主题的分区数
        replication-factor: 3   # 默认主题的副本数
    # 副本机制
    replication:   # 副本落后超过此时间(毫秒)会被移出 ISR 列表
      replica_lag_time_max_ms: 10000   # 默认为 10000 毫秒,即 10 秒
      # 最少需要多少个同步副本才能进行写入
      min_insync_replicas: 2   # 默认为 1,根据需求设置
      # 是否允许在没有足够 ISR 的情况下进行领导者选举
      unclean_leader_election_enable: false   # 默认为 false,防止数据丢失
    # 监听器的配置
    listener:
      concurrency: 10   # 设置监听器的并发数,控制消费者的并发线程数
      ack-mode: manual   # 手动确认消息,默认是 "auto" 自动确认
      poll-timeout: 3000   # 拉取消息的超时时间,单位毫秒
      missing-topics-fatal: false   # 是否忽略主题缺失的错误,默认是false
      container-factory: kafkaListenerContainerFactory   # 默认监听器容器工厂
    # 错误处理配置
    error-handler:
      # 在消息消费出错时重试的配置
      retry:
        max-attempts: 3   # 最大重试次数
        initial-interval: 1000   # 初始重试间隔,单位毫秒
        multiplier: 1.5   # 重试间隔的增量倍数
    #日志段和日志保留配置
    log:
      # 设置每个日志段的大小
      segment:
        bytes: 1073741824   # 每个日志段的大小,默认为 1GB # 设置日志保留时间(毫秒)
      retention:
        ms: 604800000   # 7天,单位是毫秒
      # 设置日志的最大保留大小
      retention_bytes: 10737418240   # 默认为 10GB
      # 设置日志清理策略(delete 或 compact)
      cleanup_policy: "delete"   # 可选:delete 或 compact
      # 设置日志索引间隔大小
      index_interval_bytes: 4096   # 默认为 4096

MainDemo1Tests

package feizhou.kafka.demo1;

import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.kafka.core.KafkaTemplate;

@SpringBootTest
class MainDemo1Tests {



    @Autowired
    private   KafkaTemplate<String, String> kafkaTemplate;

    @Test
    void sendMessage() {
        kafkaTemplate.send("test","111111111");
    }

}

pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>3.5.6</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>feizhou</groupId>
    <artifactId>kafka-demo1</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>kafka-demo1</name>
    <description>kafka-demo1</description>
    <url/>
 
    <properties>
        <java.version>17</java.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>

        <!-- kafka -->
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
            <version>3.3.10</version>
        </dependency>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>3.9.1</version>
        </dependency>

    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>

</project>


2、测试结果

在这里插入图片描述

在这里插入图片描述

更多推荐