SpringBoot整合RocketMQ
1. 启动RocketMQ服务 引入依赖
在pom.xml中添加RocketMQ与SpringBoot整合的核心依赖rocketmq-spring-boot-starter,注意版本匹配(SpringBoot 3.x需JDK 17+,排除旧版本的rocketmq-client,引入新版本5.3.0,以及SpringBoot的web和test依赖)
<dependencies>
<!-- RocketMQ SpringBoot Starter -->
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.3.1</version>
<exclusions>
<!-- 排除旧版本客户端,避免冲突 -->
<exclusion>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
</exclusion>
</exclusions>
</dependency>
<!-- RocketMQ客户端核心依赖 -->
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>5.3.0</version>
</dependency>
<!-- SpringBoot Web(可选,根据业务需求) -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
<version>3.0.4</version>
</dependency>
</dependencies>
2. 配置RocketMQ参数
在application.properties或application.yml中配置NameServer地址、生产者组等核心参数:
# NameServer地址(替换为实际部署的地址)
rocketmq.name-server=192.168.65.112:9876
# 生产者组名(自定义,需唯一)
rocketmq.producer.group=springBootGroup
# 消费者配置(可选,也可在注解中指定)
# rocketmq.consumer.group=testConsumerGroup
# rocketmq.consumer.topic=TestTopic
3. 创建启动类
编写SpringBoot启动类,添加@SpringBootApplication注解:
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
public class RocketMQApplication {
public static void main(String[] args) {
SpringApplication.run(RocketMQApplication.class, args);
}
}
4. 实现消息生产者
通过RocketMQTemplate发送消息,注入模板后调用convertAndSend发送方法(支持多种消息类型,如普通消息、事务消息等):
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
@Component
public class SpringProducer {
@Resource
private RocketMQTemplate rocketMQTemplate;
// 发送普通消息(Topic需提前创建)
public void sendMessage(String topic, String message) {
rocketMQTemplate.convertAndSend(topic, message);
}
// 发送带Tag的消息(如:TopicTest:TagA)
public void sendMessageWithTag(String topic, String tag, String message) {
rocketMQTemplate.convertAndSend(topic + ":" + tag, message);
}
}
5. 实现消息消费者
通过@RocketMQMessageListener注解声明消费者,实现RocketMQListener接口,重写onMessage方法处理消息:
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;
@Component
@RocketMQMessageListener(
consumerGroup = "testConsumerGroup", // 消费者组名(自定义)
topic = "TestTopic", // 订阅的Topic
consumeMode = ConsumeMode.CONCURRENTLY, // 消费模式:并发消费(默认)/顺序消费
messageModel = MessageModel.CLUSTERING // 消息模型:集群消费(默认)/广播消费
)
public class SpringConsumer implements RocketMQListener<String> {
@Override
public void onMessage(String message) {
// 处理消息逻辑
System.out.println("收到消息:" + message);
}
}
6. 测试整合效果
发送消息:通过Controller或单元测试调用生产者的sendMessage方法发送消息。
接收消息:启动SpringBoot应用,消费者会自动监听并处理消息,控制台输出“收到消息:xxx”。
关键注意事项
1.核心版本兼容性原则 :RocketMQ-Spring官方文档:https://github.com/apache/rocketmq-spring例如:rocketmq-spring-boot-starter:2.3.x 对应 SpringBoot 3.x,2.2.x 对应 SpringBoot 2.x。
1) SpringBoot与JDK版本匹配
SpringBoot 3.x 必须使用JDK 17+(文档明确提到:“SpringBoot升级到了3.0.4版本后,JDK要升级到17以上”)。
SpringBoot 2.x可使用JDK 8/11,需对应较低版本的RocketMQ Starter。
2) RocketMQ Starter与Client版本匹配
RocketMQ Starter(rocketmq-spring-boot-starter)的版本需与底层rocketmq-client版本对应,避免因依赖传递导致冲突。
推荐组合:SpringBoot 3.0.4 + rocketmq-spring-boot-starter:2.3.1 + rocketmq-client:5.3.0(需手动排除Starter自带的旧Client)
3)检查依赖树,定位冲突源
使用Maven/Gradle命令查看依赖树,确认是否有其他依赖间接引入低版本RocketMQ:
Maven:mvn dependency:tree | grep rocketmq
Gradle:./gradlew dependencies | grep rocketmq
若发现冲突(如rocketmq-client:4.x被其他依赖引入),在对应依赖中添加排除:
<!-- 例如:排除SpringCloud Alibaba中的旧版RocketMQ依赖 -->
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-alibaba-rocketmq</artifactId>
<exclusions>
<exclusion>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
</exclusion>
</exclusions>
</dependency>
2.多Topic支持:若需往多个Topic发送消息,可通过@ExtRocketMQTemplateConfiguration注解创建多个RocketMQTemplate实例。
3.事务消息:需定义事务监听器(@RocketMQTransactionListener),并在生产者中使用rocketMQTemplate.sendMessageInTransaction方法。
4.消费者配置:@RocketMQMessageListener中的consumerGroup和topic必须与生产者对应,否则无法接收消息。
更多推荐



所有评论(0)