C#中使用MQTTnet库实现MQTT通信

在C#中使用MQTTnet库实现MQTT通信的完整流程如下,涵盖客户端创建、连接、订阅/发布消息及错误处理:

一、环境准备

1. 安装MQTTnet库

通过NuGet安装:

bash

dotnet add package MQTTnet

2. 创建控制台应用

示例代码基于控制台应用演示。

二、客户端创建与连接

using MQTTnet;

using MQTTnet.Client;

using MQTTnet.Client.Options;

// 创建客户端工厂

var factory = new MqttFactory();

var client = factory.CreateMqttClient();

// 配置连接参数

var options = new MqttClientOptionsBuilder()

.WithTcpServer(“broker.hivemq.com”, 1883) // 默认公共代理

.WithClientId(“CSharpClient_001”) // 客户端ID

.WithCleanSession() // 断开后清除会话

.Build();

// 异步连接

await client.ConnectAsync(options);

三、订阅消息

// 订阅主题(示例:test/topic)

await client.SubscribeAsync(new[] {

new MqttTopicFilterBuilder()

.WithTopic(“test/topic”)

.WithQualityOfServiceLevel(MqttQualityOfServiceLevel.AtMostOnce)

.Build()

});

// 处理消息到达事件

client.UseApplicationMessageReceivedHandler(e => {

Console.WriteLine($”Received: {Encoding.UTF8.GetString(e.ApplicationMessage.Payload)}”);

});

四、发布消息

// 发布消息到主题

var message = new MqttApplicationMessageBuilder()

.WithTopic(“test/topic”)

.WithPayload(“Hello MQTTnet!”)

.WithQualityOfServiceLevel(MqttQualityOfServiceLevel.AtLeastOnce)

.WithRetainFlag(true) // 保留消息

.Build();

await client.PublishAsync(message);

五、错误处理与断开连接

try {

// 主循环保持连接

while (true) {

await Task.Delay(Timeout.InfiniteTimeSpan);

}

} catch (Exception ex) {

Console.WriteLine($”Error: {ex.Message}”);

} finally {

// 异步断开连接

await client.DisconnectAsync();

client.Dispose();

}

六、完整代码示例

using System.Text;

using MQTTnet;

using MQTTnet.Client;

using MQTTnet.Client.Options;

class Program {

static async Task Main(string[] args) {

var factory = new MqttFactory();

var client = factory.CreateMqttClient();

var options = new MqttClientOptionsBuilder()

.WithTcpServer(“broker.hivemq.com”, 1883)

.WithClientId(“CSharpClient_001”)

.Build();

await client.ConnectAsync(options);

// 订阅

await client.SubscribeAsync(new[]

{ new MqttTopicFilterBuilder().WithTopic(“test/#”).Build() });

client.UseApplicationMessageReceivedHandler(e =>

Console.WriteLine($”Topic: {e.ApplicationMessage.Topic}, Payload: {Encoding.UTF8.GetString(e.ApplicationMessage.Payload)}”));

// 发布

await client.PublishAsync(new MqttApplicationMessageBuilder()

.WithTopic(“test/topic”)

.WithPayload(“Hello World!”)

.Build());

await Task.Delay(10000);

await client.DisconnectAsync();

}

}

七、关键配置说明

1. QoS级别

– AtMostOnce(0):可能丢失,性能最优

– AtLeastOnce(1):确保到达,可能重复

– ExactlyOnce(2):严格一次,开销最大

2. 保留消息

设置WithRetainFlag(true)后,代理会保留最后一条消息供新订阅者获取。

3. 心跳机制

通过WithKeepAlivePeriod()设置心跳间隔,默认30秒。

八、高级功能扩展

– TLS加密:使用.WithTls()配置SSL/TLS连接

– 认证:通过.WithCredentials(username, password)添加凭据

– Last Will:设置遗嘱消息(客户端异常断开时触发)

> 提示:实际应用中提议使用异步方法(如ConnectAsync)避免阻塞主线程。更多配置可参考。

© 版权声明
THE END
如果内容对您有所帮助,就支持一下吧!
点赞0 分享
评论 共1条

请登录后发表评论