RabbitMQ
在微服务中,模块之间的通讯有两种方式,一种是同步通讯,发送请求之后,希望立即得到结果,例如feign,一种是异步通讯,发送请求之后,不需要立即得到结果,例如RabbitMQ。
一、简介
在微服务中,模块之间的通讯有两种方式,一种是同步通讯,发送请求之后,希望立即得到结果,例如feign,一种是异步通讯,发送请求之后,不需要立即得到结果,例如RabbitMQ。
(一) 同步通讯
微服务之间使用Feign调用就属于同步方式,虽然调用可以实时获取到结果,但是存在以下问题:
-
- 程序耦合度高
- 性能下降
- 资源浪费
- 级联失败
(二) 异步通讯
使用异步调用的方式可以避免同步调用所带来的问题,不需要立即得到返回结果,同时也不用关心接收方对数据的处理,调用方只需要发送消息,不关心消息是如何被处理的,接收方,只需要接收消息,不需要返回结果。带来的好处就是吞吐量提升、故障隔离、调用间没有阻塞、耦合度低以及流量消峰,缺点就是架构复杂、需要依赖消息队列的可靠性、安全性以及性能。
(三) 技术对比
MQ,中文是消息队列(MessageQueue),字面来看就是存放消息的队列。也就是事件驱动架构中的Broker。
比较常见的MQ实现:
-
- ActiveMQ
- RabbitMQ
- RocketMQ
- Kafka
几种常见MQ的对比:
RabbitMQ | ActiveMQ | RocketMQ | Kafka | |
公司/社区 | Rabbit | Apache | 阿里 | Apache |
开发语言 | Erlang | Java | Java | Scala&Java |
协议支持 | AMQP,XMPP,SMTP,STOMP | OpenWire,STOMP,REST,XMPP,AMQP | 自定义协议 | 自定义协议 |
可用性 | 高 | 一般 | 高 | 高 |
单机吞吐量 | 一般 | 差 | 高 | 非常高 |
消息延迟 | 微秒级 | 毫秒级 | 毫秒级 | 毫秒以内 |
消息可靠性 | 高 | 一般 | 高 | 一般 |
- 追求可用性:Kafka、 RocketMQ 、RabbitMQ
- 追求可靠性:RabbitMQ、RocketMQ
- 追求吞吐能力:RocketMQ、Kafka
- 追求消息低延迟:RabbitMQ、Kafka
二、安装
这里使用docker容器化部署的方式进行安装,默认账号:root,密码:123456
docker run \
-e RABBITMQ_DEFAULT_USER=root \
-e RABBITMQ_DEFAULT_PASS=123456 \
-v mq-plugins:/plugins \
--name mq \
--hostname mq \
-p 15672:15672 \
-p 5672:5672 \
-di \
rabbitmq:3.8-management
http端口为5672,可以通过5672端口来访问管理后台。
三、快速入门
(一) 基本概念
在RabbitMQ中,一些基本的概念和名词:
- publisher:生产者
- consumer:消费者
- exchange:交换机,负责消息路由
- queue:队列,存储消息
- virtualHost:虚拟主机,隔离不同租户的exchange、queue、消息的隔离
。。。
RabbitMQ概念详细说明
- Message
消息,消息是不具名的,它由消息头和消息体组成。消息体是不透明的,而消息头则由一系列的可选属性组成,这些属性包括routing-key(路由键)、priority(相对于其他消息的优先权)、delivery-mode(指出该消息可能需要持久性存储)等。
- Publisher
消息的生产者,也是一个向交换器发布消息的客户端应用程序。
- Exchange
交换器,用来接收生产者发送的消息并将这些消息路由给服务器中的队列。Exchange有4种类型:direct(默认),fanout, topic, 和headers,不同类型的Exchange转发消息的策略有所区别。
- Queue
消息队列,用来保存消息直到发送给消费者。它是消息的容器,也是消息的终点。一个消息可投入一个或多个队列。消息一直在队列里面,等待消费者连接到这个队列将其取走。
- Binding
绑定,用于消息队列和交换器之间的关联。一个绑定就是基于路由键将交换器和消息队列连接起来的路由规则,所以可以将交换器理解成一个由绑定构成的路由表。Exchange 和Queue的绑定可以是多对多的关系。
- Connection
网络连接,比如一个TCP连接。
- Channel
信道,多路复用连接中的一条独立的双向数据流通道。信道是建立在真实的TCP连接内的虚拟连接,AMQP 命令都是通过信道发出去的,不管是发布消息、订阅队列还是接收消息,这些动作都是通过信道完成。因为对于操作系统来说建立和销毁 TCP 都是非常昂贵的开销,所以引入了信道的概念,以复用一条 TCP 连接。
- Consumer
消息的消费者,表示一个从消息队列中取得消息的客户端应用程序。
- Virtual Host
虚拟主机,表示一批交换器、消息队列和相关对象。虚拟主机是共享相同的身份认证和加密环境的独立服务器域。每个 vhost 本质上就是一个 mini 版的 RabbitMQ 服务器,拥有自己的队列、交换器、绑定和权限机制。vhost 是 AMQP 概念的基础,必须在连接时指定,RabbitMQ 默认的 vhost 是 / 。
- Broker
表示消息队列服务器实体
MQ的基本结构:
RabbitMQ官方提供了5个不同的示例,对应了不同的消息模型:
对于提供的5种消息模型,在后面进行介绍,这里先说以下RabbitMQ的实现,实现RabbitMQ采用的是SpringAMQP,它是基于RabbitMQ封装的一套模板,并且还利用SpringBoot对其实现了自动装配,使用起来非常方便。SpringAMQP的官方地址:Spring AMQP
在SpringAMQP中提供了两个非常重要的组件:AmqpAdmin和RabbitTemplate,分别用来管理队列和交换机以及发送消息。对于SpringAMQP提供的API更详细、更强大的用法,可以参考官网进行使用。
(二) 引入依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
(三) 添加配置
spring:
rabbitmq:
host: 192.168.0.240
port: 5672
virtual-host: /
username: root
password: 123456
(四) 消息发送
这里以简单队列模式进行测试:
// 队列名称
String queueName = "simple.queue";
// 使用AmqpAdmin声明队列
amqpAdmin.declareQueue(new Queue(queueName, true, false, false, null));
// 消息
String message = "hello, spring amqp!";
// 发送消息
rabbitTemplate.convertAndSend(queueName, message);
这个时候,就可以在Queue中看到自己创建的队列和消息:
(五) 消息接收
刚刚发送消息的时候,是往simple.queue的队列中发送了一条消息,消息内容为:hello, spring amqp!,需要获取到队列中的消息。
@EnableRabbit
package cn.yichangqiao.listener;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Slf4j
@Component
public class SpringRabbitListener {
@RabbitListener(queues = "simple.queue")
public void listenSimpleQueueMessage(String msg) throws InterruptedException {
log.info("spring 消费者接收到消息:【" + msg + "】");
}
}
这里消息接收的参数类型(public void listenSimpleQueueMessage(String msg))可以是AMQP提供的Message对象(包含了全部消息内容,消息头和消息体等)、发送消息的类型、Channel(当前传输的通道),并且三种参数不分数量和顺序。
此外,对于监听队列的消息,SpringAMQP提供了两个注解,分别为RabbitListener和RabbitHandler,不同之处在于RabbitListener主要用在类和方法上,用于指明监听哪些队列,而RabbitHandler主要用在方法上,用于重载接收不同类型的消息。
(六) 消息转换器
Spring会把你发送的消息序列化为字节发送给MQ,接收消息的时候,还会把字节反序列化为Java对象。默认情况下Spring采用的序列化方式是JDK序列化。众所周知,JDK序列化存在下列问题:
- 数据体积过大
- 有安全漏洞
- 可读性差
因此我们需要修改配置为JSON转换器,在publisher和consumer两个服务中都引入依赖:
<dependency>
<groupId>com.fasterxml.jackson.dataformat</groupId>
<artifactId>jackson-dataformat-xml</artifactId>
<version>2.9.10</version>
</dependency>
配置消息转换器。
在启动类中添加一个Bean即可:
@Bean
public MessageConverter jsonMessageConverter(){
return new Jackson2JsonMessageConverter();
}
四、消息模型
在MQ的官方文档中,给出了5个消息队列的模型。
(一) 基本消息队列
基本消息队列中接收消息的消费者,只绑定一个队列,也就是这个消费者只消费这一个队列中的消息,与队列之间是一对一的绑定关系。由消息的发送者,往队列中发送一条消息后,由绑定这个队列的消费者进行消费,这种模式就是简单队列模式。
/**
* 基本消息队列 - 消息发送者
*
* @Author: yichangqiao
* @Date: 2024/2/20 9:44
*/
@Slf4j
@Component
public class BasicQueuePublish {
@Autowired
private RabbitTemplate rabbitTemplate;
@Autowired
private AmqpAdmin amqpAdmin;
/**
* 发送消息到简单队列
*/
public void sendMessageToBasicQueue() {
// 创建队列
String queueName = "basic.queue";
// 创建队列的参数:
// String name (队列名称), boolean durable (持久性), boolean exclusive (排他性), boolean autoDelete (自动删除)
amqpAdmin.declareQueue(new Queue(queueName, true, false, false));
// 发送消息
String message = "发送的消息内容为:" + LocalDateTime.now();
rabbitTemplate.convertAndSend(queueName, message);
log.info("发送消息到简单队列成功");
}
}
/**
* 简单消息队列 - 消息消费者
* @Author: yichangqiao
* @Date: 2024/2/20 10:08
*/
@Slf4j
@Component
public class BasicQueueListener {
@RabbitListener(queues = "basic.queue")
public void listenSimpleQueueMessage(String msg) throws InterruptedException {
log.info("spring 消费者接收到消息:【" + msg + "】");
}
}
(二) 工作消息队列
Work queues,也被称为(Task queues),任务模型。简单来说就是让多个消费者绑定到一个队列,共同消费队列中的消息。
/**
* 工作消息队列 - 消息发送者
*
* @Author: yichangqiao
* @Date: 2024/2/20 10:23
*/
@Slf4j
@Component
public class WorkQueuePublish {
@Autowired
private RabbitTemplate rabbitTemplate;
@Autowired
private AmqpAdmin amqpAdmin;
/**
* 发送消息到简单队列
*/
public void sendMessageToWorkQueue() {
// 创建队列
String queueName = "work.queue";
// 创建队列的参数:
// String name (队列名称), boolean durable (持久性), boolean exclusive (排他性), boolean autoDelete (自动删除)
amqpAdmin.declareQueue(new Queue(queueName, true, false, false));
// 发送消息
String message = "_发送的消息内容为:" + LocalDateTime.now();
for (int i = 0; i < 500; i++) {
rabbitTemplate.convertAndSend(queueName, i + message);
}
log.info("发送消息到工作队列成功");
}
}
/**
* 工作消息队列 - 消息消费者
*
* @Author: yichangqiao
* @Date: 2024/2/20 10:28
*/
@Slf4j
@Component
public class WorkQueueListener {
@RabbitListener(queues = "work.queue")
public void listenWorkQueue1(String msg) throws InterruptedException {
System.out.println("消费者1接收到消息:【" + msg + "】" + LocalTime.now());
Thread.sleep(20);
}
@RabbitListener(queues = "work.queue")
public void listenWorkQueue2(String msg) throws InterruptedException {
System.err.println("消费者2........接收到消息:【" + msg + "】" + LocalTime.now());
Thread.sleep(200);
}
}
这里使用两个消费者,共同绑定了work.queue这一个队列,来共同消费队列中的消息,但是在listenWorkQueue1中消费消息后休眠了20S,在listenWorkQueue2中消费消息后休眠了200S,这两个消费者消费消息的能力是不同的,listenWorkQueue1的消费能力是比listenWorkQueue2要强的,但是在实际的消费过程中,消息是平均分配给每个消费者,并没有考虑到消费者的处理能力。这样显然是有问题的。
在spring中有一个简单的配置,可以解决这个问题。修改consumer服务的application.yml文件,添加配置:
spring:
rabbitmq:
listener:
simple:
prefetch: 1 # 每次只能获取一条消息,处理完成才能获取下一个消息
(三) 发布订阅
在发布订阅模型中,多了一个exchange角色,而且过程略有变化:
- Publisher:生产者,也就是要发送消息的程序,但是不再发送到队列中,而是发给交换机
- Exchange:交换机。一方面,接收生产者发送的消息。另一方面,知道如何处理消息,例如递交给某个特别队列、递交给所有队列、或是将消息丢弃。到底如何操作,取决于Exchange的类型。Exchange有以下3种类型:
-
- Fanout:广播,将消息交给所有绑定到交换机的队列
- Direct:定向,把消息交给符合指定routing key 的队列
- Topic:通配符,把消息交给符合routing pattern(路由模式) 的队列
- Consumer:消费者,与以前一样,订阅队列,没有变化。
- Queue:消息队列也与以前一样,接收消息、缓存消息。
Exchange(交换机)只负责转发消息,不具备存储消息的能力,因此如果没有任何队列与Exchange绑定,或者没有符合路由规则的队列,那么消息会丢失!
Spring提供了一个接口Exchange,来表示所有不同类型的交换机:
1. 广播
在广播模式下,消息发送流程是这样的:
- 1) 可以有多个队列
- 2) 每个队列都要绑定到Exchange(交换机)
- 3) 生产者发送的消息,只能发送到交换机,交换机来决定要发给哪个队列,生产者无法决定
- 4) 交换机把消息发送给绑定过的所有队列
- 5) 订阅队列的消费者都能拿到消息
@Configuration
public class FanoutConfig {
/**
* 声明交换机
* @return Fanout类型交换机
*/
@Bean
public FanoutExchange fanoutExchange(){
return new FanoutExchange("exchange.fanout");
}
/**
* 第1个队列
*/
@Bean
public Queue fanoutQueue1(){
return new Queue("fanout.queue1");
}
/**
* 绑定队列和交换机
*/
@Bean
public Binding bindingQueue1(Queue fanoutQueue1, FanoutExchange fanoutExchange){
return BindingBuilder.bind(fanoutQueue1).to(fanoutExchange);
}
/**
* 第2个队列
*/
@Bean
public Queue fanoutQueue2(){
return new Queue("fanout.queue2");
}
/**
* 绑定队列和交换机
*/
@Bean
public Binding bindingQueue2(Queue fanoutQueue2, FanoutExchange fanoutExchange){
return BindingBuilder.bind(fanoutQueue2).to(fanoutExchange);
}
}
/**
* 广播模式 - 消息发送者
*
* @Author: yichangqiao
* @Date: 2024/2/20 10:50
*/
@Slf4j
@Component
public class FanoutExchangePublish {
@Autowired
private RabbitTemplate rabbitTemplate;
@Autowired
private AmqpAdmin amqpAdmin;
/**
* 发送消息 - 广播
*/
public void sendMessageToFanoutExchange() {
// 交换机
String exchangeName = "exchange.fanout";
// 消息
String message = "hello, everyone!" + LocalDateTime.now();
rabbitTemplate.convertAndSend(exchangeName, "", message);
}
}
@RabbitListener(queues = "fanout.queue1")
public void listenFanoutQueue1(String msg) {
System.out.println("消费者1接收到Fanout消息:【" + msg + "】");
}
@RabbitListener(queues = "fanout.queue2")
public void listenFanoutQueue2(String msg) {
System.out.println("消费者2接收到Fanout消息:【" + msg + "】");
}
2. 路由
在Fanout模式中,一条消息,会被所有订阅的队列都消费。但是,在某些场景下,我们希望不同的消息被不同的队列消费。这时就要用到Direct类型的Exchange。
在Direct模型下:
- 队列与交换机的绑定,不能是任意绑定了,而是要指定一个
RoutingKey
(路由key) - 消息的发送方在 向 Exchange发送消息时,也必须指定消息的
RoutingKey
。 - Exchange不再把消息交给每一个绑定的队列,而是根据消息的
Routing Key
进行判断,只有队列的Routingkey
与消息的Routing key
完全一致,才会接收到消息。
/**
* 路由模式 - 消息消费者
* @Author: yichangqiao
* @Date: 2024/2/20 11:05
*/
@Slf4j
@Component
public class DirectExchangeListener {
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "direct.queue1"),
exchange = @Exchange(name = "exchange.direct", type = ExchangeTypes.DIRECT),
key = {"red", "blue"}
))
public void listenDirectQueue1(String msg){
System.out.println("消费者接收到direct.queue1的消息:【" + msg + "】");
}
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "direct.queue2"),
exchange = @Exchange(name = "exchange.direct", type = ExchangeTypes.DIRECT),
key = {"red", "yellow"}
))
public void listenDirectQueue2(String msg){
System.out.println("消费者接收到direct.queue2的消息:【" + msg + "】");
}
}
/**
* 路由模式 - 消息发送者
*
* @Author: yichangqiao
* @Date: 2024/2/20 11:03
*/
@Slf4j
@Component
public class DirectExchangePublish {
@Autowired
private RabbitTemplate rabbitTemplate;
@Autowired
private AmqpAdmin amqpAdmin;
public void sendMessageToDirectExchange() {
// 交换机名称
String exchangeName = "exchange.direct";
// 消息
String message = "红色警报!日本乱排核废水,导致海洋生物变异,惊现哥斯拉!";
// 发送消息
rabbitTemplate.convertAndSend(exchangeName, "blue", message);
}
}
3. 主题
Topic
类型的Exchange
与Direct
相比,都是可以根据RoutingKey
把消息路由到不同的队列。只不过Topic
类型Exchange
可以让队列在绑定Routing key
的时候使用通配符!
Routingkey
一般都是有一个或多个单词组成,多个单词之间以”.”分割,例如: item.insert
通配符规则:
#
:匹配一个或多个词
*
:匹配不多不少恰好1个词
举例:
item.#
:能够匹配item.spu.insert
或者 item.spu
item.*
:只能匹配item.spu
图示:
解释:
- Queue1:绑定的是
china.#
,因此凡是以china.
开头的routing key
都会被匹配到。包括china.news和china.weather - Queue2:绑定的是
#.news
,因此凡是以.news
结尾的routing key
都会被匹配。包括china.news和japan.news
/**
* 主题模式 - 消息发送者
* @Author: yichangqiao
* @Date: 2024/2/20 11:12
*/
@Slf4j
@Component
public class TopicExchangePublish {
@Autowired
private RabbitTemplate rabbitTemplate;
@Autowired
private AmqpAdmin amqpAdmin;
public void sendMessageToTopicExchange() {
// 交换机名称
String exchangeName = "exchange.topic";
// 消息
String message = "喜报!孙悟空大战哥斯拉,胜!";
// 发送消息
rabbitTemplate.convertAndSend(exchangeName, "china.news", message);
}
}
/**
* 主题模式 - 消息消费者
* @Author: yichangqiao
* @Date: 2024/2/20 11:13
*/
@Slf4j
@Component
public class TopicExchangeListener {
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "topic.queue1"),
exchange = @Exchange(name = "exchange.topic", type = ExchangeTypes.TOPIC),
key = "china.#"
))
public void listenTopicQueue1(String msg){
System.out.println("消费者接收到topic.queue1的消息:【" + msg + "】");
}
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "topic.queue2"),
exchange = @Exchange(name = "exchange.topic", type = ExchangeTypes.TOPIC),
key = "#.news"
))
public void listenTopicQueue2(String msg){
System.out.println("消费者接收到topic.queue2的消息:【" + msg + "】");
}
}
五、消息可靠性
消息队列在实际的使用过程中,其实有很多的问题需要思考。
从消息发送,到消费者接收,会经历很多的过程。
其中的每一步都可能导致消息丢失,常见的丢失原因包括:
- 发送时丢失:
-
- 生产者发送的消息未送达exchange
- 消息到达exchange后未到达queue
- MQ宕机,queue将消息丢失
- consumer接收到消息后未消费就宕机
针对这些问题,RabbitMQ分别给出了解决方案:
- 生产者确认机制
- mq持久化
- 消费者确认机制
- 失败重试机制
(一) 生产者确认机制
RabbitMQ提供了publisher confirm机制来避免消息发送到MQ过程中丢失。这种机制必须给每个消息指定一个唯一ID。消息发送到MQ以后,会返回一个结果给发送者,表示消息是否处理成功。
返回结果有两种方式:
- publisher-confirm,发送者确认
-
- 消息成功投递到交换机,返回ack。
- 消息未投递到交换机,返回nack。
- publisher-return,发送者回执
-
- 消息投递到交换机了,但是没有路由到队列。返回ACK,及路由失败原因。
确认机制发送消息时,需要给每一个消息设置一个全局唯一id,以区分不同消息,避免ACK冲突。
- publisher-return(发送者回执)
如果消息正确投递到了交换机,但是没有路由到队列,可以使用发送者回执机制,返回ACK,以及路由失败的原因。
spring:
rabbitmq:
publisher-confirm-type: correlated # 开启发布者确认机制 异步回调机制
publisher-returns: true # 开启消费者返回机制
template:
mandatory: true # 消息路由失败后的策略,true代表的是手动处理
说明:
publish-confirm-type
:开启publisher-confirm,这里支持两种类型:
-
simple
:同步等待confirm结果,直到超时correlated
:异步回调,定义ConfirmCallback,MQ返回结果时会回调这个ConfirmCallback- 默认为none
publish-returns
:开启publish-return功能,同样是基于callback机制,不过是定义ReturnCallbacktemplate.mandatory
:定义消息路由失败时的策略。true,则调用ReturnCallback;false:则直接丢弃消息。
@Slf4j
@Configuration
public class CommonConfig implements ApplicationContextAware {
@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
// 获取RabbitTemplate
RabbitTemplate rabbitTemplate = applicationContext.getBean(RabbitTemplate.class);
// 设置ReturnCallback
rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {
// 投递失败,记录日志
log.info("消息发送失败,应答码{},原因{},交换机{},路由键{},消息{}",
replyCode, replyText, exchange, routingKey, message.toString());
// 如果有业务需要,可以重发消息
});
}
}
- publisher-confirm(发送者确认)
发送者确认机制用来保证消息能够投递到交换机,如果没有正确的投递到交换机,返回ACK。
public void testSendMessage2SimpleQueue() throws InterruptedException {
// 1.消息体
String message = "hello, spring amqp!";
// 2.全局唯一的消息ID,需要封装到CorrelationData中
CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
// 3.添加callback
correlationData.getFuture().addCallback(
result -> {
if(result.isAck()){
// 3.1.ack,消息成功
log.debug("消息发送成功, ID:{}", correlationData.getId());
}else{
// 3.2.nack,消息失败
log.error("消息发送失败, ID:{}, 原因{}",correlationData.getId(), result.getReason());
}
},
ex -> log.error("消息发送异常, ID:{}, 原因{}",correlationData.getId(),ex.getMessage())
);
// 4.发送消息
rabbitTemplate.convertAndSend("task.direct", "task", message, correlationData);
// 休眠一会儿,等待ack回执
Thread.sleep(2000);
}
(二) 消息持久化
生产者确认可以确保消息投递到RabbitMQ的队列中,但是消息发送到RabbitMQ以后,如果突然宕机,也可能导致消息丢失。
要想确保消息在RabbitMQ中安全保存,必须开启消息持久化机制。
- 交换机持久化
- 队列持久化
- 消息持久化
- 交换机持久化
SpringAMQP中可以通过代码指定交换机持久化:
@Bean
public DirectExchange simpleExchange(){
// 三个参数:交换机名称、是否持久化、当没有queue与其绑定时是否自动删除
return new DirectExchange("simple.direct", true, false);
}
- 队列持久化
SpringAMQP中可以通过代码指定队列持久化:
@Bean
public Queue simpleQueue(){
// 使用QueueBuilder构建队列,durable就是持久化的
return QueueBuilder.durable("simple.queue").build();
}
- 消息持久化
利用SpringAMQP发送消息时,可以设置消息的属性(MessageProperties),指定delivery-mode:
- 1:非持久化
- 2:持久化
用java代码指定:
默认情况下,SpringAMQP发出的任何消息都是持久化的,不用特意指定。
(三) 消费者确认机制
RabbitMQ是阅后即焚机制,RabbitMQ确认消息被消费者消费后会立刻删除。而RabbitMQ是通过消费者回执来确认消费者是否成功处理消息的:消费者获取消息后,应该向RabbitMQ发送ACK回执,表明自己已经处理消息。
设想这样的场景:
- 1)RabbitMQ投递消息给消费者
- 2)消费者获取消息后,返回ACK给RabbitMQ
- 3)RabbitMQ删除消息
- 4)消费者宕机,消息尚未处理
这样,消息就丢失了。因此消费者返回ACK的时机非常重要。而SpringAMQP则允许配置三种确认模式:
•manual:手动ack,需要在业务代码结束后,调用api发送ack。
•auto:自动ack,由spring监测listener代码是否出现异常,没有异常则返回ack;抛出异常则返回nack
•none:关闭ack,MQ假定消费者获取消息后会成功处理,因此消息投递后立即被删除
由此可知:
- none模式下,消息投递是不可靠的,可能丢失
- auto模式类似事务机制,出现异常时返回nack,消息回滚到mq;没有异常,返回ack
- manual:自己根据业务情况,判断什么时候该ack
一般,我们都是使用默认的auto即可。
- none模式
spring:
rabbitmq:
listener:
simple:
acknowledge-mode: none # 关闭ack
- auto模式
spring:
rabbitmq:
listener:
simple:
acknowledge-mode: auto # Spring自动处理ack
(四) 失败重试机制
当消费者出现异常后,消息会不断requeue(重入队)到队列,再重新发送给消费者,然后再次异常,再次requeue,无限循环,导致mq的消息处理飙升,带来不必要的压力。
- 本地重试
可以利用Spring的retry机制,在消费者出现异常时利用本地重试,而不是无限制的requeue到mq队列。
spring:
rabbitmq:
listener:
simple:
retry:
enabled: true # 开启消费者失败重试
initial-interval: 1000 # 初识的失败等待时长为1秒
multiplier: 1 # 失败的等待时长倍数,下次等待时长 = multiplier * last-interval
max-attempts: 3 # 最大重试次数
stateless: true # true无状态;false有状态。如果业务中包含事务,这里改为false
开启本地重试时,消息处理过程中抛出异常,不会requeue到队列,而是在消费者本地重试,重试达到最大次数后,Spring会返回ack,消息会被丢弃。
- 失败策略
在本地重试达到最大重试次数后,消息会被丢弃,这是由Spring内部机制决定的。在开启重试模式后,重试次数耗尽,如果消息依然失败,则需要有MessageRecovery接口来处理,它包含三种不同的实现:
- RejectAndDontRequeueRecoverer:重试耗尽后,直接reject,丢弃消息。默认就是这种方式
- ImmediateRequeueMessageRecoverer:重试耗尽后,返回nack,消息重新入队
- RepublishMessageRecoverer:重试耗尽后,将失败消息投递到指定的交换机
比较优雅的一种处理方案是RepublishMessageRecoverer,失败后将消息投递到一个指定的,专门存放异常消息的队列,后续由人工集中处理。
@Bean
public DirectExchange errorMessageExchange(){
return new DirectExchange("error.direct");
}
@Bean
public Queue errorQueue(){
return new Queue("error.queue", true);
}
@Bean
public Binding errorBinding(Queue errorQueue, DirectExchange errorMessageExchange){
return BindingBuilder.bind(errorQueue).to(errorMessageExchange).with("error");
}
@Bean
public MessageRecoverer republishMessageRecoverer(RabbitTemplate rabbitTemplate){
return new RepublishMessageRecoverer(rabbitTemplate, "error.direct", "error");
}
重试失败之后,消息会从simple.queue队列中自动入队到error.queue队列中存储。
六、死信交换机
当一个队列中的消息满足下列情况之一时,可以成为死信(dead letter):
- 消费者使用basic.reject或 basic.nack声明消费失败,并且消息的requeue参数设置为false
- 消息是一个过期消息,超时无人消费
- 要投递的队列消息满了,无法投递
如果这个包含死信的队列配置了dead-letter-exchange
属性,指定了一个交换机,那么队列中的死信就会投递到这个交换机中,而这个交换机称为死信交换机(Dead Letter Exchange,检查DLX)。
在失败重试策略中,默认的RejectAndDontRequeueRecoverer会在本地重试次数耗尽后,发送reject给RabbitMQ,消息变成死信,被丢弃。可以给simple.queue添加一个死信交换机,给死信交换机绑定一个队列。这样消息变成死信后也不会丢弃,而是最终投递到死信交换机,路由到与死信交换机绑定的队列。
// 声明普通的 simple.queue队列,并且为其指定死信交换机:dl.direct
@Bean
public Queue simpleQueue2(){
return QueueBuilder.durable("simple.queue") // 指定队列名称,并持久化
.deadLetterExchange("dl.direct") // 指定死信交换机
.build();
}
// 声明死信交换机 dl.direct
@Bean
public DirectExchange dlExchange(){
return new DirectExchange("dl.direct", true, false);
}
// 声明存储死信的队列 dl.queue
@Bean
public Queue dlQueue(){
return new Queue("dl.queue", true);
}
// 将死信队列 与 死信交换机绑定
@Bean
public Binding dlBinding(){
return BindingBuilder.bind(dlQueue()).to(dlExchange()).with("simple");
}
一个队列中的消息如果超时未消费,则会变为死信,超时分为两种情况:
● 消息所在的队列设置了超时时间
● 消息本身设置了超时时间
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "dl.ttl.queue", durable = "true"),
exchange = @Exchange(name = "dl.ttl.direct"),
key = "ttl"
))
public void listenDlQueue(String msg){
log.info("接收到 dl.ttl.queue的延迟消息:{}", msg);
}
给队列设置超时时间,需要在声明队列时配置x-message-ttl属性:
@Bean
public Queue ttlQueue(){
return QueueBuilder.durable("ttl.queue") // 指定队列名称,并持久化
.ttl(10000) // 设置队列的超时时间,10秒
.deadLetterExchange("dl.ttl.direct") // 指定死信交换机
.build();
}
注意,这个队列设定了死信交换机为dl.ttl.direct
@Bean
public DirectExchange ttlExchange(){
return new DirectExchange("ttl.direct");
}
@Bean
public Binding ttlBinding(){
return BindingBuilder.bind(ttlQueue()).to(ttlExchange()).with("ttl");
}
@Test
public void testTTLQueue() {
// 创建消息
String message = "hello, ttl queue";
// 消息ID,需要封装到CorrelationData中
CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
// 发送消息
rabbitTemplate.convertAndSend("ttl.direct", "ttl", message, correlationData);
// 记录日志
log.debug("发送消息成功");
}
@Test
public void testTTLMsg() {
// 创建消息
Message message = MessageBuilder
.withBody("hello, ttl message".getBytes(StandardCharsets.UTF_8))
.setExpiration("5000")
.build();
// 消息ID,需要封装到CorrelationData中
CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
// 发送消息
rabbitTemplate.convertAndSend("ttl.direct", "ttl", message, correlationData);
log.debug("发送消息成功");
}
当队列、消息都设置了TTL时,任意一个到期就会成为死信。
七、延迟队列
利用死信交换机和TTL可以实现延迟队列效果,实现了消息发出后,消费者延迟收到消息的效果。这种消息模式就称为延迟队列(Delay Queue)模式。
延迟队列的使用场景包括:
- 延迟发送短信
- 用户下单,如果用户在15 分钟内未支付,则自动取消
- 预约工作会议,20分钟后自动通知所有参会人员
因为延迟队列的需求非常多,所以RabbitMQ的官方也推出了一个插件,原生支持延迟队列效果。
这个插件就是DelayExchange插件。参考RabbitMQ的插件列表页面:
使用方式可以参考官网地址:Scheduling Messages with RabbitMQ | RabbitMQ
DelayExchange需要将一个交换机声明为delayed类型。当我们发送消息到delayExchange时,流程如下:
- 接收消息
- 判断消息是否具备x-delay属性
- 如果有x-delay属性,说明是延迟消息,持久化到硬盘,读取x-delay值,作为延迟时间
- 返回routing not found结果给消息发送者
- x-delay时间到期后,重新投递消息到指定队列
插件的使用也非常简单:声明一个交换机,交换机的类型可以是任意类型,只需要设定delayed属性为true即可,然后声明队列与其绑定即可。
声明DelayExchange交换机:
发送消息:
八、惰性队列
当生产者发送消息的速度超过了消费者处理消息的速度,就会导致队列中的消息堆积,直到队列存储消息达到上限。之后发送的消息就会成为死信,可能会被丢弃,这就是消息堆积问题。
解决消息堆积有两种思路:
- 增加更多消费者,提高消费速度。也就是我们之前说的work queue模式
- 扩大队列容积,提高堆积上限
要提升队列容积,把消息保存在内存中显然是不行的。
从RabbitMQ的3.6.0版本开始,就增加了Lazy Queues的概念,也就是惰性队列。惰性队列的特征如下:
- 接收到消息后直接存入磁盘而非内存
- 消费者要消费消息时才会从磁盘中读取并加载到内存
- 支持数百万条的消息存储
rabbitmqctl set_policy Lazy "^lazy-queue$" '{"queue-mode":"lazy"}' --apply-to queues
基于@Bean声明lazy-queue
基于@RabbitListener声明LazyQueue
更多推荐
所有评论(0)