Linux部署RocketMQ5.1.4 + ACL密码鉴权 + SpringBoot普通即时消息生产消费Demo
·
Linux部署RocketMQ5.1.4 + ACL密码鉴权 + SpringBoot普通即时消息生产消费Demo
目录
- 服务器环境准备
- RocketMQ安装部署
- 开启ACL账号密码鉴权
- 启停/重启全套命令
- SpringBoot 普通即时消息完整Demo(生产者+消费者)
- 测试接口与运行说明
- 常见问题
一、服务器环境准备
1. 基础环境
- 系统:CentOS7/CentOS8
- JDK:1.8+
- 开放端口:9876(NameServer)、10911(Broker)
2. 防火墙放行端口
firewall-cmd --add-port=9876/tcp --permanent
firewall-cmd --add-port=10911/tcp --permanent
firewall-cmd --reload
# 查看放行端口
firewall-cmd --list-ports
3. 创建安装目录
mkdir -p /usr/local/rocketmq
cd /usr/local/rocketmq
二、RocketMQ 5.1.4 安装解压
- 下载二进制包:
rocketmq-all-5.1.4-bin-release.zip,上传服务器 - 解压
unzip rocketmq-all-5.1.4-bin-release.zip
mv rocketmq-all-5.1.4-bin-release rocketmq5.1.4
cd rocketmq5.1.4
# 配置临时环境变量
export ROCKETMQ_HOME=/usr/local/rocketmq/rocketmq5.1.4
三、修改JVM内存(低配服务器必改,防止OOM)
1. 修改NameServer内存
vim bin/runserver.sh
# 找到JAVA_OPT,修改内存参数
JAVA_OPT="${JAVA_OPT} -Xms256m -Xmx256m -Xmn128m"
2. 修改Broker内存
vim bin/runbroker.sh
JAVA_OPT="${JAVA_OPT} -Xms256m -Xmx256m -Xmn128m"
四、开启ACL密码鉴权(设置连接账号密码)
4.1 开启Broker鉴权开关
vim conf/broker.conf
# 文件末尾追加配置
aclEnable=true
autoCreateTopicEnable=true
# 外网访问填写服务器公网IP,内网填内网IP
brokerIP1=120.xx.xx.xx
4.2 配置账号权限 conf/plain_acl.yml
accounts:
# 运维管理员账号
- accessKey: mq_admin
secretKey: Admin@2026Rmq
whiteRemoteAddress: 127.0.0.1
admin: true
# Java业务应用连接账号(代码中使用)
- accessKey: app_mq_user
secretKey: App@666888
whiteRemoteAddress: 0.0.0.0
admin: false
defaultTopicPerm: DENY
defaultGroupPerm: DENY
# 仅允许操作普通消息Topic:发布+订阅
topicPerms:
- "order_msg_topic=PUB|SUB"
groupPerms:
- "order-consumer-group=SUB"
globalWhiteRemoteAddresses:
- 127.0.0.1
accessKey=用户名,secretKey=密码,修改保存后重启Broker生效
五、RocketMQ 启动/停止/重启命令
进入MQ根目录执行:cd /usr/local/rocketmq/rocketmq5.1.4
5.1 启动(顺序:先NameServer,后Broker)
# 后台启动NameServer
nohup sh bin/mqnamesrv &
# 查看启动日志,输出 boot success 代表成功
tail -f ~/logs/rocketmqlogs/namesrv.log
# 启动Broker,替换为你的服务器IP
nohup sh bin/mqbroker -n 120.xx.xx.xx:9876 &
tail -f ~/logs/rocketmqlogs/broker.log
5.2 停止(顺序:先Broker,后NameServer)
sh bin/mqshutdown broker
sh bin/mqshutdown namesrv
5.3 重启完整流程
# 1.停止服务
sh bin/mqshutdown broker
sh bin/mqshutdown namesrv
sleep 5
# 2.重新启动
nohup sh bin/mqnamesrv &
sleep 3
nohup sh bin/mqbroker -n 120.xx.xx.xx:9876 &
六、SpringBoot 普通即时消息完整Demo
场景说明:下单后发送即时异步消息,消费者接收消息执行业务(短信通知、库存扣减、日志记录等)
6.1 pom.xml 核心依赖
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.7.15</version>
<relativePath/>
</parent>
<groupId>com.mq</groupId>
<artifactId>rocketmq-normal-demo</artifactId>
<version>1.0.0</version>
<properties>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
</properties>
<dependencies>
<!-- web测试接口 -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!-- RocketMQ starter 支持ACL鉴权 -->
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.2.3</version>
</dependency>
<!-- JSON序列化 -->
<dependency>
<groupId>com.alibaba.fastjson2</groupId>
<artifactId>fastjson2</artifactId>
<version>2.0.32</version>
</dependency>
</dependencies>
</project>
6.2 application.yml 配置(MQ地址+ACL账号密码)
spring:
application:
name: rocketmq-normal-demo
rocketmq:
# 替换为你的服务器公网/内网IP
name-server: 120.xx.xx.xx:9876
producer:
group: order-producer-group
# ACL账号密码(与plain_acl.yml配置一致)
access-key: app_mq_user
secret-key: App@666888
send-message-timeout: 5000
6.3 消息实体 OrderMsg.java
package com.mq.entity;
import lombok.Data;
@Data
public class OrderMsg {
// 订单编号
private String orderNo;
// 用户ID
private Long userId;
// 订单金额
private Double amount;
// 消息备注
private String remark;
}
6.4 生产者:发送普通即时消息 OrderProducer.java
package com.mq.service;
import com.alibaba.fastjson2.JSON;
import com.mq.entity.OrderMsg;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
@Service
public class OrderProducer {
@Autowired
private RocketMQTemplate rocketMQTemplate;
// 普通消息Topic
private static final String TOPIC_NAME = "order_msg_topic";
/**
* 发送普通同步即时消息
*/
public void sendOrderMsg(OrderMsg orderMsg) {
try {
// 同步发送,等待broker返回发送成功
rocketMQTemplate.syncSend(TOPIC_NAME, orderMsg);
System.out.println("订单消息发送成功,订单号:" + orderMsg.getOrderNo());
} catch (Exception e) {
e.printStackTrace();
throw new RuntimeException("发送订单消息失败");
}
}
}
6.5 消费者:监听并消费消息 OrderConsumer.java
package com.mq.consumer;
import com.alibaba.fastjson2.JSON;
import com.mq.entity.OrderMsg;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.springframework.stereotype.Component;
import java.util.List;
@Component
@RocketMQMessageListener(
topic = "order_msg_topic",
consumerGroup = "order-consumer-group"
)
public class OrderConsumer implements MessageListenerConcurrently {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext context) {
for (MessageExt msg : msgs) {
try {
// 解析消息体
String body = new String(msg.getBody());
OrderMsg orderMsg = JSON.parseObject(body, OrderMsg.class);
System.out.println("===== 收到订单消息 =====");
System.out.println("订单号:" + orderMsg.getOrderNo());
System.out.println("用户ID:" + orderMsg.getUserId());
System.out.println("订单金额:" + orderMsg.getAmount());
// 业务逻辑:
// 1. 发送下单短信通知用户
// 2. 异步扣减库存
// 3. 记录订单操作日志
// 4. 推送订单消息给运营后台
// 消费成功,告知Broker删除本条消息
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
} catch (Exception e) {
e.printStackTrace();
// 消费异常,返回重试,Broker自动重试3次
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
}
6.6 测试接口 Controller MqTestController.java
package com.mq.controller;
import com.mq.entity.OrderMsg;
import com.mq.service.OrderProducer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
import java.util.UUID;
@RestController
public class MqTestController {
@Autowired
private OrderProducer orderProducer;
/**
* 测试发送普通订单消息
* 访问地址:http://127.0.0.1:8080/send/order?userId=10001&amount=99.9
*/
@GetMapping("/send/order")
public String sendOrderMsg(@RequestParam Long userId, @RequestParam Double amount) {
OrderMsg msg = new OrderMsg();
// 生成唯一订单号
String orderNo = "ORD" + UUID.randomUUID().toString().substring(0, 10);
msg.setOrderNo(orderNo);
msg.setUserId(userId);
msg.setAmount(amount);
msg.setRemark("用户下单异步通知消息");
orderProducer.sendOrderMsg(msg);
return "消息发送完成,订单号:" + orderNo;
}
}
6.7 项目启动类 RocketMqNormalApplication.java
package com.mq;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
public class RocketMqNormalApplication {
public static void main(String[] args) {
SpringApplication.run(RocketMqNormalApplication.class, args);
}
}
七、运行测试流程
- 服务器启动NameServer + Broker,确认无报错日志
- 启动SpringBoot项目
- 浏览器访问测试接口:
http://localhost:8080/send/order?userId=10001&amount=199.5 - 查看项目控制台,消费者打印订单信息,代表收发正常
八、常见踩坑问题
- ACL权限报错
yml中access-key、secret-key和plain_acl.yml必须完全一致;topicPerms配置PUB|SUB读写权限。 - 外网无法连接Broker
broker.conf配置brokerIP1为公网IP,防火墙放行9876、10911端口。 - 消息重复消费
业务层增加幂等处理(订单号唯一索引、Redis标记)。 - 消费失败无限重试
多次失败消息会进入死信队列,可单独监听死信Topic做人工补偿。 - 启动内存不足
修改runserver.sh、runbroker.sh降低Xms/Xmx内存参数。
九、普通即时消息适用业务场景
- 订单创建后异步通知短信、公众号推送
- 支付完成异步更新订单状态、发放优惠券
- 用户操作日志异步入库
- 商品上架同步推送搜索索引
- 审批流程异步消息通知负责人
更多推荐
所有评论(0)