一、简介

emqx消息服务器

EMQ X (Erlang/Enterprise/Elastic MQTT Broker) 是基于 Erlang/OTP 平台开发的开源物联网 MQTT 消息服务器。Erlang/OTP 是出色的软实时(Soft-Realtime)、低延时(Low-Latency)、分布式(Distributed) 的语言平台。MQTT 是轻量的(Lightweight)、发布订阅模式(PubSub) 的物联网消息协议。

mqtt协议

MQTT 是基于 发布(Publish)/订阅(Subscribe) 模式来进行通信及数据交换的,与 HTTP 的 请求(Request)/应答(Response) 的模式有本质的不同。
订阅者(Subscriber) 会向 消息服务器(Broker) 订阅一个 主题(Topic) 。成功订阅后,消息服务器会将该主题下的消息转发给所有的订阅者。
主题(Topic)以 ‘/’ 为分隔符区分不同的层级。包含通配符 ‘+’ 或 ‘#’ 的主题又称为 主题过滤器(Topic Filters); 不含通配符的称为 主题名(Topic Names)
mqtt控制报文有14种:
CONNECT – 连接服务端
CONNACK – 确认连接请求
PUBLISH – 发布消息
PUBACK –发布确认
PUBREC – 发布收到(QoS 2,第一步)
PUBREL – 发布释放(QoS 2,第二步)
PUBCOMP – 发布完成(QoS 2,第三步)
SUBSCRIBE - 订阅主题
SUBACK – 订阅确认
UNSUBSCRIBE –取消订阅
UNSUBACK – 取消订阅确认
PINGREQ – 心跳请求
PINGRESP – 心跳响应
DISCONNECT –断开连接

本文只讲解如何安装部署单机版和集群版emqx,使用emqx进行数据消费存储请移步我的上一篇文章 java实现emqx共享订阅,若对您有帮助请收藏评论,谢谢支持。

二、单机版安装emqx服务器

单机版安装emqx的方式有很多这里介绍几种常用的安装方法,我本人使用的环境是centOS 7.8虚拟机。

2.1 Shell 脚本一键安装

相比其他的安装方式shell脚本算是最方便省事的了。
1 .执行shell命令:

curl https://repos.emqx.io/install_emqx.sh | bash

这个命令意思是下载install_emqx.sh脚本文件并执行,有兴趣的小伙伴可以看看这个脚本的具体操作。我贴在下方:

#! /bin/sh
set -e

case $1 in
    "emqx-ee")
        broker="emqx-ee"
        package="emqx-ee"
        ;;
    "emqx-edge")
        broker="emqx-edge"
        package="emqx-edge"
        ;;
    *)
        broker="emqx-ce"
        package="emqx"
        ;;
esac

[ -f /etc/redhat-release ] && ISCENTOS=true
[ -f /etc/debian_version ] && ISDEB=true
[ ! -z "$(cat /etc/os-release |grep -o openSUSE)" ] && ISSUSE=true

if [ ! -z $ISDEB ]; then
    apt update && apt install -y \
        apt-transport-https \
        ca-certificates \
        curl \
        gnupg-agent \
        software-properties-common \
        lsb-core

    curl -fsSL https://repos.emqx.io/gpg.pub | apt-key add -

    [ $(lsb_release -d | awk '{print $2}') = 'Ubuntu' ] && add-apt-repository \
        "deb [arch=amd64] https://repos.emqx.io/${broker}/deb/ubuntu/ \
        ./$(lsb_release -cs) \
        stable"

    [ $(lsb_release -d | awk '{print $2}') = 'Debian' ] && add-apt-repository \
        "deb [arch=amd64] https://repos.emqx.io/${broker}/deb/debian/ \
        ./$(lsb_release -cs) \
        stable"

    apt update && apt install -y ${package}
    echo "EMQ X install success"
fi

if [ ! -z $ISSUSE ]; then
    zypper in -y curl rsyslog
    curl -L -o /tmp/gpg.pub https://repos.emqx.io/gpg.pub
    rpmkeys --import /tmp/gpg.pub
    zypper ar -f -c https://repos.emqx.io/${broker}/redhat/opensuse/leap/stable emqx
    zypper in -y ${package}
    echo "EMQ X install success"
fi

if [ ! -z $ISCENTOS ];then
    yum install -y yum-utils \
        device-mapper-persistent-data \
        lvm2

    num=$(cat /etc/redhat-release | grep -o [0-9] |head -n 1)
    if [ $num = 7 ];then
        yum-config-manager \
            --add-repo \
            https://repos.emqx.io/${broker}/redhat/centos/7/${broker}.repo
    fi

    if [ $num = 6 ]; then
        yum-config-manager \
            --add-repo \
            https://repos.emqx.io/${broker}/redhat/centos/6.8/${broker}.repo
    fi

    yum install -y ${package}
    echo "EMQ X install success"
fi

看到如下表示成功:
在这里插入图片描述

  1. 启动emqx
    执行命令:
emqx start

3.查看emqx状态:

emqx_ctl status

显示

[root@localhost ~]# emqx_ctl status
Node 'emqx@127.0.0.1' is started
emqx 4.2.0 is running

表示安装启动成功,访问http://ip:18083进入控制台

4.停止emqx

emqx stop

5.卸载emqx
卸载emqx我们看一下他的shell便可知脚本是通过yum安装的所以卸载命令:

yum remove emqx

2.2 yum安装

1.安装所需要的依赖包

yum install -y yum-utils device-mapper-persistent-data lvm2

2.使用以下命令设置稳定存储库,以 CentOS7 为例

yum-config-manager --add-repo https://repos.emqx.io/emqx-ce/redhat/centos/7/emqx-ce.repo

3.安装最新版本的 EMQ X Broker

yum install emqx

4.启动 EMQ X Broker

emqx start
emqx 4.0.0 is started successfully!

5.查看启动状态

emqx_ctl status
Node 'emqx@127.0.0.1' is started
emqx v4.0.0 is running

6.停止 EMQ X Broker

emqx stop

7.卸载 EMQ X Broker

yum remove emqx

2.3 rpm安装

1.下载安装包,地址:
链接: https://www.emqx.io/downloads/broker/?osType=Linux
选择一个版本的emqx这里我选择4.1.0下载完毕上传至linux系统
2.安装

rpm -ivh emqx-centos7-v4.1.0.x86_64.rpm

出现:
在这里插入图片描述
表示安装成功!
3.启动

emqx start

4.查看状态

emqx_ctl status

5.停止

emqx stop

6.卸载

 rpm -e emqx

2.4 docker安装

关于docker安装我已写好docker的脚本在:
https://github.com/itwwj/iot-project.git中的 docs/docker/emq目录下

1.编写docker脚本

#!/bin/bash

docker stop emqx
docker rm emqx
docker run -d --name emqx --restart=always  \
      -p 1883:1883 \
      -p 8083:8083 \
      -p 8883:8883 \
      -p 8084:8084 \
      -p 18083:18083 \
      -v `pwd`/conf/emqx_auth_mysql.conf:/opt/emqx/etc/plugins/emqx_auth_mysql.conf \
      -v `pwd`/conf/emqx.conf:/opt/emqx/etc/emqx.conf \
      -v `pwd`/conf/acl.conf:/opt/emqx/etc/acl.conf \
      -v /etc/localtime:/etc/localtime \
      --privileged=true \
      emqx/emqx:v4.0.0

这里我把三个常用的配置文件做了挂载,不想从容器里复制的小伙伴可以去我的项目里复制。
2.启动
启动时候直接运行start.sh脚本即可,但是直接运行会报错,先运行几个命令:

#start.sh同级目录下执行
chmod u+x *.sh
sed -i "s/\r//" start.sh
#conf目录下执行
chmod 777 *.conf

3.停止

docker stop emqx

4.卸载

docker rm emqx

三、配置认证和鉴权插件

3.1 emqx目录

emqx的配置文件目录在 /etc/emqx 路径下
在这里插入图片描述
日志目录在 /var/log/emqx 路径下
在这里插入图片描述
数据文件目录在 /var/lib/emqx 路径下
在这里插入图片描述

3.2 配置基于MySQL的认证和acl

这里的认证主要是使用官方的插件emqx_auth_mysql
编辑认证插件配置文件:

vi /etc/emqx/plugins/emqx_auth_mysql.conf

## 主要修改的地方如下
## 服务器地址
auth.mysql.server = 192.168.1.17:3306
## mysql用户名
auth.mysql.username = root
## 密码
auth.mysql.password = root
## 数据库名
auth.mysql.database = mqtt
## 认真比对以下认证sql是否和数据库一致
auth.mysql.auth_query = select password from mqtt_user where username = '%u' limit 1
## 数据库数据加盐规则与算法 这里为了测试方便我使用 plain 无规则
auth.mysql.password_hash = plain
## 对比以下acl鉴权sql是否和数据库信息一致
auth.mysql.acl_query = select allow, ipaddr, username, clientid, access, topic from mqtt_acl where ipaddr = '%a' or username = '%u' or username = '$all' or clientid = '%c'

将下列sql脚本导入数据库

CREATE TABLE `mqtt_user` (
  `id` int(11) unsigned NOT NULL AUTO_INCREMENT,
  `username` varchar(100) DEFAULT NULL,
  `password` varchar(100) DEFAULT NULL,
  `salt` varchar(35) DEFAULT NULL,
  `is_superuser` tinyint(1) DEFAULT 0,
  `created` datetime DEFAULT NULL,
  PRIMARY KEY (`id`),
  UNIQUE KEY `mqtt_username` (`username`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
-- 添加一个默认的连接账号
INSERT INTO `mqtt_user` ( `username`, `password`, `salt`, `is_superuser`)
VALUES
    ('root', 'root', NULL, 0);

CREATE TABLE `mqtt_acl` (
  `id` int(11) unsigned NOT NULL AUTO_INCREMENT,
  `allow` int(1) DEFAULT 1 COMMENT '0: deny, 1: allow',
  `ipaddr` varchar(60) DEFAULT NULL COMMENT 'IpAddress',
  `username` varchar(100) DEFAULT NULL COMMENT 'Username',
  `clientid` varchar(100) DEFAULT NULL COMMENT 'ClientId',
  `access` int(2) NOT NULL COMMENT '1: subscribe, 2: publish, 3: pubsub',
  `topic` varchar(100) NOT NULL DEFAULT '' COMMENT 'Topic Filter',
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;

-- 所有用户不可以订阅系统主题
INSERT INTO mqtt_acl (allow, ipaddr, username, clientid, access, topic) VALUES (0, NULL, '$all', NULL, 1, '$SYS/#');

-- 允许 本机 上的客户端订阅系统主题
INSERT INTO mqtt_acl (allow, ipaddr, username, clientid, access, topic) VALUES (1, '127.0.0.1', NULL, NULL, 1, '$SYS/#');

这里说明一下:
数据库脚本字段中认证脚本:
username:客户端用户
password:客户端密码
salt:密码盐值
is_superuser :是否为超级用户,是的话就跳过acl验证
created :创建时间
acl鉴权脚本:
allow: 禁止(0),允许(1)
ipaddr:设置 IP 地址
username:连接客户端的用户名,此处的值如果设置为 $all 表示该规则适用于所有的用户
clientid:连接客户端的 Client ID
access:允许的操作:订阅(1),发布(2),订阅发布都可以(3)
topic:控制的主题,可以使用通配符,并且可以在主题中加入占位符来匹配客户端信息,例如 t/%c 则在匹配时主题将会替换为当前客户端的 Client ID

MySQL 8.0 及以后版本使用了 caching_sha2_password 作为默认身份验证插件,受限于客户端驱动你必须将其更改为 mysql_native_password 插件:

ALTER USER 'your_username'@'your_host' IDENTIFIED WITH mysql_native_password BY 'your_password';

登录控制台或使用命令开启插件:
控制台:
http://192.168.1.177:18083/plugins
默认账号:admin 密码:public
插件页面启动mysql鉴权插件

四、集群配置

这里我准备三台虚拟机做演示,分别是
node-1:192.168.1.16
node-2:192.168.1.17
node-3:192.168.1.177

4.1集群设计

在集群前先来了解一下emq的分布式集群设计:
在这里插入图片描述
EMQ X 分布式的基本功能是将消息转发和投递给各节点上的订阅者,为实现此过程,EMQ X 维护了几个与之相关的数据结构:订阅表,路由表,主题树。

订阅表: 主题 - 订阅者
MQTT 客户端订阅主题时,EMQ X 会维护主题(Topic) -> 订阅者(Subscriber) 映射的订阅表。订阅表只存在于订阅者所在的 EMQ X 节点上
路由表: 主题 - 节点
同一集群的所有节点,都会复制一份主题(Topic) -> 节点(Node) 映射的路由表
主题树: 带统配符的主题匹配
除路由表之外,EMQ X 集群中的每个节点也会维护一份主题树(Topic Trie) 的备份

消息派发过程
当 MQTT 客户端发布消息时,所在节点会根据消息主题,检索路由表并转发消息到相关节点,再由相关节点检索本地的订阅表并将消息发送给相关订阅者。

例如 client1 向主题 t/a 发布消息,消息在节点间的路由与派发流程:

client1 发布主题为 t/a 的消息到节点 node1
node1 通过查询主题树,得知 t/a 可匹配到现有的 t/a、t/# 这两个主题。
node1 通过查询路由表,得知主题 t/a 只在 node3 上有订阅者,而主题 t/# 只在 node2 上有订阅者。故 node1 将消息转发给 node2 和 node3。
node2 收到转发来的 t/a 消息后,查询本地订阅表,获取本节点上订阅了 t/# 的订阅者,并把消息投递给他们。
node3 收到转发来的 t/a 消息后,查询本地订阅表,获取本节点上订阅了 t/a 的订阅者,并把消息投递给他们。
消息转发和投递结束。

4.2 多种节点发现策略

策略说明
manual手动命令创建集群
static静态节点列表自动集群
mcastUDP 组播方式自动集群
dnsDNS A 记录自动集群
etcd通过 etcd 自动集群
k8sKubernetes 服务自动集群

manual 手动创建集群
默认配置为手动创建集群,节点须通过 ./bin/emqx_ctl join \ 命令加入:

cluster.discovery = manual

基于 static 节点列表自动集群
配置固定的节点列表,自动发现并创建集群:

cluster.discovery = static
cluster.static.seeds = emqx1@127.0.0.1,emqx2@127.0.0.1

基于 mcast 组播自动集群
基于 UDP 组播自动发现并创建集群:

cluster.discovery = mcast
cluster.mcast.addr = 239.192.0.1
cluster.mcast.ports = 4369,4370
cluster.mcast.iface = 0.0.0.0
cluster.mcast.ttl = 255
cluster.mcast.loop = on

基于 DNS A 记录自动集群
基于 DNS A 记录自动发现并创建集群:

cluster.discovery = dns
cluster.dns.name = localhost
cluster.dns.app  = ekka

基于 etcd 自动集群
基于 etcd 自动发现并创建集群:

cluster.discovery = etcd
cluster.etcd.server = http://127.0.0.1:2379
cluster.etcd.prefix = emqcl
cluster.etcd.node_ttl = 1m

基于 kubernetes 自动集群
Kubernetes 下自动发现并创建集群:

cluster.discovery = k8s
cluster.k8s.apiserver = http://10.110.111.204:8080
cluster.k8s.service_name = ekka
cluster.k8s.address_type = ip
cluster.k8s.app_name = ekka

4.3集群搭建

4.3.1 基于 manual 手动创建集群(推荐)

1.编辑emqx.conf配置文件

vim /etc/emqx/emqx.conf

分别为三台服务更改配置:

cluster.discovery = manual
node.name=emq@ip #记得ip要写你的主机ip

2.重启emqx服务

emqx restart

3.启动集群
在三台服务器根目录下执行:

./bin/emqx_ctl cluster join emqx@192.168.1.177

这里192.168.1.177是手动选取的Leader 节点,其他的节点只要手动加入Leader 节点即可。
4.查看集群节点

./bin/emqx_ctl cluster status

在这里插入图片描述

5.退出集群

./bin/emqx_ctl cluster leave

4.3.2 基于 static 节点列表自动集群

1.分别编辑三台emqx的配置文件,更改的信息主要为:

vim /etc/emqx/emqx.conf

节点1:


#集群发现模式,静态发现,启动后不用输加入集群命令
cluster.discovery = static 
 
#集群列表,配合上面static发现策略使用
cluster.static.seeds = emqx1@192.168.1.16,emqx2@192.168.1.17,emqx3@192.168.1.177
 
#节点名
node.name = emqx1@192.168.1.16

节点2:


#集群发现模式,静态发现,启动后不用输加入集群命令
cluster.discovery = static 
 
#集群列表,配合上面static发现策略使用
cluster.static.seeds = emqx1@192.168.1.16,emqx2@192.168.1.17,emqx3@192.168.1.177
 
#节点名
node.name = emqx2@192.168.1.17

节点3:


#集群发现模式,静态发现,启动后不用输加入集群命令
cluster.discovery = static 
 
#集群列表,配合上面static发现策略使用
cluster.static.seeds = emqx1@192.168.1.16,emqx2@192.168.1.17,emqx3@192.168.1.177
 
#节点名
node.name = emqx3@192.168.1.177

2.重新启动三台节点的emqx:

emqx restart

3.查看集群状态

./bin/emqx_ctl cluster status

4.开启客户端订阅主题:
$SYS/brokers
收到以下系统主题回复,则代表集群成功
节点信息

4.3.3 基于 mcast 组播自动集群

组播需集群在同一局域网,大多数云厂商禁止组播,所以此方式不推荐生产模式使用
1.修改配置文件emqx.conf

cluster.discovery = mcast  #第35行
cluster.mcast.addr = 239.192.0.1 #第69行打开注释
cluster.mcast.ports = 4369,4370 #第74行打开注释
cluster.mcast.iface = 0.0.0.0 #第81行打开注释
cluster.mcast.ttl = 255 #第86行打开注释
cluster.mcast.loop = on #第91行打开注释

2.重启emqx

emqx restart

3.查看状态

cd /
./bin/emqx_ctl cluster status

4.4 nginx负载均衡

nginx负载我这里介绍两种,一种是直接进行负载均衡,一种是通过ssl连接进行负载。就操作来说直接负载简单易操作但是安全性不够。步骤4.4.2和4.4.3二者任选其一

4.4.1 安装nginx

1.编写脚本
我使用了docker方式进行安装docker脚本如下:

#!/bin/bash

docker stop nginx
docker run -idt -p 18883:8883 -p 28083:28083 --name nginx --restart=always \
    -v /data/projects/:/data/projects \
    -v `pwd`/conf/:/etc/nginx \
    -v `pwd`/logs/:/var/log/nginx  \
    -v `pwd`/html/crossdomain.xml:/usr/share/nginx/html/crossdomain.xml  \
    -e TZ="Asia/Shanghai" \
    nginx:1.17.0

我对配置文件进行了挂载,挂载文件不想自己配置可以去我的github下载项目,地址:https://github.com/itwwj/iot-project.git中的 docs/docker/nginx目录下

运行docker脚本

4.4.2 直接负载

1.编写nginx脚本注意路径在conf/conf.d/emq.conf

  upstream emqx_servers {
      server 192.168.1.16:1883  max_fails=3 fail_timeout=30s;
      server 192.168.1.17:1883  max_fails=3 fail_timeout=30s;
      server 192.168.1.177:1883  max_fails=3 fail_timeout=30s;
  }

  server {
      listen 8883;
      proxy_pass emqx_servers;
  }

2.编辑nginx.conf
打开引入emq.conf配置文件的注释
在这里插入图片描述
3.重启emqx

docker restart nginx

4.4.3 ssl终结负载

4.4.3.1 生成自签证书

1.下载ssl http://slproweb.com/products/Win32OpenSSL.html 也可以下载64位。
在这里插入图片描述
2.安装运行,安装就不介绍了直接点就可以。生成自签证书:

openssl genrsa -out ca-key.pem 2048
openssl req -x509 -new -nodes -key ca-key.pem -days 10000 -out ca.pem -subj "/CN=kube-ca"

去你执行的目录将证书拷出

4.4.3.2 配置nginx ssl

1.将证书拷至容器中
进入容器

docker exec -it nginx /bin/bash

在容器中创建 /etc/ssl 目录

mkdir /ssl

容器内安装sz、rz

apt-get update && apt-get install lrzsz

将证书上传至容器的 /ssl目录下,注意不要上传错位置。

2.配置nginx容器的ssl
编辑conf/conf.d/ssl.conf

  upstream emqx_servers {
      zone tcp_servers 64k;
      hash $remote_addr;
      server 192.168.1.16:1883 max_fails=2 fail_timeout=30s;
      server 192.168.1.17:1883 max_fails=2 fail_timeout=30s;
      server 192.168.1.177:1883 max_fails=2 fail_timeout=30s;
  }

  server {
      listen 8883 ssl;
      status_zone tcp_server;
      proxy_pass emqx_servers;
      proxy_buffer_size 4k;
      ssl_handshake_timeout 15s;
      ssl_certificate     /ssl/ca.pem;
      ssl_certificate_key /ssl/ca-key.pem;
  }

3.放开nginx.conf中的引入配置文件注释
在这里插入图片描述
4.重启nginx

docker restart nginx

4.4.4 验证集群

1.登录控制台:http://192.168.1.177:28083
在这里插入图片描述
可以看到有三个节点
2.客户端连接nginx
ssl连接需要加载ssl证书,直接负载就ssl选择false填写正确的连接信息。在这里插入图片描述
3.订阅集群主题
在这里插入图片描述
至此集群搭建和负载均衡成功!

Logo

更多推荐