Code: https://gitee.com/qq28069933146_admin/csharp_networkprotocol_research
Demo: C# MQTT usage demo
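I. What is MQTT:
MQTT (Message Queuing Telemetry Transport) is a messaging protocol originally developed at IBM. It is an extremely lightweight publish/subscribe message transport built on a client-server model.
It runs on top of the TCP/IP protocol suite and was designed for constrained devices and for low-bandwidth, high-latency, or unreliable networks. It offers three delivery guarantees (quality of service levels): at most once, at least once, and exactly once. This makes it widely used in IoT, small devices, and mobile applications.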
1. MQTT packets:
For details, see:
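As one concrete piece of the packet format: every MQTT packet starts with a fixed header whose "Remaining Length" field is a variable-length integer (7 data bits per byte, with the top bit set when another byte follows, at most 4 bytes). The sketch below shows that encoding in isolation. `RemainingLength` is a hypothetical helper written for this article, not an MQTTnet type.

```csharp
using System;
using System.Collections.Generic;

/// Illustrative helper (not part of MQTTnet): encodes and decodes the
/// "Remaining Length" field of an MQTT fixed header.
public static class RemainingLength
{
    public const int MaxValue = 268_435_455; // largest value that fits in 4 bytes

    public static byte[] Encode(int value)
    {
        if (value < 0 || value > MaxValue)
            throw new ArgumentOutOfRangeException(nameof(value));

        var bytes = new List<byte>(4);
        do
        {
            byte digit = (byte)(value % 128); // low 7 bits
            value /= 128;
            if (value > 0) digit |= 0x80;     // continuation bit: more bytes follow
            bytes.Add(digit);
        } while (value > 0);
        return bytes.ToArray();
    }

    public static int Decode(ReadOnlySpan<byte> bytes)
    {
        int value = 0, multiplier = 1;
        for (int i = 0; i < bytes.Length && i < 4; i++)
        {
            value += (bytes[i] & 0x7F) * multiplier;
            if ((bytes[i] & 0x80) == 0) return value; // last byte reached
            multiplier *= 128;
        }
        throw new FormatException("Malformed remaining length");
    }
}
```

For example, `RemainingLength.Encode(321)` yields the two bytes `0xC1 0x02`, and `Decode` turns them back into 321.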
2. The three MQTT roles: publisher (client), broker (server), and subscriber (client).
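To make the three roles concrete, here is a minimal in-memory sketch. `TinyBroker` is a hypothetical stand-in written for illustration only: it matches topics by exact string and has no networking, sessions, or QoS, all of which a real broker such as MQTTnet's server provides.

```csharp
using System;
using System.Collections.Generic;

/// Hypothetical in-memory stand-in for a broker; real brokers do this over TCP.
public sealed class TinyBroker
{
    private readonly Dictionary<string, List<Action<string, string>>> _subscribers = new();

    // Subscriber role: register interest in a topic.
    public void Subscribe(string topic, Action<string, string> onMessage)
    {
        if (!_subscribers.TryGetValue(topic, out var handlers))
        {
            handlers = new List<Action<string, string>>();
            _subscribers[topic] = handlers;
        }
        handlers.Add(onMessage);
    }

    // Publisher role: hand a message to the broker, which fans it out.
    // Returns the number of subscribers that received it.
    public int Publish(string topic, string payload)
    {
        if (!_subscribers.TryGetValue(topic, out var handlers)) return 0;
        foreach (var handler in handlers) handler(topic, payload);
        return handlers.Count;
    }
}
```

The publisher and the subscriber never reference each other; both only talk to the broker, which is the property that lets MQTT clients come and go independently.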
II. Examples
You can learn with either the MQTTnet package or the DotNetty.Codecs.Mqtt package; this article uses MQTTnet.
VS2022
.NET 5
MQTTnet 4.1.4.563
1. Server:
① Create the server (new MqttFactory().CreateMqttServer(new MqttServerOptionsBuilder().Build()))
MqttServerOptionsBuilder mqttServerOptionsBuilder = new MqttServerOptionsBuilder(); // MQTT server configuration
mqttServerOptionsBuilder.WithDefaultEndpoint();
mqttServerOptionsBuilder.WithDefaultEndpointBoundIPAddress(IPAddress.Parse(ip)); // IP address the server binds to
mqttServerOptionsBuilder.WithDefaultEndpointPort(port); // server port
//mqttServerOptionsBuilder.WithEncryptedEndpointPort(port); // port of the encrypted endpoint
mqttServerOptionsBuilder.WithPersistentSessions(withPersistentSessions); // keep client sessions across reconnects
mqttServerOptionsBuilder.WithConnectionBacklog(2000); // connection backlog (queue of pending connections)
// Authentication: WithConnectionValidator no longer works in 4.1.4.563
//mqttServerOptionsBuilder.WithConnectionValidator(c =>
//{
//    if (c.Username != uName || c.Password != uPwd)
//    {
//        c.ReasonCode = MqttConnectReasonCode.BadUserNameOrPassword;
//    }
//})
MqttServerOptions mqttServerOptions = mqttServerOptionsBuilder.Build();
_MqttServer = new MqttFactory().CreateMqttServer(mqttServerOptions); // create the server from the options
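Supplement: MqttServerOptionsBuilder methods (note: some of them are no longer supported in 4.1.4.563)
Method | Description |
Build() | Builds the options object |
WithApplicationMessageInterceptor() | Intercepts every message published by clients |
WithClientId() | ClientId used when the server itself publishes messages |
WithConnectionBacklog() | Sets the connection backlog (number of pending connections to keep) |
WithConnectionValidator() | Validates incoming connections |
WithDefaultCommunicationTimeout() | Sets the default communication timeout |
WithDefaultEndpoint() | Enables the default endpoint |
WithDefaultEndpointBoundIPAddress() | Sets the IPv4 address of the default endpoint |
WithDefaultEndpointBoundIPV6Address() | Sets the IPv6 address of the default endpoint |
WithDefaultEndpointPort() | Sets the port of the default endpoint |
WithEncryptedEndpoint() | Enables the encrypted endpoint |
WithEncryptedEndpointBoundIPAddress() | Sets the IPv4 address of the encrypted endpoint |
WithEncryptedEndpointBoundIPV6Address() | Sets the IPv6 address of the encrypted endpoint |
WithEncryptedEndpointPort() | Sets the port of the encrypted endpoint |
WithEncryptionCertificate() | Sets the certificate used for SSL connections |
WithEncryptionSslProtocol() | Sets the SSL protocol version |
WithMaxPendingMessagesPerClient() | Sets the maximum number of pending messages per client |
WithPersistentSessions() | Keeps client sessions |
WithStorage() | Sets the storage used by the server |
WithSubscriptionInterceptor() | Intercepts every subscription made by clients |
WithoutDefaultEndpoint() | Disables the default endpoint |
WithoutEncryptedEndpoint() | Disables the encrypted (SSL) endpoint |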
② Start the server (await _MqttServer.StartAsync())
③ Stop the server (await _MqttServer.StopAsync())
④ Client authentication (username and password; the old method no longer works)
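In MQTTnet 4.x the server raises the ValidatingConnectionAsync event for each incoming connection, and setting a non-success reason code on the event arguments rejects the client. The following is a minimal sketch of that approach, not a hardened implementation. The class name `ConnectionAuth` and the method `Check` are hypothetical, and the hard-coded credentials are for illustration only.

```csharp
using System;
using System.Threading.Tasks;
using MQTTnet.Protocol;
using MQTTnet.Server;

public static class ConnectionAuth
{
    // Hypothetical credentials for illustration; load real ones from configuration.
    private const string ExpectedUser = "Admin";
    private const string ExpectedPassword = "Admin123";

    // Pure check, kept separate from the event handler so it is easy to test.
    public static MqttConnectReasonCode Check(string userName, string password)
    {
        bool ok = string.Equals(userName, ExpectedUser, StringComparison.Ordinal)
               && string.Equals(password, ExpectedPassword, StringComparison.Ordinal);
        return ok ? MqttConnectReasonCode.Success
                  : MqttConnectReasonCode.BadUserNameOrPassword;
    }

    // Event handler: a non-success reason code makes the server reject the connection.
    public static Task ValidatingConnectionHandle(ValidatingConnectionEventArgs arg)
    {
        arg.ReasonCode = Check(arg.UserName, arg.Password);
        return Task.CompletedTask;
    }
}
```

It would be wired up before the server starts, for example with `_MqttServer.ValidatingConnectionAsync += ConnectionAuth.ValidatingConnectionHandle;`.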
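⑤ Send data to clients (the old method no longer works)
/// <summary>
/// Publishes a message from the server to the subscribers of a topic.
/// Sketch: relies on InjectApplicationMessage from MQTTnet 4.x; the sender id is an arbitrary label.
/// </summary>
/// <param name="Topic">Topic</param>
/// <param name="msg">Message</param>
/// <returns></returns>
public async Task SedMessage(string Topic, string msg)
{
    try
    {
        MqttApplicationMessage message = new MqttApplicationMessageBuilder()
            .WithTopic(Topic)
            .WithPayload(msg)
            .Build();
        await _MqttServer.InjectApplicationMessage(new InjectedApplicationMessage(message)
        {
            SenderClientId = "server" // arbitrary label identifying the broker as sender
        });
    }
    catch
    {
        // Errors are swallowed here, as in the original stub.
    }
}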
⑥ Get all clients ((await _MqttServer.GetClientsAsync()).ToList())
⑦ Server started/stopped events (StartedAsync and StoppedAsync)
_MqttServer.StartedAsync += StartedHandle; // server started event
/// <summary>
/// Handler invoked when the server has started
/// </summary>
private Task StartedHandle(EventArgs arg)
{
    return Task.CompletedTask;
}
_MqttServer.StoppedAsync += StoppedHandle; // server stopped event
/// <summary>
/// Handler invoked when the server has stopped
/// </summary>
private Task StoppedHandle(EventArgs arg)
{
    return Task.CompletedTask;
}
⑧ Client connected/disconnected events
_MqttServer.ClientConnectedAsync += ClientConnectedHandle; // handler for a client that has connected
/// <summary>
/// Handler invoked after a client has connected
/// </summary>
private Task ClientConnectedHandle(ClientConnectedEventArgs arg)
{
    return Task.CompletedTask;
}
_MqttServer.ClientDisconnectedAsync += ClientDisconnectedHandle; // handler for a client that has disconnected
/// <summary>
/// Handler invoked after a client has disconnected
/// </summary>
private Task ClientDisconnectedHandle(ClientDisconnectedEventArgs arg)
{
    return Task.CompletedTask;
}
⑨ Topic subscribed/unsubscribed events
_MqttServer.ClientSubscribedTopicAsync += ClientSubscribedTopicHandle; // notification when a client subscribes
_MqttServer.ClientUnsubscribedTopicAsync += ClientUnsubscribedTopicHandle; // notification when a client unsubscribes
/// <summary>
/// Notification that a client subscribed to a topic
/// </summary>
private Task ClientSubscribedTopicHandle(ClientSubscribedTopicEventArgs arg)
{
    //if (!arg.Equals("admin"))
    //{
    //    var client = clients.Where(a => a.Id == arg.ClientId).FirstOrDefault();
    //    client?.DisconnectAsync();
    //    return Task.CompletedTask;
    //}
    return Task.CompletedTask;
}
/// <summary>
/// Notification that a client unsubscribed from a topic
/// </summary>
private Task ClientUnsubscribedTopicHandle(ClientUnsubscribedTopicEventArgs arg)
{
    return Task.CompletedTask;
}
⑩ Handler for received messages (ApplicationMessageNotConsumedAsync fires only when no subscriber consumed the published message)
_MqttServer.ApplicationMessageNotConsumedAsync += ApplicationMessageNotConsumedHandle; // message handler
/// <summary>
/// Message handler
/// </summary>
private Task ApplicationMessageNotConsumedHandle(ApplicationMessageNotConsumedEventArgs arg)
{
    // Payload can be null for an empty message, so fall back to an empty array.
    string payload = Encoding.UTF8.GetString(arg.ApplicationMessage.Payload ?? Array.Empty<byte>());
    _Callback?.Invoke(new ResultData_MQTT()
    {
        ResultCode = -1,
        ResultMsg = DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss.fff") + $">>> Client '{arg.SenderId}' published a message. Topic: '{arg.ApplicationMessage.Topic}'; content: '{payload}'; QoS: {arg.ApplicationMessage.QualityOfServiceLevel}; retained: {arg.ApplicationMessage.Retain}"
    });
    return Task.CompletedTask;
}
2. Client:
① Connect to the server (await _MqttClient.ConnectAsync(new MqttClientOptionsBuilder().Build()))
MqttClientOptionsBuilder mqttClientOptionsBuilder = new MqttClientOptionsBuilder();
mqttClientOptionsBuilder.WithTcpServer(mqttServerUrl, port); // MQTT server address and port
if (!string.IsNullOrEmpty(userName))
{
    mqttClientOptionsBuilder.WithCredentials(userName, userPassword); // credentials for authentication
}
mqttClientOptionsBuilder.WithClientId(Guid.NewGuid().ToString("N")); // unique client id
MqttClientOptions options = mqttClientOptionsBuilder.Build();
_MqttClient = new MqttFactory().CreateMqttClient();
_MqttClient.ConnectedAsync += ConnectedHandle; // connected to the server
_MqttClient.DisconnectedAsync += DisconnectedHandle; // disconnected from the server (reconnect logic can go here)
_MqttClient.ApplicationMessageReceivedAsync += ApplicationMessageReceivedHandle; // message received
await _MqttClient.ConnectAsync(options); // connect
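② Disconnect from the server (await _MqttClient.DisconnectAsync())
③ Reconnect to the server (await _MqttClient.ReconnectAsync())
④ Subscribe to and unsubscribe from topics (SubscribeAsync and UnsubscribeAsync)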
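A minimal sketch of subscribing and unsubscribing, using the same MQTTnet calls as the helper class in section III. The wrapper class `TopicSubscriptions` and its method names are hypothetical, and `client` is assumed to be connected already.

```csharp
using System.Threading;
using System.Threading.Tasks;
using MQTTnet;
using MQTTnet.Client;
using MQTTnet.Packets;

public static class TopicSubscriptions
{
    // Builds the filter object that SubscribeAsync expects.
    public static MqttTopicFilter BuildFilter(string topic) =>
        new MqttTopicFilterBuilder().WithTopic(topic).Build();

    // Assumes `client` is already connected to a broker.
    public static async Task SubscribeToAsync(IMqttClient client, string topic)
    {
        await client.SubscribeAsync(BuildFilter(topic), CancellationToken.None);
    }

    // Unsubscribing only needs the topic string.
    public static async Task UnsubscribeFromAsync(IMqttClient client, string topic)
    {
        await client.UnsubscribeAsync(topic, CancellationToken.None);
    }
}
```

After a successful subscribe, messages on that topic arrive through the client's ApplicationMessageReceivedAsync event.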
⑤ Publish a message
MqttApplicationMessageBuilder mqttApplicationMessageBuilder = new MqttApplicationMessageBuilder(); // message builder
mqttApplicationMessageBuilder.WithTopic(topic); // topic
mqttApplicationMessageBuilder.WithPayload(msg); // payload
mqttApplicationMessageBuilder.WithRetainFlag(retained); // retain flag
MqttApplicationMessage messageObj = mqttApplicationMessageBuilder.Build();
await _MqttClient.PublishAsync(messageObj, CancellationToken.None); // send
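The publish code above does not set a quality of service level, so the message is sent with the lowest one (at most once). As a hedged sketch, the three delivery guarantees from section I can be selected per message through the builder. `QosMessages` is a hypothetical wrapper name.

```csharp
using MQTTnet;
using MQTTnet.Protocol;

public static class QosMessages
{
    // Builds a message with an explicit QoS level:
    // AtMostOnce (0), AtLeastOnce (1), or ExactlyOnce (2).
    public static MqttApplicationMessage Build(string topic, string payload,
        MqttQualityOfServiceLevel qos, bool retain = false)
    {
        return new MqttApplicationMessageBuilder()
            .WithTopic(topic)
            .WithPayload(payload)
            .WithQualityOfServiceLevel(qos)
            .WithRetainFlag(retain)
            .Build();
    }
}
```

The result can be passed to `_MqttClient.PublishAsync` exactly like `messageObj` above. Higher levels cost extra round trips, so pick the lowest level the data can tolerate.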
⑥ Connected/disconnected events
_MqttClient.ConnectedAsync += ConnectedHandle; // connected to the server
_MqttClient.DisconnectedAsync += DisconnectedHandle; // disconnected from the server (reconnect logic can go here)
/// <summary>
/// Raised when the client has connected to the server
/// </summary>
private Task ConnectedHandle(MqttClientConnectedEventArgs arg)
{
    return Task.CompletedTask;
}
/// <summary>
/// Raised when the client has disconnected from the server (reconnect logic can go here)
/// </summary>
private Task DisconnectedHandle(MqttClientDisconnectedEventArgs arg)
{
    return Task.CompletedTask;
}
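If reconnect logic is placed in the disconnect handler, retrying immediately in a tight loop can hammer a broker that is down. One common option is an exponential backoff between attempts. The sketch below only computes the delay; `ReconnectBackoff` and its default values are assumptions for illustration, not part of MQTTnet.

```csharp
using System;

/// Hypothetical helper: computes how long to wait before reconnect attempt N.
public static class ReconnectBackoff
{
    public static TimeSpan DelayFor(int attempt, double baseSeconds = 1, double maxSeconds = 60)
    {
        if (attempt < 1) throw new ArgumentOutOfRangeException(nameof(attempt));
        // 1s, 2s, 4s, ... capped at maxSeconds; the exponent is capped to avoid overflow.
        double seconds = baseSeconds * Math.Pow(2, Math.Min(attempt - 1, 30));
        return TimeSpan.FromSeconds(Math.Min(seconds, maxSeconds));
    }
}
```

Inside the disconnect handler this could be used as `await Task.Delay(ReconnectBackoff.DelayFor(attempt));` followed by `await _MqttClient.ReconnectAsync();`, with `attempt` reset to 1 once the connected event fires.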
⑦ Subscribe/unsubscribe events
⑧ Message received event
_MqttClient.ApplicationMessageReceivedAsync += ApplicationMessageReceivedHandle; // message received event
/// <summary>
/// Raised when the client receives a message
/// </summary>
private Task ApplicationMessageReceivedHandle(MqttApplicationMessageReceivedEventArgs arg)
{
    // Payload can be null for an empty message, so fall back to an empty array.
    string payload = Encoding.UTF8.GetString(arg.ApplicationMessage.Payload ?? Array.Empty<byte>());
    string resultMsg = DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss.fff") + $">>> MQTTClient '{arg.ClientId}' content: '{payload}'; topic: '{arg.ApplicationMessage.Topic}', QoS: [{arg.ApplicationMessage.QualityOfServiceLevel}], retained: [{arg.ApplicationMessage.Retain}]";
    return Task.CompletedTask;
}
III. MQTTHelper
/**
 *┌──────────────────────────────────────────────────────────────┐
 *│ Description: utility class for MQTT communication (MQTTnet 4.1.4.563)
 *│ Author: 执笔小白
 *│ Version: 1.0
 *│ Created: 2023-3-18 10:40:56
 *└──────────────────────────────────────────────────────────────┘
 *┌──────────────────────────────────────────────────────────────┐
 *│ Namespace: MqttnetServerWin
 *│ Class: MQTTHelper
 *└──────────────────────────────────────────────────────────────┘
 */
using MQTTnet;
using MQTTnet.Client;
using MQTTnet.Packets;
using MQTTnet.Server;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Net;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
namespace MqttnetServerWin
{
    /// <summary>
    /// Utility class for MQTT communication
    /// </summary>
    public class MQTTHelper
    {
        #region Fields
        /// <summary>
        /// Callback used for logging, output, persistence, and similar operations
        /// </summary>
        private Action<ResultData_MQTT>? _Callback = null;
        #endregion Fields
        #region Server
        /// <summary>
        /// The MQTT server
        /// </summary>
        MqttServer _MqttServer = null;
        /// <summary>
        /// Creates an MQTT server from the given options builder and starts it
        /// </summary>
        public async Task<ResultData_MQTT> CreateMQTTServerAndStart(MqttServerOptionsBuilder mqttServerOptionsBuilder, Action<ResultData_MQTT>? callback)
        {
            ResultData_MQTT resultData_MQTT = new ResultData_MQTT();
            _Callback = callback;
            try
            {
                MqttServerOptions mqttServerOptions = mqttServerOptionsBuilder.Build();
                _MqttServer = new MqttFactory().CreateMqttServer(mqttServerOptions); // create the server from the options
                _MqttServer.StartedAsync += StartedHandle; // server started
                _MqttServer.StoppedAsync += StoppedHandle; // server stopped
                _MqttServer.ClientConnectedAsync += ClientConnectedHandle; // client connected
                _MqttServer.ClientDisconnectedAsync += ClientDisconnectedHandle; // client disconnected
                _MqttServer.ClientSubscribedTopicAsync += ClientSubscribedTopicHandle; // client subscribed to a topic
                _MqttServer.ClientUnsubscribedTopicAsync += ClientUnsubscribedTopicHandle; // client unsubscribed from a topic
                _MqttServer.ApplicationMessageNotConsumedAsync += ApplicationMessageNotConsumedHandle; // message handler
                await _MqttServer.StartAsync(); // start the server
                if (_MqttServer.IsStarted)
                {
                    resultData_MQTT = new ResultData_MQTT()
                    {
                        ResultCode = 1,
                        ResultMsg = DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss.fff") + ">>> MQTTServer start succeeded!"
                    };
                }
                else
                {
                    resultData_MQTT = new ResultData_MQTT()
                    {
                        ResultCode = -1,
                        ResultMsg = DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss.fff") + ">>> MQTTServer start failed!"
                    };
                }
            }
            catch (Exception ex)
            {
                resultData_MQTT = new ResultData_MQTT()
                {
                    ResultCode = -1,
                    ResultMsg = DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss.fff") + ">>> MQTTServer start failed! Error: " + ex.Message
                };
            }
            _Callback?.Invoke(resultData_MQTT);
            return resultData_MQTT;
        }
        /// <summary>
        /// Convenience overload: creates an unencrypted MQTT server and starts it
        /// </summary>
        /// <param name="ip">IP address to bind to</param>
        /// <param name="port">Port</param>
        /// <param name="withPersistentSessions">Whether to keep client sessions</param>
        /// <param name="callback">Callback that processes results</param>
        /// <returns></returns>
        public async Task<ResultData_MQTT> CreateMQTTServerAndStart(string ip, int port, bool withPersistentSessions, Action<ResultData_MQTT>? callback)
        {
            ResultData_MQTT resultData_MQTT = new ResultData_MQTT();
            _Callback = callback;
            try
            {
                MqttServerOptionsBuilder mqttServerOptionsBuilder = new MqttServerOptionsBuilder(); // MQTT server configuration
                mqttServerOptionsBuilder.WithDefaultEndpoint();
                mqttServerOptionsBuilder.WithDefaultEndpointBoundIPAddress(IPAddress.Parse(ip)); // IP address the server binds to
                mqttServerOptionsBuilder.WithDefaultEndpointPort(port); // server port
                //mqttServerOptionsBuilder.WithEncryptedEndpointPort(port); // port of the encrypted endpoint
                mqttServerOptionsBuilder.WithPersistentSessions(withPersistentSessions); // keep client sessions across reconnects
                mqttServerOptionsBuilder.WithConnectionBacklog(2000); // connection backlog (queue of pending connections)
                // Authentication: WithConnectionValidator no longer works in 4.1.4.563
                //mqttServerOptionsBuilder.WithConnectionValidator(c =>
                //{
                //    if (c.Username != uName || c.Password != uPwd)
                //    {
                //        c.ReasonCode = MqttConnectReasonCode.BadUserNameOrPassword;
                //    }
                //})
                MqttServerOptions mqttServerOptions = mqttServerOptionsBuilder.Build();
                _MqttServer = new MqttFactory().CreateMqttServer(mqttServerOptions); // create the server from the options
                _MqttServer.StartedAsync += StartedHandle; // server started
                _MqttServer.StoppedAsync += StoppedHandle; // server stopped
                _MqttServer.ClientConnectedAsync += ClientConnectedHandle; // client connected
                _MqttServer.ClientDisconnectedAsync += ClientDisconnectedHandle; // client disconnected
                _MqttServer.ClientSubscribedTopicAsync += ClientSubscribedTopicHandle; // client subscribed to a topic
                _MqttServer.ClientUnsubscribedTopicAsync += ClientUnsubscribedTopicHandle; // client unsubscribed from a topic
                _MqttServer.ValidatingConnectionAsync += ValidatingConnectionHandle; // authentication (unfinished)
                _MqttServer.ApplicationMessageNotConsumedAsync += ApplicationMessageNotConsumedHandle; // message handler
                await _MqttServer.StartAsync(); // start the server
                if (_MqttServer.IsStarted)
                {
                    resultData_MQTT = new ResultData_MQTT()
                    {
                        ResultCode = 1,
                        ResultMsg = DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss.fff") + ">>> MQTTServer start succeeded!"
                    };
                }
                else
                {
                    resultData_MQTT = new ResultData_MQTT()
                    {
                        ResultCode = -1,
                        ResultMsg = DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss.fff") + ">>> MQTTServer start failed!"
                    };
                }
            }
            catch (Exception ex)
            {
                resultData_MQTT = new ResultData_MQTT()
                {
                    ResultCode = -1,
                    ResultMsg = DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss.fff") + ">>> MQTTServer start failed! Error: " + ex.Message
                };
            }
            _Callback?.Invoke(resultData_MQTT);
            return resultData_MQTT;
        }
        /// <summary>
        /// Stops the MQTT server
        /// </summary>
        public async Task<ResultData_MQTT> StopMQTTServer()
        {
            ResultData_MQTT resultData_MQTT = new ResultData_MQTT();
            try
            {
                if (_MqttServer == null)
                {
                    resultData_MQTT = new ResultData_MQTT()
                    {
                        ResultCode = -1,
                        ResultMsg = DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss.fff") + ">>> MQTTServer stop failed! MQTTServer is not running."
                    };
                }
                else
                {
                    // Await instead of blocking on .Result, which can deadlock on a UI thread.
                    foreach (var clientStatus in await _MqttServer.GetClientsAsync())
                    {
                        await clientStatus.DisconnectAsync();
                    }
                    await _MqttServer.StopAsync();
                    _MqttServer = null;
                    resultData_MQTT = new ResultData_MQTT()
                    {
                        ResultCode = 1,
                        ResultMsg = DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss.fff") + ">>> MQTTServer stop succeeded!"
                    };
                }
            }
            catch (Exception ex)
            {
                resultData_MQTT = new ResultData_MQTT()
                {
                    ResultCode = -1,
                    ResultMsg = DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss.fff") + ">>> MQTTServer stop failed! Error: " + ex.Message
                };
            }
            _Callback?.Invoke(resultData_MQTT);
            return resultData_MQTT;
        }
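        /// <summary>
        /// Gets all clients known to the server
        /// </summary>
        public async Task<List<MqttClientStatus>> GetClientsAsync()
        {
            // Await instead of blocking on .Result, which can deadlock on a UI thread.
            return (await _MqttServer.GetClientsAsync()).ToList();
        }
        /// <summary>
        /// Publishes a message from the server to the subscribers of a topic.
        /// Sketch: relies on InjectApplicationMessage from MQTTnet 4.x; the sender id is an arbitrary label.
        /// </summary>
        /// <param name="Topic">Topic</param>
        /// <param name="msg">Message</param>
        /// <returns></returns>
        public async Task SedMessage(string Topic, string msg)
        {
            try
            {
                MqttApplicationMessage message = new MqttApplicationMessageBuilder()
                    .WithTopic(Topic)
                    .WithPayload(msg)
                    .Build();
                await _MqttServer.InjectApplicationMessage(new InjectedApplicationMessage(message)
                {
                    SenderClientId = "server" // arbitrary label identifying the broker as sender
                });
            }
            catch
            {
                // Errors are swallowed here, as in the original stub.
            }
        }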
        #region Event handlers
        /// <summary>
        /// Handler invoked when the server has started
        /// </summary>
        private Task StartedHandle(EventArgs arg)
        {
            _Callback?.Invoke(new()
            {
                ResultCode = 1,
                ResultMsg = DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss.fff") + ">>> MQTTServer started!"
            });
            return Task.CompletedTask;
        }
        /// <summary>
        /// Handler invoked when the server has stopped
        /// </summary>
        private Task StoppedHandle(EventArgs arg)
        {
            _Callback?.Invoke(new()
            {
                ResultCode = 1,
                ResultMsg = DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss.fff") + ">>> MQTTServer stopped!"
            });
            return Task.CompletedTask;
        }
        /// <summary>
        /// Handler invoked after a client has connected
        /// </summary>
        private async Task ClientConnectedHandle(ClientConnectedEventArgs arg)
        {
            var clients = await _MqttServer.GetClientsAsync();
            _Callback?.Invoke(new()
            {
                ResultCode = 1,
                ResultMsg = DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss.fff") + $">>> Client '{arg.ClientId}' connected! Connected clients: {clients?.Count}."
            });
        }
        /// <summary>
        /// Handler invoked after a client has disconnected
        /// </summary>
        private async Task ClientDisconnectedHandle(ClientDisconnectedEventArgs arg)
        {
            var clients = await _MqttServer.GetClientsAsync();
            _Callback?.Invoke(new ResultData_MQTT()
            {
                ResultCode = 1,
                ResultMsg = DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss.fff") + $">>> Client '{arg.ClientId}' disconnected! Connected clients: {clients?.Count}."
            });
        }
        /// <summary>
        /// Notification that a client subscribed to a topic
        /// </summary>
        private Task ClientSubscribedTopicHandle(ClientSubscribedTopicEventArgs arg)
        {
            //if (!arg.Equals("admin"))
            //{
            //    var client = clients.Where(a => a.Id == arg.ClientId).FirstOrDefault();
            //    client?.DisconnectAsync();
            //    return Task.CompletedTask;
            //}
            _Callback?.Invoke(new ResultData_MQTT()
            {
                ResultCode = 1,
                ResultMsg = DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss.fff") + $">>> Client '{arg.ClientId}' subscribed to topic '{arg.TopicFilter.Topic}', QoS: '{arg.TopicFilter.QualityOfServiceLevel}'!"
            });
            return Task.CompletedTask;
        }
        /// <summary>
        /// Notification that a client unsubscribed from a topic
        /// </summary>
        private Task ClientUnsubscribedTopicHandle(ClientUnsubscribedTopicEventArgs arg)
        {
            _Callback?.Invoke(new ResultData_MQTT()
            {
                ResultCode = 1,
                ResultMsg = DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss.fff") + $">>> Client '{arg.ClientId}' unsubscribed from topic '{arg.TopicFilter}'!"
            });
            return Task.CompletedTask;
        }
        /// <summary>
        /// Authentication (unfinished: the credentials are hard-coded placeholders)
        /// </summary>
        /// <returns></returns>
        private Task ValidatingConnectionHandle(ValidatingConnectionEventArgs arg)
        {
            if (arg.UserName != "Admin" || arg.Password != "Admin123")
            {
                // A non-success reason code makes the server reject the connection.
                arg.ReasonCode = MQTTnet.Protocol.MqttConnectReasonCode.BadUserNameOrPassword;
            }
            return Task.CompletedTask;
        }
        /// <summary>
        /// Message handler (fires only when no subscriber consumed the published message)
        /// </summary>
        private Task ApplicationMessageNotConsumedHandle(ApplicationMessageNotConsumedEventArgs arg)
        {
            // Payload can be null for an empty message, so fall back to an empty array.
            string payload = Encoding.UTF8.GetString(arg.ApplicationMessage.Payload ?? Array.Empty<byte>());
            _Callback?.Invoke(new ResultData_MQTT()
            {
                ResultCode = -1,
                ResultMsg = DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss.fff") + $">>> Client '{arg.SenderId}' published a message. Topic: '{arg.ApplicationMessage.Topic}'; content: '{payload}'; QoS: {arg.ApplicationMessage.QualityOfServiceLevel}; retained: {arg.ApplicationMessage.Retain}"
            });
            return Task.CompletedTask;
        }
        #endregion Event handlers
        #endregion Server
        #region Client
        /// <summary>
        /// The MQTT client
        /// </summary>
        IMqttClient _MqttClient = null;
        /// <summary>
        /// Creates an MQTT client from the given options builder and connects it
        /// </summary>
        /// <param name="mqttClientOptionsBuilder">Client connection options</param>
        /// <param name="callback">Callback that processes results</param>
        /// <returns></returns>
        public async Task<ResultData_MQTT> CreateMQTTClientAndStart(MqttClientOptionsBuilder mqttClientOptionsBuilder, Action<ResultData_MQTT>? callback)
        {
            ResultData_MQTT resultData_MQTT = new ResultData_MQTT();
            _Callback = callback;
            try
            {
                MqttClientOptions options = mqttClientOptionsBuilder.Build();
                _MqttClient = new MqttFactory().CreateMqttClient();
                _MqttClient.ConnectedAsync += ConnectedHandle; // connected to the server
                _MqttClient.DisconnectedAsync += DisconnectedHandle; // disconnected from the server (reconnect logic can go here)
                _MqttClient.ApplicationMessageReceivedAsync += ApplicationMessageReceivedHandle; // message received
                await _MqttClient.ConnectAsync(options); // connect
                if (_MqttClient.IsConnected)
                {
                    resultData_MQTT = new ResultData_MQTT()
                    {
                        ResultCode = 1,
                        ResultMsg = DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss.fff") + ">>> MQTTClient start succeeded!"
                    };
                }
                else
                {
                    resultData_MQTT = new ResultData_MQTT()
                    {
                        ResultCode = -1,
                        ResultMsg = DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss.fff") + ">>> MQTTClient start failed!"
                    };
                }
            }
            catch (Exception ex)
            {
                resultData_MQTT = new ResultData_MQTT()
                {
                    ResultCode = -1,
                    ResultMsg = DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss.fff") + ">>> MQTTClient start failed! Error: " + ex.Message
                };
            }
            _Callback?.Invoke(resultData_MQTT);
            return resultData_MQTT;
        }
        /// <summary>
        /// Convenience overload: creates an MQTT client and connects it
        /// </summary>
        /// <param name="mqttServerUrl">Address of the MQTT server</param>
        /// <param name="port">Port of the MQTT server</param>
        /// <param name="userName">User name for authentication</param>
        /// <param name="userPassword">Password for authentication</param>
        /// <param name="callback">Callback that processes results</param>
        /// <returns></returns>
        public async Task<ResultData_MQTT> CreateMQTTClientAndStart(string mqttServerUrl, int port, string userName, string userPassword, Action<ResultData_MQTT>? callback)
        {
            ResultData_MQTT resultData_MQTT = new ResultData_MQTT();
            _Callback = callback;
            try
            {
                MqttClientOptionsBuilder mqttClientOptionsBuilder = new MqttClientOptionsBuilder();
                mqttClientOptionsBuilder.WithTcpServer(mqttServerUrl, port); // MQTT server address and port
                if (!string.IsNullOrEmpty(userName))
                {
                    mqttClientOptionsBuilder.WithCredentials(userName, userPassword); // credentials for authentication
                }
                mqttClientOptionsBuilder.WithClientId(Guid.NewGuid().ToString("N")); // unique client id
                MqttClientOptions options = mqttClientOptionsBuilder.Build();
                _MqttClient = new MqttFactory().CreateMqttClient();
                _MqttClient.ConnectedAsync += ConnectedHandle; // connected to the server
                _MqttClient.DisconnectedAsync += DisconnectedHandle; // disconnected from the server (reconnect logic can go here)
                _MqttClient.ApplicationMessageReceivedAsync += ApplicationMessageReceivedHandle; // message received
                await _MqttClient.ConnectAsync(options); // connect
                if (_MqttClient.IsConnected)
                {
                    resultData_MQTT = new ResultData_MQTT()
                    {
                        ResultCode = 1,
                        ResultMsg = DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss.fff") + ">>> MQTTClient start succeeded!"
                    };
                }
                else
                {
                    resultData_MQTT = new ResultData_MQTT()
                    {
                        ResultCode = -1,
                        ResultMsg = DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss.fff") + ">>> MQTTClient start failed!"
                    };
                }
            }
            catch (Exception ex)
            {
                resultData_MQTT = new ResultData_MQTT()
                {
                    ResultCode = -1,
                    ResultMsg = DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss.fff") + ">>> MQTTClient start failed! Error: " + ex.Message
                };
            }
            _Callback?.Invoke(resultData_MQTT);
            return resultData_MQTT;
        }
        /// <summary>
        /// Disconnects and disposes the MQTT client
        /// </summary>
        public async Task DisconnectAsync_Client()
        {
            ResultData_MQTT resultData_MQTT = new ResultData_MQTT();
            try
            {
                if (_MqttClient != null && _MqttClient.IsConnected)
                {
                    await _MqttClient.DisconnectAsync();
                    _MqttClient.Dispose();
                    _MqttClient = null;
                    resultData_MQTT = new ResultData_MQTT()
                    {
                        ResultCode = 1,
                        ResultMsg = DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss.fff") + ">>> MQTTClient stop succeeded!"
                    };
                }
                else
                {
                    resultData_MQTT = new ResultData_MQTT()
                    {
                        ResultCode = -1,
                        ResultMsg = DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss.fff") + ">>> MQTTClient stop failed! MQTTClient is not connected!"
                    };
                }
            }
            catch (Exception ex)
            {
                resultData_MQTT = new ResultData_MQTT()
                {
                    ResultCode = -1,
                    ResultMsg = DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss.fff") + ">>> MQTTClient stop failed! Error: " + ex.Message
                };
            }
            _Callback?.Invoke(resultData_MQTT);
        }
        /// <summary>
        /// Reconnects the client
        /// </summary>
        /// <returns></returns>
        public async Task ReconnectAsync_Client()
        {
            ResultData_MQTT resultData_MQTT = new ResultData_MQTT();
            try
            {
                if (_MqttClient != null)
                {
                    await _MqttClient.ReconnectAsync();
                    resultData_MQTT = new ResultData_MQTT()
                    {
                        ResultCode = 1,
                        ResultMsg = DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss.fff") + ">>> MQTTClient reconnect succeeded!"
                    };
                }
                else
                {
                    resultData_MQTT = new ResultData_MQTT()
                    {
                        ResultCode = -1,
                        ResultMsg = DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss.fff") + ">>> MQTTClient reconnect failed! No MQTTClient connection has been set up!"
                    };
                }
            }
            catch (Exception ex)
            {
                resultData_MQTT = new ResultData_MQTT()
                {
                    ResultCode = -1,
                    ResultMsg = DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss.fff") + ">>> MQTTClient reconnect failed! Error: " + ex.Message
                };
            }
            _Callback?.Invoke(resultData_MQTT);
        }
        /// <summary>
        /// Subscribes to a topic
        /// </summary>
        /// <param name="topic">Topic</param>
        public async Task SubscribeAsync_Client(string topic) // async Task instead of async void so callers can await it
        {
            ResultData_MQTT resultData_MQTT = new ResultData_MQTT();
            try
            {
                MqttTopicFilter topicFilter = new MqttTopicFilterBuilder().WithTopic(topic).Build();
                await _MqttClient.SubscribeAsync(topicFilter, CancellationToken.None);
                resultData_MQTT = new ResultData_MQTT()
                {
                    ResultCode = 1,
                    ResultMsg = DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss.fff") + $">>> MQTTClient subscribed to '{topic}' successfully!"
                };
            }
            catch (Exception ex)
            {
                resultData_MQTT = new ResultData_MQTT()
                {
                    ResultCode = -1,
                    ResultMsg = DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss.fff") + $">>> MQTTClient failed to subscribe to '{topic}'! Error: " + ex.Message
                };
            }
            _Callback?.Invoke(resultData_MQTT);
        }
        /// <summary>
        /// Unsubscribes from a topic
        /// </summary>
        /// <param name="topic">Topic</param>
        public async Task UnsubscribeAsync_Client(string topic) // async Task instead of async void so callers can await it
        {
            ResultData_MQTT resultData_MQTT = new ResultData_MQTT();
            try
            {
                await MqttClientExtensions.UnsubscribeAsync(_MqttClient, topic, CancellationToken.None);
                resultData_MQTT = new ResultData_MQTT()
                {
                    ResultCode = 1,
                    ResultMsg = DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss.fff") + $">>> MQTTClient unsubscribed from '{topic}' successfully!"
                };
            }
            catch (Exception ex)
            {
                resultData_MQTT = new ResultData_MQTT()
                {
                    ResultCode = -1,
                    ResultMsg = DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss.fff") + $">>> MQTTClient failed to unsubscribe from '{topic}'! Error: " + ex.Message
                };
            }
            _Callback?.Invoke(resultData_MQTT);
        }
        /// <summary>
        /// Publishes a message (only works after a successful connection)
        /// </summary>
        /// <param name="topic">Topic</param>
        /// <param name="msg">Message</param>
        /// <param name="retained">Whether the broker should retain the message</param>
        /// <returns></returns>
        public async Task PublishAsync_Client(string topic, string msg, bool retained)
        {
            ResultData_MQTT resultData_MQTT = new ResultData_MQTT();
            try
            {
                MqttApplicationMessageBuilder mqttApplicationMessageBuilder = new MqttApplicationMessageBuilder();
                mqttApplicationMessageBuilder.WithTopic(topic); // topic
                mqttApplicationMessageBuilder.WithPayload(msg); // payload
                mqttApplicationMessageBuilder.WithRetainFlag(retained); // retain flag
                MqttApplicationMessage messageObj = mqttApplicationMessageBuilder.Build();
                if (_MqttClient != null && _MqttClient.IsConnected)
                {
                    await _MqttClient.PublishAsync(messageObj, CancellationToken.None);
                    resultData_MQTT = new ResultData_MQTT()
                    {
                        ResultCode = 1,
                        ResultMsg = DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss.fff") + $">>> Publish succeeded! Topic: '{topic}', message: '{msg}', retained: '{retained}'"
                    };
                }
                else
                {
                    // Not connected
                    resultData_MQTT = new ResultData_MQTT()
                    {
                        ResultCode = -1,
                        ResultMsg = DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss.fff") + ">>> Publish failed! MQTTClient is not connected!"
                    };
                }
            }
            catch (Exception ex)
            {
                resultData_MQTT = new ResultData_MQTT()
                {
                    ResultCode = -1,
                    ResultMsg = DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss.fff") + ">>> Publish failed! Error: " + ex.Message
                };
            }
            _Callback?.Invoke(resultData_MQTT);
        }
        #region Events
        /// <summary>
        /// Raised when the client has connected to the server
        /// </summary>
        private Task ConnectedHandle(MqttClientConnectedEventArgs arg)
        {
            _Callback?.Invoke(new ResultData_MQTT()
            {
                ResultCode = 1,
                ResultMsg = DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss.fff") + ">>> Connected to the MQTT server!"
            });
            return Task.CompletedTask;
        }
        /// <summary>
        /// Raised when the client has disconnected from the server (reconnect logic can go here)
        /// </summary>
        private Task DisconnectedHandle(MqttClientDisconnectedEventArgs arg)
        {
            _Callback?.Invoke(new ResultData_MQTT()
            {
                ResultCode = 1,
                ResultMsg = DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss.fff") + ">>> Disconnected from the MQTT server!"
            });
            return Task.CompletedTask;
        }
        /// <summary>
        /// Raised when the client receives a message
        /// </summary>
        private Task ApplicationMessageReceivedHandle(MqttApplicationMessageReceivedEventArgs arg)
        {
            // Payload can be null for an empty message, so fall back to an empty array.
            string payload = Encoding.UTF8.GetString(arg.ApplicationMessage.Payload ?? Array.Empty<byte>());
            _Callback?.Invoke(new ResultData_MQTT()
            {
                ResultCode = 1,
                ResultMsg = DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss.fff") + $">>> MQTTClient '{arg.ClientId}' content: '{payload}'; topic: '{arg.ApplicationMessage.Topic}', QoS: [{arg.ApplicationMessage.QualityOfServiceLevel}], retained: [{arg.ApplicationMessage.Retain}]",
                ResultObject1 = arg.ApplicationMessage.Topic,
                ResultObject2 = payload
            });
            return Task.CompletedTask;
        }
        #endregion Events
        #endregion Client
    }
    /// <summary>
    /// Result carrier passed to callbacks
    /// </summary>
    public class ResultData_MQTT
    {
        /// <summary>
        /// Result code.
        /// 1 means success, any other value means failure; 0 is not used as a reply result
        /// </summary>
        public int ResultCode { get; set; } = 0;
        /// <summary>
        /// Result message
        /// </summary>
        public string ResultMsg { get; set; } = string.Empty;
        /// <summary>
        /// Extension field 1
        /// </summary>
        public object? ResultObject1 { get; set; } = string.Empty;
        /// <summary>
        /// Extension field 2
        /// </summary>
        public object? ResultObject2 { get; set; } = string.Empty;
    }
}
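IV. Recommended distributed MQTT message server
EMQ X (EMQ for short) is a fully open-source, highly scalable, highly available distributed MQTT message server. It also supports CoAP/LwM2M, offering one-stop IoT protocol access, and can handle tens of millions of concurrent clients.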
Author: ꧁执笔小白꧂