天楚锐齿

物联网、大数据、云计算、通信、IT、嵌入式
29
10

EMQ(emqttd) 2.x 安装和使用(物联网传输控制协议的Broker)


支持下国产开源。


MQTT物联网传输控制协议:《MQTT-3.1.1-CN.pdf


下载:emqttd-centos64-v2.0-rc.2-20161019.zip


安装:

$ unzip emqttd-centos64-v2.0-rc.2-20161019.zip -d /data/

$ mv /data/emqttd /data/emqttd-centos64-v2.0-rc.2-20161019

$ ln -s /data/emqttd-centos64-v2.0-rc.2-20161019 /data/emqttd


系统优化配置:

# ulimit -n 1048576

$ sudo sysctl -w fs.file-max=2097152

$ sudo sysctl -w fs.nr_open=2097152

$ sudo sysctl -w net.core.somaxconn=65535

$ sudo sysctl -p


修改配置文件(node.cookie必须每台都要一样,node.name的@后面必须是ip地址或者fqdn方式的主机名):

$ cd /data/emqttd

$ vi etc/emq.conf

## Node name

node.name = emqttd@192.168.60.58

## Cookie for distributed node

node.cookie = emq_dist_cookie_533d99ckd9ji475

## Erlang Process Limit

node.process_limit = 2000000

## Sets the maximum number of simultaneously existing ports for this system

node.max_ports = 1000000

## Size of acceptor pool

mqtt.listener.tcp.acceptors = 64

## Maximum number of concurrent clients

mqtt.listener.tcp.max_clients = 1000000

## Rate Limit. Format is 'burst,rate', Unit is KB/Sec

## mqtt.listener.tcp.rate_limit = 100,10

## TCP Socket Options

mqtt.listener.tcp.backlog = 262144

## Distributed node port range

node.dist_listen_min = 6000

node.dist_listen_max = 6999

## 如果需要启用防火墙,则上面两行去掉注释,注意下面的防火墙端口设置,要打开该段端口。

## Expired after 1 day:

## w - week

## d - day

## h - hour

## m - minute

## s - second

mqtt.session.expired_after = 2w

# 上面为持久会话到期时间,从客户端断开算起,超时后客户端没有收到的消息会丢弃(不想丢失消息的话,该值就要设置的很大)。

## Console log. Enum: off, file, console, both

log.console = both

## Console log level. Enum: debug, info, notice, warning, error, critical, alert, emergency

log.console.level = info

## Console log file

log.console.file = log/console.log

## Error log file

log.error.file = log/error.log

## Enable the crash log. Enum: on, off

log.crash = on

log.crash.file = log/crash.log


修改下启动脚本(在stop处增加一行):

$ vi bin/emqttd

...

    stop)

        # Wait for the node to completely stop...

        PID="$(relx_get_pid)"

        if ! relx_nodetool "stop"; then

            exit 1

        fi

        while $(kill -s 0 "$PID" 2>/dev/null);

        do

            sleep 1

        done

        killall epmd

        ;;


启动:

$ cd emqttd

直接进入控制台模式:

$ ./bin/emqttd console

后台运行模式:

$ ./bin/emqttd start

$ ./bin/emqttd_ctl status

$ ./bin/emqttd stop


开启防火墙:

端口:1883:MQTT协议tcp端口,8883:MQTT(SSL) tcp端口,8083:MTQQ(websocket)、HTTP API端口,18083:dashboard管理控制WEB端口,4369:集群处理epmd端口,6000-6999由上面配置文件定义的epmd需要的端口范围。

sudo firewall-cmd                      --zone=public --add-port=1883/tcp

sudo firewall-cmd --permanent --zone=public --add-port=1883/tcp

sudo firewall-cmd                      --zone=public --add-port=8883/tcp

sudo firewall-cmd --permanent --zone=public --add-port=8883/tcp

sudo firewall-cmd                      --zone=public --add-port=8083/tcp

sudo firewall-cmd --permanent --zone=public --add-port=8083/tcp

sudo firewall-cmd                      --zone=public --add-port=18083/tcp

sudo firewall-cmd --permanent --zone=public --add-port=18083/tcp

sudo firewall-cmd                      --zone=public --add-port=4369/tcp

sudo firewall-cmd --permanent --zone=public --add-port=4369/tcp

sudo firewall-cmd                      --zone=public --add-port=6000-6999/tcp

sudo firewall-cmd --permanent --zone=public --add-port=6000-6999/tcp

sudo firewall-cmd                      --zone=public --list-all

sudo firewall-cmd --permanent --zone=public --list-all


查看各台的启动状态:

http://192.168.60.55:8083/status

http://192.168.60.55:18083/      用户名/密码: admin / public

$ vi data/configs/vm.xxx.args

$ vi data/configs/app.xxx.conf


LOG位置:

$ vi log/


把节点加入集群:

在各个节点上执行(重复执行也没关系,其中192.168.60.55这台会提示错误:cannot_join_with_self,这个没关系,自己不用加入自己):

$ ./bin/emqttd_ctl cluster join emqttd@192.168.60.55

$ ./bin/emqttd_ctl cluster status

Cluster status: [{running_nodes,['emqttd@192.168.60.58',

                                 'emqttd@192.168.60.56',

                                 'emqttd@192.168.60.57',

                                 'emqttd@192.168.60.55']}]

把节点退出集群:

本机退出集群:

$ ./bin/emqttd_ctl cluster leave

把某节点退出集群:

$ ./bin/emqttd_ctl cluster remove emqttd@192.168.60.56



测试一下:

下载安装MTQQ协议的客户端:https://mosquitto.org/download/

$ sudo rpm -ivh libmosquitto1-1.4.10-1.1.x86_64.rpm

$ sudo rpm -ivh mosquitto-clients-1.4.10-1.1.x86_64.rpm

订阅:

$ mosquitto_sub -h 192.168.60.56 -p 1883 -t test_topic_1 -q 1 -c -i 1111

发布:另开一个ssh或者另外机器上执行:

$ mosquitto_pub -h 192.168.60.57 -p 1883 -t test_topic_1 -q 1 -m "hello mqtt 7"

参数:

-h: 连接到哪台broker。

-p: 连接端口。

-t: topic名字,topic类似文件系统的组织方式,以"/"分隔符来分层,订阅者订阅时可以使用通配符"+"和"#",发布者不能使用通配符。

-q: qos级别,默认为0,共三个值:0:至多一次,不保证到达订阅者;1:至少一次,保证到达订阅者,但不保证不重复;2:正好一次,保证到达,又保证不重复;非钱类的应用使用qos1就可以了。

-c: 如果订阅者退出,在broker保留所有订阅的消息,一旦重新连接上,则把所有消息发给订阅者,就是持久性订阅(clean_sess=false)。

-i: 设置client id,在即时通信时可以设置为用户id或者用户名等具有唯一性的字段。

-r: 发布方使用的参数,保留消息(每个topic只保留最后一个这种消息,之前的会覆盖 ),即使该消息被一个订阅者读取了,还会一直保留在broker,如果有新的订阅者订阅该topic,则马上会收到该消息,类似qq里面的公告性消息,这类信息的删除,发送一个payload为空的null消息即可:$ mosquitto_pub -h 192.168.60.56 -p 1883 -t test_topic_1 -q 1 -r -n  -u user -P 123456。

-m: 该topic的一条消息内容。



加入MariaDB的认证:

修改配置:

$ vi ./etc/emq.conf

## Allow Anonymous authentication

mqtt.allow_anonymous = false

## Default ACL File

mqtt.acl_file = etc/acl.conf


$ vi ./data/loaded_plugins

emq_dashboard.

emq_auth_mysql.


修改plugins配置:

$ vi etc/plugins/emq_auth_mysql.conf

## Mysql Server

auth.mysql.server = 192.168.60.60:3306

## Mysql Pool Size

auth.mysql.pool = 8

## Mysql Username

auth.mysql.username = root

## Mysql Password

auth.mysql.password = 123456

## Mysql Database

auth.mysql.database = mqtt

## Variables: %u = username, %c = clientid

## Authentication Query: select password only

auth.mysql.auth_query = select password from mqtt_user where username = '%u' limit 1

## Password hash: plain, md5, sha, sha256, pbkdf2

auth.mysql.password_hash = plain

## %% Superuser Query

auth.mysql.super_query = select is_superuser from mqtt_user where username = '%u' limit 1

## ACL Query Command

auth.mysql.acl_query = select allow, ipaddr, username, clientid, access, topic from mqtt_acl where ipaddr = '%a' or username = '%u' or username = '$all' or clientid = '%c'

## ACL nomatch

auth.mysql.acl_nomatch = deny


往MariaDB插入初始化数据库和表(引擎需要改成InnDB):

认证的用户表也可以共用其他系统的用户表,由上面的emq_auth_mysql.conf来配置:auth.mysql.auth_query、auth.mysql.super_query、auth.mysql.acl_query:

建立mqtt数据库:

$ mysql -uroot -p123456 -hxxxx

MariaDB [test]> create database mqtt;

用户表:

MariaDB [test]> CREATE TABLE `mqtt_user` (

  `id` int(11) unsigned NOT NULL AUTO_INCREMENT,

  `username` varchar(100) DEFAULT NULL,

  `password` varchar(100) DEFAULT NULL,

  `salt` varchar(20) DEFAULT NULL,

  `is_superuser` tinyint(1) DEFAULT 0,

  `created` datetime DEFAULT NULL,

  PRIMARY KEY (`id`),

  UNIQUE KEY `mqtt_username` (`username`)

) DEFAULT CHARSET=utf8;

用户表插入测试数据:

MariaDB [mqtt]> INSERT INTO mqtt_user (id, username, password, salt, is_superuser, created)

VALUES

    (1,'superuser','123456','123456',True,'2016-10-26 10:00:00'),

    (2,'user','123456','123456',False,'2016-10-26 10:01:00');

acl表:

MariaDB [mqtt]> CREATE TABLE `mqtt_acl` (

  `id` int(11) unsigned NOT NULL AUTO_INCREMENT,

  `allow` int(1) DEFAULT NULL COMMENT '0: deny, 1: allow',

  `ipaddr` varchar(60) DEFAULT NULL COMMENT 'IpAddress',

  `username` varchar(100) DEFAULT NULL COMMENT 'Username',

  `clientid` varchar(100) DEFAULT NULL COMMENT 'ClientId',

  `access` int(2) NOT NULL COMMENT '1: subscribe, 2: publish, 3: pubsub',

  `topic` varchar(100) NOT NULL DEFAULT '' COMMENT 'Topic Filter',

  PRIMARY KEY (`id`)

) DEFAULT CHARSET=utf8;

acl表插入默认数据(注意带$SYS为broker保留的特殊topic用来统计使用,username为$all表示所有在user表中的用户):

MariaDB [mqtt]> INSERT INTO mqtt_acl (id, allow, ipaddr, username, clientid, access, topic)

VALUES

    (1,1,NULL,'$all',NULL,2,'#'),

    (2,1,NULL,'$all',NULL,1,'#'),

    (3,1,NULL,'$all',NULL,3,'#'),

    (4,1,'127.0.0.1',NULL,NULL,2,'$SYS/#'),

    (5,1,'127.0.0.1',NULL,NULL,2,'#'),

    (6,1,NULL,'dashboard',NULL,1,'$SYS/#');


测试,重启每台服务器后执行:

$ mosquitto_sub -h 192.168.60.55 -p 1883 -t test_topic_1 -q 1 -c -i 1112  -u user -P 123456

$ mosquitto_pub -h 192.168.60.55 -p 1883 -t test_topic_1 -q 1 -m "hello mqtt 1" -u user -P 123456



重新初始化emq:

$ bin/emqttd stop

$ rm -rf data/mnesia/*

$ rm -rf data/configs/*

$ rm -rf log/*

$ bin/emqttd start

$ bin/emqttd_ctl cluster join emqttd@192.168.60.55



使用Haproxy来实现负载分担:

因为emq的topic和消息在集群的各台服务器上一致,所以数据不能以增加机器的方式扩容,只能增加每台的内存,和客户端的连接则可以以增加机器方式扩容。

$ vi /etc/haproxy/haproxy.cfg

################## EMQ ######################

listen emq_emqttd *:1883

    mode tcp

    balance source

    log global

    timeout connect         25s

    timeout client          0

    timeout server          0

    option tcpka

    option tcplog

    option tcp-check

    server emq1 192.168.60.55:1883 check inter 25000 port 1883 rise 2 fall 2 weight 25

    server emq2 192.168.60.56:1883 check inter 25000 port 1883 rise 2 fall 2 weight 25

    server emq3 192.168.60.57:1883 check inter 25000 port 1883 rise 2 fall 2 weight 25

    server emq4 192.168.60.58:1883 check inter 25000 port 1883 rise 2 fall 2 weight 25


开启haproxy上的防火墙:

sudo firewall-cmd                      --zone=public --add-port=1883/tcp

sudo firewall-cmd --permanent --zone=public --add-port=1883/tcp

sudo firewall-cmd                      --zone=public --list-all

sudo firewall-cmd --permanent --zone=public --list-all



使用:

broker操作:

显示broker状态:

$ ./bin/emqttd_ctl status

显示broker版本、启用时间等:

$ ./bin/emqttd_ctl broker

显示broker的pubsub进程状态、内存、队列长度、规约数等:

$ ./bin/emqttd_ctl broker  pubsub

显示broker的统计信息:客户端连接数、会话数、主题数、订阅数、路由数等:

$ ./bin/emqttd_ctl broker stats

显示broker的测量信息:底层流量、MQTT报文数、消息数等:

$ ./bin/emqttd_ctl broker metrics

Cluster操作:

$ ./bin/emqttd_ctl cluster

cluster join <Node>                             # Join the cluster

cluster leave                                   # Leave the cluster

cluster remove <Node>                           # Remove the node from cluster

cluster status                                  # Cluster status

Client操作:

$ ./bin/emqttd_ctl clients

clients list                                    # List all clients

clients show <ClientId>                         # Show a client

clients kick <ClientId>                         # Kick out a client

Sessions操作:

$ ./bin/emqttd_ctl sessions

sessions list                                   # List all sessions

sessions list persistent                        # List all persistent sessions ,桥接Bridge的时候才会用到(即clean_sess=false的类型)

sessions list transient                         # List all transient sessions

sessions show <ClientId>                        # Show a session

Routes操作:

$ ./bin/emqttd_ctl routes

routes list                                     # List all routes

routes show <Topic>                             # Show a route

Topics操作:

$ ./bin/emqttd_ctl topics

topics list                                     # List all topics

topics show <Topic>                             # Show a topic

Subscription订阅者操作:

$ ./bin/emqttd_ctl subscriptions

subscriptions list                              # List all subscriptions

subscriptions show <ClientId>                   # Show subscriptions of a client

subscriptions add <ClientId> <Topic> <QoS>      # Add a static subscription manually

subscriptions del <ClientId>                    # Delete static subscriptions manually

subscriptions del <ClientId> <Topic>            # Delete a static subscription manually

Plugins插件操作:

$ ./bin/emqttd_ctl plugins

plugins list                                    # Show loaded plugins

plugins load <Plugin>                           # Load plugin

plugins unload <Plugin>                         # Unload plugin

$ ./bin/emqttd_ctl plugins list

Plugin(emq_auth_clientid, version=2.0, description=Authentication with ClientId/Password, active=false)

Plugin(emq_auth_http, version=2.0, description=Authentication/ACL with HTTP API, active=false)

Plugin(emq_auth_ldap, version=2.0, description=Authentication/ACL with LDAP, active=false)

Plugin(emq_auth_mongo, version=2.0, description=Authentication/ACL with MongoDB, active=false)

Plugin(emq_auth_mysql, version=2.0, description=Authentication/ACL with MySQL, active=false)

Plugin(emq_auth_pgsql, version=2.0, description=Authentication/ACL with PostgreSQL, active=false)

Plugin(emq_auth_redis, version=2.0, description=Authentication/ACL with Redis, active=false)

Plugin(emq_auth_username, version=2.0, description=Authentication with Username/Password, active=false)

Plugin(emq_coap, version=0.2, description=CoAP Gateway, active=false)

Plugin(emq_dashboard, version=2.0, description=Dashboard, active=true)

Plugin(emq_mod_rewrite, version=2.0, description=Rewrite Module, active=false)

Plugin(emq_plugin_template, version=2.0, description=EMQ Plugin Template, active=false)

Plugin(emq_recon, version=2.0, description=Recon Plugin, active=false)

Plugin(emq_reloader, version=2.0, description=Reloader Plugin, active=false)

Plugin(emq_sn, version=0.2, description=MQTT-SN Gateway, active=false)

Plugin(emq_stomp, version=2.0, description=Stomp Protocol Plugin, active=false)

Bridges桥接操作:

$ ./bin/emqttd_ctl bridges

bridges list                                    # List bridges

bridges options                                 # Bridge options

bridges start <Node> <Topic>                    # Start a bridge

bridges start <Node> <Topic> <Options>          # Start a bridge with options

bridges stop <Node> <Topic>                     # Stop a bridge

vm虚机(erlang虚机)性能查看:

$ ./bin/emqttd_ctl vm all

cpu/load1               : 0.05

cpu/load5               : 0.05

cpu/load15              : 0.07

memory/total            : 166404232

memory/processes        : 32564328

memory/processes_used   : 32563952

memory/system           : 133839904

memory/atom             : 959633

memory/atom_used        : 954350

memory/binary           : 46088

memory/code             : 28250535

memory/ets              : 5741416

process/limit           : 2097152

process/count           : 288

io/max_fds              : 1000000

io/active_fds           : 1

ports/count           : 25

ports/limit           : 1048576

端口使用情况查看:

$ ./bin/emqttd_ctl listeners

mnesia数据库信息查看:

$ bin/emqttd_ctl mnesia

管理dashboard用户:

$ ./bin/emqttd_ctl admins

admins add <Username> <Password> <Tags>         # Add dashboard user

admins passwd <Username> <Password>             # Reset dashboard user password

admins del <Username>                           # Delete dashboard user


追踪 ,EMQ消息服务器支持追踪来自某个客户端(Client)的全部报文,或者发布到某个主题(Topic)的全部消息。

追踪客户端(Client):

./bin/emqttd_ctl trace client "clientid(mosqsub/23058-vm6)" "trace_clientid.log"

追踪主题(Topic):

./bin/emqttd_ctl trace topic "topic1" "trace_topic.log"

查询追踪:

./bin/emqttd_ctl trace list

停止追踪:

./bin/emqttd_ctl trace client "clientid(mosqsub/23058-vm6)" off

./bin/emqttd_ctl trace topic "topic1" off


钩子(Hook)扩展,EMQ消息服务器在客户端上下线、主题订阅、消息收发位置设计了扩展钩子(Hook):

钩子                                说明

client.connected             客户端上线

client.subscribe               客户端订阅主题前

client.unsubscribe           客户端取消订阅主题

session.subscribed          客户端订阅主题后

session.unsubscribed      客户端取消订阅主题后

message.publish             MQTT消息发布

message.delivered          MQTT消息送达

message.acked               MQTT消息回执

client.disconnected        客户端连接断开

钩子使用例子:

-module(emqttd_plugin_template).

-export([load/1, unload/0]).

-export([on_message_publish/2, on_message_delivered/4, on_message_acked/4]).

load(Env) ->

    emqttd:hook('message.publish', fun ?MODULE:on_message_publish/2, [Env]),

    emqttd:hook('message.delivered', fun ?MODULE:on_message_delivered/4, [Env]),

    emqttd:hook('message.acked', fun ?MODULE:on_message_acked/4, [Env]).

on_message_publish(Message, _Env) ->

    io:format("publish ~s~n", [emqttd_message:format(Message)]),

    {ok, Message}.

on_message_delivered(ClientId, _Username, Message, _Env) ->

    io:format("delivered to client ~s: ~s~n", [ClientId, emqttd_message:format(Message)]),

    {ok, Message}.

on_message_acked(ClientId, _Username, Message, _Env) ->

    io:format("client ~s acked: ~s~n", [ClientId, emqttd_message:format(Message)]),

    {ok, Message}.

unload() ->

    emqttd:unhook('message.publish', fun ?MODULE:on_message_publish/2),

    emqttd:unhook('message.acked', fun ?MODULE:on_message_acked/4),

    emqttd:unhook('message.delivered', fun ?MODULE:on_message_delivered/4).



MQTT协议的客户端库:

https://github.com/mqtt/mqtt.github.io/wiki/libraries

http://www.eclipse.org/paho/downloads.php



做IM即时通信协议时的考虑:

MQTT作为发布、订阅系统,作为消息推送是很好的。

如果作为IM即时通信,则可以考虑把每个用户id做成一个topic,每个用户订阅名为自己id的topic(比如"USER/1111"),发送方如果需要发布消息给某一个用户,则发布该用户id的topic消息,对方自然就会收到。另外每个用户还要订阅几个系统类的topic(比如"ADMIN/broadcast","ADMIN/1111"),以便后台系统发布各类消息。至于群组消息,每加入一个群,则增加订阅一个名为群id的topic(比如"GROUP/1111")。


评论

天楚锐齿

物联网、大数据、云计算、通信、IT、嵌入式
29
10

EMQ(emqttd) 2.x 安装和使用(物联网传输控制协议的Broker)


支持下国产开源。


MQTT物联网传输控制协议:《MQTT-3.1.1-CN.pdf


下载:emqttd-centos64-v2.0-rc.2-20161019.zip


安装:

$ unzip emqttd-centos64-v2.0-rc.2-20161019.zip -d /data/

$ mv /data/emqttd /data/emqttd-centos64-v2.0-rc.2-20161019

$ ln -s /data/emqttd-centos64-v2.0-rc.2-20161019 /data/emqttd


系统优化配置:

# ulimit -n 1048576

$ sudo sysctl -w fs.file-max=2097152

$ sudo sysctl -w fs.nr_open=2097152

$ sudo sysctl -w net.core.somaxconn=65535

$ sudo sysctl -p


修改配置文件(node.cookie必须每台都要一样,node.name的@后面必须是ip地址或者fqdn方式的主机名):

$ cd /data/emqttd

$ vi etc/emq.conf

## Node name

node.name = emqttd@192.168.60.58

## Cookie for distributed node

node.cookie = emq_dist_cookie_533d99ckd9ji475

## Erlang Process Limit

node.process_limit = 2000000

## Sets the maximum number of simultaneously existing ports for this system

node.max_ports = 1000000

## Size of acceptor pool

mqtt.listener.tcp.acceptors = 64

## Maximum number of concurrent clients

mqtt.listener.tcp.max_clients = 1000000

## Rate Limit. Format is 'burst,rate', Unit is KB/Sec

## mqtt.listener.tcp.rate_limit = 100,10

## TCP Socket Options

mqtt.listener.tcp.backlog = 262144

## Distributed node port range

node.dist_listen_min = 6000

node.dist_listen_max = 6999

## 如果需要启用防火墙,则上面两行去掉注释,注意下面的防火墙端口设置,要打开该段端口。

## Expired after 1 day:

## w - week

## d - day

## h - hour

## m - minute

## s - second

mqtt.session.expired_after = 2w

# 上面为持久会话到期时间,从客户端断开算起,超时后客户端没有收到的消息会丢弃(不想丢失消息的话,该值就要设置的很大)。

## Console log. Enum: off, file, console, both

log.console = both

## Console log level. Enum: debug, info, notice, warning, error, critical, alert, emergency

log.console.level = info

## Console log file

log.console.file = log/console.log

## Error log file

log.error.file = log/error.log

## Enable the crash log. Enum: on, off

log.crash = on

log.crash.file = log/crash.log


修改下启动脚本(在stop处增加一行):

$ vi bin/emqttd

...

    stop)

        # Wait for the node to completely stop...

        PID="$(relx_get_pid)"

        if ! relx_nodetool "stop"; then

            exit 1

        fi

        while $(kill -s 0 "$PID" 2>/dev/null);

        do

            sleep 1

        done

        killall epmd

        ;;


启动:

$ cd emqttd

直接进入控制台模式:

$ ./bin/emqttd console

后台运行模式:

$ ./bin/emqttd start

$ ./bin/emqttd_ctl status

$ ./bin/emqttd stop


开启防火墙:

端口:1883:MQTT协议tcp端口,8883:MQTT(SSL) tcp端口,8083:MTQQ(websocket)、HTTP API端口,18083:dashboard管理控制WEB端口,4369:集群处理epmd端口,6000-6999由上面配置文件定义的epmd需要的端口范围。

sudo firewall-cmd                      --zone=public --add-port=1883/tcp

sudo firewall-cmd --permanent --zone=public --add-port=1883/tcp

sudo firewall-cmd                      --zone=public --add-port=8883/tcp

sudo firewall-cmd --permanent --zone=public --add-port=8883/tcp

sudo firewall-cmd                      --zone=public --add-port=8083/tcp

sudo firewall-cmd --permanent --zone=public --add-port=8083/tcp

sudo firewall-cmd                      --zone=public --add-port=18083/tcp

sudo firewall-cmd --permanent --zone=public --add-port=18083/tcp

sudo firewall-cmd                      --zone=public --add-port=4369/tcp

sudo firewall-cmd --permanent --zone=public --add-port=4369/tcp

sudo firewall-cmd                      --zone=public --add-port=6000-6999/tcp

sudo firewall-cmd --permanent --zone=public --add-port=6000-6999/tcp

sudo firewall-cmd                      --zone=public --list-all

sudo firewall-cmd --permanent --zone=public --list-all


查看各台的启动状态:

http://192.168.60.55:8083/status

http://192.168.60.55:18083/      用户名/密码: admin / public

$ vi data/configs/vm.xxx.args

$ vi data/configs/app.xxx.conf


LOG位置:

$ vi log/


把节点加入集群:

在各个节点上执行(重复执行也没关系,其中192.168.60.55这台会提示错误:cannot_join_with_self,这个没关系,自己不用加入自己):

$ ./bin/emqttd_ctl cluster join emqttd@192.168.60.55

$ ./bin/emqttd_ctl cluster status

Cluster status: [{running_nodes,['emqttd@192.168.60.58',

                                 'emqttd@192.168.60.56',

                                 'emqttd@192.168.60.57',

                                 'emqttd@192.168.60.55']}]

把节点退出集群:

本机退出集群:

$ ./bin/emqttd_ctl cluster leave

把某节点退出集群:

$ ./bin/emqttd_ctl cluster remove emqttd@192.168.60.56



测试一下:

下载安装MTQQ协议的客户端:https://mosquitto.org/download/

$ sudo rpm -ivh libmosquitto1-1.4.10-1.1.x86_64.rpm

$ sudo rpm -ivh mosquitto-clients-1.4.10-1.1.x86_64.rpm

订阅:

$ mosquitto_sub -h 192.168.60.56 -p 1883 -t test_topic_1 -q 1 -c -i 1111

发布:另开一个ssh或者另外机器上执行:

$ mosquitto_pub -h 192.168.60.57 -p 1883 -t test_topic_1 -q 1 -m "hello mqtt 7"

参数:

-h: 连接到哪台broker。

-p: 连接端口。

-t: topic名字,topic类似文件系统的组织方式,以"/"分隔符来分层,订阅者订阅时可以使用通配符"+"和"#",发布者不能使用通配符。

-q: qos级别,默认为0,共三个值:0:至多一次,不保证到达订阅者;1:至少一次,保证到达订阅者,但不保证不重复;2:正好一次,保证到达,又保证不重复;非钱类的应用使用qos1就可以了。

-c: 如果订阅者退出,在broker保留所有订阅的消息,一旦重新连接上,则把所有消息发给订阅者,就是持久性订阅(clean_sess=false)。

-i: 设置client id,在即时通信时可以设置为用户id或者用户名等具有唯一性的字段。

-r: 发布方使用的参数,保留消息(每个topic只保留最后一个这种消息,之前的会覆盖 ),即使该消息被一个订阅者读取了,还会一直保留在broker,如果有新的订阅者订阅该topic,则马上会收到该消息,类似qq里面的公告性消息,这类信息的删除,发送一个payload为空的null消息即可:$ mosquitto_pub -h 192.168.60.56 -p 1883 -t test_topic_1 -q 1 -r -n  -u user -P 123456。

-m: 该topic的一条消息内容。



加入MariaDB的认证:

修改配置:

$ vi ./etc/emq.conf

## Allow Anonymous authentication

mqtt.allow_anonymous = false

## Default ACL File

mqtt.acl_file = etc/acl.conf


$ vi ./data/loaded_plugins

emq_dashboard.

emq_auth_mysql.


修改plugins配置:

$ vi etc/plugins/emq_auth_mysql.conf

## Mysql Server

auth.mysql.server = 192.168.60.60:3306

## Mysql Pool Size

auth.mysql.pool = 8

## Mysql Username

auth.mysql.username = root

## Mysql Password

auth.mysql.password = 123456

## Mysql Database

auth.mysql.database = mqtt

## Variables: %u = username, %c = clientid

## Authentication Query: select password only

auth.mysql.auth_query = select password from mqtt_user where username = '%u' limit 1

## Password hash: plain, md5, sha, sha256, pbkdf2

auth.mysql.password_hash = plain

## %% Superuser Query

auth.mysql.super_query = select is_superuser from mqtt_user where username = '%u' limit 1

## ACL Query Command

auth.mysql.acl_query = select allow, ipaddr, username, clientid, access, topic from mqtt_acl where ipaddr = '%a' or username = '%u' or username = '$all' or clientid = '%c'

## ACL nomatch

auth.mysql.acl_nomatch = deny


往MariaDB插入初始化数据库和表(引擎需要改成InnDB):

认证的用户表也可以共用其他系统的用户表,由上面的emq_auth_mysql.conf来配置:auth.mysql.auth_query、auth.mysql.super_query、auth.mysql.acl_query:

建立mqtt数据库:

$ mysql -uroot -p123456 -hxxxx

MariaDB [test]> create database mqtt;

用户表:

MariaDB [test]> CREATE TABLE `mqtt_user` (

  `id` int(11) unsigned NOT NULL AUTO_INCREMENT,

  `username` varchar(100) DEFAULT NULL,

  `password` varchar(100) DEFAULT NULL,

  `salt` varchar(20) DEFAULT NULL,

  `is_superuser` tinyint(1) DEFAULT 0,

  `created` datetime DEFAULT NULL,

  PRIMARY KEY (`id`),

  UNIQUE KEY `mqtt_username` (`username`)

) DEFAULT CHARSET=utf8;

用户表插入测试数据:

MariaDB [mqtt]> INSERT INTO mqtt_user (id, username, password, salt, is_superuser, created)

VALUES

    (1,'superuser','123456','123456',True,'2016-10-26 10:00:00'),

    (2,'user','123456','123456',False,'2016-10-26 10:01:00');

acl表:

MariaDB [mqtt]> CREATE TABLE `mqtt_acl` (

  `id` int(11) unsigned NOT NULL AUTO_INCREMENT,

  `allow` int(1) DEFAULT NULL COMMENT '0: deny, 1: allow',

  `ipaddr` varchar(60) DEFAULT NULL COMMENT 'IpAddress',

  `username` varchar(100) DEFAULT NULL COMMENT 'Username',

  `clientid` varchar(100) DEFAULT NULL COMMENT 'ClientId',

  `access` int(2) NOT NULL COMMENT '1: subscribe, 2: publish, 3: pubsub',

  `topic` varchar(100) NOT NULL DEFAULT '' COMMENT 'Topic Filter',

  PRIMARY KEY (`id`)

) DEFAULT CHARSET=utf8;

acl表插入默认数据(注意带$SYS为broker保留的特殊topic用来统计使用,username为$all表示所有在user表中的用户):

MariaDB [mqtt]> INSERT INTO mqtt_acl (id, allow, ipaddr, username, clientid, access, topic)

VALUES

    (1,1,NULL,'$all',NULL,2,'#'),

    (2,1,NULL,'$all',NULL,1,'#'),

    (3,1,NULL,'$all',NULL,3,'#'),

    (4,1,'127.0.0.1',NULL,NULL,2,'$SYS/#'),

    (5,1,'127.0.0.1',NULL,NULL,2,'#'),

    (6,1,NULL,'dashboard',NULL,1,'$SYS/#');


测试,重启每台服务器后执行:

$ mosquitto_sub -h 192.168.60.55 -p 1883 -t test_topic_1 -q 1 -c -i 1112  -u user -P 123456

$ mosquitto_pub -h 192.168.60.55 -p 1883 -t test_topic_1 -q 1 -m "hello mqtt 1" -u user -P 123456



重新初始化emq:

$ bin/emqttd stop

$ rm -rf data/mnesia/*

$ rm -rf data/configs/*

$ rm -rf log/*

$ bin/emqttd start

$ bin/emqttd_ctl cluster join emqttd@192.168.60.55



使用Haproxy来实现负载分担:

因为emq的topic和消息在集群的各台服务器上一致,所以数据不能以增加机器的方式扩容,只能增加每台的内存,和客户端的连接则可以以增加机器方式扩容。

$ vi /etc/haproxy/haproxy.cfg

################## EMQ ######################

listen emq_emqttd *:1883

    mode tcp

    balance source

    log global

    timeout connect         25s

    timeout client          0

    timeout server          0

    option tcpka

    option tcplog

    option tcp-check

    server emq1 192.168.60.55:1883 check inter 25000 port 1883 rise 2 fall 2 weight 25

    server emq2 192.168.60.56:1883 check inter 25000 port 1883 rise 2 fall 2 weight 25

    server emq3 192.168.60.57:1883 check inter 25000 port 1883 rise 2 fall 2 weight 25

    server emq4 192.168.60.58:1883 check inter 25000 port 1883 rise 2 fall 2 weight 25


开启haproxy上的防火墙:

sudo firewall-cmd                      --zone=public --add-port=1883/tcp

sudo firewall-cmd --permanent --zone=public --add-port=1883/tcp

sudo firewall-cmd                      --zone=public --list-all

sudo firewall-cmd --permanent --zone=public --list-all



使用:

broker操作:

显示broker状态:

$ ./bin/emqttd_ctl status

显示broker版本、启用时间等:

$ ./bin/emqttd_ctl broker

显示broker的pubsub进程状态、内存、队列长度、规约数等:

$ ./bin/emqttd_ctl broker  pubsub

显示broker的统计信息:客户端连接数、会话数、主题数、订阅数、路由数等:

$ ./bin/emqttd_ctl broker stats

显示broker的测量信息:底层流量、MQTT报文数、消息数等:

$ ./bin/emqttd_ctl broker metrics

Cluster操作:

$ ./bin/emqttd_ctl cluster

cluster join <Node>                             # Join the cluster

cluster leave                                   # Leave the cluster

cluster remove <Node>                           # Remove the node from cluster

cluster status                                  # Cluster status

Client操作:

$ ./bin/emqttd_ctl clients

clients list                                    # List all clients

clients show <ClientId>                         # Show a client

clients kick <ClientId>                         # Kick out a client

Sessions操作:

$ ./bin/emqttd_ctl sessions

sessions list                                   # List all sessions

sessions list persistent                        # List all persistent sessions ,桥接Bridge的时候才会用到(即clean_sess=false的类型)

sessions list transient                         # List all transient sessions

sessions show <ClientId>                        # Show a session

Routes操作:

$ ./bin/emqttd_ctl routes

routes list                                     # List all routes

routes show <Topic>                             # Show a route

Topics操作:

$ ./bin/emqttd_ctl topics

topics list                                     # List all topics

topics show <Topic>                             # Show a topic

Subscription订阅者操作:

$ ./bin/emqttd_ctl subscriptions

subscriptions list                              # List all subscriptions

subscriptions show <ClientId>                   # Show subscriptions of a client

subscriptions add <ClientId> <Topic> <QoS>      # Add a static subscription manually

subscriptions del <ClientId>                    # Delete static subscriptions manually

subscriptions del <ClientId> <Topic>            # Delete a static subscription manually

Plugins插件操作:

$ ./bin/emqttd_ctl plugins

plugins list                                    # Show loaded plugins

plugins load <Plugin>                           # Load plugin

plugins unload <Plugin>                         # Unload plugin

$ ./bin/emqttd_ctl plugins list

Plugin(emq_auth_clientid, version=2.0, description=Authentication with ClientId/Password, active=false)

Plugin(emq_auth_http, version=2.0, description=Authentication/ACL with HTTP API, active=false)

Plugin(emq_auth_ldap, version=2.0, description=Authentication/ACL with LDAP, active=false)

Plugin(emq_auth_mongo, version=2.0, description=Authentication/ACL with MongoDB, active=false)

Plugin(emq_auth_mysql, version=2.0, description=Authentication/ACL with MySQL, active=false)

Plugin(emq_auth_pgsql, version=2.0, description=Authentication/ACL with PostgreSQL, active=false)

Plugin(emq_auth_redis, version=2.0, description=Authentication/ACL with Redis, active=false)

Plugin(emq_auth_username, version=2.0, description=Authentication with Username/Password, active=false)

Plugin(emq_coap, version=0.2, description=CoAP Gateway, active=false)

Plugin(emq_dashboard, version=2.0, description=Dashboard, active=true)

Plugin(emq_mod_rewrite, version=2.0, description=Rewrite Module, active=false)

Plugin(emq_plugin_template, version=2.0, description=EMQ Plugin Template, active=false)

Plugin(emq_recon, version=2.0, description=Recon Plugin, active=false)

Plugin(emq_reloader, version=2.0, description=Reloader Plugin, active=false)

Plugin(emq_sn, version=0.2, description=MQTT-SN Gateway, active=false)

Plugin(emq_stomp, version=2.0, description=Stomp Protocol Plugin, active=false)

Bridges桥接操作:

$ ./bin/emqttd_ctl bridges

bridges list                                    # List bridges

bridges options                                 # Bridge options

bridges start <Node> <Topic>                    # Start a bridge

bridges start <Node> <Topic> <Options>          # Start a bridge with options

bridges stop <Node> <Topic>                     # Stop a bridge

vm虚机(erlang虚机)性能查看:

$ ./bin/emqttd_ctl vm all

cpu/load1               : 0.05

cpu/load5               : 0.05

cpu/load15              : 0.07

memory/total            : 166404232

memory/processes        : 32564328

memory/processes_used   : 32563952

memory/system           : 133839904

memory/atom             : 959633

memory/atom_used        : 954350

memory/binary           : 46088

memory/code             : 28250535

memory/ets              : 5741416

process/limit           : 2097152

process/count           : 288

io/max_fds              : 1000000

io/active_fds           : 1

ports/count           : 25

ports/limit           : 1048576

端口使用情况查看:

$ ./bin/emqttd_ctl listeners

mnesia数据库信息查看:

$ bin/emqttd_ctl mnesia

管理dashboard用户:

$ ./bin/emqttd_ctl admins

admins add <Username> <Password> <Tags>         # Add dashboard user

admins passwd <Username> <Password>             # Reset dashboard user password

admins del <Username>                           # Delete dashboard user


追踪 ,EMQ消息服务器支持追踪来自某个客户端(Client)的全部报文,或者发布到某个主题(Topic)的全部消息。

追踪客户端(Client):

./bin/emqttd_ctl trace client "clientid(mosqsub/23058-vm6)" "trace_clientid.log"

追踪主题(Topic):

./bin/emqttd_ctl trace topic "topic1" "trace_topic.log"

查询追踪:

./bin/emqttd_ctl trace list

停止追踪:

./bin/emqttd_ctl trace client "clientid(mosqsub/23058-vm6)" off

./bin/emqttd_ctl trace topic "topic1" off


钩子(Hook)扩展,EMQ消息服务器在客户端上下线、主题订阅、消息收发位置设计了扩展钩子(Hook):

钩子                                说明

client.connected             客户端上线

client.subscribe               客户端订阅主题前

client.unsubscribe           客户端取消订阅主题

session.subscribed          客户端订阅主题后

session.unsubscribed      客户端取消订阅主题后

message.publish             MQTT消息发布

message.delivered          MQTT消息送达

message.acked               MQTT消息回执

client.disconnected        客户端连接断开

钩子使用例子:

-module(emqttd_plugin_template).

-export([load/1, unload/0]).

-export([on_message_publish/2, on_message_delivered/4, on_message_acked/4]).

load(Env) ->

    emqttd:hook('message.publish', fun ?MODULE:on_message_publish/2, [Env]),

    emqttd:hook('message.delivered', fun ?MODULE:on_message_delivered/4, [Env]),

    emqttd:hook('message.acked', fun ?MODULE:on_message_acked/4, [Env]).

on_message_publish(Message, _Env) ->

    io:format("publish ~s~n", [emqttd_message:format(Message)]),

    {ok, Message}.

on_message_delivered(ClientId, _Username, Message, _Env) ->

    io:format("delivered to client ~s: ~s~n", [ClientId, emqttd_message:format(Message)]),

    {ok, Message}.

on_message_acked(ClientId, _Username, Message, _Env) ->

    io:format("client ~s acked: ~s~n", [ClientId, emqttd_message:format(Message)]),

    {ok, Message}.

unload() ->

    emqttd:unhook('message.publish', fun ?MODULE:on_message_publish/2),

    emqttd:unhook('message.acked', fun ?MODULE:on_message_acked/4),

    emqttd:unhook('message.delivered', fun ?MODULE:on_message_delivered/4).



MQTT协议的客户端库:

https://github.com/mqtt/mqtt.github.io/wiki/libraries

http://www.eclipse.org/paho/downloads.php



做IM即时通信协议时的考虑:

MQTT作为发布、订阅系统,作为消息推送是很好的。

如果作为IM即时通信,则可以考虑把每个用户id做成一个topic,每个用户订阅名为自己id的topic(比如"USER/1111"),发送方如果需要发布消息给某一个用户,则发布该用户id的topic消息,对方自然就会收到。另外每个用户还要订阅几个系统类的topic(比如"ADMIN/broadcast","ADMIN/1111"),以便后台系统发布各类消息。至于群组消息,每加入一个群,则增加订阅一个名为群id的topic(比如"GROUP/1111")。


评论

Logo

瓜分20万奖金 获得内推名额 丰厚实物奖励 易参与易上手

更多推荐