使用 MQTT 协议与 IoT 中心通信
本文介绍设备如何使用受支持的 MQTT 行为来与 Azure IoT 中心通信。 IoT 中心允许设备通过以下方式与 IoT 中心设备终结点通信:
- 在 TCP 端口 8883 上使用 MQTT v3.1.1
- 在 TCP 端口 443 上使用基于 WebSocket 的 MQTT v3.1.1。
所有通过 IoT 中心进行的设备通信都必须使用 TLS/SSL 来保护。 因此,IoT 中心不支持通过 TCP 端口 1883 进行的不安全的连接。
比较 IoT 中心和事件网格中的 MQTT 支持
IoT 中心不是功能完备的 MQTT 中转站,不支持 MQTT v3.1.1 标准中指定的所有行为。 如果你的解决方案需要 MQTT,我们推荐查看 Azure 事件网格中的 MQTT 支持。 事件网格使用发布-订阅消息传送模型在灵活分层主题上实现 MQTT 客户端之间的双向通信。 它还使你能够将 MQTT 消息路由到 Azure 服务或自定义终结点,以便进一步处理。
下表说明了两个服务之间的 MQTT 支持差异:
IoT 中心 | 事件网格 |
---|---|
设备与云应用之间耦合紧密的客户端-服务器模型。 | 解耦发布者和订阅者的发布-订阅模型。 |
对 MQTT v3.1.1 提供有限功能支持,对预览版中的 MQTT v5 提供有限功能支持。 未计划更多功能支持。 | MQTT v3.1.1 和 v5 协议支持,并计划了更多功能支持和行业合规性。 |
静态、预定义的主题。 | 支持通配符的自定义分层主题。 |
不支持云到设备广播和设备到设备通信。 | 支持设备到云、高扇出比的云到设备广播以及设备到设备通信模式。 |
256KB 消息大小上限。 | 512KB 消息大小上限。 |
连接到 IoT 中心
设备可以通过以下选项之一使用 MQTT 协议连接到 IoT 中心:
- 直接通过 MQTT 协议。
许多企业和教育网络环境中都会 MQTT 端口(TCP 端口8883)。 如果无法在防火墙中打开端口 8883,建议使用基于 WebSocket 的 MQTT。 基于 WebSocket 的 MQTT 通过端口 443 进行通信,该端口在网络环境中几乎始终是打开的。 若要了解如何在使用 Azure IoT SDK 时指定 MQTT 和基于 WebSocket 的 MQTT 协议,请参阅使用设备 SDK。
使用设备 SDK
支持 MQTT 协议的设备 SDK 可用于 Java、Node.js、C、C# 和 Python。 设备 SDK 使用选定身份验证机制来连接到 IoT 中心。 要使用 MQTT 协议,必须将客户端协议参数设置为 MQTT。 还可以在客户端协议参数中指定基于 WebSocket 的 MQTT。 默认情况下,设备 SDK 在 CleanSession 标志设置为 0 的情况下连接到 IoT 中心,并使用 QoS 1 来与 IoT 中心交换消息。 虽然可以配置 QoS 0 以加快消息交换速度,但你应该注意,这种传递不能得到保证或确认。 出于此原因,QoS 0 通常称为“用后即焚”。
当设备连接到 IoT 中心时,设备 SDK 会提供方法,让设备与 IoT 中心交换消息。
下表包含了每种受支持语言的代码示例链接,并指定了通过 MQTT 或基于 WebSocket 的 MQTT 协议建立到 IoT 中心的连接时要使用的参数。
websockets=True
以下片段展示如何在使用 Azure IoT Node.js SDK 时指定基于 WebSocket 的 MQTT 协议:
var Client = require('azure-iot-device').Client;
var Protocol = require('azure-iot-device-mqtt').MqttWs;
var client = Client.fromConnectionString(deviceConnectionString, Protocol);
以下片段展示如何在使用 Azure IoT Python SDK 时指定基于 WebSocket 的 MQTT 协议:
from azure.iot.device.aio import IoTHubDeviceClient
device_client = IoTHubDeviceClient.create_from_connection_string(deviceConnectionString, websockets=True)
默认的 keep-alive 超时
为了确保客户端/IoT 中心连接保持活动状态,服务和客户端会定期向对方发送一个 keep-alive ping。 使用 IoT SDK 的客户端按下表中定义的时间间隔发送 keep-alive:
语言 | 默认的 keep-alive 时间间隔 | 可配置性 |
---|---|---|
Node.js | 180 秒 | 否 |
Java | 230 秒 | |
C | 240 秒 | |
C# | 300 秒* | |
Python | 60 秒 |
*C# SDK 将 MQTT KeepAliveInSeconds 属性的默认值定义为 300 秒。 实际上,SDK 会在每次设置的 keep-alive 持续时间发送四次 ping 请求。 换句话说,SDK 每 75 秒发送一次 keep-alive ping。
根据 MQTT v3.1.1 规范,IoT 中心的 keep-alive ping 间隔是客户端 keep-alive 值的 1.5 倍;但 IoT 中心将服务器端超时最大值限制为 29.45 分钟(1767 秒)。 存在此限制是因为所有 Azure 服务都绑定到 Azure 负载均衡器 TCP 空闲超时,即 29.45 分钟。
(230 * 1.5) - 230 = 115
1767 / 1.5 = 1177
将设备应用从 AMQP 迁移到 MQTT
如果使用设备 SDK,则从使用 AMQP 切换到 MQTT 需要在客户端初始化中更改协议参数,如前所述。
执行此操作时,请确保检查下列各项:
AMQP 针对许多条件返回错误,而 MQTT 会终止连接。 因此异常处理逻辑可能需要进行一些更改。
MQTT 在接收云到设备消息时不支持拒绝操作。 如果后端应用需要接收来自设备应用的响应,请考虑使用直接方法。
Python SDK 不支持 AMQP。
直接使用 MQTT 协议(作为设备)
如果设备不能使用设备 SDK,仍可以使用端口 8883 上的 MQTT 协议连接到公共设备端点。
在 CONNECT 数据包中,设备应使用以下值:
使用适用于 Visual Studio Code 的 Azure IoT 中心扩展
HostName={iotHub-hostname};DeviceId=javadevice;SharedAccessSignature=SharedAccessSignature sr={iotHub-hostname}%2Fdevices%2FMyDevice01%2Fapi-version%3D2016-11-14&sig=vSgHBMUG.....Ntg%3d&se=1456481802SharedAccessSignature sr={iotHub-hostname}%2Fdevices%2FMyDevice01%2Fapi-version%3D2016-11-14&sig=vSgHBMUG.....Ntg%3d&se=1456481802
devices/{device-id}/messages/events/devices/{device-id}/messages/events/{property-bag}
直接使用 MQTT 协议(作为模块)
可使用模块标识通过 MQTT 连接到IoT 中心,类似于作为设备的形式连接到 IoT 中心。 有关通过 MQTT 作为设备连接到IoT 中心的详细信息,请参阅直接使用 MQTT 协议(用作设备)。 但需要使用以下值:
{device-id}/{module-id}.azure-devices.net/{device_id}/{module_id}/?api-version=2021-04-12devices/{device-id}/modules/{module-id}/messages/events/devices/{device-id}/modules/{module-id}/messages/events/devices/{device-id}/modules/{module-id}/#
有关将 MQTT 与模块结合使用的详细信息,请参阅使用 IoT Edge 发布和订阅并详细了解 IoT Edge 中心 MQTT 终结点。
使用 MQTT 而无 Azure IoT SDK 的示例
IoT MQTT 示例存储库包含 C/C++、Python 和 CLI 示例,它们演示了如何在不使用 Azure 设备 SDK 的情况下发送遥测消息、接收云到设备的消息以及使用设备孪生。
mosquitto_pub
TLS/SSL 配置
若要直接使用 MQTT 协议,客户端必须通过 TLS/SSL 连接。 尝试跳过此步骤失败并显示连接错误。
要建立 TLS 连接,你可能需要下载并引用 Azure 使用的 DigiCert 根证书。 在 2023 年 2 月 15 日至 10 月 15 日之间,Azure IoT 中心将其 TLS 根证书从 DigiCert Baltimore 根证书迁移到 DigiCert 全局根 G2。 在迁移期间,你应该在设备上拥有这两个证书,以确保连接性。 有关迁移的详细信息,请参阅将 IoT 资源迁移到新的 TLS 证书根。有关这些证书的详细信息,请参阅 Digicert 的网站。
下面的示例演示如何使用 Eclipse Foundation 提供的 Python 版本的 Paho MQTT 库实现此配置。
首先,从命令行环境安装 Paho 库:
pip install paho-mqtt
然后,在 Python 脚本中实现客户端。 替换以下代码片段中的这些占位符:
-----BEGIN CERTIFICATE----------END CERTIFICATE-----"\r\n
from paho.mqtt import client as mqtt
import ssl
path_to_root_cert = "<local path to digicert.cer file>"
device_id = "<device id from device registry>"
sas_token = "<generated SAS token>"
iot_hub_name = "<iot hub name>"
def on_connect(client, userdata, flags, rc):
print("Device connected with result code: " + str(rc))
def on_disconnect(client, userdata, rc):
print("Device disconnected with result code: " + str(rc))
def on_publish(client, userdata, mid):
print("Device sent message")
client = mqtt.Client(client_id=device_id, protocol=mqtt.MQTTv311)
client.on_connect = on_connect
client.on_disconnect = on_disconnect
client.on_publish = on_publish
client.username_pw_set(username=iot_hub_name+".azure-devices.net/" +
device_id + "/?api-version=2021-04-12", password=sas_token)
client.tls_set(ca_certs=path_to_root_cert, certfile=None, keyfile=None,
cert_reqs=ssl.CERT_REQUIRED, tls_version=ssl.PROTOCOL_TLSv1_2, ciphers=None)
client.tls_insecure_set(False)
client.connect(iot_hub_name+".azure-devices.net", port=8883)
client.publish("devices/" + device_id + "/messages/events/", '{"id":123}', qos=1)
client.loop_forever()
若要使用设备证书进行身份验证,请使用以下代码片段中指定的更改更新以前的代码片段。 有关如何准备基于证书的身份验证的详细信息,请参阅使用 X.509 CA 证书验证设备的获取 X.509 CA 证书部分。
# Create the client as before
# ...
# Set the username but not the password on your client
client.username_pw_set(username=iot_hub_name+".azure-devices.net/" +
device_id + "/?api-version=2021-04-12", password=None)
# Set the certificate and key paths on your client
cert_file = "<local path to your certificate file>"
key_file = "<local path to your device key file>"
client.tls_set(ca_certs=path_to_root_cert, certfile=cert_file, keyfile=key_file,
cert_reqs=ssl.CERT_REQUIRED, tls_version=ssl.PROTOCOL_TLSv1_2, ciphers=None)
# Connect as before
client.connect(iot_hub_name+".azure-devices.net", port=8883)
发送“设备到云”消息
devices/{device-id}/messages/events/devices/{device-id}/messages/events/{property-bag}{property-bag}
RFC 2396-encoded(<PropertyName1>)=RFC 2396-encoded(<PropertyValue1>)&RFC 2396-encoded(<PropertyName2>)=RFC 2396-encoded(<PropertyValue2>)…
下面的列表描述了 IoT 中心特定于实现的行为:
ctapplication/json;charset=utf-8devices/{device-id}/messages/events/$.ct=application%2Fjson%3Bcharset%3Dutf-8
接收“云到设备”消息
devices/{device-id}/messages/devicebound/###?
devices/{device-id}/messages/devicebound/#
devices/{device-id}/messages/devicebound/devices/{device-id}/messages/devicebound/{property-bag}{property-bag}
在云到设备消息中,属性包中的值的表示形式如下表所示:
nullkeykey=key=value
null
/?prop1&prop2=&prop3=a%20string
当设备应用使用 QoS 2 订阅主题时,IoT 中心会在 SUBACK 包中授予最高 QoS 级别 1。 之后,IoT 中心会使用 QoS 1 将消息传送到设备。
检索设备克隆的属性
$iothub/twin/res/#$iothub/twin/GET/?$rid={request id}$iothub/twin/res/{status}/?$rid={request-id}
请求 ID 可以是消息属性值的任何有效值,且需要验证确保状态是整数。 有关详细信息,请参阅使用 IoT 中心发送设备到云和云到设备的消息。
响应正文包含设备孪生的 properties 节,如以下响应示例所示:
{
"desired": {
"telemetrySendFrequency": "5m",
"$version": 12
},
"reported": {
"telemetrySendFrequency": "5m",
"batteryLevel": 55,
"$version": 123
}
}
可能的状态代码为:
状态 | 说明 |
---|---|
200 | Success |
429 | 请求过多(受限制)。 有关详细信息,请参阅 IoT 中心限制 |
5** | 服务器错误 |
更新设备孪生的报告属性
$rid
以下序列描述了设备如何在 IoT 中心中更新设备孪生中报告的属性:
$iothub/twin/res/#$iothub/twin/PATCH/properties/reported/?$rid={request-id}$iothub/twin/res/{status}/?$rid={request-id}
null
{
"telemetrySendFrequency": "35m",
"batteryLevel": 60
}
可能的状态代码为:
状态 | 说明 |
---|---|
204 | 成功(不返回任何内容) |
400 | 错误的请求。 格式不正确的 JSON |
429 | 请求过多(受限),如 IoT 中心限制中所述 |
5** | 服务器错误 |
下面的 Python 代码片段演示了通过 MQTT(使用 Paho MQTT 客户端)进行的孪生体报告属性更新过程:
from paho.mqtt import client as mqtt
# authenticate the client with IoT Hub (not shown here)
client.subscribe("$iothub/twin/res/#")
rid = "1"
twin_reported_property_patch = "{\"firmware_version\": \"v1.1\"}"
client.publish("$iothub/twin/PATCH/properties/reported/?$rid=" +
rid, twin_reported_property_patch, qos=0)
$iothub/twin/res/204/?$rid=1&$version=6204$rid=1$version
接收所需属性更新通知
$iothub/twin/PATCH/properties/desired/?$version={new-version}
{
"telemetrySendFrequency": "5m",
"route": null,
"$version": 8
}
null$version
响应直接方法
$iothub/methods/POST/#$iothub/methods/POST/{method-name}/?$rid={request-id}
$iothub/methods/res/{status}/?$rid={request-id}
后续步骤
要了解有关使用 MQTT 的详细信息,请参阅:
若要详细了解如何使用 IoT 设备 SDK,请参阅:
若要深入了解如何规划 IoT 中心部署,请参阅: