目录

一.  mqtt概念

二. mqtt的实现

mqtt简单应用(实例)

三. mqtt回调函数

连接回调 on_connect

订阅回调 on_subscribe

消息回调 on_message

注册特定主题消息回调 message_callback_add() --> 主题筛选器

删除注册的特定回调 message_callback_remove()

消息发布回调 on_publish

取消订阅 on_unsubscribe

断开连接回调 ondisconnect

套接子打开回调 on_scoket_open

套接子关闭回调 on_socket_close

套接子写入回调 on_socket_register

套接子注销写入回调 on_socket_unregister



一.  mqtt概念

MQTT(Message Queuing Telemetry Transport,消息队列遥测传输协议),是一种基于发布/订阅(publish/subscribe)模式的"轻量级"通讯协议,该协议构建于TCP/IP协议上,由IBM在1999年发布

由于物联网的环境是非常特别的,所以MQTT遵循以下设计原则:

  • (1)精简,不添加可有可无的功能;
  • (2)发布/订阅(Pub/Sub)模式,方便消息在传感器之间传递;
  • (3)允许用户动态创建主题,零运维成本;
  • (4)把传输量降到最低以提高传输效率;
  • (5)把低带宽、高延迟、不稳定的网络等因素考虑在内;
  • (6)支持连续的会话控制;
  • (7)理解客户端计算能力可能很低;
  • (8)提供服务质量管理;
  • (9)假设数据不可知,不强求传输数据的类型与格式,保持灵活性。

二. mqtt的实现

实现MQTT协议需要客户端和服务器端通讯完成,在通讯过程中,MQTT协议中有三种身份:发布者(Publish)、代理(Broker)(服务器)、订阅者(Subscribe)。其中,消息的发布者和订阅者都是客户端,消息代理是服务器,消息发布者可以同时是订阅者。

MQTT传输的消息分为:主题(Topic)和负载(payload)两部分:

  • (1)Topic,可以理解为消息的类型,订阅者订阅(Subscribe)后,就会收到该主题的消息内容(payload);
  • (2)payload,可以理解为消息的内容,是指订阅者具体要使用的内容。

当应用数据通过MQTT网络发送时,MQTT会把与之相关的服务质量(QoS)和主题名(Topic)相关连。

python下载mqtt包:

pip install paho-mqtt

mqtt简单应用(实例)

发布端

# -*- coding: utf-8 -*-# -*- coding: utf-8 -*-


import paho.mqtt.client as mqtt
import time


def on_connect(client, userdata, flags, rc):
    print "链接"
    print("Connected with result code: " + str(rc))


def on_message(client, userdata, msg):
    print "消息内容"
    print(msg.topic + " " + str(msg.payload))


#   订阅回调
def on_subscribe(client, userdata, mid, granted_qos):
    print "订阅"
    print("On Subscribed: qos = %d" % granted_qos)
    pass


#   取消订阅回调
def on_unsubscribe(client, userdata, mid, granted_qos):
    print "取消订阅"
    print("On unSubscribed: qos = %d" % granted_qos)
    pass


#   发布消息回调
def on_publish(client, userdata, mid):
    print "发布消息"
    print("On onPublish: qos = %d" % mid)
    pass


#   断开链接回调
def on_disconnect(client, userdata, rc):
    print "断开链接"
    print("Unexpected disconnection rc = " + str(rc))
    pass


client = mqtt.Client()
client.on_connect = on_connect
client.on_message = on_message
client.on_publish = on_publish
client.on_disconnect = on_disconnect
client.on_unsubscribe = on_unsubscribe
client.on_subscribe = on_subscribe
client.connect('127.0.0.1', 1883, 600) # 600为keepalive的时间间隔
while True:
    client.publish(topic='mqtt11', payload='amazing', qos=0, retain=False)
    time.sleep(2)

此处也可以将mqtt方法封装为一个类,使用会更方便一些

参数解释:

  • keepalive =>   心跳间隔,单位是秒,如果 broker 和 client 在这段时间内没有任何通讯,client 会给 broker 发送一个 ping 消息
  • retain  =>  如果设为 Ture ,这条消息会被设为保留消息 
  • payload  => 消息内容,字符串类型,如果设为 None ,会发送一条长度为 0 消息。如果设置了 int 或者 3. float 类型的值,会当做字符串发送,如果你想发送真正的 int 或者 float 值,需要用 struct.pack() 生成消息, mqtt的publish 只支持None, string, int, float 类型的数据,  如果需要发送json类型数据可以通过json.dumps()将数据进行转换后在发送, 接收端在on_message()回调函数中通过json.loads() 将数据解析就可以了
  • topic =>  这条消息所属的话题
  • qos  => 消息的安全等级 Qos详细介绍
    • qos=0    QoS0,At most once,至多一次;
      • QoS0 代表,Sender 发送的一条消息,Receiver 最多能收到一次,也就是说 Sender 尽力向 Receiver 发送消息,如果发送失败,也就算了;
    • qos=1    QoS1,At least once,至少一次;
      • QoS1 代表,Sender 发送的一条消息,Receiver 至少能收到一次,也就是说 Sender 向 Receiver 发送消息,如果发送失败,会继续重试,直到 Receiver 收到消息为止,但是因为重传的原因,Receiver 有可能会收到重复的消息;
    • qos=2    QoS2,Exactly once,确保只有一次
      • QoS1 代表,Sender 发送的一条消息,Receiver 至少能收到一次,也就是说 Sender 向 Receiver 发送消息,如果发送失败,会继续重试,直到 Receiver 收到消息为止,但是因为重传的原因,Receiver 有可能会收到重复的消息

        qos安全等级需要注意的点:

                1. python中下载的paho-mqtt包中,默认qos=0。
                2. 不论是sub还是pub都需要指定qos安全等级。

                pub指定的qos是服务器肯定按此规则接收,但是最终订阅者不一定。
                sub指定的qos表示订阅者可以接收的最高消息等级,也就是可能收到更低等级的消息

                也就是服务器只会按pub和sub两者qos等级最小的那个qos规则来发送消息

订阅端

# -*- coding: utf-8 -*-# -*- coding: utf-8 -*-
import paho.mqtt.client as mqtt


def on_connect(client, userdata, flags, rc):
    print("Connected with result code: " + str(rc))


def on_message(client, userdata, msg):
    print(msg.topic + " " + str(msg.payload))


#   订阅回调
def on_subscribe(client, userdata, mid, granted_qos):
    print("On Subscribed: qos = %d" % granted_qos)
    pass


#   取消订阅回调
def on_unsubscribe(client, userdata, mid):
    print "取消订阅"
    print("On unSubscribed: qos = %d" % mid)
    pass


#   发布消息回调
def on_publish(client, userdata, mid):
    print "发布消息"
    print("On onPublish: qos = %d" % mid)
    pass


#   断开链接回调
def on_disconnect( client, userdata, rc):
    print "断开链接"
    print("Unexpected disconnection rc = " + str(rc))
    pass


client = mqtt.Client()
client.on_connect = on_connect
client.on_message = on_message
client.on_publish = on_publish
client.on_disconnect = on_disconnect
client.on_unsubscribe = on_unsubscribe
client.on_subscribe = on_subscribe
client.connect('127.0.0.1', 1883, 600) # 600为keepalive的时间间隔
client.subscribe('mqtt11', qos=0)
client.loop_forever() # 保持连接

订阅服务client.subscribe("mqtt11", qos=0) 也可以将改订阅放在on_connect() 回调函数中,程序在建立连接成功后首先后执行on_connect() , 可将整个订阅端封装为一个类使用

参数解释:

  • keepalive =>   心跳间隔,单位是秒,如果 broker 和 client 在这段时间内没有任何通讯,client 会给 broker 发送一个 ping 消息
  • loop_forever()  =>  该函数是保持永久连接, 阻塞式,可结合多线程或多进程的方式使用

注意: 同一个mqtt服务即可以是发布端,也可以是订阅端,也可以订阅自己发布的内容,这里之所以分开是为了看起来更直观一些

mqtt方法可以封装为如下类


class MqttRoad(object):

    def __init__(self, mqtt_host, mqtt_port, mqtt_keepalive):
        super(MqttRoad, self).__init__()
        client = mqtt.Client()
        client.on_connect = self.on_connect
        client.on_message = self.on_message
        client.on_publish = self.on_publish
        client.connect(mqtt_host, mqtt_port, mqtt_keepalive)  # 600为keepalive的时间间隔
        client.loop_forever()  # 保持连接

    def on_connect(self, client, userdata, flags, rc):
        print("Connected with result code: " + str(rc))
        # 订阅
        client.subscribe("mqtt11")


    def on_message(self, client, userdata, msg):
        print("on_message topic:" + msg.topic + " message:" + str(msg.payload.decode('utf-8')))
        

    #   订阅回调
    def on_subscribe(self, client, userdata, mid, granted_qos):
        print("On Subscribed: qos = %d" % granted_qos)
        pass

    #   取消订阅回调
    def on_unsubscribe(self, client, userdata, mid):
        # print("取消订阅")
        print("On unSubscribed: qos = %d" % mid)
        pass

    #   发布消息回调
    def on_publish(self, client, userdata, mid):
        # print("发布消息")
        print("On onPublish: qos = %d" % mid)
        pass

    #   断开链接回调
    def on_disconnect(self, client, userdata, rc):
        # print("断开链接")
        print("Unexpected disconnection rc = " + str(rc))
        pass


if __name__ == '__main__':
    
    MqttRoad("172.0.0.1", 1883, 600)

一个简单的发布订阅就完成了,通过回调函数,可以对相应的值进行操作

三. mqtt回调函数

回调函数的应用是非常有必要的,回调函数有很多种,我们可能根据不同的业务场景采用不同的回调函数进行数据的处理,可以达到代码的高可用,减少代码的冗余。

回调函数只是业务程序的中转站

在这里有一个特别要注意的点,回调函数可以获取到请求的响应数据,但回调函数并不是适合作为对响应结果进行处理的地方

举个例子:

发布端发起请求,通过topic将请求传入mqtt,订阅端通过同一个topic订阅到该发布端的data数据,然后再on_message()中做业务处理,假定该业务处理需要10秒,在第1秒的时候,发布端又发布了新的内容,这是订阅端依旧通过topic订阅到了该内容,想要在on_message()中做业务处理,但是上一个业务并没有处理完成,程序处于堵塞的状态,直到10秒结束后,新订阅到的内容才会被处理。长期以此的话,就会在业务上造成延迟,如果业务需求对消息的实时性要求很高的话,那这样的处理方式就不可取了。

所以说,回调函数并不适合做业务处理,正确的做法应该是将订阅到的数据通过回调函数转给别的处理程序去执行,做到分模块分节点的执行。或者根据不同的业务需求,分类分节点的订阅,通过多个线程多个订阅的方式,做到业务上的互不干扰。

个人理解:回调函数类似于中间键的效果,mqtt在运行的过程中,会依次访问回调函数,将当前回调函数所需的一些参数信息传给回调函数,前提是你的回调函数在建立连接时被引用注册了才行。

连接回调 on_connect

连接主题(成功,失败)都会调用此函数

on_connect(client, userdata, flags, rc, reasonCode, properties)

参数解释:     (回调中重复参数不做重复解释)

  • client => 此回调的客户机实例
  • userdata => 在Client()或userdata_set()中设置的私有用户数据
  • flags => 代理发送的响应标志
  • rc => 连接结果
  • reasonCode  => mqttv5.0原因码:reasonCode类的实例。
  • properties  =>  从代理返回的mqttv5.0属性

      (对于MQTT v3.1和v3.1.1,未提供属性,但用于兼容性     .对于mqttv5.0,官方建议添加properties=None。)

  • rc值表示是否成功:

0:连接成功

1:连接被拒绝-协议版本不正确

2:连接被拒绝-客户端标识符无效

3:连接被拒绝-服务器不可用

4:连接被拒绝-用户名或密码错误

5:连接被拒绝-未授权

6-255:当前未使用。

订阅回调 on_subscribe

on_subscribe(client,userdata,mid,grated_qos,properties=None)

参数解释:     (回调中重复参数不做重复解释)

  • mid => 匹配从相应的subscribe()调用
  • grated_qos   => 给出代理的qos级别的整数列表,为每个不同的订阅请求授予。

消息回调 on_message

on_message(client, userdata, message)

参数解释:     (回调中重复参数不做重复解释)

  • message  =>  MQTTMessage的实例。这是一个包含成员主题、负载、qos和保留的类
  • 使用message_callback_add()定义将调用的多个回调,用于特定主题筛选器

注意:mqtt中on_message可以返回订阅到的信息,on_message是系统的默认订阅回调,如果没有自定义消息回调message_callback_add(sub, callback) ,则所有的订阅接收到的数据都会被on_message回调函数接收

如果数据量过大,或者解析数据耗时时建议使用message_callback_add(sub, callback)方法单独处理

注册特定主题消息回调 message_callback_add() --> 主题筛选器

message_callback_add(sub, callback)

参数解释:     (回调中重复参数不做重复解释)

  • sub  =>   sub即subscribe, 也就是client.subscribe() 方法中的订阅的topic,可以是通配符匹配订阅
  • callback  => 自定义回调函数,回调参数与on_message()相同即可,参数的意义也是一样的。举个例子如下
    def username_message(client, userdata, msg):
        print("username_message  topic:" + msg.topic)

注意: callback一定要初始化绑定一下,即client.username_message = username_message

在消息回调中,message_callback_add()的优先级要高于on_message默认回调的,匹配"sub"的message将传递给"callback", 任何不匹配的message将传递到默认的on_message回调。

这里建议多次调用不同的"sub"来定义多个主题特定回调

如果订阅的主题很多,且数据传输的频率很快,如果不使用message_callback_add()实现特定主题回调,进行单独处理数据的话,则所有的数据都会进入on_message系统默认回调中,则可能会产生数据拥堵,on_message处理不过来的现象,最严重的后果将造成消息堵塞,消息延迟

删除注册的特定回调 message_callback_remove()

message_callback_remove(sub)

删除以前注册过的回调,与message_callback_add()相对应

消息发布回调 on_publish

on_publish(client, userdata, mid)

参数解释:     (回调中重复参数不做重复解释)

  • mid  => 匹配从相应的publish()调用,以允许跟踪传出消息。
  • 对于QoS级别为1和2的消息,这意味着握手已完成。对于QoS 0,这意味着消息已经离开了客户。这个回调很重要,因为即使publish()调用返回成功,并不总是意味着消息已发送

取消订阅 on_unsubscribe

on_unsubscribe(client, userdata, mid,  properties, reasonCodes)

参数解释:     (回调中重复参数不做重复解释)

  • mid  =>  匹配从相应的unsubscribe()调用。

断开连接回调 ondisconnect

on_disconnect(client, userdata, rc, reasonCode, properties)

参数解释:     (回调中重复参数不做重复解释)

  • rc => 断开连接的结果

套接子打开回调 on_scoket_open

on_socket_open(client, userdata, socket)

参数解释:     (回调中重复参数不做重复解释)

  • socket  =>  刚打开的socket

套接子关闭回调 on_socket_close

on_socket_close(client, userdata, socket)

参数解释:     (回调中重复参数不做重复解释)

套接子写入回调 on_socket_register

on_socket_register_write(client, userdata, socket)

参数解释:     (回调中重复参数不做重复解释)

  • socket  =>  写入的套接子

套接子注销写入回调 on_socket_unregister

on_socket_unregister_write(client, userdata, socket)

参数解释:     (回调中重复参数不做重复解释)

还有一些系统默认的回调函数,就不在次一一列出了,感兴趣的可以去看看paho.mqtt.client包的源码(paho.mqtt包中的client类)

注:  喜欢的可收藏,有问题的可指出,需要的可引用,但不希望被复制内容重新发布,污染网络环境0.0

Logo

为开发者提供学习成长、分享交流、生态实践、资源工具等服务,帮助开发者快速成长。

更多推荐