大部分项目中,前后端交互只是前端请求后端接口,拿到返回的数据之后再处理即可,但是还是有些需要等待后端处理完成之后,主动通知前端,比如异步支付、消息通知等需求,或者前端自己两个页面间通信,尤其是那种需要实时更新却不方便通过传参等方式解决的,mqtt就可以很好的解决这些问题

关于mqtt原理等的文章已经有很多,此文仅是记录前端在vue项目中使用的实例,方便刚接触mqtt的前端朋友快速入门使用

1、使用前的准备工作
先创建一些基础方法方便使用

// 准备好日期、随机、判断等基础方法
Date.prototype.Format = function (fmt) {
    let o = {
        "M+": this.getMonth() + 1, //月份
        "d+": this.getDate(), //日
        "h+": this.getHours(), //小时
        "m+": this.getMinutes(), //分
        "s+": this.getSeconds(), //秒
        "q+": Math.floor((this.getMonth() + 3) / 3), //季度
        "S": this.getMilliseconds() //毫秒
    };
    if (/(y+)/.test(fmt)) fmt = fmt.replace(RegExp.$1, (this.getFullYear() + "").substr(4 - RegExp.$1.length));
    for (let k in o)
        if (new RegExp("(" + k + ")").test(fmt)) fmt = fmt.replace(RegExp.$1, (RegExp.$1.length == 1) ? (o[k]) : (("00" + o[k]).substr(("" + o[k]).length)));
    return fmt;
};

let random = function(n) {
    let res = '';
    for (; res.length < n; res += Math.random().toString(36).substr(2).toUpperCase()) {}
    return res.substr(0, n)
};

let datePrefixRandomStr = function(len){
    return new Date().Format("yyMMddhhmmss-")+ random(len) ;
};

let isNotNull = function (obj) {
    return null !== obj && 	undefined !== obj;
};


2、创建订阅者方法

hostname、port、topic、username、password等参数都需要按照自己项目实际填写,可以直接问部署mqtt服务器相关人员,其他的可以参考以下,或者根据项目实际需求更改

// 处理初始值、订阅者方法
let MqttConsumer = function(jsonParam) {
    if (!jsonParam.topic){
        throw "topic不能为空";
    }
    this.hostname = isNotNull(jsonParam.hostname) ? jsonParam.hostname : '127.0.0.1';
    // this.port = null;
    this.port = isNotNull(jsonParam.port) ? jsonParam.port : 8084;
    this.clientId = isNotNull(jsonParam.clientId) ? jsonParam.clientId :  'ws-consumer-' + datePrefixRandomStr(5);
    this.timeout = isNotNull(jsonParam.timeout) ? jsonParam.timeout : 5;
    this.keepAlive = isNotNull(jsonParam.keepAlive) ? jsonParam.keepAlive : 10;
    this.reConnectionTimeout = isNotNull(jsonParam.reConnectionTimeout) ? jsonParam.reConnectionTimeout * 1000 : 5000;
    this.cleanSession = isNotNull(jsonParam.cleanSession) ? jsonParam.cleanSession : true;
    this.ssl = isNotNull(jsonParam.ssl) ? jsonParam.ssl : false;
    this.userName = jsonParam.userName;
    this.password = jsonParam.password;
    this.topic = jsonParam.topic;
    this.qos = isNotNull(jsonParam.qos) ? jsonParam.qos : 2;
    this.client = null;
    this.connParam = null;
};

// 订阅者准备工作
MqttConsumer.prototype.prepareParam = function(fnConnectionFail, fnConnectionSuccess){
    let self = this;
    self.connParam = {
        invocationContext: {
            host : self.hostname,
            port: self.port,
            path: "/mqtt",
            clientId: self.clientId
        },
        timeout: self.timeout,
        keepAliveInterval: self.keepAlive,
        cleanSession: self.cleanSession,
        useSSL: self.ssl,
        userName: self.userName,
        password: self.password,
        onSuccess: function(){
            console.info(self.clientId + ": 连接成功");
            fnConnectionSuccess && fnConnectionSuccess.call(this);
            self.client.subscribe(self.topic, { qos: self.qos});
        },
        onFailure: function(e){
            console.log(self.clientId + ": 连接服务器失败," + e.errorMessage);
            try {
                fnConnectionFail && fnConnectionFail.call(this, e);
            } catch (e) {}
            let reconnect = setInterval(function () {
                try {
                    console.log(self.clientId + ": 重连中...");
                    delete self.connParam.mqttVersion;
                    delete self.connParam.mqttVersionExplicit;
                    self.client.connect(self.connParam);
                    clearInterval(reconnect), reconnect = null;
                } catch (e) {
                    console.log();
                }
            }, self.reConnectionTimeout);
        }
    };
};

// 连接
MqttConsumer.prototype.connect = function(fnOnMessage, fnConnectionFail, fnConnectionSuccess, fnConnectionLost){

    this.prepareParam(fnConnectionFail, fnConnectionSuccess);
    this.client = new Paho.MQTT.Client(this.hostname, this.port, this.clientId);
    this.client.connect(this.connParam);

    let self = this;
    self.client.onConnectionLost = function(responseObject) {
        (function(){
            if (responseObject.errorCode !== 0) {
                console.log(self.clientId + ": 与服务器断开连接," + responseObject.errorMessage);
                let interval = setInterval(function () {
                    try {
                        console.log(self.clientId + ":重连中...");
                        self.prepareParam(fnConnectionFail, fnConnectionSuccess);
                        self.client.connect(self.connParam);
                        clearInterval(interval), interval = null;
                    } catch (e) {
                        console.log(e);
                    }
                }, self.reConnectionTimeout);
            }
        })();
        fnConnectionLost && fnConnectionLost.call(this);
    };

    self.client.onMessageArrived = function(messageBody) {
      try{ 
            fnOnMessage && fnOnMessage.call(this, messageBody.destinationName, messageBody.payloadString, messageBody.qos, messageBody.retained);
        } catch(ex){
            console.error(ex);
        }
    };
};

3、创建发布者方法

发布者处方法和订阅者类似

let MqttProducer = function(jsonParam) {
    this.hostname = isNotNull(jsonParam.hostname) ? jsonParam.hostname : '127.0.0.1';
    this.port = isNotNull(jsonParam.port) ? jsonParam.port : 8084;
    this.clientId = isNotNull(jsonParam.clientId) ? jsonParam.clientId : 'ws-producer-' + datePrefixRandomStr(5);
    this.timeout = isNotNull(jsonParam.timeout) ? jsonParam.timeout : 5;
    this.keepAlive = isNotNull(jsonParam.keepAlive) ? jsonParam.keepAlive : 10;
    this.reConnectionTimeout = isNotNull(jsonParam.reConnectionTimeout) ? jsonParam.reConnectionTimeout * 1000 : 5000;
    this.cleanSession = isNotNull(jsonParam.cleanSession) ? jsonParam.cleanSession : true;
    this.ssl = isNotNull(jsonParam.ssl) ? jsonParam.ssl : false;
    this.userName = jsonParam.userName;
    this.password = jsonParam.password;
    this.qos = isNotNull(jsonParam.qos) ? jsonParam.qos : 2;
    this.client = null;
    this.connParam = null;
};

MqttProducer.prototype.prepareParam = function(fnConnectionFail, fnConnectionSuccess){
    let self = this;
    self.connParam = {
        invocationContext: {
            host : self.hostname,
            port: self.port,
            path: "/mqtt",
            clientId: self.clientId
        },
        timeout: self.timeout,
        keepAliveInterval: self.keepAlive,
        cleanSession: self.cleanSession,
        useSSL: self.ssl,
        userName: self.userName,
        password: self.password,
        onSuccess: function(){
            console.info(self.clientId + ": 连接成功");
            fnConnectionSuccess && fnConnectionSuccess.call(this);
        },
        onFailure: function(e){
            console.warn(self.clientId + ": 连接服务器失败," + e.errorMessage);
            try {
                fnConnectionFail && fnConnectionFail.call(this, e);
            } catch (e) {}
            let reconnect = setInterval(function () {
                try {
                    console.warn(self.clientId + ": 重连中...");
                    delete self.connParam.mqttVersion;
                    delete self.connParam.mqttVersionExplicit;
                    self.client.connect(self.connParam);
                    clearInterval(reconnect), reconnect = null;
                } catch (e) {
                    console.warn(e);
                }
            }, self.reConnectionTimeout);
        }
    };
};

MqttProducer.prototype.connect = function(fnConnectionFail, fnConnectionSuccess, fnConnectionLost){
    this.prepareParam(fnConnectionFail, fnConnectionSuccess);
    this.client = new Paho.MQTT.Client(this.hostname, this.port, this.clientId);
    this.client.connect(this.connParam);
    let self = this;
    self.client.onConnectionLost = function(responseObject) {
        (function(){
            if (responseObject.errorCode !== 0) {
                console.warn(self.clientId + ": 与服务器断开连接," + responseObject.errorMessage);
                let interval = setInterval(function () {
                    try {
                        console.warn(self.clientId + ":重连中...");
                        self.prepareParam(fnConnectionFail, fnConnectionSuccess);
                        self.client.connect(self.connParam);
                        clearInterval(interval), interval = null;
                    } catch (e) {
                        console.warn(e);
                    }
                }, self.reConnectionTimeout);
            }
        })();
        fnConnectionLost && fnConnectionLost.call(this);
    };
};
// 发送消息的发方法
MqttProducer.prototype.send = function (topic, msg, qos) {
    let message = new Paho.MQTT.Message(msg);
    message.destinationName = topic;
    message.qos = isNotNull(qos) ? qos : this.qos;
    this.client.send(message);
}

4、使用

这里直接举例前端从发送消息到接收的简单过程

先在页面中创建发布者

function createProducer() {
    producer = new MqttProducer({
        hostname : ‘http://xxx.com/mqtt’, // 此处为服务端mqtt域名,需后端配置
        port : 8084, // 端口按照实际的来 
        userName : 'username', // 用户名
        password : 'password' // 密码
    });
    producer.connect();
}

然后发送消息
注意:topic需要和订阅者的topic相同,才能接收到消息

function sendMQTTmessage(user, message) {
	// topic需要和接收者的topic相同,才能接收到消息,此处user是我用来确定发布和接收者唯一性的参数
    let topic = `/mqtt/m/${user}`
    // message就是需要发送的消息内容,需要string类型
    producer.send(topic, message);
}

在接收页创建订阅者
注意:订阅处配置需要和发布处相同

// 初始化mqtt连接等
function registerMqttConsumer(user) {
	let consumer = new MqttConsumer({
		hostname : ‘http://xxx.com/mqtt’,
		port : 8084,
		userName : 'username',
		password : 'password',
		topic : `/mqtt/m/${user}`
	});
	consumer.connect((topic, message, qos, retained) => {
        // 接收到消息了
	});
}

5、使用过的其他情况

注意:如果项目使用的mqtt通知消息时,主题topic有多个层级,但是主要区别是最后一个层级的话,也可以只订阅一次,订阅时topic使用多层通配符---->“#”,“#”是用于匹配主题中任意层级的通配符,需要注意的是无论什么时候,它都必须是主题过滤器的最后一个字符 .

比如如下情况,就可以订阅到/mqtt/m/ 下所有发出的消息,此种情况下可以直接在接收到消息的时候判断topic完整的主题字符串找出当前发的消息具体是哪个,然后再继续做处理即可

function registerMqttConsumer(user) {
	let consumer = new MqttConsumer({
		hostname : ‘http://xxx.com/mqtt’,
		port : 8084,
		userName : 'username',
		password : 'password',
		topic : `/mqtt/m/#`
	});
	consumer.connect((topic, message, qos, retained) => {
        // 接收到消息了
	});
}

以上即为前端使用mqtt的完整步骤,mqttws31.js的文件我会把链接放上,以上。
https://download.csdn.net/download/xt_XiTu/12488593?spm=1001.2014.3001.5503

这种方法是几年前刚接触mqtt时的公司中的用法,实际项目中使用时可以直接在npm上寻找适合需求的插件

Logo

前往低代码交流专区

更多推荐