MQTTEMQX认识,安装部署

一,认识

EMQX ,大规模分布式物联网 MQTT 消息服务器,高效可靠连接海量物联网设备,实时处理分发消息与事件流数据,助力构建关键业务的物联网平台与应用。是一款大规模可弹性伸缩的云原生分布式物联网 MQTT 消息服务器。MQTT 协议,是基于发布/订阅模式的物联网通信协议,凭借简单易实现、支持 QoS、报文小等特点,占据了物联网协议的半壁江山

EMQ :面向海量的 移动/物联网/车载 等终端接入,并实现在海量物理网设备间快速低延时的消息路由:

  1. 稳定承载大规模的 MQTT 客户端连接,单服务器节点支持百万连接。
  2. 分布式节点集群,快速低延时的消息路由,单集群支持千万规模的路由。
  3. 消息服务器内扩展,支持定制多种认证方式、高效存储消息到后端数据库。
  4. 完整物联网协议支持,MQTTMQTT-SNCoAPLwM2M、私有 TCP/UDP 协议支持。

MQTT 是基于 发布(Publish)/订阅(Subscribe) 模式来进行通信及数据交换的,与 HTTP 的 请求(Request)/应答(Response) 的模式有本质的不同。

订阅者(Subscriber) 会向 消息服务器(Broker) 订阅一个 主题(Topic) 。成功订阅后,消息服务器会将该主题下的消息转发给所有的订阅者。

主题(Topic)以 ‘/’ 为分隔符区分不同的层级。包含通配符 ‘+’ 或 ‘#’ 的主题又称为 主题过滤器(Topic Filters) ; 不含通配符的称为 主题名(Topic Names) 例如:

sensor/1/temperature

chat/room/subject

presence/user/feng

sensor/1/#

sensor/+/temperature

uber/drivers/joe/inbox


看下图我们认识一下mqtt 的通信过程

在这里插入图片描述

连接

设备与服务端建立长链接后,会派生出一个connect进程来维护设备与服务端的通信,客户端收到连接协议包,校验完权限后会建立session,session进程建立成功后会与连接进程建立双向绑定,之后连接进程给设备返回connack协议包;

订阅,取消订阅

conn进程收到SUBSCRIBE协议包后,校验权限后,调用主题,session进程投递消息,session进程处理完订阅逻辑后,给conn进程发送suback消息

断开连接

设备端发送DISCONNECT, conn进程执行正常stop,session进程监听到EXIT消息后自动退出。

心跳

心跳逻辑仅用的了conn进程,即conn收到PINGREQ立刻返回PINGRESP。

发布

协议控制包类型,详情见:http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html#_Toc398718019

在这里插入图片描述

消息重传 (Message Retransmission)

官网地址:(http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html#_Toc398718019)

消息重传,是属于 MQTT 协议标准规范的一部分。协议中规定了作为通信的双方 服务端客户端 对于自己发送到对端的 PUBLISH 消息都应满足其 服务质量 (Quality of Service levels) 的要求。如:

  • QoS 0:表示 最多一次交付;消息是根据底层网络的能力传递的。接收方不发送响应,发送方不执行重试。消息到达接收者一次或根本不到达。

  • QoS 1:表示 消息至少送达一次 (At least once delivery);即发送端会一直重发该消息,除非收到了对端对该消息的确认。意思是在 MQTT 协议的上层(即业务的应用层)相同的 QoS 1 消息可能会收到多次。

               PUBLISH
#1 Sender  --------------->  Receiver       (*)
               PUBACK
#2 Sender  <---------------  Receiver

涉及到 2 个报文;共 2 次发送动作,发送端和接收端各 1 次;这 2 个报文都持有相同的 PacketId。
行尾标记为 * 号的,表示发送方在等待确认报文超时后,可能会主动发起重传。
  • QoS 2:表示 消息只送达一次 (Exactly once delivery);即该消息在上层仅会接收到一次。

虽然,QoS 1 和 QoS 2 的 PUBLISH 报文在 MQTT 协议栈这一层都会发生重传,但请你谨记的是:

  • QoS 1 消息发生重传后,在 MQTT 协议栈上层,也会收到这些重发的 PUBLISH 消息。
  • QoS 2 消息无论如何重传,最终在 MQTT 协议栈上层,都只会收到一条 PUBLISH 消息
               PUBLISH
#1 Sender  --------------->  Receiver       (*)
               PUBREC
#2 Sender  <---------------  Receiver
               PUBREL
#3 Sender  --------------->  Receiver       (*)
               PUBCOM
#4 Sender  <---------------  Receiver

涉及到 4 个报文;共 4 次发送动作,发送端和接收端各 2 次;这 4 个报文都持有相同的 PacketId。
行尾标记为 * 号的,表示发送方在等待确认报文超时后,可能会主动发起重传

消息顺序

当然,以上的概念仅需要了解即可,你最需要关心的是,消息在被重复发送后,消息顺序出现的变化,尤其是 QoS 1 类的消息。例如:

假设,当前飞行窗口设置为 2 时,EMQX 计划向客户端的某主题投递 4 条 QoS 1 的消息。并假设客户端程序、或网络在中间出现过问题,那么整个发送流程会变成:

#1  [4,3,2,1 || ]   ----->   []
#2  [4,3 || 2, 1]   ----->   [1, 2]
#3  [4 || 3, 2]     ----->   [1, 2, 3]
#4  [4 || 3, 2]     ----->   [1, 2, 3, 2, 3]
#5  [ || 4]         ----->   [1, 2, 3, 2, 3, 4]
#6  [ || ]          ----->   [1, 2, 3, 2, 3, 4]

流程共 6 个步骤;左边表示 EMQX 的 消息队列 和 飞行窗口,以 || 分割;右侧表示客户端收到的消息顺序,其中每步表示:

  1. Broker 将 4 条消息放入消息队列中。
  2. Broker 依次发送 1 2,并将其放入 飞行窗口 中;客户端仅应答消息 1;且此时由于客户端发送流出现了问题,无法发送后续应答报文。
  3. Broker 收到消息 1 的应答;从飞行窗口移除消息 1;并将 3 发送出去;继续等待 2 3 的应答;
  4. Broker 等待应答超时,重发了报文 2 3;客户端收到重发的报文 2 3 并正常应答。
  5. Broker 从飞行窗口移除了消息 2 3,并发送报文 4;客户端收到了报文 4 并回复应答。
  6. 至此,所有报文处理完成。客户端收到的报文顺序为 [1, 2, 3, 2, 3, 4],并也依次上报给 MQTT 协议栈的上层。

虽然,存在重复的报文消息。但这是完全符合协议的规范的,每个报文第一次出现的位置都是有序的,并且重复收到的报文 2 3 的报文中,会携带一个标识位,表明其为重发报文。

MQTT 协议和 EMQX 将这个主题认为是 有序的主题 (Ordered Topic) 参见:MQTTv3.1.1 - Message ordering (opens new window)

它确保 相同的主题和 QoS 下,消息是按顺序投递和应答的

此外,如果用户期望所有主题下的 QoS 1 与 QoS 2 消息都严格有序,那么需要设置飞行窗口的最大长度为 1,但代价是会降低该客户端的吞吐。

详情请见:http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html#_Toc398718101

在这里插入图片描述
在这里插入图片描述

在这里插入图片描述

优点:

  1. 源码开放。
  2. 单节点支持500万 MQTT设备连接 ,集群可扩展至一亿并发MQTT连接;
  3. 单节点支持每秒实时接收、移动、处理与分发数百万条的 MQTT 消息。
  4. 基于 OTP 软实时的运行时系统设计,消息分发与投递时延低于 1 毫秒
  5. 采用 Masterless 的大规模分布式集群架构,实现系统高可用和水平扩展。

当我们接触当新技术,要养成查看官网,以及相关文档的习惯。这样会极快的帮助我们高效的学习、掌握它。然后再根据搜索引擎进行查漏补缺。以下是EMQX相关连接

官网地址:https://www.emqx.io/zh

在这里插入图片描述

在线文档:https://www.emqx.io/docs/zh/v5.0/#%E4%BA%A7%E5%93%81%E4%BC%98%E5%8A%BF

在这里插入图片描述

下载地址:https://www.emqx.io/zh/downloads

在这里插入图片描述

二,安装

下载地址:https://www.emqx.io/zh/downloads

这里看自己需求,通过什么方式安装。

docker,安装

获取docker镜像

docker pull emqx/emqx:5.0.7

启动 Docker 容器

docker run -d --name emqx -p 1883:1883 -p 8083:8083 -p 8084:8084 -p 8883:8883 -p 18083:18083 emqx/emqx:5.0.7

这里需要绑定映射出来配置文件,参考一下目录,这里不在多做解释,不会docker 绑定物理地址,请看我的docker 相关博客,比较简单 只需 命令 -v

在这里插入图片描述

linux,安装

二进制安装
  • 下载

    wget https://www.emqx.com/zh/downloads/broker/5.0.7/emqx-5.0.7-el8-amd64.rpm
    
  • 安装

    sudo yum install emqx-5.0.7-el8-amd64.rpm
    
  • 启动

    sudo emqx start
    

    相关目录

在这里插入图片描述

zip包安装

下载地址:https://github.com/emqx/emqx/releases/tag/v5.0.7

百度网盘地址:

在这里插入图片描述

为什么是选择下载这个?多看看文档介绍。

在这里插入图片描述

  1. 下载

    wget https://www.emqx.com/zh/downloads/broker/5.0.7/emqx-5.0.7-el7-amd64.tar.gz
    
  2. 解压

    tar -zxvf emqx-5.0.7-el7-amd64.tar.gz
    
  3. 查看目录

在这里插入图片描述

  1. 启动

    ./bin/emqx start
    ./bin/emqx_ctl status
    
  2. 停止 EMQX Broker

    ./bin/emqx stop
    
  3. 卸载 EMQX Broker

    直接删除 EMQX 目录即可
    

Linux环境注意版本,和依赖即可

三,部署

这里以docker 部署为例。

拉取镜像

docker pull emqx/emqx:5.0.7

在这里插入图片描述

启动容器

docker run -d  -p 1883:1883 -p 8083:8083 -p 8883:8883 -p 8084:8084 -p 18083:18083  --restart=always  --name emqx [容器id]

top:对于命令不懂可以查看我之前docker系列博客

访问控制台:ip:18083

默认 用户名:admin 密码: public

在这里插入图片描述

首次进来是 英文, 在 setting 将 英文 改为简体中文即可

在这里插入图片描述

四,配置

编辑用户名密码配置文件

vim ./etc/emqx/plugins/emqx_auth_username.conf

在这里插入图片描述

关闭匿名登录

vim /etc/emqx/emqx.conf

查找allow_anonymous,修改为false

Top:小技巧 vi 下 可以 直接输入 /[字符] 回车查找, n 是下一个

五,测试,使用 mqtt.fx

界面介绍

在这里插入图片描述

配置服务器名称,IP地址,端口, 其他默认即可,如果服务器配置了不允许匿名,这里我们需要 User Credentials 菜单栏,配置账号密码

在这里插入图片描述

连接成功,先订阅主题 ,然后再发布

主题: 这里的主题,我们自定义即可,只要在稍后发布的时候能一致就行,然后点击subscribe订阅即可

在这里插入图片描述
在这里插入图片描述

可以看到已经有了一个客户端,而且可以看到有个一个主题了。

在这里插入图片描述
在这里插入图片描述

发布消息

在这里插入图片描述

查看消息

在这里插入图片描述

五,总结

大多是官网资料总结,如果在之前有接触过 ActiveMQ 这一类的消息中间件,理解起来相对简单一点

Logo

权威|前沿|技术|干货|国内首个API全生命周期开发者社区

更多推荐