一、简介

在微服务中,模块之间的通讯有两种方式,一种是同步通讯,发送请求之后,希望立即得到结果,例如feign,一种是异步通讯,发送请求之后,不需要立即得到结果,例如RabbitMQ。

(一) 同步通讯

微服务之间使用Feign调用就属于同步方式,虽然调用可以实时获取到结果,但是存在以下问题:

    • 程序耦合度高
    • 性能下降
    • 资源浪费
    • 级联失败

(二) 异步通讯

使用异步调用的方式可以避免同步调用所带来的问题,不需要立即得到返回结果,同时也不用关心接收方对数据的处理,调用方只需要发送消息,不关心消息是如何被处理的,接收方,只需要接收消息,不需要返回结果。带来的好处就是吞吐量提升、故障隔离、调用间没有阻塞、耦合度低以及流量消峰,缺点就是架构复杂、需要依赖消息队列的可靠性、安全性以及性能。

(三) 技术对比

MQ,中文是消息队列(MessageQueue),字面来看就是存放消息的队列。也就是事件驱动架构中的Broker。

比较常见的MQ实现:

    • ActiveMQ
    • RabbitMQ
    • RocketMQ
    • Kafka

几种常见MQ的对比:

RabbitMQ

ActiveMQ

RocketMQ

Kafka

公司/社区

Rabbit

Apache

阿里

Apache

开发语言

Erlang

Java

Java

Scala&Java

协议支持

AMQP,XMPP,SMTP,STOMP

OpenWire,STOMP,REST,XMPP,AMQP

自定义协议

自定义协议

可用性

一般

单机吞吐量

一般

非常高

消息延迟

微秒级

毫秒级

毫秒级

毫秒以内

消息可靠性

一般

一般

  • 追求可用性:Kafka、 RocketMQ 、RabbitMQ
  • 追求可靠性:RabbitMQ、RocketMQ
  • 追求吞吐能力:RocketMQ、Kafka
  • 追求消息低延迟:RabbitMQ、Kafka

二、安装

这里使用docker容器化部署的方式进行安装,默认账号:root,密码:123456

docker run \
 -e RABBITMQ_DEFAULT_USER=root \
 -e RABBITMQ_DEFAULT_PASS=123456 \
 -v mq-plugins:/plugins \
 --name mq \
 --hostname mq \
 -p 15672:15672 \
 -p 5672:5672 \
 -di \
 rabbitmq:3.8-management

http端口为5672,可以通过5672端口来访问管理后台。

三、快速入门

(一) 基本概念

在RabbitMQ中,一些基本的概念和名词:

  • publisher:生产者
  • consumer:消费者
  • exchange:交换机,负责消息路由
  • queue:队列,存储消息
  • virtualHost:虚拟主机,隔离不同租户的exchange、queue、消息的隔离

。。。

RabbitMQ概念详细说明

  1. Message

消息,消息是不具名的,它由消息头和消息体组成。消息体是不透明的,而消息头则由一系列的可选属性组成,这些属性包括routing-key(路由键)、priority(相对于其他消息的优先权)、delivery-mode(指出该消息可能需要持久性存储)等。

  1. Publisher

消息的生产者,也是一个向交换器发布消息的客户端应用程序。

  1. Exchange

交换器,用来接收生产者发送的消息并将这些消息路由给服务器中的队列。Exchange有4种类型:direct(默认),fanout, topic, 和headers,不同类型的Exchange转发消息的策略有所区别。

  1. Queue

消息队列,用来保存消息直到发送给消费者。它是消息的容器,也是消息的终点。一个消息可投入一个或多个队列。消息一直在队列里面,等待消费者连接到这个队列将其取走。

  1. Binding

绑定,用于消息队列和交换器之间的关联。一个绑定就是基于路由键将交换器和消息队列连接起来的路由规则,所以可以将交换器理解成一个由绑定构成的路由表。Exchange 和Queue的绑定可以是多对多的关系。

  1. Connection

网络连接,比如一个TCP连接。

  1. Channel

信道,多路复用连接中的一条独立的双向数据流通道。信道是建立在真实的TCP连接内的虚拟连接,AMQP 命令都是通过信道发出去的,不管是发布消息、订阅队列还是接收消息,这些动作都是通过信道完成。因为对于操作系统来说建立和销毁 TCP 都是非常昂贵的开销,所以引入了信道的概念,以复用一条 TCP 连接。

  1. Consumer

消息的消费者,表示一个从消息队列中取得消息的客户端应用程序。

  1. Virtual Host

虚拟主机,表示一批交换器、消息队列和相关对象。虚拟主机是共享相同的身份认证和加密环境的独立服务器域。每个 vhost 本质上就是一个 mini 版的 RabbitMQ 服务器,拥有自己的队列、交换器、绑定和权限机制。vhost 是 AMQP 概念的基础,必须在连接时指定,RabbitMQ 默认的 vhost 是 / 。

  1. Broker

表示消息队列服务器实体

MQ的基本结构:

RabbitMQ官方提供了5个不同的示例,对应了不同的消息模型:

对于提供的5种消息模型,在后面进行介绍,这里先说以下RabbitMQ的实现,实现RabbitMQ采用的是SpringAMQP,它是基于RabbitMQ封装的一套模板,并且还利用SpringBoot对其实现了自动装配,使用起来非常方便。SpringAMQP的官方地址:Spring AMQP

在SpringAMQP中提供了两个非常重要的组件:AmqpAdmin和RabbitTemplate,分别用来管理队列和交换机以及发送消息。对于SpringAMQP提供的API更详细、更强大的用法,可以参考官网进行使用。

(二) 引入依赖

<dependency>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

(三) 添加配置

spring:
  rabbitmq:
    host: 192.168.0.240
    port: 5672
    virtual-host: /
    username: root
    password: 123456

(四) 消息发送

这里以简单队列模式进行测试:

// 队列名称
String queueName = "simple.queue";
// 使用AmqpAdmin声明队列
amqpAdmin.declareQueue(new Queue(queueName, true, false, false, null));
// 消息
String message = "hello, spring amqp!";
// 发送消息
rabbitTemplate.convertAndSend(queueName, message);

这个时候,就可以在Queue中看到自己创建的队列和消息:

(五) 消息接收

刚刚发送消息的时候,是往simple.queue的队列中发送了一条消息,消息内容为:hello, spring amqp!,需要获取到队列中的消息。

@EnableRabbit
package cn.yichangqiao.listener;

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Slf4j
@Component
public class SpringRabbitListener {

    @RabbitListener(queues = "simple.queue")
    public void listenSimpleQueueMessage(String msg) throws InterruptedException {
        log.info("spring 消费者接收到消息:【" + msg + "】");
    }

}

这里消息接收的参数类型(public void listenSimpleQueueMessage(String msg))可以是AMQP提供的Message对象(包含了全部消息内容,消息头和消息体等)、发送消息的类型Channel(当前传输的通道),并且三种参数不分数量和顺序。

此外,对于监听队列的消息,SpringAMQP提供了两个注解,分别为RabbitListener和RabbitHandler,不同之处在于RabbitListener主要用在类和方法上,用于指明监听哪些队列,而RabbitHandler主要用在方法上,用于重载接收不同类型的消息。

(六) 消息转换器

Spring会把你发送的消息序列化为字节发送给MQ,接收消息的时候,还会把字节反序列化为Java对象。默认情况下Spring采用的序列化方式是JDK序列化。众所周知,JDK序列化存在下列问题:

  • 数据体积过大
  • 有安全漏洞
  • 可读性差

因此我们需要修改配置为JSON转换器,在publisher和consumer两个服务中都引入依赖:

<dependency>
  <groupId>com.fasterxml.jackson.dataformat</groupId>
  <artifactId>jackson-dataformat-xml</artifactId>
  <version>2.9.10</version>
</dependency>

配置消息转换器。

在启动类中添加一个Bean即可:

@Bean
public MessageConverter jsonMessageConverter(){
    return new Jackson2JsonMessageConverter();
}

四、消息模型

在MQ的官方文档中,给出了5个消息队列的模型。

(一) 基本消息队列

基本消息队列中接收消息的消费者,只绑定一个队列,也就是这个消费者只消费这一个队列中的消息,与队列之间是一对一的绑定关系。由消息的发送者,往队列中发送一条消息后,由绑定这个队列的消费者进行消费,这种模式就是简单队列模式。

/**
 * 基本消息队列 - 消息发送者
 *
 * @Author: yichangqiao
 * @Date: 2024/2/20 9:44
 */
@Slf4j
@Component
public class BasicQueuePublish {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Autowired
    private AmqpAdmin amqpAdmin;

    /**
     * 发送消息到简单队列
     */
    public void sendMessageToBasicQueue() {
        // 创建队列
        String queueName = "basic.queue";
        // 创建队列的参数:
        // String name (队列名称), boolean durable (持久性), boolean exclusive (排他性), boolean autoDelete (自动删除)
        amqpAdmin.declareQueue(new Queue(queueName, true, false, false));
        // 发送消息
        String message = "发送的消息内容为:" + LocalDateTime.now();
        rabbitTemplate.convertAndSend(queueName, message);
        log.info("发送消息到简单队列成功");
    }

}
/**
 * 简单消息队列 - 消息消费者
 * @Author: yichangqiao
 * @Date: 2024/2/20 10:08
 */
@Slf4j
@Component
public class BasicQueueListener {

    @RabbitListener(queues = "basic.queue")
    public void listenSimpleQueueMessage(String msg) throws InterruptedException {
        log.info("spring 消费者接收到消息:【" + msg + "】");
    }

}

(二) 工作消息队列

Work queues,也被称为(Task queues),任务模型。简单来说就是让多个消费者绑定到一个队列,共同消费队列中的消息。

/**
 * 工作消息队列 - 消息发送者
 *
 * @Author: yichangqiao
 * @Date: 2024/2/20 10:23
 */
@Slf4j
@Component
public class WorkQueuePublish {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Autowired
    private AmqpAdmin amqpAdmin;

    /**
     * 发送消息到简单队列
     */
    public void sendMessageToWorkQueue() {
        // 创建队列
        String queueName = "work.queue";
        // 创建队列的参数:
        // String name (队列名称), boolean durable (持久性), boolean exclusive (排他性), boolean autoDelete (自动删除)
        amqpAdmin.declareQueue(new Queue(queueName, true, false, false));
        // 发送消息
        String message = "_发送的消息内容为:" + LocalDateTime.now();
        for (int i = 0; i < 500; i++) {
            rabbitTemplate.convertAndSend(queueName, i + message);
        }
        log.info("发送消息到工作队列成功");
    }

}
/**
 * 工作消息队列 - 消息消费者
 *
 * @Author: yichangqiao
 * @Date: 2024/2/20 10:28
 */
@Slf4j
@Component
public class WorkQueueListener {

    @RabbitListener(queues = "work.queue")
    public void listenWorkQueue1(String msg) throws InterruptedException {
        System.out.println("消费者1接收到消息:【" + msg + "】" + LocalTime.now());
        Thread.sleep(20);
    }

    @RabbitListener(queues = "work.queue")
    public void listenWorkQueue2(String msg) throws InterruptedException {
        System.err.println("消费者2........接收到消息:【" + msg + "】" + LocalTime.now());
        Thread.sleep(200);
    }

}

这里使用两个消费者,共同绑定了work.queue这一个队列,来共同消费队列中的消息,但是在listenWorkQueue1中消费消息后休眠了20S,在listenWorkQueue2中消费消息后休眠了200S,这两个消费者消费消息的能力是不同的,listenWorkQueue1的消费能力是比listenWorkQueue2要强的,但是在实际的消费过程中,消息是平均分配给每个消费者,并没有考虑到消费者的处理能力。这样显然是有问题的。

在spring中有一个简单的配置,可以解决这个问题。修改consumer服务的application.yml文件,添加配置:

spring:
  rabbitmq:
    listener:
      simple:
        prefetch: 1 # 每次只能获取一条消息,处理完成才能获取下一个消息

(三) 发布订阅

在发布订阅模型中,多了一个exchange角色,而且过程略有变化:

  • Publisher:生产者,也就是要发送消息的程序,但是不再发送到队列中,而是发给交换机
  • Exchange:交换机。一方面,接收生产者发送的消息。另一方面,知道如何处理消息,例如递交给某个特别队列、递交给所有队列、或是将消息丢弃。到底如何操作,取决于Exchange的类型。Exchange有以下3种类型:
    • Fanout:广播,将消息交给所有绑定到交换机的队列
    • Direct:定向,把消息交给符合指定routing key 的队列
    • Topic:通配符,把消息交给符合routing pattern(路由模式) 的队列
  • Consumer:消费者,与以前一样,订阅队列,没有变化。
  • Queue:消息队列也与以前一样,接收消息、缓存消息。

Exchange(交换机)只负责转发消息,不具备存储消息的能力,因此如果没有任何队列与Exchange绑定,或者没有符合路由规则的队列,那么消息会丢失!

Spring提供了一个接口Exchange,来表示所有不同类型的交换机:

1. 广播

在广播模式下,消息发送流程是这样的:

  • 1)  可以有多个队列
  • 2)  每个队列都要绑定到Exchange(交换机)
  • 3)  生产者发送的消息,只能发送到交换机,交换机来决定要发给哪个队列,生产者无法决定
  • 4)  交换机把消息发送给绑定过的所有队列
  • 5)  订阅队列的消费者都能拿到消息
@Configuration
public class FanoutConfig {
    /**
     * 声明交换机
     * @return Fanout类型交换机
     */
    @Bean
    public FanoutExchange fanoutExchange(){
        return new FanoutExchange("exchange.fanout");
    }

    /**
     * 第1个队列
     */
    @Bean
    public Queue fanoutQueue1(){
        return new Queue("fanout.queue1");
    }

    /**
     * 绑定队列和交换机
     */
    @Bean
    public Binding bindingQueue1(Queue fanoutQueue1, FanoutExchange fanoutExchange){
        return BindingBuilder.bind(fanoutQueue1).to(fanoutExchange);
    }

    /**
     * 第2个队列
     */
    @Bean
    public Queue fanoutQueue2(){
        return new Queue("fanout.queue2");
    }

    /**
     * 绑定队列和交换机
     */
    @Bean
    public Binding bindingQueue2(Queue fanoutQueue2, FanoutExchange fanoutExchange){
        return BindingBuilder.bind(fanoutQueue2).to(fanoutExchange);
    }
}
/**
 * 广播模式 - 消息发送者
 *
 * @Author: yichangqiao
 * @Date: 2024/2/20 10:50
 */
@Slf4j
@Component
public class FanoutExchangePublish {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Autowired
    private AmqpAdmin amqpAdmin;

    /**
     * 发送消息 - 广播
     */
    public void sendMessageToFanoutExchange() {
        // 交换机
        String exchangeName = "exchange.fanout";
        // 消息
        String message = "hello, everyone!" + LocalDateTime.now();
        rabbitTemplate.convertAndSend(exchangeName, "", message);
    }

}
@RabbitListener(queues = "fanout.queue1")
public void listenFanoutQueue1(String msg) {
System.out.println("消费者1接收到Fanout消息:【" + msg + "】");
}

@RabbitListener(queues = "fanout.queue2")
public void listenFanoutQueue2(String msg) {
System.out.println("消费者2接收到Fanout消息:【" + msg + "】");
}

2. 路由

在Fanout模式中,一条消息,会被所有订阅的队列都消费。但是,在某些场景下,我们希望不同的消息被不同的队列消费。这时就要用到Direct类型的Exchange。

在Direct模型下:

  • 队列与交换机的绑定,不能是任意绑定了,而是要指定一个RoutingKey(路由key)
  • 消息的发送方在 向 Exchange发送消息时,也必须指定消息的 RoutingKey
  • Exchange不再把消息交给每一个绑定的队列,而是根据消息的Routing Key进行判断,只有队列的Routingkey与消息的 Routing key完全一致,才会接收到消息。
/**
 * 路由模式 - 消息消费者
 * @Author: yichangqiao
 * @Date: 2024/2/20 11:05
 */
@Slf4j
@Component
public class DirectExchangeListener {

    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(name = "direct.queue1"),
            exchange = @Exchange(name = "exchange.direct", type = ExchangeTypes.DIRECT),
            key = {"red", "blue"}
    ))
    public void listenDirectQueue1(String msg){
        System.out.println("消费者接收到direct.queue1的消息:【" + msg + "】");
    }

    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(name = "direct.queue2"),
            exchange = @Exchange(name = "exchange.direct", type = ExchangeTypes.DIRECT),
            key = {"red", "yellow"}
    ))
    public void listenDirectQueue2(String msg){
        System.out.println("消费者接收到direct.queue2的消息:【" + msg + "】");
    }

}
/**
 * 路由模式  - 消息发送者
 *
 * @Author: yichangqiao
 * @Date: 2024/2/20 11:03
 */
@Slf4j
@Component
public class DirectExchangePublish {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Autowired
    private AmqpAdmin amqpAdmin;

    public void sendMessageToDirectExchange() {
        // 交换机名称
        String exchangeName = "exchange.direct";
        // 消息
        String message = "红色警报!日本乱排核废水,导致海洋生物变异,惊现哥斯拉!";
        // 发送消息
        rabbitTemplate.convertAndSend(exchangeName, "blue", message);
    }

}

3. 主题

Topic类型的ExchangeDirect相比,都是可以根据RoutingKey把消息路由到不同的队列。只不过Topic类型Exchange可以让队列在绑定Routing key 的时候使用通配符!

Routingkey 一般都是有一个或多个单词组成,多个单词之间以”.”分割,例如: item.insert

通配符规则:

#:匹配一个或多个词

*:匹配不多不少恰好1个词

举例:

item.#:能够匹配item.spu.insert 或者 item.spu

item.*:只能匹配item.spu

图示:

解释:

  • Queue1:绑定的是china.# ,因此凡是以 china.开头的routing key 都会被匹配到。包括china.news和china.weather
  • Queue2:绑定的是#.news ,因此凡是以 .news结尾的 routing key 都会被匹配。包括china.news和japan.news
/**
 * 主题模式 - 消息发送者
 * @Author: yichangqiao
 * @Date: 2024/2/20 11:12
 */
@Slf4j
@Component
public class TopicExchangePublish {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Autowired
    private AmqpAdmin amqpAdmin;

    public void sendMessageToTopicExchange() {
        // 交换机名称
        String exchangeName = "exchange.topic";
        // 消息
        String message = "喜报!孙悟空大战哥斯拉,胜!";
        // 发送消息
        rabbitTemplate.convertAndSend(exchangeName, "china.news", message);
    }
}
/**
 * 主题模式 - 消息消费者
 * @Author: yichangqiao
 * @Date: 2024/2/20 11:13
 */
@Slf4j
@Component
public class TopicExchangeListener {

    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(name = "topic.queue1"),
            exchange = @Exchange(name = "exchange.topic", type = ExchangeTypes.TOPIC),
            key = "china.#"
    ))
    public void listenTopicQueue1(String msg){
        System.out.println("消费者接收到topic.queue1的消息:【" + msg + "】");
    }

    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(name = "topic.queue2"),
            exchange = @Exchange(name = "exchange.topic", type = ExchangeTypes.TOPIC),
            key = "#.news"
    ))
    public void listenTopicQueue2(String msg){
        System.out.println("消费者接收到topic.queue2的消息:【" + msg + "】");
    }
}

五、消息可靠性

消息队列在实际的使用过程中,其实有很多的问题需要思考。

从消息发送,到消费者接收,会经历很多的过程。

其中的每一步都可能导致消息丢失,常见的丢失原因包括:

  • 发送时丢失:
    • 生产者发送的消息未送达exchange
    • 消息到达exchange后未到达queue
  • MQ宕机,queue将消息丢失
  • consumer接收到消息后未消费就宕机

针对这些问题,RabbitMQ分别给出了解决方案:

  • 生产者确认机制
  • mq持久化
  • 消费者确认机制
  • 失败重试机制

(一) 生产者确认机制

RabbitMQ提供了publisher confirm机制来避免消息发送到MQ过程中丢失。这种机制必须给每个消息指定一个唯一ID。消息发送到MQ以后,会返回一个结果给发送者,表示消息是否处理成功。

返回结果有两种方式:

  • publisher-confirm,发送者确认
    • 消息成功投递到交换机,返回ack。
    • 消息未投递到交换机,返回nack。
  • publisher-return,发送者回执
    • 消息投递到交换机了,但是没有路由到队列。返回ACK,及路由失败原因。

确认机制发送消息时,需要给每一个消息设置一个全局唯一id,以区分不同消息,避免ACK冲突。

  1. publisher-return(发送者回执)

如果消息正确投递到了交换机,但是没有路由到队列,可以使用发送者回执机制,返回ACK,以及路由失败的原因。

spring:
  rabbitmq:
    publisher-confirm-type: correlated  # 开启发布者确认机制 异步回调机制
    publisher-returns: true  # 开启消费者返回机制
    template:
      mandatory: true  # 消息路由失败后的策略,true代表的是手动处理

说明:

  • publish-confirm-type:开启publisher-confirm,这里支持两种类型:
    • simple:同步等待confirm结果,直到超时
    • correlated:异步回调,定义ConfirmCallback,MQ返回结果时会回调这个ConfirmCallback
    • 默认为none
  • publish-returns:开启publish-return功能,同样是基于callback机制,不过是定义ReturnCallback
  • template.mandatory:定义消息路由失败时的策略。true,则调用ReturnCallback;false:则直接丢弃消息。
@Slf4j
@Configuration
public class CommonConfig implements ApplicationContextAware {
    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        // 获取RabbitTemplate
        RabbitTemplate rabbitTemplate = applicationContext.getBean(RabbitTemplate.class);
        // 设置ReturnCallback
        rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {
            // 投递失败,记录日志
            log.info("消息发送失败,应答码{},原因{},交换机{},路由键{},消息{}",
                     replyCode, replyText, exchange, routingKey, message.toString());
            // 如果有业务需要,可以重发消息
        });
    }
}
  1. publisher-confirm(发送者确认)

发送者确认机制用来保证消息能够投递到交换机,如果没有正确的投递到交换机,返回ACK。

public void testSendMessage2SimpleQueue() throws InterruptedException {
    // 1.消息体
    String message = "hello, spring amqp!";
    // 2.全局唯一的消息ID,需要封装到CorrelationData中
    CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
    // 3.添加callback
    correlationData.getFuture().addCallback(
        result -> {
            if(result.isAck()){
                // 3.1.ack,消息成功
                log.debug("消息发送成功, ID:{}", correlationData.getId());
            }else{
                // 3.2.nack,消息失败
                log.error("消息发送失败, ID:{}, 原因{}",correlationData.getId(), result.getReason());
            }
        },
        ex -> log.error("消息发送异常, ID:{}, 原因{}",correlationData.getId(),ex.getMessage())
    );
    // 4.发送消息
    rabbitTemplate.convertAndSend("task.direct", "task", message, correlationData);

    // 休眠一会儿,等待ack回执
    Thread.sleep(2000);
}

(二) 消息持久化

生产者确认可以确保消息投递到RabbitMQ的队列中,但是消息发送到RabbitMQ以后,如果突然宕机,也可能导致消息丢失。

要想确保消息在RabbitMQ中安全保存,必须开启消息持久化机制。

  • 交换机持久化
  • 队列持久化
  • 消息持久化
  1. 交换机持久化

SpringAMQP中可以通过代码指定交换机持久化:

@Bean
public DirectExchange simpleExchange(){
    // 三个参数:交换机名称、是否持久化、当没有queue与其绑定时是否自动删除
    return new DirectExchange("simple.direct", true, false);
}
  1. 队列持久化

SpringAMQP中可以通过代码指定队列持久化:

@Bean
public Queue simpleQueue(){
    // 使用QueueBuilder构建队列,durable就是持久化的
    return QueueBuilder.durable("simple.queue").build();
}
  1. 消息持久化

利用SpringAMQP发送消息时,可以设置消息的属性(MessageProperties),指定delivery-mode:

  • 1:非持久化
  • 2:持久化

用java代码指定:

默认情况下,SpringAMQP发出的任何消息都是持久化的,不用特意指定。

(三) 消费者确认机制

RabbitMQ是阅后即焚机制,RabbitMQ确认消息被消费者消费后会立刻删除。而RabbitMQ是通过消费者回执来确认消费者是否成功处理消息的:消费者获取消息后,应该向RabbitMQ发送ACK回执,表明自己已经处理消息。

设想这样的场景:

  • 1)RabbitMQ投递消息给消费者
  • 2)消费者获取消息后,返回ACK给RabbitMQ
  • 3)RabbitMQ删除消息
  • 4)消费者宕机,消息尚未处理

这样,消息就丢失了。因此消费者返回ACK的时机非常重要。而SpringAMQP则允许配置三种确认模式:

•manual:手动ack,需要在业务代码结束后,调用api发送ack。

•auto:自动ack,由spring监测listener代码是否出现异常,没有异常则返回ack;抛出异常则返回nack

•none:关闭ack,MQ假定消费者获取消息后会成功处理,因此消息投递后立即被删除

由此可知:

  • none模式下,消息投递是不可靠的,可能丢失
  • auto模式类似事务机制,出现异常时返回nack,消息回滚到mq;没有异常,返回ack
  • manual:自己根据业务情况,判断什么时候该ack

一般,我们都是使用默认的auto即可。

  1. none模式
spring:
  rabbitmq:
    listener:
      simple:
        acknowledge-mode: none # 关闭ack
  1. auto模式
spring:
  rabbitmq:
    listener:
      simple:
        acknowledge-mode: auto # Spring自动处理ack

(四) 失败重试机制

当消费者出现异常后,消息会不断requeue(重入队)到队列,再重新发送给消费者,然后再次异常,再次requeue,无限循环,导致mq的消息处理飙升,带来不必要的压力。

  1. 本地重试

可以利用Spring的retry机制,在消费者出现异常时利用本地重试,而不是无限制的requeue到mq队列。

spring:
  rabbitmq:
    listener:
      simple:
        retry:
          enabled: true # 开启消费者失败重试
          initial-interval: 1000 # 初识的失败等待时长为1秒
          multiplier: 1 # 失败的等待时长倍数,下次等待时长 = multiplier * last-interval
          max-attempts: 3 # 最大重试次数
          stateless: true # true无状态;false有状态。如果业务中包含事务,这里改为false

开启本地重试时,消息处理过程中抛出异常,不会requeue到队列,而是在消费者本地重试,重试达到最大次数后,Spring会返回ack,消息会被丢弃。

  1. 失败策略

在本地重试达到最大重试次数后,消息会被丢弃,这是由Spring内部机制决定的。在开启重试模式后,重试次数耗尽,如果消息依然失败,则需要有MessageRecovery接口来处理,它包含三种不同的实现:

  • RejectAndDontRequeueRecoverer:重试耗尽后,直接reject,丢弃消息。默认就是这种方式
  • ImmediateRequeueMessageRecoverer:重试耗尽后,返回nack,消息重新入队
  • RepublishMessageRecoverer:重试耗尽后,将失败消息投递到指定的交换机

比较优雅的一种处理方案是RepublishMessageRecoverer,失败后将消息投递到一个指定的,专门存放异常消息的队列,后续由人工集中处理。

@Bean
public DirectExchange errorMessageExchange(){
    return new DirectExchange("error.direct");
}
@Bean
public Queue errorQueue(){
    return new Queue("error.queue", true);
}
@Bean
public Binding errorBinding(Queue errorQueue, DirectExchange errorMessageExchange){
    return BindingBuilder.bind(errorQueue).to(errorMessageExchange).with("error");
}
@Bean
public MessageRecoverer republishMessageRecoverer(RabbitTemplate rabbitTemplate){
    return new RepublishMessageRecoverer(rabbitTemplate, "error.direct", "error");
}

重试失败之后,消息会从simple.queue队列中自动入队到error.queue队列中存储。

六、死信交换机

当一个队列中的消息满足下列情况之一时,可以成为死信(dead letter):

  • 消费者使用basic.reject或 basic.nack声明消费失败,并且消息的requeue参数设置为false
  • 消息是一个过期消息,超时无人消费
  • 要投递的队列消息满了,无法投递

如果这个包含死信的队列配置了dead-letter-exchange属性,指定了一个交换机,那么队列中的死信就会投递到这个交换机中,而这个交换机称为死信交换机(Dead Letter Exchange,检查DLX)。

在失败重试策略中,默认的RejectAndDontRequeueRecoverer会在本地重试次数耗尽后,发送reject给RabbitMQ,消息变成死信,被丢弃。可以给simple.queue添加一个死信交换机,给死信交换机绑定一个队列。这样消息变成死信后也不会丢弃,而是最终投递到死信交换机,路由到与死信交换机绑定的队列。

// 声明普通的 simple.queue队列,并且为其指定死信交换机:dl.direct
@Bean
public Queue simpleQueue2(){
    return QueueBuilder.durable("simple.queue") // 指定队列名称,并持久化
        .deadLetterExchange("dl.direct") // 指定死信交换机
        .build();
}
// 声明死信交换机 dl.direct
@Bean
public DirectExchange dlExchange(){
    return new DirectExchange("dl.direct", true, false);
}
// 声明存储死信的队列 dl.queue
@Bean
public Queue dlQueue(){
    return new Queue("dl.queue", true);
}
// 将死信队列 与 死信交换机绑定
@Bean
public Binding dlBinding(){
    return BindingBuilder.bind(dlQueue()).to(dlExchange()).with("simple");
}

一个队列中的消息如果超时未消费,则会变为死信,超时分为两种情况:

● 消息所在的队列设置了超时时间

● 消息本身设置了超时时间

@RabbitListener(bindings = @QueueBinding(
    value = @Queue(name = "dl.ttl.queue", durable = "true"),
    exchange = @Exchange(name = "dl.ttl.direct"),
    key = "ttl"
))
public void listenDlQueue(String msg){
    log.info("接收到 dl.ttl.queue的延迟消息:{}", msg);
}

给队列设置超时时间,需要在声明队列时配置x-message-ttl属性:

@Bean
public Queue ttlQueue(){
    return QueueBuilder.durable("ttl.queue") // 指定队列名称,并持久化
        .ttl(10000) // 设置队列的超时时间,10秒
        .deadLetterExchange("dl.ttl.direct") // 指定死信交换机
        .build();
}

注意,这个队列设定了死信交换机为dl.ttl.direct

@Bean
public DirectExchange ttlExchange(){
    return new DirectExchange("ttl.direct");
}
@Bean
public Binding ttlBinding(){
    return BindingBuilder.bind(ttlQueue()).to(ttlExchange()).with("ttl");
}
@Test
public void testTTLQueue() {
    // 创建消息
    String message = "hello, ttl queue";
    // 消息ID,需要封装到CorrelationData中
    CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
    // 发送消息
    rabbitTemplate.convertAndSend("ttl.direct", "ttl", message, correlationData);
    // 记录日志
    log.debug("发送消息成功");
}
@Test
public void testTTLMsg() {
    // 创建消息
    Message message = MessageBuilder
        .withBody("hello, ttl message".getBytes(StandardCharsets.UTF_8))
        .setExpiration("5000")
        .build();
    // 消息ID,需要封装到CorrelationData中
    CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
    // 发送消息
    rabbitTemplate.convertAndSend("ttl.direct", "ttl", message, correlationData);
    log.debug("发送消息成功");
}

当队列、消息都设置了TTL时,任意一个到期就会成为死信。

七、延迟队列

利用死信交换机和TTL可以实现延迟队列效果,实现了消息发出后,消费者延迟收到消息的效果。这种消息模式就称为延迟队列(Delay Queue)模式。

延迟队列的使用场景包括:

  • 延迟发送短信
  • 用户下单,如果用户在15 分钟内未支付,则自动取消
  • 预约工作会议,20分钟后自动通知所有参会人员

因为延迟队列的需求非常多,所以RabbitMQ的官方也推出了一个插件,原生支持延迟队列效果。

这个插件就是DelayExchange插件。参考RabbitMQ的插件列表页面:

Community Plugins | RabbitMQ

使用方式可以参考官网地址:Scheduling Messages with RabbitMQ | RabbitMQ

DelayExchange需要将一个交换机声明为delayed类型。当我们发送消息到delayExchange时,流程如下:

  • 接收消息
  • 判断消息是否具备x-delay属性
  • 如果有x-delay属性,说明是延迟消息,持久化到硬盘,读取x-delay值,作为延迟时间
  • 返回routing not found结果给消息发送者
  • x-delay时间到期后,重新投递消息到指定队列

插件的使用也非常简单:声明一个交换机,交换机的类型可以是任意类型,只需要设定delayed属性为true即可,然后声明队列与其绑定即可。

声明DelayExchange交换机:

发送消息:

八、惰性队列

当生产者发送消息的速度超过了消费者处理消息的速度,就会导致队列中的消息堆积,直到队列存储消息达到上限。之后发送的消息就会成为死信,可能会被丢弃,这就是消息堆积问题。

解决消息堆积有两种思路:

  • 增加更多消费者,提高消费速度。也就是我们之前说的work queue模式
  • 扩大队列容积,提高堆积上限

要提升队列容积,把消息保存在内存中显然是不行的。

从RabbitMQ的3.6.0版本开始,就增加了Lazy Queues的概念,也就是惰性队列。惰性队列的特征如下:

  • 接收到消息后直接存入磁盘而非内存
  • 消费者要消费消息时才会从磁盘中读取并加载到内存
  • 支持数百万条的消息存储
rabbitmqctl set_policy Lazy "^lazy-queue$" '{"queue-mode":"lazy"}' --apply-to queues

基于@Bean声明lazy-queue

基于@RabbitListener声明LazyQueue

Logo

权威|前沿|技术|干货|国内首个API全生命周期开发者社区

更多推荐