在微服务和分布式系统中,消息队列是解耦、削峰、异步处理的核心组件。Apache Kafka 作为高吞吐、分布式的消息平台,与 Spring Boot 的集成非常方便。本文将从零开始,带你手写一个完整的 Kafka 生产者和消费者示例,并深入讲解配置细节、手动提交偏移量、批量消费等进阶用法。

前提:已搭建Kafka服务,参考本专栏文章:Kafka-安装和配置(搭建环境)

一、背景与准备

1.1 为什么要用 Kafka + Spring Boot?

  • 高吞吐:Kafka 支持每秒百万级消息。

  • 持久化:消息落盘,支持重放。

  • 分布式:天然支持水平扩展。

  • Spring 生态spring-kafka 提供了 KafkaTemplate 和 @KafkaListener,开发效率极高。

1.2 环境要求

  • JDK 17 或 21(Spring Boot 3.x 要求 JDK 17+)

  • Maven 3.6+

  • Kafka 集群(单机也可,参考 Kafka 快速入门

  • IDEA 或 VS Code

本文基于 Spring Boot 3.3.5 和 Kafka 3.8.0


二、新建 Spring Boot 项目

使用 Spring Initializr 或 IDEA 新建项目,选择以下依赖:

  • Spring Web(提供 REST API 发送消息)

  • Spring for Apache Kafka(核心依赖)

  • Lombok(可选,简化代码)

生成项目后,pom.xml 中核心依赖如下:

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>
<dependency>
    <groupId>org.projectlombok</groupId>
    <artifactId>lombok</artifactId>
    <scope>provided</scope>
</dependency>

三、配置文件(application.yml)

在 src/main/resources/application.yml 中配置 Kafka 连接信息、生产者和消费者属性。

spring:  
  # Kafka 消息队列配置
  kafka:
    # Kafka 服务端地址(IP:端口)
    bootstrap-servers: 你的服务器IP:9092

    # 生产者配置:负责发送消息到 Kafka
    producer:
      # Key 序列化方式:字符串序列化
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      # Value 序列化方式:字符串序列化
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
      # 消息确认机制:1 = 仅Leader确认、all = 全部确认
      acks: 1
      # 消息发送失败重试次数
      retries: 3
      # 批量发送的消息字节大小
      batch-size: 16384
      # 消息延迟发送时间(毫秒),提高吞吐量
      linger-ms: 1
      # 生产者可用的缓存总量
      buffer-memory: 33554432

    # 消费者配置:负责从 Kafka 拉取消息
    consumer:
      # Key 反序列化方式
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      # Value 反序列化方式
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      # 消费者组ID,同一组内的消费者共同消费消息
      group-id: sx-monitor-group
      # 无初始偏移量时,从头开始消费 earliest / 从最新开始消费 latest
      auto-offset-reset: earliest
      # 是否开启自动提交偏移量
      enable-auto-commit: true
      # 自动提交偏移量的时间间隔(毫秒)
      auto-commit-interval: 1000
      # 每次拉取的最大消息条数
      max-poll-records: 500

    # 监听器配置:Spring Kafka 消费监听规则
    listener:
      # 批量消费确认模式
      ack-mode: batch
      # 消费者并发线程数
      concurrency: 2

    # 自定义 Topic 配置
    topics:
      # 测试调试专用
      test: test-topic
      # 监控业务
      monitor: sx-monitor-topic

四、Kafka配置类

1.KafkaProperties

@Data
@Component
@ConfigurationProperties(prefix = "spring.kafka")
public class KafkaProperties {

    /** Kafka 服务端地址(快捷读取,等价于 spring.kafka.bootstrap-servers) */
    private String bootstrapServers;

    /** 消费者 group-id(快捷读取,等价于 spring.kafka.consumer.group-id) */
    private Consumer consumer = new Consumer();

    // ==================== 自定义扩展属性 ====================

    /** 自定义 Topic 名称 */
    private final Topics topics = new Topics();

    // ---- 嵌套类 ----

    @Data
    public static class Consumer {
        private String groupId;
    }

    @Data
    public static class Topics {
        /** 测试调试专用 */
        private String test = "test-topic";
        /** 监控业务 */
        private String monitor = "sx-monitor-topic";
    }
}

2.KafkaConfig

/**
 * <h3>Kafka 集中配置</h3>
 * <pre>
 * 职责分层:
 *   1. AdminClient  — 运维管理(健康检查、Topic 列表)
 *   2. Producer     — 消息生产(幂等 + Snappy 压缩 + 超时控制)
 *   3. Consumer     — 消息消费(手动创建 Factory,注入 ErrorHandler)
 *   4. ErrorHandler — 消费异常重试(3 次 + 1s 间隔,耗尽记日志)
 *   5. HealthCheck  — Broker 连通性探测
 * </pre>
 *
 */
@Slf4j
@Configuration
@RequiredArgsConstructor
public class KafkaConfig {

    private final org.springframework.boot.autoconfigure.kafka.KafkaProperties kafkaProperties;

    // ═══════════════════════════════════════════════════════════════════
    // 1. AdminClient — 运维管理
    // ═══════════════════════════════════════════════════════════════════

    /**
     * 原生 AdminClient,用于 Topic 管理、集群元数据查询。
     * 每次调用都创建新实例(轻量),避免长连接空转。
     */
    @Bean
    public AdminClient kafkaAdminClient() {
        Map<String, Object> config = kafkaProperties.buildAdminProperties(null);
        return AdminClient.create(config);
    }

    /**
     * Spring KafkaAdmin Bean,配合 @Bean newTopic() 自动创建 Topic。
     * 调试期间 Broker 不可用时不阻塞启动。
     */
    @Bean
    public KafkaAdmin kafkaAdmin() {
        KafkaAdmin admin = new KafkaAdmin(kafkaProperties.buildProducerProperties(null));
        admin.setFatalIfBrokerNotAvailable(false);
        return admin;
    }

    // ═══════════════════════════════════════════════════════════════════
    // 2. Producer — 消息生产
    // ═══════════════════════════════════════════════════════════════════

    /**
     * Producer 工厂。
     * <ul>
     *   <li>enable.idempotence = true  → 幂等,保证 exactly-once</li>
     *   <li>compression.type = snappy  → 压缩传输,带宽换 CPU</li>
     *   <li>max.in.flight.requests = 5 → 并发请求数(幂等时安全提高)</li>
     *   <li>request.timeout.ms = 30s   → 单次请求超时</li>
     *   <li>delivery.timeout.ms = 60s  → 端到端投递超时(含重试)</li>
     * </ul>
     */
    @Bean
    public ProducerFactory<String, String> producerFactory() {
        Map<String, Object> props = kafkaProperties.buildProducerProperties(null);
        props.put("acks", "all");               // 幂等必须 all
        props.put("enable.idempotence", true);
        props.put("compression.type", "snappy");
        props.put("max.in.flight.requests.per.connection", 5);
        props.put("request.timeout.ms", 30000);
        props.put("delivery.timeout.ms", 60000);
        return new DefaultKafkaProducerFactory<>(props);
    }

    /**
     * KafkaTemplate — 线程安全,单例复用。
     * String 泛型,适合 JSON 或纯文本消息。
     */
    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }

    // ═══════════════════════════════════════════════════════════════════
    // 3. Consumer — 消息消费
    // ═══════════════════════════════════════════════════════════════════

    /**
     * Consumer 工厂。
     * <ul>
     *   <li>session.timeout.ms = 30s    → 会话超时,超时触发 rebalance</li>
     *   <li>heartbeat.interval.ms = 3s → 心跳间隔,建议 session 的 1/10</li>
     *   <li>max.poll.interval.ms = 5min → poll 间隔上限,超过触发 rebalance</li>
     * </ul>
     */
    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        Map<String, Object> props = kafkaProperties.buildConsumerProperties(null);
        props.put("session.timeout.ms", 30000);
        props.put("heartbeat.interval.ms", 3000);
        props.put("max.poll.interval.ms", 300000);
        return new DefaultKafkaConsumerFactory<>(props);
    }

    /**
     * Listener 容器工厂,生产环境定制入口。
     * <ul>
     *   <li>concurrency = 2           → 消费线程数(yml 中 listener.concurrency 也配置了)</li>
     *   <li>batchListener = false    → 单条消费(批量场景改为 true)</li>
     *   <li>CommonErrorHandler       → 统一异常重试策略</li>
     * </ul>
     */
    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.setConcurrency(2);
        factory.setBatchListener(false);
        factory.setCommonErrorHandler(kafkaErrorHandler());
        return factory;
    }

    // ═══════════════════════════════════════════════════════════════════
    // 4. ErrorHandler — 消费异常处理
    // ═══════════════════════════════════════════════════════════════════

    /**
     * 消费者异常处理策略。
     * <p>
     * 重试 3 次,每次间隔 1 秒。全部失败后记录日志并提交 offset(跳过该消息)。
     * <p>
     * 线上加强方案:
     * <ol>
     *   <li>升级为指数退避:{@code new ExponentialBackOff(1000L, 2.0)}</li>
     *   <li>接入 DLT(死信队列):{@code new DeadLetterPublishingRecoverer(template)}</li>
     * </ol>
     */
    private DefaultErrorHandler kafkaErrorHandler() {
        return new DefaultErrorHandler(
                (record, exception) -> log.error(
                        "Kafka 消费失败(重试耗尽)→ topic={}, partition={}, offset={}, key={}, value={}",
                        record.topic(), record.partition(), record.offset(), record.key(), record.value(), exception),
                new FixedBackOff(1000L, 3L)
        );
    }

    // ═══════════════════════════════════════════════════════════════════
    // 5. 运维工具 — 健康检查 & Topic 管理
    // ═══════════════════════════════════════════════════════════════════

    /**
     * 探测 Broker 是否可达。
     * 调用 listTopics() 做轻量 RTT,超时由 AdminClient 默认值兜底。
     *
     * @return true = Broker 连通
     */
    public boolean isBrokerAlive() {
        try (AdminClient client = AdminClient.create(kafkaProperties.buildAdminProperties(null))) {
            client.listTopics().names().get();
            return true;
        } catch (Exception e) {
            log.warn("Kafka Broker 连接失败: {}", e.getMessage());
            return false;
        }
    }

    /**
     * 列出 Broker 上所有 Topic。
     *
     * @return Topic 名称集合
     */
    public Set<String> listTopics() throws ExecutionException, InterruptedException {
        try (AdminClient client = AdminClient.create(kafkaProperties.buildAdminProperties(null))) {
            return client.listTopics().names().get();
        }
    }
}

3.KafkaProducerService

/**
 * Kafka 生产者服务(生产级)
 * <p>
 * 功能:异步/同步发送、消息头注入、超时兜底、链路追踪 ID
 */
@Slf4j
@Component
@RequiredArgsConstructor
public class KafkaProducerService {

    private final KafkaTemplate<String, String> kafkaTemplate;
    private final KafkaProperties kafkaProperties;

    private static final DateTimeFormatter TIME_FMT = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS");

    // ==================== 异步发送 ====================

    /**
     * 发送消息(异步),返回 CompletableFuture 供调用方自行处理
     */
    public CompletableFuture<SendResult<String, String>> sendAsync(String topic, String message) {
        return sendAsync(topic, null, message);
    }

    /**
     * 发送消息(异步,带 key)
     */
    public CompletableFuture<SendResult<String, String>> sendAsync(String topic, String key, String message) {
        ProducerRecord<String, String> record = buildRecord(topic, key, message);
        long start = System.currentTimeMillis();
        return kafkaTemplate.send(record)
                .whenComplete((result, ex) -> {
                    long elapsed = System.currentTimeMillis() - start;
                    if (ex == null) {
                        log.info("Kafka 发送成功 → topic={}, partition={}, offset={}, key={}, size={}B, cost={}ms",
                                result.getRecordMetadata().topic(),
                                result.getRecordMetadata().partition(),
                                result.getRecordMetadata().offset(),
                                key,
                                message.length(),
                                elapsed);
                    } else {
                        log.error("Kafka 发送失败 → topic={}, key={}, cost={}ms, error={}",
                                topic, key, elapsed, ex.getMessage(), ex);
                    }
                });
    }

    // ==================== 同步发送(有超时) ====================

    /**
     * 同步发送,5 秒超时,返回结果或抛异常
     */
    public SendResult<String, String> sendSync(String topic, String message) throws Exception {
        return sendSync(topic, null, message);
    }

    /**
     * 同步发送(带 key),5 秒超时
     */
    public SendResult<String, String> sendSync(String topic, String key, String message) throws Exception {
        try {
            SendResult<String, String> result = sendAsync(topic, key, message).get();
            return result;
        } catch (Exception e) {
            log.error("Kafka 同步发送异常 → topic={}, key={}", topic, key, e);
            throw e;
        }
    }

    // ==================== 便捷方法(兼容旧接口) ====================

    /**
     * 发送消息(异步,不返回值)— 兼容旧调用
     */
    public void send(String topic, String message) {
        sendAsync(topic, message);
    }

    /**
     * 发送消息(异步,带 key)— 兼容旧调用
     */
    public void send(String topic, String key, String message) {
        sendAsync(topic, key, message);
    }

    // ==================== 内部构建 =======================================================================

    /**
     * 构建 ProducerRecord,自动注入消息头
     */
    private ProducerRecord<String, String> buildRecord(String topic, String key, String message) {
        ProducerRecord<String, String> record = new ProducerRecord<>(topic, null, System.currentTimeMillis(), key, message);

        // 注入消息头(线上标配:链路追踪 ID + 来源 + 时间戳)
        record.headers().add(new RecordHeader("X-Message-Id", UUID.randomUUID().toString().getBytes(StandardCharsets.UTF_8)));
        record.headers().add(new RecordHeader("X-Source", "sx-monitor".getBytes(StandardCharsets.UTF_8)));
        record.headers().add(new RecordHeader("X-Timestamp", LocalDateTime.now().format(TIME_FMT).getBytes(StandardCharsets.UTF_8)));

        return record;
    }

    /**
     * 检查 broker 是否可用(简单 probe)
     */
    public boolean isReady() {
        try {
            kafkaTemplate.partitionsFor(kafkaProperties.getTopics().getTest());
            return true;
        } catch (Exception e) {
            return false;
        }
    }
}

4.KafkaConsumerService

/**
 * Kafka 消费者服务
 * <p>
 * 启动即监听,收到消息后处理业务逻辑
 *
 * @author lk
 * @date 2026/6/2
 */
@Slf4j
@Component
public class KafkaConsumerService {

    /**
     * 监听 test-topic
     */
    @KafkaListener(topics = "${spring.kafka.topics.test}", groupId = "${spring.kafka.consumer.group-id}")
    public void listenTestTopic(ConsumerRecord<String, String> record) {
        handle(record);
    }

    /**
     * 监听 monitor-topic
     */
    @KafkaListener(topics = "${spring.kafka.topics.monitor}", groupId = "${spring.kafka.consumer.group-id}")
    public void listenMonitorTopic(ConsumerRecord<String, String> record) {
        handle(record);
    }

    /**
     * 业务处理入口(线上在这里接具体业务逻辑)
     */
    private void handle(ConsumerRecord<String, String> record) {
        log.info("Kafka 消费 → topic={}, partition={}, offset={}, key={}, value={}",
                record.topic(), record.partition(), record.offset(), record.key(), record.value());
    }
}

5.KafkaController

/**
 * Kafka 调试控制器
 * <p>
 * 测试地址:localhost:14030/api/doc.html → Kafka调试
 */
@Slf4j
@AllArgsConstructor
@RestController
@RequestMapping("/kafka")
@Tag(name = "Kafka调试")
public class KafkaController {

    private final KafkaProducerService producerService;
    private final KafkaConfig kafkaConfig;

    // ==================== 发送 ====================

    @PostMapping("/send")
    @Operation(summary = "1.发送消息(异步)")
    @ApiOperationSupport(order = 1)
    public R<?> send(@RequestParam(defaultValue = "test-topic") String topic,
                     @RequestParam String message) {
        producerService.sendAsync(topic, message);
        return R.ok("消息已发送 → topic=" + topic + ", message=" + message);
    }

    @PostMapping("/send-with-key")
    @Operation(summary = "2.发送带Key的消息(异步)")
    @ApiOperationSupport(order = 2)
    public R<?> sendWithKey(@RequestParam(defaultValue = "test-topic") String topic,
                            @RequestParam String key,
                            @RequestParam String message) {
        producerService.sendAsync(topic, key, message);
        return R.ok("消息已发送 → topic=" + topic + ", key=" + key + ", message=" + message);
    }

    @PostMapping("/send-batch")
    @Operation(summary = "3.批量发送(N条消息)")
    @ApiOperationSupport(order = 3)
    public R<?> sendBatch(@RequestParam(defaultValue = "test-topic") String topic,
                          @RequestParam(defaultValue = "hello kafka") String prefix,
                          @RequestParam(defaultValue = "10") int count) {
        for (int i = 0; i < count; i++) {
            producerService.sendAsync(topic, prefix + " [" + i + "]");
        }
        return R.ok("已发送 " + count + " 条消息 → topic=" + topic);
    }

    @PostMapping("/send-sync")
    @Operation(summary = "4.发送消息(同步,等待 Broker 确认)")
    @ApiOperationSupport(order = 4)
    public R<?> sendSync(@RequestParam(defaultValue = "test-topic") String topic,
                         @RequestParam String message) {
        try {
            var result = producerService.sendSync(topic, message);
            Map<String, Object> data = new HashMap<>();
            data.put("topic", result.getRecordMetadata().topic());
            data.put("partition", result.getRecordMetadata().partition());
            data.put("offset", result.getRecordMetadata().offset());
            data.put("message", message);
            return R.ok(data);
        } catch (Exception e) {
            return R.error("发送失败: " + e.getMessage());
        }
    }

    // ==================== 运维 ====================

    @GetMapping("/health")
    @Operation(summary = "5.Broker 连通性检查")
    @ApiOperationSupport(order = 5)
    public R<?> health() {
        Map<String, Object> info = new HashMap<>();
        info.put("brokerAlive", kafkaConfig.isBrokerAlive());
        info.put("producerReady", producerService.isReady());
        return R.ok(info);
    }

    @GetMapping("/topics")
    @Operation(summary = "6.Topic 列表")
    @ApiOperationSupport(order = 6)
    public R<?> topics() {
        try {
            Set<String> topics = kafkaConfig.listTopics();
            return R.ok(topics);
        } catch (Exception e) {
            return R.error("获取 Topic 列表失败: " + e.getMessage());
        }
    }
}

五、进阶用法(生产环境必备)

1.手动提交偏移量(确保消息不丢失)

默认情况下,spring-kafka 会在消息监听器正常返回后自动提交偏移量(enable.auto.commit=true)。但如果你希望业务处理成功后才提交,避免处理失败时丢失消息,需要改为手动提交。

配置:

spring:
  kafka:
    listener:
      ack-mode: manual_immediate   # 手动确认模式
    consumer:
      enable-auto-commit: false    # 关闭自动提交

消费者代码(增加 Acknowledgment 参数):

@Component
public class ManualCommitConsumer {

    @KafkaListener(topics = "my-topic", groupId = "my-group")
    public void listen(MyMessage message, Acknowledgment ack) {
        try {
            System.out.println("Processing: " + message);
            // 执行业务逻辑(可能抛出异常)
            doBusiness(message);
            // 业务成功,手动提交偏移量
            ack.acknowledge();
        } catch (Exception e) {
            // 处理失败,不提交偏移量,消息会再次被消费
            System.err.println("处理失败,稍后重试: " + e.getMessage());
        }
    }

    private void doBusiness(MyMessage msg) {
        // 模拟业务
    }
}

2.批量消费提升吞吐量

当消息量很大时,可以一次拉取多条消息批量处理。

配置

spring:
  kafka:
    listener:
      type: batch               # 批量模式
    consumer:
      max-poll-records: 10      # 每次拉取最多10条

消费者方法参数改为 List<MyMessage>

@Component
public class BatchConsumer {

    @KafkaListener(topics = "my-topic")
    public void listen(List<MyMessage> messages) {
        System.out.println("Received batch size: " + messages.size());
        messages.forEach(msg -> {
            // 批量处理
        });
    }
}

3.使用 Kafka 事务(生产端 exactly-once)

如果生产者需要保证消息发送到多个分区的原子性,可以开启事务。

spring:
  kafka:
    producer:
      transaction-id-prefix: tx-   # 开启事务必须配置前缀
      properties:
        enable.idempotence: true

然后在业务方法上使用 @Transactional 注解;

(注意是 org.springframework.transaction.annotation.Transactional),KafkaTemplate 会参与到 Spring 事务中。

@Transactional
public void sendInTransaction() {
    kafkaTemplate.send("topic1", "msg1");
    kafkaTemplate.send("topic2", "msg2");
    // 若发生异常,所有消息都不会提交
}

4.消费者异常处理与重试

Spring Kafka 提供了 @RetryableTopic 或 SeekToCurrentErrorHandler 来处理消费失败时的重试。简单示例:配置一个 ErrorHandler 让失败的消息稍后重试。

@Bean
public ConcurrentKafkaListenerContainerFactory<String, Object> kafkaListenerContainerFactory(
        ConsumerFactory<String, Object> consumerFactory) {
    ConcurrentKafkaListenerContainerFactory<String, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory);
    // 设置重试策略(最多3次,间隔1秒)
    factory.setRetryTemplate(RetryTemplate.builder()
            .maxAttempts(3)
            .fixedBackoff(1000)
            .build());
    return factory;
}

六、常见问题与最佳实践

1.消息丢失怎么办?

  • 生产者端:设置 acks=all 和 enable.idempotence=true

  • 消费者端:关闭自动提交(enable.auto.commit=false),业务处理成功后再手动提交。

2.消息重复怎么办?

  • 生产者幂等只能防止单生产者内的重复,跨会话或跨分区的事务场景需用事务 API。

  • 消费者重复通常由 rebalance 导致,业务层必须做幂等设计,例如使用数据库唯一键、Redis 去重表等。

3.如何选择 key 和 value 的序列化方式?

  • 简单场景:StringSerializer + 手动构造 JSON 字符串。

  • 对象场景:JsonSerializer 最方便,但要处理好类型信任问题。

  • 高性能场景:考虑 Avro 或 Protobuf,配合 Schema Registry。

4.监听器方法中可以返回 CompletableFuture 吗?

可以,但需要设置 @KafkaListener 的 properties 参数为 async,且返回类型必须是 ListenableFuture 或 CompletableFuture。实际使用较少,因为大多数业务是同步的。


七、总结

本文通过一个完整的 Spring Boot 集成 Kafka 的示例,演示了:

  • 如何添加依赖和配置 application.yml。

  • 如何用 KafkaTemplate 发送消息。

  • 如何用 @KafkaListener 接收消息。

  • 进阶功能:手动提交、批量消费、事务、重试等。

Spring Kafka 将底层的 Kafka 客户端封装得非常优雅,开发者只需要关注业务逻辑。但要注意,生产环境中消息的不丢失、不重复问题必须从配置和业务两方面保证。希望这篇博客能帮助你快速上手 Kafka + Spring Boot,并在实际项目中少踩坑。

项目源码:可访问 GitHub 示例仓库(示例链接请替换为真实地址)。

参考资料

如果你觉得本文有帮助,请点赞、收藏、转发支持!

更多推荐