Kafka-简单使用(SpringBoot集成)
在微服务和分布式系统中,消息队列是解耦、削峰、异步处理的核心组件。Apache Kafka 作为高吞吐、分布式的消息平台,与 Spring Boot 的集成非常方便。本文将从零开始,带你手写一个完整的 Kafka 生产者和消费者示例,并深入讲解配置细节、手动提交偏移量、批量消费等进阶用法。
前提:已搭建Kafka服务,参考本专栏文章:Kafka-安装和配置(搭建环境)
一、背景与准备
1.1 为什么要用 Kafka + Spring Boot?
-
高吞吐:Kafka 支持每秒百万级消息。
-
持久化:消息落盘,支持重放。
-
分布式:天然支持水平扩展。
-
Spring 生态:
spring-kafka提供了KafkaTemplate和@KafkaListener,开发效率极高。
1.2 环境要求
-
JDK 17 或 21(Spring Boot 3.x 要求 JDK 17+)
-
Maven 3.6+
-
Kafka 集群(单机也可,参考 Kafka 快速入门)
-
IDEA 或 VS Code
本文基于 Spring Boot 3.3.5 和 Kafka 3.8.0。
二、新建 Spring Boot 项目
使用 Spring Initializr 或 IDEA 新建项目,选择以下依赖:
-
Spring Web(提供 REST API 发送消息)
-
Spring for Apache Kafka(核心依赖)
-
Lombok(可选,简化代码)
生成项目后,pom.xml 中核心依赖如下:
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<scope>provided</scope>
</dependency>
三、配置文件(application.yml)
在 src/main/resources/application.yml 中配置 Kafka 连接信息、生产者和消费者属性。
spring:
# Kafka 消息队列配置
kafka:
# Kafka 服务端地址(IP:端口)
bootstrap-servers: 你的服务器IP:9092
# 生产者配置:负责发送消息到 Kafka
producer:
# Key 序列化方式:字符串序列化
key-serializer: org.apache.kafka.common.serialization.StringSerializer
# Value 序列化方式:字符串序列化
value-serializer: org.apache.kafka.common.serialization.StringSerializer
# 消息确认机制:1 = 仅Leader确认、all = 全部确认
acks: 1
# 消息发送失败重试次数
retries: 3
# 批量发送的消息字节大小
batch-size: 16384
# 消息延迟发送时间(毫秒),提高吞吐量
linger-ms: 1
# 生产者可用的缓存总量
buffer-memory: 33554432
# 消费者配置:负责从 Kafka 拉取消息
consumer:
# Key 反序列化方式
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
# Value 反序列化方式
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
# 消费者组ID,同一组内的消费者共同消费消息
group-id: sx-monitor-group
# 无初始偏移量时,从头开始消费 earliest / 从最新开始消费 latest
auto-offset-reset: earliest
# 是否开启自动提交偏移量
enable-auto-commit: true
# 自动提交偏移量的时间间隔(毫秒)
auto-commit-interval: 1000
# 每次拉取的最大消息条数
max-poll-records: 500
# 监听器配置:Spring Kafka 消费监听规则
listener:
# 批量消费确认模式
ack-mode: batch
# 消费者并发线程数
concurrency: 2
# 自定义 Topic 配置
topics:
# 测试调试专用
test: test-topic
# 监控业务
monitor: sx-monitor-topic
四、Kafka配置类
1.KafkaProperties
@Data
@Component
@ConfigurationProperties(prefix = "spring.kafka")
public class KafkaProperties {
/** Kafka 服务端地址(快捷读取,等价于 spring.kafka.bootstrap-servers) */
private String bootstrapServers;
/** 消费者 group-id(快捷读取,等价于 spring.kafka.consumer.group-id) */
private Consumer consumer = new Consumer();
// ==================== 自定义扩展属性 ====================
/** 自定义 Topic 名称 */
private final Topics topics = new Topics();
// ---- 嵌套类 ----
@Data
public static class Consumer {
private String groupId;
}
@Data
public static class Topics {
/** 测试调试专用 */
private String test = "test-topic";
/** 监控业务 */
private String monitor = "sx-monitor-topic";
}
} |
2.KafkaConfig
/**
* <h3>Kafka 集中配置</h3>
* <pre>
* 职责分层:
* 1. AdminClient — 运维管理(健康检查、Topic 列表)
* 2. Producer — 消息生产(幂等 + Snappy 压缩 + 超时控制)
* 3. Consumer — 消息消费(手动创建 Factory,注入 ErrorHandler)
* 4. ErrorHandler — 消费异常重试(3 次 + 1s 间隔,耗尽记日志)
* 5. HealthCheck — Broker 连通性探测
* </pre>
*
*/
@Slf4j
@Configuration
@RequiredArgsConstructor
public class KafkaConfig {
private final org.springframework.boot.autoconfigure.kafka.KafkaProperties kafkaProperties;
// ═══════════════════════════════════════════════════════════════════
// 1. AdminClient — 运维管理
// ═══════════════════════════════════════════════════════════════════
/**
* 原生 AdminClient,用于 Topic 管理、集群元数据查询。
* 每次调用都创建新实例(轻量),避免长连接空转。
*/
@Bean
public AdminClient kafkaAdminClient() {
Map<String, Object> config = kafkaProperties.buildAdminProperties(null);
return AdminClient.create(config);
}
/**
* Spring KafkaAdmin Bean,配合 @Bean newTopic() 自动创建 Topic。
* 调试期间 Broker 不可用时不阻塞启动。
*/
@Bean
public KafkaAdmin kafkaAdmin() {
KafkaAdmin admin = new KafkaAdmin(kafkaProperties.buildProducerProperties(null));
admin.setFatalIfBrokerNotAvailable(false);
return admin;
}
// ═══════════════════════════════════════════════════════════════════
// 2. Producer — 消息生产
// ═══════════════════════════════════════════════════════════════════
/**
* Producer 工厂。
* <ul>
* <li>enable.idempotence = true → 幂等,保证 exactly-once</li>
* <li>compression.type = snappy → 压缩传输,带宽换 CPU</li>
* <li>max.in.flight.requests = 5 → 并发请求数(幂等时安全提高)</li>
* <li>request.timeout.ms = 30s → 单次请求超时</li>
* <li>delivery.timeout.ms = 60s → 端到端投递超时(含重试)</li>
* </ul>
*/
@Bean
public ProducerFactory<String, String> producerFactory() {
Map<String, Object> props = kafkaProperties.buildProducerProperties(null);
props.put("acks", "all"); // 幂等必须 all
props.put("enable.idempotence", true);
props.put("compression.type", "snappy");
props.put("max.in.flight.requests.per.connection", 5);
props.put("request.timeout.ms", 30000);
props.put("delivery.timeout.ms", 60000);
return new DefaultKafkaProducerFactory<>(props);
}
/**
* KafkaTemplate — 线程安全,单例复用。
* String 泛型,适合 JSON 或纯文本消息。
*/
@Bean
public KafkaTemplate<String, String> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
// ═══════════════════════════════════════════════════════════════════
// 3. Consumer — 消息消费
// ═══════════════════════════════════════════════════════════════════
/**
* Consumer 工厂。
* <ul>
* <li>session.timeout.ms = 30s → 会话超时,超时触发 rebalance</li>
* <li>heartbeat.interval.ms = 3s → 心跳间隔,建议 session 的 1/10</li>
* <li>max.poll.interval.ms = 5min → poll 间隔上限,超过触发 rebalance</li>
* </ul>
*/
@Bean
public ConsumerFactory<String, String> consumerFactory() {
Map<String, Object> props = kafkaProperties.buildConsumerProperties(null);
props.put("session.timeout.ms", 30000);
props.put("heartbeat.interval.ms", 3000);
props.put("max.poll.interval.ms", 300000);
return new DefaultKafkaConsumerFactory<>(props);
}
/**
* Listener 容器工厂,生产环境定制入口。
* <ul>
* <li>concurrency = 2 → 消费线程数(yml 中 listener.concurrency 也配置了)</li>
* <li>batchListener = false → 单条消费(批量场景改为 true)</li>
* <li>CommonErrorHandler → 统一异常重试策略</li>
* </ul>
*/
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.setConcurrency(2);
factory.setBatchListener(false);
factory.setCommonErrorHandler(kafkaErrorHandler());
return factory;
}
// ═══════════════════════════════════════════════════════════════════
// 4. ErrorHandler — 消费异常处理
// ═══════════════════════════════════════════════════════════════════
/**
* 消费者异常处理策略。
* <p>
* 重试 3 次,每次间隔 1 秒。全部失败后记录日志并提交 offset(跳过该消息)。
* <p>
* 线上加强方案:
* <ol>
* <li>升级为指数退避:{@code new ExponentialBackOff(1000L, 2.0)}</li>
* <li>接入 DLT(死信队列):{@code new DeadLetterPublishingRecoverer(template)}</li>
* </ol>
*/
private DefaultErrorHandler kafkaErrorHandler() {
return new DefaultErrorHandler(
(record, exception) -> log.error(
"Kafka 消费失败(重试耗尽)→ topic={}, partition={}, offset={}, key={}, value={}",
record.topic(), record.partition(), record.offset(), record.key(), record.value(), exception),
new FixedBackOff(1000L, 3L)
);
}
// ═══════════════════════════════════════════════════════════════════
// 5. 运维工具 — 健康检查 & Topic 管理
// ═══════════════════════════════════════════════════════════════════
/**
* 探测 Broker 是否可达。
* 调用 listTopics() 做轻量 RTT,超时由 AdminClient 默认值兜底。
*
* @return true = Broker 连通
*/
public boolean isBrokerAlive() {
try (AdminClient client = AdminClient.create(kafkaProperties.buildAdminProperties(null))) {
client.listTopics().names().get();
return true;
} catch (Exception e) {
log.warn("Kafka Broker 连接失败: {}", e.getMessage());
return false;
}
}
/**
* 列出 Broker 上所有 Topic。
*
* @return Topic 名称集合
*/
public Set<String> listTopics() throws ExecutionException, InterruptedException {
try (AdminClient client = AdminClient.create(kafkaProperties.buildAdminProperties(null))) {
return client.listTopics().names().get();
}
}
}
|
3.KafkaProducerService
/**
* Kafka 生产者服务(生产级)
* <p>
* 功能:异步/同步发送、消息头注入、超时兜底、链路追踪 ID
*/
@Slf4j
@Component
@RequiredArgsConstructor
public class KafkaProducerService {
private final KafkaTemplate<String, String> kafkaTemplate;
private final KafkaProperties kafkaProperties;
private static final DateTimeFormatter TIME_FMT = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS");
// ==================== 异步发送 ====================
/**
* 发送消息(异步),返回 CompletableFuture 供调用方自行处理
*/
public CompletableFuture<SendResult<String, String>> sendAsync(String topic, String message) {
return sendAsync(topic, null, message);
}
/**
* 发送消息(异步,带 key)
*/
public CompletableFuture<SendResult<String, String>> sendAsync(String topic, String key, String message) {
ProducerRecord<String, String> record = buildRecord(topic, key, message);
long start = System.currentTimeMillis();
return kafkaTemplate.send(record)
.whenComplete((result, ex) -> {
long elapsed = System.currentTimeMillis() - start;
if (ex == null) {
log.info("Kafka 发送成功 → topic={}, partition={}, offset={}, key={}, size={}B, cost={}ms",
result.getRecordMetadata().topic(),
result.getRecordMetadata().partition(),
result.getRecordMetadata().offset(),
key,
message.length(),
elapsed);
} else {
log.error("Kafka 发送失败 → topic={}, key={}, cost={}ms, error={}",
topic, key, elapsed, ex.getMessage(), ex);
}
});
}
// ==================== 同步发送(有超时) ====================
/**
* 同步发送,5 秒超时,返回结果或抛异常
*/
public SendResult<String, String> sendSync(String topic, String message) throws Exception {
return sendSync(topic, null, message);
}
/**
* 同步发送(带 key),5 秒超时
*/
public SendResult<String, String> sendSync(String topic, String key, String message) throws Exception {
try {
SendResult<String, String> result = sendAsync(topic, key, message).get();
return result;
} catch (Exception e) {
log.error("Kafka 同步发送异常 → topic={}, key={}", topic, key, e);
throw e;
}
}
// ==================== 便捷方法(兼容旧接口) ====================
/**
* 发送消息(异步,不返回值)— 兼容旧调用
*/
public void send(String topic, String message) {
sendAsync(topic, message);
}
/**
* 发送消息(异步,带 key)— 兼容旧调用
*/
public void send(String topic, String key, String message) {
sendAsync(topic, key, message);
}
// ==================== 内部构建 =======================================================================
/**
* 构建 ProducerRecord,自动注入消息头
*/
private ProducerRecord<String, String> buildRecord(String topic, String key, String message) {
ProducerRecord<String, String> record = new ProducerRecord<>(topic, null, System.currentTimeMillis(), key, message);
// 注入消息头(线上标配:链路追踪 ID + 来源 + 时间戳)
record.headers().add(new RecordHeader("X-Message-Id", UUID.randomUUID().toString().getBytes(StandardCharsets.UTF_8)));
record.headers().add(new RecordHeader("X-Source", "sx-monitor".getBytes(StandardCharsets.UTF_8)));
record.headers().add(new RecordHeader("X-Timestamp", LocalDateTime.now().format(TIME_FMT).getBytes(StandardCharsets.UTF_8)));
return record;
}
/**
* 检查 broker 是否可用(简单 probe)
*/
public boolean isReady() {
try {
kafkaTemplate.partitionsFor(kafkaProperties.getTopics().getTest());
return true;
} catch (Exception e) {
return false;
}
}
}
|
4.KafkaConsumerService
/**
* Kafka 消费者服务
* <p>
* 启动即监听,收到消息后处理业务逻辑
*
* @author lk
* @date 2026/6/2
*/
@Slf4j
@Component
public class KafkaConsumerService {
/**
* 监听 test-topic
*/
@KafkaListener(topics = "${spring.kafka.topics.test}", groupId = "${spring.kafka.consumer.group-id}")
public void listenTestTopic(ConsumerRecord<String, String> record) {
handle(record);
}
/**
* 监听 monitor-topic
*/
@KafkaListener(topics = "${spring.kafka.topics.monitor}", groupId = "${spring.kafka.consumer.group-id}")
public void listenMonitorTopic(ConsumerRecord<String, String> record) {
handle(record);
}
/**
* 业务处理入口(线上在这里接具体业务逻辑)
*/
private void handle(ConsumerRecord<String, String> record) {
log.info("Kafka 消费 → topic={}, partition={}, offset={}, key={}, value={}",
record.topic(), record.partition(), record.offset(), record.key(), record.value());
}
}
|
5.KafkaController
/**
* Kafka 调试控制器
* <p>
* 测试地址:localhost:14030/api/doc.html → Kafka调试
*/
@Slf4j
@AllArgsConstructor
@RestController
@RequestMapping("/kafka")
@Tag(name = "Kafka调试")
public class KafkaController {
private final KafkaProducerService producerService;
private final KafkaConfig kafkaConfig;
// ==================== 发送 ====================
@PostMapping("/send")
@Operation(summary = "1.发送消息(异步)")
@ApiOperationSupport(order = 1)
public R<?> send(@RequestParam(defaultValue = "test-topic") String topic,
@RequestParam String message) {
producerService.sendAsync(topic, message);
return R.ok("消息已发送 → topic=" + topic + ", message=" + message);
}
@PostMapping("/send-with-key")
@Operation(summary = "2.发送带Key的消息(异步)")
@ApiOperationSupport(order = 2)
public R<?> sendWithKey(@RequestParam(defaultValue = "test-topic") String topic,
@RequestParam String key,
@RequestParam String message) {
producerService.sendAsync(topic, key, message);
return R.ok("消息已发送 → topic=" + topic + ", key=" + key + ", message=" + message);
}
@PostMapping("/send-batch")
@Operation(summary = "3.批量发送(N条消息)")
@ApiOperationSupport(order = 3)
public R<?> sendBatch(@RequestParam(defaultValue = "test-topic") String topic,
@RequestParam(defaultValue = "hello kafka") String prefix,
@RequestParam(defaultValue = "10") int count) {
for (int i = 0; i < count; i++) {
producerService.sendAsync(topic, prefix + " [" + i + "]");
}
return R.ok("已发送 " + count + " 条消息 → topic=" + topic);
}
@PostMapping("/send-sync")
@Operation(summary = "4.发送消息(同步,等待 Broker 确认)")
@ApiOperationSupport(order = 4)
public R<?> sendSync(@RequestParam(defaultValue = "test-topic") String topic,
@RequestParam String message) {
try {
var result = producerService.sendSync(topic, message);
Map<String, Object> data = new HashMap<>();
data.put("topic", result.getRecordMetadata().topic());
data.put("partition", result.getRecordMetadata().partition());
data.put("offset", result.getRecordMetadata().offset());
data.put("message", message);
return R.ok(data);
} catch (Exception e) {
return R.error("发送失败: " + e.getMessage());
}
}
// ==================== 运维 ====================
@GetMapping("/health")
@Operation(summary = "5.Broker 连通性检查")
@ApiOperationSupport(order = 5)
public R<?> health() {
Map<String, Object> info = new HashMap<>();
info.put("brokerAlive", kafkaConfig.isBrokerAlive());
info.put("producerReady", producerService.isReady());
return R.ok(info);
}
@GetMapping("/topics")
@Operation(summary = "6.Topic 列表")
@ApiOperationSupport(order = 6)
public R<?> topics() {
try {
Set<String> topics = kafkaConfig.listTopics();
return R.ok(topics);
} catch (Exception e) {
return R.error("获取 Topic 列表失败: " + e.getMessage());
}
}
} |
五、进阶用法(生产环境必备)
1.手动提交偏移量(确保消息不丢失)
默认情况下,spring-kafka 会在消息监听器正常返回后自动提交偏移量(enable.auto.commit=true)。但如果你希望业务处理成功后才提交,避免处理失败时丢失消息,需要改为手动提交。
配置:
| spring: kafka: listener: ack-mode: manual_immediate # 手动确认模式 consumer: enable-auto-commit: false # 关闭自动提交 |
消费者代码(增加 Acknowledgment 参数):
|
@Component @KafkaListener(topics = "my-topic", groupId = "my-group") private void doBusiness(MyMessage msg) { |
2.批量消费提升吞吐量
当消息量很大时,可以一次拉取多条消息批量处理。
配置:
| spring: kafka: listener: type: batch # 批量模式 consumer: max-poll-records: 10 # 每次拉取最多10条 |
消费者方法参数改为 List<MyMessage>:
|
@Component @KafkaListener(topics = "my-topic") |
3.使用 Kafka 事务(生产端 exactly-once)
如果生产者需要保证消息发送到多个分区的原子性,可以开启事务。
| spring: kafka: producer: transaction-id-prefix: tx- # 开启事务必须配置前缀 properties: enable.idempotence: true |
然后在业务方法上使用 @Transactional 注解;
(注意是 org.springframework.transaction.annotation.Transactional),KafkaTemplate 会参与到 Spring 事务中。
| @Transactional public void sendInTransaction() { kafkaTemplate.send("topic1", "msg1"); kafkaTemplate.send("topic2", "msg2"); // 若发生异常,所有消息都不会提交 } |
4.消费者异常处理与重试
Spring Kafka 提供了 @RetryableTopic 或 SeekToCurrentErrorHandler 来处理消费失败时的重试。简单示例:配置一个 ErrorHandler 让失败的消息稍后重试。
| @Bean public ConcurrentKafkaListenerContainerFactory<String, Object> kafkaListenerContainerFactory( ConsumerFactory<String, Object> consumerFactory) { ConcurrentKafkaListenerContainerFactory<String, Object> factory = new ConcurrentKafkaListenerContainerFactory<>(); factory.setConsumerFactory(consumerFactory); // 设置重试策略(最多3次,间隔1秒) factory.setRetryTemplate(RetryTemplate.builder() .maxAttempts(3) .fixedBackoff(1000) .build()); return factory; } |
六、常见问题与最佳实践
1.消息丢失怎么办?
-
生产者端:设置
acks=all和enable.idempotence=true。 -
消费者端:关闭自动提交(
enable.auto.commit=false),业务处理成功后再手动提交。
2.消息重复怎么办?
-
生产者幂等只能防止单生产者内的重复,跨会话或跨分区的事务场景需用事务 API。
-
消费者重复通常由 rebalance 导致,业务层必须做幂等设计,例如使用数据库唯一键、Redis 去重表等。
3.如何选择 key 和 value 的序列化方式?
-
简单场景:
StringSerializer+ 手动构造 JSON 字符串。 -
对象场景:
JsonSerializer最方便,但要处理好类型信任问题。 -
高性能场景:考虑
Avro或Protobuf,配合 Schema Registry。
4.监听器方法中可以返回 CompletableFuture 吗?
可以,但需要设置 @KafkaListener 的 properties 参数为 async,且返回类型必须是 ListenableFuture 或 CompletableFuture。实际使用较少,因为大多数业务是同步的。
七、总结
本文通过一个完整的 Spring Boot 集成 Kafka 的示例,演示了:
-
如何添加依赖和配置 application.yml。
-
如何用
KafkaTemplate发送消息。 -
如何用
@KafkaListener接收消息。 -
进阶功能:手动提交、批量消费、事务、重试等。
Spring Kafka 将底层的 Kafka 客户端封装得非常优雅,开发者只需要关注业务逻辑。但要注意,生产环境中消息的不丢失、不重复问题必须从配置和业务两方面保证。希望这篇博客能帮助你快速上手 Kafka + Spring Boot,并在实际项目中少踩坑。
项目源码:可访问 GitHub 示例仓库(示例链接请替换为真实地址)。
参考资料:
如果你觉得本文有帮助,请点赞、收藏、转发支持!
更多推荐
所有评论(0)