Paho-MQTT是由Eclipse基金会开发的开源Python MQTT客户端。Paho-MQTT可以在任何支持Python的设备上运行。在本教程中,我们将使用 Paho 构建一个 MQTT 客户端。我将把库的每个功能添加到客户端程序中,并解释它是如何工作的。在本教程结束时,您将对库的工作原理有一个基本的了解。

如果您不熟悉 MQTT,最好先学习我的上一篇《MQTT基础知识及工作原理》

0. 安装 Python MQTT 客户端 Paho-MQTT

Paho MQTT 需要 Python 版本 3.4+。要进行安装,请打开主机或终端,然后输入:

pip install paho-mqtt

1. 建立与 MQTT 代理的连接

本教程将使用 Eclipse 提供的公共 MQTT 代理。

警告: 不要在生产环境中使用公有 MQTT 代理。

要在PC上运行代理,您可以安装mosquitto:
在Windows和Ubuntu/Debian上安装Mosquitto

建立与 MQTT 代理的连接

import paho.mqtt.client as mqtt  broker_url = "mqtt.eclipse.org"
broker_port = 1883  client = mqtt.Client()
client.connect(broker_url, broker_port)

这是怎么回事?

  1. 我们导入 paho 库,并将代理地址设置为 ,将端口号设置为 。iot.eclipse.org1883

1883是 MQTT 中所有未加密连接的缺省端口号。

  1. 我们创建一个 MQTT 客户端对象并调用它。我们将在下一节中看到有关 paho 客户端对象的更多信息。client

  2. 接下来,我们使用代理的地址和端口号调用函数。connect()

如果连接成功,该函数将返回 0。connect()

让我们分解一下客户端对象:

1.1 客户端对象

该对象创建 MQTT 客户机。它需要4个可选参数:client()

client(client_id="", clean_session=True, userdata=None, protocol=MQTTv311, transport="tcp")

client_id是客户端在连接到代理时给出的唯一字符串。如果您未提供客户端 ID,代理将向客户端分配一个客户端 ID。

每个客户端必须具有唯一的客户端 ID。代理使用客户机 ID 来唯一标识每个用户。如果使用相同的客户端 ID 连接第二个客户端,则第一个客户端将断开连接。

clean_session是默认设置为True的布尔值。

如果设置为False,则代理存储有关客户端的信息。
如果设置为True,代理将删除有关客户端的所有存储信息。
要了解干净会话,请参阅: MQTT 中的干净会话说明

userdata是可以作为参数发送到回调的数据。有关回调的更多信息,请参见第 4 节。

protocol可以是其中之一,也可以取决于您要使用的版本。MQTTv31MQTTv311

transport缺省值为 。如果要通过WebSockets发送消息,请设置为 。tcpwebsockets

为了干净地断开与代理的连接,我们可以使用函数。disconnect()

2. 发布消息

要发布消息,我们使用该函数。该函数采用 4 个参数:publish()

publish(topic, payload=None, qos=0, retain=False)

topic是包含主题名称的字符串。

主题名称区分大小写。

payload是一个字符串,其中包含将发布到主题的消息。订阅该主题的任何客户端都将看到有效负载消息。

这些是可选参数:

qos为 0、1 或 2。它默认为 0。服务质量是保证收到消息的级别。
要了解不同的 MQTT 服务级别质量,请参阅此帖子
: MQTT QoS 级别说明

retain是默认为False的布尔值。如果设置为 True,则它告诉代理将该主题上的该消息存储为"最后一条好消息"。
要了解 MQTT 中的消息保留,请参阅以下内容:MQTT 保留的消息

将发布函数添加到我们的代码中:

import paho.mqtt.client as mqtt  broker_url = "mqtt.eclipse.org"
broker_port = 1883  client = mqtt.Client()
client.connect(broker_url, broker_port)
client.publish(topic="TestingTopic", payload="TestingPayload", qos=0, retain=False)

要检查消息是否已成功发布到某个主题,我们需要一个订阅该主题的客户端。

## 3. 订阅主题

要订阅主题,我们需要该函数。该函数采用 2 个参数subscribe()

```powershell
subscribe("topicName", qos=0)

topic是包含主题名称的字符串。

注意:主题名称区分大小写。

qos为 0、1 或 2。它将降级为 0。

让我们修改我们的程序以订阅要发布到的主题:

import paho.mqtt.client as mqtt  broker_url = "mqtt.eclipse.org"
broker_port = 1883  client = mqtt.Client()
client.connect(broker_url, broker_port)  client.subscribe("TestingTopic", qos=1)  client.publish(topic="TestingTopic", payload="TestingPayload", qos=1, retain=False)
注意:如果您希望客户端订阅多个主题,则可以将它们放在元组列表中。

示例:client.subscribe(['topicName1', 1),('topicName2', 1)])

元组的格式为 [(主题名称,QoS 级别)]

当您执行此程序时,您会注意到已发布的消息仍未显示在控制台/终端上。

订阅某个主题会告诉代理向您发送发布到该主题的消息。我们已经订阅了该主题,但我们需要一个回调函数来处理这些消息。

  1. 回调
    回调是在事件发生时执行的函数。在 paho 中,这些事件包括连接、断开连接、订阅、取消订阅、发布、接收消息、日志记录。

在对程序实现回调之前,我们需要首先了解程序如何调用这些回调。为此,我们将使用 paho 中可用的不同循环函数。

4.1 循环函数

循环是客户端对象的函数。当客户端收到消息时,该消息将存储在接收缓冲区中。当要将消息从客户端发送到代理时,该消息将存储在发送缓冲区中。循环函数用于处理缓冲区中的任何消息并调用相应的回调函数。它们还将尝试在断开连接时重新连接到代理。

大多数循环函数都是异步运行的,这意味着当调用循环函数时,它将在单独的线程上运行。

有 3 种类型的循环函数:

4.1.1 loop_forever():这是一个阻塞循环函数。这意味着循环函数将继续运行,并且在调用此函数后无法执行任何其他行。如果要无限期地运行程序,并且程序中有单个客户端构造函数,请使用此功能。

import paho.mqtt.client as mqtt  broker_url = "mqtt.eclipse.org"
broker_port = 1883  client = mqtt.Client()
client.connect(broker_url, broker_port)  client.subscribe("TestingTopic", qos=1)  client.publish(topic="TestingTopic", payload="TestingPayload", qos=1, retain=False)  client.loop_forever()

4.1.2 loop_start()/loop_stop(): 这是一个非阻塞循环函数,这意味着你可以调用这个函数,并在函数调用后继续执行代码。顾名思义,loop_start() 用于启动循环函数,loop_stop() 用于停止循环函数。如果需要在同一程序中创建多个客户端对象,则可以使用 loop_start()。

import paho.mqtt.client as mqtt  broker_url = "mqtt.eclipse.org"
broker_port = 1883  client = mqtt.Client()
client.connect(broker_url, broker_port)  client.subscribe("TestingTopic", qos=1)  client.publish(topic="TestingTopic", payload="TestingPayload", qos=1, retain=False)  client.loop_start()
#Some Executable Code Here
client.loop_stop()
#Client Loop Stops After loop_stop() is called

4.1.3 loop():这是一个阻塞循环函数。loop() 和 loop_forever() 之间的区别在于,如果调用 loop() 函数,则必须手动处理重新连接,这与后者不同。loop_forever() & loop_start() 函数将在断开连接时自动尝试重新连接到代理。除非在特殊情况下,否则不建议使用 loop()。

现在回到回调。在下一节中,我们将在程序中实现每个回调。

4.2 on_connect

每次客户端连接/重新连接到代理时都会调用回调。让我们将回调添加到我们的程序中。on_connect()

import paho.mqtt.client as mqtt  broker_url = "mqtt.eclipse.org"
broker_port = 1883  def on_connect(client, userdata, flags, rc):
 print("Connected With Result Code: {}".format(rc))  def on_disconnect(client, userdata, rc):
 print("Client Got Disconnected")  client = mqtt.Client()
client.on_connect = on_connect
client.connect(broker_url, broker_port)  client.subscribe("TestingTopic", qos=1)  client.publish(topic="TestingTopic", payload="TestingPayload", qos=1, retain=False)  client.loop_forever()

必须将回调分配给客户端对象。如果不是,则回调将不会执行

这是怎么回事?

我们创建on_connect回调函数。它需要4个参数:客户端对象,用户数据,标志,rc。
对象。client
userdata是在客户端构造函数中声明的自定义数据。如果要将自定义数据传递到回调中,则需要它。
flags是一个字典对象,用于检查是否已将客户端对象的 clean 会话设置为 True 或 False。
rc这是result code,用于检查连接状态。不同的结果代码是:

0:连接成功 1:连接被拒绝 – 协议不正确 版本 2:连接被拒绝 – 客户端标识符无效 3:连接被拒绝 – 服务器不可用 4:连接被拒绝 – 用户名或密码错误 5:连接被拒绝 – 未授权 6-255:当前未使用。

该程序的输出是:

Connected With Result Code 0

这意味着连接成功。

4.3 on_disconnect
当客户端与代理断开连接时,将调用回调。on_disconnect()

import paho.mqtt.client as mqtt  broker_url = "mqtt.eclipse.org"
broker_port = 1883  def on_connect(client, userdata, flags, rc):
 print("Connected With Result Code " (rc))  def on_disconnect(client, userdata, rc):
 print("Client Got Disconnected")  client = mqtt.Client()
client.on_connect = on_connect
client.connect(broker_url, broker_port)  client.subscribe("TestingTopic", qos=1)  client.publish(topic="TestingTopic", payload="TestingPayload", qos=1, retain=False)  client.loop_forever()

不要忘记将回调函数分配给客户端对象!
client.on_disconnect = on_disconnect

运行该程序,断开互联网连接,然后查看是否打印了消息"客户端已断开连接"。此外,重新连接到互联网以查看客户端是否重新连接到代理。

可以将多个客户端对象附加到单个回调函数。当您的程序连接到多个代理时,这很有用。

4.4 on_message

回到尽管订阅了主题但仍未显示消息的问题:回调用于处理发布到已订阅主题的消息。on_message()

在我们的程序中,我们需要做3件事:1.
订阅我们要发布的主题。
2. 使用回调处理已发布的消息。在我们的程序中,我们将简单地打印消息。
3. 将回调函数分配给客户端对象的属性。on_message

让我们创建另一个 mqtt 客户端。此新程序将向已订阅现有程序的主题发布消息。

sub.py(接收客户端)

import paho.mqtt.client as mqtt  broker_url = "mqtt.eclipse.org"
broker_port = 1883  def on_connect(client, userdata, flags, rc):
 print("Connected With Result Code "+rc)  def on_message(client, userdata, message):
 print("Message Recieved: "+message.payload.decode())  client = mqtt.Client()
client.on_connect = on_connect
client.on_message = on_message
client.connect(broker_url, broker_port)  client.subscribe("TestingTopic", qos=1)  client.publish(topic="TestingTopic", payload="TestingPayload", qos=1, retain=False)  client.loop_forever()

pub.py(发布客户端)

import paho.mqtt.client as mqtt  broker_url = "mqtt.eclipse.org"
broker_port = 1883  client = mqtt.Client()
client.connect(broker_url, broker_port)  client.publish(topic="TestingTopic", payload="TestingPayload", qos=1, retain=False)

on_message() 回调有 3 个参数:、 & 。我已经解释了本节中的前两个。
该对象有 4 个属性:、 、 、 。clientuserdatamessageon_connect()messagetopicpayloadqosretain

如果您注意到对象的每个属性也是我们在客户端函数中使用的参数。messagepublish()

先运行,然后运行 。输出将显示消息和主题。sub.pypub.py

如果邮件未打印出来,请检查是否:

您已将该函数添加到客户端对象的属性中。on_message()
如果您已订阅 了 中的回调中的主题。on_connect()sub.py
如果已分别在 和 函数中正确输入主题名称。subscribe()publish()sub.pypub.py
如果要在函数中打印消息。on_message()
如何组织和处理消息?

如果您从不同的主题收到多条消息,并且需要以不同的方式处理每个主题的消息,那么我们必须对消息进行排序。

执行此操作的一种方法是使用该属性检查将消息发布到哪个主题。然后,您可以创建 if 条件来相应地处理消息。message.topic

更好的方法是使用对象的功能。这将创建多个回调来处理来自不同主题的消息。
参数采用主题名称,参数采用将处理该主题消息的回调的名称。message_callback_add(sub, callback)clientsubcallback

例:

sub.py

import paho.mqtt.client as mqtt  broker_url = "mqtt.eclipse.org"
broker_port = 1883  def on_connect(client, userdata, flags, rc):
 print("Connected With Result Code " (rc))  def on_message_from_kitchen(client, userdata, message):
 print("Message Recieved from Kitchen: "+message.payload.decode())  def on_message_from_bedroom(client, userdata, message):
 print("Message Recieved from Bedroom: "+message.payload.decode())  def on_message(client, userdata, message):
 print("Message Recieved from Others: "+message.payload.decode())  client = mqtt.Client()
client.on_connect = on_connect
#To Process Every Other Message
client.on_message = on_message
client.connect(broker_url, broker_port)  client.subscribe("TestingTopic", qos=1)
client.subscribe("KitchenTopic", qos=1)
client.subscribe("BedroomTopic", qos=1)
client.message_callback_add("KitchenTopic", on_message_from_kitchen)
client.message_callback_add("BedroomTopic", on_message_from_bedroom)  client.loop_forever()

在向其添加回调之前,请确保您已订阅这两个主题。

默认情况下,来自其他主题的消息将由 on_message() 处理

您可以通过发布到"KitchenTopic"和"卧室主题"并查看它是否单独处理来尝试上述程序。

4.5 on_publish

执行函数时调用该函数。这将返回一个元组 。on_publish()publish()(result, mid)

result是错误代码。错误代码 0 表示消息已成功发布。

mid代表消息 ID。它是一个整数,是客户端分配的唯一消息标识符。如果使用 QoS 级别 1 或 2,则客户端循环将使用 标识尚未发送的消息。mid

4.6 on_subscribe()/on_unsubscribe()

当客户端订阅主题时,将调用回调。例:on_subscribe()on_subscribe(client, userdata, mid, granted_qos)

当客户端取消订阅某个主题时,将调用回调。例:on_unsubscribe()on_unsubscribe(client, userdata, mid)

mid是函数中讨论的消息 id。on_publish()

granted_qos是订阅时该主题的 qos 级别。如果需要,您可以在程序中尝试一下。

5. 其他有用的 Paho-MQTT 功能

Paho还有一些有用的函数,可以在程序中使用它们来执行函数,而不必创建客户端构造函数并经历创建回调的麻烦。

5.1 单() / 多() 发布

单个或多个函数用于将单个消息或多条消息发布到代理上的主题,而无需创建客户机对象。

single(topic, payload=None, qos=0, retain=False, hostname=broker_url, port=broker_port, client_id="", keepalive=60, will=None, auth=None, tls=None, protocol=mqtt.MQTTv311, transport="tcp")

topic是唯一必需的参数。这是一个包含主题名称的字符串。

auth是包含用户名和密码的字典(如果代理需要)。(eclipse 代理不需要身份验证)。
例:auth = {‘username’:“username”, ‘password’:""}

tls如果我们使用 TLS/SSL 加密,则需要。
例:dict = {‘ca_certs’:"", ‘certfile’:"", ‘keyfile’:"", ‘tls_version’:"", ‘ciphers’:"<ciphers">}

transport接受 2 个值:和 。如果您不想使用websockets,请将其设置为 。websocketstcptcp

其他参数类似于客户端对象中的参数。

multiple(msgs, hostname=broker_url, port=broker_port, client_id="", keepalive=60, will=None, auth=None, tls=None, protocol=mqtt.MQTTv311, transport="tcp")

msgs是必需的参数。它包含要发布的消息的列表。每条消息都可以是字典或元组。

字典必须采用以下格式:
msg = {‘topic’:"< topicname >", ‘payload’:"", ‘qos’:‘0’, ‘retain’:‘False’}

元组必须采用以下格式:
("< topicname >", “< payload >”, qos, retain)

在这两种格式中,名称都必须存在。topic

5.2 简单() / 回调()

simple 是一个阻止函数,它订阅一个主题或一个主题列表,并返回发布到该主题的消息。

simple(topics, qos=0, msg_count=1, retained=False, hostname=“localhost”, port=1883, client_id="", keepalive=60, will=None, auth=None, tls=None, protocol=mqtt.MQTTv311)

topics是唯一必需的参数。这可以是单个主题的字符串,也可以是多个主题的列表。

msg_count是在断开与代理的连接之前必须返回的消息数。对于 msg_count > 1,该函数将返回消息列表。

其他参数已经在 single() / multiple() 部分中进行了解释。

callback与 相似,唯一的区别是它需要一个额外的参数,即回调。简单函数仅返回来自主题的消息,回调函数将返回的消息发送到任何函数进行处理。simple
callback(callback, topics, qos=0, userdata=None, hostname=“iot.eclipse.org”, port=1883, client_id="", keepalive=60, will=None, auth=None, tls=None, protocol=mqtt.MQTTv311)

6. Will_set()

这是一个非常有用的功能。当客户端连接到代理时,它会告诉代理,如果它断开连接,它必须向主题发布消息。它是客户端构造函数的一个属性。

client.will_set(topic, payload=None, qos=0, retain=False)

要了解如何以及何时在MQTT中使用Last Will & Testament,请参阅这篇文章
:MQTT Last Will and Testament(用示例解释)

您的反馈对我来说非常重要。如果本教程对您有所帮助,或者,如果文章中的某些部分有错误或解释不正确,请在下面的评论中告诉我。

Logo

旨在为数千万中国开发者提供一个无缝且高效的云端环境,以支持学习、使用和贡献开源项目。

更多推荐