消息中间件


一、何为消息中间件

  • 1.概述

1.消息中间件利用可靠些传递机制进行系统和系统之间的通讯.

2.通过提供消息队列传递和消息队列排队机制,它可以在分布式系统架构上扩展进程之间的通讯.

  • 2.消息中间件的应用场景

1.跨系统数据传递

2.数据的并发和异步处理

3.大数据分析与传递

5.分布式事务

例如:

在进行数据迁移和高并发请求的时候,此时有个10W的并发请求下单,这个时候我们就可以采用消息队列缓存中间件,先把订单数据放到消息队列中,让这批数据稳健、可靠的入库和执行。

  • 3.常见的消息中间件
ActiveMQRabbitmqkafkaRocketmq

消息中间件的本质和设计就是一种用来接收数据、数据请求、存储数据、发送数据的过程的技术服务。

MQ消息队列:负责数据的传接收、存储和传递,所以性能要过于普通服务和技术。

二、消息队列协议

  • 1.网络协议三要素

1.语法:语法是用户数据与控制信息的数据与格式以及数据出现的顺序

2.语义:语义是解释控制信息每个部分的意义,它规定了需要发出扫描控制信息以及完成的动作和做出什么样的响应.

3.时序:时序是对事件发生的顺序的详细说明.

①.语法:http规定了请求报文和响应报文的格式

②.语义:客户端主动发起了请求称之为请求

③.时序:一个请求对应一个响应

  • 2.为什么消息中间件不采用http作为传输协议?

1.应为http协议的自身机制(报文头和响应报文)是比较复杂的,其中还包含了cookies和数据加密解密,状态码,响应码等,对于一个消息传递存储分法功能的协议是不需要这么复杂的功能这么多反而会拖垮性能,反之要追求高性能、简介、快速。

2.一般情况下http面对的都是短连接,在实际应用中容易中断,造成数据丢失,这样就不利于消息中间件的业务场景,因为消息中间件是一个长期获取消息的过程,出现问题和故障需要保证数据持久化,目的就是为了能更好的对数据进行存储、转发保证数据的高可用和稳健运行。

  • 3.消息中间件采用的最常见的协议
OpenWireAMQPAMQPMQTTKafkaOpenMessage

AMQP:

AMQP:全称(Advanced Message Queuing Protocol)是消息队列协议,由摩根大通集团联合其它公司共同设计,是一个提供消息服务的应用层标准高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计.基于该协议的客户端与消息中间件可传递消息,并不受客户端/中间件不同产品.不同的开发语言等条件的限制,Erlang语言开发,Erlang语言是基于C语言设计的语言,因此该协议性能也是十分的强悍.Erlang实现有Rabbitmq等.

AMQP协议支持Rabbitmq、Activemq

特征:

①.分布式事务支持

②.消息的持久化支持

③.高性能和高可靠的消息处理优势.


MQTT协议:

MQTT协议:(Message Queueing Telemetry Transport)消息队列是IBM开放的一个即时通讯协议,物联网架构中的重要组成部分.

特点:

1、轻量
2、结构简单
3、传输快,不支持事务
4.没有持久化设计

应用场景:

1、适用于计算能力有限
2、网络不稳定的场景

支持者:

Rabbitmq、Activemq

OpenMessage协议:

最近几年由阿里、雅虎、滴滴、Stremalio等公司共同参与发布的一款消息中间件、流处理等领域的应用开发标准。

特点:

①、结构简单

②、解析速度快

③、支持事务和持久化设计


Kafka协议:

kafka协议是基于TCP/IP的二进制协议,消息内部是通过长度来分割,由一些基本数据类型组成.

特点:

①.结构简单

②.解析速度快

③.无事务支持

④.有持久化的设计

消息队列持久化


三、持久化

  • 1.概述

持久化就是将数据存储进入磁盘,而不是存在内存中随服务器重启断开而消失,使数据能够永久保存。

  • 2.常见的持久化方式
ActivemqRabbitmqKafkaRocketmq
文件存储支持支持支持支持
数据库存储支持

消息的分发策略-高可用高可靠


四、消息分发策略

1.MQ消息队列有如下几个角色:

①.生产者

②.存储消息

③.消费者

2.生产者生成消息消费者是如何获取消息的呢?

获取消息一般分为两种方式,分别是推(push)、拉(pull),我们平常使用的git就有这个推拉机制,我们发送http请求就是一种典型的拉取数据库数据返回的过程.而消息队列MQ是一种推送过程,而这些机制会适用到很多业务场景也有很多对应推机制策略.

在发送消息的过程中可能会出现异常,或者网络的抖动,故障等等因为造成消息的无法消费,比如用户在下订单,消费MQ接受,订单系统出现故障,导致用户支付失败,那么这个时候就需要消息中间件必须支持消息重试机制策略,也就是当出现问题和故障的情况下,消息不丢失还可以进行重发.

五、消息分发策略的机制对比

ActiveMQRabbitMQKafkaRocketMQ
发布订阅支持支持支持支持
轮询分发支持支持支持
公平分发支持支持
重发支持支持支持
消息拉取支持支持支持

轮询分发和公平分发的区别,轮询分发它侧重于公平性,不管服务器的算力资源的优劣,分发的数据始终都是平均分配,公平分发则讲究"能者多劳",对于算力资源更优处理数据更快的服务器采取分配更多.反之,对算力资源数据处理较为劣的服务器采取分配更少的数据.

六、消息队列高可用

  • 1.概述

①.所谓的高可用,是指在产品在规定的条件和规定的时刻或者时间内处于可执行规定功能状态的能力.

②.当业务量增加是,请求也过大,一台消息中间件服务器的会触及硬件(CPU、内存、磁盘)的极限,一台消息服务区你已经无法满足业务需求,所以消息中间件必须支持集群部署,来达到高可用的目的。

  • 2.主从数据库的部署模式

在这里插入图片描述

生产者将消费发送到Master节点,所有的都连接这个消息队列共享这块数据区域,Master节点负责写入,一旦Master挂掉,Slave节点继续服务,从而形成高可用.

  • 3.Master-Slaver主从同步部署方式

在这里插入图片描述

这种模式写入消息同样在Master主节点上,但是主节点会同步数据到Slaver节点形成副本,和zookeeper或者Redis主从节点机制很相同.这样可以达到负载均衡的效果,如果消费者有多个这样就可以去不同的节点就行消费,以为消息的拷贝和同步会暂用很大的带宽和网络资源.在后续的Rabbitmq中会有使用.

  • 4.多住集群转发部署模式

在这里插入图片描述

①.如果插入的数据是broker-1中,元数据信息会存储数据的相关描述和记录存放的位置(队列).

②.它会对描述的信息也就是元数据进行同步,如果消费者在broker-2中进行消费,发现自己几点没有对应的消息,可以从对应的元数据中去查询,然后返回对应的消息信息.

场景:

比如买或者和或者演唱会门票,第一个黄牛有顾客说要买演唱会门票,自己这里没有但是它回去联系其它的黄牛询问,如果有就返回.

  • 5.总结三句话

1.要买进行数据共享

2.要么消息同步

3.要么元数据共享

  • 5.高可靠

所谓高可靠是指系统可以无故障地持续运行,比如一个系统突然崩溃,报错,异常等等并不影响线上业务的正常运行,出错的几率极低,就称之为:高可靠

Rabbitmq安装

七、安装Rabbitmq

  • 1.安装rabbitmq和Erlang语言环境
  • https://github.com/rabbitmq/rabbitmq-server/releases/download/v3.10.6/rabbitmq-server-3.10.6-1.suse.noarch.rpm
  • wget https://packages.erlang-solutions.com/erlang-solutions-2.0-1.noarch.rpm

①.安装Erlang环境

②.验证安装erlang环境是否成功

③.安装socat

④.安装rabbitmq

[root@zabbix-server ~]# rpm -Uvh erlang-solutions-2.0-1.noarch.rpm
[root@zabbix-server ~]# yum install erlang -y

[root@zabbix-server ~]# erl -v
Erlang/OTP 24 [erts-12.3.2.1] [source] [64-bit] [smp:4:4] [ds:4:4:10] [async-threads:1]

Eshell V12.3.2.1  (abort with ^G)
1> 
[root@zabbix-server ~]# yum install socat -y
[root@zabbix-server ~]# rpm -Uvh rabbitmq-server-3.10.6-1.suse.noarch.rpm 
warning: rabbitmq-server-3.10.6-1.suse.noarch.rpm: Header V4 RSA/SHA512 Signature, key ID 6026dfca: NOKEY
Preparing...                          ################################# [100%]
Updating / installing...
   1:rabbitmq-server-3.10.6-1.suse    ################################# [100%]

①.启动rabbitmq

②.查看状态,设置rabbitmq开机自启动

[root@zabbix-server ~]# systemctl start rabbitmq-server.service
[root@zabbix-server ~]# systemctl enable rabbitmq-server.service 
Created symlink from /etc/systemd/system/multi-user.target.wants/rabbitmq-server.service to /usr/lib/systemd/system/rabbitmq-server.service.

[root@zabbix-server ~]# systemctl status rabbitmq-server.service 
● rabbitmq-server.service - RabbitMQ broker
   Loaded: loaded (assets/usr/lib/systemd/system/rabbitmq-server.service; disabled; vendor preset: disabled)
   Active: active (running) since Sun 2022-07-31 23:18:35 CST; 3s ago
 Main PID: 2426 (beam.smp)
   CGroup: /system.slice/rabbitmq-server.service
           ├─2426 /usr/lib64/erlang/erts-12.3.2.1/bin/beam.smp -W w -MBas ageffcbf -MHas ageffcbf -MBlmbcs 512 -MHlmbcs 512 -MMmcs ...
           ├─2441 erl_child_setup 32768
           ├─2470 /usr/lib64/erlang/erts-12.3.2.1/bin/epmd -daemon
           ├─2500 inet_gethost 4
           └─2501 inet_gethost 4

Jul 31 23:18:31 zabbix-server rabbitmq-server[2426]: Doc guides:  https://rabbitmq.com/documentation.html
Jul 31 23:18:31 zabbix-server rabbitmq-server[2426]: Support:     https://rabbitmq.com/contact.html
Jul 31 23:18:31 zabbix-server rabbitmq-server[2426]: Tutorials:   https://rabbitmq.com/getstarted.html
Jul 31 23:18:31 zabbix-server rabbitmq-server[2426]: Monitoring:  https://rabbitmq.com/monitoring.html
Jul 31 23:18:31 zabbix-server rabbitmq-server[2426]: Logs: /var/log/rabbitmq/rabbit@zabbix-server.log
Jul 31 23:18:31 zabbix-server rabbitmq-server[2426]: /var/log/rabbitmq/rabbit@zabbix-server_upgrade.log
Jul 31 23:18:31 zabbix-server rabbitmq-server[2426]: <stdout>
Jul 31 23:18:31 zabbix-server rabbitmq-server[2426]: Config file(s): (none)
Jul 31 23:18:35 zabbix-server rabbitmq-server[2426]: Starting broker... completed with 0 plugins.
Jul 31 23:18:35 zabbix-server systemd[1]: Started RabbitMQ broker.

  • 2.docker安装rabbitmq
[root@zabbix-server ~]# docker run -di --name myrabbit -e RABBITMQ_DEFAULT_USER=admin -e RABBITMQ_DEFAULT_PASS=admin -p 15672:15672 -p 5672:5672 -p 25672:25672 -p 61613:61613 -p 1883:1883 rabbitmq:management

[root@zabbix-server ~]# docker ps -a
CONTAINER ID   IMAGE                 COMMAND                  CREATED          STATUS    PORTS     NAMES
3e9954136db4   rabbitmq:management   "docker-entrypoint.s…"   22 seconds ago   Created             myrabbit

[root@zabbix-server ~]# docker images
REPOSITORY   TAG          IMAGE ID       CREATED        SIZE
rabbitmq     management   6c3c2a225947   7 months ago   253MB


#由于之前手动安装的rabbitmq和docker安装的rabbitmq存在着端口冲突
#我们需要手动关停rabbitmq,启动docker rabbitmq

[root@zabbix-server ~]# systemctl stop rabbitmq-server.service 
[root@zabbix-server ~]# docker start 3e9954136db4
3e9954136db4

[root@zabbix-server ~]# docker ps -a
CONTAINER ID   IMAGE                 COMMAND                  CREATED         STATUS         PORTS                                                                                                                                                                                                                                                                               NAMES
3e9954136db4   rabbitmq:management   "docker-entrypoint.s…"   2 minutes ago   Up 4 seconds   4369/tcp, 0.0.0.0:1883->1883/tcp, :::1883->1883/tcp, 5671/tcp, 0.0.0.0:5672->5672/tcp, :::5672->5672/tcp, 15671/tcp, 0.0.0.0:15672->15672/tcp, :::15672->15672/tcp, 0.0.0.0:61613->61613/tcp, :::61613->61613/tcp, 15691-15692/tcp, 0.0.0.0:25673->25672/tcp, :::25673->25672/tcp   myrabbit


做完上述操作后,浏览器访问rabbitmq,账号密码:“admin”,登录成功则表示安装成功.


Rabbitmq Web界面管理授权


八、启动web页面访问功能

[root@zabbix-server ~]# rabbitmq-plugins enable rabbitmq_management
Enabling plugins on node rabbit@zabbix-server:
rabbitmq_management
The following plugins have been configured:
  rabbitmq_management
  rabbitmq_management_agent
  rabbitmq_web_dispatch
Applying plugin configuration to rabbit@zabbix-server...
The following plugins have been enabled:
  rabbitmq_management
  rabbitmq_management_agent
  rabbitmq_web_dispatch

started 3 plugins.

浏览器输入:启动rabbitmq的主机ip地址:15672

①.rabbitmq默认访问账户密码:“guest”、“guest”

②.由于这个账号仅限于localhost本地访问,所以我们要授权账号访问

在这里插入图片描述

九、授权账号设置密码

1.创建一个账户admin和密码

[root@zabbix-server ~]# rabbitmqctl add_user admin admin

2.为设置用户分配操作权限

在rabbitmq中用户级别分为如下几个

①.administrator:可以登录控制台,查看所有的信息,可以对rabbitmq进行管理

②.monitoring:监控者,登录控制台,指定策略

③.policymaker:策略制定者,登录控制台,指定策略

④.managment:普通管理员,登录控制台

[root@zabbix-server ~]# rabbitmqctl set_user_tags admin administrator
Setting tags for user "admin" to [administrator] ...

#设置admin用户具有访问根目录的所有权限
[root@zabbix-server ~]# rabbitmqctl set_permissions -p / admin ".*" ".*" ".*"
Setting permissions for user "admin" in vhost "/" ...


rabbitmqctl add_user 账号 密码
rabbitmqctl set_user_tags 账号 administrator 将账号赋予administrator账号类型
rabbitmqctl change_password Username Newpassword 修改密码
rabbitmqctl delete_user Username 删除用户
rabbitmqctl list_users 查看用户清单
rabbitmqctl.bat set_permissions -p / 用户名 ".*" ".*" ".*" 为用户设置administrator角色
rabbitmqctl.bat set_permissions -p / root ".*" ".*" ".*"

Rabbitmq角色分类

十、Rabbitmq角色分类

1.none:

不能访问management plugin,无法登录Rabbitmq web管理界面.

2.management:

①.可以列出自己通过AMQP登录的虚拟机

②.查看自己的虚拟机节点,virtual hosts和却学生,exchange和bindings信息

③.查看和关闭自己的channel和connections

④.查看有关自己的虚拟机节点和virtual hosts统计信息包括其它用户在这个节点virtual hosts中的活动信息,有点类似于个人中心,只能查看到和自己相关的信息.

3.Policymaker:

①.包含management所有权限

②.查看和创建、删除自己的virtual hosts所属的policies和parameters信息。

4.Monitoring:

①.包含management所有权限

②.罗列出所有的virtual hosts,包括不能登录virtual hosts.

③.查看其它用户connection和channels信息

④.查看节点级别的数据如clustering和memory使用情况

⑤.查看所有的virtual hosts的全局统计信息

5.Administrator:

①.最高权限

②.可以创建和删除virtual hosts

③.可以查看,创建和删除users

④.查看创建permissions

⑤.关闭所有用户的connection


Rabbitmq组件和架构

十一、AMQP

  • 1.什么是AMQP

AMQP全称:Advanced Message Queuing Protocol(高级消息队列协议),是应用层协议的一个开发标准,为面向消息的中间件设计的一款协议.

  • 2.Rabbitmq数据流转的过程

①.数据生产者和broker节点之间首先验证密码成功后建立数据传输连接,一般一次连接连接会产生多个通道(channel).

②.成立通道后开始发送数据消费,在数据消费前有个应答过程,一般情况下是手工应答 ,代码队ack进行确认,这一过程又被称之可靠消费.

十二、Rabbitmq的核心组成部分

AMQP简介:

Rabbitmq中的交换机、交换机类型、队列、绑定、路由键等都遵循AMQP协议中相对应的概念,AMQP是应用层的协议,包括三层

  • Module Layer:位于协议的最高层,主要定义了一些供客户端调用的命令,客户端可以利用这些命令实现自己的业务逻辑.例如客户端可以使用Queue.Declare命令声明一个队列或者使用Basic.Consume订阅一个队列中的消息
  • Session Layer:位于中间层,主要负责将客户端的命令发送给服务器,再将服务器的应答返回给客户端,为客户端与服务器之间的通信提供可靠性同步机制和错误处理
  • Transport Layer:位于最底层,主要传输二进制数据流,提供帧处理、信道复用、错误检测和数据表示
  • 1.核心概念

在这里插入图片描述

**Server:**又称Broker,接收客户端的连接,实现AMQP实体服务,安装rabbitmq-server.

**Connection:**连接,应用程序与Broker的网络连接TCP/IP三次、四次握手.

**Channel:**网络通道,几乎所有的操作都在channel中进行,channel是进行消息读写的通道,客户端可以建立对各Channel,每个channel代表一个回话任务.

**Message:**消息,服务与应用程序之间传送的数据,由Properties可是对消息进行修饰,比如消息的优先级,延迟等高级特性,Body则就是消息体的具体内容.

**Bindings:**exchange和queue之间的虚连接,binding中可以保护多个routing key.

**Routing Key:**是一个路由规则,虚拟机可以用它来确定如何路由一个特定的消息.

**Queue:**队列:成为Message Queue消息队列,保存消息并将它们发给消费者.

  • 2.Rabbitmq的整体架构

在这里插入图片描述

数据生产者将数据投递给rabbitmq的服务节点,通过rabbitmq交换机投递给队列,如果要进行数据过滤则添加Routingkey,对投递目标进行匹配.

  • 3.Rabbitmq的运行流程

在这里插入图片描述

  • 为什么AMQP协议需要采用通道来代替连接请求?

如果使用三次握手四次挥手则会产生延时,延时会产生性能的损耗,所以 AMQP协议所有的操作都在 Channel 中进行, Channel 是进行消息读写的通道,客户端可以建立对各的 Channel,每个 Channel 代表一个回话任务

  • 创建虚拟节点(Virtual Host)

在这里插入图片描述

  • 进入创建的虚拟节点

在这里插入图片描述

# Rabbitmq工作模式:
> rabbitmq采用通道和不同的队列建立连接,通过broker上的virtual host的交换机投递给Queue,每个virtual host都有一个默认的交换机,最后由queue(队列)经过channel到达consumer(消费者)

Rabbitmq 队列模式

简单模式(Simple)

  • 工作模式(Work):

  • web操作查看视频

  • 类型

  • 分发机制

  • 发布订阅模式:

  • web操作查看视频

  • 类型:fanout

  • 特点:Fanout一发布与订阅模式,是一种广播机制,它是没有路由key的模式

  • 路由模式

  • Web操作查看视频

  • 类型:direct

  • 特点:有routing-key的匹配模式

  • 主题Topic模式

  • web操作查看视频

  • 类型:topic

  • 特点:模糊的routing-key的匹配模式

  • 参数模式

  • web操作查看视频

  • 类型:headers

  • 热点:参数匹配模式


Rabbitmq工作模式

十三、Rabbitmqmq不同模式

(1)、简单队列模式

在这里插入图片描述

使用图形化界面输出“Hello,World”

  1. 新建一个queue(队列)

在这里插入图片描述

02.使用默认Exchanges(交换机)发送一条消息

在这里插入图片描述

  • 键入“Hello,World!”,点击Publish Message

    看到Message published则表示发送成功

在这里插入图片描述

  • 验证

在这里插入图片描述

在这里插入图片描述

Tips:

在使用Rabbitmq的时候,**Queue(队列)**的消费模式选择Nack message requeue true模式,不要选择Ack模式,如果选择了Ack那么在进行数据投递的时候会导致数据在传递的时候完成一次消费,无法让接收的队列接收,从而导致数据丢失.

(2)、发布与订阅模式

  • 创建一个类型为fanout模式的exchange

在这里插入图片描述

  • 创建三个Queues

在这里插入图片描述

  • 在exchange上绑定相应的queue

在这里插入图片描述

在这里插入图片描述

  • 在fanout-exchang上发送一条消息

在这里插入图片描述

  • 验证:在queue上查看发布的消息

在发布与订阅模式里,只要是所有绑定了对应的交换机的queue都能收到对应交换机发过来的消息

在这里插入图片描述

(3)、路由模式

路由模式:

在一个交换机里绑定了多个queue,交换机通过指定RouteKey来决定这个消息具体投递到哪个queue,通过这个Routekey可以决定哪些queue收到这些消息.

有点类似于一个过滤 的条件

  • 创建Direct模式的Exchange

在这里插入图片描述

  • 绑定关系

在这里插入图片描述

  • 发送消息

在这里插入图片描述

  • 验证:看到Email的queue1收到了消息,但是其它的queue没有

在这里插入图片描述

(4)、主题模式(Topic)

  • 创建一个Topic类型的exchange

在这里插入图片描述

  • 绑定相应的Queue

在这里插入图片描述

查看Queue(队列)

  • #表示匹配多少个层级,对RoutingKeycom进行多少级的层级匹配,在队列匹配中可有可无
  • *表示匹配层级一级,表示对这个队列匹配必须要有一个#层级对它进行匹配

在这里插入图片描述

在这里插入图片描述

匹配两个队列(com、order):

在这里插入图片描述

Logo

开源、云原生的融合云平台

更多推荐