• 简介
    AMQP,即Advanced Message Queuing Protocol,高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。消息中间件主要用于组件之间的解耦,消息的发送者无需知道消息使用者的存在,反之亦然。
    AMQP的主要特征是面向消息、队列、路由(包括点对点和发布/订阅)、可靠性、安全。
    RabbitMQ是一个开源的AMQP实现,服务器端用Erlang语言编写,支持多种客户端,如:Python、Ruby、.NET、Java、JMS、C、PHP、ActionScript、XMPP、STOMP等,支持AJAX。用于在分布式系统中存储转发消息,在易用性、扩展性、高可用性等方面表现不俗。

一,概念

1,Messge
消息,由消息头和消息体组成。消息体是不透明的,消息头由一些列可选属性组成,这些属性包括:routing-key(路由键)、priority(优先级)、delivery-mode(消息是否可持久性存储)。
2,Publisher
消息的生产者,也是一个向交换器发布消息的客户端应用程序。
3,Consumer
消息的消费者,表示一个从消息队列中取得消息的客户端程序
4,Exchange
交换器,用来接收生产者发送的消息,并将这些消息路由给服务器中的队列。
三种常见的交换器:direct(发布与订阅,完全匹配)、fanout(广播)、topic(主题,规则匹配)。 交换器是通过路由键和队列绑定在一起的,如果消息拥有的路由键跟队列和交换器路由键匹配,那么消息就会被路由到该绑定的队列中。也就是说,消息到队列的过程,消息首先会经过交换器,接下来交换器再通过路由键分发消息到具体的队列中。
5,Binding
绑定,用于消息队列和交换器之间的关联。一个绑定就是基于路由键将交换器与消息队列连接起来的路由规则,所以可以将交换器理解成一个由绑定构成的路由表。
6,Queue
消息队列,用来保存消息直到发送给消费者。它是消息的容器,也是消息的终点。一个消息可以投入一个或多个队列。消息一直在队列里面,等待消费者连接到这个队列取走消息。
在这里插入图片描述
直接(Direct):直接交换机通过消息上的路由键直接对消息进行分发。
扇出(Fanout):一个扇出交换机会将消息发送到所有和它进行绑定的队列上。
主题(Topic):这个交换机会将路由键和绑定上的模式进行通配符匹配。
消息头(Headers):消息头交换机使用消息头的属性进行消息路由

7,Routing-key
路由键,RabbitMQ决定消息该投递到哪个队列的规则。队列通过路由键绑定到交换器。消息发送到MQ服务器时,消息拥有一个路由键,即便是空的,RabbitMQ也会将其和绑定使用的路由键进行匹配。如果相匹配,消息会投递到该队列,如果不匹配,消息将会进入黑洞。
8,Connection
链接,指rabbitMQ服务器和服务建立的TCP连接。
9,Channel
 信道,是TCP里面的虚拟连接。例如,电缆相当于TCP,信道是一个独立的光纤束,一条TCP连接上创建多条信道是没有问题的。TCP一旦打开,就会创建AMQP信道,无论发布消息、接受消息、订阅队列,这些动作都是通过信道完成的。TCP的创建和销毁开销特别大,创建需要3次握手,销毁需要4次挥手。如果不用信道,那么应用程序就会以TCP连接到RabbitMQ,高峰时每秒成千上万条连接会造成资源的巨大浪费,而且操作系统每秒处理TCP连接数也是有限制的,必定造成性能瓶颈。
10,Virtual Host
虚拟主机,表示一批交换机,消息队列和相关对象。虚拟主机是共享相同的身份认证和加密环境的独立服务器。每个virtualhost本质上就是一个mini版的RabbitMQ服务器,拥有自己的队列、交换器、绑定和权限机制。vhost是AMQP概念的基础,必须在连接时指定,RabbitMQ默认的vhost是/。
11,Broker
表示消息队列服务器实体。
在这里插入图片描述

二,原理

假设P1和C1注册了相同的Broker,Exchange和Queue。P1发送的消息最终会被C1消费。基本的通信流程大概如下所示:

  1. P1生产消息,发送给服务器端的Exchange
  2. Exchange收到消息,根据ROUTINKEY,将消息转发给匹配的Queue1
  3. Queue1收到消息,将消息发送给订阅者C1
  4. C1收到消息,发送ACK给队列确认收到消息
  5. Queue1收到ACK,删除队列中缓存的此条消息

Consumer收到消息时需要显式的向rabbit broker发送basic.ack消息或者consumer订阅消息时设置auto_ack参数为true。在通信过程中,队列对ACK的处理有以下几种情况:

  1. 如果consumer接收了消息,发送ack后,rabbitmq会删除队列中这个消息,发送另一条消息给consumer。
  2. 如果cosumer接受了消息,但在发送ack之前断开了连接,rabbitmq会认为这条消息没有被deliver,在consumer再次连接的时候,这条消息会被redeliver。
    3 . 如果consumer接受了消息,但是程序中有bug,忘记了发送ack,rabbitmq不会重复发送消息。
  3. rabbitmq2.0.0和之后的版本支持consumer reject某条(类)消息,可以通过设置requeue参数中的reject为true达到目地,那么rabbitmq将会把消息发送给下一个注册的consumer。
    在这里插入图片描述

在这里插入图片描述

三,消息发送及接受过程

假设P1和C1注册了相同的Broker,Exchange和Queue。P1发送的消息最终会被C1消费。
基本的通信流程大概如下所示:

  • P1生产消息,发送给服务器端的Exchange
  • Exchange收到消息,根据ROUTINKEY,将消息转发给匹配的Queue1
  • Queue1收到消息,将消息发送给订阅者C1
  • C1收到消息,发送ACK给队列确认收到消息
  • Queue1收到ACK,删除队列中缓存的此条消息

Consumer收到消息时需要显式的向rabbit broker发送basic。ack消息或者consumer订阅消息时设置auto_ack参数为true。

在通信过程中,队列对ACK的处理有以下几种情况:

  • 如果consumer接收了消息,发送ack,rabbitmq会删除队列中这个消息,发送另一条消息给consumer。
  • 如果cosumer接受了消息,但在发送ack之前断开连接,rabbitmq会认为这条消息没有被deliver,在consumer再次连接的时候,这条消息会被redeliver。
  • 如果consumer接受了消息,但是程序中有bug,忘记了ack,rabbitmq不会重复发送消息。
  • rabbitmq2.0.0和之后的版本支持consumerreject某条(类)消息,可以通过设置requeue参数中的reject为true达到目地,那么rabbitmq将会把消息发送给下一个注册的consumer。

消息的ACK机制

即消息的Ackownledge确认机制,为了保证消息不丢失,消息队列提供了消息Acknowledge机制,即ACK机制,当Consumer确认消息已经被消费处理,发送一个ACK给消息队列,此时消息队列便可以删除这个消息了。如果Consumer宕机/关闭,没有发送ACK,消息队列将认为这个消息没有被处理,会将这个消息重新发送给其他的Consumer重新消费处理。

消息的事务支持

消息的收发处理支持事务,例如:在任务中心场景中,一次处理可能涉及多个消息的接收、处理,这应该处于同一个事务范围内,如果一个消息处理失败,事务回滚,消息重新回到队列中。

消息的持久化

消息的持久化,对于一些关键的核心业务来说是非常重要的,启用消息持久化后,消息队列宕机重启后,消息可以从持久化存储恢复,消息不丢失,可以继续消费处理。

消息处理模式
fanout 模式

模式特点:

  • 可以理解他是一个广播模式
  • 不需要routing key它的消息发送时通过Exchange binding进行路由的~~在这个模式下routing key失去作用
  • 这种模式需要提前将Exchange与Queue进行绑定,一个Exchange可以绑定多个Queue,一个Queue可以同多个Exchange进行绑定
  • 如果接收到消息的Exchange没有与任何Queue绑定,则消息会被抛弃。

direct 模式
任何发送到Direct Exchange的消息都会被转发到routing_key中指定的Queue。

  • 一般情况可以使用rabbitMQ自带的Exchange:”” (该Exchange的名字为空字符串), 也可以自定义Exchange

  • 这种模式下不需要将Exchange进行任何绑定(bind)操作。当然也可以进行绑定。可以将不同的routing_key与不同的queue进行绑定,不同的queue与不同exchange进行绑定

  • 消息传递时需要一个“routing_key”

  • 如果消息中不存在routing_key中绑定的队列名,则该消息会被抛弃。

  • 如果一个exchange声明为direct,并且bind中指定了routing_key,那么发送消息时需要同时指明该exchange和routing_key。

简而言之就是:生产者生成消息发送给Exchange, Exchange根据Exchange类型和basic_publish中的routing_key进行消息发送 消费者:订阅Exchange并根据Exchange类型和binding key(bindings 中的routing key) ,如果生产者和订阅者的routing_key相同,Exchange就会路由到那个队列。

topic 模式
前面讲到direct类型的Exchange路由规则是完全匹配binding key与routing key,但这种严格的匹配方式在很多情况下不能满足实际业务需求。

topic类型的Exchange在匹配规则上进行了扩展,它与direct类型的Exchage相似,也是将消息路由到binding key与routing key相匹配的Queue中,但这里的匹配规则有些不同。
它约定:

*匹配一个标识符
#匹配0个或多个标识符
  • routing key为一个句点号“. ”分隔的字符串(我们将被句点号“。 ”分隔开的每一段独立的字符串称为一个单词),如“stock.usd.nyse”、“nyse.vmw”、“quick.orange.rabbit”
  • binding key与routing key一样也是句点号“. ”分隔的字符串
  • binding key中可以存在两种特殊字符“”与“#”,用于做模糊匹配,其中“”用于匹配一个单词,“#”用于匹配多个单词(可以是零个)
    rabbitmq1.png
    以上图中的配置为例,routingKey=”quick.orange.rabbit”的消息会同时路由到Q1与Q2,routingKey=”lazy.orange.fox”的消息会路由到Q1,routingKey=”lazy.brown.fox”的消息会路由到Q2,routingKey=”lazy.pink.rabbit”的消息会路由到Q2(只会投递给Q2一次,虽然这个routingKey与Q2的两个bindingKey都匹配);routingKey=”quick.brown.fox”、routingKey=”orange”、routingKey=”quick.orange.male.rabbit”的消息将会被丢弃,因为它们没有匹配任何bindingKey。
优缺点

组成部分
生产者 : 消息的生产者,给交换机发送消息
消费者 : 消息的消费者,监听队列,一旦监听到消息,就消费
交换机 : 交换机用来决定消息发送到哪个队列上
队列 : 接收交换机上发送过来的消息

简单队列模式
autoAck : true
消费者无论是否处理完,都回复一个ack true

Work模式
能够实现“能者多劳”,性能强的消费者多消费。
basicQos(1) : 消费者告知broker一次值接收一条消息
将自动Ack改成手动Ack,处理完消息后告诉broker可以发送下一条消息过来了,这样就能实现“能者多劳”。

发布订阅者模式
此模式要使用fanout交换机。
此模式可以将一条消息发送给多个队列,被多个消费者消费。
弊端:不能指定队列接收,绑定在交换机上的所有队列都会受到消息。

路由模式
此模式要使用direct交换机。
解决“发布订阅者模式”的弊端,可以指定发送给绑定在交换机上的队列。
在将队列和交换机进行绑定时,要指定routing-key。
在生产者发送消息时,要指定routing-key。

通配符模式
让routing-key在匹配的时候达到模糊匹配的效果。
通配符:
*:匹配当前级 例如:roouting-key为java.*只能匹配java.api,不能匹配java.api.yes
#:匹配多级 例如:roouting-key为java.#可以匹配java.api,也可以匹配java.api.yes

四,使用

这里我们使用docker构建好了rabbitmq的容器,并做了端口映射

1.创建一个交换机exchange

在这里插入图片描述
Exchange 属性说明:

  • Feature中没有 D 的话就是非持久化,关闭rabbitmq的话就会删除
  • Virtual host:属于哪个Virtual host。(如果有多个Virtual host的有此属性,一般默认的Virtual
    host是"/",Virtual host可以做最小粒度的权限控制。)
  • Name:名字,同一个Virtual host里面的Name不能重复。
  • Durability: 是否持久化 (Durable:持久化,Transient:不持久化)。
  • Auto delete:当最后一个绑定(队列或者exchange)被unbind之后,该exchange自动被删除。
  • Internal: 是否是内部专用exchange,是的话,就意味着我们不能往该exchange里面发消息。
  • Arguments:
    参数,是AMQP协议留给AMQP实现做扩展使用的。alternate_exchange配置的时候,exchange根据路由路由不到对应的队列的时候,这时候消息被路由到指定的alternate_exchange的value值配置的exchange上。
2.创建一个队列queue并绑定交换机

在这里插入图片描述

在这里插入图片描述

Logo

瓜分20万奖金 获得内推名额 丰厚实物奖励 易参与易上手

更多推荐