kafka--基础--4.1--集成springboot--简单案例
·
kafka–基础–4.1–集成springboot–简单案例
1、代码
1.1、结构

1.2、代码
KafkaConsumer
package feizhou.kafka.demo1.business;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Service;
import java.util.List;
/**
* @ClassName: KafkaConsumer
* @Description 请描述下该类是做什么的
* @Author feizhou
* @Date 2025/10/7 16:04
* @Verson 1.0
**/
@Service
public class KafkaConsumer {
// 监听test主题的消息
@KafkaListener(topics = "test", groupId = "my-consumer-group")
public void consumeMessage(String message, Acknowledgment ack) {
System.out.println("收到消息 in group my-group: " + message);
// 手动提交偏移量 - 消息处理成功后提交
ack.acknowledge();
}
// // 批量消费手动提交
// @KafkaListener(topics = "test", groupId = "my-consumer-group")
// public void consumeBatchMessage(List<String> messages, Acknowledgment ack) {
// try {
// System.out.println("收到批量消息,数量: " + messages.size());
//
// for (String message : messages) {
// System.out.println("处理消息: " + message);
// // 批量处理逻辑
// }
//
// // 批量处理完成后一次性提交
// ack.acknowledge();
// System.out.println("批量偏移量已提交");
//
// } catch (Exception e) {
// System.err.println("批量处理失败: " + e.getMessage());
// // 不提交,整个批次会重新投递
// }
// }
}
application-kafka.yml
spring:
kafka:
bootstrap-servers: 192.168.187.89:9092 # Kafka集群地址
# 生产者配置
producer:
compression-type: gzip # 默认不压缩,可选值:"gzip", "snappy", "lz4", "zstd"
acks: all # 确认机制
retries: 3 # 发送失败后重试次数
retry-backoff-ms: 500 # 每次重试之间的间隔时间(单位:毫秒)
enable-idempotence: true # 启用幂等性
linger-ms: 5 # 延迟发送的时间,单位毫秒,默认 0ms(立即发送)
batch-size: 32768 # 生产者每个批次发送的最大字节数,默认 16KB
buffer-memory: 67108864 # 生产者缓冲区内存大小,单位字节,默认32MB
key-serializer: org.apache.kafka.common.serialization.StringSerializer # 键的序列化器
value-serializer: org.apache.kafka.common.serialization.StringSerializer # 值的序列化器
transactional-id-prefix: tx- # 启用事务时的前缀,用于事务标识
# 生产者拦截器,用于记录日志和指标收集
interceptors:
- org.apache.kafka.clients.producer.internals.LogAndMetricsProducerInterceptor
# 消费者配置
consumer:
group-id: my-consumer-group # 消费者组ID
concurrency: 10 # 设置消费者并发数
enable-auto-commit: false # 禁用自动提交,手动提交偏移量
auto-offset-reset: earliest # 如果偏移量无效则从最早的消息开始消费。可选"latest"
session-timeout-ms: 10000 # session会话超时时间(与心跳有关)
max-poll-records: 1000 # 每次poll的最大记录数
max-poll-interval-ms: 300000 # 最大轮询间隔 ,确保消费者不会在空闲期间超时
fetch-max-wait-ms: 300 # 拉取消息时的最大等待时间
fetch-max-bytes: 104857600 # 拉取的最大字节数,单位字节
fetch-min-size: 5000 # 拉取消息时的最小字节数,单位字节
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer # 键的反序列化器
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer # 值的反序列化器
isolation-level: read_committed # 事务隔离级别,确保读取已提交的数据。可选 "read_uncommitted"
# Admin配置,用于创建和管理Kafka的主题
admin:
bootstrap-servers: localhost:9092 # Kafka集群地址
default-topic:
partitions: 5 # 默认主题的分区数
replication-factor: 3 # 默认主题的副本数
# 副本机制
replication: # 副本落后超过此时间(毫秒)会被移出 ISR 列表
replica_lag_time_max_ms: 10000 # 默认为 10000 毫秒,即 10 秒
# 最少需要多少个同步副本才能进行写入
min_insync_replicas: 2 # 默认为 1,根据需求设置
# 是否允许在没有足够 ISR 的情况下进行领导者选举
unclean_leader_election_enable: false # 默认为 false,防止数据丢失
# 监听器的配置
listener:
concurrency: 10 # 设置监听器的并发数,控制消费者的并发线程数
ack-mode: manual # 手动确认消息,默认是 "auto" 自动确认
poll-timeout: 3000 # 拉取消息的超时时间,单位毫秒
missing-topics-fatal: false # 是否忽略主题缺失的错误,默认是false
container-factory: kafkaListenerContainerFactory # 默认监听器容器工厂
# 错误处理配置
error-handler:
# 在消息消费出错时重试的配置
retry:
max-attempts: 3 # 最大重试次数
initial-interval: 1000 # 初始重试间隔,单位毫秒
multiplier: 1.5 # 重试间隔的增量倍数
#日志段和日志保留配置
log:
# 设置每个日志段的大小
segment:
bytes: 1073741824 # 每个日志段的大小,默认为 1GB # 设置日志保留时间(毫秒)
retention:
ms: 604800000 # 7天,单位是毫秒
# 设置日志的最大保留大小
retention_bytes: 10737418240 # 默认为 10GB
# 设置日志清理策略(delete 或 compact)
cleanup_policy: "delete" # 可选:delete 或 compact
# 设置日志索引间隔大小
index_interval_bytes: 4096 # 默认为 4096
MainDemo1Tests
package feizhou.kafka.demo1;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.kafka.core.KafkaTemplate;
@SpringBootTest
class MainDemo1Tests {
@Autowired
private KafkaTemplate<String, String> kafkaTemplate;
@Test
void sendMessage() {
kafkaTemplate.send("test","111111111");
}
}
pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>3.5.6</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<groupId>feizhou</groupId>
<artifactId>kafka-demo1</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>kafka-demo1</name>
<description>kafka-demo1</description>
<url/>
<properties>
<java.version>17</java.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<!-- kafka -->
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>3.3.10</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>3.9.1</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
2、测试结果


更多推荐
所有评论(0)