RabbitMQ 超详细入门篇
RabbitMQ 入门篇????MQ 的基本概念:什么是 MQ ?MQ全称为Message Queue即消息队列"消息队列" 是在消息的传输过程中保存消息的容器它是典型的:生产者————消费者模型生产者不断向消息队列中生产消息 ———————— 消费者不断的从队列中获取消息.这样的好处: 生产者只需要关注发消息,消费者只需要关注收消息,二者没有业务逻辑的侵入,这样就实现了生产者和消费者的解耦.为什
RabbitMQ 入门篇🚪
MQ 的基本概念:
什么是 MQ ?
MQ全称为Message Queue即消息队列
-
"消息队列"
是在消息的传输过程中保存消息的容器 -
它是典型的:生产者————消费者模型
生产者不断向消息队列中生产消息 ———————— 消费者不断的从队列中获取消息.
这样的好处: 生产者只需要关注发消息,消费者只需要关注收消息,二者没有业务逻辑的侵入,这样就实现了生产者和消费者的解耦.
为什么要使用 MQ?
或者说MQ 有什么好处,MQ 主要可以实现三种功能:
服务解耦
-
场景:服务A产生数据, 而服务B,C,D需要这些数据
那么我们可以在A服务中直接调用B,C,D服务,把数据传递到下游服务即可;
-
但
随着我们的应用规模不断扩大,会有更多的服务需要A的数据
如果有几十甚至几百个下游服务,而且会不断变更,再加上还要考虑下游服务出错的情况 A服务中调用代码的维护会极为困难
程序非常的耦合
-
而 ,通过 MQ消息队列 可以实现,对程序的
解耦
A服务只需要向消息服务器发送消息,而不用考虑谁需要这些数据
下游服务如果需要数据,自行从消息服务器订阅消息,不再需要数据时则取消订阅即可
流量削峰
-
场景:
我们有一个应用,平时访问量是每秒300请求,我们用一台服务器即可轻松应对 √
但,在高峰期, 访问量瞬间翻了十倍, 达到每秒3000次请求, 单台服务器无法应对
我们增加到10台服务器,减压
-
而,很多时候这种高压 每天只出现一次,每次只有半小时
那么我们10台服务器在多数时间都只分担每秒几十次请求,这样就有点浪费资源了
-
使用MQ来进行流量削峰
我们可以对于这种,可能会突然产生高请求的功能,设置一个MQ
当用户发起请求后台并不会立刻处理,而是通过 MQ 发送一个请求,发送到队列里面,排队等待处理…
我们的后台,接收者,发现队列中有消息,一个一个的取出,进行后台处理…
避免了同一时刻大量的请求,而处理不过来导致 服务崩溃~
异步调用
-
场景:
对于有些服务之间的调用会有很长时间的响应,而用户并不能接受这么时间的响应:
A 调用 B,B 需要花费很长时间执行,但是 A 需要知道 B 什么时候可以执行完给页面响应…
-
外卖支付
相信大家都点过外卖,用户支付完成之后,到真正外卖到手是一个很漫长复杂的过程~
我们不可能一直停留在页面上进行等待~
支付后————发送支付成功的通知————再寻找外卖小哥来进行配送…
而寻找外卖小哥的过程非常耗时,高峰期,可能要等待几十秒甚至更长,这样就造成整条调用链路响应非常缓慢
-
MQ解决方案:
用户下单,订单数据可以发送到消息队列服务器,立刻响应客户端
为您寻找骑手,整条链路的响应时间只有200毫秒左右
消息接收方,监听获取每一个订单消息后台缓慢的寻找外卖小哥~
AMQP 和 JMS ⭐
AMQP 和 JMS 是目前市面上常见的两种 消息队列协议
AMQP
-
AMQP
高级消息队列协议!
是应用层协议的一个开放标准,为面向消息的中间件设计,兼容 JMS
RabbitMQ 就是基于 AMQP 协议实现的
JMS
-
JMS
Java 消息服务
JMS的客户端之间可以通过
JMS服务
进行异步的消息传输 -
JMS(Java Message Service,Java消息服务)API是一个消息服务的标准或者说是规范
就像JDBC一样通过接口定义一组规范,不同的实现尝试实现对于的驱动来完成开发...
它使分布式通信耦合度更低,消息服务更加可靠以及异步性。 ActiveMQ 就是基于 JMS 规范实现的
总结:
规范:
-
AMQP 为消息定义了线路层(wire-level protocol)的协议
-
JMS所定义的是API规范
跨平台
- Java 体系中,多个client均可以通过JMS进行交互,不需要应用修改代码,但是其对跨平台的支持较差
- AMQP天然具
有跨平台、跨语言特性
支持消息类型
JMS
支持TextMessage、MapMessage 等复杂的消息类型AMQP
仅支持 byte[] 消息类型(复杂的类型可序列化后发送
Exchange 交换机
提供的路由算法
- AMQP可以提供多样化的路由方式来传递消息到消息队列 4种交换机类型,6种模式
- JMS 仅支持 队列 和 主题/订阅 方式两种
常见MQ产品:
- ActiveMQ:基于JMS,
早期的MQ框架,现在已经很少使用了
- Kafka:分布式消息系统,高吞吐量
- RabbitMQ:基于AMQP协议,erlang语言开发,稳定性好
本篇学习😶
- RocketMQ:基于JMS,阿里巴巴产品,目前交由Apache基金会
RabbitMQ
-
RabbitMQ是由
erlang
语言开发,所以安装环境需要安装erlang
-
基于
AMQP
(Advanced Message Queue 高级消息队列协议
)协议实现的消息队列 -
它是一种应用程序之间的通信方法,消息队列在分布式系统开发中应用非常广泛
RabbitMQ 的工作原理
组成部分:
Producer 消息生产者
- 消息生产者,即生产方客户端,生产方客户端将消息,
通过信道Channel
发送到MQ
Connection 连接对象
-
Producer /Consumer 和 broker 之间的 TCP 连接
程序通过,Connection连接对象将,创建出
Channel信道
:生产者通过 信道 将消息发送给MQ
消费者通过 信道 获取到MQ的消息~
-
Channel 信道:
如果每一次访问 RabbitMQ 都建立一个 Connection,消息量大的时候,对于性能也是巨大的;
Channel 是在 connection 内部建立的逻辑连接,为 Connection 减少了操作系统建立 TCP connection 的开销;
细节不详细介绍
可以理解为是一个,消息数据传递的一个
通到
可以通过它,来创建配置,
生产者|消费者 与MQ通信
声明设置绑定:交换机|队列
Broker 可以认为是 MQ
-
消息队列服务进程
:此进程包括两个部分:Exchange交换机
和Queue队列
-
Exchange交换机
是 RabbitMQ 非常重要的一个部件
一方面它接收来自生产者的消息,另一方面它将消息 推送到队列中
-
Queue 队列
RabbitMQ 内部使用的一种数据结构
队列
队列就像是一个“吸管” 一边吸水一边出水,遵循 “先进先出”原则;
生产者发消息——交换机——转发到队列上
是真正消息存储的地方~
Consumer 消息消费者
- 消息消费者,即消费方客户端,
通关信道Channel
接收MQ转发的消息,并进行相关的处理;
-----发送消息-----
- 生产者通过 Connection 和Broker建立TCP连接。
- Connection 建立 Channel 通道
- 生产者通过信道,将消息发送给Broker(MQ),由Exchange将消息进行转发~
队列中去!
-----接收消息-----
- 消费者通过 Connection 和Broker建立TCP连接
- Connection 建立 Channel 通道
- 消费者监听指定的Queue(队列),当有消息到达Queue时Broker默认将消息,通过 Channel 推送给消费者
Exchange 交换机四种类型 ⭐
RabbitMQ消息传递模型的核心思想是:
-
生产者永远不会将任何消息直接发送到队列,通常生产者甚至不知道消息是否会被传递到任何队列 生产者只能向交换机(Exchange)发送消息
-
交换机是一个非常简单的东西,一边接收来自生产者的消息,另一边将消息推送到队列.
-
RabbitMQ 的交换机具有很多中类型,可以完成很多种复杂的场景操作:
交换机类型:
-
fanout: 广播模式
发布/订阅
,交换机给所有的队列,发送相同的消息; -
direct : 路由模式
routing key
交换机,根据对应的routing key
的队列上发送消息; -
topic: 动态路由模式,可以用过一定的规则定义
roting key
使 交换机动态的多样性选择队列
* 表示一个单词
# 表示任意数量(零个或多个)单词
-
headers: 请求头模式,
目前用的很少了
,就像请求头一样,发送消息时候附带头部数据
,交换机根据消息的头部信息匹配对应的队列;
RabbitMQ环境搭建 ✔
本次搭建是Linux 的 如果有朋友是Win的话可以参考这篇文章:🚀
工具准备🔨:
RabbitMQ是由erlang
语言开发,所以安装环境需要安装 erlang
- erlang-21.3.8.21-1.el7.x86_64.rpm
erlang环境
- rabbitmq-server-3.8.8-1.el7.noarch.rpm
rabbit安装
官网下载,如果没有的话也可以底部本人的网盘下载
环境搭建🏚:
本人使用的是 阿里云服务器
没有的话也可以使用虚拟机… 事先使用连接工具上传了文件
本人喜欢把工具都安装在 /usr/wsm
目录下:
[root@iZj6ciuzx7luldnazt4iswZ ~]# cd /
[root@iZj6ciuzx7luldnazt4iswZ /]# ls
bin dev home lib lost+found mnt patch root sbin sys usr www
boot etc install.sh lib64 media opt proc run srv tmp var
[root@iZj6ciuzx7luldnazt4iswZ /]# cd usr
[root@iZj6ciuzx7luldnazt4iswZ usr]# ls
bin etc games include lib lib64 libexec local sbin share src tmp
[root@iZj6ciuzx7luldnazt4iswZ usr]# mkdir wsm
[root@iZj6ciuzx7luldnazt4iswZ usr]# ls
bin etc games include lib lib64 libexec local sbin share src tmp wsm
[root@iZj6ciuzx7luldnazt4iswZ usr]# cd wsm
[root@iZj6ciuzx7luldnazt4iswZ wsm]# ls
erlang-21.3-1.el7.x86_64.rpm rabbitmq-server-3.8.8-1.el7.noarch.rpm #上传的两个文件
解压安装:
# 解压安装 erlang
rpm -ivh erlang-21.3.8.21-1.el7.x86_64.rpm
# 云下载一个 初始化一些配置, 过程比较慢请耐心等待~, 在这之后才可以进行 安装 RabbitMQ
yum install socat -y
# 解压安装 rabbitmq
rpm -ivh rabbitmq-server-3.8.8-1.el7.noarch.rpm
ok ,安装完毕了解一些 RabbitMQ 命令:
# 启动服务
systemctl start rabbitmq-server
# 查看服务状态
systemctl status rabbitmq-server
# 开机自启动
systemctl enable rabbitmq-server
# 停止服务
systemctl stop rabbitmq-server
# 重启服务
systemctl restart rabbitmq-server
注意:这里只是把RabbitMQ 服务
给搭建好了,为了方便操作我们还需要安装一个web控制面板
# 安装web控制面板
rabbitmq-plugins enable rabbitmq_management
# 安装完毕以后,重启服务即可
systemctl restart rabbitmq-server
# 访问 http://服务器ip:15672 ,用默认账号密码(guest)登录,出现权限问题
# 默认情况只能在 localhost 本机下访问,所以需要添加一个远程登录的用户
# 创建账号和密码: admin 123456
rabbitmqctl add_user admin 123456
# 设置用户角色,用户级别: administrator monitoring policymaker managment
rabbitmqctl set_user_tags admin administrator
# 为用户添加资源权限
# set_permissions [-p <vhostpath>] <user> <conf> <write> <read> # 添加配置、写、读权限
rabbitmqctl set_permissions -p "/" admin ".*" ".*" ".*"
##### 扩展一些命令:#####
关闭应用的命令为: rabbitmqctl stop_app
清除的命令为: rabbitmqctl reset
重新启动命令为: rabbitmqctl start_app
- 如果是阿里云的服务器
别忘记开启端口 还有 关闭防火墙~
用户级别:
- administrator:可以登录控制台、查看所有信息、可以对 rabbitmq 进行管理
- monitoring:监控者 登录控制台,查看所有信息
- policymaker:策略制定者 登录控制台,指定策略
- managment:普通管理员 登录控制台
主要端口介绍:阿里云建议将这些都打开~
-
4369 – erlang发现口
-
5672 – client端通信口
-
15672 – 管理界面ui端口
-
25672 – server间内部通信口
测试是否可以访问:
访问页面:
-
Overview
概览 RabbitMQ 的整体情况,也可以查看
集群各个节点的信息 情况
MQ 各个端口映射信息
-
Connection
该 选项专栏 下是MQ 与各个
生产者
消费者
连接情况. -
Channels
这里展示,各个
通道 与 连接的关系
-
Exchanage
展示所有的 交换机
-
Queue
展示所有的 队列
-
Admin
这里管理着,MQ 所有的操作用户~
RabbitMQ 管理页面:
Overview
Connections
- Name 连接名
点击连接名, 还可以查看详细的信息~
- User name 当前连接登录MQ 的用户
- State 当前连接的状态,
running 运行
idle 空闲
- SSL|TLS 是否使用的是
SSL|TLS协议
- Peotocol
AMQP 0-9-1
指的是AMQP 的协议版本号 - Channels 当前连接创建通道的 通道总数
- From client 每秒发出的消息数
- To client 每秒接收的消息数
Channels
记录各个连接的信道:
一个连接IP 可以有多个信道
多个通道通过多线程实现,不相互干扰 我们在 信道中创建:队列 交换机 ...
生产者的通道一般使用完之后会立马关闭,消费者是一直监听的…
-
Channel 通道名称
-
User Name 该通道,创建者 用户名
-
Model 通道的
确认模式
C confirm模式
T 表示事务
-
State 通道当前的状态
running 运行
idie 空闲
-
Unconfirmed 待确认的消息数
-
Prefetch
预先载入
Prefetch 表示每个消费者最大的能承受的未确认消息数目
简单来说就是用来指定一个消费者一次可以从 RabbitMQ 中获取多少条消息并缓存在消费者中,
一旦消费者的缓冲区满了,RabbitMQ 将会停止投递新的消息到该消费者中直到它发出有消息被 ack 了
消费者负责不断处理消息,不断 ack,然后只要
UnAcked
数少于Prefetch * consumer
数目,RabbitMQ 就不断将消息投递过去 -
Unacker 待 ack 的消息数
-
publish 消息生产者发送消息的
速率
-
confirm 消息生产者确认消息的
速率
-
unroutable
drop
表示消息,未被接收,且已经删除的消息. -
deliver / get 消息消费者获取消息的
速率
-
ack 消息消费者 ack 消息的速率.
MQ 的 ACK机制:100%消息消费!
Exchange
Queue
- Name 表示消息队列的名称
- Type 消息队列的类型…
- Features:表示消息队列的特性,D 表示消息队列持久化
- State:表示当前队列的状态,running 表示运行中;idle 表示空闲
- Ready:表示待消费的消息总数
- Unacked:表示待应答的消息总数
- Total:表示消息总数 Ready+Unacked
- incoming:表示消息进入的速率
- deliver/get:表示获取消息的速率
- ack:表示消息应答的速
Admin
Java 集成 RabbitMQ 案例
创建一个Maven项目并使用 git 进行管理, wlog.md
文件进行着项目日志的记录✍~
引入RabbitMQ
的依赖:
pom.xml
<dependencies>
<!-- rabbitMQ 依赖 -->
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.8.0</version>
</dependency>
</dependencies>
简单模式 Hello Word:
如图,显而易见,非常简单就是一个一发一读
的过程…
- P:生产者,也就是要发送消息的程序
- C:消费者:消息的接受者,会一直等待消息到来。
- queue:消息队列,图中红色部分。可以缓存消息;生产者向其中投递消息,消费者从其中取出消息
发送者
Producer.Java
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/** 消息生产者 **/
public class Producer {
// 定义队列名称
private final static String QUEUE_NAME = "hello";
public static void main(String[] args) throws IOException, TimeoutException {
//创建一个连接工厂
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("47.243.109.199");
factory.setUsername("admin");
factory.setPassword("123456");
//channel 实现了自动 close 接口 自动关闭 不需要显示关闭
//创建连接对象
Connection connection = factory.newConnection();
//根据连接对象,获取信道
Channel channel = connection.createChannel();
/**设置消息队列的属性!
* queue :队列名称
* durable :是否持久化 如果持久化,mq重启后队列数据还在! (队列是在虚拟路径上的...)
* exclusive :队列是否独占此连接,队列只允许在该连接中访问,如果connection连接关闭队列则自动删除,如果将此参数设置true可用于临时队列的创建
* autoDelete :队列不再使用时是否自动删除此队列,如果将此参数和exclusive参数设置为true就可以实现临时队列(队列不用了就自动删除)
* arguments :队列参数 null,可以设置一个队列的扩展参数,需要时候使用!比如:可设置存活时间
* */
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
/**发送消息,参数:
* exchange :指定的交换机,不指定就会有默认的....
* routingKey :路由key,交换机根据路由key来将消息转发到指定的队列,如果使用默认交换机routingKey设置为队列的名称
* props :消息包含的属性: 后面介绍,可以是一个一个对象... 消息持久化配置...
* body :发送的消息,AMQP以字节方式传输...
* */
channel.basicPublish("", QUEUE_NAME, null, "Hello Word你好世界".getBytes());
System.out.println("消息发送完毕");
}
}
消费者
Consumer.Java
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/** 消息消费者 **/
public class Consumer {
// 定义队列名称
private final static String QUEUE_NAME = "hello";
public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("47.243.109.199");
factory.setUsername("admin");
factory.setPassword("123456");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
System.out.println("等待接收消息.........");
//收到消息后用来处理消息的回调对象
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody());
System.out.println(message);
};
//取消消费的一个回调接口 如在消费的时候队列被删除掉了
CancelCallback cancelCallback = (consumerTag) -> {
System.out.println("消息消费被中断");
};
/**
* 消费者消费消息 - 接受消息
* queue 消费哪个队列
* autoAck 消费成功之后是否要自动应答 true 代表自动应答 false 手动应答,要通过编程实现回复验证,这就是Unacked 为返回ack的数据
* deliverCallback 消费方法,当消费者接收到消息要执行的方法, 参数是一个函数式接口可以使用 lambda表达式~
* cancelCallback 消息被取消时的回调
*/
channel.basicConsume(QUEUE_NAME, true, deliverCallback, cancelCallback);
}
}
-
建议先启动
消费者
在启动发送者
可以看到,消费者启动之后,在等待 发生者 发送消息,发送者启动发送消息,
消费者控制台会立刻接收到消息!
-
MQ 发送者一般情况下都不会直接忘队列发消息
这种情况下MQ 都会有一个默认的交换机~
工作模式 Work Queues
工作模式 相当于 简单模式的 升级版!
- 多个消费者,对应一个发送者,发送者 产生的消息存在队列种,队列会以
复杂均衡形式
轮询的发送给多个消费者
一般应用于:发送方事务简单,接收方事务复杂…
美团外卖:用户下单——后台内部要联系商家 骑手 生产订单 处理...
抽取工作类:
因为上面示例我们知道,创建交换机|队列 需要Channel信道
交换机 队列是创建在信道里面的
- 而每次创建交换机的时候,都要创建一次
Connection
Channel
- 于是我们可以将它抽离出一个工具类
MQChannelUtil.Java
MQChannelUtil.Java
- com.wsm目录下创建一个
util包
专门用来存储工具类🛠
import com.rabbitmq.client.Channel; //导入MQ的包~
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
/** RabbitMQ 连接配置类: **/
public class MQChannelUtil {
//得到一个连接的 channel
public static Channel getChannel() throws Exception {
//创建一个连接工厂, 设置连接: IP 端口 用户 密码
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("42.192.149.71");
// factory.setPort("设置对应的端口,默认就是: 5672");
factory.setUsername("admin");
factory.setPassword("123456");
//创建连接对象 信道对象
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
return channel;
}
}
发送者
Producer.Java
import com.rabbitmq.client.Channel;
import com.wsm.Util.MQChannelUtil;
import java.util.Scanner;
/** 消息生产者 **/
public class Producer {
// 定义队列名称
private final static String QUEUE_NAME = "Word";
public static void main(String[] args) throws Exception {
// 工具类创建一个信道
Channel channel = MQChannelUtil.getChannel();
// Java控制台测试法消息:
Scanner scanner = new Scanner(System.in);
//创建交换机
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
// 循环多次发布消息:
while (scanner.hasNext()){
String message = scanner.next();
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
System.out.println("消息发送完毕");
}
}
}
消费者1
Consumer1.Java
import com.rabbitmq.client.*;
import com.wsm.Util.MQChannelUtil;
/** 消息消费者 **/
public class Consumer1 {
// 定义队列名称
private final static String QUEUE_NAME = "Word";
public static void main(String[] args) throws Exception {
// 工具类创建一个信道
Channel channel = MQChannelUtil.getChannel();
//收到消息后用来处理消息的回调对象
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody());
System.out.println(message);
};
//取消消费的一个回调接口 如在消费的时候队列被删除掉了
CancelCallback cancelCallback = (consumerTag) -> {
System.out.println("消息消费被中断");
};
/** 消费者消费消息 - 接受消息: 注意参数两个回调函数~ */
channel.basicConsume(QUEUE_NAME, true, deliverCallback, cancelCallback);
}
}
消费者2
Consumer2.Java
和消费者1 一模一样换一个名字,两个消费者监听一个队列 进行数据处理....
结果测试:
消息被轮询消费
:
- 通过程序执行发现生产者总共发送 4 个消息,消费者 1 和消费者 2 分别分得两个消息,并且是按照有序的一个接收一次消息
- MQ 发送消息,一般情况下只会被一个
消费者执行消费
消费者执行之后, 队列就会将消息删除,(ACK机制...
- 后面可以通过,交换机模式完成,一个消息被多个消费者消费…
消息确认接收机制 ACK
消息一旦被消费者接收,队列中的消息就会被删除
RabbitMQ怎么知道消息被接收了呢?
-
如果消费者领取消息后,还没执行操作就挂掉了呢?或者抛出了异常?消息消费失败!
-
但是RabbitMQ无从得知,这样消息就丢失了!
因此,RabbitMQ有一个ACK机制
- 当消费者获取消息后,会向RabbitMQ发送回执ACK 告知消息已经被接收。不过这种回执ACK分两种情况:
- 自动ACK: 消息一旦被接收,消费者自动发送ACK
- 手动ACK: 消息接收后,不会发送ACK,需要手动调用
自动ACK
-
RabbitMQ
默认此种模式:
消息发送后立即被认为已经传送成功!
消费者 接收到消息,就向队列发送ack,队列立刻就删除消息
-
这种模式需要在高吞吐量和数据传输安全性方面做权衡 仅适用在消费者可以高效并以 某种速率能够处理这些消息的情况下使用
手动ACK
-
消息接收后,不会发送ACK,需要手动代码进行调用
待消费者 执行完毕之后,在通过代码向 队列发送ack,队列接收到ack 之后会将消息删除!
-
channel.basicAck(long deliveryTag,boolean multiple); 用于肯定确认
RabbitMQ 已知道该消息并且成功的处理消息,可以将其丢弃了
参数1 long类型,表示处理的消息标识,MQ没发送一个消息都一个对于该消息的唯一标识…
就像序列化 序列号一样,用于网络传输..
参数2 boolean类型,表示是否支持批量处理
-
channel.basicNack(deliveryTag, false, true); 用于否定确认, 消费者 消息执行过程中失败,或服务器挂机…
参数1
同上,消息的唯一标识
参数2
表示是否支持批量处理
参数3
requeue true则重新入队列 false丢弃或者进入死信队列
-
channel.basicReject(deliveryTag, true); 用于否定确认
参数1
同上
参数2
requeue true则重新入队列 false丢弃或者进入死信队列
与 Channel.basicNack 相比少一个参数,不可以进行批量处理…
Multiple 批量消息处理:
-
true 代表批量应答处理
比如,现在队列上存在
1 2 3 4
四个消息,都发送给了消费者,而消费者逐一处理,4
结束了.不管是否
ACK|NACK
都直接将,其它的1 2 3
都以相同的,方式进行 批量处理!好处:在MQ 服务,稳定的时候,支持大量的消息处理速度…
缺点,容易造成数据丢失💀...
-
flase
建议使用,不批量应答
就是, 一次只处理当前消息的
ACK|NACK
消息自动重新入队
消费者设置了手动ACK 之后....
如果消费者由于某些原因失去连接 其通道已关闭,连接已关闭或 TCP 连接丢失
导致消息未发送 ACK 确认
- 消费者监听 队列消息,消费者开始处理,但是处理过程中,消费者突然与MQ 连接断开
消费者服务挂了
- MQ 正常情况下会与 消费者建立连接,当消费者突然断开,一段时间没有返回,消息处理的 ack,MQ就会当作消费者出现故障. 将消息重新交给其它消费者处理!心跳机制♥
生产者
Producer.Java
import com.rabbitmq.client.Channel;
import com.wsm.Util.MQChannelUtil;
import java.util.Scanner;
/** 消息生产者 **/
public class Producer {
// 定义队列名称
private final static String QUEUE_NAME = "ack_test";
public static void main(String[] args) throws Exception {
// 工具类创建一个信道
Channel channel = MQChannelUtil.getChannel();
// Java控制台测试法消息:
Scanner scanner = new Scanner(System.in);
// 创建队列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
// 循环多次发布消息:
while (scanner.hasNext()){
String message = scanner.next();
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
System.out.println("消息发送完毕");
}
}
}
消费者1
Consumer1.Java
- basicConsume(); 消费者监听消息方法,第二个参数:
true自动ack
false手动ack
import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import com.wsm.Util.MQChannelUtil;
/** 消息消费者 **/
public class Consumer2 {
// 定义队列名称
private final static String QUEUE_NAME = "ack_test";
public static void main(String[] args) throws Exception {
// 工具类创建一个信道
Channel channel = MQChannelUtil.getChannel();
//收到消息后用来处理消息的回调对象
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
// 为了方便查看处理效果,我们将消费者 线程休眠一段时间 模拟处理数据;
try {
Thread.sleep(10000); // 毫秒 *1000;
} catch (InterruptedException e) {
e.printStackTrace();
}
String message = new String(delivery.getBody());
System.out.println(message);
/** 接收处理完毕消息之后给MQ 回复ack
* 参数1 消息的唯一标识Tag
* 参数2 是否支持批量回复,一般建议false 保证数据安全!
* **/
channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false);
};
//取消消费的一个回调接口 如在消费的时候队列被删除掉了
CancelCallback cancelCallback = (consumerTag) -> {
System.out.println("消息消费被中断");
};
/** 消费者消费消息 - 接受消息: 注意参数两个回调函数~ */
/** 参数二 设置 false 手动应答 **/
channel.basicConsume(QUEUE_NAME, false, deliverCallback, cancelCallback);
}
}
消费者2
Consumer2.Java
和 消费者1 类似,为了方便测试更改了,线程休眠时间 30s
结果测试:
测试1
-
正常的发送消息,
AA BB CC DD
-
发现啊,消费者1 消费者2 还是轮询的进行消费数据,只不过消费者2 比较慢…
测试2
-
正常的发送消息,
AA BB CC DD
-
当消费者1 接收到消息后,消费者2 也正在接收消息,因为有
Thread.sleep(30000)
所以会有很长时间的处理,此时关闭消费者2
MQ 与 消费者2 断开连接,消费者2 也没有发送ACK 所以消费者2的消息将会
重回队列...又交给了 消费者1来进行消费!
-
当然,实际开发中因为
消费1 | 2 都存在处理了 BB
为了确保数据安全,还要进行幂等的处理!
RabbitMQ 持久化 简单
正常情况下,RabbitMQ 只是一个消息中间件 一边接收者生产者消息
等待消费者监听处理...
- 消息不会持久化保存在
队列中
- 如果:突然某天 RabbitMQ 突然挂机,那么就会造成数据的丢失:
队列中为处理的消息...
生产者最新发送的消息...
...
- 为了,保证数据的安全,我们需要将
队列
消息
都进行持久化处理,防止数据丢失~
队列持久化
然队列持久化非常简单, 只需要一个配置即可:
在声明队列的时候,就通过参数就可以完成对队列的持久化,注意:队列持久化 并不是 消息持久化 队列每次重启都会恢复但是内部的消息 还需要另外的配置!
// 让队列持久化
boolean durable = true; // false 不持久化 true 持久化
// 声明队列
channel.queueDeclare("队列名", durable, false, false, null); // 设置true 之后,每次MQ重启的时候,该队列都会自动重新在虚拟路径上自动加载...
消息持久化
需要在 生产者
发送消息的时候添加一个配置 MessageProperties.PERSISTENT_TEXT_PLAIN
,告诉MQ 这个消息很重要,要进行持久化保存!
// 发送者发送消息的时候,带上 MessageProperties.PERSISTENT_TEXT_PLAIN 告诉,MQ消息要进行持久化;
channel.basicPublish("交换机", "队列名", MessageProperties.PERSISTENT_TEXT_PLAIN, "要发送的消息,字节传输");
-
当然这里,
消息仍然存在丢失问题
当消息,刚发到MQ 中,还没有准备,
存储磁盘,消息还在缓存的一个"间隔点"
MQ 突然挂了… 也会影响到消息的 持久化;但这里对于,普通的存储已经绰绰有余了…
不公平 分发
前提是,设置消息手动ACK
在最开始的时候我们学习到 RabbitMQ 分发消息采用的轮训分发
-
但是在某种场景下这种策略并不是很好:
比方说有两个消费者在处理任务,
消费者 1 处理任务的速度非常快
消费者 2 处理速度却很慢
这个时候依然采用默认的
轮询分发
势必不太合理… -
为了避免这种情况,MQ 支持,我们切换不同的
分发模式
-
不公平 分发
我们可以通过 信道设置:
channel.basicQos(1);
意思就是如果这个任务我还没有处理完或者我还没有应答你,你先别分配给我,我目前只能处理一个任务.
rabbitmq 就会把该任务分配给没有那么忙的那个空闲消费者,如果没有空闲消费者,消息就会堆积在队列中去~
预取值 分发
预取值 分发
- 就是说,我们在设置
消息者 连接 队列
的时候,可以给 信道设置 预取值.
其实预取值,和 不公平分发 很类似
-
都是设置: 消费者 连接 信道 时候设置 ``channel.basicQos(?);` 消费者最高消息堆积数;
-
``channel.basicQos(0);` 默认0 轮询模式
-
``channel.basicQos(1);` 不公平分发模式
消费者1
消费者2
都设置,channel.basicQos(1);
消费者最高允许消息堆积数.两个消费者每次只能从,队列中拿一个消息进行消费,完了就立刻在从 队列中,在拿,这样做的快的消费者,自然就处理的消息多了!不公平分发
-
channel.basicQos(>1);
值大于>1
,预取值 分发消费者1
消费者2
分别跟据服务的性能设置,channel.basicQos(?);
消费者最高允许消息堆积数.假设: 消费者1
basicQos(2)
消费者2basicQos(5)
这样,假设队列中有消息:
1 2 3 4 5 6 7
,当最初消费者1 | 2
都空闲时候…消费者1获取1
消费者2获取2
消费者1获取3
消费者2获取4
消费者2获取5
消费者2获取6
....
消费者2 允许消息在信道中最大的堆积数 5当然也有可能会出现,消费者1 处理很快,消费者2 很慢,消费者1处理完1 3,消费者2还在处理2 4567,那后面的 8 9 10 都给1处理…
预取值 分发,就是预计这个消费者,性能高低,设置消费者 允许最高堆积?个消息等待这个处理!
RabbitMQ - 发布确认confirm
confirm
发布确认机制:
生产者将信道设置成 confirm 模式
-
一旦信道进入
confirm
模式,所有在该信道上面发布的消息都将会被指派一个唯一的 ID从1开始
-
发送者 —— 消息 —— 队列,上后
MQ
broker
就会发送一个确认给生产者
生产者就知道消息已经正确到达目的队列了. -
如果
消息队列 和 消息
进行了持久化设置
那么确认消息会在将消息写入磁盘之后发出,broker回传给生产者的确认消息已经,发送到队列上!
发布确认策略:
RabbiMQ 默认是没有开启 comfirm 发布确认机制
- 如果要开启需要调用方法 confirmSelect,每当你要想使用发布确认,都需要在 channel 上调用该方法
// 开启发布确认 channel.confirmSelect();
单个发布确认:
它是一种同步确认发布的方式,也就是发布一个消息之后只有它被确认发布,后续的消息才能继续发布
-
这种确认方式有一个最大的缺点就是:发布速度特别的慢,
一次只能发一个!
因为如果没有确认发布的消息就会阻塞所有后续消息的发布,这种方式最多提供每秒不超过数百条发布消息的吞吐量;
-
优点:保证消息的100%发送,缺点:对于消息的发送实在太慢,对于大量数据不适合!
-
实现:
1. 在声明队列之后,开启发布确认,
channel.confirmSelect();
2. 通过信道
channel.waitForConfirms();
来判断当前消息是否发送成功!这个方法只有在消息被确认 的时候才返回,如果在指定时间范围内这个消息没有被确认那么它将抛出异常.
单个发布确认,只有一个消息发送 才能发送下一个消息
Producer.Java
/** 单个发布确认 **/
public static void singleConfirm() throws Exception {
Channel channel = MQChannelUtil.getChannel();
// 队列声明
channel.queueDeclare("singleConfirm", false, false, false, null);
// 开启发布确认
channel.confirmSelect();
// 为了方便计算各个 发布确认策略 耗时: 开始-结束放一个系统时间获取毫秒数;
long begin = System.currentTimeMillis();
// 发送1000 个消息....
for (int i = 0; i < MESSAGE_COUNT; i++) {
String message = i + "";
channel.basicPublish("", "singleConfirm", null, message.getBytes());
// 服务端返回 false 或超时时间内未返回,生产者可以消息重发
boolean flag = channel.waitForConfirms();
if (flag) {
System.out.println("消息发送成功");
}
// else{ /** 消息重发处理... **/ }
}
// 1000个消息结束...
long end = System.currentTimeMillis();
System.out.println("发布" + MESSAGE_COUNT + "个单独确认消息,耗时" + (end - begin) + "ms");
}
本次执行耗时:67042毫秒
批量发布确认:
相比 单个发布确认
, 发送确认机制,实在是太慢了! 每次只能保证一个消息的发送成功,实在是太慢了!
批量发布确认:
-
其实和
单个发布确认
实现方式一样1. 开启发布确认
channel.confirmSelect();
2. 获取批量的消息,是否全部发送到MQ
channel.waitForConfirms();
-
waitForConfirms(); 方法()
单个发布 和 批量发布其实都一样,都是调用
waitForConfirms()
方法,查看当前消息是否都到达MQ不同的是,
单个发布每次发一条都验证
批量是在一定数量进行验证
waitForConfirms();
方法会使,当前 发生者线程进行阻塞,等待MQ 返回数据,MQ返回 上一次调用waitForConfirms() 到现在调用waitForConfirms() 所有发送的消息是否抵达MQ
全部抵达true 则false -
因此:批量发布确认,相当于 单个发布确认,一个批量执行,
大大节省了过程中冗余的一些步骤性能...
当然,如果其中有一个消息没有发送到MQ 它并不能确定是那一个 消息 没有抵达MQ
Producer.Java
/** 批量发布确认 **/
public static void batchConfirm() throws Exception {
Channel channel = MQChannelUtil.getChannel();
// 队列声明
channel.queueDeclare("batchConfirm", false, false, false, null);
// 开启发布确认
channel.confirmSelect();
// 为了方便计算各个 发布确认策略 耗时: 开始-结束放一个系统时间获取毫秒数;
long begin = System.currentTimeMillis();
// 批量确认消息大小,当发送的消息数到 100 执行 waitForConfirms(); 询问MQ 当前所有的消息有没有抵达~
int batchSize = 100;
// 未确认消息个数, 每次发送消息 ++ 用于判断是否改批量验证消息发送;
int outstandingMessageCount = 0;
for (int i = 0; i < MESSAGE_COUNT; i++) {
// 发送的消息,并发送~
String message = i + "";
channel.basicPublish("", "batchConfirm", null, message.getBytes());
// 每次发送一个消息进行计算当前是第几个,为批量验证的消息;
outstandingMessageCount++;
// 判断当前的 100 个消息有没有都发送的MQ上!
if (outstandingMessageCount == batchSize) {
channel.waitForConfirms();
outstandingMessageCount = 0;
}
}
//为了确保还有剩余没有确认消息 再次确认
if (outstandingMessageCount > 0) {
channel.waitForConfirms();
}
long end = System.currentTimeMillis();
System.out.println("发布" + MESSAGE_COUNT + "个批量确认消息,耗时" + (end - begin) + "ms");
}
本次执行耗时:1149毫秒
快速了很多...
异步发布确认:
异步发布确认 相对于上面两个比较复杂,但: 性价比 效率
具有显著的提升
- 它是采用回调的方式来完成,消息传递的可靠性.
实现原理:
-
① 发送者 只需要关注 发消息
**发送者 每次 给MQ 发送消息的时候,会默认的给每个消息带上一个
唯一的ID标识
**后面我们就可以通过这个标识来,确定是那一个消息发送 成功|失败
-
② 发送者,方法体中写一个 异步确认
监听器
addConfirmListener(ConfirmCallback,ConfirmCallback);
方法参数支持两个,ConfirmCallback类对象
一个表示接收消息做的事情
另一个是未接收到消息做的事情...
ConfirmCallback 是一个
函数式接口
, 支持 lambda表达式 和 内部类形式书写… -
③ 发送者 一直往MQ 上发送消息,MQ 每收到一个消息会,调用发送者的
addConfirmListener(ack,nack)
方法告知发送者,消息成功发送 | 或 未发送成功!
-
发送者,在根据:
addConfirmListener(ack,nack)
来处理,消息成功处理,消息失败处理…
Producer.Java
/** 异步发布确认 **/
public static void syncConfirm() throws Exception{
Channel channel = MQChannelUtil.getChannel();
// 队列声明
channel.queueDeclare("syncConfirm", false, false, false, null);
/** 开启发布确认 **/
channel.confirmSelect();
// 为了方便计算各个 发布确认策略 耗时: 开始-结束放一个系统时间获取毫秒数;
long begin = System.currentTimeMillis();
/** 步骤一: 创建一个线程安全的一个哈希表,用于记录每一个消息发送,这样MQ异步返回时候可以知道具体是那一个消息发送成功|失败 **/
/**
* 线程安全有序的一个哈希表,适用于高并发的情况
* 1.轻松的将序号与消息进行关联: k,v 存储结构, k消息标识 v发送的消息体,每次发送消息前先存在集合中;
* 2.轻松批量删除条目只要给到序列号: 对于发现成功的消息,直接从集合中移除...
* 3.支持并发访问,Concurrent接口 是线程安全的; */
ConcurrentSkipListMap<Long, String> outstandingConfirms = new ConcurrentSkipListMap<>();
/** 步骤三: 编写回调监听器,因为: 消息发送出错要立刻进行监听所以,所以创建在发送消息之前; **/
/** ack 确认收到消息的一个回调 1.消息序列号 2.true 批量确认接受小于等于当前序列号的数据 false 确认当前序列号消息 */
ConfirmCallback ackCallback = (sequenceNumber, multiple) -> {
if (multiple) {
System.out.println("消息成功接收:"+sequenceNumber);
// ConcurrentNavigableMap方法()返回的是小于|等于 K 的集合, true:小于等于 false:返回小于该序列号的数据集合;
ConcurrentNavigableMap<Long, String> confirmed = outstandingConfirms.headMap(sequenceNumber, true);
// 清除该部分确认消息 confirmed 里保存的都是,MQ 已经接收的消息;
// 遍历 confirmed K, 根据 K 删除 outstandingConfirms 的值...
// outstandingConfirms 里面保存的都是,MQ 还未确认的消息...
}else{
//只清除当前序列号的消息
outstandingConfirms.remove(sequenceNumber);
}
};
// nack 消息失败执行{} 可以写,消息失败需要执行的代码...
ConfirmCallback nackCallback = (sequenceNumber, multiple) -> {
// 这里就输出一下为被确认的消息...
String message = outstandingConfirms.get(sequenceNumber);
System.out.println("发布的消息"+message+"未被确认,序列号"+sequenceNumber);
};
// 发生者 等待MQ回调消息确认的 监听器, 本次程序值监听 ack成功的消息;
channel.addConfirmListener(ackCallback, null);
/** 步骤二: 发送者一直往MQ发送消息 **/
for (int i = 0; i < MESSAGE_COUNT; i++) {
String message = "消息" + i;
// channel.getNextPublishSeqNo() 获取下一个消息的序列号,通过序列号与消息体进行一个关联,全部都是未确认的消息体
outstandingConfirms.put(channel.getNextPublishSeqNo(), message);
// 发送消息;
channel.basicPublish("", "syncConfirm", null, message.getBytes());
}
long end = System.currentTimeMillis();
System.out.println("发布" + MESSAGE_COUNT + "个批量确认消息,耗时" + (end - begin) + "ms");
}
耗时:96毫秒
超级快的好吧
- 发送者,只需要关注消息的发送,MQ 会将每条消息发布情况,回调给发生者
- 发送者每次发消息前,将消息存储在缓存中,成功了就删除,失败了就重新发送~
非常nice👍
RabbitMQ 交换机Exchange:
以上我们创建一个: 发送者
队列
消费者
就可以完成通信了
而,RabbitMQ 的核心思想是: 生产者 生产的消息不会直接发送到队队列上, 甚至不知道队列的存在,通过一个交换机
-
生产者,只需要关注 往交换机上发送消息即可!
交换机工作的内容非常简单:
一方面它接收来自生产者的消息
另一方面将它们推入队列
交换机必须确切知道如何处理收到的消息,这就的由交换机的类型来决定. -
Exchange 交换机类型:
fanout: 广播模式
发布/订阅
,交换机给所有的队列,发送相同的消息;direct : 路由模式
routing key
交换机,根据对应的routing key
的队列上发送消息;topic: 动态路由模式,可以用过一定的规则定义
roting key
使 交换机动态的多样性选择队列
-
* 表示一个单词
-
# 表示任意数量(零个或多个)单词
headers: 请求头模式,
目前用的很少了
,就像请求头一样,发送消息时候附带头部数据
,交换机根据消息的头部信息匹配对应的队列; -
无名交换机:
-
上面的 Demo案例,我们几乎没有对
交换机
进行任何的操作,但是,仍然可以进行消息发送|接收 -
因为:
channel.basicPublish("", "队列名", null, "发送的消息".getBytes());
对于 “” 空字符串的交换机,MQ 会有默认的交换机进行操作…
临时队列:
-
对于有些时候,我们需要连接一个队列, 而这个队列,并不常用,用完即丢的情况下,可以考虑使用:
临时队列
-
String queueName = channel.queueDeclare().getQueue();
让服务器 信道,给我们创建一个临时的队列,
随机队列名称
一旦我们断开了消费者的连接,队列将被自动删除
发布订阅模式 Publish/Subscribe 交换机类型:Fanout
Fanout 类型:
- 这种类型非常简单,它可以将,它知道的所有的消息,广播到所有队列中去. 也成为:广播模式
- 常见场景:
某某软件很多人关注/订阅了一个博主,博主一更新,所有的粉丝都收到更新消息!
Fanout 实战:
-
定义一个生产者,交换机,生产者不停的往交换机中发消息
-
交换机提前与 一个|多个队列
绑定
,每当有消息来的时候,交换机会将消息发送到所有的队列中去…每个消息者监听(订阅)一个队列,多个消息者可以同,监听到相同的消息;
生产者:
Producer.Java
/** 消息生产者 **/
public class Producer {
// 交换机名
public static final String EXCHANGE_NAME = "wsm";
public static void main(String[] args) throws Exception {
// 创建连接对象,声明交换机 发送消息
Channel channel = MQChannelUtil.getChannel();
/**
* 声明一个 exchange
* 1.exchange 的名称
* 2.exchange 的类型, 可以是String直接写,也可以是 枚举类型;
*/
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT);
Scanner sc = new Scanner(System.in);
System.out.println("请输入信息");
while (sc.hasNext()) {
String message = sc.nextLine();
channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes("UTF-8"));
System.out.println("生产者发出消息" + message);
}
}
}
消费者1:
/** 消息消费者 **/
public class Consumer1 {
// 定义交换机名称
public static final String EXCHANGE_NAME = "wsm";
public static void main(String[] args) throws Exception {
Channel channel = MQChannelUtil.getChannel();
channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
/** 生成一个临时的队列 队列的名称是随机的 当消费者断开和该队列的连接时 队列自动删除 */
String queueName = channel.queueDeclare().getQueue();
// 绑定: 把该临时队列绑定我们的 exchange 其中 routingkey(也称之为 binding key)为空字符串: Fanout模式 routingkey 没作用!
channel.queueBind(queueName, EXCHANGE_NAME, "");
// 发送回调
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println("wsm 发布的最新消息:"+message);
};
// 消费者监听消息
channel.basicConsume(queueName, true, deliverCallback, consumerTag -> {});
}
}
消费者2:
因为:
-
队列使用了MQ自动生成,所有几乎代码无改动一模一样即可!
String queueName = channel.queueDeclare().getQueue(); 临时队列
-
生产者——发送消息> 交换机 ——将消息发送到对应的> 队列 ——> 消费者(监听队列,处理消息…
路由模式 Routing 交换机类型:DIRECT
DIRECT 模式:
-
和 Fanout 模式类似
-
绑定
交换机/队列
时候,需要指定 routing key一个队列,可以设置多个 routingkey
发送者发送消息, 会携带上 routingkey
-
队列在与交换机进行绑定的时候,会设置好 队列的
routingkey
生产者 往交换机上发送消息,交换机只会将消息 向匹配的队列上发送消息,
消费者 监听队列消息消费
DIRECT 实例:
-
创建一个生产者,同时发送两个消息,分别指定
Conkey1
Conkey2
-
创建两个接收者,
一个监听的队列 绑定交换机时指定 Conkey1
另一个绑定交换机时 绑定两个routingkey:Conkey1 Conkey2
-
启动:生成者 消费1 消费2 查看结果…
生产者:
Producer.Java
/** 消息生产者 **/
public class Producer {
// 交换机名
public static final String EXCHANGE_NAME = "DIRECT";
public static void main(String[] args) throws Exception {
// 创建连接对象,声明交换机 发送消息
Channel channel = MQChannelUtil.getChannel();
/**
* 声明一个 exchange
* 1.exchange 的名称
* 2.exchange 的类型, 可以是String直接写,也可以是 枚举类型; */
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
/** 发送消息 **/
channel.basicPublish(EXCHANGE_NAME, "Conkey1", null, "Conkey1 发送的消息".getBytes("UTF-8"));
channel.basicPublish(EXCHANGE_NAME, "Conkey2", null, "Conkey2 发送的消息".getBytes("UTF-8"));
}
}
消费者1:
Consumer1.Java
/** 消息消费者 **/
public class Consumer1 {
// 定义交换机名称
public static final String EXCHANGE_NAME = "DIRECT";
public static void main(String[] args) throws Exception {
Channel channel = MQChannelUtil.getChannel();
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
/** 生成一个临时的队列 队列的名称是随机的 当消费者断开和该队列的连接时 队列自动删除 */
String queueName = channel.queueDeclare().getQueue();
// 绑定: 把该临时队列绑定我们的 exchange
// 参数二 设置该队列和交换和绑定的 routingkey
channel.queueBind(queueName, EXCHANGE_NAME, "Conkey1");
// 发送回调
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println("最新的消息是:"+message);
};
// 消费者监听消息
channel.basicConsume(queueName, true, deliverCallback, consumerTag -> {});
}
}
消费者2:
比 消费者1
多添加:
// 可以设置多个key
channel.queueBind(queueName, EXCHANGE_NAME, "Conkey2");
-
可以多次调用
.queueBing("队列名","交换机","routingkey");
绑定 队列交换机 指定多个 routingkey -
生成者发送消息,时候指定消息的
routingkey
交换机根据 key 将对应的消息发送到 队列上进行处理!消费者2 可以同时接收到
Conkey1
和Conkey2
发送的消息
主题模式 Topic 交换机类型:TOPIC
该模式与Routingkey 非常类型,就相当于是一个 动态路由模式!!
-
TOPIC 就像是 DIRECT 的升级版
DIRECT 固定了
routingkey
而,TOPIC 可以动态的进行routingkey选择
使用上更加的个性化 -
主题模式 可以根据一些特殊的符合匹配多种 Routingkey 的匹配
通配符规则:
#:匹配一个或多个词
举例:wsm.# 等于:wsm.1 / wsm.w.s.m / wsm.sm .后多个单词
*:匹配不多不少恰好1个词
举例:wsm.* 等于:wsm.sm / wsm.m .后一个单词
TOPIC 实例:
- 修改上面的 DIRECT
- 修改生产者的
交换机类型
routingkey
,消费者交换机类型
绑定交换机时候队列的 routingkey
生产者:
Producer.Java
改变发送消息时候指定的 routingkey
还有交换机类型:TPOIC
//交换机类型
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);
/** 发送消息 **/
channel.basicPublish(EXCHANGE_NAME, "Conkey.one", null, "Conkey.one 发送的消息".getBytes("UTF-8"));
channel.basicPublish(EXCHANGE_NAME, "Conkey.two.123", null, "Conkey.two.123 发送的消息".getBytes("UTF-8"));
消费者1:
Consumer1.Java
更改交换机类型 Topic
,接收消息,队列 交换机绑定时候,指定一下 routingkey通配符
// 参数二 设置该队列和交换和绑定的routingkey , Topic模式可以用过 通配符进行动态匹配: * 表示一个任意的单词;
channel.queueBind(queueName, EXCHANGE_NAME, "Conkey.*");
消费者2:
// 参数二 设置该队列和交换和绑定的routingkey , Topic模式可以用过 通配符进行动态匹配: # 表示一个|多个任意的单词;
channel.queueBind(queueName, EXCHANGE_NAME, "Conkey.#");
头部模式 交换机类型:HEAD
使用的很少,跟 TOPIC
动态路由类型,只不过它并不是通过 routingkey 进行消息与队列进行匹配
-
headers类型的交换器不依赖于路由键的匹配规则来路由消息,而是根据发送的消息内容中的headers属性进行匹配
-
生产者发送消息的时候可以,给消息指定一个
head头部参数 map类型
-
交换机 与 队列绑定的时候也定义一组:
头部信息规则, 只有消息头部规则 和 队列的头部规则 匹配才能发送到对应的头部上!
不常用了解即可~
生产者:
Producer.Java
/** 消息生产者 **/
public class Producer {
// 交换机名
public static final String EXCHANGE_NAME = "HEADERS";
public static void main(String[] args) throws Exception {
// 创建连接对象,声明交换机 发送消息
Channel channel = MQChannelUtil.getChannel();
/**
* 声明一个 exchange
* 1.exchange 的名称
* 2.exchange 的类型, 可以是String直接写,也可以是 枚举类型; */
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.HEADERS);
//发送的消息的消息头head map类型, 需要与队列的map 规范匹配才可以发送成功! 不然发送失败(交换机不知道往那个队列上发送;
HashMap<String ,Object> param = new HashMap<String, Object>();
param.put("id","1");
param.put("name","wsm");
//设置Map 匹配参数!
AMQP.BasicProperties.Builder builder=new AMQP.BasicProperties.Builder();
builder.headers(param);
/** 发送消息 **/
channel.basicPublish(EXCHANGE_NAME, "", builder.build(), "header的内容lalala~~".getBytes("UTF-8"));
}
}
消费者:
Consumer1.Java
/** 消息消费者 **/
public class Consumer1 {
// 定义交换机名称
public static final String EXCHANGE_NAME = "HEADERS";
public static void main(String[] args) throws Exception {
Channel channel = MQChannelUtil.getChannel();
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.HEADERS);
/** 生成一个临时的队列 队列的名称是随机的 当消费者断开和该队列的连接时 队列自动删除 */
String queueName = channel.queueDeclare().getQueue();
//设置队列上的 map 参数,用于匹配请求时候的参数!
//特殊参数 x-match 值 all 或 any
//all 在发布消息时携带的map 必须和绑定在队列上的所有map 完全匹配
//any 只要在发布消息时携带的有一对键值map 满足队列定义的多个参数map的其中一个就能匹配上
//注意: 这里是键值对的完全匹配,只匹配到键了,值却不一样是不行的;
HashMap<String ,Object> param = new HashMap<String, Object>();
param.put("x-match","all");
param.put("id","1"); // 可以尝试改变,map 信息,生产者消息还能发送到队列上面~
param.put("name","wsm");
// 绑定: 把该临时队列绑定我们的 exchange
// 队列绑定时需要指定参数,注意虽然不需要路由键但仍旧不能写成null,需要写成空字符串"", 参数四: map参数,规范!
channel.queueBind(queueName, EXCHANGE_NAME, "",param);
// 发送回调
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println("最新的消息是:"+message);
System.out.println("Map传入参数数据:"+delivery.getProperties().getHeaders());
};
// 消费者监听消息
channel.basicConsume(queueName, true, deliverCallback, consumerTag -> {});
}
}
Head 匹配规则:
//消费者:队列规则all 所有匹配即可
Map<String ,Object> param = new HashMap<String, Object>();
param.put("x-match","all");
param.put("id","1");
param.put("name","wsm");
//生产者:传入head map;
Map<String ,Object> param = new HashMap<String, Object>();
param.put("id","2");
param.put("name","wsm");
//不匹配
//生产者
Map<String ,Object> param = new HashMap<String, Object>();
param.put("x-match","all");
param.put("id","1");
param.put("name","wsm");
//匹配
//消费者:队列规则any 一个匹配即可
Map<String ,Object> param = new HashMap<String, Object>();
param.put("x-match","any");
param.put("id","1");
param.put("name","wsm");
//生产者:传入head map;
Map<String ,Object> param = new HashMap<String, Object>();
param.put("id","1"); //匹配
//生产者:传入head map;
Map<String ,Object> param = new HashMap<String, Object>();
param.put("id","2"); //不匹配 key /value 都要匹配才可以
....
死信队列 DLX 💀
死信,顾名思义就是无法被消费的消息
正常情况下:
- producer 将消息投递到 broker 或者直接到queue 里了,consumer 从 queue 取出消息 进行消费
但
-
某些时候由于特定的原因导致 queue 中的某些消息无法被消费
-
这样的消息如果没有后续的处理,就变成了死信,有死信自然就有了死信队列
应用场景:
-
用户在商城下单成功并点击去支付后在指定时间支付,
下单成功
,指定时间内订单未支付下单失败
-
用户下单,向MQ 发送一条订单的消息,并设置消息时间 30分钟,到队列
succeed 成功队列
等待用户确认订单,支付订单,消息被消费 下单成功
如果 30 分钟用户没有下单,则
succeed 成功队列消息
超时,为了确保消息不丢失,将消息发送到死信交换机 —— defeated死信队列
消费者接收处理:下单失败!
死信队列:
- 相当于对于一个,特定时间|场景 需要被处理的事情,但因为某种原因没有正常处理,的一个兜底操作…
死信队列产生:
-
消息 TTL 过期
TTL是Time To Live的缩写, 也就是生存时间,
订单超时支付 下单失败
-
队列达到最大长度
队列满了,无法再添加数据到 mq 中
-
消息被拒绝
(basic.reject 或 basic.nack) 并且 requeue=false,
订单被用户取消 下单失败
死信队列的实现:
- 正常的
生产者 交换机 普通队列
- 不正常的,为了保证普通队列,消息稳定:
当消息出现意外, 普通队列上配置了 DXL交换机
,消息超时 超出队列... 直接发送到 DLX交换机———— DLX队列
消费TTL 过期⏰
生产者:
Producer.Java
/** 消息生产者 **/
public class Producer {
// 普通交换机名
public static final String EXCHANGE_NAME = "normal_exchange";
public static void main(String[] args) throws Exception {
Channel channel = MQChannelUtil.getChannel();
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
//设置消息的 TTL 时间 10s秒
AMQP.BasicProperties properties = new AMQP.BasicProperties().builder().expiration("10000").build();
//该信息是用作演示队列个数限制
for (int i = 1; i < 11; i++) {
String message = "info" + i;
channel.basicPublish(EXCHANGE_NAME, "zhangsan", properties, message.getBytes());
System.out.println("生产者发送消息:" + message);
}
}
}
消费者:
Consumer1.Java
/** 消息消费者 **/
public class Consumer1 {
// 普通交换机名称
private static final String NORMAL_EXCHANGE = "normal_exchange";
// 死信交换机名称
private static final String DEAD_EXCHANGE = "dead_exchange";
public static void main(String[] args) throws Exception {
Channel channel = MQChannelUtil.getChannel();
// 声明死信和普通交换机 类型为 direct
channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);
channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT);
// 声明死信队列
String deadQueue = "dead-queue";
channel.queueDeclare(deadQueue, false, false, false, null);
//死信队列绑定:队列、交换机、路由键(routingKey)
channel.queueBind(deadQueue, DEAD_EXCHANGE, "lisi");
// 正常队列绑定死信队列信息
HashMap<String, Object> params = new HashMap<>();
// 正常队列设置死信交换机 参数 key 是固定值
params.put("x-dead-letter-exchange", DEAD_EXCHANGE);
// 正常队列设置死信 routing-key 参数 key 是固定值
params.put("x-dead-letter-routing-key", "lisi");
// 正常队列
String normalQueue = "normal-queue";
channel.queueDeclare(normalQueue, false, false, false, params);
channel.queueBind(normalQueue, NORMAL_EXCHANGE, "zhangsan");
System.out.println("等待接收消息........... ");
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println("Consumer01 接收到消息" + message);
};
channel.basicConsume(normalQueue, true, deliverCallback, consumerTag -> {
});
}
}
先启动消费者,创建声明好队列之后,关闭,启动生产者发送消息…
DLX 消费者:
Consumer2.Java
/** 消息消费者 **/
public class Consumer2 {
//死信交换机名称
private static final String DEAD_EXCHANGE = "dead_exchange";
public static void main(String[] args) throws Exception {
Channel channel = MQChannelUtil.getChannel();
//声明交换机
channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT);
//声明队列
String deadQueue = "dead-queue";
channel.queueDeclare(deadQueue, false, false, false, null);
channel.queueBind(deadQueue, DEAD_EXCHANGE, "lisi");
System.out.println("等待接收死信消息........... ");
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println("Consumer02 接收到消息" + message);
};
channel.basicConsume(deadQueue, true, deliverCallback, consumerTag -> {
});
}
}
队列达到最大长度
消息生产者代码去掉 TTL 属性
C1 消费者修改以下代码 (启动之后关闭该消费者 模拟其接收不到消息)
//设置正常队列的长度限制,例如发10个,4个则为死信 注意:此时需要把原先队列删除 因为参数改变了
params.put("x-max-length",6);
C2 消费者代码不变(启动 C2 消费者)
消息被拒绝
消息生产者代码同上 队列达到最大长度
生产者一致
C1 消费者代码(启动之后关闭该消费者 模拟其接收不到消息) 注释 params.put("x-max-length",6);
C2 消费者代码不变 ,启动消费者1关闭
然后再启动消费者 2
SpringBoot 集成 RabbitMQ 模式开发
延迟队列,其实就是 死信队列
的一种,所有为了方便查看,使用SpringBoot 来进行搭建顺便了解学习一些SpringBoot 集成 RabbitMQ
① 创建SpringBoot 工程 启动类....
② 引入Maven依赖:pom.xml
<dependencies>
<!-- SpringBoot依赖 -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!--RabbitMQ 依赖-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<!-- 阿里巴巴fastjson -->
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.47</version>
</dependency>
<!-- lombok依赖 -->
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</dependency>
<!--RabbitMQ 测试依赖-->
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-rabbit-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
③ 编写配置文件:application.yml | properties
# SpringBoot 配置RabbitMQ ip 端口 用户 密码;
spring.rabbitmq.host=47.243.109.199
spring.rabbitmq.port=5672
spring.rabbitmq.username=admin
spring.rabbitmq.password=123456
延迟队列 TTL ⏰
延时队列,队列内部是有序的,最重要的特性就体现在它的延时属性上
延时队列中的元素是希望 在指定时间到了以后或之前取出和处理 简单来说,延时队列就是用来存放需要在指定时间被处理的 元素的队列
延迟队列使用场景:
- 订单在十分钟之内未支付则自动取消
- 新创建的店铺,如果在十天内都没有上传过商品,则自动发送消息提醒
- 用户注册成功后,如果三天内没有登陆则进行短信提醒
**这些场景都有一个特点,需要在某个事件发生之后或者之前的指定时间点完成某一项任务: **就几乎等于一个 死信队列
RabbitMQ 的 TTL
TTL 是 RabbitMQ 中一个消息或者队列的属性,表明一条消息或者该队列中的所有消息的最大存活时间,单位是毫秒 RabbitMQ 有两种方式:
队列设置TTL
在创建队列的时候设置队列的“x-message-ttl”属性
Map<String, Object> args = new HashMap<>(3);
//声明当前队列绑定的死信交换机
args.put("x-dead-letter-exchange", Y_DEAD_LETTER_EXCHANGE);
//声明当前队列的死信路由 key
args.put("x-dead-letter-routing-key", "YD");
//声明队列的 TTL
args.put("x-message-ttl", 10000);
//队列绑定交换机
QueueBuilder.durable(QUEUE_A).withArguments(args).build();
消息设置TTL
是针对每条消息设置TTL 生产者 生产消息时候设置:
// Spring方式
// 编辑参数
AMQP.BasicProperties properties = new AMQP.BasicProperties().builder().expiration("10000").build();
// 发送消息
channel.basicPublish("交换机", "队列", properties, "消息".getBytes());
// SpringBoot方式
// 通过 rabbitTemplate 发送消息...
rabbitTemplate.convertAndSend("交换机", "队列", "消息", correlationData -> {
correlationData.getMessageProperties().setExpiration("10000");
return correlationData;
});
队列设置TTL
代码实现:
-
创建一个交换机 X 和死信交 换机 Y,它们的类型都是direct
-
创建两个队列 QA 和 QB
两者队列 TTL 分别设置为 10S 和 40S
,消息超时会进入到死信交换机 —— 发送到 QD死信队列
交换机 绑定 队列 配置文件:
因为,项目采用SpringBoot 进行管理 原先配置队列信息,写在了生产者和消费者代码中,现在可写咋配置类中,生产者只发消息,消费者只接受消息
TtlQueueConfig.Java
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.HashMap;
import java.util.Map;
@Configuration
public class TtlQueueConfig {
// 普通交换机 普通队列
public static final String X_EXCHANGE = "X";
public static final String QUEUE_A = "QA";
public static final String QUEUE_B = "QB";
// 死信交换机
public static final String Y_DEAD_LETTER_EXCHANGE = "Y";
// 死信队列
public static final String DEAD_LETTER_QUEUE = "QD";
// 声明 xExchange
@Bean("xExchange")
public DirectExchange xExchange() {
return new DirectExchange(X_EXCHANGE);
}
// 声明 死信队列交换机
@Bean("yExchange")
public DirectExchange yExchange() {
return new DirectExchange(Y_DEAD_LETTER_EXCHANGE);
}
//声明队列 A ttl 为 10s 并绑定到对应的死信交换机
@Bean("queueA")
public Queue queueA() { // 导包: org.springframework.amqp.core
Map<String, Object> args = new HashMap<>(3);
//声明当前队列绑定的死信交换机
args.put("x-dead-letter-exchange", Y_DEAD_LETTER_EXCHANGE);
//声明当前队列的死信路由 key
args.put("x-dead-letter-routing-key", "YD");
//声明队列的 TTL
args.put("x-message-ttl", 10000);
return QueueBuilder.durable(QUEUE_A).withArguments(args).build();
}
// 声明队列 A 绑定 X 交换机
@Bean
public Binding queueaBindingX(@Qualifier("queueA") Queue queueA,
@Qualifier("xExchange") DirectExchange xExchange) {
return BindingBuilder.bind(queueA).to(xExchange).with("XA");
}
//声明队列 B ttl 为 40s 并绑定到对应的死信交换机
@Bean("queueB")
public Queue queueB() {
Map<String, Object> args = new HashMap<>(3);
//声明当前队列绑定的死信交换机
args.put("x-dead-letter-exchange", Y_DEAD_LETTER_EXCHANGE);
//声明当前队列的死信路由 key
args.put("x-dead-letter-routing-key", "YD");
//声明队列的 TTL
args.put("x-message-ttl", 40000);
return QueueBuilder.durable(QUEUE_B).withArguments(args).build();
}
//声明队列 B 绑定 X 交换机
@Bean
public Binding queuebBindingX(@Qualifier("queueB") Queue queue1B,
@Qualifier("xExchange") DirectExchange xExchange) {
return BindingBuilder.bind(queue1B).to(xExchange).with("XB");
}
//声明死信队列 QD
@Bean("queueD")
public Queue queueD() {
return new Queue(DEAD_LETTER_QUEUE);
}
//声明死信队列 QD 绑定关系
@Bean
public Binding deadLetterBindingQAD(@Qualifier("queueD") Queue queueD,
@Qualifier("yExchange") DirectExchange yExchange) {
return BindingBuilder.bind(queueD).to(yExchange).with("YD");
}
}
生产者:
controller 用户发送一个请求,服务将数据进行处理直接发送到MQ,由其它服务模块处理…
SendMsgController .Java
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import java.util.Date;
@Slf4j
@RequestMapping("ttl")
@RestController
public class SendMsgController {
// RabbitTemplate 对rabbitmq 的服务接口API 进行了封装;
@Autowired
private RabbitTemplate rabbitTemplate;
@GetMapping("sendMsg/{message}")
public void sendMsg(@PathVariable String message) {
log.info("当前时间:{},发送一条信息给两个 TTL 队列:{}", new Date(), message);
rabbitTemplate.convertAndSend("X", "XA", "消息来自 ttl 为 10S 的队列: " + message);
rabbitTemplate.convertAndSend("X", "XB", "消息来自 ttl 为 40S 的队列: " + message);
}
}
消费者:
DeadLetterQueueConsumer .Java
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.io.IOException;
import java.util.Date;
/**
* 消费者 - 死信队列
* @author wsm
*/
@Slf4j
@Component
public class DeadLetterQueueConsumer {
//@RabbitListener 负责监听具体那个队列...
@RabbitListener(queues = "QD")
public void receiveD(Message message, Channel channel) throws IOException {
String msg = new String(message.getBody());
log.info("当前时间:{},收到死信队列信息{}", new Date().toString(), msg);
}
}
-
@RabbitListener 声明在放上,处理要监听的队列,消息进行处理...
-
流量器请求
http://localhost:8080/ttl/sendMsg/拉拉拉
-
间隔 10s 40s 控制台输出:
2022-01-23 19:12:01.380 INFO 1508 --- [nio-8080-exec-3] c.example.controller.SendMsgController : 当前时间:Sun Jan 23 19:12:01 CST 2022,发送一条信息给两个 TTL 队列:嘻嘻嘻 2022-01-23 19:12:11.684 INFO 1508 --- [ntContainer#0-1] c.e.consumer.DeadLetterQueueConsumer : 当前时间:Sun Jan 23 19:12:11 CST 2022,收到死信队列信息消息来自 ttl 为 10S 的队列: 拉拉拉 2022-01-23 19:12:41.566 INFO 1508 --- [ntContainer#0-1] c.e.consumer.DeadLetterQueueConsumer : 当前时间:Sun Jan 23 19:12:41 CST 2022,收到死信队列信息消息来自 ttl 为 40S 的队列: 拉拉拉
消息设置TTL 存在问题bug
上面通过,SpringBoot 集成了 RabbitMQ 通过 队列设置TTL
-
如果这样使用的话,岂不是每增加一个新的时间需求,就要新增一个队列, 所以一般建议使用:
消息设置TTL
-
生产者 每次发送消息的时候,设置消息的存活时间,这样:
即使只有一个队列,也可以设置不同消息的 延迟时间
修改上面业务,添加一个 QC
普通队列,不设置队列 延迟时间
… 每次发送消息给消息设置延迟时间...
MQ 配置文件:
MsgTtlQueueConfig.Java
@Configuration
public class MsgTtlQueueConfig {
// 死信交换机
public static final String Y_DEAD_LETTER_EXCHANGE = "Y";
// 普通的队列
public static final String QUEUE_C = "QC";
//声明队列 C 死信交换机
@Bean("queueC")
public Queue queueB() {
HashMap<String, Object> args = new HashMap<>(3);
//声明当前队列绑定的死信交换机
args.put("x-dead-letter-exchange", Y_DEAD_LETTER_EXCHANGE);
//声明当前队列的死信路由 key
args.put("x-dead-letter-routing-key", "YD");
//没有声明 TTL 属性
return QueueBuilder.durable(QUEUE_C).withArguments(args).build();
}
//声明队列 B 绑定 X 交换机
@Bean
public Binding queuecBindingX(@Qualifier("queueC") Queue queueC,
@Qualifier("xExchange") DirectExchange xExchange) {
return BindingBuilder.bind(queueC).to(xExchange).with("XC");
}
}
生产者:
SendMsgController.Java
/**
* 延时队列优化
* @param message 消息
* @param ttlTime 延时的毫秒
*/
@GetMapping("sendExpirationMsg/{message}/{ttlTime}")
public void sendMsg(@PathVariable String message, @PathVariable String ttlTime) {
rabbitTemplate.convertAndSend("X", "XC", message, correlationData -> {
// 发送消息的时候,设置消息的延迟时间...
correlationData.getMessageProperties().setExpiration(ttlTime);
return correlationData;
});
log.info("当前时间:{},发送一条时长{}毫秒 TTL 信息给队列 C:{}", new Date(), ttlTime, message);
}
消费者:
不需要任何更改,只需要等待接收消息即可…
结果查看 存在bug
浏览器请求:
-
http://localhost:8080/ttl/sendExpirationMsg/你好1/20000
-
http://localhost:8080/ttl/sendExpirationMsg/你好1/4000
-
因为 RabbitMQ 只会检查第一个消息是否过期,如果过期则丢到死信队列,
如果第一个消息的延时时长很长,而第二个消息的延时时长很短,第二个消息并不会优先得到执行 这是RabbitMQ 的bug 好在已经存在插件可以解决该问题💡
消息设置TTL 插件解决bug
安装插件:rabbitmq_delayed_message_exchange
# 工具引入,插件安装包;
[root@iZj6ciuzx7luldnazt4iswZ wsm]# ls
erlang-21.3-1.el7.x86_64.rpm rabbitmq_delayed_message_exchange-3.8.0.ez rabbitmq-server-3.8.8-1.el7.noarch.rpm
# 将插件移动到 RabbitMQ的plugins 包下: /usr/lib 是linux 默认安装服务路径...
[root@iZj6ciuzx7luldnazt4iswZ wsm]# cp rabbitmq_delayed_message_exchange-3.8.0.ez /usr/lib/rabbitmq/lib/rabbitmq_server-3.8.8/plugins
# rabbitmq 开始安装启动插件补丁...
[root@iZj6ciuzx7luldnazt4iswZ wsm]# rabbitmq-plugins enable rabbitmq_delayed_message_exchange
Enabling plugins on node rabbit@iZj6ciuzx7luldnazt4iswZ:
rabbitmq_delayed_message_exchange
The following plugins have been configured:
rabbitmq_delayed_message_exchange
rabbitmq_management
rabbitmq_management_agent
rabbitmq_web_dispatch
Applying plugin configuration to rabbit@iZj6ciuzx7luldnazt4iswZ...
The following plugins have been enabled:
rabbitmq_delayed_message_exchange
started 1 plugins.
# 重启服务:
[root@iZj6ciuzx7luldnazt4iswZ wsm]# systemctl restart rabbitmq-server
安装成功,查看页面中发现,交换机多了一种信的类型:x-delayed-message
测试实现:
新增了一个队列delayed.queue,一个自定义交换机 delayed.exchange,绑定关系如下
- 正常的一组生产者消费者,设置自定义交换机类型
生产者发送消息指定消息 延迟,到交换机上 到达固定的时间才会发送到交换机上...
来实现消息的延迟TTL
配置文件类代码
在我们自定义的交换机中,这是一种新的交换类型
- 该类型消息支持延迟投递机制消息传递后并不会立即投递到目标队列中
- 而是存储在 mnesia(一个分布式数据系统)表中,当达到投递时间时,才投递到目标队列中
DelayedQueueConfig.Java
@Configuration
public class DelayedQueueConfig {
// 队列
public static final String DELAYED_QUEUE_NAME = "delayed.queue";
// 交换机
public static final String DELAYED_EXCHANGE_NAME = "delayed.exchange";
// key
public static final String DELAYED_ROUTING_KEY = "delayed.routingkey";
// 声明队列
@Bean
public Queue delayedQueue() {
return new Queue(DELAYED_QUEUE_NAME);
}
// 自定义交换机 我们在这里定义的是一个延迟交换机
@Bean
public CustomExchange delayedExchange() {
HashMap<String, Object> args = new HashMap<>();
//自定义交换机的类型
args.put("x-delayed-type", "direct");
return new CustomExchange(DELAYED_EXCHANGE_NAME, "x-delayed-message", true, false, args);
}
// 绑定 队列和交换机;
@Bean
public Binding bindingDelayedQueue(@Qualifier("delayedQueue") Queue queue,
@Qualifier("delayedExchange") CustomExchange delayedExchange) {
return BindingBuilder.bind(queue).to(delayedExchange).with(DELAYED_ROUTING_KEY).noargs();
}
}
生产者
SendMsgController.Java
// 交换机 和 routingkey
public static final String DELAYED_EXCHANGE_NAME = "delayed.exchange";
public static final String DELAYED_ROUTING_KEY = "delayed.routingkey";
// 请求接口:
@GetMapping("sendDelayMsg/{message}/{delayTime}")
public void sendMsg(@PathVariable String message, @PathVariable Integer delayTime) {
rabbitTemplate.convertAndSend(DELAYED_EXCHANGE_NAME, DELAYED_ROUTING_KEY, message,
correlationData -> {
correlationData.getMessageProperties().setDelay(delayTime);
return correlationData;
});
log.info(" 当 前 时 间 : {}, 发 送 一 条 延 迟 {} 毫秒的信息给队列 delayed.queue:{}", new Date(), delayTime, message);
}
消费者
DelayQueueConsumer.Java
/**
* 消费者 - 基于插件的延时队列
*
* @author wsm
*/
@Slf4j
@ComponentScan
public class DelayQueueConsumer {
public static final String DELAYED_QUEUE_NAME = "delayed.queue";
@RabbitListener(queues = DELAYED_QUEUE_NAME)
public void receiveDelayedQueue(Message message) {
String msg = new String(message.getBody());
log.info("当前时间:{},收到延时队列的消息:{}", new Date().toString(), msg);
}
}
结果查看
浏览器请求:
-
http://localhost:8080/ttl/sendDelayMsg/wsm/20000
-
http://localhost:8080/ttl/sendDelayMsg/www/4000
-
ok,发送的消息,确实在 4s 20s 进行了处理
发布确认高级:
发布确认 springboot 版本
在生产环境中由于一些不明原因,导致 RabbitMQ 重启
- 在 RabbitMQ 重启期间生产者消息投递失败, 导致消息丢失,需要手动处理和恢复
- 于是,我们开始思考,如何才能进行 RabbitMQ 的消息可靠投递呢?
确认机制方案:
-
① 生产者 每次发送消息的时候,将消息存入缓存中,
Map kv结构:k每个消息唯一的标识 v每个消息体
-
② 将消息发送到 交换机
交换机接收到消息,返回 生产者
ack
生产者根据对于的k 删除缓存数据交换机超时|宕机,没有收到消息,生产者
回调 nack
生产者,重新发送消息,或其他处理
实例说明:
-
① SpringBoot 配置文件
开启发布确认模式
-
② 添加配置类,声明定义:
交换机 direct类型
队列
绑定
信息为了方便测试,消息没有发送到队列上,消息丢失的场景,使用
direct类型:发送消息指定 routingkey 只会发送到相同的 队列上
,没有匹配的队列 消息丢失
-
③ 编写消息回调类
生产者——发送消息——交换机(接收到消息,进行回调
ack
, 长时间没有收到也会回调触发nack
)但,注意
交换机将消息发送到对应队列上,如果,消息没有匹配的队列,所以消息还是会丢失(没有匹配的队列,发送;
-
④ 编写消息生产者:
发送两个消息,一个与队列匹配routingkey 另一个不匹配…
-
⑤ 编写消费者,监听队列进行消费…
注意:首先要开启Rabbit MQ的发布确认模式:
# 开启RabbitMQ 发布确认模式:
spring.rabbitmq.publisher-confirm-type=correlated
# NONE 值是禁用发布确认模式,是默认值
# CORRELATED 值是发布消息成功到交换器后会触发回调方法
# SIMPLE 值经测试有两种效果
# 其一效果和 CORRELATED 值一样会触发回调方法
# 其二在发布消息成功后使用 rabbitTemplate 调用 waitForConfirms 或 waitForConfirmsOrDie 方法等待 broker 节点返回发送结果
# 根据返回结果来判定下一步的逻辑,注意的点是 waitForConfirmsOrDie 方法如果返回 false 则会关闭 channel 则接下来无法发送消息到 broker
添加配置类
声明定义:交换机 队列 并进行绑定
ConfirmConfig.Java
/** SpringBoot 消息确认模式 **/
@Configuration
public class ConfirmConfig {
public static final String CONFIRM_EXCHANGE_NAME = "confirm.exchange";
public static final String CONFIRM_QUEUE_NAME = "confirm.queue";
//声明业务 Exchange
@Bean("confirmExchange")
public DirectExchange confirmExchange() {
return new DirectExchange(CONFIRM_EXCHANGE_NAME);
}
// 声明确认队列
@Bean("confirmQueue")
public Queue confirmQueue() {
return QueueBuilder.durable(CONFIRM_QUEUE_NAME).build();
}
// 声明确认队列绑定关系
@Bean
public Binding queueBinding(@Qualifier("confirmQueue") Queue queue,
@Qualifier("confirmExchange") DirectExchange exchange) {
return BindingBuilder.bind(queue).to(exchange).with("key1");
}
}
- 声明
交换机confirm.exchange
队列 confirm.queue
队列与交换机绑定 routingkey: key1
消息回调类
com.example.producercallack包下: MyCallBack.Java
@Component
@Slf4j
public class MyCallBack implements RabbitTemplate.ConfirmCallback {
/**
* 交换机不管是否收到消息的一个回调方法
*
* @param correlationData 消息相关数据
* @param ack 交换机是否收到消息, true(ack) false(nack)
* @param cause 为收到消息的原因: 异常信息
*/
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
String id = correlationData != null ? correlationData.getId() : "";
if (ack) {
log.info("交换机已经收到 id 为:{}的消息", id);
} else {
log.info("交换机还未收到 id 为:{}消息,原因:{}", id, cause);
}
}
}
生产者
ProducerController.Java
@RestController
@RequestMapping("/confirm")
@Slf4j
public class ProducerController {
@Autowired // rabbitmq 模板对象;
private RabbitTemplate rabbitTemplate;
@Autowired // 发布确认消息,消息回调方法类;
private MyCallBack myCallBack;
//依赖注入 rabbitTemplate 之后再设置它的回调对象
@PostConstruct
public void init() {
rabbitTemplate.setConfirmCallback(myCallBack);
}
public static final String CONFIRM_EXCHANGE_NAME = "confirm.exchange";
/**
* 消息回调和退回
* @param message
*/
@GetMapping("sendMessage/{message}")
public void sendMessage(@PathVariable String message) {
//指定消息 id 为 1
CorrelationData correlationData1 = new CorrelationData("1");
String routingKey = "key1";
rabbitTemplate.convertAndSend(CONFIRM_EXCHANGE_NAME, routingKey, message + routingKey, correlationData1);
log.info(routingKey + "发送消息内容:{}", message + routingKey);
CorrelationData correlationData2 = new CorrelationData("2");
routingKey = "key2";
rabbitTemplate.convertAndSend(CONFIRM_EXCHANGE_NAME, routingKey, message + routingKey, correlationData2);
log.info(routingKey + "发送消息内容:{}", message + routingKey);
}
}
- 生产者发送两个消息,一个消息key1有匹配的队列,另一个key2没有匹配的队列
消费者
ConfirmConsumer.Java
@Component
@Slf4j
public class ConfirmConsumer {
public static final String CONFIRM_QUEUE_NAME = "confirm.queue";
@RabbitListener(queues = CONFIRM_QUEUE_NAME)
public void receiveMsg(Message message) {
String msg = new String(message.getBody());
log.info("消费方法接受到队列 confirm.queue 消息:{}", msg);
}
}
结果测试:
浏览器请求: http://localhost:8080/confirm/sendMessage/你好
- 图片 cmd 输出有一点问题…
回调方法接收
应该是消费方法接收
消息回退🔙:
对于上面的操作: 如果发现该消息不可路由,那么消息会被直接丢弃,此时生产者是不知道消息被丢弃这个事件的
- 如何让无法被路由的消息帮我想办法处理一下?
最起码通知我一声,我好自己处理啊
通过设置 mandatory
参数可以在当消息传递过程中不可达目的地时将消息返回给生产者:
#消息退回
spring.rabbitmq.publisher-returns=true
修改回调接口:
com.example.producercallack包下: MyCallBack.Java
-
实现:
RabbitTemplate.ReturnsCallback接口
低版本可能没有
RabbitTemplate.ReturnsCallback
请用RabbitTemplate.ReturnCallback
-
添加接口实现:
returnedMessage(ReturnedMessage returned)
当消息无法路由的时候的回调方法
//当消息无法路由的时候的回调方法
@Override
public void returnedMessage(ReturnedMessage returned) {
log.error("消息:{},被交换机 {} 退回,原因:{},路由key:{},code:{}",
new String(returned.getMessage().getBody()), returned.getExchange(),
returned.getReplyText(), returned.getRoutingKey(),
returned.getReplyCode());
}
低版本:消息无法路由回调方法()
@Override
public void returnedMessage(Message message, int replyCode, String replyText, String
exchange, String routingKey) {
log.info("消息:{}被服务器退回,退回原因:{}, 交换机是:{}, 路由 key:{}",new String(message.getBody()),replyText, exchange, routingKey);
}
修改发送者 ProducerController
- 修改RabbitTemplate 的 init() 初始化配置:
//依赖注入 rabbitTemplate 之后再设置它的回调对象
@PostConstruct
public void init() {
rabbitTemplate.setConfirmCallback(myCallBack);
/**
* true:交换机无法将消息进行路由时,会将该消息返回给生产者
* false:如果发现消息无法进行路由,则直接丢弃
*/
rabbitTemplate.setMandatory(true);
//设置回退消息交给谁处理
rabbitTemplate.setReturnsCallback(myCallBack);
}
重启测试:
http://localhost:8080/confirm/sendMessage/你好
ok, 消息成功回退,剩下的处理代码可以自定义了…
Rabbit MQ概念:
幂等
因为消息 ack 持久化机制存在一定的缺点
-
持久化机制保证消息100%消费:
消费者处理消息 突然崩溃,长时间没有处理完,队列中的消息不会删除,而是发送给其它消费者处理,如果这个时候消费者恢复了
就有相同消费者,消费同一个数据的情况了!
-
为了解决这个问题 RabbitMQ 消费者通常都需要做
幂等性
操作 -
幂等:
无论,程序执行多少次,结果不会发送任何改变!
实现原理:
-
MQ 消费者的幂等性的解决一般使用全局 ID
或者写个唯一标识比如时间戳 或者 UUID 或者订单消费者消费 MQ 中的消息也可利用 MQ 的该 id 来判断
-
利用 redis 执行 setnx 命令,天然具有幂等性。从而实现不重复消费
redis
的setnx 也就是只有不存在key的时候才设置
每个消息具有一个唯一的标识, 消费者第一次消费成功的时候,使用
setnx
设置,这样无论后面多少次操作,都不在进行操作了!
优先级
使用场景:
-
通常商城项目中:
订单催付的场景
我们的客户在天猫下的订单,淘宝会及时将订单推送给我们,如果在用户设定的时间内未付款那么就会给用户推送一条短信提醒,很简单的一个功能对吧
但是,tmall 商家对我们来说,肯定是要分大客户和小客户的对吧,比如像苹果,小米这样大商家一年起码能给我们创造很大的利润 所以理应当然,他们的订单必须得到优先处理
如何实现:
- 控制台页面添加
-
队列中代码添加优先级
Map<String, Object> params = new HashMap(); params.put("x-max-priority", 10); channel.queueDeclare("hello", true, false, false, params);
-
消息中代码添加优先级
AMQP.BasicProperties properties = new AMQP.BasicProperties().builder().priority(10).build();
注意事项:
要让队列实现优先级需要做的事情有如下事情
- 队列需要设置为优先级队列,消息需要设置消息的优先级
- 消费者需要等待消息已经发送到队列中才去消费因为,这样才有机会对消息进行排序
生产者:
public class PriorityProducer {
private static final String QUEUE_NAME = "hello";
public static void main(String[] args) throws Exception {
Channel channel = RabbitMqUtils.getChannel();
//给消息赋予一个 priority 属性
AMQP.BasicProperties properties = new AMQP.BasicProperties().builder().priority(10).build();
for (int i = 1; i < 11; i++) {
String message = "info" + i;
if (i == 5) {
channel.basicPublish("", QUEUE_NAME, properties, message.getBytes());
} else {
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
}
System.out.println("发送消息完成:" + message);
}
}
}
消费者:
public class PriorityConsumer {
private final static String QUEUE_NAME = "hello";
public static void main(String[] args) throws Exception {
Channel channel = RabbitMqUtils.getChannel();
//设置队列的最大优先级 最大可以设置到 255 官网推荐 1-10 如果设置太高比较吃内存和 CPU
Map<String, Object> params = new HashMap();
params.put("x-max-priority", 10);
channel.queueDeclare(QUEUE_NAME, true, false, false, params);
//推送的消息如何进行消费的接口回调
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody());
System.out.println(message);
};
//取消消费的一个回调接口 如在消费的时候队列被删除掉了
CancelCallback cancelCallback = (consumerTag) -> {
System.out.println("消息消费被中断");
};
channel.basicConsume(QUEUE_NAME, true, deliverCallback, cancelCallback);
}
}
惰性队列
使用场景
-
RabbitMQ 从 3.6.0 版本开始引入了惰性队列的概念
惰性队列会尽可能的将消息存入磁盘中,而在消费者消费到相应的消息时才会被加载到内存中,它的一个重要的设计目标是能够支持更长的队列,即支持更多的消息存储
当消费者由于各种各样的原因(比如消费者下线、宕机亦或者是由于维护而关闭等)而致使长时间内不能消费消息造成堆积时
-
默认情况下,当生产者将消息发送到 RabbitMQ 的时候:
队列中的消息会尽可能的存储在内存之中, 这样可以更加快速的将消息发送给消费者
持久化
在被写入磁盘的同时也会在内存中驻留一份备份当RabbitMQ 需要释放内存的时候,会将内存中的消息换页至磁盘中,这个操作会耗费较长的时间,也会阻塞队列的操作,进而无法接收新的消息.
两种模式
队列具备两种模式:default默认
和 lazy
-
lazy 模式即为惰性队列的模式,
可以通过调用 channel.queueDeclare 方法的时候在参数中设置
也可以通过 Policy 的方式设置,如果一个队列同时使用这两种方式设置的话,那么 Policy 的方式具备更高的优先级
-
在队列声明的时候可以通过“x-queue-mode”参数来设置队列的模式,取值为“default”和“lazy”
Map<String, Object> args = new HashMap<String, Object>();
args.put("x-queue-mode", "lazy");
channel.queueDeclare("myqueue", false, false, false, args);
内存开销对比
在发送 1 百万条消息,每条消息大概占 1KB 的情况下,普通队列占用内存是 1.2GB,而惰性队列仅仅 占用 1.5MB
常见错误:
队列声明错误:
channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - inequivalent arg 'durable' for queue 'ack_test' in vhost '/': received 'false' but current is 'true', class-id=50, method-id=10)
有的时候对于,已经声明的队列,更改了其配置,需要在RabbitMQ管理页面手动删除MQ的队列,才能进行重新声明,不然会报错…
完:
终于写完了…
需要代码,安装工具的兄弟可以下方下载: 点个👍吧!
链接:https://pan.baidu.com/s/1M0m0xKBtZlAs3v3FYKq6Tw
提取码:2540
MQ
-
大神笔记:
-
https://juejin.cn/post/7051469607806173221 超级详细笔记
https://note.oddfar.com/rabbitmq/#%E8%A7%86%E9%A2%91%E6%95%99%E7%A8%8B
RPC + MQ: https://zhuanlan.zhihu.com/p/48230422
更多推荐
所有评论(0)