消息队列MQ
MQ的原理可以简单概括为生产者将消息发送到队列中,消费者从队列中获取消息进行处理。具体来说,MQ的原理包括以下几个方面:生产者:生产者将消息发送到MQ服务器中,消息可以是文本、对象、文件等形式。生产者可以使用API或者其他工具将消息发送到MQ服务器,同时可以指定消息的优先级、过期时间等属性。队列:MQ服务器接收到生产者发送的消息后,将其存储在队列中。队列是存储消息的容器,可以保证消息的有序性和一致
一、MQ原理
MQ的原理可以简单概括为生产者将消息发送到队列中,消费者从队列中获取消息进行处理。具体来说,MQ的原理包括以下几个方面:
-
生产者:生产者将消息发送到MQ服务器中,消息可以是文本、对象、文件等形式。生产者可以使用API或者其他工具将消息发送到MQ服务器,同时可以指定消息的优先级、过期时间等属性。
-
队列:MQ服务器接收到生产者发送的消息后,将其存储在队列中。队列是存储消息的容器,可以保证消息的有序性和一致性。MQ服务器可以支持多个队列,每个队列可以有不同的属性和配置。
-
消费者:消费者从MQ服务器中获取消息并进行处理。消费者可以使用API或者其他工具从队列中获取消息,一般采用轮询的方式获取消息。消费者获取到消息后,可以进行处理、存储、转发等操作。
-
订阅发布模式:部分MQ产品支持订阅发布模式,即生产者将消息发送到主题中,多个消费者可以订阅主题并接收消息。订阅发布模式可以实现广播、多播等功能,广泛应用于分布式系统中。
总之,MQ通过将消息存储在队列中,实现了生产者和消费者之间的解耦,提高了系统的可维护性和可扩展性,同时也提高了系统的性能和可靠性。
二、RabbitMQ原理
RabbitMQ 是一款开源的 AMQP(Advanced Message Queueing Protocol)消息中间件,它拥有一系列的特性,如高可用、可扩展、易用性、跨平台支持和多种协议兼容性等。下面简单介绍一下 RabbitMQ 的原理。
- 消息模型和基本概念
RabbitMQ 的消息模型基于观察者模式,主要有以下基本概念:
- Producer:消息生产者。
- Exchange:消息交换机,用来接收、存储和转发消息。
- Queue:消息队列,用来存储消息。
- Binding:绑定,用来把 Exchange 和 Queue 之间的关系互相绑定。
- Consumer:消息消费者。
- 消息的传递过程
根据 RabbitMQ 的模型,消息的传递过程大致如下:
- Producer 将消息发送到 Exchange,Exchange 根据 Routing Key 决定将消息发送到对应的 Queue 中。
- Consumer 从队列中取出消息进行处理。
其中,Exchange 根据 Routing Key 进行分类,有以下三种类型:
- Direct Exchange:根据指定的 Routing Key,将消息路由到唯一绑定的队列中。
- Fanout Exchange:将消息路由到与之绑定的所有队列中,不需要 Routing Key,类似广播。
- Topic Exchange:根据 Routing Key 的通配符进行模糊匹配,将消息路由到匹配的队列中。
- 消息的持久化
为了实现消息的持久化,RabbitMQ 提供了以下两种方式:
- 消息的投递时持久化。
- 消息的消费时持久化。
在投递时持久化时,消息会被存储在磁盘中,以防止系统崩溃或者 RabbitMQ 宕机,消息也不会丢失。在消费时持久化时,需要设置消息的 ack 标记,表示消息已经被消费,最终需要在消费者端手动确认消息是否被消费。
- HA(High Availability)机制
RabbitMQ 提供了多种 HA 方案,其中主备复制模式是最常见的一种。在主备模式下,将 RabbitMQ 集成到不同的节点上,其中一个节点充当主角色,其他节点充当备份角色,主节点会将消息进行复制,发送到大部分备份节点上,以保证集群中至少存在一条可用的消息通路。
- 消息的确认机制
RabbitMQ 提供了消息确认机制(acknowledgment)来保证消息能够成功投递到指定的消费者端。消费者在接收到消息后,需要通过确认(ack)的方式告知 RabbitMQ 自己已经成功消费了消息。如果 RabbitMQ 在接收到消息之后没有接收到确认,则认为消息投递失败,需要重新进行投递。
总体来说, RabbitMQ 的消息传递过程和原理非常清晰,这也是它被广泛使用的原因之一。RabbitMQ 主要实现了 AMQP 协议规范,更具有彻底的开放性和灵活性。
三、MQ应用场景
MQ在分布式系统中有广泛的应用场景,其中一些典型的应用场景包括:
-
异步处理:应用程序可以通过将消息发送到MQ中来实现异步处理,提高系统的并发性能和可靠性。例如,用户注册时发送一条消息到MQ中,由后台系统异步处理注册信息,避免用户等待时间过长。
-
应用解耦:不同的应用程序之间可以通过MQ进行通信,从而解耦应用程序之间的依赖关系,提高系统的可维护性和扩展性。例如,电商系统中,订单系统和库存系统可以通过MQ进行通信,订单系统发送订单信息到MQ中,库存系统从MQ中获取订单信息进行处理。
-
流量削峰:应用程序可以通过MQ实现流量削峰,避免系统崩溃和服务不可用。例如,电商系统中,当有大量用户同时下单时,可以将订单信息发送到MQ中,由后台系统异步处理,避免系统崩溃。
RabbitMQ的流量削锋可以通过以下方式实现:
- 消息暂存:可以设置消息暂存的大小,当消息数量达到一定阈值时,就暂停接收新消息,直到已有消息被处理完毕。这种方式可以有效控制系统的流量。
- 消费者限流:可以通过设置消费者的QoS(Quality of Service)参数来限制消费者的消息处理速度,避免消费者消费速度过快而导致系统负载过大。
- 消息优先级:可以通过设置消息的优先级来控制消息的处理顺序,优先处理重要的消息,避免系统过载。
- 集群部署:可以将RabbitMQ部署在多个节点上,通过负载均衡的方式来分摊流量,提高系统的可用性和性能。
- 消息持久化:可以将消息持久化到磁盘上,避免消息在内存中过多积压,导致系统负载过大。
- 缓存机制:可以使用缓存机制来减轻RabbitMQ的负载压力,如使用Redis等缓存服务器来缓存经常被查询的数据。这种方式可以提高查询效率,但需要注意缓存的数据一致性和更新问题。
-
日志处理:应用程序可以将日志信息发送到MQ中,由后台系统异步处理,从而避免日志记录对系统性能的影响。例如,电商系统中,用户行为日志可以发送到MQ中,由后台系统进行分析和处理。
-
分布式事务:MQ可以作为分布式事务的一部分,实现分布式事务的提交和回滚。例如,电商系统中,下单成功后,可以将订单信息和库存信息发送到MQ中,由分布式事务统一进行提交或回滚。
-
触发器:将消息队列用作触发器系统,例如某些事件的触发需要发布消息到指定队列,而后有某些处理器需要在接收到消息后执行指定操作。例如可以将用户在 Web 界面上的某次点击操作发布一个消息,然后有一个独立的处理器定期检查队列中的信息并执行相关操作。
-
任务队列:任务队列是一种经典的应用场景,将需要处理的任务都放入队列,通过开启多个消费者并行执行任务,提高任务处理的效率。例如将多次短信发送任务加入队列中,多个消费者并行处理。
-
事件通知:当某些特定事件发生时,可以将事件和信息发布到 RabbitMQ。多个消费者可以接收到对应的信息并发起异步处理,例如将某个服务的状态变化信息发布到 RabbitMQ,然后多个消费者接收消息并触发相应的操作。
综上所述,MQ在分布式系统中有广泛的应用场景,可以提高系统的并发性能、可靠性、可维护性和扩展性。
四、如何配置RabbitMQ
以下是 RabbitMQ 的基本配置和步骤:
- 安装 RabbitMQ
在 RabbitMQ 的官方网站下载对应的版本(https://www.rabbitmq.com/download.html),并安装到指定的目录中。
- 启动 RabbitMQ
在安装完成后,使用命令行窗口进入 RabbitMQ 的目录,输入以下命令来启动 RabbitMQ:
rabbitmq-server start
- 创建用户和权限
通过以下命令来创建用户和权限:
rabbitmqctl add_user username password
rabbitmqctl set_user_tags username administrator
rabbitmqctl set_permissions -p / username ".*" ".*" ".*"
其中,username 为用户名,password 为密码,通过 set_user_tags 命令将用户标记为管理员,通过 set_permissions 命令给此用户赋予 / 下的所有权限。
- 创建 Exchange 和 Queue
在 RabbitMQ 中,Exchange 和 Queue 是消息传递过程中的核心。创建 Exchange 和 Queue 的命令如下:
# 创建 Exchange
rabbitmqctl add_exchange exchange_name type
# 创建 Queue
rabbitmqctl add_queue queue_name
其中,exchange_name 为 Exchange 名称,type 包括 direct、fanout、topic 和 headers 等四种类型。queue_name 为 Queue 名称。
- 绑定 Queue
将 Exchange 绑定到 Queue 上:
rabbitmqctl bind_queue queue_name exchange_name routing_key
其中,routing_key 为绑定的关键字。
- 测试连接
在配置完成后,打开 RabbitMQ 的管理界面 http://localhost:15672,使用创建的管理员账户和密码进行登陆,如果能够正常登陆,则证明 RabbitMQ 配置成功。
以上是 RabbitMQ 的基本配置步骤,可以根据实际需求进行定制和优化,如配置 HA(High Availability)集群、负载均衡、SSL 安全认证等。
五、mq消费阻塞怎么办
MQ消费阻塞可能有多种原因,包括消费者处理消息的速度慢、网络延迟等。针对不同的原因,可以采取不同的解决方案:
-
增加消费者数量:可以通过增加消费者数量来提高消息消费速度,减少消息阻塞的可能性。
-
提高消费者处理消息的速度:可以优化消费者的代码逻辑、减少消费者的处理时间等方式来提高消费者处理消息的速度。
-
调整MQ的配置参数:可以通过调整MQ的配置参数,如最大消息数、最大连接数等来优化MQ的性能,减少消息阻塞的可能性。
-
增加MQ的容量:可以通过增加MQ的容量来提高MQ的处理能力,减少消息阻塞的可能性。
-
使用消息确认机制:在消息消费完成后,可以使用消息确认机制来通知MQ,避免消息重复消费或丢失。
综上所述,针对MQ消费阻塞问题,应该结合实际情况采取相应的解决方案。
六、RabbitMQ常用命令
RabbitMQ 是一款高性能、多协议、可靠的消息中间件,其能够为企业提供高效、高效的异步消息传递服务。为了更好地管理 RabbitMQ 实例,以下是 RabbitMQ 常见命令:
- rabbitmqctl
rabbitmqctl 是 RabbitMQ 的控制工具,可以执行常见的管理操作,例如添加、删除用户,开启和关闭插件,备份和恢复队列等。
- 启动 RabbitMQ
sudo rabbitmq-server start
- 关闭 RabbitMQ
sudo rabbitmqctl stop
- 添加用户
sudo rabbitmqctl add_user username password
- 删除用户
sudo rabbitmqctl delete_user username
- 修改用户密码
sudo rabbitmqctl change_password username new_password
- 列出所有用户
sudo rabbitmqctl list_users
- 列出虚拟主机
sudo rabbitmqctl list_vhosts
- 添加虚拟主机
sudo rabbitmqctl add_vhost vhost_name
- 删除虚拟主机
sudo rabbitmqctl delete_vhost vhost_name
- 列出所有权限
sudo rabbitmqctl list_permissions
- 授权
sudo rabbitmqctl set_user_tags username tag
sudo rabbitmqctl set_permissions -p vhost_path username ".*" ".*" ".*"
- rabbitmq-plugins
rabbitmq-plugins 是 RabbitMQ 的插件管理工具,可以管理 RabbitMQ 的各种插件,例如管理、启用或禁用插件等。
- 查看所有插件
sudo rabbitmq-plugins list
- 启用插件
sudo rabbitmq-plugins enable plugin_name
- 禁用插件
sudo rabbitmq-plugins disable plugin_name
- rabbitmqadmin
rabbitmqadmin 是 RabbitMQ 的 Web 管理工具,可以通过命令行管理 RabbitMQ,例如创建和删除队列,发送消息等。
- 帮助
sudo rabbitmqadmin help
- 创建队列
sudo rabbitmqadmin declare queue name=<queue_name>
- 删除队列
sudo rabbitmqadmin delete queue name=<queue_name>
- 发送消息
sudo rabbitmqadmin publish exchange=<exchange_name> routing_key=<routing_key> payload="Hello, world"
上述是 RabbitMQ 常用的命令,可以根据实际需求进行操作。了解和熟悉这些命令可以方便日常管理 RabbitMQ 实例。
七、go如何使用rabbitMQ
Go 语言有丰富的 RabbitMQ 库,包括 AMPQ 和 STOMP 等多种协议支持,操作简单,易上手。下面简要介绍在 Go 中如何使用 RabbitMQ:
- 安装 RabbitMQ Client 库
安装 streadway/amqp 库,这是 Go 语言开发 RabbitMQ 应用的主要库。
go get github.com/streadway/amqp
- 连接 RabbitMQ
连接 RabbitMQ,在连接前需要先启动 RabbitMQ。
import (
"fmt"
"log"
"github.com/streadway/amqp"
)
func main() {
fmt.Println("Connecting to RabbitMQ...")
// 格式: amqp://账号:密码@地址:端口号/Virtual Host
conn, err := amqp.Dial("amqp://username:password@localhost:5672/")
if err != nil {
log.Fatalf("Failed to connect to RabbitMQ: %v", err)
}
defer conn.Close()
fmt.Println("Connection established with RabbitMQ.")
}
- 定义消息队列
定义消息队列,如果消息队列不存在,则创建一个新的消息队列。
func main() {
// 连接RabbitMQ...省略
channel, err := conn.Channel()
if err != nil {
log.Fatalf("Failed to open a RabbitMQ channel: %v", err)
}
defer channel.Close()
queue, err := channel.QueueDeclare(
"hello", // 队列名称
false, // 是否持久化
false, // 是否自动删除
false, // 是否独占
false, // 是否等待
nil, // 额外参数
)
if err != nil {
log.Fatalf("Failed to declare a RabbitMQ queue: %v", err)
}
fmt.Printf("RabbitMQ queue [%s] declared.\n", queue.Name)
}
- 发送消息
发送一个消息到指定队列。
func main() {
// 连接RabbitMQ...省略
// 定义消息队列...省略
for i := 0; i < 5; i++ {
message := fmt.Sprintf("Hello World! %d", i)
err = channel.Publish(
"", // 当前测试使用默认 Exchange
queue.Name, // 队列名称
false, // 不需要将消息发送给某个特定的消费者
false, // 不需要消息持久化
amqp.Publishing{
ContentType: "text/plain", // MIME 类型
Body: []byte(message), // 消息内容
},
)
if err != nil {
log.Fatalf("Failed to publish a message to RabbitMQ: %v", err)
}
fmt.Println("[x] Sent:", message)
}
}
- 接收消息
从队列中接收消息。
func main() {
// 连接RabbitMQ...省略
// 定义消息队列...省略
// 发送消息...省略
msgs, err := channel.Consume(
queue.Name, // 队列名称
"", // 消息标题
true, // 是否自动应答
false, // 是否独占
false, // 是否阻塞
false, // 额外参数
)
if err != nil {
log.Fatalf("Failed to register a consumer: %v", err)
}
go func() {
for d := range msgs {
log.Printf("Received a message: %s", d.Body)
}
}()
// 接收阻塞,这里使用 select 作为阻塞条件,模拟生产环境中阻塞回调
select {}
}
以上简要介绍了在 Go 语言中如何使用 RabbitMQ,实际应用中还需根据业务需求进行定制化开发。
八、RabbitMQ 的性能优化策略
在 Go 语言中使用 RabbitMQ 进行性能优化,也需要实现类似的优化策略,下面列举几个重要的优化策略:
- Prefetch Count
在消费者分发消息时,RabbitMQ 中的QoS(Quality of Service)定义了对每个消费者未确认的消息数量进行限制。QoS 是预取计数 (Prefetch Count) 的值,即每个消费者最多获取的消息数量。需要根据实际情况设置 Prefetch Count 的大小,可以使用 basic_qos 函数来控制预取计数和服务器端缓存大小,从而避免消费者出现超时问题。
func main() {
// 连接 RabbitMQ...省略
// 创建 Channel 和 Queue...省略
// 设置 Prefetch Count,即消费者未确认的消息数量限制
err = channel.Qos(1, 0, false)
if err != nil {
log.Fatalf("Failed to set QoS: %v", err)
}
//开始消费从 Queue 中获取的消息
msgs, err := channel.Consume(
queue.Name, // 队列名称
"", // 消费者编号
false, // 是否自动回答确认
false, // 是否独占
false, // 是否阻塞
false, // 额外参数
)
if err != nil {
log.Fatalf("Failed to register a consumer: %v", err)
}
go func() {
for d := range msgs {
log.Printf("Received a message: %s", d.Body)
// 返回确认信息
d.Ack(false)
}
}()
// 等待程序结束
select {}
}
- 消息 TTL
对于生命周期较短的消息,可以设置消息TTL(Time To Live),即消息过期时间。超过过期时间的消息将会被消费掉而不会留存在队列中。在消息的头部设置 TTL,可以让 RabbitMQ 定期将过期消息删除,避免队列过大和需要占用额外的存储空间。可以通过在发布消息之前设置消息的 TTL 变量来设置其过期时间。
func main() {
// 连接 RabbitMQ...省略
// 创建 Channel 和 Queue...省略
// 定义消息
msg := amqp.Publishing{
ContentType: "text/plain",
Body: []byte("Hello World!"),
Expiration: "60000", // 消息过期时间为 60 秒
}
// 发送消息到 Queue
err = channel.Publish(
"", // Exchange
queue.Name, // 队列名称
false, // 是否需要投递给特定的消费者
false, // 是否需要消息持久化
msg,
)
if err != nil {
log.Fatalf("Failed to publish a message to RabbitMQ: %v", err)
}
// ...
}
- 增加缓存和并发操作
如果需要处理大量的消息,可以增加缓存大小来加快队列的速度。同时,也可以通过增加并发操作来提高消息处理效率。RabbitMQ 允许多个消费者处理相同的队列,同时消费者还可以在不同的节点上运行。在并发处理消息时要注意消费者之间的竞争关系以及竞争关系可能导致的死锁问题。
增加缓存和并发操作可以使用 sync
包和 go
关键字实现。例如,在多个 goroutine 中同时消费消息,可以通过创建多个消费者进行分发和处理。示例代码如下:
func main() {
// 连接 RabbitMQ...省略
// 创建 Channel 和 Queue...省略
// 创建多个消费者
numConsumers := 5
for i := 0; i < numConsumers; i++ {
go func(i int) {
msgs, err := channel.Consume(
queue.Name, // 队列名称
"", // 消费者编号
false, // 是否自动回答确认
false, // 是否独占
false, // 是否阻塞
false, // 额外参数
)
if err != nil {
log.Fatalf("Failed to register a consumer: %v", err)
}
for d := range msgs {
log.Printf("Consumer %d Received a message: %s", i, d.Body)
d.Ack(false)
}
}(i)
}
// 等待程序结束
select {}
}
- 调整队列大小
队列大小是 RabbitMQ 性能的一个重要因素,队列大小过大会占用过多的存储空间,导致系统变慢。同时队列大小过小又会导致消息积压。可以通过调整队列大小来优化系统性能。可创建一个预留空间大小为 1.5 倍预期大小的队列,在队列达到预期大小后,停止向队列中添加新的消息。
在 Go 语言中使用 RabbitMQ 进行调整队列大小,需要使用 channel.QueueDeclare
方法来设置队列的选项。队列大小需要根据实际业务情况和消息负载来进行合理的设置,避免队列过大导致占用过多的存储空间,或者队列过小导致消息丢失或积压。
在使用 channel.QueueDeclare
方法时,需要传入一个 amqp.QueueDeclareOptions
参数,可以通过设置该参数中的 MaxQueueLength
字段来指定队列的最大长度。示例代码如下:
func main() {
// 连接 RabbitMQ...省略
// 创建 Channel...省略
queueOptions := amqp.QueueDeclareOptions{
Name: "my_queue",
Durable: true,
MaxQueueLength: 10000, // 队列最大长度为 10000
MaxBodySize: 131072,
MessageTTL: "",
Expiration: "",
DeadLetterExchange: "",
DeadLetterRoutingKey: "",
MaxDeliveryCount: 0,
NoWait: false,
Arguments: make(amqp.Table),
}
queue, err := channel.QueueDeclare(queueOptions)
if err != nil {
log.Fatalf("Failed to declare the RabbitMQ queue: %v", err)
}
// ...
}
在此示例中,MaxQueueLength 字段设置为 10000,即队列的最大长度为 10000。这个值可以根据实际情况进行调整。需要注意的是,队列最大长度设置得太小会导致消息积压、队列溢出;设置得太大则可能会导致占用大量内存空间,影响 RabbitMQ 的性能。
最佳的队列长度取决于消息的相对大小、消息到达的速度以及消费者的数量和处理速度。在设置队列长度时,可以考虑在某个阈值后停止向队列中添加新的消息,或者使用其他策略来控制队列的大小,例如使用 TTL(Time To Live)选项删除过期的消息,或者使用消费者优先级和限制来控制消息处理的速度。
- 调整 CPU 相关设置
RabbitMQ 是基于 Erlang VM 构建的,性能取决于 Erlang VM 在服务器中的分配和优化设置。可以在 RabbitMQ 服务器上进行 CPU 相关设置调整,在 /etc/default/rabbitmq-server 配置文件中可以设置 ERL_FULLSWEEP_AFTER 环境变量来控制 Erlang VM 安装基准的 GC 频率。修改 /proc/sys/net/core 配置也可以有助于提高系统性能。
在启动脚本中添加如下行:
export ERL_FULLSWEEP_AFTER=0
可以将垃圾回收延迟到后台进行,从而减少系统的 CPU 使用率。注意,更改该变量可能会影响 RabbitMQ 的性能,而且该变量对应的含义可能会随着不同版本的 Erlang VM 而有所不同,因此在更改该变量之前需要慎重考虑。
对于其他 CPU 相关设置,Go 语言中使用 RabbitMQ 与使用其他语言的 RabbitMQ 客户端库并无区别,可以参考其他语言的设置方法,例如使用 RabbitMQ 的 Erlang 库使用的方法基本上是相同的。一般建议根据实际情况来调整 CPU 相关设置,避免对 RabbitMQ 性能产生负面影响。
以上是 RabbitMQ 的一些性能优化策略,实际应用中需要根据实际场景进行具体的调优和优化方案。
九、Kafka、ActiveMQ、RabbitMQ、RocketMQ 有什么优缺点?
十、如何保证RabbitMQ消息的顺序性?
-
拆分多个 queue(消息队列),每个 queue(消息队列) 一个 consumer(消费者),就是多一些 queue (消息队列)而已,确实是麻烦点;
-
一个 queue (消息队列)但是对应一个 consumer(消费者),然后这个 consumer(消费者)内部用内存队列做排队,然后分发给底层不同的 worker 来处理。
十一、RabbitMQ的工作模式
RabbitMQ 是一个消息队列系统,采用了 AMQP(高级消息队列协议)来实现消息的传输和路由。在 RabbitMQ 中,主要有以下几种工作模式:
- 简单模式
简单模式是最基本的工作模式。在此模式下,生产者向队列中发送消息,消费者从队列中接收消息并处理。此时只有一个生产者和一个消费者。这种模式既简单又易于理解,适合低并发的场景。
- 工作模式
工作模式是一组消费者共享一个队列的模式。此时多个生产者向同一个队列发送消息,多个消费者从该队列中接收消息并处理,多个消费者共享同一个队列。消息被平均地发送到多个消费者中,以完成并行处理任务。
- 发布/订阅模式
发布/订阅模式是一种广播形式的模式。在此模式下,消息首先被发送到 Exchange 中,Exchange 根据其类型将消息发送至多个队列,多个消费者从队列中接收并处理消息。此模式可以用于群发广告、新闻订阅等应用场景。
- 路由模式
路由模式是一种根据关键字或者正则表达式将消息发送到多个队列中的模式。在此模式下,消息首先会被发送到 Exchange 中,Exchange 根据消息的 routing key 将消息发送至特定的队列。多个消费者从各自的队列中接收消息并处理。
- 主题模式
主题模式是一种将消息发送到多个队列中的模式。通常使用通配符将队列的名称与 routing key 进行匹配。此模式可以将消息根据 routing key 的匹配结果发送到多个队列中,以适应不同场景的需求。
十二、如何确保消息正确地发送至 RabbitMQ? 如何确保消息接收方消费了消息?
发送方确认模式
将信道设置成 confirm 模式(发送方确认模式),则所有在信道上发布的消息都会被指派一个唯一的 ID。
一旦消息被投递到目的队列后,或者消息被写入磁盘后(可持久化的消息),信道会发送一个确认给生产者(包含消息唯一 ID)。
如果 RabbitMQ 发生内部错误从而导致消息丢失,会发送一条 nack(notacknowledged,未确认)消息。
发送方确认模式是异步的,生产者应用程序在等待确认的同时,可以继续发送消息。当确认消息到达生产者应用程序,生产者应用程序的回调方法就会被触发来处理确认消息。
接收方确认机制
消费者接收每一条消息后都必须进行确认(消息接收和消息确认是两个不同操作)。只有消费者确认了消息,RabbitMQ 才能安全地把消息从队列中删除。
这里并没有用到超时机制,RabbitMQ 仅通过 Consumer 的连接中断来确认是否需要重新发送消息。也就是说,只要连接不中断,RabbitMQ 给了 Consumer 足够长的时间来处理消息。保证数据的最终一致性;
下面罗列几种特殊情况
如果消费者接收到消息,在确认之前断开了连接或取消订阅,RabbitMQ 会认为消息没有被分发,然后重新分发给下一个订阅的消费者。(可能存在消息重复消费的隐患,需要去重)
如果消费者接收到消息却没有确认消息,连接也未断开,则 RabbitMQ 认为该消费者繁忙,将不会给该消费者分发更多的消息。
十三、如何保证RabbitMQ消息的可靠传输?
消息不可靠的情况可能是消息丢失,劫持等原因;
丢失又分为:生产者丢失消息、消息列表丢失消息、消费者丢失消息;
1.生产者丢失消息
生产者丢失消息:从生产者弄丢数据这个角度来看,RabbitMQ提供transaction和confirm模式来确保生产者不丢消息;
transaction机制就是说:发送消息前,开启事务(channel.txSelect()),然后发送消息,如果发送过程中出现什么异常,事务就会回滚(channel.txRollback()),如果发送成功则提交事务(channel.txCommit())。然而,这种方式有个缺点:吞吐量下降;
confirm模式用的居多:一旦channel进入confirm模式,所有在该信道上发布的消息都将会被指派一个唯一的ID(从1开始),一旦消息被投递到所有匹配的队列之后;
rabbitMQ就会发送一个ACK给生产者(包含消息的唯一ID),这就使得生产者知道消息已经正确到达目的队列了;
如果rabbitMQ没能处理该消息,则会发送一个Nack消息给你,你可以进行重试操作。
2.消息队列丢数据
消息队列丢数据:消息持久化。
处理消息队列丢数据的情况,一般是开启持久化磁盘的配置。
这个持久化配置可以和confirm机制配合使用,你可以在消息持久化磁盘后,再给生产者发送一个Ack信号。
这样,如果消息持久化磁盘之前,rabbitMQ阵亡了,那么生产者收不到Ack信号,生产者会自动重发。
那么如何持久化呢?
这里顺便说一下吧,其实也很容易,就下面两步
1. 将queue的持久化标识durable设置为true,则代表是一个持久的队列
2. 发送消息的时候将deliveryMode=2
这样设置以后,即使rabbitMQ挂了,重启后也能恢复数据
3.消费者丢失消息
消费者丢失消息:消费者丢数据一般是因为采用了自动确认消息模式,改为手动确认消息即可!
消费者在收到消息之后,处理消息之前,会自动回复RabbitMQ已收到消息;
如果这时处理消息失败,就会丢失该消息;
解决方案:处理消息成功后,手动回复确认消息。
十四、为什么不应该对所有的 message 都使用持久化机制?
- 首先,必然导致性能的下降,因为写磁盘比写 RAM 慢的多,吞吐量可能有 10 倍的差距。
- 如果在 RabbitMQ 集群中启用了持久化机制,并且某个节点崩溃或者重启,那么该节点上的消息会被重新分配到其他节点上。这个过程可能会导致消息重复消费或者丢失的问题。
十五、如何保证RabbitMQ集群高可用?
-
配置镜像队列:在 RabbitMQ 中,可以配置镜像队列来实现队列的高可用性。镜像队列包含一个主队列和多个镜像队列,主队列和镜像队列之间会同步消息数据。当主队列节点宕机或不可用时,镜像队列中的节点可以接管并继续提供服务,从而保证队列的高可用。
-
配置主备节点:RabbitMQ 集群中可以配置主备节点(主从模式),当主节点不可用时,备节点会自动接管主节点的工作。主备节点可以实现 RabbitMQ 集群的自动故障转移,从而提高 RabbitMQ 集群的可用性。
-
保证节点之间的网络互通:RabbitMQ 集群中的节点需要经常进行心跳检测,以保证节点之间的网络互通。如果节点之间出现网络分区或者通信错误,可以通过配置异步镜像队列来维护消息的一致性,并尽可能减少消息丢失。
-
使用监控和报警系统:为 RabbitMQ 集群配置监控和报警系统,及时发现并处理节点故障、磁盘空间使用等问题,以避免更大的中断和故障。
-
使用负载均衡:RabbitMQ 集群可以通过配置负载均衡来分摊流量和负载,避免单个节点出现过多的流量和负载,从而提高 RabbitMQ 集群的可用性和性能。
十六、如何解决消息队列的延时以及过期失效问题?消息队列满了以后该怎么处理?有几百万消息持续积压几小时,说说怎么解决?
解决消息队列的延时问题:
- 定时任务轮询消息队列,发现有延时消息则进行处理;
- 消息队列自带的延时功能,将消息设置好延时时间,到达延时时间后再进行处理。
解决过期失效问题:
- 消息队列中的TTL(Time-To-Live)属性,设置消息的过期时间,到达过期时间后将消息丢弃或者进行其他处理。
当消息队列已经满了怎么办:
- 扩展消息队列的容量,增加队列的长度或者增加队列的数量;
- 消息队列的阻塞/非阻塞模式,通过阻塞模式等待队列有空余空间,
- 非阻塞模式将消息进行丢弃或者进行其他处理,以保证消息队列的正常运行。
当有几百万消息持续积压几小时时怎么办:
- 扩展消息队列的处理能力,增加队列的消费者数量,
- 或者采用分布式的方式将消息进行处理;
- 对于重要性不高的消息,可以进行消息的丢弃或者延迟处理,以减轻消息队列的负载。
- 对消息队列进行性能优化,例如采用消息压缩、减少消息体大小等方式,提升消息队列的处理能力。
更多推荐
所有评论(0)