RabbitMQ

不多说了,它是实现了高级消息队列协议(AMQP)的开源消息代理软件(亦称面向消息的中间件)。用它解决微服务各种服务的解耦等等。

What is AMQP, MQTT, STOMP ?

先搞懂一些协议层面上的东西 AMQP, MQTT,STOMP

  • AMQP代表高级消息队列协议
  • MQTT(消息队列遥测传输)
  • STOMP(简单/流式文本导向的消息传递协议)是这三种协议中唯——种基于文本的协议

说的直白了,
AMQP 用在后端微服务中比较多,RocketMQ、 Kafka等这些消息软件都实现了这种高级协议
MQTT 能传递文本、语音、图片、视频等二进制数据
STOMP 简单文本传输

MQTT协议 用在终端消息推送比较多,现在的物联网产品里面大都有MQTT技术的身影。

RabbitMQ 支持以上三种协议

选择你的消息协议 AMQP, MQTT,STOMP,请看大神博客:
https://www.yuque.com/noobwo/mq/hpiop0

How to use RabbitMQ with MQTT ?

首先请确保你对 rabbitmq使用已经熟悉了,不熟悉的请自行学习,很简单的使用。

如果不想看英文官方文档,网上不错的rabbitmq基本使用 博客
https://www.cnblogs.com/refuge/category/1395422.html

1. Docker 安装RabbitMQ

docker run -d --hostname myrabbit-test --name mq-rabbit -p 15674:15674 -p 15675:15675 -p 5672:5672 -p 15672:15672 -p 1883:1883 -e  RABBITMQ_DEFAULT_VHOST=my_vhost -e RABBITMQ_DEFAULT_USER=admin -e RABBITMQ_DEFAULT_PASS=Admin123 rabbitmq:latest

端口说明:

端口说明
5672后台SDK TCP方式连接rabbitmq的端口,发送rabbitmq AMQP协议的消息
15672rabbitmq 管理界面登录访问的端口
15674rabbitmq STOMP WebSocket方式 访问的端口 ,比如JS订阅发布mqtt消息
15675rabbitmq MQTT WebSocket方式 访问的端口 ,比如JS订阅发布mqtt消息
1883rabbitmq MQTT TCP方式访问的端口 ,比如java订阅发布mqtt消息

RABBITMQ_DEFAULT_VHOST 相当于一个相对独立的RabbitMQ服务器,每个VirtualHost
之间是相互隔离的,里面的exchange、queue、message不能互通, VirtualHost可以看成相当于mysql中的一个Database.

2. MQTT插件启用

镜像 rabbitmq:latest 启动的容器里面,默认没有启用各个插件,请进入容器启用各个插件
STOMP 不用可以不启用该插件。

rabbitmq-plugins enable rabbitmq_management
rabbitmq-plugins enable rabbitmq_web_mqtt

3. 查看 rabbitmq_mqtt 默认配置

rabbitmq的配置文件在哪里?
一般在 /etc/rabbitmq/下面,Windows可查看README.txt文,一般在 %APPDATA%\RabbitMQ\rabbitmq.config

具体说明,请查看官方文档:
https://www.rabbitmq.com/configure.html#config-file-location

如果你想使用默认配置文件时,请确保如下配置已经做好:

  • mqtt 默认vhost 为“/” 请登录rabbitmq管理后台创建该 vhost
  • mqtt 默认 exchange 为 “amq.topic”
  • mqtt 默认 用户名和密码 为 guest/guest,当然连接的时候可以指定其他用户,比如docker创建的时候指定的 admin用户,请确保该用户有访问 vhost “/”的权限
    在这里插入图片描述

4. 采用JS 前端订阅发布mqtt消息

不多说,直接上代码,html代码如下:

<?xml version="1.0" encoding="UTF-8" standalone="no"?>
<!DOCTYPE html PUBLIC "-//W3C//DTD XHTML 1.0 Transitional//EN"
        "http://www.w3.org/TR/xhtml1/DTD/xhtml1-transitional.dtd">
<html xmlns="http://www.w3.org/1999/xhtml" lang="zh-CN">
<head>
    <title>Mqtt Websockets</title>
    <meta name="viewport" content="width=device-width, initial-scale=1.0">
    <script src="https://cdnjs.cloudflare.com/ajax/libs/paho-mqtt/1.0.1/mqttws31.js" type="text/javascript"></script>
    <script src="http://libs.baidu.com/jquery/2.0.0/jquery.min.js" type="text/javascript"></script>

    <style type="text/css">
        .button {
            background-color: #4CAF50;
            border: none;
            color: white;
            padding: 8px 20px;
            text-align: center;
            text-decoration: none;
            display: inline-block;
            font-size: 16px;
            margin: 4px 2px;
            cursor: pointer;
        }
    </style>
    <script type="text/javascript">
        // 工具类 日期函数
        function getFormatDate() {
            var date = new Date();
            var month = date.getMonth() + 1;
            var strDate = date.getDate();
            var strHours = date.getHours();
            var strMinutes = date.getMinutes();
            var strSeconds = date.getSeconds();
            if (month >= 1 && month <= 9) {
                month = "0" + month;
            }
            if (strDate >= 0 && strDate <= 9) {
                strDate = "0" + strDate;
            }
            if (strHours >= 0 && strHours <= 9) {
                strHours = "0" + strHours;
            }
            if (strMinutes >= 0 && strMinutes <= 9) {
                strMinutes = "0" + strMinutes;
            }
            if (strSeconds >= 0 && strSeconds <= 9) {
                strSeconds = "0" + strSeconds;
            }
            return date.getFullYear() + "-" + month + "-" + strDate + " " + strHours + ":" + strMinutes + ":" + strSeconds;
        }
    </script>

    <script type="text/javascript">
        // RabbitMq 服务器IP地址或域名地址
        const host = '127.0.0.1';
        //WebSocket 协议服务端口,如果是走 HTTPS,设置443端口
        const port = 15675;
        //需要操作的 Topic,第一级父级 topic
        const topic = 'MIDDOL-TEST';
        //设备唯一id
        const clientId = "myclientid_" + parseInt(Math.random() * 10000, 10);
        //服务连接失败后重新尝试连接的时间间隔
        const reconnectTimeout = 10000;
        // RabbitMq 用户名密码,确保该用户可以访问mqtt资源权限
        const cleansession = true;
        // RabbitMq 用户名密码,确保该用户可以访问mqtt资源权限
        const username = 'admin';
        const password = 'Admin123';
        //是否走加密 HTTPS,如果走 HTTPS,设置为 true
        const useTLS = false;

        // MQTT 客户端引用对象
        let mqtt;

        function MQTTconnect() {
            mqtt = new Paho.MQTT.Client(host, port, "/ws", clientId);
            mqtt.onConnectionLost = onConnectionLost;
            mqtt.onMessageArrived = onMessageArrived;
            let options = {
                timeout: 3,
                keepAliveInterval: 60,
                mqttVersion: 4,
                cleanSession: cleansession,
                onSuccess: onConnect,
                onFailure: onFailure,
                useSSL: useTLS,
                userName: username,
                password: password
            };
            mqtt.connect(options);
        }

        // 连接服务器失败
        function onFailure(message) {
            console.log("onFailure : " + message);
            $("#arrivedDiv").append("<div style='color: red'> " + getFormatDate() + " onFailure : " + message.errorMessage + " </div>");
            setTimeout(MQTTconnect, reconnectTimeout);
        }

        // 连接上服务器
        function onConnect() {
            // 订阅某个主题
            mqtt.subscribe(topic, {qos: 0});
            // 订阅P2P主题
            mqtt.subscribe(topic + "/p2p/" + clientId, {qos: 0});

            // 发布主题消息
            let message = new Paho.MQTT.Message("Connect success, Hello mqtt!");
            // set topic
            message.destinationName = topic;
            mqtt.send(message);

            //发送 P2P 消息
            message = new Paho.MQTT.Message("Connect success, Hello mqtt P2P Msg!");
            // set topic
            message.destinationName = topic + "/p2p/" + clientId;
            mqtt.send(message);
        }

        // 未连接服务器
        function onConnectionLost(response) {
            console.log("onConnectionLost : " + response);
            setTimeout(MQTTconnect, reconnectTimeout);
        }

        // 接收到mqtt消息
        function onMessageArrived(message) {
            let topic = message.destinationName;
            let payload = message.payloadString;
            // console.log("recv msg : " + topic + "   " + payload);

            $("#arrivedDiv").append("<div> " + getFormatDate() + " recv msg : topic=" + topic + "  ,payload=" + payload + " </div>")
        }

        // MQTT 连接初始化
        MQTTconnect();

        // 发送测试
        function testMessageSend() {
            let msg = $("#testMessage").val();
            if (msg == null || msg === '') {
                $("#arrivedDiv").append("<div style='color: red'>请在文本框输入消息后点击发送按钮</div>")
                return;
            }
            // 发布主题消息
            //set body
            let message = new Paho.MQTT.Message(msg);
            // set topic
            message.destinationName = topic;
            mqtt.send(message);
        }

        function testMessageClean() {
            $("#arrivedDiv").html("<br/>");
        }

        $(function () {
            $("#myClientIdSpan").html("" + clientId);
        })

    </script>
</head>

<div style="margin-top: 30px;">
    <span>MQTT设备ID(clientId):</span><span id="myClientIdSpan" style="font-weight: bold;"></span>
    <br/><br/>
    <label style="font-weight: bold" for="testMessage">MQTT消息:</label>
    <input style="height: 25px; width: 180px;" maxlength="60" value="这是一条测试消息" id="testMessage"/>
    <button class="button" id="mySendBtn" onclick="testMessageSend()"> 点击发送</button>

    <button class="button" id="cleanSendBtn" onclick="testMessageClean()"> 清空日志</button>
</div>

<div style="margin-top: 30px">

    <div style="font-size: 20px;color: darkcyan"> 接收到的mqtt消息日志</div>
    <hr/>
    <div id="arrivedDiv" style="height:600px; width:1000px; overflow:scroll; background:#EEEEEE;">
        <br/>
    </div>

</div>
</html>

主要查看上面js里面 RabbitMq 服务器IP地址或域名地址和端口号主要信息,运行如下界面:
在这里插入图片描述

5. 采用Java 订阅发布mqtt消息

mqtt作为行业通用协议,支持很多种语言开发,具体SDK请查看如下:

https://www.alibabacloud.com/help/zh/doc-detail/44866.htm?spm=a2c63.p38356.b99.49.18af6586hItGUj

本次采用 org.eclipse.paho.client.mqttv3 SDK进行订阅发布MQTT消息。

POM.xml引入Maven依赖:

        <dependency>
            <groupId>org.eclipse.paho</groupId>
            <artifactId>org.eclipse.paho.client.mqttv3</artifactId>
            <version>1.2.2</version>
        </dependency>

JAVA 主要代码如下:

package com.test.rabbitmq.mqtt;

import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.slf4j.LoggerFactory;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/**
 * MqttClient demo service
 *
 * @author admin
 */
public class MyMqttService {

    private static org.slf4j.Logger logger = LoggerFactory.getLogger(MyMqttService.class);

    private MqttClient client;
    private String defaultTopic;

    /**
     * Builder模式构造实例
     */
    public static class Builder {
        private String host;
        private String userName;
        private String passWord;
        private String clientId;
        private String defaultTopic = "MyMqttTopic";
        private MqttCallback callback;
        private boolean cleanSession;

        public Builder host(String host) {
            this.host = host;
            return this;
        }

        public Builder userName(String userName) {
            this.userName = userName;
            return this;
        }

        public Builder passWord(String passWord) {
            this.passWord = passWord;
            return this;
        }

        public Builder clientId(String clientId) {
            this.clientId = clientId;
            return this;
        }

        public Builder defaultTopic(String defaultTopic) {
            this.defaultTopic = defaultTopic;
            return this;
        }

        public Builder callback(MqttCallback callback) {
            this.callback = callback;
            return this;
        }

        public Builder cleanSession(boolean cleanSession) {
            this.cleanSession = cleanSession;
            return this;
        }

        public MyMqttService build() {
            return new MyMqttService(this);
        }
    }

    public static Builder builder() {
        return new Builder();
    }

    private MyMqttService(Builder builder) {
        defaultTopic = builder.defaultTopic;
        final ExecutorService executorService = new ThreadPoolExecutor(1, 1, 0, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<>(), r -> {
            Thread t = new Thread(r);
            t.setName("MyMQTT线程");
            return t;
        });
        try {
            //id应该保持唯一性
            client = new MqttClient(builder.host, builder.clientId, new MemoryPersistence());
            MqttConnectOptions options = new MqttConnectOptions();
            options.setCleanSession(builder.cleanSession);
            options.setUserName(builder.userName);
            options.setPassword(builder.passWord.toCharArray());
            options.setConnectionTimeout(10);
            options.setKeepAliveInterval(20);
            if (builder.callback == null) {
                client.setCallback(new MqttCallbackExtended() {

                    @Override
                    public void connectComplete(boolean reconnect, String serveruri) {
                        // 客户端连接成功后就需要尽快订阅需要的 topic
                        logger.debug(builder.clientId + " connectComplete reconnect=" + reconnect + ", serveruri=" + serveruri);

                        // 参考阿里云mqtt文档  https://www.alibabacloud.com/help/zh/doc-detail/42420.htm?spm=a2c63.p38356.b99.12.87851d06uHImcQ
                        /*
                         * cleanSession=true:客户端再次上线时,将不再关心之前所有的订阅关系以及离线消息。
                         * cleanSession=false:客户端再次上线时,还需要处理之前的离线消息,而之前的订阅关系也会持续生效
                         * QoS0 代表最多分发一次
                         * QoS1 代表至少达到一次
                         *   ----------------------------------------------------------------------------------
                         *   |QoS级别   |	cleanSession=true	                |cleanSession=false             |
                         *   | QoS0	    |  无离线消息,在线消息只尝试推一次。	    |无离线消息,在线消息只尝试推一次。 |
                         *   | QoS1	    |  无离线消息,在线消息保证可达。	        |有离线消息,所有消息保证可达。     |
                         *  ----------------------------------------------------------------------------------
                         */
                        final String[] topicFilter = {builder.defaultTopic, builder.defaultTopic.concat("/p2p/").concat(builder.clientId)};
                        final int[] qos = {0, 0};
                        executorService.submit(() -> subscribe(topicFilter, qos));
                    }

                    @Override
                    public void connectionLost(Throwable arg0) {
                        logger.debug(builder.clientId + " connectionLost " + arg0);
                    }

                    @Override
                    public void deliveryComplete(IMqttDeliveryToken arg0) {

                        logger.debug(builder.clientId + " deliveryComplete " + arg0);
                    }

                    @Override
                    public void messageArrived(String arg0, MqttMessage arg1) {

                        logger.debug(builder.clientId + " messageArrived: " + arg1.toString());
                    }
                });
            } else {
                client.setCallback(builder.callback);
            }
            client.connect(options);
        } catch (MqttException e) {
            logger.error("MyMqttService 初始化异常 ", e);
        }
    }

    /**
     * 发送消息,默认主题
     *
     * @param msg 消息
     */
    public void sendMessage(String msg) {
        sendMessage(defaultTopic, msg);
    }

    /**
     * 发送指定主题消息
     *
     * @param topic 主题
     * @param msg   消息
     */
    public void sendMessage(String topic, String msg) {
        try {
            MqttMessage message = new MqttMessage(msg.getBytes());
            message.setQos(0);
            message.setRetained(false);
            client.publish(topic, message);
            logger.info("发送消息成功 topic={},msg={}", topic, msg);
        } catch (MqttException e) {
            logger.error("发送主题消息异常 topic={} ,msg={}", topic, msg, e);
        }
    }

    /**
     * 订阅主题
     *
     * @param topicFilters 主题名称
     * @param qos          规则
     */
    public void subscribe(String[] topicFilters, int[] qos) {
        try {
            client.subscribe(topicFilters, qos);
            for (int i = 0; i < topicFilters.length; i++) {
                logger.info("subscribe success topicFilters={}, qos={}", topicFilters[i], qos[i]);
            }

        } catch (MqttException e) {
            logger.error("订阅主题", e);
        }
    }

    /**
     * 取消订阅某个主题
     *
     * @param topicFilters 主题名称
     */
    public void unsubscribe(String[] topicFilters) {
        try {
            client.unsubscribe(topicFilters);
        } catch (MqttException e) {
            logger.error("取消订阅某个主题", e);
        }
    }

    public void closeClient(boolean force) {
        try {
            client.close(force);
        } catch (MqttException e) {
            logger.error("closeClient异常", e);
        }
    }
}

写个main方法测试一下哦:

package com.test.rabbitmq.mqtt;

public class MyMqttTestService {

    public static void main(String[] args) throws InterruptedException {

        MyMqttService service = MyMqttService.builder()
                .host("tcp://127.0.0.1:1883")
                .userName("admin")
                .passWord("Admin123")
                .clientId("myclientid_10001")
                .defaultTopic("MIDDOL-TEST")
                .cleanSession(true).build();

        Thread.sleep(3000L);
        service.sendMessage("这是java后端发送的消息");

        // service.closeClient(false);
    }
}

运行一下看发生了什么?
控制台信息如下:
在这里插入图片描述

刚刚运行的html页面也同时受到了java后端发送的消息。
在这里插入图片描述
查看rabbitmq 管理后台的channel 出现两个链接
在这里插入图片描述

前端发送一个消息看看:
在这里插入图片描述

后端同样可以受到消息:
在这里插入图片描述

6. 稍作改动可切换阿里云MQTT消息队列

阿里云的MQTT消息队列的SDK代码几乎和上面一致,多了 accessKey权限校验等。
https://www.alibabacloud.com/help/zh/doc-detail/59721.htm?spm=a2c63.p38356.b99.48.28d04a9bVXkQM4
在这里插入图片描述

Logo

旨在为数千万中国开发者提供一个无缝且高效的云端环境,以支持学习、使用和贡献开源项目。

更多推荐