C# Mqtt物联网通讯,MqttNet使用
引入MqttNet包,在Nuget中搜索mqttnet。
·
引入MqttNet包,在Nuget中搜索mqttnet
服务端
创建mqttServer,服务端。
初始化Mqtt:
public virtual void InitMqttServer(string ip, int port)
{
var mqttServerOptions =
new MqttServerOptionsBuilder()
.WithDefaultEndpoint()
.WithDefaultEndpointBoundIPAddress(IPAddress.Parse(ip))//set the ip of the server
.WithDefaultEndpointPort(port)//set the port of the server
.Build();
mqttServer = new MqttFactory().CreateMqttServer(mqttServerOptions); // create MQTT service object
mqttServer.ValidatingConnectionAsync += MqttServer_ValidatingConnectionAsync;
mqttServer.ClientConnectedAsync += MqttServer_ClientConnectedAsync;
mqttServer.ClientDisconnectedAsync += MqttServer_ClientDisconnectedAsync;
mqttServer.ClientSubscribedTopicAsync += MqttServer_ClientSubscribedTopicAsync;
mqttServer.ClientUnsubscribedTopicAsync += MqttServer_ClientUnsubscribedTopicAsync;
mqttServer.InterceptingPublishAsync += MqttServer_InterceptingPublishAsync;
mqttServer.ClientAcknowledgedPublishPacketAsync += MqttServer_ClientAcknowledgedPublishPacketAsync;
mqttServer.InterceptingClientEnqueueAsync += MqttServer_InterceptingClientEnqueueAsync;
mqttServer.ApplicationMessageNotConsumedAsync += MqttServer_ApplicationMessageNotConsumedAsync;
}
名称 | 描述 |
---|---|
ValidatingConnectionAsync | 对客户端的连接进行验证 |
ClientConnectedAsync | 客户端连接成功 |
ClientDisconnectedAsync | 客户端断开连接 |
ClientSubscribedTopicAsync | 客户端发布订阅 |
ClientUnsubscribedTopicAsync | 客户端取消订阅 |
InterceptingPublishAsync | 拦截客户端的消息 |
ClientAcknowledgedPublishPacketAsync | 已确认发布数据包 |
InterceptingClientEnqueueAsync | 拦截客户端排队 |
ApplicationMessageNotConsumedAsync | 应用程序消息未使用 |
对客户端的连接进行验证
/// <summary>
/// connection verification
/// </summary>
/// <param name="arg"></param>
/// <returns></returns>
public virtual Task MqttServer_ValidatingConnectionAsync(ValidatingConnectionEventArgs arg)
{
try
{
//verify the validity of the client ID
if (string.IsNullOrWhiteSpace(arg.ClientId))
{
arg.ReasonCode = MqttConnectReasonCode.BadUserNameOrPassword;
return Task.CompletedTask;
}
//verify username and password
bool acceptflag = !(string.IsNullOrWhiteSpace(arg.UserName) || string.IsNullOrWhiteSpace(arg.Password));
//verify failed
if (!acceptflag)
{
arg.ReasonCode = MqttConnectReasonCode.BadUserNameOrPassword;
return Task.CompletedTask;
}
arg.ReasonCode = MqttConnectReasonCode.Success;
}
catch (Exception ex)
{
}
return Task.CompletedTask;
}
客户端连接成功
/// <summary>
/// connection success
/// </summary>
/// <param name="arg"></param>
/// <returns></returns>
public virtual Task MqttServer_ClientConnectedAsync(ClientConnectedEventArgs arg)
{
try
{
MqttUsers.Add(new MqttUser() { ClientId = arg.ClientId, UserName = arg.UserName });
}
catch (Exception)
{
}
return Task.CompletedTask;
}
客户端断开连接
/// <summary>
/// disconnect
/// </summary>
/// <param name="arg"></param>
/// <returns></returns>
/// <exception cref="NotImplementedException"></exception>
public virtual Task MqttServer_ClientDisconnectedAsync(ClientDisconnectedEventArgs arg)
{
try
{
MqttUser? mqttUser = MqttUsers.FirstOrDefault(t => t.ClientId == arg.ClientId);
if (mqttUser != null)
{
MqttUsers.Remove(mqttUser);
}
}
catch (Exception)
{
}
return Task.CompletedTask;
}
客户端发布订阅
/// <summary>
/// subscribe
/// </summary>
/// <param name="arg"></param>
/// <returns></returns>
public virtual Task MqttServer_ClientSubscribedTopicAsync(ClientSubscribedTopicEventArgs arg)
{
try
{
if (arg == null)
return Task.CompletedTask;
MqttUser? mqttUser = MqttUsers.FirstOrDefault(t => t.ClientId == arg.ClientId);
if (mqttUser != null)
{
mqttUser.MqttSubedTopics.Add(new MqttSubedTopic() { Parent = mqttUser, Topic = arg.TopicFilter.Topic });
}
}
catch (Exception)
{
}
return Task.CompletedTask;
}
客户端取消订阅
/// <summary>
/// unsubscribe
/// </summary>
/// <param name="arg"></param>
/// <returns></returns>
public virtual Task MqttServer_ClientUnsubscribedTopicAsync(ClientUnsubscribedTopicEventArgs arg)
{
try
{
if (arg == null)
return Task.CompletedTask;
MqttUser? mqttUser = MqttUsers.FirstOrDefault(t => t.ClientId == arg.ClientId);
if (mqttUser != null)
{
MqttSubedTopic? mqttSubedTopic = mqttUser.MqttSubedTopics.FirstOrDefault(t => t.Topic == arg.TopicFilter);
if (mqttSubedTopic != null)
mqttUser.MqttSubedTopics.Remove(mqttSubedTopic);
}
}
catch (Exception)
{
}
return Task.CompletedTask;
}
客户端
初始化:
public virtual void InitMqttClient(string serverIp, int serverPort, string userName = "", string password = "")
{
try
{
string ClientId = Guid.NewGuid().ToString();
var options = new MqttClientOptionsBuilder()
.WithTcpServer(serverIp, serverPort) //服务器IP和端口
.WithClientId(ClientId) //客户端ID
.WithCredentials(userName, password).Build(); //账号
Client = new MqttFactory().CreateMqttClient();
Client.ConnectingAsync += Client_ConnectingAsync;
Client.ConnectedAsync += Client_ConnectedAsync;
Client.DisconnectedAsync += Client_DisconnectedAsync;
Client.ApplicationMessageReceivedAsync += Client_ApplicationMessageReceivedAsync;
Client.InspectPacketAsync += Client_InspectPacketAsync;
}
catch (Exception)
{
}
}
启动客户端
public virtual bool StartMqttClient(MqttClientOptions options)
{
try
{
if (Client == null)
return false;
connectCts = new();
connectCt = connectCts.Token;
Client.ConnectAsync(options, connectCt);
return Client.IsConnected;
}
catch (Exception)
{
}
return false;
}
关闭客户端
/// <summary>
/// 关闭客户端
/// </summary>
public virtual void StopMqttClient()
{
try
{
connectCts.Cancel();//取消连接
reconnectCts.Cancel();//取消重连
}
catch (Exception)
{
}
}
剩余代码
/// <summary>
/// 订阅主题
/// </summary>
/// <param name="topic"></param>
/// <returns></returns>
public virtual async Task SubscribeTopic(string topic)
{
try
{
//若已订阅则返回
if (TopicList.Contains(topic))
{
return;
}
if (Client is null || Client.IsConnected == false)
{
//未连接服务器;
return;
}
var result = await Client.SubscribeAsync(new MqttTopicFilterBuilder().WithTopic(topic).Build()); //订阅服务端消息
TopicList.Add(topic);
}
catch (Exception ex)
{
}
return;
}
/// <summary>
/// 取消订阅订阅主题
/// </summary>
/// <returns></returns>
public virtual async void UnsubscribeTopic(string topic)
{
try
{
//取消订阅主题
var result = await Client.UnsubscribeAsync(topic);
TopicList.Remove(topic);
}
catch (Exception ex)
{
}
}
/// <summary>
/// 发布消息
/// </summary>
/// <param name="sendMessage"></param>
/// <param name="isRetain"></param>
/// <param name="publishTopic"></param>
/// <param name="QosLevel"></param>
/// <returns></returns>
public virtual async Task<bool> PublishMessage(string sendMessage, bool isRetain,string publishTopic,MQTTnet.Protocol.MqttQualityOfServiceLevel QosLevel)
{
try
{
if (Client==null)
{
return false;
}
//根据选择的消息质量进行设置
var mqttAMB = new MqttApplicationMessageBuilder();
//根据设置的消息质量发布消息
switch (QosLevel)
{
case MQTTnet.Protocol.MqttQualityOfServiceLevel.AtLeastOnce:
mqttAMB.WithQualityOfServiceLevel(MQTTnet.Protocol.MqttQualityOfServiceLevel.AtLeastOnce);
//mqttAMB.WithAtLeastOnceQoS();
break;
case MQTTnet.Protocol.MqttQualityOfServiceLevel.AtMostOnce:
mqttAMB.WithQualityOfServiceLevel(MQTTnet.Protocol.MqttQualityOfServiceLevel.AtMostOnce);
//mqttAMB.WithAtMostOnceQoS();
break;
case MQTTnet.Protocol.MqttQualityOfServiceLevel.ExactlyOnce:
mqttAMB.WithQualityOfServiceLevel(MQTTnet.Protocol.MqttQualityOfServiceLevel.ExactlyOnce);
//mqttAMB.WithExactlyOnceQoS();
break;
default:
break;
}
switch (mqttPayloadType)
{
case MqttPayloadType.Json:
case MqttPayloadType.Plaintext:
mqttAMB.WithPayload(sendMessage);
break;
case MqttPayloadType.Base64:
mqttAMB.WithPayload(Convert.ToBase64String(Encoding.Default.GetBytes(sendMessage)));
break;
//case MqttPayloadType.Json:
// mqttAMB.WithPayload(SendMessage.ToJsonString());
// break;
case MqttPayloadType.Hex:
mqttAMB.WithPayload(StringExtention.GetBytes(sendMessage.Replace(" ", string.Empty)));
break;
}
//保留消息
if (isRetain)
{
mqttAMB.WithRetainFlag();
}
else
{
mqttAMB.WithRetainFlag(false);
}
var mam = mqttAMB.WithTopic(publishTopic) //发布的主题
//.WithPayload(SendMessage)
//.WithExactlyOnceQoS()
.Build();
//发布
var result = await Client.PublishAsync(mam, CancellationToken.None);
return true;
}
catch (Exception ex)
{
}
return false;
}
/// <summary>
/// 数据包检查
/// </summary>
/// <param name="arg"></param>
/// <returns></returns>
public virtual Task Client_InspectPacketAsync(MQTTnet.Diagnostics.InspectMqttPacketEventArgs arg)
{
return Task.CompletedTask;
}
/// <summary>
/// 接收事件
/// </summary>
/// <param name="arg"></param>
/// <returns></returns>
/// <exception cref="NotImplementedException"></exception>
public virtual Task Client_ApplicationMessageReceivedAsync(MqttApplicationMessageReceivedEventArgs arg)
{
try
{
string str = string.Empty;
if (arg.ApplicationMessage.PayloadSegment.Array == null)
{
str = $"主题{arg.ApplicationMessage.Topic}接收的消息为空";
return Task.CompletedTask;
}
var payload = arg.ApplicationMessage.PayloadSegment.ToArray();
str = string.Empty;
switch (mqttPayloadType)
{
case MqttPayloadType.Json:
case MqttPayloadType.Plaintext:
str = $"{Encoding.UTF8.GetString(payload)}" + $"主题:{arg.ApplicationMessage.Topic}";
break;
//case MqttPayloadType.Json:
// ShowReceiveMessage($"{Encoding.UTF8.GetString(payload).ToJsonString()}", $"主题:{arg.ApplicationMessage.Topic}");
// break;
case MqttPayloadType.Hex:
str = $"{BitConverter.ToString(payload).Replace("-", "").InsertFormat(4, " ")}" + $"主题:{arg.ApplicationMessage.Topic}";
break;
case MqttPayloadType.Base64:
str = $"{Convert.ToBase64String(payload)}" + $"主题:{arg.ApplicationMessage.Topic}";
break;
}
}
catch (Exception)
{
}
return Task.CompletedTask;
}
/// <summary>
/// 重新连接
/// </summary>
public virtual bool ReConnect()
{
try
{
if (Client == null)
return false;
reconnectCts = new CancellationTokenSource();
reconnectCt = reconnectCts.Token;
Client.ReconnectAsync(reconnectCt);
return Client.IsConnected;
}
catch (Exception)
{
}
return false;
}
/// <summary>
/// 异常离线、主动断开
/// </summary>
/// <param name="arg"></param>
/// <returns></returns>
/// <exception cref="NotImplementedException"></exception>
public virtual Task Client_DisconnectedAsync(MqttClientDisconnectedEventArgs arg)
{
if (arg == null)
return Task.CompletedTask;
//异常导致的掉线
if (arg.Exception != null)
{
//被取消连接
if (arg.Exception is OperationCanceledException)
{
//已取消连接...
}
else if (arg.Exception is MqttCommunicationException)
{
string str = $"{arg.Exception.Message} {arg.Exception.InnerException?.Message}";
}
else
{
string str = $"{arg.Exception.Message} {arg.Exception.InnerException?.Message}";
}
}
//非异常导致离线
else
{
//已断开连接..
}
return Task.CompletedTask;
}
/// <summary>
/// 连接成功
/// </summary>
/// <param name="arg"></param>
/// <returns></returns>
public virtual Task Client_ConnectedAsync(MqttClientConnectedEventArgs arg)
{
return Task.CompletedTask;
}
/// <summary>
/// 连接中
/// </summary>
/// <param name="arg"></param>
/// <returns></returns>
public virtual Task Client_ConnectingAsync(MqttClientConnectingEventArgs arg)
{
return Task.CompletedTask;
}
这里代码不是很全,本文主要是告诉你怎么去使用Mqtt和MqttNet库,具体的方法里面需要根据自己的业务场景去实现。
源码下载CSDN链接:https://download.csdn.net/download/qq_42707143/88602648
C#技术交流QQ群:371769310
更多推荐
已为社区贡献1条内容
所有评论(0)