Linux部署RocketMQ5.1.4 + ACL密码鉴权 + SpringBoot普通即时消息生产消费Demo

目录

  1. 服务器环境准备
  2. RocketMQ安装部署
  3. 开启ACL账号密码鉴权
  4. 启停/重启全套命令
  5. SpringBoot 普通即时消息完整Demo(生产者+消费者)
  6. 测试接口与运行说明
  7. 常见问题

一、服务器环境准备

1. 基础环境

  • 系统:CentOS7/CentOS8
  • JDK:1.8+
  • 开放端口:9876(NameServer)、10911(Broker)

2. 防火墙放行端口

firewall-cmd --add-port=9876/tcp --permanent
firewall-cmd --add-port=10911/tcp --permanent
firewall-cmd --reload
# 查看放行端口
firewall-cmd --list-ports

3. 创建安装目录

mkdir -p /usr/local/rocketmq
cd /usr/local/rocketmq

二、RocketMQ 5.1.4 安装解压

  1. 下载二进制包:rocketmq-all-5.1.4-bin-release.zip,上传服务器
  2. 解压
unzip rocketmq-all-5.1.4-bin-release.zip
mv rocketmq-all-5.1.4-bin-release rocketmq5.1.4
cd rocketmq5.1.4
# 配置临时环境变量
export ROCKETMQ_HOME=/usr/local/rocketmq/rocketmq5.1.4

三、修改JVM内存(低配服务器必改,防止OOM)

1. 修改NameServer内存

vim bin/runserver.sh
# 找到JAVA_OPT,修改内存参数
JAVA_OPT="${JAVA_OPT} -Xms256m -Xmx256m -Xmn128m"

2. 修改Broker内存

vim bin/runbroker.sh
JAVA_OPT="${JAVA_OPT} -Xms256m -Xmx256m -Xmn128m"

四、开启ACL密码鉴权(设置连接账号密码)

4.1 开启Broker鉴权开关

vim conf/broker.conf
# 文件末尾追加配置
aclEnable=true
autoCreateTopicEnable=true
# 外网访问填写服务器公网IP,内网填内网IP
brokerIP1=120.xx.xx.xx

4.2 配置账号权限 conf/plain_acl.yml

accounts:
  # 运维管理员账号
  - accessKey: mq_admin
    secretKey: Admin@2026Rmq
    whiteRemoteAddress: 127.0.0.1
    admin: true
  # Java业务应用连接账号(代码中使用)
  - accessKey: app_mq_user
    secretKey: App@666888
    whiteRemoteAddress: 0.0.0.0
    admin: false
    defaultTopicPerm: DENY
    defaultGroupPerm: DENY
    # 仅允许操作普通消息Topic:发布+订阅
    topicPerms:
      - "order_msg_topic=PUB|SUB"
    groupPerms:
      - "order-consumer-group=SUB"
globalWhiteRemoteAddresses:
  - 127.0.0.1

accessKey=用户名,secretKey=密码,修改保存后重启Broker生效

五、RocketMQ 启动/停止/重启命令

进入MQ根目录执行:cd /usr/local/rocketmq/rocketmq5.1.4

5.1 启动(顺序:先NameServer,后Broker)

# 后台启动NameServer
nohup sh bin/mqnamesrv &
# 查看启动日志,输出 boot success 代表成功
tail -f ~/logs/rocketmqlogs/namesrv.log

# 启动Broker,替换为你的服务器IP
nohup sh bin/mqbroker -n 120.xx.xx.xx:9876 &
tail -f ~/logs/rocketmqlogs/broker.log

5.2 停止(顺序:先Broker,后NameServer)

sh bin/mqshutdown broker
sh bin/mqshutdown namesrv

5.3 重启完整流程

# 1.停止服务
sh bin/mqshutdown broker
sh bin/mqshutdown namesrv
sleep 5
# 2.重新启动
nohup sh bin/mqnamesrv &
sleep 3
nohup sh bin/mqbroker -n 120.xx.xx.xx:9876 &

六、SpringBoot 普通即时消息完整Demo

场景说明:下单后发送即时异步消息,消费者接收消息执行业务(短信通知、库存扣减、日志记录等)

6.1 pom.xml 核心依赖

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.7.15</version>
        <relativePath/>
    </parent>

    <groupId>com.mq</groupId>
    <artifactId>rocketmq-normal-demo</artifactId>
    <version>1.0.0</version>

    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
    </properties>

    <dependencies>
        <!-- web测试接口 -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <!-- RocketMQ starter 支持ACL鉴权 -->
        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-spring-boot-starter</artifactId>
            <version>2.2.3</version>
        </dependency>
        <!-- JSON序列化 -->
        <dependency>
            <groupId>com.alibaba.fastjson2</groupId>
            <artifactId>fastjson2</artifactId>
            <version>2.0.32</version>
        </dependency>
    </dependencies>
</project>

6.2 application.yml 配置(MQ地址+ACL账号密码)

spring:
  application:
    name: rocketmq-normal-demo
rocketmq:
  # 替换为你的服务器公网/内网IP
  name-server: 120.xx.xx.xx:9876
  producer:
    group: order-producer-group
    # ACL账号密码(与plain_acl.yml配置一致)
    access-key: app_mq_user
    secret-key: App@666888
    send-message-timeout: 5000

6.3 消息实体 OrderMsg.java

package com.mq.entity;

import lombok.Data;

@Data
public class OrderMsg {
    // 订单编号
    private String orderNo;
    // 用户ID
    private Long userId;
    // 订单金额
    private Double amount;
    // 消息备注
    private String remark;
}

6.4 生产者:发送普通即时消息 OrderProducer.java

package com.mq.service;

import com.alibaba.fastjson2.JSON;
import com.mq.entity.OrderMsg;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

@Service
public class OrderProducer {

    @Autowired
    private RocketMQTemplate rocketMQTemplate;

    // 普通消息Topic
    private static final String TOPIC_NAME = "order_msg_topic";

    /**
     * 发送普通同步即时消息
     */
    public void sendOrderMsg(OrderMsg orderMsg) {
        try {
            // 同步发送,等待broker返回发送成功
            rocketMQTemplate.syncSend(TOPIC_NAME, orderMsg);
            System.out.println("订单消息发送成功,订单号:" + orderMsg.getOrderNo());
        } catch (Exception e) {
            e.printStackTrace();
            throw new RuntimeException("发送订单消息失败");
        }
    }
}

6.5 消费者:监听并消费消息 OrderConsumer.java

package com.mq.consumer;

import com.alibaba.fastjson2.JSON;
import com.mq.entity.OrderMsg;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.springframework.stereotype.Component;

import java.util.List;

@Component
@RocketMQMessageListener(
        topic = "order_msg_topic",
        consumerGroup = "order-consumer-group"
)
public class OrderConsumer implements MessageListenerConcurrently {

    @Override
    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext context) {
        for (MessageExt msg : msgs) {
            try {
                // 解析消息体
                String body = new String(msg.getBody());
                OrderMsg orderMsg = JSON.parseObject(body, OrderMsg.class);
                System.out.println("===== 收到订单消息 =====");
                System.out.println("订单号:" + orderMsg.getOrderNo());
                System.out.println("用户ID:" + orderMsg.getUserId());
                System.out.println("订单金额:" + orderMsg.getAmount());

                // 业务逻辑:
                // 1. 发送下单短信通知用户
                // 2. 异步扣减库存
                // 3. 记录订单操作日志
                // 4. 推送订单消息给运营后台

                // 消费成功,告知Broker删除本条消息
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            } catch (Exception e) {
                e.printStackTrace();
                // 消费异常,返回重试,Broker自动重试3次
                return ConsumeConcurrentlyStatus.RECONSUME_LATER;
            }
        }
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    }
}

6.6 测试接口 Controller MqTestController.java

package com.mq.controller;

import com.mq.entity.OrderMsg;
import com.mq.service.OrderProducer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

import java.util.UUID;

@RestController
public class MqTestController {

    @Autowired
    private OrderProducer orderProducer;

    /**
     * 测试发送普通订单消息
     * 访问地址:http://127.0.0.1:8080/send/order?userId=10001&amount=99.9
     */
    @GetMapping("/send/order")
    public String sendOrderMsg(@RequestParam Long userId, @RequestParam Double amount) {
        OrderMsg msg = new OrderMsg();
        // 生成唯一订单号
        String orderNo = "ORD" + UUID.randomUUID().toString().substring(0, 10);
        msg.setOrderNo(orderNo);
        msg.setUserId(userId);
        msg.setAmount(amount);
        msg.setRemark("用户下单异步通知消息");

        orderProducer.sendOrderMsg(msg);
        return "消息发送完成,订单号:" + orderNo;
    }
}

6.7 项目启动类 RocketMqNormalApplication.java

package com.mq;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class RocketMqNormalApplication {
    public static void main(String[] args) {
        SpringApplication.run(RocketMqNormalApplication.class, args);
    }
}

七、运行测试流程

  1. 服务器启动NameServer + Broker,确认无报错日志
  2. 启动SpringBoot项目
  3. 浏览器访问测试接口:
    http://localhost:8080/send/order?userId=10001&amount=199.5
  4. 查看项目控制台,消费者打印订单信息,代表收发正常

八、常见踩坑问题

  1. ACL权限报错
    yml中access-key、secret-key和plain_acl.yml必须完全一致;topicPerms配置PUB|SUB读写权限。
  2. 外网无法连接Broker
    broker.conf配置brokerIP1为公网IP,防火墙放行9876、10911端口。
  3. 消息重复消费
    业务层增加幂等处理(订单号唯一索引、Redis标记)。
  4. 消费失败无限重试
    多次失败消息会进入死信队列,可单独监听死信Topic做人工补偿。
  5. 启动内存不足
    修改runserver.sh、runbroker.sh降低Xms/Xmx内存参数。

九、普通即时消息适用业务场景

  1. 订单创建后异步通知短信、公众号推送
  2. 支付完成异步更新订单状态、发放优惠券
  3. 用户操作日志异步入库
  4. 商品上架同步推送搜索索引
  5. 审批流程异步消息通知负责人

更多推荐