MQTT(Message Queuing Telemetry Transport)是一种轻量级的即时通信协议,相关介绍可见:MQTT简介

Paho 是Eclipse的开源 MQTT 客户端项目,提供多种语言的 MQTT 客户端实现,包括 C、C++、C#、Java、Python、JavaScript 等。在Python环境下,Paho MQTT Python客户端由paho-mqtt模块支撑。

安装Paho MQTT Python 客户端

Paho MQTT Python 客户端依赖于Python 2.7.9以上版本和Python 3.5以上版本。本文测试环境为Python 3.7.1。
用pip安装paho-mqtt如下:

pip install paho-mqtt

常用API

paho-mqtt主要由三个模块组成:Client模块、Publish模块和Subscribe模块。Publish模块和Subscribe模块使用相对较少,参数含义也与Client模块的publish和subscribe方法的参数类似,本文限于篇幅原因就不介绍了。

Client的基本使用流程

Client的基本使用流程如下:

  • 创建客户端实例
  • 使用 connect*() 函数之一连接到代理
  • 调用 loop*() 函数之一来维护与代理的网络流量
  • 使用 subscribe() 订阅主题并接收消息
  • 使用 publish() 将消息发布到代理
  • 使用 disconnect() 断开与代理的连接

值得注意的是,Client使用过程中存在许多回调,这些回调可以帮助Client处理各种事件,后面我们将详细介绍。

Client类与方法
(1)Client的构建与重置

Client的构建与重置由以下两个方法承担:

Client(client_id="", clean_session=True, userdata=None, protocol=MQTTv311, transport="tcp")
reinitialise(client_id="", clean_session=True, userdata=None)

具体参数说明如下:


client_id:
连接到代理时使用的唯一客户端 ID 字符串。如果 client_id 为零长度或 None ,则将随机生成一个。在这种情况下,clean_session 参数必须为 True。
clean_session:
确定客户端类型的布尔值。如果为 True,代理将在断开连接时删除有关此客户端的所有信息。如果为 False,则客户端是持久客户端,并且在客户端断开连接时将保留订阅信息和排队消息。

注意,客户端永远不会在断开连接时丢弃自己的传出消息。调用 connect() 或 reconnect() 将导致消息被重新发送。使用 reinitialise() 将客户端重置为其原始状态。

userdata:
作为 userdata 参数传递给回调的任何类型的用户定义数据。稍后可能会使用 user_data_set() 函数对其进行更新。

protocol:
用于此客户端的 MQTT 协议版本。可以是 MQTTv31 或 MQTTv311

transport:
设置为“websockets”以通过 WebSockets 发送 MQTT。保留默认值“tcp”以使用原始 TCP。


使用示例如下:

import paho.mqtt.client as mqtt
# 构建一个Client
mqttc = mqtt.Client()
# 重置一个Client
mqttc.reinitialise()
(2)连接至代理/重新连接/与代理断开连接

相应方法是:

connect(host, port=1883, keepalive=60, bind_address="")
reconnect()
disconnect()

注意,MQTT的本质是一个用以维护客户端与代理之间的长连接的、并且是低消耗的协议。所以,无论对于代理还是对于客户端,他们都需要清楚地知道二者之间的连接是否断开、而且是正常断开还是非正常断开(正常断开(客户端使用disconnect方法)则不需特别操作;非正常断开情况下,客户端需要尝试重新连接,而代理则需要发送遗嘱)。这一点贯穿与MQTT的协议设计与“连接”这一部分的内容。

具体参数说明如下:


host:
远程代理的主机名或 IP 地址
port:
要连接的服务器主机的网络端口。 默认为 1883。请注意,基于 SSL/TLS 的 MQTT 的默认端口为 8883,因此如果您使用 tls_set() 或 tls_set_context(),则可能需要手动提供端口
keepalive:
与代理通信之间允许的最长间隔(以秒为单位)。 如果没有其他消息正在交换,这将控制客户端向代理发送 ping 消息的速率。

需要指出,MQTT协议规定,在 1.5倍的keepalive时间内,如果代理没有收到来自客户端的任何数据包,那么代理将认为它和这个客户端之间的连接已经断开;而如果客户端没有收到来自 代理的任何数据包,那么这个客户端会认为它和代理之间的连接已经断开。为维持正常的连接,如果代理与客户端之间没有其他数据传输,客户端会每隔keepalive时间向代理发送一次ping消息(由loop()来维护)。keepalive的缺省时间是60s。

bind_address:
假设存在多个接口,要将此客户端绑定到的本地网络接口的 IP 地址


使用示例如下:

# 已构建一个Client:mqttc
mqttc.connect("mqtt.eclipseprojects.io") 
# 使用reconnect与disconnect之前必须已经调用过connect
mqttc.reconnect()
mqttc.disconnect()

注意这三个方法是阻塞进程的。

(3)网络回路控制

网络回路控制是客户端对传入与发出数据进行控制的背后驱动力,同时也根据客户端的keepalive设置发送ping消息(心跳报文),以刷新连接。如果不调用网络回路方法,则客户端不会处理传入的网络数据,并且可能无法及时发送传出的网络数据。其作用可由下图表示:
Loop的作用(图转载自Steve's Internet Guide)

相关方法包括:

loop(timeout=1.0, max_packets=1)
loop_start()
loop_stop(force=False)
loop_forever(timeout=1.0, max_packets=1, retry_first_connection=False)

包含三套逻辑,一是循环调用loop()阻塞进程:timeout参数定义了loop()阻塞进程的超时时间,而max_packets 参数已过时,应保持未设置。timeout 不得超过客户端的 keepalive 值,否则客户端将被代理定期断开连接。
用例如下:

mqttc.connect("mqtt.eclipseprojects.io")
while True:
    mqttc.loop(timeout=1.0)
    # do something else

二是使用loop_start()和loop_stop()创建和停止后台线程,以自动调用loop()。这种方式释放了主线程。 loop_start()可以在connect*() 之前或之后调用。 此调用还处理与代理的重新连接。 调用 loop_stop() 停止后台线程。 force 参数当前被忽略。
定期调用以处理网络事件。 此调用在 select() 中等待,直到网络套接字可用于读取或写入(如果合适),然后处理传入/传出数据。 此函数最多阻塞超时秒。
用例如下:

mqttc.connect("mqtt.eclipseprojects.io")
mqttc.loop_start()
# do something else
while True:
    temperature = sensor.blocking_read()
    mqttc.publish("paho/temperature", temperature)

mqttc.loop_stop()

三是使用loop_forever()持续阻塞进程(不需外部循环),连接将维持到客户端调用disconnect()。timeout 和 max_packets 参数已过时,应保持未设置。retry_first_connection=True 使其重试第一次连接。警告:这可能会导致客户端不断连接到不存在的主机而不提示失败的情况。由于主线程被阻塞,其他操作无法在主线程执行,包括disconnect()在内的其他操作都需要通过回调函数来执行。

用例如下:

mqttc.connect("mqtt.eclipseprojects.io")
mqttc.loop_forever(retry_first_connection=False)
(4)订阅/取消订阅

使客户端订阅到一个或多个主题,或从相应主题退订。

subscribe(topic, qos=0)
unsubscribe(topic)

具体参数说明:


topic:
一个字符串,指定要订阅的订阅主题。
qos:
订阅所需的服务质量(quality of service)级别。 默认为 0,可选0,1,2。


用例:

mqttc.subscribe(("my/topic", 1))
mqttc.subscribe([("my/topic", 0), ("another/topic", 2)])
mqttc.unsubscribe("my/topic")
mqttc.unsubscribe(["my/topic", "another/topic"])
(5)发布

发布会使得消息被发送到代理,然后再由代理发送到订阅匹配主题的任何客户端。
相关方法:

publish(topic, payload=None, qos=0, retain=False)

具体参数说明如下:


topic:
消息应该发布到的主题
payload:
要发送的实际消息。 如果没有给出,或者设置为 None 将使用零长度消息。 传递 int 或 float 将导致有效负载转换为表示该数字的字符串。 如果您希望发送真正的 int/float,请使用 struct.pack() 创建您需要的有效负载。
qos:
要使用的服务质量(quality of service)水平,默认为 0,可选0,1,2。
retain:
如果设置为 True,则该消息将被设置为该主题的保留消息。保留消息的作用是使新订阅某个主题的客户端能够收到该主题中上一次发布的消息。


用例如下:

mqttc.publish(topic="my/topic", payload=None, qos=0, retain=True)
回调

回调是在指由某个事件触发相应的处理。七大类回调函数,分别为连接、断开连接、收到消息、发布消息、订阅主题、取消订阅主题、收到日志七类事件提供了处理方法。

(1)连接

on_connect():当代理响应客户端的连接请求时调用。

on_connect(client, userdata, flags, rc)

具体参数说明如下:


client:
此回调的客户端实例
userdata:
在 Client() 或 user_data_set() 中设置的私有用户数据
flags:
代理发送的响应标志
rc:
连接结果


用例:

def on_connect(client, userdata, flags, rc):
    print("Connection returned result: "+connack_string(rc))

mqttc.on_connect = on_connect
...
(2)断开连接

on_disconnect():当客户端与代理断开连接时调用。

on_disconnect(client, userdata, rc)

具体参数说明如下:


client:
此回调的客户端实例
userdata:
在 Client() 或 user_data_set() 中设置的私有用户数据
flags:
代理发送的响应标志
rc:
断开结果


用例:

def on_disconnect(client, userdata, rc):
    if rc != 0:
        print("Unexpected disconnection.")

mqttc.on_disconnect = on_disconnect
...
(3)收到消息

on_message():当收到客户端订阅的主题的消息并且该消息与现有主题过滤器回调(由message_callback_add()定义)不匹配时调用。
message_callback_add():当收到客户端订阅的主题的消息并且该消息与现有主题过滤器回调匹配时调用。
message_callback_remove():删除之前使用message_callback_add()定义的主题过滤器回调

on_message(client, userdata, message)
message_callback_add(sub, callback)
message_callback_remove(sub)

具体参数说明如下:


client:
此回调的客户端实例
userdata:
在 Client() 或 user_data_set() 中设置的私有用户数据
message:
MQTTMessage信息实例,这是一个包含成员 topic、payload、qos、retain 的类。
sub:
特定主题
callback:
定义的callback函数


on_message()用例:

def on_message(client, userdata, message):
    print("Received message '" + str(message.payload) + "' on topic '"
        + message.topic + "' with QoS " + str(message.qos))

mqttc.on_message = on_message
...

另外在举个例子说明下何时使用message_callback_add(),如果客户端订阅了sensors/#系列主题(#为通配符),可能收到消息的主题有sensors/temperature和sensors/humidity,则可以使用message_callback_add()定义两个主题过滤器回调,分别处理这两个主题下收到的消息。

def temperature_callback(client, userdata, message):
    ...

def humidity_callback(client, userdata, message):
    ...

mqttclient.message_callback_add("sensors/temperature", temperature_callback)
mqttc.message_callback_add("sensors/humidity", humidity_callback)

mqttc.subscribe("sensors/#")
(4)发布消息

on_publish():

on_publish(client, userdata, mid)
(5)订阅主题

on_subscribe():当代理响应订阅请求时调用。

on_subscribe(client, userdata, mid, granted_qos)
(6)取消订阅

on_unsubscribe():当代理响应取消订阅请求时调用。

on_unsubscribe(client, userdata, mid)
(7)收到日志

on_log():当客户端有日志信息时调用。 定义该回调可用于调试。 级别变量给出消息的严重性,将是 MQTT_LOG_INFO、MQTT_LOG_NOTICE、MQTT_LOG_WARNING、MQTT_LOG_ERR 和 MQTT_LOG_DEBUG 之一。

on_log(client, userdata, level, buf)
参考资料

Paho Python Client Documentation
Paho MQTT Python GitHub Repository
Steve’s Internet Guide

Logo

旨在为数千万中国开发者提供一个无缝且高效的云端环境,以支持学习、使用和贡献开源项目。

更多推荐