Java中RabbitMQ相关-Spring AMQP的使用
文章目录
在Java中使用Rabbitmq
使用SpringBoot组件Spring AMQP
添加依赖
<dependency>
<!-- Spring AMQP-->
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
配置消息转换器
Spring AMQP 默认使用 JDK 序列化方式,推荐替换为 JSON 序列化。在生产者和消费者启动类中配置:
@Bean
public MessageConverter jacksonToJsonMessageConverter() {
Jackson2JsonMessageConverter messageConverter = new Jackson2JsonMessageConverter();
messageConverter.setCreateMessageIds(true); // 自动生成消息ID,保证消息唯一性
return messageConverter;
}
配置后,RabbitTemplate 发送对象时会自动序列化为 JSON,消费者接收时自动反序列化。
配置文件
生产者配置文件
spring:
rabbitmq:
host: 127.0.0.1
port: 5672
username: rabbitmqDemo
password: rabbitmqDemo
virtual-host: rabbitmqDemo
connection-timeout: 1s # 设置MQ的链接超时时间
template:
retry:
enabled: true # 开启超时重试机制
initial-interval: 1000ms # 失败后初始等待时间
multiplier: 1 # 失败后下次等待时间倍长,下次等待时间 = initial-interval * multiplier
max-attempts: 3 # 最大重试次数
publisher-confirm-type: correlated # 开启publish confirm机制,并设置confirm类型 none: 关闭confirm机制; simple: 同步阻塞等待MQ的回执消息; correlated: MQ异步回调方式返回回执消息
publisher-returns: true # 开启publish return机制
消费者配置文件
spring:
rabbitmq:
host: localhost
port: 5672
virtual-host: rabbitmqDemo
username: rabbitmqDemo
password: rabbitmqDemo
listener:
simple:
prefetch: 1 # 预取消息数量,即最多推送多少条未确认消息给消费者,确认后才能获取下一条
acknowledge-mode: auto # 消息确认机制 none: 关闭 auto: 自动 manual: 手动
retry:
enabled: true # 开启消费者失败重试
initial-interval: 1000ms # 初始失败等待时长
multiplier: 1 # 失败等待时长倍数,下次等待时长 = 初始时长 * 倍数
max-attempts: 3 # 最大失败重试次数
stateless: true # true: 无状态; false: 有状态 如果业务中包含事务,改为false
定义交换机队列
两种方式,一种是通过配置Bean定义交换机队列,一种是通过注解定义交换机队列
配置类
@Configuration
public class MqConfig {
// --- 队列 ---
@Bean
public Queue test1Queue() {
return QueueBuilder.durable("test1.queue").build();
}
@Bean
public Queue test2Queue() {
return new Queue("test2.queue");
}
// --- 交换机 ---
@Bean
public DirectExchange test1DirectExchange() {
return new DirectExchange("test1.direct.exchange", true, false);
}
@Bean
public TopicExchange test1TopicExchange() {
return new TopicExchange("test1.topic.exchange", true, false);
}
@Bean
public FanoutExchange test1FanoutExchange() {
return new FanoutExchange("test1.fanout.exchange", true, false);
}
// --- Binding ---
// Direct Exchange:精确匹配 routing key
@Bean
public Binding test1DirectBinding(Queue test1Queue, DirectExchange test1DirectExchange) {
return BindingBuilder.bind(test1Queue).to(test1DirectExchange).with("test1.key");
}
// Topic Exchange:通配符匹配(# 匹配0或多个单词,* 匹配1个单词)
@Bean
public Binding test1TopicBinding(Queue test1Queue, TopicExchange test1TopicExchange) {
return BindingBuilder.bind(test1Queue).to(test1TopicExchange).with("test1.#");
}
// Fanout Exchange:广播,不需要 routing key
@Bean
public Binding test1FanoutBinding(Queue test1Queue, FanoutExchange test1FanoutExchange) {
return BindingBuilder.bind(test1Queue).to(test1FanoutExchange);
}
}
Fanout 广播模式示例(一对多绑定):
@Configuration
public class FanoutConfig {
@Bean
public FanoutExchange fanoutExchange() {
return new FanoutExchange("demo.fanout2");
}
@Bean
public Queue fanoutQueue3() {
return new Queue("fanout.queue3");
}
@Bean
public Queue fanoutQueue4() {
return new Queue("fanout.queue4");
}
@Bean
public Binding fanoutBinding(Queue fanoutQueue3, FanoutExchange fanoutExchange) {
return BindingBuilder.bind(fanoutQueue3).to(fanoutExchange);
}
@Bean
public Binding fanoutBinding2(Queue fanoutQueue4, FanoutExchange fanoutExchange) {
return BindingBuilder.bind(fanoutQueue4).to(fanoutExchange);
}
}
注解定义
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "direct.queue4",declare = "true"),
exchange = @Exchange(name = "annotation.direct",type = ExchangeTypes.DIRECT),
key = {"key2","key3"}
))
public void listenToDirectQueue4CreateByAnnotation(String message){
log.info("listenToDirectQueue4CreateByAnnotation 接收到消息: {}", message);
}
发送消息
发送字符串
@Test
public void testSendMessage() {
String exchangeName = "test1.direct.exchange";
String message = "这是一条测试消息";
rabbitTemplate.convertAndSend(exchangeName, "test1.key", message);
}
发送对象(Map)
配置了 Jackson2JsonMessageConverter 后,可以直接发送对象,会自动序列化为 JSON:
@Test
public void testSendMapMessage() {
Map<String, Object> map = new HashMap<>();
map.put("name", "张三");
map.put("age", 18);
rabbitTemplate.convertAndSend("test1.direct.exchange", "test1.key", map);
}
Fanout 广播发送
Fanout 交换机会忽略 routing key,发送到所有绑定的队列:
@Test
public void testSendToFanoutExchange() {
rabbitTemplate.convertAndSend("demo.fanout2", null, "广播消息");
}
Topic 通配符发送
@Test
public void testSendToTopicExchange() {
// routing key 为 "1.1.b",会匹配 "test1.#" 但不会匹配 "test1.*.info"
rabbitTemplate.convertAndSend("test1.topic.exchange", "1.1.b", "Topic消息");
}
如何保证Rabbitmq消息的可靠性
一共三个方面:生产者可靠;mq可靠;消费者可靠
生产者可靠
生产者重试机制 + 生产者确认机制(Publisher Confirm)+ 消息回退机制(Publisher Return)
生产者 Confirm 机制
配置 publisher-confirm-type: correlated 后,MQ 收到消息会异步回调通知生产者。通过 CorrelationData 绑定消息与回调:
@Test
public void testConfirmCallBack() {
CorrelationData cd = new CorrelationData(UUID.randomUUID().toString());
cd.getFuture().addCallback(new ListenableFutureCallback<CorrelationData.Confirm>() {
@Override
public void onFailure(Throwable ex) {
log.error("回调失败: {}", ex.getMessage());
}
@Override
public void onSuccess(CorrelationData.Confirm result) {
if (result.isAck()) {
log.debug("消息发送成功,id: {}", cd.getId());
} else {
log.error("消息发送失败,id: {}, 原因: {}", cd.getId(), result.getReason());
}
}
});
rabbitTemplate.convertAndSend("demo.direct1", "key", "消息内容", cd);
}
生产者 Return 机制
配置 publisher-returns: true 后,如果消息从交换机无法路由到队列(routing key 不匹配),MQ 会回调通知生产者:
@Slf4j
@Configuration
public class MqConfirmConfig implements ApplicationContextAware {
@Override
public void setApplicationContext(ApplicationContext applicationContext) {
RabbitTemplate rabbitTemplate = applicationContext.getBean(RabbitTemplate.class);
rabbitTemplate.setReturnsCallback(returnedMessage -> log.debug(
"消息丢失: exchange:{}, route:{}, replyCode:{}, replyText:{}, message:{}",
returnedMessage.getExchange(), returnedMessage.getRoutingKey(),
returnedMessage.getReplyCode(), returnedMessage.getReplyText(),
returnedMessage.getMessage()));
}
}
mq可靠
交换机持久化 + 队列持久化 + 消息持久化
消费者可靠
消费者确认机制
手动ACK示例
消费者配置 acknowledge-mode: manual,然后在消费方法中手动确认:
@RabbitListener(queues = "test1.queue")
public void listenManualAck(String message, Channel channel, Message msg) throws IOException {
try {
log.info("接收到消息: {}", message);
// 处理业务逻辑...
// 手动确认,第二个参数 false 表示不批量确认
channel.basicAck(msg.getMessageProperties().getDeliveryTag(), false);
} catch (Exception e) {
log.error("消息处理失败", e);
// 第三个参数 true 表示消息重新入队
channel.basicNack(msg.getMessageProperties().getDeliveryTag(), false, true);
}
}
消费者失败重试 + 错误队列
开启消费者重试后,如果重试次数耗尽消息仍然消费失败,默认会丢弃消息。可以通过配置 MessageRecoverer 将失败消息转发到错误队列,避免消息丢失:
@Configuration
@ConditionalOnProperty(prefix = "spring.rabbitmq.listener.simple.retry", name = "enabled", havingValue = "true")
public class ErrorConfig {
@Bean
public DirectExchange errorExchange() {
return new DirectExchange("error.direct");
}
@Bean
public Queue errorQueue() {
return new Queue("error.queue");
}
@Bean
public Binding errorBinding(Queue errorQueue, DirectExchange errorExchange) {
return BindingBuilder.bind(errorQueue).to(errorExchange).with("error.routing.key");
}
@Bean
public MessageRecoverer messageRecoverer(RabbitTemplate rabbitTemplate) {
// 重试耗尽后,将消息重新发布到错误交换机
return new RepublishMessageRecoverer(rabbitTemplate, "error.direct", "error.routing.key");
}
}
Rabbitmq的延迟消息
两种实现方式,第一种是通过死信队列,设置消息过期时间;第二种是通过rabbitmq插件定义延迟消息交换机。
方式一:死信队列 + TTL
消息在队列中过期后,会被转发到死信交换机(Dead Letter Exchange),再路由到死信队列,消费者监听死信队列即可实现延迟消费。
原理:给普通队列设置 TTL 和死信交换机,消息过期后自动转发到死信队列。
配置类方式
@Configuration
public class DelayConfig {
// 普通队列:设置 TTL 和死信交换机
@Bean
public Queue delayQueue() {
return QueueBuilder.durable("delay.queue")
.ttl(10000) // 消息过期时间 10s
.deadLetterExchange("DLX.direct")
.deadLetterRoutingKey("DLX.key")
.build();
}
@Bean
public DirectExchange delayExchange() {
return new DirectExchange("delay.direct");
}
@Bean
public Binding delayBinding(Queue delayQueue, DirectExchange delayExchange) {
return BindingBuilder.bind(delayQueue).to(delayExchange).with("delay.key");
}
// 死信交换机
@Bean
public DirectExchange dlxExchange() {
return new DirectExchange("DLX.direct");
}
// 死信队列
@Bean
public Queue dlxQueue() {
return new Queue("DLX.queue");
}
@Bean
public Binding dlxBinding(Queue dlxQueue, DirectExchange dlxExchange) {
return BindingBuilder.bind(dlxQueue).to(dlxExchange).with("DLX.key");
}
}
也可以通过 withArgument 方式设置死信参数(效果相同):
@Bean
public Queue delayQueue() {
return QueueBuilder.durable("delay.queue")
.withArgument("x-dead-letter-exchange", "DLX.direct")
.withArgument("x-dead-letter-routing-key", "DLX.key")
.build();
}
发送和消费
// 发送消息到普通队列
rabbitTemplate.convertAndSend("delay.direct", "delay.key", "延迟消息");
// 监听死信队列,10s 后消费
@RabbitListener(queues = "DLX.queue")
public void listenDlx(String message) {
log.info("延迟消费: {}", message);
}
单条消息设置 TTL
除了在队列级别设置 TTL,也可以对单条消息单独设置过期时间(但存在队头阻塞问题:队头消息 TTL 长,会阻塞后面短 TTL 的消息过期):
@Test
public void testDelayMessage() {
rabbitTemplate.convertAndSend("delay.direct", "delay.key", "消息内容", message -> {
message.getMessageProperties().setExpiration("10000"); // 单条消息 TTL 10s
return message;
});
}
缺点:每种延迟时间需要一个独立队列,或者存在队头阻塞问题。
方式二:延迟消息插件(推荐)
安装 rabbitmq_delayed_message_exchange 插件后,可以使用 x-delayed-message 类型的交换机,消息在交换机层面延迟投递,支持任意延迟时间。
注解方式定义
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "delay.queue2"),
exchange = @Exchange(name = "delay.direct2", type = ExchangeTypes.DIRECT, delayed = "true"),
key = "delay.key"
))
public void listenDelayQueue2(String message) {
log.info("listenDelayQueue2 接收到消息: {}", message);
}
注意 @Exchange 注解中 delayed = "true",标识这是一个延迟交换机。
配置类方式定义
@Configuration
public class DelayedMqConfig {
@Bean
public CustomExchange delayedExchange() {
Map<String, Object> args = new HashMap<>();
args.put("x-delayed-type", "direct");
return new CustomExchange("delayed.exchange", "x-delayed-message", true, false, args);
}
@Bean
public Queue delayedQueue() {
return QueueBuilder.durable("delayed.queue").build();
}
@Bean
public Binding delayedBinding(Queue delayedQueue, CustomExchange delayedExchange) {
return BindingBuilder.bind(delayedQueue).to(delayedExchange).with("delayed.key");
}
}
发送延迟消息
通过 setDelay 设置延迟时间(毫秒):
@Test
public void testSendMessageWithPlugin() {
rabbitTemplate.convertAndSend("delay.direct2", "delay.key", "插件延迟消息", message -> {
message.getMessageProperties().setDelay(10000); // 延迟 10s
return message;
});
}
两种方式对比
| 对比项 | 死信队列 + TTL | 延迟消息插件 |
|---|---|---|
| 实现原理 | 消息在队列中过期后转发到死信队列 | 消息在交换机中延迟后直接投递 |
| 延迟精度 | 每种延迟时间需要独立队列 | 支持任意延迟时间 |
| 队头阻塞 | 单条消息 TTL 存在此问题 | 无此问题 |
| 依赖 | 无需额外插件 | 需安装 rabbitmq_delayed_message_exchange 插件 |
| 推荐 | 简单场景 | 复杂延迟场景 |
更多推荐

所有评论(0)