原生 Java 测试代码
RabbitMQ 仲裁队列原生 Java 测试代码
代码注解版 · 非 Spring Boot 写法
|
适用场景 |
使用 RabbitMQ 原生 Java Client,演示如何在非 Spring Boot 环境下创建仲裁队列、发送消息、消费消息并进行手动确认。 |
|
文档版本 |
代码注解版,可直接拷贝到普通 Maven 项目中测试。 |
|
核心提醒 |
仲裁队列必须显式传入 x-queue-type=quorum;如果是单机环境,x-quorum-initial-group-size 建议写 1。 |
1. 使用前先明确两件事
这份文档使用的是 RabbitMQ 原生 Java Client,不依赖 Spring Boot,也不使用 RabbitTemplate 和 @RabbitListener。所有交换机声明、队列声明、消息发送与消息消费都直接通过 Channel 完成。
|
关键提醒
|
2. Maven 依赖(注解版)
如果你使用普通 Maven 项目,可以先引入 RabbitMQ Java Client 依赖。下面给出一个带注释的依赖片段,便于直接复制到 pom.xml 中。
<dependencies>
<!-- RabbitMQ 官方 Java 客户端,用于创建连接、通道、发送消息和消费消息 -->
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.20.0</version>
</dependency>
<!-- 仅用于打印日志,方便观察收发结果;如果你不想加日志框架,也可以直接用 System.out -->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-simple</artifactId>
<version>2.0.12</version>
</dependency>
</dependencies>
说明:如果你的本地环境已有其他日志实现,可以保留 amqp-client,按需替换 slf4j-simple。
3. 完整 Java 代码(注解版)
下面这份 QuorumQueueRawDemo.java 可以直接运行。代码里已经加入中文注释,目的是让你一眼看懂每一段在做什么。
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;
import com.rabbitmq.client.MessageProperties;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
public class QuorumQueueRawDemo {
// ===== 1. RabbitMQ 连接信息 =====
// 本地测试时,通常使用默认的 guest / guest 账号和 5672 端口。
private static final String HOST = "127.0.0.1";
private static final int PORT = 5672;
private static final String USERNAME = "guest";
private static final String PASSWORD = "guest";
// ===== 2. 交换机、队列、路由键 =====
// 这三个名字要保持一致,否则消息可能发出去了,但收不到。
private static final String EXCHANGE_NAME = "exchange.quorum.test";
private static final String QUEUE_NAME = "queue.quorum.test";
private static final String ROUTING_KEY = "routing.key.quorum.test";
public static void main(String[] args) throws Exception {
// 3. 创建连接工厂,相当于先准备好“连接 RabbitMQ 的配置模板”。
ConnectionFactory factory = new ConnectionFactory();
factory.setHost(HOST);
factory.setPort(PORT);
factory.setUsername(USERNAME);
factory.setPassword(PASSWORD);
// 4. 给消费者单独开一个连接和通道。
// 这么做更清晰,也更符合“生产和消费职责分离”的思路。
Connection consumerConnection = factory.newConnection("quorum-consumer-connection");
Channel consumerChannel = consumerConnection.createChannel();
// 5. 先声明交换机、仲裁队列和绑定关系。
// 单机测试时,仲裁初始副本数写 1 即可。
declareQuorumInfrastructure(consumerChannel, 1);
// 6. 启动消费者,让它开始监听队列。
startConsumer(consumerChannel);
// 7. 再给生产者单独开一个连接和通道。
Connection producerConnection = factory.newConnection("quorum-producer-connection");
Channel producerChannel = producerConnection.createChannel();
// 8. 再声明一次也没问题,只要参数保持完全一致即可。
// RabbitMQ 会认为这是同一套基础设施,不会重复创建冲突资源。
declareQuorumInfrastructure(producerChannel, 1);
// 9. 连续发送 5 条测试消息,观察控制台是否能正常消费。
for (int i = 1; i <= 5; i++) {
String msg = "message test quorum ~~~ #" + i;
producerChannel.basicPublish(
EXCHANGE_NAME,
ROUTING_KEY,
// 把消息声明为持久化文本消息,更符合仲裁队列“可靠消息”的测试场景。
MessageProperties.PERSISTENT_TEXT_PLAIN,
msg.getBytes(StandardCharsets.UTF_8)
);
System.out.println("生产者发送消息: " + msg);
Thread.sleep(1000);
}
// 10. 留一点时间给消费者处理消息,避免主线程过早退出。
Thread.sleep(10000);
// 11. 最后记得关闭通道和连接,避免资源泄露。
producerChannel.close();
producerConnection.close();
consumerChannel.close();
consumerConnection.close();
System.out.println("程序结束");
}
/**
* 声明交换机、仲裁队列和绑定关系。
* @param channel RabbitMQ 通道
* @param initialGroupSize 仲裁初始副本数;单机测试写 1,三节点集群可写 3
*/
private static void declareQuorumInfrastructure(Channel channel, int initialGroupSize) throws Exception {
// 1. 声明 Direct 交换机。
// durable=true 表示交换机持久化,RabbitMQ 重启后仍然存在。
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT, true);
// 2. 准备队列参数。
// 其中 x-queue-type=quorum 是最关键的一行,少了它就不是仲裁队列。
Map<String, Object> args = new HashMap<>();
args.put("x-queue-type", "quorum");
// 3. 设置仲裁初始副本数。
// 单机环境写 1;如果你是 3 节点集群,并且想测试高可用,可以改成 3。
args.put("x-quorum-initial-group-size", initialGroupSize);
// 4. 声明队列。
// 第 2 个参数 true 表示队列持久化;后面两个 false 表示非独占、非自动删除。
channel.queueDeclare(QUEUE_NAME, true, false, false, args);
// 5. 把交换机和队列绑定起来。
// 这样发到 exchange.quorum.test 且路由键匹配的消息,才会进入该队列。
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ROUTING_KEY);
System.out.println("交换机、仲裁队列、绑定关系声明成功");
}
/**
* 启动消费者,并关闭自动确认,改为手动确认。
*/
private static void startConsumer(Channel channel) throws Exception {
// 每次先取 1 条消息,便于观察“收到一条,处理一条,确认一条”的过程。
channel.basicQos(1);
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String data = new String(delivery.getBody(), StandardCharsets.UTF_8);
long deliveryTag = delivery.getEnvelope().getDeliveryTag();
try {
System.out.println("消费者收到消息: " + data);
// 模拟业务处理。
Thread.sleep(500);
// 处理成功后,手动 ACK。
// false 表示只确认当前这一条消息,不批量确认。
channel.basicAck(deliveryTag, false);
System.out.println("消息手动确认成功,deliveryTag = " + deliveryTag);
} catch (Exception e) {
System.out.println("消费失败: " + e.getMessage());
// 处理失败时,执行 NACK。
// 最后一个参数 true 表示把消息重新放回队列,后续还可以继续重试。
channel.basicNack(deliveryTag, false, true);
}
};
CancelCallback cancelCallback = consumerTag ->
System.out.println("消费者被取消: " + consumerTag);
// autoAck=false:关闭自动确认。
// 如果这里写 true,那么消息一投递给消费者就会被系统自动确认,
// 这就无法体现“业务处理完成后再确认”的意义了。
channel.basicConsume(QUEUE_NAME, false, deliverCallback, cancelCallback);
System.out.println("消费者启动成功,正在监听队列: " + QUEUE_NAME);
}
}
提示:如果你只是做单机实验,这份代码已经够用;如果要验证高可用,再把 initialGroupSize 改为 3,并准备三节点 RabbitMQ 集群。
4. 代码执行逻辑
- 先创建 ConnectionFactory,统一配置 RabbitMQ 的地址、端口和登录信息。
- 消费者连接先启动,并声明交换机、仲裁队列和绑定关系。
- 消费者通过 basicConsume() 监听队列,且把 autoAck 设置为 false,改用手动确认。
- 生产者随后通过 basicPublish() 向交换机发送消息,交换机再按路由键把消息转发到仲裁队列。
- 消费者收到消息后先处理业务,再调用 basicAck() 确认;如果处理失败,则调用 basicNack() 让消息重新入队。
5. 运行步骤
- 先启动本地 RabbitMQ 服务,并确认 5672 端口可用。
- 把文档中的 Java 文件复制到普通 Maven 项目中,同时加入 amqp-client 依赖。
- 执行 main() 方法,观察控制台日志。
- 打开 RabbitMQ 管理界面,确认 queue.quorum.test 的类型为 quorum。
- 当控制台出现“生产者发送消息”“消费者收到消息”“消息手动确认成功”等输出时,说明实验闭环已经跑通。
6. 常见错误与定位方法
- 如果 queueDeclare(..., null) 也能创建队列,但类型显示不是 quorum,说明你少传了 x-queue-type=quorum。
- 如果单机环境把 x-quorum-initial-group-size 写成 3,队列声明可能会失败,因为本地没有足够副本节点。
- 如果生产者显示发送成功,但消费者没有收到,优先检查交换机名、队列名、路由键三者是否完全一致。
- 如果消息已经打印出来,但又重复投递,通常要检查是否没有执行 basicAck(),或者业务代码抛异常后走了 basicNack()。
7. 一句话结论
这份代码的核心不是“能把消息发出去”,而是用 RabbitMQ 原生 Java Client 把仲裁队列、手动确认和可靠消费这一整条链路跑通,并且把关键点都写成了看得懂的中文注释。
RabbitMQ 优先级队列原生 Java 测试代码
注解版(不使用 Spring Boot)
|
文档用途: 用 RabbitMQ 原生 Java Client 演示“优先级队列”的创建、发送、消费与手动确认。本文示例采用经典队列实现多级优先级,方便直接观察不同优先级消息的投递顺序。 |
一、优先级队列的核心思路
• 声明队列时必须设置队列参数 x-max-priority,否则它就是普通队列,不会按优先级投递。
• 生产者发消息时,要在消息属性里写入 priority 值;数字越大,优先级越高。
• 为了更容易观察优先级效果,通常让消息先进入队列,再启动消费者,或者给消费者设置较小的 prefetch 值。
• 本示例使用手动确认 basicAck(),既方便实验,也更接近真实项目里的可靠消费场景。
二、Maven 依赖
pom.xml 依赖片段:
|
<dependency> |
三、完整代码(带中文注解)
PriorityQueueRawDemo.java:
|
import com.rabbitmq.client.AMQP; |
四、代码逻辑说明
1. 为什么要设置 x-max-priority?
因为优先级队列不是 RabbitMQ 的默认行为。只有在声明队列时加上该参数,RabbitMQ 才会把这个队列按“优先级队列”处理。
2. 为什么发送消息时还要设置 priority?
队列声明只是告诉 RabbitMQ“这个队列支持优先级”;真正哪条消息优先级高,要靠生产者在 BasicProperties 中逐条写入。
3. 为什么示例里先发消息,再启动消费者?
这样消息会先堆在队列里,消费者启动后更容易看到高优先级消息先被取走。如果消费者一直在线,很多消息会直接投递给消费者,优先级效果不一定明显。
4. 为什么还要加 basicQos(1)?
因为 prefetch 太大时,低优先级消息可能已经提前发给消费者了。设置为 1,能让队列每次只放出一条未确认消息,更方便观察优先级顺序。
5. 为什么还保留 basicAck()?
因为手动确认更稳妥。只有业务处理成功,才告诉 RabbitMQ 这条消息可以删掉;如果处理失败,可以 basicNack() 让消息重新入队。
五、运行步骤
1. 先启动本地 RabbitMQ 服务和管理界面。
2. 在普通 Maven 项目中加入 amqp-client 依赖。
3. 把上述 PriorityQueueRawDemo.java 放到 src/main/java 中。
4. 运行 main() 方法。
5. 打开 RabbitMQ 管理界面,检查 queue.priority.test 是否已创建。
6. 观察控制台输出,通常会优先看到 priority 更高的消息先被消费。
六、预期现象
控制台输出示例:
|
生产者发送消息: 普通消息-优先级1,priority = 1 |
七、常见问题
问题 1:明明设置了 priority,为什么消费顺序没变化?
通常是因为消费者太早启动,消息一进队列就被投递了;或者 prefetch 太大,低优先级消息已经提前被消费者拿走。
问题 2:为什么我设置了 x-max-priority 还是报错?
常见原因是之前已经存在同名普通队列,参数不一致会触发 PRECONDITION_FAILED。删除旧队列或更换新队列名即可。
问题 3:priority 能不能无限大?
不建议。实验和业务里一般用少量优先级档位就够了,比如 3 档、4 档或 10 档。
问题 4:优先级队列是不是一定比普通队列好?
不一定。优先级会增加复杂度和资源开销,只有确实需要“紧急消息先处理”时才值得使用。
八、总结
优先级队列的实验重点其实就两句:一是队列声明时加 x-max-priority,二是消息发送时写 priority。
只要把这两个点配对起来,再配合较小的 prefetch 和手动确认,就能比较清楚地验证优先级消息先消费的效果。
RabbitMQ 延迟消息原生 Java 测试代码(注解版)
适用于课程实验、运行演示与答辩说明;本示例不依赖 Spring Boot。
|
实验主题 |
实现方式 |
核心验证点 |
一、实验目的
- 掌握基于 RabbitMQ 延迟交换机插件实现延迟消息的基本方法。
- 理解 x-delayed-message 交换机、x-delayed-type 参数和 x-delay 消息头之间的关系。
- 使用 RabbitMQ 原生 Java Client 完成交换机声明、消息发送、消息消费与手动确认。
- 通过 10 秒延迟投递实验,验证消息不是立刻消费,而是在设定时间后进入消费者。
二、前置条件
- 本机已安装并启动 RabbitMQ 服务。
- RabbitMQ 已启用 delayed message exchange 插件,否则 x-delayed-message 类型交换机无法声明成功。
- Java 环境可正常运行,能够连接到 127.0.0.1:5672。
|
说明
|
三、完整测试代码
|
import com.rabbitmq.client.AMQP; |
四、代码逻辑说明
1. 为什么要声明 x-delayed-message 交换机?
因为这里实现的是“基于插件的延迟消息”,真正控制延迟行为的是交换机,不是普通队列。只要交换机类型声明为 x-delayed-message,并且消息头里带有 x-delay,就可以实现指定毫秒数的延迟投递。
2. x-delayed-type = direct 是什么意思?
它表示这个延迟交换机在路由规则上按 direct 交换机工作,也就是消息会根据精确匹配的 routing key 被投递到绑定队列。这里使用 routing.key.delay.video,把消息路由到 queue.delay.video。
3. 为什么队列还是普通队列?
因为延迟能力不是队列自带的。队列在这里主要负责存储等待消费的消息;真正决定“何时把消息交给队列”的,是延迟交换机。换句话说,消息先在交换机侧等待,时间到了才会投递到普通队列中。
4. 为什么要手动 ACK?
手动确认能够让消费过程更可控。消费者只有在真正处理成功后才调用 basicAck;如果处理中出现异常,就可以 basicNack 并选择重新入队,避免消息在业务尚未完成时被误判为已消费。
五、运行步骤
1. 启动 RabbitMQ,并确认 delayed message 插件已启用。
2. 编译并运行 DelayMessageRawJavaDemo。
3. 观察控制台:程序启动后先打印生产者发送时间。
4. 等待约 10 秒,再观察消费者接收时间。
5. 如果消费者收到消息的时间晚于发送时间约 10 秒,说明延迟消息测试成功。
六、预期输出示例
|
延迟交换机、队列、绑定关系声明成功 |
七、常见问题与排查
- 报错“unknown exchange type x-delayed-message”:通常说明插件没有安装或没有启用。
- 消息立即被消费,没有延迟:检查消息头中是否正确设置了 x-delay,单位是否是毫秒。
- 消息发出后消费者收不到:检查交换机名、队列名、路由键是否完全一致。
- 手动 ACK 报错:检查 basicConsume 的 autoAck 是否设置为 false。
八、实验结论
本实验使用 RabbitMQ 原生 Java Client,完成了延迟交换机声明、消息发送、延迟投递、消费者接收与手动确认的完整闭环。实验表明:只要交换机声明为 x-delayed-message,并通过 x-delay 指定延迟时间,就可以实现基于插件的延迟消息测试。
RabbitMQ 惰性队列原生 Java 测试代码
适用于 RabbitMQ Java Client;含版本兼容说明、完整代码、逐段注释、运行步骤与常见问题
本说明给出一个“不使用 Spring Boot、只使用 RabbitMQ 原生 Java Client”的惰性队列测试示例。代码包含交换机声明、惰性队列声明、绑定、生产、消费、手动确认等完整流程,并附上中文注解,便于直接复制到实验报告或课程作业中。
|
版本兼容提示:从 RabbitMQ 3.12 开始,经典队列上的 x-queue-mode=lazy 参数会被忽略;经典队列默认就更接近过去“惰性队列”的行为。也就是说,这份代码在新版本中仍可运行,但“惰性模式”这个开关本身未必还生效。若实验老师要求演示旧版惰性队列写法,这份代码是标准历史写法;若使用新版本 RabbitMQ,请把它理解为“经典队列 + 历史惰性参数示例”。 |
一、实验目标
- 理解惰性队列的用途:尽量把消息落盘,减少内存占用,适合消息堆积较多的场景。
- 掌握 RabbitMQ 原生 Java Client 的基本用法,而不是依赖 Spring Boot 或 Spring AMQP。
- 掌握交换机、经典队列、路由键、消息发送、消息消费和手动确认的完整流程。
- 理解当前 RabbitMQ 新版本与旧版惰性队列行为之间的差异。
二、核心思路
这份示例的逻辑很简单:先连接 RabbitMQ,再声明 Direct 交换机,然后声明一个经典队列,并在参数中加入 x-queue-mode=lazy;接着用路由键把交换机与队列绑定;随后启动消费者并关闭自动确认,改为手动 basicAck();最后由生产者连续发送几条测试消息,观察消费端是否成功收到。
如果 RabbitMQ 版本较老(例如 3.11 及更早版本),这个参数会让经典队列按照“惰性队列”方式运行;如果 RabbitMQ 版本较新(3.12 及之后),参数会被忽略,但整个经典队列的收发测试流程仍然成立。
三、Maven 依赖
|
<dependencies> |
说明:这里只用了 RabbitMQ 原生客户端依赖,没有使用 Spring Boot,也没有使用 RabbitTemplate 或 @RabbitListener。
四、完整代码(含中文注解)
|
import com.rabbitmq.client.BuiltinExchangeType; |
五、代码逻辑说明
- declareLazyInfrastructure(channel):负责声明交换机、声明经典队列、写入 x-queue-mode=lazy 参数,并完成绑定。
- startConsumer(channel):负责启动消费者,关闭自动确认,并在消费成功后手动 basicAck()。
- main():负责串联整个流程,包括建立连接、启动消费者、发送测试消息、等待消费、关闭资源。
- MessageProperties.PERSISTENT_TEXT_PLAIN:让消息以持久化方式发送,更符合实验场景。
- basicQos(1):让消费者一次只取一条,便于你观察消息处理顺序和确认动作。
六、运行步骤
- 先确保本地 RabbitMQ 服务已经启动,默认端口为 5672。
- 创建一个普通 Java 项目,导入 amqp-client 依赖。
- 把上面的代码保存为 LazyQueueRawDemo.java。
- 运行 main() 方法。
- 观察控制台是否打印:交换机、惰性队列、绑定关系声明成功;消费者启动成功;生产者发送消息;消费者收到消息;消息手动确认成功。
- 打开 RabbitMQ 管理界面,查看 queue.lazy.test 是否已被创建。
七、你在管理界面里应该看到什么
如果你的 RabbitMQ 版本较老,那么 queue.lazy.test 会是一个经典队列,并带有 lazy 模式属性。
如果你的 RabbitMQ 版本是 3.12 或更新版本,x-queue-mode 参数虽然可能被忽略,但 queue.lazy.test 仍会作为经典队列正常存在,消息也仍然可以正常发送与消费。
八、常见问题
问题 1:队列参数冲突。如果之前已经存在同名队列,并且它不是用同样参数声明的,那么再次运行时可能报 PRECONDITION_FAILED。解决方法是删除旧队列,或换一个新的队列名。
问题 2:管理界面里看不到“lazy”字样。很可能是 RabbitMQ 版本较新。从 3.12 起,经典队列默认行为已经接近过去的惰性队列,x-queue-mode=lazy 参数会被忽略。
问题 3:消费者收不到消息。一般是交换机名、队列名、路由键三者不一致,或者 RabbitMQ 服务没有启动。
问题 4:basicAck() 报错。通常是因为 autoAck 没有关闭。这个示例里 basicConsume(QUEUE_NAME, false, ...) 已经把自动确认关闭了。
九、可直接放进实验报告的结论
本实验基于 RabbitMQ 原生 Java Client 实现了惰性队列测试流程,包括交换机声明、队列声明、消息发送、消息消费与手动确认。通过在队列参数中设置 x-queue-mode=lazy,可以在旧版本 RabbitMQ 中创建惰性队列;在新版本 RabbitMQ 中,该参数可能被忽略,但经典队列依然具有接近过去惰性队列的行为特征。实验结果表明,使用 RabbitMQ 原生 Java 客户端也可以完成 RabbitMQ 队列的完整创建与收发测试,不依赖 Spring Boot 同样能够验证消息队列的核心机制。
RabbitMQ 死信队列
完整代码与测试文档
基于 Spring Boot 3 / Java 17 / RabbitMQ
|
文档目的 |
提供一套可直接运行的 RabbitMQ 死信队列示例代码,并给出配置、发送、消费与测试步骤。 |
|
覆盖场景 |
1)消费者拒绝消息进入死信;2)消息 TTL 到期进入死信。 |
|
测试方式 |
通过两个 HTTP 接口分别发送消息,再从控制台日志和 x-death 头信息确认死信原因。 |
|
适用内容 |
课程实验、实训报告、项目演示、面试准备、内部技术分享。 |
说明:本示例故意不使用“消费者不 ACK 模拟超时进入死信”作为测试方式。官方更标准、也更稳定的死信演示方式是:显式拒绝且不重新入队,或使用消息/队列 TTL 过期。
1. 项目概述
本项目提供一套完整的 Spring Boot + RabbitMQ 示例,用于演示死信队列(DLQ,Dead Letter Queue)的基本配置、消息发送、正常消费失败转死信,以及死信队列消费处理。
为了让测试结果更直观,示例中保留了两个独立场景:一个队列专门用于“消费者拒绝消息后进入死信”,另一个队列专门用于“消息在队列中等待超过 TTL 后进入死信”。
- 运行环境:Java 17、Spring Boot 3.2.x、RabbitMQ 3.x。
- 管理方式:使用 Spring AMQP 自动声明交换机、队列和绑定关系。
- 验证方式:查看控制台日志中的消息流转过程与 x-death 头信息。
2. 项目结构
建议使用如下目录结构组织代码:
rabbitmq-dlq-demo
├─ pom.xml
├─ src/main/resources
│ └─ application.yml
└─ src/main/java/com/example/rabbitdlq
├─ RabbitDlqApplication.java
├─ config/RabbitMQConfig.java
├─ controller/TestProducerController.java
├─ service/MessageProducerService.java
└─ consumer
├─ NormalRejectConsumer.java
└─ DeadLetterConsumer.java
3. 完整代码
下面给出每个文件的完整代码。将这些文件按目录放入项目后,即可直接启动并测试。
3.1 pom.xml
Maven 依赖配置,包含 Web、RabbitMQ 与测试依赖。
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>3.2.5</version>
<relativePath/>
</parent>
<groupId>com.example</groupId>
<artifactId>rabbitmq-dlq-demo</artifactId>
<version>1.0.0</version>
<name>rabbitmq-dlq-demo</name>
<description>RabbitMQ Dead Letter Queue Demo</description>
<properties>
<java.version>17</java.version>
</properties>
<dependencies>
<!-- Web,用于提供测试接口 -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!-- RabbitMQ 核心依赖 -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<!-- 测试依赖 -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
<!-- Spring Boot 打包插件 -->
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
3.2 src/main/resources/application.yml
RabbitMQ 连接配置与基础日志级别。
server:
port: 8080
spring:
rabbitmq:
host: 127.0.0.1
port: 5672
username: guest
password: guest
logging:
level:
root: info
3.3 src/main/java/com/example/rabbitdlq/RabbitDlqApplication.java
项目启动类。
package com.example.rabbitdlq;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.amqp.rabbit.annotation.EnableRabbit;
/**
* 项目启动类
*/
@EnableRabbit
@SpringBootApplication
public class RabbitDlqApplication {
public static void main(String[] args) {
SpringApplication.run(RabbitDlqApplication.class, args);
}
}
3.4 src/main/java/com/example/rabbitdlq/config/RabbitMQConfig.java
RabbitMQ 核心配置:正常交换机、正常队列、死信交换机、死信队列、绑定关系以及手动 ACK 容器工厂。
package com.example.rabbitdlq.config;
import java.util.HashMap;
import java.util.Map;
import org.springframework.amqp.core.AcknowledgeMode;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.boot.autoconfigure.amqp.SimpleRabbitListenerContainerFactoryConfigurer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* RabbitMQ 配置类
*
* 说明:
* 1. 定义业务交换机
* 2. 定义两个正常队列:
* - 一个用于“拒绝消息后进入死信”
* - 一个用于“TTL 到期后进入死信”
* 3. 定义死信交换机和死信队列
* 4. 定义手动 ACK 的监听容器工厂
*/
@Configuration
public class RabbitMQConfig {
// 1. 业务交换机 / 路由键
public static final String BIZ_EXCHANGE = "exchange.biz.video";
public static final String ROUTING_KEY_REJECT = "routing.key.video.reject";
public static final String ROUTING_KEY_TTL = "routing.key.video.ttl";
// 2. 正常队列
public static final String NORMAL_REJECT_QUEUE = "queue.normal.video.reject";
public static final String NORMAL_TTL_QUEUE = "queue.normal.video.ttl";
// 3. 死信交换机 / 队列 / 路由键
public static final String DEAD_EXCHANGE = "exchange.dead.letter.video";
public static final String DEAD_QUEUE = "queue.dead.letter.video";
public static final String DEAD_ROUTING_KEY = "routing.key.dead.letter.video";
/**
* 业务交换机
*/
@Bean
public DirectExchange bizExchange() {
return new DirectExchange(BIZ_EXCHANGE, true, false);
}
/**
* 死信交换机
*/
@Bean
public DirectExchange deadExchange() {
return new DirectExchange(DEAD_EXCHANGE, true, false);
}
/**
* 正常队列 1:用于测试“消费者拒绝后进入死信”
*/
@Bean
public Queue normalRejectQueue() {
Map<String, Object> args = new HashMap<>();
args.put("x-dead-letter-exchange", DEAD_EXCHANGE);
args.put("x-dead-letter-routing-key", DEAD_ROUTING_KEY);
return new Queue(NORMAL_REJECT_QUEUE, true, false, false, args);
}
/**
* 正常队列 2:用于测试“消息 TTL 到期后进入死信”
*
* x-message-ttl:消息在该队列中的最大存活时间,单位毫秒
*/
@Bean
public Queue normalTtlQueue() {
Map<String, Object> args = new HashMap<>();
args.put("x-dead-letter-exchange", DEAD_EXCHANGE);
args.put("x-dead-letter-routing-key", DEAD_ROUTING_KEY);
args.put("x-message-ttl", 10000); // 10 秒后过期
return new Queue(NORMAL_TTL_QUEUE, true, false, false, args);
}
/**
* 死信队列
*/
@Bean
public Queue deadQueue() {
return new Queue(DEAD_QUEUE, true);
}
/**
* 绑定:正常拒绝队列 -> 业务交换机
*/
@Bean
public Binding bindRejectQueue() {
return BindingBuilder.bind(normalRejectQueue())
.to(bizExchange())
.with(ROUTING_KEY_REJECT);
}
/**
* 绑定:正常 TTL 队列 -> 业务交换机
*/
@Bean
public Binding bindTtlQueue() {
return BindingBuilder.bind(normalTtlQueue())
.to(bizExchange())
.with(ROUTING_KEY_TTL);
}
/**
* 绑定:死信队列 -> 死信交换机
*/
@Bean
public Binding bindDeadQueue() {
return BindingBuilder.bind(deadQueue())
.to(deadExchange())
.with(DEAD_ROUTING_KEY);
}
/**
* 手动 ACK 的监听器工厂
*/
@Bean("manualAckContainerFactory")
public SimpleRabbitListenerContainerFactory manualAckContainerFactory(
SimpleRabbitListenerContainerFactoryConfigurer configurer,
ConnectionFactory connectionFactory) {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
configurer.configure(factory, connectionFactory);
factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
factory.setPrefetchCount(1);
return factory;
}
}
3.5 src/main/java/com/example/rabbitdlq/service/MessageProducerService.java
消息发送服务,分别投递到两个测试场景对应的正常队列。
package com.example.rabbitdlq.service;
import com.example.rabbitdlq.config.RabbitMQConfig;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Service;
/**
* 消息生产者
*/
@Service
public class MessageProducerService {
private final RabbitTemplate rabbitTemplate;
public MessageProducerService(RabbitTemplate rabbitTemplate) {
this.rabbitTemplate = rabbitTemplate;
}
/**
* 发送消息到“拒绝测试队列”
*/
public void sendRejectMessage(String message) {
rabbitTemplate.convertAndSend(
RabbitMQConfig.BIZ_EXCHANGE,
RabbitMQConfig.ROUTING_KEY_REJECT,
message
);
}
/**
* 发送消息到“TTL 测试队列”
*/
public void sendTtlMessage(String message) {
rabbitTemplate.convertAndSend(
RabbitMQConfig.BIZ_EXCHANGE,
RabbitMQConfig.ROUTING_KEY_TTL,
message
);
}
}
3.6 src/main/java/com/example/rabbitdlq/controller/TestProducerController.java
测试接口:一个用于拒绝测试,一个用于 TTL 过期测试。
package com.example.rabbitdlq.controller;
import com.example.rabbitdlq.service.MessageProducerService;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
/**
* 提供两个 HTTP 接口,便于直接测试
*/
@RestController
public class TestProducerController {
private final MessageProducerService producerService;
public TestProducerController(MessageProducerService producerService) {
this.producerService = producerService;
}
/**
* 测试 1:发送到“拒绝测试队列”
* 示例:/mq/send/reject?msg=hello-reject
*/
@GetMapping("/mq/send/reject")
public String sendReject(@RequestParam(defaultValue = "hello-reject") String msg) {
producerService.sendRejectMessage(msg);
return "已发送到拒绝测试队列:" + msg;
}
/**
* 测试 2:发送到“TTL 测试队列”
* 示例:/mq/send/ttl?msg=hello-ttl
*/
@GetMapping("/mq/send/ttl")
public String sendTtl(@RequestParam(defaultValue = "hello-ttl") String msg) {
producerService.sendTtlMessage(msg);
return "已发送到 TTL 测试队列(10 秒后过期进入死信):" + msg;
}
}
3.7 src/main/java/com/example/rabbitdlq/consumer/NormalRejectConsumer.java
正常消费者:收到消息后主动拒绝,不重新入队,从而触发死信。
package com.example.rabbitdlq.consumer;
import com.example.rabbitdlq.config.RabbitMQConfig;
import com.rabbitmq.client.Channel;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
/**
* 正常消费者(拒绝测试)
*/
@Component
public class NormalRejectConsumer {
private static final Logger log = LoggerFactory.getLogger(NormalRejectConsumer.class);
@RabbitListener(
queues = RabbitMQConfig.NORMAL_REJECT_QUEUE,
containerFactory = "manualAckContainerFactory"
)
public void processMessage(Message message, Channel channel) throws IOException {
long deliveryTag = message.getMessageProperties().getDeliveryTag();
String body = new String(message.getBody(), StandardCharsets.UTF_8);
log.info("★ [normal-reject] 收到正常队列消息:{}", body);
log.info("★ [normal-reject] 故意拒绝消息,不重新入队,进入死信队列");
// false = 不重新入队,触发死信
channel.basicReject(deliveryTag, false);
}
}
3.8 src/main/java/com/example/rabbitdlq/consumer/DeadLetterConsumer.java
死信消费者:统一消费死信消息,并打印 x-death 头信息。
package com.example.rabbitdlq.consumer;
import com.example.rabbitdlq.config.RabbitMQConfig;
import com.rabbitmq.client.Channel;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Map;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
/**
* 死信消费者
*/
@Component
public class DeadLetterConsumer {
private static final Logger log = LoggerFactory.getLogger(DeadLetterConsumer.class);
@RabbitListener(
queues = RabbitMQConfig.DEAD_QUEUE,
containerFactory = "manualAckContainerFactory"
)
public void processDeadLetter(Message message, Channel channel) throws IOException {
long deliveryTag = message.getMessageProperties().getDeliveryTag();
String body = new String(message.getBody(), StandardCharsets.UTF_8);
Map<String, Object> headers = message.getMessageProperties().getHeaders();
log.info("==============================================");
log.info("★ [dead-letter] 收到死信消息:{}", body);
log.info("★ [dead-letter] headers:{}", headers);
log.info("★ [dead-letter] x-death:{}", headers.get("x-death"));
log.info("==============================================");
// 死信队列中的消息正常确认
channel.basicAck(deliveryTag, false);
}
}
4. 启动 RabbitMQ
如果本机没有安装 RabbitMQ,可以先用 Docker 方式启动。
docker run -d --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:management
管理后台地址:http://localhost:15672,默认用户名和密码均为 guest。
5. 测试步骤
项目启动成功后,可通过如下方式依次测试两个场景。
建议先执行“拒绝测试”,再执行“TTL 测试”,更容易观察消息流转。
|
序号 |
测试场景 |
访问接口 |
预期结果 |
|
1 |
消费者拒绝消息 |
GET /mq/send/reject?msg=hello-reject-001 |
正常消费者收到消息后执行 basicReject(deliveryTag, false),随后死信消费者打印 x-death,其中 reason 通常为 rejected。 |
|
2 |
消息 TTL 到期 |
GET /mq/send/ttl?msg=hello-ttl-001 |
消息发送后因为正常队列未设置该场景下的消费者,约 10 秒后转入死信队列,x-death 中的 reason 通常为 expired。 |
curl "http://localhost:8080/mq/send/reject?msg=hello-reject-001"
curl "http://localhost:8080/mq/send/ttl?msg=hello-ttl-001"
6. 预期日志示例
下面给出两个典型日志片段,便于你在实验报告中说明测试结果。
6.1 拒绝消息进入死信的日志
★ [normal-reject] 收到正常队列消息:hello-reject-001
★ [normal-reject] 故意拒绝消息,不重新入队,进入死信队列
★ [dead-letter] 收到死信消息:hello-reject-001
★ [dead-letter] x-death:[... reason=rejected ...]
6.2 TTL 到期进入死信的日志
★ [dead-letter] 收到死信消息:hello-ttl-001
★ [dead-letter] x-death:[... reason=expired ...]
7. 关键说明
- 死信队列不是独立类型的队列,而是一种“消息转移结果”:当消息满足死信条件时,会被重新路由到指定的死信交换机。
- 本示例将死信交换机设置为 direct 类型,便于通过明确的路由键将死信转发到固定队列。
- TTL 队列测试故意不配置对应的正常消费者,否则消息会立即被消费,无法等待到过期。
- 在手动 ACK 模式下,监听器要自己调用 basicAck、basicReject 或 basicNack。
- 查看死信原因时,可以重点观察消息头中的 x-death 字段。
8. 总结
通过本示例,可以完整验证 RabbitMQ 死信队列的常见用法:一是消费者主动拒绝消息并不重新入队,二是消息在正常队列中等待超过 TTL 后过期。二者最终都会流入死信交换机,再进入死信队列,由专门的消费者进行统一处理。
如果后续还需要扩展,可以在此基础上继续加入重试队列、延迟重投、失败原因分类、批量回放以及监控告警等机制。
更多推荐


所有评论(0)