.NET Core集成MQTTnet實現MQTT服務端


.NET Core集成MQTTnet實現MQTT服務端

一,什么是MQTT

MQTT(消息隊列遙測傳輸)是 ISO 標准(ISO/IEC PRF 20922)下基於 發布/ 訂閱范式的消息協議。它工作在 TCP/IP協議族上,是為 硬件性能低下的遠程設備以及網絡狀況糟糕的情況下而設計的發布/訂閱型消息 協議,為此,它需要一個 消息中間件 
MQTT是一個基於 客戶端- 服務器的消息發布/訂閱傳輸協議。MQTT協議是輕量、簡單、開放和易於實現的,這些特點使它適用范圍非常廣泛。在很多情況下,包括受限的環境中,如:機器與機器(M2M)通信和物聯網(IoT)。其在,通過衛星鏈路通信傳感器、偶爾撥號的醫療設備、智能家居、及一些小型化設備中已廣泛使用。——引自百度百科

二,常見的MQTT軟件

MQTT官網上有很多推薦,博友們可以慢慢看,附上一篇別人的開源 MQTT Broker 對比文章https://wivwiv.com/post/best-mqtt-broker

三,MQTT通訊類型

名字

流向

描述

CONNECT

1

C->S

客戶端請求與服務端建立連接

CONNACK

2

S->C

服務端確認連接建立

PUBLISH

3

CóS

發布消息

PUBACK

4

CóS

收到發布消息確認

PUBREC

5

CóS

發布消息收到

PUBREL

6

CóS

發布消息釋放

PUBCOMP

7

CóS

發布消息完成

SUBSCRIBE

8

C->S

訂閱請求

SUBACK

9

S->C

訂閱確認

UNSUBSCRIBE

10

C->S

取消訂閱

UNSUBACK

11

S->C

取消訂閱確認

PING

12

C->S

客戶端發送PING(連接保活)命令

PINGRSP

13

S->C

PING命令回復

DISCONNECT

14

C->S

斷開連接

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

四,MQTT QoS等級

QoS(Quality of Service),服務質量。MQTT中有三種Qos:

QoS0,At most once,至多一次;

QoS1,At least once,至少一次;

QoS2,Exactly once,確保只有一次。

QoS 是消息的發送方(Sender)和接受方(Receiver)之間達成的一個協議:


 l  QoS0 代表,Sender 發送的一條消息,Receiver 最多能收到一次,也就是說 Sender 盡力向 Receiver 發送消息,如果發送失敗,也就算了;

 Sender 向 Receiver 發送一個包含消息數據的 PUBLISH 包,然后不管結果如何,丟棄掉已發送的 PUBLISH 包,一條消息的發送完成。


 l  QoS1 代表,Sender 發送的一條消息,Receiver 至少能收到一次,也就是說 Sender 向 Receiver 發送消息,如果發送失敗,會繼續重試,直到 Receiver 收到消息為止,

QoS 要保證消息至少到達 Sender 一次,所以有一個應答的機制。

  • Sender 向 Receiver 發送一個帶有消息數據的 PUBLISH 包, 並在本地保存這個 PUBLISH 包。
  • Receiver 收到 PUBLISH 包以后,向 Sender 發送一個 PUBACK 數據包,PUBACK 數據包沒有消息體(Payload),在可變頭中(Variable header)中有一個包標識(Packet Identifier),和它收到的 PUBLISH 包中的 Packet Identifier 一致。
  • Sender 收到 PUBACK 之后,根據 PUBACK 包中的 Packet Identifier 找到本地保存的 PUBLISH 包,然后丟棄掉,一次消息的發送完成。
  • 如果 Sender 在一段時間內沒有收到 PUBLISH 包對應的 PUBACK,它將該 PUBLISH 包的 DUP 標識設為 1(代表是重新發送的 PUBLISH 包),然后重新發送該 PUBLISH 包。重復這個流程,直到收到 PUBACK,然后執行第 3 步。

 l  QoS2 代表,Sender 發送的一條消息,Receiver 確保能收到而且只收到一次,也就是說 Sender 盡力向 Receiver 發送消息,如果發送失敗,會繼續重試,直到 Receiver 收到消息為止,同時保證 Receiver 不會因為消息重傳而收到重復的消息。

  • Sender 發送 QoS 為 2 的 PUBLISH 數據包,數據包 Packet Identifier 為 P,並在本地保存該 PUBLISH 包;
  • Receiver 收到 PUBLISH 數據包以后,在本地保存 PUBLISH 包的 Packet Identifier P,並回復 Sender 一個 PUBREC 數據包,PUBREC 數據包可變頭中的 Packet Identifier 為 P,沒有消息體(Payload);
  • 當 Sender 收到 PUBREC,它就可以安全地丟棄掉初始的 Packet Identifier 為 P 的 PUBLISH 數據包,同時保存該 PUBREC 數據包,同時回復 Receiver 一個 PUBREL 數據包,PUBREL 數據包可變頭中的 Packet Identifier 為 P,沒有消息體;如果 Sender 在一定時間內沒有收到 PUBREC,它會把 PUBLISH 包的 DUP 標識設為 1,重新發送該 PUBLISH 數據包(Payload);
  • 當 Receiver 收到 PUBREL 數據包,它可以丟棄掉保存的 PUBLISH 包的 Packet Identifier P,並回復 Sender 一個 PUBCOMP 數據包,PUBCOMP 數據包可變頭中的 Packet Identifier 為 P,沒有消息體(Payload);
  • 當 Sender 收到 PUBCOMP 包,那么它認為數據包傳輸已完成,它會丟棄掉對應的 PUBREC 包。如果 Sender 在一定時間內沒有收到 PUBCOMP 包,它會重新發送 PUBREL 數據包。

:QoS 是 Sender 和 Receiver 之間達成的協議,不是 Publisher 和 Subscriber 之間達成的協議。也就是說 Publisher 發布一條 QoS1 的消息,只能保證 Broker 能至少收到一次這個消息;至於對應的 Subscriber 能否至少收到一次這個消息,還要取決於 Subscriber 在 Subscribe 的時候和 Broker 協商的 QoS 等級。

五,QoS 和會話(Session)

如果 Client 想接收離線消息,必須使用持久化的會話(Clean Session = 0)連接到 Broker,這樣 Broker 才會存儲 Client 在離線期間沒有確認接收的 QoS 大於 1 的消息。

六,QoS降級問題

在 MQTT 協議中,從 Broker 到 Subscriber 這段消息傳遞的實際 QoS 等於:Publisher 發布消息時指定的 QoS 等級和 Subscriber 在訂閱時與 Broker 協商的 QoS 等級,這兩個 QoS 等級中的最小那一個。

七,QoS選擇

  • 在以下情況下你可以選擇 QoS0:
  1. Client 和 Broker 之間的網絡連接非常穩定,例如一個通過有線網絡連接到 Broker 的測試用 Client;
  2. 可以接受丟失部分消息,比如你有一個傳感器以非常短的間隔發布狀態數據,所以丟一些也可以接受;
  3. 不需要離線消息。
  • 在以下情況下你應該選擇 QoS1:
  1. 你需要接收所有的消息,而且你的應用可以接受並處理重復的消息;
  2. 你無法接受 QoS2 帶來的額外開銷,QoS1 發送消息的速度比 QoS2 快很多。
  • 在以下情況下你應該選擇 QoS2:
  1. 你的應用必須接收到所有的消息,而且你的應用在重復的消息下無法正常工作,同時你也能接受 QoS2 帶來的額外開銷。

八,MQTT的retain標志位

當我們使用MQTT客戶端發布消息(PUBLISH)時,如果將RETAIN標志位設置為true,那么MQTT服務器會將最近收到的一條RETAIN標志位為true的消息保存在服務器端(內存或文件)。

特別注意:MQTT服務器只會為每一個Topic保存最近收到的一條RETAIN標志位為true的消息!也就是說,如果MQTT服務器上已經為某個Topic保存了一條Retained消息,當客戶端再次發布一條新的Retained消息,那么服務器上原來的那條消息會被覆蓋!

每當MQTT客戶端連接到MQTT服務器並訂閱了某個topic,如果該topic下有Retained消息,那么MQTT服務器會立即向客戶端推送該條Retained消息。

九,MQTT的will(遺願消息)

想一下以下場景,你的設備向服務端發送了在線的消息后突然爆炸了,它還沒來得及和服務端說它爆炸了就死了,這樣會勿讓我們以為它還在線,但其實它已經掛了。 有沒有方法讓客戶端非正常斷線后通知服務端呢? 有的,就是使用遺願消息,

在建立與服務端的連接時約定好遺願消息,服務端會存儲這個消息,當客戶端非正常斷線時則會向約定好的主題發送遺願消息,同樣,它也可以設置為retian。

十,基於MQTTnet實現MQTT服務端

新建一個控制台程序

安裝nuget包MQTTnet,.NET Core下安裝最新版本即可

修改Program.cs,編寫實現代碼

  1 using MQTTnet;
  2 using MQTTnet.Client.Receiving;
  3 using MQTTnet.Protocol;
  4 using MQTTnet.Server;
  5 using System;
  6 using System.Collections.Generic;
  7 using System.Text;
  8 using System.Threading.Tasks;
  9 
 10 namespace MQTT
 11 {
 12     class Program
 13     {
 14         static void Main(string[] args)
 15         {
 16             MqttServerClass serverClass = new MqttServerClass();
 17             serverClass.StartMqttServer().Wait();
 18             Console.ReadLine();
 19         }
 20     }
 21     public static class Config
 22     {
 23         public static int Port { get; set; } = 1883;
 24         public static string UserName { get; set; } = "Username";
 25         public static string Password { get; set; } = "Password";
 26 
 27     }
 28     public class UserInstance
 29     {
 30         public string ClientId { get; set; }
 31         public string UserName { get; set; }
 32         public string Password { get; set; }
 33     }
 34     public class MqttServerClass
 35     {
 36         private IMqttServer mqttServer;
 37         private List<MqttApplicationMessage> messages = new List<MqttApplicationMessage>();
 38 
 39         public async Task StartMqttServer()
 40         {
 41             try
 42             {
 43                 if (mqttServer == null)
 44                 {
 45                     var optionsBuilder = new MqttServerOptionsBuilder()
 46                     .WithDefaultEndpoint()
 47                     .WithDefaultEndpointPort(Config.Port)
 48                     //連接攔截器
 49                     .WithConnectionValidator(
 50                         c =>
 51                         {
 52                             var flag = c.Username == Config.UserName && c.Password == Config.Password;
 53                             if (!flag)
 54                             {
 55                                 c.ReasonCode = MqttConnectReasonCode.BadUserNameOrPassword;
 56                                 return;
 57                             }
 58                             //設置代碼為 Success
 59                             c.ReasonCode = MqttConnectReasonCode.Success;
 60                             //instances.Add(new UserInstance()  //緩存到內存的List集合當中
 61                             //{
 62                             //    ClientId = c.ClientId,
 63                             //    UserName = c.Username,
 64                             //    Password = c.Password
 65                             //});
 66                         })
 67                     //訂閱攔截器
 68                     .WithSubscriptionInterceptor(
 69                         c =>
 70                         {
 71                             if (c == null) return;
 72                             c.AcceptSubscription = true;
 73                         })
 74                     //應用程序消息攔截器
 75                     .WithApplicationMessageInterceptor(
 76                         c =>
 77                         {
 78                             if (c == null) return;
 79                             c.AcceptPublish = true;
 80                         })
 81                     //clean sesison是否生效
 82                    .WithPersistentSessions();
 83 
 84                     mqttServer = new MqttFactory().CreateMqttServer();
 85 
 86                     //客戶端斷開連接攔截器
 87                     //mqttServer.UseClientDisconnectedHandler(c =>
 88                     //{
 89                     //    //var user = instances.FirstOrDefault(t => t.ClientId == c.ClientId);
 90                     //    //if (user != null)
 91                     //    //{
 92                     //    //    instances.Remove(user);
 93                     //    //}
 94                     //});
 95 
 96                     //服務開始
 97                     mqttServer.StartedHandler = new MqttServerStartedHandlerDelegate(OnMqttServerStarted);
 98                     //服務停止
 99                     mqttServer.StoppedHandler = new MqttServerStoppedHandlerDelegate(OnMqttServerStopped);
100                     //客戶端連接
101                     mqttServer.ClientConnectedHandler = new MqttServerClientConnectedHandlerDelegate(OnMqttServerClientConnected);
102                     //客戶端斷開連接(此事件會覆蓋攔截器)
103                     mqttServer.ClientDisconnectedHandler = new MqttServerClientDisconnectedHandlerDelegate(OnMqttServerClientDisconnected);
104                     //客戶端訂閱
105                     mqttServer.ClientSubscribedTopicHandler = new MqttServerClientSubscribedHandlerDelegate(OnMqttServerClientSubscribedTopic);
106                     //客戶端取消訂閱
107                     mqttServer.ClientUnsubscribedTopicHandler = new MqttServerClientUnsubscribedTopicHandlerDelegate(OnMqttServerClientUnsubscribedTopic);
108                     //服務端收到消息
109                     mqttServer.ApplicationMessageReceivedHandler = new MqttApplicationMessageReceivedHandlerDelegate(OnMqttServerApplicationMessageReceived);
110 
111                     await mqttServer.StartAsync(optionsBuilder.Build());
112 
113                     //主動發送消息到客戶端
114                     //await mqttServer.PublishAsync(new
115                     //     MqttApplicationMessage
116                     //{
117                     //    Topic = "testtopic",
118                     //    Payload = Encoding.UTF8.GetBytes("dsdsd")
119                     //});
120                     //mqttServer.GetClientStatusAsync();
121                     //mqttServer.GetRetainedApplicationMessagesAsync();
122                     //mqttServer.GetSessionStatusAsync();
123 
124                 }
125             }
126             catch (Exception ex)
127             {
128                 Console.WriteLine($"MQTT Server start fail.>{ex.Message}");
129             }
130         }
131         private void OnMqttServerStarted(EventArgs e)
132         {
133             if (mqttServer.IsStarted)
134             {
135                 Console.WriteLine("MQTT服務啟動完成!");
136             }
137         }
138         private void OnMqttServerStopped(EventArgs e)
139         {
140             if (!mqttServer.IsStarted)
141             {
142                 Console.WriteLine("MQTT服務停止完成!");
143             }
144         }
145         private void OnMqttServerClientConnected(MqttServerClientConnectedEventArgs e)
146         {
147             Console.WriteLine($"客戶端[{e.ClientId}]已連接");
148         }
149         private void OnMqttServerClientDisconnected(MqttServerClientDisconnectedEventArgs e)
150         {
151             Console.WriteLine($"客戶端[{e.ClientId}]已斷開連接!");
152         }
153         private void OnMqttServerClientSubscribedTopic(MqttServerClientSubscribedTopicEventArgs e)
154         {
155             Console.WriteLine($"客戶端[{e.ClientId}]已成功訂閱主題[{e.TopicFilter}]!");
156         }
157         private void OnMqttServerClientUnsubscribedTopic(MqttServerClientUnsubscribedTopicEventArgs e)
158         {
159             Console.WriteLine($"客戶端[{e.ClientId}]已成功取消訂閱主題[{e.TopicFilter}]!");
160         }
161         private void OnMqttServerApplicationMessageReceived(MqttApplicationMessageReceivedEventArgs e)
162         {
163             messages.Add(e.ApplicationMessage);
164             Console.WriteLine(DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss:fff"));
165             Console.WriteLine($"客戶端[{e.ClientId}]>> Topic[{e.ApplicationMessage.Topic}] Payload[{Encoding.UTF8.GetString(e.ApplicationMessage.Payload ?? new byte[] { })}] Qos[{e.ApplicationMessage.QualityOfServiceLevel}] Retain[{e.ApplicationMessage.Retain}]");
166         }
167     }
168 }
View Code

下載客戶端測試工具MQTTX,安裝即用。

啟動程序,啟動MQTTX

在MQTTX種新建連接client a,鍵入Broker地址,端口,賬號和密碼,然后點擊連接

 新建主題訂閱,訂閱主題testtopic/#,#代表所有testtopic下所有的子主題都訂閱

新建多一個連接client b,通過client b發布消息給client a

可看到Client a已收到消息。

文章介紹MQTT的部分引用許多人的文章,由於來源太多且我也不知道看的是否為原創,因此不細細列出來了。

文章介紹MQTT的部分引用許多人的文章,由於來源太多且我也不知道看的是否為原創,因此不細細列出來了。

文章介紹MQTT的部分引用許多人的文章,由於來源太多且我也不知道看的是否為原創,因此不細細列出來了。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM