一.IOT設備的特性
- 硬件能力差(存儲能力基本只有幾MB,CPU頻率低連使用HTTP請求都很奢侈)
- 系統千差萬別(Brillo,mbedOS,RIOT等)
- 如使用電池供電,電量消耗敏感
- 如果是小設備,設備基數大需要維持大量在線鏈接
- 網絡情況不穩定,移動網絡網絡資費貴,需要盡量減少開銷和穩定
在以上這樣苛刻的場景下很多技術上常用在智能設備方案都望而卻步,總結一下我們主要面對下面三個問題:
- socket.io,websocket? 不同的系統可能無法使用HTTP,設備資源可能使用HTTP都奢侈 。
- TCP/IP自定協議? 雖然不用在意系統,自定義報文怎么解決網絡開銷問題?
- 自主研發成本高,使用第三方IOT平台容易被技術或硬件綁定 。
二. MQTT為什么適合IOT場景
- MQTT(Message Queuing Telemetry Transport,消息隊列遙測傳輸)是IBM開發的一個即時通訊協議,使用方式比較類似於隊列軟件比如RabbitMQ,使用發布/訂閱的方式提供互相之間的通訊。
- MQTT是為在計算能力有限,且工作在低帶寬、不可靠的網絡的遠程傳感器和控制設備通訊而設計的協議。
MQTT主要特性:
- 該協議支持所有平台,幾乎可以把所有聯網物品和外部連接起來
- 有三種消息發布服務質量
- “至多一次”,消息發布完全依賴底層 TCP/IP 網絡。會發生消息丟失或重復。這一級別可用於如下情況,環境傳感器數據,丟失一次讀記錄無所謂,因為不久后還會有第二次發送。
- “至少一次”,確保消息到達,但消息重復可能會發生。
- “只有一次”,確保消息到達一次。這一級別可用於如下情況,在計費系統中,消息重復或丟失會導致不正確的結果。 - 小型傳輸,開銷很小(固定長度的頭部是 2 字節),協議交換最小化,以降低網絡流量;
- 使用 Last Will 和 Testament 特性通知有關各方客戶端異常中斷的機制;
除了MQTT的協議特性外還有一些客觀原因:
- 對語言友好主流語言的客戶端都有
- 大部分硬件方案天生支持
- 數十個MQTT服務器端程序可供選擇
- 社區成熟解決方案被廣泛運用遇到問題方便尋求幫助
以上基本是我們為什么也會選擇MQTT作為IOT協議的原因,需要更多的了解或者查看客戶端支不支持和服務端實現可以參考官方github:
- MQTT官方 : https://github.com/mqtt/mqtt.github.io
- MQTT協議中文版:https://github.com/mcxiaoke/mqtt
- 服務中間件列表: https://github.com/mqtt/mqtt.github.io/wiki/servers
- 客戶端列表: https://github.com/mqtt/mqtt.github.io/wiki/libraries
- 客戶端簡單Demo可以見官方文檔:https://github.com/chkr1011/MQTTnet/wiki/Client
三、MQTTnet
MQTTnet 是一個基於 MQTT 通信的高性能 .NET 開源庫,它同時支持 MQTT 服務器端和客戶端。而且作者也保持更新,目前支持新版的.NET core,這也是選擇 MQTTnet 的原因。
MQTTnet 在 Github 還有 MqttDotNet、nMQTT、M2MQTT 等。
在解決方案在右鍵單擊-選擇“管理解決方案的 NuGet 程序包”-在“瀏覽”選項卡下面搜索 MQTTnet,為服務端項目和客戶端項目都安裝上 MQTTnet 庫。
四、 服務端
MQTT 服務端主要用於與多個客戶端保持連接,並處理客戶端的發布和訂閱等邏輯。一般很少直接從服務端發送消息給客戶端(可以使用 mqttServer.Publish(appMsg);
直接發送消息),多數情況下服務端都是轉發主題匹配的客戶端消息,在系統中起到一個中介的作用。
1、 創建服務端並啟動
IMqttServer mqttServer = new MqttFactory().CreateMqttServer();
通過上述方式創建了一個 IMqttServer
對象后,調用其 StartAsync
方法即可啟動 MQTT 服務。值得注意的是:之前版本采用的是 Start
方法,作者也是緊跟 C# 語言新特性,能使用異步的地方也都改為異步方式。
Task.Run(async () => { await mqttServer.StartAsync(optionsBuilder.Build()); });
2、 配置設置、驗證客戶端
WithDefaultEndpointPort是設置使用的端口,協議里默認是用1883,不過調試我改成8222了。
WithConnectionValidator是用於連接驗證,驗證client id,用戶名,密碼什么的。示例沒用數據庫,隨便寫死了兩個值。
還有其他配置選項,比如加密協議,可以在官方文檔里看看,示例就是先簡單能用。
var optionsBuilder = new MqttServerOptionsBuilder() .WithConnectionBacklog(100) .WithDefaultEndpointPort(8222) .WithConnectionValidator(ValidatingMqttClients()) ;
private static Action<MqttConnectionValidatorContext> ValidatingMqttClients() { // Setup client validator. var options =new MqttServerOptions(); options.ConnectionValidator = c => { Dictionary<string, string> c_u = new Dictionary<string, string>(); c_u.Add("client001", "username001"); c_u.Add("client002", "username002"); Dictionary<string, string> u_psw = new Dictionary<string, string>(); u_psw.Add("username001", "psw001"); u_psw.Add("username002", "psw002"); if (c_u.ContainsKey(c.ClientId) && c_u[c.ClientId] == c.Username) { if (u_psw.ContainsKey(c.Username) && u_psw[c.Username] == c.Password) { c.ReturnCode = MqttConnectReturnCode.ConnectionAccepted; } else { c.ReturnCode = MqttConnectReturnCode.ConnectionRefusedBadUsernameOrPassword; } } else { c.ReturnCode = MqttConnectReturnCode.ConnectionRefusedIdentifierRejected; } }; return options.ConnectionValidator; }
3、 相關事件
服務端支持 ClientConnected
、ClientDisconnected
和 ApplicationMessageReceived
事件,分別用來檢查客戶端連接、客戶端斷開以及接收客戶端發來的消息。
其中 ClientConnected
和 ClientDisconnected
事件的事件參數一個客戶端連接對象 ConnectedMqttClient
,通過該對象可以獲取客戶端ID標識 ClientId
和 MQTT 版本 ProtocolVersion
。
ApplicationMessageReceived
的事件參數包含了客戶端ID標識 ClientId
和 MQTT 應用消息 MqttApplicationMessage
對象,通過該對象可以獲取主題 Topic
、QoS QualityOfServiceLevel
和消息內容 Payload
等信息。
mqttServer.ApplicationMessageReceived += MqttServer_ApplicationMessageReceived; mqttServer.ClientConnected += MqttServer_ClientConnected; mqttServer.ClientDisconnected += MqttServer_ClientDisconnected;
五、 客戶端
MQTT 與 HTTP 不同,后者是基於請求/響應方式的,服務器端無法直接發送數據給客戶端。而 MQTT 是基於發布/訂閱模式的,所有的客戶端均與服務端保持連接狀態。
那么客戶端之間是如何通信的呢?
具體邏輯是:某些客戶端向服務端訂閱它感興趣(主題)的消息,另一些客戶端向服務端發布(主題)消息,服務端將訂閱和發布的主題進行匹配,並將消息轉發給匹配通過的客戶端。
1、 創建客戶端並連接
使用 MQTTnet 創建 MQTT 也非常簡單,只需要使用 MqttClientFactory
對象的 CreateMqttClient
方法即可。
var factory = new MqttFactory(); var mqttClient = factory.CreateMqttClient();
創建客戶端對象后,調用其異步方法 ConnectAsync
來連接到服務端。
await mqttClient.ConnectAsync(options, CancellationToken.None); // Since 3.0.5 with CancellationToken
調用該方法時需要傳遞一個 MqttClientTcpOptions
對象(之前的版本是在創建對象時使用該選項),該選項包含了客戶端ID標識 ClientId
、服務端地址(可以使用IP地址或域名)Server
、端口號 Port
、用戶名 UserName
、密碼 Password
等信息。
// Create TCP based options using the builder. var options = new MqttClientOptionsBuilder() .WithClientId("Client1") .WithTcpServer("broker.hivemq.com") .WithCredentials("bud", "%spencer%") .WithTls() .WithCleanSession() .Build();
2、 相關事件
客戶端支持 Connected
、Disconnected
和 UseApplicationMessageReceivedHandler事件,用來處理客戶端與服務端連接、客戶端從服務端斷開以及客戶端收到消息的事情。
mqttClient.UseApplicationMessageReceivedHandler(e => { Console.WriteLine("### RECEIVED APPLICATION MESSAGE ###"); Console.WriteLine($"+ Topic = {e.ApplicationMessage.Topic}"); Console.WriteLine($"+ Payload = {Encoding.UTF8.GetString(e.ApplicationMessage.Payload)}"); Console.WriteLine($"+ QoS = {e.ApplicationMessage.QualityOfServiceLevel}"); Console.WriteLine($"+ Retain = {e.ApplicationMessage.Retain}"); Console.WriteLine(); Task.Run(() => mqttClient.PublishAsync("hello/world")); });
3、 訂閱消息
客戶端連接到服務端之后,可以使用 SubscribeAsync
異步方法訂閱消息,該方法可以傳入一個可枚舉或可變參數的主題過濾器 TopicFilter
參數,主題過濾器包含主題名和 QoS 等級。
mqttClient.UseConnectedHandler(async e => { // Subscribe to a topic await mqttClient.SubscribeAsync(new TopicFilterBuilder().WithTopic("家/客廳/空調/#").Build()); });
4、 發布消息
mqtt的消息包含topic和payload兩部分。topic就是消息主題(類型),用於另外一端判斷這個消息是干什么用的。payload就是實際想要發送的數據。
- WithTopic給一個topic。
- WithPayload給一個msg。
- WithAtMostOnceQoS設置QoS,至多1次。也可以設為別的。
- PublishAsync異步發送出去。
string topic = "topic/hello"; var message = new MqttApplicationMessageBuilder() .WithTopic(topic) .WithPayload(msg) .WithAtMostOnceQoS() .WithRetainFlag() .Build(); await mqttServer.PublishAsync(message);
六、跟蹤消息
// Write all trace messages to the console window. MqttNetGlobalLogger.LogMessagePublished += (s, e) => { var trace = $">> [{e.TraceMessage.Timestamp:O}] [{e.TraceMessage.ThreadId}] [{e.TraceMessage.Source}] [{e.TraceMessage.Level}]: {e.TraceMessage.Message}"; if (e.TraceMessage.Exception != null) { trace += Environment.NewLine + e.TraceMessage.Exception.ToString(); } Console.WriteLine(trace); };
七、 運行效果
以下分別是服務端、客戶端1和客戶端2的運行效果,其中客戶端1和客戶端2只是同一個項目運行了兩個實例。客戶端1用於訂閱傳感器的“溫度”數據,並模擬上位機(如 APP 等)發送開關控制命令;客戶端2訂閱上位機傳來的“開關”控制命令,並模擬溫度傳感器上報溫度數據。
1、 服務端
2、 客戶端1
3、 客戶端2
八、完整Demo
完整實例:https://github.com/landbroken/MQTTLearning
使用MQTTNet在WPF框架下搭建MQTT客戶端: https://blog.csdn.net/lordwish/article/details/84970800
使用MQTTNet在WPF框架下創建MQTT服務端(broker):https://blog.csdn.net/lordwish/article/details/85042476
使用MQTTNet+ASP.NET Core創建MQTT服務器(broker):https://blog.csdn.net/lordwish/article/details/86708777