Flask是一个用Python编写的轻量级Web应用程序框架,之所以称为“微框架”,是因为它使用了一个简单的核心来扩展其他功能,如ORM、表单验证工具、文件上传、各种开放认证技术、等等

MQTT是一种基于发布/订阅模式的轻量级物联网(IoT)消息传输协议。它可以以极少的代码和更小的带宽为联网设备提供实时可靠的消息服务。广泛应用于物联网、移动互联网、智能硬件、车联网、电力能源等行业。

本文主要介绍如何在Flask项目中使用MQTT,实现MQTT客户端和MQTT代理之间的连接、订阅、消息传递、取消订阅等功能。

我们将使用Flask-MQTT客户端库,它是一个 Flask 扩展,可以看作是paho-mqtt的装饰器,以简化 Flask 应用程序中的 MQTT 集成。

项目初始化

本项目使用 Python 3.8 开发和测试,用户可以使用以下命令验证 Python 的版本。

$ python3 --version
Python 3.8.2

使用 Pip 安装 Flask-MQTT 库。

pip3 install flask-mqtt

使用 Flask-MQTT

我们将采用 EMQ 提供的免费公共 MQTT 代理,它是在MQTT 云服务 - EMQX Cloud的基础上创建的。以下是服务器访问信息:

  • 经纪人:broker.emqx.io

  • TCP 端口:1883

  • Websocket 端口:8083

导入烧瓶-MQTT

导入 Flask 库和 Flask-MQTT 扩展,并创建 Flask 应用程序。

from flask import Flask, request, jsonify
from flask_mqtt import Mqtt

app = Flask(__name__)

配置Flask-MQTT扩展

app.config['MQTT_BROKER_URL'] = 'broker.emqx.io'
app.config['MQTT_BROKER_PORT'] = 1883
app.config['MQTT_USERNAME'] = ''  # Set this item when you need to verify username and password
app.config['MQTT_PASSWORD'] = ''  # Set this item when you need to verify username and password
app.config['MQTT_KEEPALIVE'] = 5  # Set KeepAlive time in seconds
app.config['MQTT_TLS_ENABLED'] = False  # If your server supports TLS, set it True
topic = '/flask/mqtt'

mqtt_client = Mqtt(app)

完整的配置项请参考Flask-MQTT配置文档。

编写连接回调函数

我们可以在这个回调函数中处理成功或失败的MQTT连接,本例会在连接成功后订阅/flask/mqtt主题。

@mqtt_client.on_connect()
def handle_connect(client, userdata, flags, rc):
   if rc == 0:
       print('Connected successfully')
       mqtt_client.subscribe(topic) # subscribe topic
   else:
       print('Bad connection. Code:', rc)

编写消息回调函数

此函数将打印/flask/mqtt主题收到的消息。

@mqtt_client.on_message()
def handle_mqtt_message(client, userdata, message):
   data = dict(
       topic=message.topic,
       payload=message.payload.decode()
  )
   print('Received message on topic: {topic} with payload: {payload}'.format(**data))

创建消息发布API

我们创建一个简单的 POST API 来发布 MQTT 消息。

在实际情况下,API 可能需要一些更复杂的业务逻辑处理。

@app.route('/publish', methods=['POST'])
def publish_message():
   request_data = request.get_json()
   publish_result = mqtt_client.publish(request_data['topic'], request_data['msg'])
   return jsonify({'code': publish_result[0]})

运行 Flask 应用程序

当 Flask 应用启动时,MQTT 客户端会连接到服务器并订阅主题/flask/mqtt

if __name__ == '__main__':
   app.run(host='127.0.0.1', port=5000)

测试

现在,我们使用MQTT 客户端 - MQTT X来连接、订阅和发布测试。

接收消息

  1. 在 MQTT X 中创建连接并连接到 MQTT 服务器。

MQTT X 新连接

  1. Hello from MQTT X发布到 MQTT X 中的/flask/mqtt主题。

MQTT X 发布 MQTT 消息

3、我们会在Flask运行窗口看到MQTT X发送的消息。

Flask 接收 MQTT 消息

发布消息

1、订阅MQTT X中的/flask/mqtt主题。

MQTT X 订阅

2.使用Postman调用/publishAPI:发送消息Hello from Flask/flask/mqtt主题。

邮递员测试

3、我们可以在MQTT X中看到Flask发送的消息。

Flask 发布 MQTT 消息

完整代码

from flask import Flask, request, jsonify
from flask_mqtt import Mqtt

app = Flask(__name__)

app.config['MQTT_BROKER_URL'] = 'broker.emqx.io'
app.config['MQTT_BROKER_PORT'] = 1883
app.config['MQTT_USERNAME'] = ''  # Set this item when you need to verify username and password
app.config['MQTT_PASSWORD'] = ''  # Set this item when you need to verify username and password
app.config['MQTT_KEEPALIVE'] = 5  # Set KeepAlive time in seconds
app.config['MQTT_TLS_ENABLED'] = False  # If your broker supports TLS, set it True
topic = '/flask/mqtt'

mqtt_client = Mqtt(app)


@mqtt_client.on_connect()
def handle_connect(client, userdata, flags, rc):
   if rc == 0:
       print('Connected successfully')
       mqtt_client.subscribe(topic) # subscribe topic
   else:
       print('Bad connection. Code:', rc)


@mqtt_client.on_message()
def handle_mqtt_message(client, userdata, message):
   data = dict(
       topic=message.topic,
       payload=message.payload.decode()
  )
   print('Received message on topic: {topic} with payload: {payload}'.format(**data))


@app.route('/publish', methods=['POST'])
def publish_message():
   request_data = request.get_json()
   publish_result = mqtt_client.publish(request_data['topic'], request_data['msg'])
   return jsonify({'code': publish_result[0]})

if __name__ == '__main__':
   app.run(host='127.0.0.1', port=5000)

限制

Flask-MQTT 目前不适合与多个工作实例一起使用。 因此,如果您使用像 geventgunicorn 这样的 WSGI 服务器,请确保您只有一个工作实例。

总结

至此,我们已经完成了一个使用 Flask-MQTT 的简单 MQTT 客户端,可以在 Flask 应用中订阅和发布消息。

原文发表于emqx.com。

Logo

ModelScope旨在打造下一代开源的模型即服务共享平台,为泛AI开发者提供灵活、易用、低成本的一站式模型服务产品,让模型应用更简单!

更多推荐