目录

做 C# WinForms、后台服务、物联网设备接入、客户端消息推送的时候,很多人第一反应可能是 WebSocket、HTTP 轮询、SignalR。但如果你的业务场景是“多个客户端之间进行轻量级消息通信”,或者设备端、客户端需要长连接实时收发消息,那么 MQTT 是一个非常值得了解的协议。

一、MQTT 是什么?

​编辑

二、MQTT 的核心概念

1. Broker:消息代理服务器

2. Client:客户端

3. Topic:主题

4. Publish:发布消息

5. Subscribe:订阅主题

6. QoS:消息服务质量

三、MQTT 在系统中的位置

​编辑

四、M2Mqtt 是什么?

五、阿里云 MQTT 连接需要哪些参数?

六、VS2017 创建 .NET Framework 4.5 项目

1. 创建项目

2. 安装 M2Mqtt

七、C# 计算阿里云 MQTT Password

八、使用 M2Mqtt 连接阿里云 MQTT

九、代码重点讲解

1. 创建 MqttClient

2. 连接 MQTT

3. 订阅 Topic

4. 发布消息

十、WinForms 项目中怎么用?

十一、常见错误排查

1. 连接不上,提示认证失败

2. 能连接,但收不到消息

3. 程序一会儿就断开

4. WinForms 界面卡死

十二、生产环境建议

     ​编辑

这样做的好处是:

十三、完整项目结构建议

十四、总结


做 C# WinForms、后台服务、物联网设备接入、客户端消息推送的时候,很多人第一反应可能是 WebSocket、HTTP 轮询、SignalR。但如果你的业务场景是“多个客户端之间进行轻量级消息通信”,或者设备端、客户端需要长连接实时收发消息,那么 MQTT 是一个非常值得了解的协议。

本文就以 C# .NET Framework 4.5 + Visual Studio 2017 + 开源 M2Mqtt 为例,讲一下如何连接 阿里云云消息队列 MQTT 版,并完成订阅、发布消息。

本文适合以下场景:

  • 老项目仍然使用 .NET Framework 4.5 / 4.5.2;

  • WinForms 客户端需要接入 MQTT;

  • 想用开源 M2Mqtt 快速实现 MQTT 连接;

  • 想对接阿里云 MQTT 产品;

  • 想搞清楚 ClientId、Topic、QoS、Broker、UserName、Password 到底是什么。


一、MQTT 是什么?

MQTT,全称是 Message Queuing Telemetry Transport,中文通常叫 消息队列遥测传输协议

它是一种非常轻量级的消息通信协议,最早主要用于物联网、嵌入式设备、弱网络环境。现在除了物联网,在桌面客户端、移动端消息推送、客服系统、呼叫中心状态同步、在线聊天、设备状态上报等场景中也经常使用。

简单说,MQTT 解决的是这样一个问题:

客户端 A 想把消息发给客户端 B,但 A 不直接找 B,而是把消息发给中间的 MQTT 服务器,再由服务器转发给订阅了对应主题的客户端。

这个中间的服务器,叫 Broker


二、MQTT 的核心概念

1. Broker:消息代理服务器

Broker 可以理解成 MQTT 的“中转站”。

所有客户端都连接到 Broker,客户端之间并不直接通信。客户端只需要把消息发布到某个 Topic,Broker 会负责把这条消息转发给订阅了该 Topic 的其他客户端。

在本文中,阿里云云消息队列 MQTT 版就扮演 Broker 的角色。

2. Client:客户端

Client 就是连接 MQTT Broker 的程序。

比如:

  • C# WinForms 客户端;

  • Android App;

  • iOS App;

  • 嵌入式设备;

  • 后台服务;

  • WebSocket 网关服务。

只要它能使用 MQTT 协议连接 Broker,它就是 MQTT Client。

3. Topic:主题

Topic 可以理解为消息的“路由地址”。

例如:

chat/user/10001
device/temperature
callcenter/agent/status
test/topic

发布者把消息发到某个 Topic,订阅者订阅这个 Topic,就能收到消息。

比如:

客户端 A 发布:

Topic: chat/user/10001
Message: 你好,我是 A

客户端 B 如果订阅了:

chat/user/10001

那么 B 就可以收到这条消息。

4. Publish:发布消息

Publish 就是往某个 Topic 发送消息。

比如:

往 topic/chat 发送一条消息:hello mqtt

5. Subscribe:订阅主题

Subscribe 就是告诉 Broker:

这个 Topic 有消息时,请转发给我。

比如客户端订阅:

topic/chat

以后只要有人往 topic/chat 发消息,Broker 就会把消息推送给它。

6. QoS:消息服务质量

QoS 是 Quality of Service 的缩写,表示 MQTT 消息投递质量。

常见有三个级别:

QoS 含义 说明
0 最多一次 发出去就不管了,可能丢消息,但性能最好
1 至少一次 保证消息至少到达一次,但可能重复
2 只有一次 保证消息只到达一次,可靠性最高,但性能开销最大

在业务系统里,如果只是普通状态通知,QoS 0 或 QoS 1 就够了。如果是比较重要的业务事件,可以考虑 QoS 1。QoS 2 虽然可靠,但协议交互更多,性能成本也更高,不建议一上来就滥用。


三、MQTT 在系统中的位置

如果你做过传统 HTTP 接口,可能是这样的:

客户端  --->  HTTP API  --->  服务端

HTTP 最大的特点是请求响应模型:客户端请求一次,服务端返回一次。服务端不能天然主动推送消息给客户端。

而 MQTT 更像这样:

客户端 A  ─┐
客户端 B  ─┼──>  MQTT Broker  ───>  订阅者客户端
客户端 C  ─┘

客户端与 Broker 保持长连接。只要 Topic 有消息,Broker 就可以主动推送给订阅者。


四、M2Mqtt 是什么?

M2Mqtt 是一个开源的 .NET MQTT 客户端库,属于 Eclipse Paho 相关生态。它可以让 C# 程序非常方便地连接 MQTT Broker,完成:

  • 连接 MQTT 服务端;

  • 订阅 Topic;

  • 发布消息;

  • 接收消息;

  • 处理连接状态。

M2Mqtt 的优势是:

  • 对 .NET Framework 老项目友好;

  • API 简单;

  • WinForms、控制台、Windows 服务都能用;

  • 适合快速接入 MQTT。

不过也要客观说一句:M2Mqtt 比较老,最后活跃时间并不算新。如果是全新的 .NET 6 / .NET 8 项目,可以优先考虑 MQTTnet。但如果你的项目是 .NET Framework 4.5,尤其是 VS2017 老项目,M2Mqtt 仍然是一个比较顺手的选择。


五、阿里云 MQTT 连接需要哪些参数?

连接阿里云云消息队列 MQTT 版,一般需要准备这些信息:

参数 说明
Broker 地址 阿里云 MQTT 实例的接入点地址
Port MQTT 端口,常见为 1883 或 SSL 端口
InstanceId MQTT 实例 ID,例如 mqtt-xxxxxx
ClientId 客户端 ID,通常格式为 GroupID@@@DeviceID
GroupID 客户端分组 ID
DeviceAccessKeyId 一机一密模式下的设备访问凭证 ID
DeviceAccessKeySecret 一机一密模式下的设备访问凭证密钥
UserName 按阿里云规则拼接
Password 按阿里云规则签名计算
Topic 要发布或订阅的主题

这里面最容易搞错的是 ClientIdUserNamePassword

以一机一密模式为例:

ClientId = GID_Test@@@0001
UserName = DeviceCredential|DeviceAccessKeyId|InstanceId
Password = Base64(HMACSHA1(ClientId, DeviceAccessKeySecret))

注意:这里的 Password 不是你自己随便写的密码,而是签名计算出来的字符串。


六、VS2017 创建 .NET Framework 4.5 项目

下面开始写代码。

1. 创建项目

打开 Visual Studio 2017:

文件 -> 新建 -> 项目 -> Visual C# -> Windows 桌面 -> 控制台应用程序

目标框架选择:

.NET Framework 4.5

如果你是 WinForms 项目也可以,本文先用控制台项目演示,方便看日志和验证连接。

2. 安装 M2Mqtt

方式一:NuGet 包管理器安装

工具 -> NuGet 包管理器 -> 管理解决方案的 NuGet 程序包

搜索:

M2Mqtt

然后安装。

方式二:程序包管理器控制台安装:

Install-Package M2Mqtt

如果你的 NuGet 源访问比较慢,可以先检查 VS2017 的 NuGet 包源是否正常。


七、C# 计算阿里云 MQTT Password

阿里云一机一密模式下,Password 需要使用 DeviceAccessKeySecretClientId 做 HMAC-SHA1 签名,然后进行 Base64 编码。

新建一个帮助类:

using System;
using System.Security.Cryptography;
using System.Text;

namespace AliyunMqttDemo
{
    public static class AliyunMqttSignHelper
    {
        /// <summary>
        /// 计算阿里云 MQTT 一机一密模式下的 Password
        /// Password = Base64(HMACSHA1(ClientId, DeviceAccessKeySecret))
        /// </summary>
        /// <param name="clientId">MQTT ClientId,例如:GID_Test@@@0001</param>
        /// <param name="deviceAccessKeySecret">设备访问凭证密钥</param>
        /// <returns>签名后的 Password</returns>
        public static string HmacSha1Base64(string clientId, string deviceAccessKeySecret)
        {
            if (string.IsNullOrWhiteSpace(clientId))
                throw new ArgumentException("clientId 不能为空");

            if (string.IsNullOrWhiteSpace(deviceAccessKeySecret))
                throw new ArgumentException("deviceAccessKeySecret 不能为空");

            byte[] keyBytes = Encoding.UTF8.GetBytes(deviceAccessKeySecret);
            byte[] dataBytes = Encoding.UTF8.GetBytes(clientId);

            using (var hmac = new HMACSHA1(keyBytes))
            {
                byte[] hashBytes = hmac.ComputeHash(dataBytes);
                return Convert.ToBase64String(hashBytes);
            }
        }
    }
}

这段代码的核心逻辑是:

new HMACSHA1(keyBytes)

这里的 keyBytes 就是 DeviceAccessKeySecret

然后:

hmac.ComputeHash(dataBytes)

这里的 dataBytes 就是 ClientId

最后:

Convert.ToBase64String(hashBytes)

把二进制签名结果转成 Base64 字符串,这个字符串就是连接 MQTT 时要传入的 Password。


八、使用 M2Mqtt 连接阿里云 MQTT

下面写完整连接代码。

using System;
using System.Text;
using System.Threading;
using uPLibrary.Networking.M2Mqtt;
using uPLibrary.Networking.M2Mqtt.Messages;

namespace AliyunMqttDemo
{
    class Program
    {
        static MqttClient _client;

        static void Main(string[] args)
        {
            // 1. 阿里云 MQTT 接入点地址
            // 示例:mqtt-xxxxxx.mqtt.aliyuncs.com
            string brokerHost = "你的阿里云MQTT接入点";

            // 2. 端口
            // 普通 TCP 通常是 1883,SSL/TLS 端口以控制台实际显示为准
            int brokerPort = 1883;

            // 3. 实例 ID
            string instanceId = "mqtt-xxxxxx";

            // 4. GroupID 和 DeviceID
            string groupId = "GID_Test";
            string deviceId = "0001";

            // 5. ClientId
            string clientId = groupId + "@@@" + deviceId;

            // 6. 一机一密凭证
            string deviceAccessKeyId = "你的DeviceAccessKeyId";
            string deviceAccessKeySecret = "你的DeviceAccessKeySecret";

            // 7. UserName
            string userName = "DeviceCredential|" + deviceAccessKeyId + "|" + instanceId;

            // 8. Password
            string password = AliyunMqttSignHelper.HmacSha1Base64(clientId, deviceAccessKeySecret);

            // 9. Topic
            string topic = "test/topic";

            try
            {
                Console.WriteLine("开始创建 MQTT 客户端...");

                _client = new MqttClient(brokerHost, brokerPort, false, null, null, MqttSslProtocols.None);

                // 注册接收消息事件
                _client.MqttMsgPublishReceived += Client_MqttMsgPublishReceived;

                // 注册发布完成事件
                _client.MqttMsgPublished += Client_MqttMsgPublished;

                Console.WriteLine("开始连接阿里云 MQTT...");

                byte connectResult = _client.Connect(
                    clientId,
                    userName,
                    password,
                    false,
                    60
                );

                Console.WriteLine("连接结果:" + connectResult);

                if (_client.IsConnected)
                {
                    Console.WriteLine("MQTT 连接成功!");

                    // 订阅主题
                    _client.Subscribe(
                        new string[] { topic },
                        new byte[] { MqttMsgBase.QOS_LEVEL_AT_LEAST_ONCE }
                    );

                    Console.WriteLine("已订阅 Topic:" + topic);

                    // 发布一条测试消息
                    string message = "Hello Aliyun MQTT, 当前时间:" + DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss");

                    ushort messageId = _client.Publish(
                        topic,
                        Encoding.UTF8.GetBytes(message),
                        MqttMsgBase.QOS_LEVEL_AT_LEAST_ONCE,
                        false
                    );

                    Console.WriteLine("已发布消息,MessageId:" + messageId);
                    Console.WriteLine("消息内容:" + message);
                }
                else
                {
                    Console.WriteLine("MQTT 连接失败!");
                }
            }
            catch (Exception ex)
            {
                Console.WriteLine("MQTT 异常:" + ex.Message);
                Console.WriteLine(ex.ToString());
            }

            Console.WriteLine("按任意键退出...");
            Console.ReadKey();

            if (_client != null && _client.IsConnected)
            {
                _client.Disconnect();
            }
        }

        private static void Client_MqttMsgPublishReceived(object sender, MqttMsgPublishEventArgs e)
        {
            string topic = e.Topic;
            string message = Encoding.UTF8.GetString(e.Message);

            Console.WriteLine("收到消息:");
            Console.WriteLine("Topic:" + topic);
            Console.WriteLine("Message:" + message);
        }

        private static void Client_MqttMsgPublished(object sender, MqttMsgPublishedEventArgs e)
        {
            Console.WriteLine("消息发布完成,MessageId:" + e.MessageId + ",IsPublished:" + e.IsPublished);
        }
    }
}

九、代码重点讲解

1. 创建 MqttClient

_client = new MqttClient(brokerHost, brokerPort, false, null, null, MqttSslProtocols.None);

这里的参数含义是:

参数 说明
brokerHost MQTT 服务器地址
brokerPort MQTT 端口
secure 是否启用 SSL/TLS
caCert CA 证书
clientCert 客户端证书
sslProtocol SSL/TLS 协议版本

本文示例使用普通 TCP,所以 securefalse

如果你使用的是 SSL/TLS 接入点,就需要改成 true,并根据实际情况配置证书和协议。

2. 连接 MQTT

_client.Connect(clientId, userName, password, false, 60);

这里几个参数非常关键:

参数 说明
clientId MQTT 客户端唯一标识
userName 阿里云鉴权用户名
password 阿里云签名密码
cleanSession 是否清理会话
keepAlivePeriod 心跳周期,单位秒

cleanSession 设置为 false 表示保留会话。是否保留会话要看业务需求,如果是普通客户端在线消息,可以先用 truefalse 测试;如果需要离线消息、持久订阅,要结合阿里云控制台配置和 Topic 权限一起考虑。

3. 订阅 Topic

_client.Subscribe(
    new string[] { topic },
    new byte[] { MqttMsgBase.QOS_LEVEL_AT_LEAST_ONCE }
);

这表示订阅一个 Topic,并使用 QoS 1。

M2Mqtt 中常见 QoS 常量如下:

MqttMsgBase.QOS_LEVEL_AT_MOST_ONCE      // QoS 0
MqttMsgBase.QOS_LEVEL_AT_LEAST_ONCE     // QoS 1
MqttMsgBase.QOS_LEVEL_EXACTLY_ONCE      // QoS 2

4. 发布消息

_client.Publish(
    topic,
    Encoding.UTF8.GetBytes(message),
    MqttMsgBase.QOS_LEVEL_AT_LEAST_ONCE,
    false
);

参数含义:

参数 说明
topic 目标主题
message 消息内容,byte 数组
qosLevel QoS 等级
retain 是否保留消息

retain 一般先设置为 false。保留消息是 MQTT 的高级特性,意思是 Broker 会保留某个 Topic 的最后一条消息,新的订阅者订阅后可以立刻收到。普通业务消息不建议随便开启。


十、WinForms 项目中怎么用?

如果你是 WinForms 项目,建议不要把 MQTT 连接代码直接堆在窗体事件里,而是封装成一个服务类。

比如:

using System;
using System.Text;
using uPLibrary.Networking.M2Mqtt;
using uPLibrary.Networking.M2Mqtt.Messages;

namespace AliyunMqttDemo
{
    public class AliyunMqttService
    {
        private MqttClient _client;

        public event Action<string, string> OnMessageReceived;

        public bool IsConnected
        {
            get { return _client != null && _client.IsConnected; }
        }

        public void Connect(
            string brokerHost,
            int brokerPort,
            string instanceId,
            string groupId,
            string deviceId,
            string deviceAccessKeyId,
            string deviceAccessKeySecret)
        {
            string clientId = groupId + "@@@" + deviceId;
            string userName = "DeviceCredential|" + deviceAccessKeyId + "|" + instanceId;
            string password = AliyunMqttSignHelper.HmacSha1Base64(clientId, deviceAccessKeySecret);

            _client = new MqttClient(brokerHost, brokerPort, false, null, null, MqttSslProtocols.None);
            _client.MqttMsgPublishReceived += Client_MqttMsgPublishReceived;

            _client.Connect(clientId, userName, password, false, 60);
        }

        public void Subscribe(string topic)
        {
            if (!IsConnected)
                throw new InvalidOperationException("MQTT 未连接");

            _client.Subscribe(
                new string[] { topic },
                new byte[] { MqttMsgBase.QOS_LEVEL_AT_LEAST_ONCE }
            );
        }

        public void Publish(string topic, string message)
        {
            if (!IsConnected)
                throw new InvalidOperationException("MQTT 未连接");

            _client.Publish(
                topic,
                Encoding.UTF8.GetBytes(message),
                MqttMsgBase.QOS_LEVEL_AT_LEAST_ONCE,
                false
            );
        }

        public void Disconnect()
        {
            if (_client != null && _client.IsConnected)
            {
                _client.Disconnect();
            }
        }

        private void Client_MqttMsgPublishReceived(object sender, MqttMsgPublishEventArgs e)
        {
            string topic = e.Topic;
            string message = Encoding.UTF8.GetString(e.Message);

            if (OnMessageReceived != null)
            {
                OnMessageReceived(topic, message);
            }
        }
    }
}

然后在 WinForms 窗体里调用:

private AliyunMqttService _mqttService = new AliyunMqttService();

private void btnConnect_Click(object sender, EventArgs e)
{
    _mqttService.OnMessageReceived += MqttService_OnMessageReceived;

    _mqttService.Connect(
        "你的阿里云MQTT接入点",
        1883,
        "mqtt-xxxxxx",
        "GID_Test",
        "0001",
        "你的DeviceAccessKeyId",
        "你的DeviceAccessKeySecret"
    );

    _mqttService.Subscribe("test/topic");

    MessageBox.Show("MQTT连接成功");
}

private void btnSend_Click(object sender, EventArgs e)
{
    _mqttService.Publish("test/topic", "这是一条来自 WinForms 的 MQTT 消息");
}

private void MqttService_OnMessageReceived(string topic, string message)
{
    if (this.InvokeRequired)
    {
        this.Invoke(new Action<string, string>(MqttService_OnMessageReceived), topic, message);
        return;
    }

    txtLog.AppendText("收到消息:" + topic + " -> " + message + Environment.NewLine);
}

这里要特别注意:

M2Mqtt 的消息接收事件不一定在 UI 线程触发,所以在 WinForms 里面更新控件时,一定要判断:

this.InvokeRequired

否则容易出现跨线程访问控件异常。


十一、常见错误排查

1. 连接不上,提示认证失败

优先检查:

  • ClientId 是否正确;

  • GroupID@@@DeviceID 格式是否正确;

  • InstanceId 是否填错;

  • DeviceAccessKeyId 是否正确;

  • DeviceAccessKeySecret 是否正确;

  • Password 是否按 HMAC-SHA1 + Base64 计算;

  • UserName 是否是 DeviceCredential|DeviceAccessKeyId|InstanceId

最常见的错误是把 DeviceAccessKeySecret 直接当 Password 传进去,这是不对的。

2. 能连接,但收不到消息

检查:

  • 订阅的 Topic 和发布的 Topic 是否完全一致;

  • Topic 权限是否配置正确;

  • QoS 是否匹配;

  • 是否订阅成功;

  • 是否有其他客户端使用相同 ClientId 抢占连接。

MQTT 中 ClientId 必须唯一。如果两个客户端使用同一个 ClientId 连接 Broker,通常会出现互相踢下线的问题。

3. 程序一会儿就断开

检查:

  • 网络是否稳定;

  • KeepAlive 是否太短;

  • 是否有异常没有捕获;

  • 是否多个地方重复创建连接;

  • 是否被相同 ClientId 的其他客户端挤掉;

  • 是否阿里云控制台有连接数限制。

4. WinForms 界面卡死

不要在 UI 线程里做死循环重连。

错误写法类似:

while (true)
{
    ConnectMqtt();
}

正确做法应该是:

  • 限制重试次数;

  • 每次失败后延迟;

  • 使用后台线程;

  • 做状态标记,避免重复重连;

  • 记录日志时不要疯狂写数据库。


十二、生产环境建议

如果只是 Demo,直接在客户端写 DeviceAccessKeySecret 可以跑通。

但是生产环境不建议这样做。

原因很简单:客户端程序可能被反编译,密钥写死在客户端里有泄露风险。

更合理的方案是:

客户端登录业务系统
        │
        ▼
业务服务端校验用户身份
        │
        ▼
服务端向阿里云申请或查询设备访问凭证
        │
        ▼
服务端把 MQTT 连接参数下发给客户端
        │
        ▼
客户端连接阿里云 MQTT

     

这样做的好处是:

  • AccessKey 不暴露在客户端;

  • 可以按用户、设备、租户做权限控制;

  • 可以禁用某个设备;

  • 可以更容易做审计和风控;

  • 客户端泄露后影响范围更小。


十三、完整项目结构建议

如果是正式项目,建议结构如下:

AliyunMqttDemo
│
├── Program.cs
│
├── Mqtt
│   ├── AliyunMqttService.cs
│   ├── AliyunMqttSignHelper.cs
│   └── MqttConfig.cs
│
└── App.config

MqttConfig.cs 可以这样写:

namespace AliyunMqttDemo
{
    public class MqttConfig
    {
        public string BrokerHost { get; set; }
        public int BrokerPort { get; set; }
        public string InstanceId { get; set; }
        public string GroupId { get; set; }
        public string DeviceId { get; set; }
        public string DeviceAccessKeyId { get; set; }
        public string DeviceAccessKeySecret { get; set; }
        public string Topic { get; set; }
    }
}

这样后期你要做配置文件、数据库配置、接口下发配置都会更方便。


十四、总结

MQTT 的核心思想是发布订阅模型。客户端不直接互相通信,而是通过 Broker 转发消息。

阿里云云消息队列 MQTT 版提供了云端 Broker 能力,适合需要稳定长连接、消息推送、设备接入、客户端实时通信的业务场景。

M2Mqtt 虽然是一个比较老的 .NET MQTT 客户端库,但对 .NET Framework 4.5 老项目非常友好。对于一些 WinForms、Windows 服务、传统桌面客户端项目来说,用它接入阿里云 MQTT 是完全可以实现的。

本文的关键点有三个:

第一,搞清楚 MQTT 的基本模型:

Publisher -> Broker -> Subscriber

第二,搞清楚阿里云 MQTT 的鉴权参数:

ClientId = GroupID@@@DeviceID
UserName = DeviceCredential|DeviceAccessKeyId|InstanceId
Password = Base64(HMACSHA1(ClientId, DeviceAccessKeySecret))

第三,WinForms 中接收 MQTT 消息后更新界面,一定要注意跨线程问题:

this.InvokeRequired

如果你的系统里有在线聊天、设备状态同步、客服坐席状态、呼叫中心事件推送、订单状态通知这类实时通信需求,MQTT 是一个很值得考虑的技术方案。

更多推荐