RabbitMQ 仲裁队列原生 Java 测试代码

代码注解版 · 非 Spring Boot 写法

适用场景

使用 RabbitMQ 原生 Java Client,演示如何在非 Spring Boot 环境下创建仲裁队列、发送消息、消费消息并进行手动确认。

文档版本

代码注解版,可直接拷贝到普通 Maven 项目中测试。

核心提醒

仲裁队列必须显式传入 x-queue-type=quorum;如果是单机环境,x-quorum-initial-group-size 建议写 1。

1. 使用前先明确两件事

这份文档使用的是 RabbitMQ 原生 Java Client,不依赖 Spring Boot,也不使用 RabbitTemplate 和 @RabbitListener。所有交换机声明、队列声明、消息发送与消息消费都直接通过 Channel 完成。

关键提醒

  • 如果你继续使用 channel.exchangeDeclare()、channel.queueDeclare()、channel.basicPublish() 这套方式,就不要再混用 Spring 风格的发送和监听代码。
  • 仲裁队列不是普通的 queueDeclare(..., null)。真正决定队列类型的是参数 Map 中的 x-queue-type=quorum。
  • 单机测试建议把 x-quorum-initial-group-size 写成 1;只有在 3 节点集群环境下,才建议改成 3。

2. Maven 依赖(注解版)

如果你使用普通 Maven 项目,可以先引入 RabbitMQ Java Client 依赖。下面给出一个带注释的依赖片段,便于直接复制到 pom.xml 中。

<dependencies>

    <!-- RabbitMQ 官方 Java 客户端,用于创建连接、通道、发送消息和消费消息 -->

    <dependency>

        <groupId>com.rabbitmq</groupId>

        <artifactId>amqp-client</artifactId>

        <version>5.20.0</version>

    </dependency>

    <!-- 仅用于打印日志,方便观察收发结果;如果你不想加日志框架,也可以直接用 System.out -->

    <dependency>

        <groupId>org.slf4j</groupId>

        <artifactId>slf4j-simple</artifactId>

        <version>2.0.12</version>

    </dependency>

</dependencies>

说明:如果你的本地环境已有其他日志实现,可以保留 amqp-client,按需替换 slf4j-simple。

3. 完整 Java 代码(注解版)

下面这份 QuorumQueueRawDemo.java 可以直接运行。代码里已经加入中文注释,目的是让你一眼看懂每一段在做什么。

import com.rabbitmq.client.BuiltinExchangeType;

import com.rabbitmq.client.CancelCallback;

import com.rabbitmq.client.Channel;

import com.rabbitmq.client.Connection;

import com.rabbitmq.client.ConnectionFactory;

import com.rabbitmq.client.DeliverCallback;

import com.rabbitmq.client.MessageProperties;

import java.nio.charset.StandardCharsets;

import java.util.HashMap;

import java.util.Map;

public class QuorumQueueRawDemo {

    // ===== 1. RabbitMQ 连接信息 =====

    // 本地测试时,通常使用默认的 guest / guest 账号和 5672 端口。

    private static final String HOST = "127.0.0.1";

    private static final int PORT = 5672;

    private static final String USERNAME = "guest";

    private static final String PASSWORD = "guest";

    // ===== 2. 交换机、队列、路由键 =====

    // 这三个名字要保持一致,否则消息可能发出去了,但收不到。

    private static final String EXCHANGE_NAME = "exchange.quorum.test";

    private static final String QUEUE_NAME = "queue.quorum.test";

    private static final String ROUTING_KEY = "routing.key.quorum.test";

    public static void main(String[] args) throws Exception {

        // 3. 创建连接工厂,相当于先准备好“连接 RabbitMQ 的配置模板”。

        ConnectionFactory factory = new ConnectionFactory();

        factory.setHost(HOST);

        factory.setPort(PORT);

        factory.setUsername(USERNAME);

        factory.setPassword(PASSWORD);

        // 4. 给消费者单独开一个连接和通道。

        // 这么做更清晰,也更符合“生产和消费职责分离”的思路。

        Connection consumerConnection = factory.newConnection("quorum-consumer-connection");

        Channel consumerChannel = consumerConnection.createChannel();

        // 5. 先声明交换机、仲裁队列和绑定关系。

        // 单机测试时,仲裁初始副本数写 1 即可。

        declareQuorumInfrastructure(consumerChannel, 1);

        // 6. 启动消费者,让它开始监听队列。

        startConsumer(consumerChannel);

        // 7. 再给生产者单独开一个连接和通道。

        Connection producerConnection = factory.newConnection("quorum-producer-connection");

        Channel producerChannel = producerConnection.createChannel();

        // 8. 再声明一次也没问题,只要参数保持完全一致即可。

        // RabbitMQ 会认为这是同一套基础设施,不会重复创建冲突资源。

        declareQuorumInfrastructure(producerChannel, 1);

        // 9. 连续发送 5 条测试消息,观察控制台是否能正常消费。

        for (int i = 1; i <= 5; i++) {

            String msg = "message test quorum ~~~ #" + i;

            producerChannel.basicPublish(

                    EXCHANGE_NAME,

                    ROUTING_KEY,

                    // 把消息声明为持久化文本消息,更符合仲裁队列“可靠消息”的测试场景。

                    MessageProperties.PERSISTENT_TEXT_PLAIN,

                    msg.getBytes(StandardCharsets.UTF_8)

            );

            System.out.println("生产者发送消息: " + msg);

            Thread.sleep(1000);

        }

        // 10. 留一点时间给消费者处理消息,避免主线程过早退出。

        Thread.sleep(10000);

        // 11. 最后记得关闭通道和连接,避免资源泄露。

        producerChannel.close();

        producerConnection.close();

        consumerChannel.close();

        consumerConnection.close();

        System.out.println("程序结束");

    }

    /**

     * 声明交换机、仲裁队列和绑定关系。

     * @param channel RabbitMQ 通道

     * @param initialGroupSize 仲裁初始副本数;单机测试写 1,三节点集群可写 3

     */

    private static void declareQuorumInfrastructure(Channel channel, int initialGroupSize) throws Exception {

        // 1. 声明 Direct 交换机。

        // durable=true 表示交换机持久化,RabbitMQ 重启后仍然存在。

        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT, true);

        // 2. 准备队列参数。

        // 其中 x-queue-type=quorum 是最关键的一行,少了它就不是仲裁队列。

        Map<String, Object> args = new HashMap<>();

        args.put("x-queue-type", "quorum");

        // 3. 设置仲裁初始副本数。

        // 单机环境写 1;如果你是 3 节点集群,并且想测试高可用,可以改成 3。

        args.put("x-quorum-initial-group-size", initialGroupSize);

        // 4. 声明队列。

        // 第 2 个参数 true 表示队列持久化;后面两个 false 表示非独占、非自动删除。

        channel.queueDeclare(QUEUE_NAME, true, false, false, args);

        // 5. 把交换机和队列绑定起来。

        // 这样发到 exchange.quorum.test 且路由键匹配的消息,才会进入该队列。

        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ROUTING_KEY);

        System.out.println("交换机、仲裁队列、绑定关系声明成功");

    }

    /**

     * 启动消费者,并关闭自动确认,改为手动确认。

     */

    private static void startConsumer(Channel channel) throws Exception {

        // 每次先取 1 条消息,便于观察“收到一条,处理一条,确认一条”的过程。

        channel.basicQos(1);

        DeliverCallback deliverCallback = (consumerTag, delivery) -> {

            String data = new String(delivery.getBody(), StandardCharsets.UTF_8);

            long deliveryTag = delivery.getEnvelope().getDeliveryTag();

            try {

                System.out.println("消费者收到消息: " + data);

                // 模拟业务处理。

                Thread.sleep(500);

                // 处理成功后,手动 ACK。

                // false 表示只确认当前这一条消息,不批量确认。

                channel.basicAck(deliveryTag, false);

                System.out.println("消息手动确认成功,deliveryTag = " + deliveryTag);

            } catch (Exception e) {

                System.out.println("消费失败: " + e.getMessage());

                // 处理失败时,执行 NACK。

                // 最后一个参数 true 表示把消息重新放回队列,后续还可以继续重试。

                channel.basicNack(deliveryTag, false, true);

            }

        };

        CancelCallback cancelCallback = consumerTag ->

                System.out.println("消费者被取消: " + consumerTag);

        // autoAck=false:关闭自动确认。

        // 如果这里写 true,那么消息一投递给消费者就会被系统自动确认,

        // 这就无法体现“业务处理完成后再确认”的意义了。

        channel.basicConsume(QUEUE_NAME, false, deliverCallback, cancelCallback);

        System.out.println("消费者启动成功,正在监听队列: " + QUEUE_NAME);

    }

}

提示:如果你只是做单机实验,这份代码已经够用;如果要验证高可用,再把 initialGroupSize 改为 3,并准备三节点 RabbitMQ 集群。

4. 代码执行逻辑

  • 先创建 ConnectionFactory,统一配置 RabbitMQ 的地址、端口和登录信息。
  • 消费者连接先启动,并声明交换机、仲裁队列和绑定关系。
  • 消费者通过 basicConsume() 监听队列,且把 autoAck 设置为 false,改用手动确认。
  • 生产者随后通过 basicPublish() 向交换机发送消息,交换机再按路由键把消息转发到仲裁队列。
  • 消费者收到消息后先处理业务,再调用 basicAck() 确认;如果处理失败,则调用 basicNack() 让消息重新入队。

5. 运行步骤

  • 先启动本地 RabbitMQ 服务,并确认 5672 端口可用。
  • 把文档中的 Java 文件复制到普通 Maven 项目中,同时加入 amqp-client 依赖。
  • 执行 main() 方法,观察控制台日志。
  • 打开 RabbitMQ 管理界面,确认 queue.quorum.test 的类型为 quorum。
  • 当控制台出现“生产者发送消息”“消费者收到消息”“消息手动确认成功”等输出时,说明实验闭环已经跑通。

6. 常见错误与定位方法

  • 如果 queueDeclare(..., null) 也能创建队列,但类型显示不是 quorum,说明你少传了 x-queue-type=quorum。
  • 如果单机环境把 x-quorum-initial-group-size 写成 3,队列声明可能会失败,因为本地没有足够副本节点。
  • 如果生产者显示发送成功,但消费者没有收到,优先检查交换机名、队列名、路由键三者是否完全一致。
  • 如果消息已经打印出来,但又重复投递,通常要检查是否没有执行 basicAck(),或者业务代码抛异常后走了 basicNack()。

7. 一句话结论

这份代码的核心不是“能把消息发出去”,而是用 RabbitMQ 原生 Java Client 把仲裁队列、手动确认和可靠消费这一整条链路跑通,并且把关键点都写成了看得懂的中文注释。

RabbitMQ 优先级队列原生 Java 测试代码

注解版(不使用 Spring Boot)

文档用途: 用 RabbitMQ 原生 Java Client 演示“优先级队列”的创建、发送、消费与手动确认。本文示例采用经典队列实现多级优先级,方便直接观察不同优先级消息的投递顺序。

一、优先级队列的核心思路

声明队列时必须设置队列参数 x-max-priority,否则它就是普通队列,不会按优先级投递。

生产者发消息时,要在消息属性里写入 priority 值;数字越大,优先级越高。

为了更容易观察优先级效果,通常让消息先进入队列,再启动消费者,或者给消费者设置较小的 prefetch 值。

本示例使用手动确认 basicAck(),既方便实验,也更接近真实项目里的可靠消费场景。

二、Maven 依赖

pom.xml 依赖片段:

<dependency>
    <groupId>com.rabbitmq</groupId>
    <artifactId>amqp-client</artifactId>
</dependency>

三、完整代码(带中文注解)

PriorityQueueRawDemo.java:

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;

import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

public class PriorityQueueRawDemo {

    // =========================
    // 1. RabbitMQ 连接信息
    // =========================
    private static final String HOST = "127.0.0.1";
    private static final int PORT = 5672;
    private static final String USERNAME = "guest";
    private static final String PASSWORD = "guest";

    // =========================
    // 2. 交换机、队列、路由键
    // =========================
    private static final String EXCHANGE_NAME = "exchange.priority.test";
    private static final String QUEUE_NAME = "queue.priority.test";
    private static final String ROUTING_KEY = "routing.key.priority.test";

    public static void main(String[] args) throws Exception {
        // 创建连接工厂,并设置 RabbitMQ 服务端地址
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost(HOST);
        factory.setPort(PORT);
        factory.setUsername(USERNAME);
        factory.setPassword(PASSWORD);

        // 生产者连接和通道
        Connection producerConnection = factory.newConnection("priority-producer-connection");
        Channel producerChannel = producerConnection.createChannel();

        // 先声明交换机、优先级队列和绑定关系
        declarePriorityInfrastructure(producerChannel, 10);

        // 先发送几条不同优先级的消息
        // 数字越大,优先级越高
        sendMessage(producerChannel, "普通消息-优先级1", 1);
        sendMessage(producerChannel, "重要消息-优先级8", 8);
        sendMessage(producerChannel, "中等消息-优先级5", 5);
        sendMessage(producerChannel, "最重要消息-优先级10", 10);
        sendMessage(producerChannel, "普通消息-优先级2", 2);

        // 再启动消费者,这样更容易观察优先级效果
        Connection consumerConnection = factory.newConnection("priority-consumer-connection");
        Channel consumerChannel = consumerConnection.createChannel();

        // 再声明一次也可以,但参数必须完全一致
        declarePriorityInfrastructure(consumerChannel, 10);

        // 启动消费者
        startConsumer(consumerChannel);

        // 主线程等待一段时间,避免程序过早退出
        Thread.sleep(15000);

        // 关闭资源
        producerChannel.close();
        producerConnection.close();

        consumerChannel.close();
        consumerConnection.close();

        System.out.println("程序结束");
    }

    /**
     * 声明交换机、优先级队列、绑定关系
     * maxPriority 表示该队列支持的最大优先级
     */
    private static void declarePriorityInfrastructure(Channel channel, int maxPriority) throws Exception {
        // 1) 声明 Direct 交换机,durable=true 表示持久化
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT, true);

        // 2) 给队列增加优先级参数
        // x-max-priority 是优先级队列的关键参数
        Map<String, Object> args = new HashMap<>();
        args.put("x-max-priority", maxPriority);

        // 3) 声明队列
        channel.queueDeclare(QUEUE_NAME, true, false, false, args);

        // 4) 将交换机和队列通过路由键绑定起来
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ROUTING_KEY);

        System.out.println("优先级队列及绑定关系声明成功");
    }

    /**
     * 发送一条带优先级的消息
     */
    private static void sendMessage(Channel channel, String msg, int priority) throws Exception {
        // deliveryMode=2 表示消息持久化
        // priority(priority) 表示设置消息优先级
        AMQP.BasicProperties props = new AMQP.BasicProperties.Builder()
                .deliveryMode(2)
                .priority(priority)
                .build();

        channel.basicPublish(
                EXCHANGE_NAME,
                ROUTING_KEY,
                props,
                msg.getBytes(StandardCharsets.UTF_8)
        );

        System.out.println("生产者发送消息: " + msg + ",priority = " + priority);
    }

    /**
     * 启动消费者并使用手动确认
     */
    private static void startConsumer(Channel channel) throws Exception {
        // basicQos(1) 表示一次只取 1 条未确认消息
        // 这样更容易观察优先级队列的效果
        channel.basicQos(1);

        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String data = new String(delivery.getBody(), StandardCharsets.UTF_8);
            long deliveryTag = delivery.getEnvelope().getDeliveryTag();

            Integer priority = delivery.getProperties().getPriority();
            if (priority == null) {
                priority = 0;
            }

            try {
                System.out.println("消费者收到消息: " + data + ",priority = " + priority);

                // 模拟业务处理
                Thread.sleep(500);

                // 手动确认
                channel.basicAck(deliveryTag, false);
                System.out.println("消息确认成功,deliveryTag = " + deliveryTag);

            } catch (Exception e) {
                System.out.println("消费失败: " + e.getMessage());

                // 失败后重新入队
                channel.basicNack(deliveryTag, false, true);
            }
        };

        CancelCallback cancelCallback = consumerTag ->
                System.out.println("消费者被取消: " + consumerTag);

        // autoAck=false,表示关闭自动确认,改为手动确认
        channel.basicConsume(QUEUE_NAME, false, deliverCallback, cancelCallback);

        System.out.println("消费者启动成功,正在监听队列: " + QUEUE_NAME);
    }
}

四、代码逻辑说明

1. 为什么要设置 x-max-priority?

因为优先级队列不是 RabbitMQ 的默认行为。只有在声明队列时加上该参数,RabbitMQ 才会把这个队列按“优先级队列”处理。

2. 为什么发送消息时还要设置 priority?

队列声明只是告诉 RabbitMQ“这个队列支持优先级”;真正哪条消息优先级高,要靠生产者在 BasicProperties 中逐条写入。

3. 为什么示例里先发消息,再启动消费者?

这样消息会先堆在队列里,消费者启动后更容易看到高优先级消息先被取走。如果消费者一直在线,很多消息会直接投递给消费者,优先级效果不一定明显。

4. 为什么还要加 basicQos(1)?

因为 prefetch 太大时,低优先级消息可能已经提前发给消费者了。设置为 1,能让队列每次只放出一条未确认消息,更方便观察优先级顺序。

5. 为什么还保留 basicAck()?

因为手动确认更稳妥。只有业务处理成功,才告诉 RabbitMQ 这条消息可以删掉;如果处理失败,可以 basicNack() 让消息重新入队。

五、运行步骤

1. 先启动本地 RabbitMQ 服务和管理界面。

2. 在普通 Maven 项目中加入 amqp-client 依赖。

3. 把上述 PriorityQueueRawDemo.java 放到 src/main/java 中。

4. 运行 main() 方法。

5. 打开 RabbitMQ 管理界面,检查 queue.priority.test 是否已创建。

6. 观察控制台输出,通常会优先看到 priority 更高的消息先被消费。

六、预期现象

控制台输出示例:

生产者发送消息: 普通消息-优先级1,priority = 1
生产者发送消息: 重要消息-优先级8,priority = 8
生产者发送消息: 中等消息-优先级5,priority = 5
生产者发送消息: 最重要消息-优先级10,priority = 10
生产者发送消息: 普通消息-优先级2,priority = 2

消费者启动成功,正在监听队列: queue.priority.test
消费者收到消息: 最重要消息-优先级10,priority = 10
消费者收到消息: 重要消息-优先级8,priority = 8
消费者收到消息: 中等消息-优先级5,priority = 5
消费者收到消息: 普通消息-优先级2,priority = 2
消费者收到消息: 普通消息-优先级1,priority = 1

七、常见问题

问题 1:明明设置了 priority,为什么消费顺序没变化?

通常是因为消费者太早启动,消息一进队列就被投递了;或者 prefetch 太大,低优先级消息已经提前被消费者拿走。

问题 2:为什么我设置了 x-max-priority 还是报错?

常见原因是之前已经存在同名普通队列,参数不一致会触发 PRECONDITION_FAILED。删除旧队列或更换新队列名即可。

问题 3:priority 能不能无限大?

不建议。实验和业务里一般用少量优先级档位就够了,比如 3 档、4 档或 10 档。

问题 4:优先级队列是不是一定比普通队列好?

不一定。优先级会增加复杂度和资源开销,只有确实需要“紧急消息先处理”时才值得使用。

八、总结

优先级队列的实验重点其实就两句:一是队列声明时加 x-max-priority,二是消息发送时写 priority。

只要把这两个点配对起来,再配合较小的 prefetch 和手动确认,就能比较清楚地验证优先级消息先消费的效果。

RabbitMQ 延迟消息原生 Java 测试代码(注解版)

适用于课程实验、运行演示与答辩说明;本示例不依赖 Spring Boot。

实验主题
基于插件的延迟消息测试

实现方式
RabbitMQ 原生 Java Client

核心验证点
延迟投递 + 手动 ACK

一、实验目的

  • 掌握基于 RabbitMQ 延迟交换机插件实现延迟消息的基本方法。
  • 理解 x-delayed-message 交换机、x-delayed-type 参数和 x-delay 消息头之间的关系。
  • 使用 RabbitMQ 原生 Java Client 完成交换机声明、消息发送、消息消费与手动确认。
  • 通过 10 秒延迟投递实验,验证消息不是立刻消费,而是在设定时间后进入消费者。

二、前置条件

  • 本机已安装并启动 RabbitMQ 服务。
  • RabbitMQ 已启用 delayed message exchange 插件,否则 x-delayed-message 类型交换机无法声明成功。
  • Java 环境可正常运行,能够连接到 127.0.0.1:5672。

说明

  • 本例采用原生 Java 写法,所以不使用 RabbitTemplate,也不使用 @RabbitListener。
  • 延迟能力并不是由队列本身决定,而是由延迟交换机根据消息头中的 x-delay 值控制投递时机。

三、完整测试代码

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;

import java.nio.charset.StandardCharsets;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;

public class DelayMessageRawJavaDemo {

    // RabbitMQ 连接信息:根据本机实际情况修改
    private static final String HOST = "127.0.0.1";
    private static final int PORT = 5672;
    private static final String USERNAME = "guest";
    private static final String PASSWORD = "guest";

    // 交换机、队列、路由键名称:三者要保持一致
    private static final String EXCHANGE_NAME = "exchange.delay.video";
    private static final String QUEUE_NAME = "queue.delay.video";
    private static final String ROUTING_KEY = "routing.key.delay.video";

    public static void main(String[] args) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost(HOST);
        factory.setPort(PORT);
        factory.setUsername(USERNAME);
        factory.setPassword(PASSWORD);

        // 创建一个连接,并在同一个连接下分别创建生产者和消费者通道
        Connection connection = factory.newConnection("delay-message-demo");
        Channel consumerChannel = connection.createChannel();
        Channel producerChannel = connection.createChannel();

        // 1. 声明延迟交换机、队列和绑定关系
        declareDelayInfrastructure(consumerChannel);

        // 2. 启动消费者,准备接收延迟投递后的消息
        startConsumer(consumerChannel);

        // 3. 发送一条延迟 10 秒的测试消息
        sendDelayMessage(producerChannel, 10000);

        // 4. 主线程等待,便于观察消息何时真正被消费
        Thread.sleep(15000);

        // 5. 关闭资源
        producerChannel.close();
        consumerChannel.close();
        connection.close();
    }

    /**
     * 声明延迟交换机、普通持久化队列和绑定关系
     */
    private static void declareDelayInfrastructure(Channel channel) throws Exception {
        // x-delayed-type 指定延迟交换机底层按哪种路由类型工作,这里选择 direct
        Map<String, Object> exchangeArgs = new HashMap<>();
        exchangeArgs.put("x-delayed-type", "direct");

        // 声明延迟交换机:交换机类型必须是 x-delayed-message
        channel.exchangeDeclare(
                EXCHANGE_NAME,
                "x-delayed-message",
                true,
                false,
                exchangeArgs
        );

        // 这里声明的是普通持久化队列,延迟不是队列特性,而是交换机特性
        channel.queueDeclare(QUEUE_NAME, true, false, false, null);

        // 绑定交换机和队列,让消息能通过路由键进入目标队列
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ROUTING_KEY);

        System.out.println("延迟交换机、队列、绑定关系声明成功");
    }

    /**
     * 发送延迟消息
     * @param delayMillis 延迟时间,单位毫秒
     */
    private static void sendDelayMessage(Channel channel, int delayMillis) throws Exception {
        String now = new SimpleDateFormat("HH:mm:ss").format(new Date());
        String body = "测试基于插件的延迟消息 [" + now + "]";

        // 在消息头中设置 x-delay,表示消息应当延迟多少毫秒后再投递
        Map<String, Object> headers = new HashMap<>();
        headers.put("x-delay", delayMillis);

        AMQP.BasicProperties props = new AMQP.BasicProperties.Builder()
                .contentType("text/plain")
                .deliveryMode(2) // 2 表示持久化消息
                .headers(headers)
                .build();

        channel.basicPublish(
                EXCHANGE_NAME,
                ROUTING_KEY,
                props,
                body.getBytes(StandardCharsets.UTF_8)
        );

        System.out.println("[生产者] 发送时间: " + now);
        System.out.println("[生产者] 消息内容: " + body);
        System.out.println("[生产者] 延迟毫秒: " + delayMillis);
    }

    /**
     * 启动消费者,关闭自动确认,改为手动 ACK
     */
    private static void startConsumer(Channel channel) throws Exception {
        // basicQos(1) 表示每次尽量只取 1 条,便于演示和观察
        channel.basicQos(1);

        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            long tag = delivery.getEnvelope().getDeliveryTag();
            String data = new String(delivery.getBody(), StandardCharsets.UTF_8);
            String now = new SimpleDateFormat("HH:mm:ss").format(new Date());

            try {
                System.out.println("[消费者] 接收时间: " + now);
                System.out.println("[消费者] 收到消息: " + data);

                // 消费成功后,手动确认
                channel.basicAck(tag, false);
                System.out.println("[消费者] ACK 成功: " + tag);
            } catch (Exception e) {
                System.out.println("[消费者] 处理失败: " + e.getMessage());

                // 处理失败时,拒绝当前消息并重新入队
                channel.basicNack(tag, false, true);
            }
        };

        CancelCallback cancelCallback = consumerTag ->
                System.out.println("消费者被取消: " + consumerTag);

        // autoAck=false,表示关闭自动确认,启用手动确认
        channel.basicConsume(QUEUE_NAME, false, deliverCallback, cancelCallback);
        System.out.println("消费者启动成功,监听队列: " + QUEUE_NAME);
    }
}

四、代码逻辑说明

1. 为什么要声明 x-delayed-message 交换机?

因为这里实现的是“基于插件的延迟消息”,真正控制延迟行为的是交换机,不是普通队列。只要交换机类型声明为 x-delayed-message,并且消息头里带有 x-delay,就可以实现指定毫秒数的延迟投递。

2. x-delayed-type = direct 是什么意思?

它表示这个延迟交换机在路由规则上按 direct 交换机工作,也就是消息会根据精确匹配的 routing key 被投递到绑定队列。这里使用 routing.key.delay.video,把消息路由到 queue.delay.video。

3. 为什么队列还是普通队列?

因为延迟能力不是队列自带的。队列在这里主要负责存储等待消费的消息;真正决定“何时把消息交给队列”的,是延迟交换机。换句话说,消息先在交换机侧等待,时间到了才会投递到普通队列中。

4. 为什么要手动 ACK?

手动确认能够让消费过程更可控。消费者只有在真正处理成功后才调用 basicAck;如果处理中出现异常,就可以 basicNack 并选择重新入队,避免消息在业务尚未完成时被误判为已消费。

五、运行步骤

1. 启动 RabbitMQ,并确认 delayed message 插件已启用。

2. 编译并运行 DelayMessageRawJavaDemo。

3. 观察控制台:程序启动后先打印生产者发送时间。

4. 等待约 10 秒,再观察消费者接收时间。

5. 如果消费者收到消息的时间晚于发送时间约 10 秒,说明延迟消息测试成功。

六、预期输出示例

延迟交换机、队列、绑定关系声明成功
消费者启动成功,监听队列: queue.delay.video
[生产者] 发送时间: 10:20:15
[生产者] 消息内容: 测试基于插件的延迟消息 [10:20:15]
[生产者] 延迟毫秒: 10000

......约 10 秒后......

[消费者] 接收时间: 10:20:25
[消费者] 收到消息: 测试基于插件的延迟消息 [10:20:15]
[消费者] ACK 成功: 1

七、常见问题与排查

  • 报错“unknown exchange type x-delayed-message”:通常说明插件没有安装或没有启用。
  • 消息立即被消费,没有延迟:检查消息头中是否正确设置了 x-delay,单位是否是毫秒。
  • 消息发出后消费者收不到:检查交换机名、队列名、路由键是否完全一致。
  • 手动 ACK 报错:检查 basicConsume 的 autoAck 是否设置为 false。

八、实验结论

本实验使用 RabbitMQ 原生 Java Client,完成了延迟交换机声明、消息发送、延迟投递、消费者接收与手动确认的完整闭环。实验表明:只要交换机声明为 x-delayed-message,并通过 x-delay 指定延迟时间,就可以实现基于插件的延迟消息测试。

RabbitMQ 惰性队列原生 Java 测试代码

适用于 RabbitMQ Java Client;含版本兼容说明、完整代码、逐段注释、运行步骤与常见问题

本说明给出一个“不使用 Spring Boot、只使用 RabbitMQ 原生 Java Client”的惰性队列测试示例。代码包含交换机声明、惰性队列声明、绑定、生产、消费、手动确认等完整流程,并附上中文注解,便于直接复制到实验报告或课程作业中。

版本兼容提示:从 RabbitMQ 3.12 开始,经典队列上的 x-queue-mode=lazy 参数会被忽略;经典队列默认就更接近过去“惰性队列”的行为。也就是说,这份代码在新版本中仍可运行,但“惰性模式”这个开关本身未必还生效。若实验老师要求演示旧版惰性队列写法,这份代码是标准历史写法;若使用新版本 RabbitMQ,请把它理解为“经典队列 + 历史惰性参数示例”。

一、实验目标

  • 理解惰性队列的用途:尽量把消息落盘,减少内存占用,适合消息堆积较多的场景。
  • 掌握 RabbitMQ 原生 Java Client 的基本用法,而不是依赖 Spring Boot 或 Spring AMQP。
  • 掌握交换机、经典队列、路由键、消息发送、消息消费和手动确认的完整流程。
  • 理解当前 RabbitMQ 新版本与旧版惰性队列行为之间的差异。

二、核心思路

这份示例的逻辑很简单:先连接 RabbitMQ,再声明 Direct 交换机,然后声明一个经典队列,并在参数中加入 x-queue-mode=lazy;接着用路由键把交换机与队列绑定;随后启动消费者并关闭自动确认,改为手动 basicAck();最后由生产者连续发送几条测试消息,观察消费端是否成功收到。

如果 RabbitMQ 版本较老(例如 3.11 及更早版本),这个参数会让经典队列按照“惰性队列”方式运行;如果 RabbitMQ 版本较新(3.12 及之后),参数会被忽略,但整个经典队列的收发测试流程仍然成立。

三、Maven 依赖

<dependencies>
    <!-- RabbitMQ 原生 Java 客户端 -->
    <dependency>
        <groupId>com.rabbitmq</groupId>
        <artifactId>amqp-client</artifactId>
        <version>5.20.0</version>
    </dependency>

    <!-- 日志实现,可选;不用也能运行 -->
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-simple</artifactId>
        <version>2.0.13</version>
    </dependency>
</dependencies>

说明:这里只用了 RabbitMQ 原生客户端依赖,没有使用 Spring Boot,也没有使用 RabbitTemplate 或 @RabbitListener。

四、完整代码(含中文注解)

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;
import com.rabbitmq.client.MessageProperties;

import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

/**
 * 惰性队列测试示例(原生 Java Client 版)
 *
 * 说明:
 * 1. 这里不使用 Spring Boot。
 * 2. 这里演示的是“经典队列 + x-queue-mode=lazy”的历史写法。
 * 3. RabbitMQ 3.12+ 会忽略 x-queue-mode=lazy,但代码仍可正常演示收发流程。
 */
public class LazyQueueRawDemo {

    // RabbitMQ 服务端连接信息
    private static final String HOST = "127.0.0.1";
    private static final int PORT = 5672;
    private static final String USERNAME = "guest";
    private static final String PASSWORD = "guest";

    // 交换机、队列、路由键名称
    private static final String EXCHANGE_NAME = "exchange.lazy.test";
    private static final String QUEUE_NAME = "queue.lazy.test";
    private static final String ROUTING_KEY = "routing.key.lazy.test";

    public static void main(String[] args) throws Exception {

        // 1)创建连接工厂,并设置 RabbitMQ 的连接参数
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost(HOST);
        factory.setPort(PORT);
        factory.setUsername(USERNAME);
        factory.setPassword(PASSWORD);

        // 2)先建立一个连接给消费者使用
        Connection consumerConnection = factory.newConnection("lazy-consumer-connection");
        Channel consumerChannel = consumerConnection.createChannel();

        // 3)声明交换机、惰性队列、绑定关系
        declareLazyInfrastructure(consumerChannel);

        // 4)启动消费者
        startConsumer(consumerChannel);

        // 5)再建立一个连接给生产者使用
        Connection producerConnection = factory.newConnection("lazy-producer-connection");
        Channel producerChannel = producerConnection.createChannel();

        // 生产者这边也声明一次基础设施,确保对象存在
        // 注意:如果队列已存在,那么参数必须与第一次声明完全一致
        declareLazyInfrastructure(producerChannel);

        // 6)发送几条测试消息
        for (int i = 1; i <= 5; i++) {
            String msg = "message test lazy ~~~ #" + i;

            // basicPublish:向指定交换机 + 路由键发送消息
            // MessageProperties.PERSISTENT_TEXT_PLAIN:表示持久化文本消息
            producerChannel.basicPublish(
                    EXCHANGE_NAME,
                    ROUTING_KEY,
                    MessageProperties.PERSISTENT_TEXT_PLAIN,
                    msg.getBytes(StandardCharsets.UTF_8)
            );

            System.out.println("生产者发送消息: " + msg);
            Thread.sleep(1000);
        }

        // 7)留一点时间给消费者处理消息
        Thread.sleep(10000);

        // 8)关闭资源
        producerChannel.close();
        producerConnection.close();

        consumerChannel.close();
        consumerConnection.close();

        System.out.println("程序结束");
    }

    /**
     * 声明交换机、惰性队列、绑定关系
     */
    private static void declareLazyInfrastructure(Channel channel) throws Exception {

        // 声明 Direct 交换机,durable=true 表示交换机持久化
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT, true);

        // 创建一个参数集合,用于声明“惰性队列”
        Map<String, Object> args = new HashMap<>();

        // 关键参数:x-queue-mode = lazy
        // 说明:
        // - 在 RabbitMQ 3.11 及更早版本中,这会让经典队列按惰性模式工作
        // - 在 RabbitMQ 3.12+ 中,这个参数会被忽略
        args.put("x-queue-mode", "lazy");

        // 声明队列:
        // queue:队列名
        // durable=true:队列持久化
        // exclusive=false:不独占
        // autoDelete=false:不自动删除
        // args:附加参数,这里放惰性队列参数
        channel.queueDeclare(QUEUE_NAME, true, false, false, args);

        // 绑定队列到交换机
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ROUTING_KEY);

        System.out.println("交换机、惰性队列、绑定关系声明成功");
    }

    /**
     * 启动消费者:关闭自动确认,改为手动确认
     */
    private static void startConsumer(Channel channel) throws Exception {

        // 每次最多先取 1 条,方便观察消费过程
        channel.basicQos(1);

        // 收到消息后的回调逻辑
        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String data = new String(delivery.getBody(), StandardCharsets.UTF_8);
            long deliveryTag = delivery.getEnvelope().getDeliveryTag();

            try {
                System.out.println("消费者收到消息: " + data);

                // 模拟业务处理时间
                Thread.sleep(500);

                // 手动确认:告诉 RabbitMQ,这条消息已经处理成功
                channel.basicAck(deliveryTag, false);
                System.out.println("消息手动确认成功,deliveryTag = " + deliveryTag);

            } catch (Exception e) {
                System.out.println("消费失败: " + e.getMessage());

                // basicNack(..., true):处理失败后重新入队
                channel.basicNack(deliveryTag, false, true);
            }
        };

        // 消费者被取消时的回调
        CancelCallback cancelCallback = consumerTag ->
                System.out.println("消费者被取消: " + consumerTag);

        // autoAck=false:关闭自动确认,改为手动确认
        channel.basicConsume(QUEUE_NAME, false, deliverCallback, cancelCallback);

        System.out.println("消费者启动成功,正在监听队列: " + QUEUE_NAME);
    }
}

五、代码逻辑说明

  • declareLazyInfrastructure(channel):负责声明交换机、声明经典队列、写入 x-queue-mode=lazy 参数,并完成绑定。
  • startConsumer(channel):负责启动消费者,关闭自动确认,并在消费成功后手动 basicAck()。
  • main():负责串联整个流程,包括建立连接、启动消费者、发送测试消息、等待消费、关闭资源。
  • MessageProperties.PERSISTENT_TEXT_PLAIN:让消息以持久化方式发送,更符合实验场景。
  • basicQos(1):让消费者一次只取一条,便于你观察消息处理顺序和确认动作。

六、运行步骤

  • 先确保本地 RabbitMQ 服务已经启动,默认端口为 5672。
  • 创建一个普通 Java 项目,导入 amqp-client 依赖。
  • 把上面的代码保存为 LazyQueueRawDemo.java。
  • 运行 main() 方法。
  • 观察控制台是否打印:交换机、惰性队列、绑定关系声明成功;消费者启动成功;生产者发送消息;消费者收到消息;消息手动确认成功。
  • 打开 RabbitMQ 管理界面,查看 queue.lazy.test 是否已被创建。

七、你在管理界面里应该看到什么

如果你的 RabbitMQ 版本较老,那么 queue.lazy.test 会是一个经典队列,并带有 lazy 模式属性。

如果你的 RabbitMQ 版本是 3.12 或更新版本,x-queue-mode 参数虽然可能被忽略,但 queue.lazy.test 仍会作为经典队列正常存在,消息也仍然可以正常发送与消费。

八、常见问题

问题 1:队列参数冲突。如果之前已经存在同名队列,并且它不是用同样参数声明的,那么再次运行时可能报 PRECONDITION_FAILED。解决方法是删除旧队列,或换一个新的队列名。

问题 2:管理界面里看不到“lazy”字样。很可能是 RabbitMQ 版本较新。从 3.12 起,经典队列默认行为已经接近过去的惰性队列,x-queue-mode=lazy 参数会被忽略。

问题 3:消费者收不到消息。一般是交换机名、队列名、路由键三者不一致,或者 RabbitMQ 服务没有启动。

问题 4:basicAck() 报错。通常是因为 autoAck 没有关闭。这个示例里 basicConsume(QUEUE_NAME, false, ...) 已经把自动确认关闭了。

九、可直接放进实验报告的结论

本实验基于 RabbitMQ 原生 Java Client 实现了惰性队列测试流程,包括交换机声明、队列声明、消息发送、消息消费与手动确认。通过在队列参数中设置 x-queue-mode=lazy,可以在旧版本 RabbitMQ 中创建惰性队列;在新版本 RabbitMQ 中,该参数可能被忽略,但经典队列依然具有接近过去惰性队列的行为特征。实验结果表明,使用 RabbitMQ 原生 Java 客户端也可以完成 RabbitMQ 队列的完整创建与收发测试,不依赖 Spring Boot 同样能够验证消息队列的核心机制。

RabbitMQ 死信队列
完整代码与测试文档

基于 Spring Boot 3 / Java 17 / RabbitMQ

文档目的

提供一套可直接运行的 RabbitMQ 死信队列示例代码,并给出配置、发送、消费与测试步骤。

覆盖场景

1)消费者拒绝消息进入死信;2)消息 TTL 到期进入死信。

测试方式

通过两个 HTTP 接口分别发送消息,再从控制台日志和 x-death 头信息确认死信原因。

适用内容

课程实验、实训报告、项目演示、面试准备、内部技术分享。

说明:本示例故意不使用“消费者不 ACK 模拟超时进入死信”作为测试方式。官方更标准、也更稳定的死信演示方式是:显式拒绝且不重新入队,或使用消息/队列 TTL 过期。

1. 项目概述

本项目提供一套完整的 Spring Boot + RabbitMQ 示例,用于演示死信队列(DLQ,Dead Letter Queue)的基本配置、消息发送、正常消费失败转死信,以及死信队列消费处理。

为了让测试结果更直观,示例中保留了两个独立场景:一个队列专门用于“消费者拒绝消息后进入死信”,另一个队列专门用于“消息在队列中等待超过 TTL 后进入死信”。

  • 运行环境:Java 17、Spring Boot 3.2.x、RabbitMQ 3.x。
  • 管理方式:使用 Spring AMQP 自动声明交换机、队列和绑定关系。
  • 验证方式:查看控制台日志中的消息流转过程与 x-death 头信息。

2. 项目结构

建议使用如下目录结构组织代码:

rabbitmq-dlq-demo
├─ pom.xml
├─ src/main/resources
│  └─ application.yml
└─ src/main/java/com/example/rabbitdlq
   ├─ RabbitDlqApplication.java
   ├─ config/RabbitMQConfig.java
   ├─ controller/TestProducerController.java
   ├─ service/MessageProducerService.java
   └─ consumer
      ├─ NormalRejectConsumer.java
      └─ DeadLetterConsumer.java

3. 完整代码

下面给出每个文件的完整代码。将这些文件按目录放入项目后,即可直接启动并测试。

3.1 pom.xml

Maven 依赖配置,包含 Web、RabbitMQ 与测试依赖。

<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
         https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>3.2.5</version>
        <relativePath/>
    </parent>

    <groupId>com.example</groupId>
    <artifactId>rabbitmq-dlq-demo</artifactId>
    <version>1.0.0</version>
    <name>rabbitmq-dlq-demo</name>
    <description>RabbitMQ Dead Letter Queue Demo</description>

    <properties>
        <java.version>17</java.version>
    </properties>

    <dependencies>
        <!-- Web,用于提供测试接口 -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <!-- RabbitMQ 核心依赖 -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>

        <!-- 测试依赖 -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <!-- Spring Boot 打包插件 -->
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>
</project>

3.2 src/main/resources/application.yml

RabbitMQ 连接配置与基础日志级别。

server:
  port: 8080

spring:
  rabbitmq:
    host: 127.0.0.1
    port: 5672
    username: guest
    password: guest

logging:
  level:
    root: info

3.3 src/main/java/com/example/rabbitdlq/RabbitDlqApplication.java

项目启动类。

package com.example.rabbitdlq;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.amqp.rabbit.annotation.EnableRabbit;

/**
 * 项目启动类
 */
@EnableRabbit
@SpringBootApplication
public class RabbitDlqApplication {

    public static void main(String[] args) {
        SpringApplication.run(RabbitDlqApplication.class, args);
    }
}

3.4 src/main/java/com/example/rabbitdlq/config/RabbitMQConfig.java

RabbitMQ 核心配置:正常交换机、正常队列、死信交换机、死信队列、绑定关系以及手动 ACK 容器工厂。

package com.example.rabbitdlq.config;

import java.util.HashMap;
import java.util.Map;

import org.springframework.amqp.core.AcknowledgeMode;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.boot.autoconfigure.amqp.SimpleRabbitListenerContainerFactoryConfigurer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * RabbitMQ 配置类
 *
 * 说明:
 * 1. 定义业务交换机
 * 2. 定义两个正常队列:
 *    - 一个用于“拒绝消息后进入死信”
 *    - 一个用于“TTL 到期后进入死信”
 * 3. 定义死信交换机和死信队列
 * 4. 定义手动 ACK 的监听容器工厂
 */
@Configuration
public class RabbitMQConfig {

    // 1. 业务交换机 / 路由键
    public static final String BIZ_EXCHANGE = "exchange.biz.video";
    public static final String ROUTING_KEY_REJECT = "routing.key.video.reject";
    public static final String ROUTING_KEY_TTL = "routing.key.video.ttl";

    // 2. 正常队列
    public static final String NORMAL_REJECT_QUEUE = "queue.normal.video.reject";
    public static final String NORMAL_TTL_QUEUE = "queue.normal.video.ttl";

    // 3. 死信交换机 / 队列 / 路由键
    public static final String DEAD_EXCHANGE = "exchange.dead.letter.video";
    public static final String DEAD_QUEUE = "queue.dead.letter.video";
    public static final String DEAD_ROUTING_KEY = "routing.key.dead.letter.video";

    /**
     * 业务交换机
     */
    @Bean
    public DirectExchange bizExchange() {
        return new DirectExchange(BIZ_EXCHANGE, true, false);
    }

    /**
     * 死信交换机
     */
    @Bean
    public DirectExchange deadExchange() {
        return new DirectExchange(DEAD_EXCHANGE, true, false);
    }

    /**
     * 正常队列 1:用于测试“消费者拒绝后进入死信”
     */
    @Bean
    public Queue normalRejectQueue() {
        Map<String, Object> args = new HashMap<>();
        args.put("x-dead-letter-exchange", DEAD_EXCHANGE);
        args.put("x-dead-letter-routing-key", DEAD_ROUTING_KEY);
        return new Queue(NORMAL_REJECT_QUEUE, true, false, false, args);
    }

    /**
     * 正常队列 2:用于测试“消息 TTL 到期后进入死信”
     *
     * x-message-ttl:消息在该队列中的最大存活时间,单位毫秒
     */
    @Bean
    public Queue normalTtlQueue() {
        Map<String, Object> args = new HashMap<>();
        args.put("x-dead-letter-exchange", DEAD_EXCHANGE);
        args.put("x-dead-letter-routing-key", DEAD_ROUTING_KEY);
        args.put("x-message-ttl", 10000); // 10 秒后过期
        return new Queue(NORMAL_TTL_QUEUE, true, false, false, args);
    }

    /**
     * 死信队列
     */
    @Bean
    public Queue deadQueue() {
        return new Queue(DEAD_QUEUE, true);
    }

    /**
     * 绑定:正常拒绝队列 -> 业务交换机
     */
    @Bean
    public Binding bindRejectQueue() {
        return BindingBuilder.bind(normalRejectQueue())
                .to(bizExchange())
                .with(ROUTING_KEY_REJECT);
    }

    /**
     * 绑定:正常 TTL 队列 -> 业务交换机
     */
    @Bean
    public Binding bindTtlQueue() {
        return BindingBuilder.bind(normalTtlQueue())
                .to(bizExchange())
                .with(ROUTING_KEY_TTL);
    }

    /**
     * 绑定:死信队列 -> 死信交换机
     */
    @Bean
    public Binding bindDeadQueue() {
        return BindingBuilder.bind(deadQueue())
                .to(deadExchange())
                .with(DEAD_ROUTING_KEY);
    }

    /**
     * 手动 ACK 的监听器工厂
     */
    @Bean("manualAckContainerFactory")
    public SimpleRabbitListenerContainerFactory manualAckContainerFactory(
            SimpleRabbitListenerContainerFactoryConfigurer configurer,
            ConnectionFactory connectionFactory) {

        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        configurer.configure(factory, connectionFactory);
        factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
        factory.setPrefetchCount(1);
        return factory;
    }
}

3.5 src/main/java/com/example/rabbitdlq/service/MessageProducerService.java

消息发送服务,分别投递到两个测试场景对应的正常队列。

package com.example.rabbitdlq.service;

import com.example.rabbitdlq.config.RabbitMQConfig;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Service;

/**
 * 消息生产者
 */
@Service
public class MessageProducerService {

    private final RabbitTemplate rabbitTemplate;

    public MessageProducerService(RabbitTemplate rabbitTemplate) {
        this.rabbitTemplate = rabbitTemplate;
    }

    /**
     * 发送消息到“拒绝测试队列”
     */
    public void sendRejectMessage(String message) {
        rabbitTemplate.convertAndSend(
                RabbitMQConfig.BIZ_EXCHANGE,
                RabbitMQConfig.ROUTING_KEY_REJECT,
                message
        );
    }

    /**
     * 发送消息到“TTL 测试队列”
     */
    public void sendTtlMessage(String message) {
        rabbitTemplate.convertAndSend(
                RabbitMQConfig.BIZ_EXCHANGE,
                RabbitMQConfig.ROUTING_KEY_TTL,
                message
        );
    }
}

3.6 src/main/java/com/example/rabbitdlq/controller/TestProducerController.java

测试接口:一个用于拒绝测试,一个用于 TTL 过期测试。

package com.example.rabbitdlq.controller;

import com.example.rabbitdlq.service.MessageProducerService;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

/**
 * 提供两个 HTTP 接口,便于直接测试
 */
@RestController
public class TestProducerController {

    private final MessageProducerService producerService;

    public TestProducerController(MessageProducerService producerService) {
        this.producerService = producerService;
    }

    /**
     * 测试 1:发送到“拒绝测试队列”
     * 示例:/mq/send/reject?msg=hello-reject
     */
    @GetMapping("/mq/send/reject")
    public String sendReject(@RequestParam(defaultValue = "hello-reject") String msg) {
        producerService.sendRejectMessage(msg);
        return "已发送到拒绝测试队列:" + msg;
    }

    /**
     * 测试 2:发送到“TTL 测试队列”
     * 示例:/mq/send/ttl?msg=hello-ttl
     */
    @GetMapping("/mq/send/ttl")
    public String sendTtl(@RequestParam(defaultValue = "hello-ttl") String msg) {
        producerService.sendTtlMessage(msg);
        return "已发送到 TTL 测试队列(10 秒后过期进入死信):" + msg;
    }
}

3.7 src/main/java/com/example/rabbitdlq/consumer/NormalRejectConsumer.java

正常消费者:收到消息后主动拒绝,不重新入队,从而触发死信。

package com.example.rabbitdlq.consumer;

import com.example.rabbitdlq.config.RabbitMQConfig;
import com.rabbitmq.client.Channel;
import java.io.IOException;
import java.nio.charset.StandardCharsets;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
 * 正常消费者(拒绝测试)
 */
@Component
public class NormalRejectConsumer {

    private static final Logger log = LoggerFactory.getLogger(NormalRejectConsumer.class);

    @RabbitListener(
            queues = RabbitMQConfig.NORMAL_REJECT_QUEUE,
            containerFactory = "manualAckContainerFactory"
    )
    public void processMessage(Message message, Channel channel) throws IOException {
        long deliveryTag = message.getMessageProperties().getDeliveryTag();
        String body = new String(message.getBody(), StandardCharsets.UTF_8);

        log.info("★ [normal-reject] 收到正常队列消息:{}", body);
        log.info("★ [normal-reject] 故意拒绝消息,不重新入队,进入死信队列");

        // false = 不重新入队,触发死信
        channel.basicReject(deliveryTag, false);
    }
}

3.8 src/main/java/com/example/rabbitdlq/consumer/DeadLetterConsumer.java

死信消费者:统一消费死信消息,并打印 x-death 头信息。

package com.example.rabbitdlq.consumer;

import com.example.rabbitdlq.config.RabbitMQConfig;
import com.rabbitmq.client.Channel;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Map;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
 * 死信消费者
 */
@Component
public class DeadLetterConsumer {

    private static final Logger log = LoggerFactory.getLogger(DeadLetterConsumer.class);

    @RabbitListener(
            queues = RabbitMQConfig.DEAD_QUEUE,
            containerFactory = "manualAckContainerFactory"
    )
    public void processDeadLetter(Message message, Channel channel) throws IOException {
        long deliveryTag = message.getMessageProperties().getDeliveryTag();
        String body = new String(message.getBody(), StandardCharsets.UTF_8);
        Map<String, Object> headers = message.getMessageProperties().getHeaders();

        log.info("==============================================");
        log.info("★ [dead-letter] 收到死信消息:{}", body);
        log.info("★ [dead-letter] headers:{}", headers);
        log.info("★ [dead-letter] x-death:{}", headers.get("x-death"));
        log.info("==============================================");

        // 死信队列中的消息正常确认
        channel.basicAck(deliveryTag, false);
    }
}

4. 启动 RabbitMQ

如果本机没有安装 RabbitMQ,可以先用 Docker 方式启动。

docker run -d   --name rabbitmq   -p 5672:5672   -p 15672:15672   rabbitmq:management

管理后台地址:http://localhost:15672,默认用户名和密码均为 guest。

5. 测试步骤

项目启动成功后,可通过如下方式依次测试两个场景。

建议先执行“拒绝测试”,再执行“TTL 测试”,更容易观察消息流转。

序号

测试场景

访问接口

预期结果

1

消费者拒绝消息

GET /mq/send/reject?msg=hello-reject-001

正常消费者收到消息后执行 basicReject(deliveryTag, false),随后死信消费者打印 x-death,其中 reason 通常为 rejected。

2

消息 TTL 到期

GET /mq/send/ttl?msg=hello-ttl-001

消息发送后因为正常队列未设置该场景下的消费者,约 10 秒后转入死信队列,x-death 中的 reason 通常为 expired。

curl "http://localhost:8080/mq/send/reject?msg=hello-reject-001"

curl "http://localhost:8080/mq/send/ttl?msg=hello-ttl-001"

6. 预期日志示例

下面给出两个典型日志片段,便于你在实验报告中说明测试结果。

6.1 拒绝消息进入死信的日志

★ [normal-reject] 收到正常队列消息:hello-reject-001
★ [normal-reject] 故意拒绝消息,不重新入队,进入死信队列

★ [dead-letter] 收到死信消息:hello-reject-001
★ [dead-letter] x-death:[... reason=rejected ...]

6.2 TTL 到期进入死信的日志

★ [dead-letter] 收到死信消息:hello-ttl-001
★ [dead-letter] x-death:[... reason=expired ...]

7. 关键说明

  • 死信队列不是独立类型的队列,而是一种“消息转移结果”:当消息满足死信条件时,会被重新路由到指定的死信交换机。
  • 本示例将死信交换机设置为 direct 类型,便于通过明确的路由键将死信转发到固定队列。
  • TTL 队列测试故意不配置对应的正常消费者,否则消息会立即被消费,无法等待到过期。
  • 在手动 ACK 模式下,监听器要自己调用 basicAck、basicReject 或 basicNack。
  • 查看死信原因时,可以重点观察消息头中的 x-death 字段。

8. 总结

通过本示例,可以完整验证 RabbitMQ 死信队列的常见用法:一是消费者主动拒绝消息并不重新入队,二是消息在正常队列中等待超过 TTL 后过期。二者最终都会流入死信交换机,再进入死信队列,由专门的消费者进行统一处理。

如果后续还需要扩展,可以在此基础上继续加入重试队列、延迟重投、失败原因分类、批量回放以及监控告警等机制。

更多推荐