开发指南:使用 MQTTNet 库构建 .Net 物联网 MQTT 应用程序
云计算中的无服务器 (Serverless) 架构可让开发人员专注于代码开发和部署,而无需进行繁琐的基础设施管理。EMQ 的云服务中包含了一款 Serverless MQTT 消息服务器产品,它可根据需求自动扩展,减少额外的人工干预。
要了解有关无服务器 MQTT 的更多信息,请阅读我们的博文《EMQX Cloud Serverless:下一代 MQTT 云服务》。在本系列博客中,我们将指导您使用常见的客户端库,通过 EMQ Serverless MQTT 消息服务器建立 MQTT 连接、订阅、消息传递等。
背景介绍
随着物联网的兴起,.Net 框架在构建物联网应用程序方面变得越来越流行。微软的 .Net Core 和 .Net 框架为开发人员提供了一组工具和库,以构建可以在 Raspberry Pi、HummingBoard、BeagleBoard、Pine A64 等平台上运行的物联网应用程序。
MQTTnet 是一个实现 MQTT 协议的高性能 .Net 库,在 GitHub 上开源,具有丰富的功能,支持 MQTT 5.0 协议和 TLS/SSL。
这篇博文演示了如何使用 MQTTnet 库连接到 EMQX Serverless MQTT 消息服务器。相关代码可以在 MQTT 客户端示例 中下载。
免费的 Serverless MQTT 消息服务器
EMQX Serverless MQTT 消息服务器是公有云上最新的 MQTT 消息服务器产品,具有所有 Serverless 的优势。只需点击几下,您就可以在几秒钟内开始无服务器部署。此外,用户每月可以获得 100 万的免费会话分钟,足以让 23 台设备在线整整一个月,非常适合小型物联网测试场景。
如果您还没有尝试过无服务器部署,请按照本博客中的指南免费创建一个。一旦您使用在线指南完成注册过程,您将从部署中的 Overview 页面获得一个具有类似以下所示的运行实例。我们稍后将使用连接信息和 CA证书。
代码演示
1. 安装 .NET 和 Visual Studio
如果您尚未在计算机上安装 .NET 环境,可以访问 Microsoft 官方文档以获详细说明。
Visual Studio 是面向 .NET 开发人员的综合 IDE,为开发、调试和部署 .NET 应用程序提供功能丰富的开发环境。您可以根据计算机的系统和版本在此处下载 .Net 安装介质。
2. 安装 MQTTnet 包
MQTTnet 通过 NuGet 包管理器交付。要安装它,请创建控制台应用程序并使用 NuGet 安装 MQTTnet 包。有关在 Visual Studio 中使用 NuGet 的详细说明,请参阅官方文档。如果您使用的是 Visual Studio for Mac,请参阅在 Visual Studio for Mac 中安装和管理 NuGet 包。
3. 设置连接
MqttClientOptionsBuilderMqttClientOptionsBuilder
string broker = "******.emqxsl.com";
int port = 8883;
string clientId = Guid.NewGuid().ToString();
string topic = "Csharp/mqtt";
string username = "emqxtest";
string password = "******";
// Create a MQTT client factory
var factory = new MqttFactory();
// Create a MQTT client instance
var mqttClient = factory.CreateMqttClient();
// Create MQTT client options
var options = new MqttClientOptionsBuilder()
.WithTcpServer(broker, port) // MQTT broker address and port
.WithCredentials(username, password) // Set username and password
.WithClientId(clientId)
.WithCleanSession()
.Build();
请将连接参数替换为您的 EMQX 连接信息和登录凭据。
Guid. NewGuid()
4. 使用 TLS/SSL
连接到 EMQX Serverless 时,需要注意的是,它依赖于多租户架构,该架构使多个用户能够共享单个 EMQX 集群,为了保证这种多租户环境内数据传输的安全性和可靠性,需要 TLS,并且如果服务器使用的是自签名证书,则必须从部署概览面板下载相应的 CA 文件,并在连接建立过程中提供。
MqttClientOptionsBuilderWithTls()MqttClientOptionsBuilder
string broker = "******.emqxsl.com";
int port = 8883;
string clientId = Guid.NewGuid().ToString();
string topic = "Csharp/mqtt";
string username = "emqxtest";
string password = "******";
// Create a MQTT client factory
var factory = new MqttFactory();
// Create a MQTT client instance
var mqttClient = factory.CreateMqttClient();
// Create MQTT client options
var options = new MqttClientOptionsBuilder()
.WithTcpServer(broker, port) // MQTT broker address and port
.WithCredentials(username, password) // Set username and password
.WithClientId(clientId)
.WithCleanSession()
.WithTls(
o =>
{
// The used public broker sometimes has invalid certificates. This sample accepts all
// certificates. This should not be used in live environments.
o.CertificateValidationHandler = _ => true;
// The default value is determined by the OS. Set manually to force version.
o.SslProtocol = SslProtocols.Tls12; ;
// Please provide the file path of your certificate file. The current directory is /bin.
var certificate = new X509Certificate("/opt/emqxsl-ca.crt", "");
o.Certificates = new List<X509Certificate> { certificate };
}
)
.Build();
5. 连接到 MQTT 消息服务器
PublishAsync
var connectResult = await mqttClient.ConnectAsync(options);
这里我们使用异步编程,在订阅的同时允许消息发布,防止阻塞。
6. 订阅话题
ResultCode
if (connectResult.ResultCode == MqttClientConnectResultCode.Success)
{
Console.WriteLine("Connected to MQTT broker successfully.");
// Subscribe to a topic
await mqttClient.SubscribeAsync(topic);
// Callback function when a message is received
mqttClient.ApplicationMessageReceivedAsync += e =>
{
Console.WriteLine($"Received message: {Encoding.UTF8.GetString(e.ApplicationMessage.PayloadSegment)}");
return Task.CompletedTask;
};
在此功能中,您还可以打印相应的接收到的消息。这使您可以根据需要查看和处理接收到的数据。
7. 发布消息
PublishAsync
for (int i = 0; i < 10; i++)
{
var message = new MqttApplicationMessageBuilder()
.WithTopic(topic)
.WithPayload($"Hello, MQTT! Message number {i}")
.WithQualityOfServiceLevel(MqttQualityOfServiceLevel.AtLeastOnce)
.WithRetainFlag()
.Build();
await mqttClient.PublishAsync(message);
await Task.Delay(1000); // Wait for 1 second
}
8. 取消订阅
要取消对消息主题的订阅,请调用:
await mqttClient.UnsubscribeAsync(topic);
9. 断开连接
要断开连接,请调用:
await mqttClient.DisconnectAsync();
完整代码
下面的代码展示了如何连接到服务器、订阅主题以及发布和接收消息。有关所有功能的完整演示,请参阅项目的 GitHub 代码库。
using System.Security.Authentication;
using System.Security.Cryptography.X509Certificates;
using System.Text;
using MQTTnet;
using MQTTnet.Client;
using MQTTnet.Protocol;
class Program
{
static async Task Main(string[] args)
{
string broker = '******.emqxsl.com';
int port = 8883;
string clientId = Guid.NewGuid().ToString();
string topic = "Csharp/mqtt";
string username = 'emqxtest';
string password = '**********';
// Create a MQTT client factory
var factory = new MqttFactory();
// Create a MQTT client instance
var mqttClient = factory.CreateMqttClient();
// Create MQTT client options
var options = new MqttClientOptionsBuilder()
.WithTcpServer(broker, port) // MQTT broker address and port
.WithCredentials(username, password) // Set username and password
.WithClientId(clientId)
.WithCleanSession()
.WithTls(
o =>
{
// The used public broker sometimes has invalid certificates. This sample accepts all
// certificates. This should not be used in live environments.
o.CertificateValidationHandler = _ => true;
// The default value is determined by the OS. Set manually to force version.
o.SslProtocol = SslProtocols.Tls12;
// Please provide the file path of your certificate file. The current directory is /bin.
var certificate = new X509Certificate("/opt/emqxsl-ca.crt", "");
o.Certificates = new List<X509Certificate> { certificate };
}
)
.Build();
// Connect to MQTT broker
var connectResult = await mqttClient.ConnectAsync(options);
if (connectResult.ResultCode == MqttClientConnectResultCode.Success)
{
Console.WriteLine("Connected to MQTT broker successfully.");
// Subscribe to a topic
await mqttClient.SubscribeAsync(topic);
// Callback function when a message is received
mqttClient.ApplicationMessageReceivedAsync += e =>
{
Console.WriteLine($"Received message: {Encoding.UTF8.GetString(e.ApplicationMessage.PayloadSegment)}");
return Task.CompletedTask;
};
// Publish a message 10 times
for (int i = 0; i < 10; i++)
{
var message = new MqttApplicationMessageBuilder()
.WithTopic(topic)
.WithPayload($"Hello, MQTT! Message number {i}")
.WithQualityOfServiceLevel(MqttQualityOfServiceLevel.AtLeastOnce)
.WithRetainFlag()
.Build();
await mqttClient.PublishAsync(message);
await Task.Delay(1000); // Wait for 1 second
}
// Unsubscribe and disconnect
await mqttClient.UnsubscribeAsync(topic);
await mqttClient.DisconnectAsync();
}
else
{
Console.WriteLine($"Failed to connect to MQTT broker: {connectResult.ResultCode}");
}
}
}
测试
在 Visual Studio 中运行项目,我们可以在终端窗口上看到输出信息如下。客户端已成功连接到 EMQX Cloud Serverless 消息服务,每秒收到一条消息。
"CSharp/mqtt"
当您将消息发布到主题时,服务器将收到该消息,您可以在 MQTTX 和控制台上查看消息输出。
MQTTX 上显示收到的消息
终端上显示收到的消息
总结
本博客提供了通过 MQTTnet 库连接到 EMQX Cloud Serverless 消息服务的教程。按照这些说明,您已经成功创建了一个 .NET 应用程序,能够发布和订阅 EMQX Cloud Serverless 消息服务。
加入 EMQ 社区
要深入了解这个主题,请探索我们的 GitHub 代码库以获取源代码,加入我们的微信群进行讨论,并观看我们的 B 站视频进行实践学习。我们重视您的反馈和贡献,所以请随时参与并成为我们蓬勃发展的社区的一部分。