1、前言

        MQTTnet的C#版的开源MQTT通讯库,支持MQTT Server和Client,并提供各种类型的连接方法Demo。

       MQTTnet库3.1升级到4.0,并不完全兼容,在连接方式构建、事件订阅等方面需要修改。

2、MQTTnet 3.1 Client编程

using部分

using MQTTnet;
using MQTTnet.Client.Options;
using MQTTnet.Client;
using MQTTnet.Protocol;
using MQTTnet.Client.Connecting;
using MQTTnet.Client.Disconnecting;
using MQTTnet.Client.Receiving;
using MQTTnet.Client.Publishing;
using MQTTnet.Exceptions;

连接与事件订阅

MqttClient mqttClient;

    private async Task MqttClientStart()
    {
        try
        {
            var mqttFactory = new MqttFactory();

            var options = new MqttClientOptions
            {
                ClientId = "clientid_pascalming",
                ProtocolVersion = MQTTnet.Formatter.MqttProtocolVersion.V311,
                ChannelOptions = new MqttClientTcpOptions
                {
                    Server = "127.0.0.1",
                    Port = 1883
                },
            };
            if (options.ChannelOptions == null)
            {
                throw new InvalidOperationException();
            }

            options.Credentials = new MqttClientCredentials
            {
                Username = "mqttusername",
                Password = "mqttpassword"
            };
            options.CleanSession = false;
			//心跳检测时间,单位:秒
            options.KeepAlivePeriod = TimeSpan.FromSeconds(30);

            mqttClient = mqttFactory.CreateMqttClient() as MqttClient;
            mqttClient.ConnectedHandler = new MqttClientConnectedHandlerDelegate(OnMqttClientConnected);
            mqttClient.DisconnectedHandler = new MqttClientDisconnectedHandlerDelegate(OnMqttClientDisConnected);
            mqttClient.ApplicationMessageReceivedHandler = new MqttApplicationMessageReceivedHandlerDelegate(OnSubscriberMessageReceived);
            await mqttClient.ConnectAsync(options);
            Console.WriteLine($"客户端[{options.ClientId}]尝试连接...");
        }
        catch (Exception ex)
        {
            Console.WriteLine($"客户端尝试连接出错.>{ex.Message}");
        }
    }
	private async Task ClientStop()
    {
        try
        {
            if (mqttClient == null) return;
            await mqttClient.DisconnectAsync();
            mqttClient = null;
        }
        catch (Exception ex)
        {
            Console.WriteLine($"客户端尝试断开Server出错. {ex.Message}");
        }
    }
    private void OnSubscriberMessageReceived(MqttApplicationMessageReceivedEventArgs obj)
    {
        Console.WriteLine($"OnSubscriberMessageReceived..");
    }

    private void OnMqttClientDisConnected(MqttClientDisconnectedEventArgs obj)
    {
        Console.WriteLine($"Mqtt Client DisConnected.");
    }

    private void OnMqttClientConnected(MqttClientConnectedEventArgs obj)
    {
        Console.WriteLine($"Mqtt Client Connected.");
    }

发送数据

       //质量:AtMostOnce(0,最多一次)/AtLeastOnce(1,最少一次)/ExactlyOnce(2,只一次)
		string payload = "hello,from pascalming!";
		Task<MqttClientPublishResult> task = mqttClient.PublishAsync("/topic/pascalming/v1",payload,MqttQualityOfServiceLevel.AtLeastOnce,true);
        task.Wait();

3、MQTTnet 4.0 Client编程

using部分

using MQTTnet;
using MQTTnet.Client;
using MQTTnet.Protocol;

连接与事件订阅

private IMqttClient mqttClient;
private void MqttConnectAsync()
{
    try
    {
        mqttIsConnected = false;

        var mqttFactory = new MqttFactory();

        //使用Build构建
        var mqttClientOptions = new MqttClientOptionsBuilder()
            .WithTcpServer("127.0.0.1", 1883)
            .WithProtocolVersion(MQTTnet.Formatter.MqttProtocolVersion.V311)
            .WithClientId("clientid_pascalming")
            .WithCleanSession(false)
            .WithKeepAlivePeriod(TimeSpan.FromSeconds(30))
            .WithCredentials("user", "password")
            .Build();

        mqttClient = mqttFactory.CreateMqttClient();
        //与3.1对比,事件订阅名称和接口已经变化
        mqttClient.DisconnectedAsync += MqttClient_DisconnectedAsync;
        mqttClient.ConnectedAsync += MqttClient_ConnectedAsync;
        mqttClient.ApplicationMessageReceivedAsync += MqttClient_ApplicationMessageReceivedAsync;
        Task task = mqttClient.ConnectAsync(mqttClientOptions, CancellationToken.None);
        task.Wait();
    }
    catch (Exception ex)
    {
        Console.WriteLine($"Mqtt客户端尝试连接出错:{ex.Message}");
    }
}
private Task MqttClient_ApplicationMessageReceivedAsync(MqttApplicationMessageReceivedEventArgs arg)
{
    return Task.CompletedTask;
}

private Task MqttClient_ConnectedAsync(MqttClientConnectedEventArgs arg)
{
    Console.WriteLine($"Mqtt客户端连接成功.");
    return Task.CompletedTask;
}

private Task MqttClient_DisconnectedAsync(MqttClientDisconnectedEventArgs arg)
{
    Console.WriteLine($"Mqtt客户端连接断开");
    return Task.CompletedTask;
}

发送数据

string payload = "hello,from pascalming!";
string mqttTopic="/topic/pascalming/v1";
var applicationMessage = new MqttApplicationMessageBuilder()
    .WithTopic(mqttTopic)
    .WithPayload(payload)
    .Build();
Task<MqttClientPublishResult> task = mqttClient.PublishAsync(applicationMessage);
task.Wait();

4、并发执行与短暂断开数据不丢失处理

       客户端上报数据,上面的示例使用的是等待执行模式,也可以改为异步执行提高并发度。数据的计数需要使用安全的Interlocked.Increment和Interlocked.Increment执行。

      数据上报后只有两种情况:执行成功或出错,这个需要等待异步执行结果的反馈。这是可以用计数器控制发送数据、执行成功和出错的计数,并控制这个差值的窗口大小。

      上报出错的数据,需要保存,然后再次重连后重新上报。

5、参考资料

MQTTnet的Demo示例 dotnet/MQTTnet · GitHub

基于MQTTnet 3.0.12实现MQTT服务器和客户端

Logo

开放原子开发者工作坊旨在鼓励更多人参与开源活动,与志同道合的开发者们相互交流开发经验、分享开发心得、获取前沿技术趋势。工作坊有多种形式的开发者活动,如meetup、训练营等,主打技术交流,干货满满,真诚地邀请各位开发者共同参与!

更多推荐