.NET Core集成MQTTnet實現MQTT服務端
一,什么是MQTT
二,常見的MQTT軟件
MQTT官網上有很多推薦,博友們可以慢慢看,附上一篇別人的開源 MQTT Broker 對比文章https://wivwiv.com/post/best-mqtt-broker
三,MQTT通訊類型
名字 |
值 |
流向 |
描述 |
CONNECT |
1 |
C->S |
客戶端請求與服務端建立連接 |
CONNACK |
2 |
S->C |
服務端確認連接建立 |
PUBLISH |
3 |
CóS |
發布消息 |
PUBACK |
4 |
CóS |
收到發布消息確認 |
PUBREC |
5 |
CóS |
發布消息收到 |
PUBREL |
6 |
CóS |
發布消息釋放 |
PUBCOMP |
7 |
CóS |
發布消息完成 |
SUBSCRIBE |
8 |
C->S |
訂閱請求 |
SUBACK |
9 |
S->C |
訂閱確認 |
UNSUBSCRIBE |
10 |
C->S |
取消訂閱 |
UNSUBACK |
11 |
S->C |
取消訂閱確認 |
PING |
12 |
C->S |
客戶端發送PING(連接保活)命令 |
PINGRSP |
13 |
S->C |
PING命令回復 |
DISCONNECT |
14 |
C->S |
斷開連接 |
四,MQTT QoS等級
QoS(Quality of Service),服務質量。MQTT中有三種Qos:
QoS0,At most once,至多一次;
QoS1,At least once,至少一次;
QoS2,Exactly once,確保只有一次。
QoS 是消息的發送方(Sender)和接受方(Receiver)之間達成的一個協議:
l QoS0 代表,Sender 發送的一條消息,Receiver 最多能收到一次,也就是說 Sender 盡力向 Receiver 發送消息,如果發送失敗,也就算了;
Sender 向 Receiver 發送一個包含消息數據的 PUBLISH 包,然后不管結果如何,丟棄掉已發送的 PUBLISH 包,一條消息的發送完成。
l QoS1 代表,Sender 發送的一條消息,Receiver 至少能收到一次,也就是說 Sender 向 Receiver 發送消息,如果發送失敗,會繼續重試,直到 Receiver 收到消息為止,
QoS 要保證消息至少到達 Sender 一次,所以有一個應答的機制。
- Sender 向 Receiver 發送一個帶有消息數據的 PUBLISH 包, 並在本地保存這個 PUBLISH 包。
- Receiver 收到 PUBLISH 包以后,向 Sender 發送一個 PUBACK 數據包,PUBACK 數據包沒有消息體(Payload),在可變頭中(Variable header)中有一個包標識(Packet Identifier),和它收到的 PUBLISH 包中的 Packet Identifier 一致。
- Sender 收到 PUBACK 之后,根據 PUBACK 包中的 Packet Identifier 找到本地保存的 PUBLISH 包,然后丟棄掉,一次消息的發送完成。
- 如果 Sender 在一段時間內沒有收到 PUBLISH 包對應的 PUBACK,它將該 PUBLISH 包的 DUP 標識設為 1(代表是重新發送的 PUBLISH 包),然后重新發送該 PUBLISH 包。重復這個流程,直到收到 PUBACK,然后執行第 3 步。
l QoS2 代表,Sender 發送的一條消息,Receiver 確保能收到而且只收到一次,也就是說 Sender 盡力向 Receiver 發送消息,如果發送失敗,會繼續重試,直到 Receiver 收到消息為止,同時保證 Receiver 不會因為消息重傳而收到重復的消息。
- Sender 發送 QoS 為 2 的 PUBLISH 數據包,數據包 Packet Identifier 為 P,並在本地保存該 PUBLISH 包;
- Receiver 收到 PUBLISH 數據包以后,在本地保存 PUBLISH 包的 Packet Identifier P,並回復 Sender 一個 PUBREC 數據包,PUBREC 數據包可變頭中的 Packet Identifier 為 P,沒有消息體(Payload);
- 當 Sender 收到 PUBREC,它就可以安全地丟棄掉初始的 Packet Identifier 為 P 的 PUBLISH 數據包,同時保存該 PUBREC 數據包,同時回復 Receiver 一個 PUBREL 數據包,PUBREL 數據包可變頭中的 Packet Identifier 為 P,沒有消息體;如果 Sender 在一定時間內沒有收到 PUBREC,它會把 PUBLISH 包的 DUP 標識設為 1,重新發送該 PUBLISH 數據包(Payload);
- 當 Receiver 收到 PUBREL 數據包,它可以丟棄掉保存的 PUBLISH 包的 Packet Identifier P,並回復 Sender 一個 PUBCOMP 數據包,PUBCOMP 數據包可變頭中的 Packet Identifier 為 P,沒有消息體(Payload);
- 當 Sender 收到 PUBCOMP 包,那么它認為數據包傳輸已完成,它會丟棄掉對應的 PUBREC 包。如果 Sender 在一定時間內沒有收到 PUBCOMP 包,它會重新發送 PUBREL 數據包。
注:QoS 是 Sender 和 Receiver 之間達成的協議,不是 Publisher 和 Subscriber 之間達成的協議。也就是說 Publisher 發布一條 QoS1 的消息,只能保證 Broker 能至少收到一次這個消息;至於對應的 Subscriber 能否至少收到一次這個消息,還要取決於 Subscriber 在 Subscribe 的時候和 Broker 協商的 QoS 等級。
五,QoS 和會話(Session)
如果 Client 想接收離線消息,必須使用持久化的會話(Clean Session = 0)連接到 Broker,這樣 Broker 才會存儲 Client 在離線期間沒有確認接收的 QoS 大於 1 的消息。
六,QoS降級問題
在 MQTT 協議中,從 Broker 到 Subscriber 這段消息傳遞的實際 QoS 等於:Publisher 發布消息時指定的 QoS 等級和 Subscriber 在訂閱時與 Broker 協商的 QoS 等級,這兩個 QoS 等級中的最小那一個。
七,QoS選擇
- 在以下情況下你可以選擇 QoS0:
- Client 和 Broker 之間的網絡連接非常穩定,例如一個通過有線網絡連接到 Broker 的測試用 Client;
- 可以接受丟失部分消息,比如你有一個傳感器以非常短的間隔發布狀態數據,所以丟一些也可以接受;
- 不需要離線消息。
- 在以下情況下你應該選擇 QoS1:
- 你需要接收所有的消息,而且你的應用可以接受並處理重復的消息;
- 你無法接受 QoS2 帶來的額外開銷,QoS1 發送消息的速度比 QoS2 快很多。
- 在以下情況下你應該選擇 QoS2:
- 你的應用必須接收到所有的消息,而且你的應用在重復的消息下無法正常工作,同時你也能接受 QoS2 帶來的額外開銷。
八,MQTT的retain標志位
當我們使用MQTT客戶端發布消息(PUBLISH)時,如果將RETAIN標志位設置為true,那么MQTT服務器會將最近收到的一條RETAIN標志位為true的消息保存在服務器端(內存或文件)。
特別注意:MQTT服務器只會為每一個Topic保存最近收到的一條RETAIN標志位為true的消息!也就是說,如果MQTT服務器上已經為某個Topic保存了一條Retained消息,當客戶端再次發布一條新的Retained消息,那么服務器上原來的那條消息會被覆蓋!
每當MQTT客戶端連接到MQTT服務器並訂閱了某個topic,如果該topic下有Retained消息,那么MQTT服務器會立即向客戶端推送該條Retained消息。
九,MQTT的will(遺願消息)
想一下以下場景,你的設備向服務端發送了在線的消息后突然爆炸了,它還沒來得及和服務端說它爆炸了就死了,這樣會勿讓我們以為它還在線,但其實它已經掛了。 有沒有方法讓客戶端非正常斷線后通知服務端呢? 有的,就是使用遺願消息,
在建立與服務端的連接時約定好遺願消息,服務端會存儲這個消息,當客戶端非正常斷線時則會向約定好的主題發送遺願消息,同樣,它也可以設置為retian。
十,基於MQTTnet實現MQTT服務端
新建一個控制台程序
安裝nuget包MQTTnet,.NET Core下安裝最新版本即可
修改Program.cs,編寫實現代碼

1 using MQTTnet; 2 using MQTTnet.Client.Receiving; 3 using MQTTnet.Protocol; 4 using MQTTnet.Server; 5 using System; 6 using System.Collections.Generic; 7 using System.Text; 8 using System.Threading.Tasks; 9 10 namespace MQTT 11 { 12 class Program 13 { 14 static void Main(string[] args) 15 { 16 MqttServerClass serverClass = new MqttServerClass(); 17 serverClass.StartMqttServer().Wait(); 18 Console.ReadLine(); 19 } 20 } 21 public static class Config 22 { 23 public static int Port { get; set; } = 1883; 24 public static string UserName { get; set; } = "Username"; 25 public static string Password { get; set; } = "Password"; 26 27 } 28 public class UserInstance 29 { 30 public string ClientId { get; set; } 31 public string UserName { get; set; } 32 public string Password { get; set; } 33 } 34 public class MqttServerClass 35 { 36 private IMqttServer mqttServer; 37 private List<MqttApplicationMessage> messages = new List<MqttApplicationMessage>(); 38 39 public async Task StartMqttServer() 40 { 41 try 42 { 43 if (mqttServer == null) 44 { 45 var optionsBuilder = new MqttServerOptionsBuilder() 46 .WithDefaultEndpoint() 47 .WithDefaultEndpointPort(Config.Port) 48 //連接攔截器 49 .WithConnectionValidator( 50 c => 51 { 52 var flag = c.Username == Config.UserName && c.Password == Config.Password; 53 if (!flag) 54 { 55 c.ReasonCode = MqttConnectReasonCode.BadUserNameOrPassword; 56 return; 57 } 58 //設置代碼為 Success 59 c.ReasonCode = MqttConnectReasonCode.Success; 60 //instances.Add(new UserInstance() //緩存到內存的List集合當中 61 //{ 62 // ClientId = c.ClientId, 63 // UserName = c.Username, 64 // Password = c.Password 65 //}); 66 }) 67 //訂閱攔截器 68 .WithSubscriptionInterceptor( 69 c => 70 { 71 if (c == null) return; 72 c.AcceptSubscription = true; 73 }) 74 //應用程序消息攔截器 75 .WithApplicationMessageInterceptor( 76 c => 77 { 78 if (c == null) return; 79 c.AcceptPublish = true; 80 }) 81 //clean sesison是否生效 82 .WithPersistentSessions(); 83 84 mqttServer = new MqttFactory().CreateMqttServer(); 85 86 //客戶端斷開連接攔截器 87 //mqttServer.UseClientDisconnectedHandler(c => 88 //{ 89 // //var user = instances.FirstOrDefault(t => t.ClientId == c.ClientId); 90 // //if (user != null) 91 // //{ 92 // // instances.Remove(user); 93 // //} 94 //}); 95 96 //服務開始 97 mqttServer.StartedHandler = new MqttServerStartedHandlerDelegate(OnMqttServerStarted); 98 //服務停止 99 mqttServer.StoppedHandler = new MqttServerStoppedHandlerDelegate(OnMqttServerStopped); 100 //客戶端連接 101 mqttServer.ClientConnectedHandler = new MqttServerClientConnectedHandlerDelegate(OnMqttServerClientConnected); 102 //客戶端斷開連接(此事件會覆蓋攔截器) 103 mqttServer.ClientDisconnectedHandler = new MqttServerClientDisconnectedHandlerDelegate(OnMqttServerClientDisconnected); 104 //客戶端訂閱 105 mqttServer.ClientSubscribedTopicHandler = new MqttServerClientSubscribedHandlerDelegate(OnMqttServerClientSubscribedTopic); 106 //客戶端取消訂閱 107 mqttServer.ClientUnsubscribedTopicHandler = new MqttServerClientUnsubscribedTopicHandlerDelegate(OnMqttServerClientUnsubscribedTopic); 108 //服務端收到消息 109 mqttServer.ApplicationMessageReceivedHandler = new MqttApplicationMessageReceivedHandlerDelegate(OnMqttServerApplicationMessageReceived); 110 111 await mqttServer.StartAsync(optionsBuilder.Build()); 112 113 //主動發送消息到客戶端 114 //await mqttServer.PublishAsync(new 115 // MqttApplicationMessage 116 //{ 117 // Topic = "testtopic", 118 // Payload = Encoding.UTF8.GetBytes("dsdsd") 119 //}); 120 //mqttServer.GetClientStatusAsync(); 121 //mqttServer.GetRetainedApplicationMessagesAsync(); 122 //mqttServer.GetSessionStatusAsync(); 123 124 } 125 } 126 catch (Exception ex) 127 { 128 Console.WriteLine($"MQTT Server start fail.>{ex.Message}"); 129 } 130 } 131 private void OnMqttServerStarted(EventArgs e) 132 { 133 if (mqttServer.IsStarted) 134 { 135 Console.WriteLine("MQTT服務啟動完成!"); 136 } 137 } 138 private void OnMqttServerStopped(EventArgs e) 139 { 140 if (!mqttServer.IsStarted) 141 { 142 Console.WriteLine("MQTT服務停止完成!"); 143 } 144 } 145 private void OnMqttServerClientConnected(MqttServerClientConnectedEventArgs e) 146 { 147 Console.WriteLine($"客戶端[{e.ClientId}]已連接"); 148 } 149 private void OnMqttServerClientDisconnected(MqttServerClientDisconnectedEventArgs e) 150 { 151 Console.WriteLine($"客戶端[{e.ClientId}]已斷開連接!"); 152 } 153 private void OnMqttServerClientSubscribedTopic(MqttServerClientSubscribedTopicEventArgs e) 154 { 155 Console.WriteLine($"客戶端[{e.ClientId}]已成功訂閱主題[{e.TopicFilter}]!"); 156 } 157 private void OnMqttServerClientUnsubscribedTopic(MqttServerClientUnsubscribedTopicEventArgs e) 158 { 159 Console.WriteLine($"客戶端[{e.ClientId}]已成功取消訂閱主題[{e.TopicFilter}]!"); 160 } 161 private void OnMqttServerApplicationMessageReceived(MqttApplicationMessageReceivedEventArgs e) 162 { 163 messages.Add(e.ApplicationMessage); 164 Console.WriteLine(DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss:fff")); 165 Console.WriteLine($"客戶端[{e.ClientId}]>> Topic[{e.ApplicationMessage.Topic}] Payload[{Encoding.UTF8.GetString(e.ApplicationMessage.Payload ?? new byte[] { })}] Qos[{e.ApplicationMessage.QualityOfServiceLevel}] Retain[{e.ApplicationMessage.Retain}]"); 166 } 167 } 168 }
下載客戶端測試工具MQTTX,安裝即用。
啟動程序,啟動MQTTX
在MQTTX種新建連接client a,鍵入Broker地址,端口,賬號和密碼,然后點擊連接
新建主題訂閱,訂閱主題testtopic/#,#代表所有testtopic下所有的子主題都訂閱
新建多一個連接client b,通過client b發布消息給client a
可看到Client a已收到消息。
文章介紹MQTT的部分引用許多人的文章,由於來源太多且我也不知道看的是否為原創,因此不細細列出來了。
文章介紹MQTT的部分引用許多人的文章,由於來源太多且我也不知道看的是否為原創,因此不細細列出來了。
文章介紹MQTT的部分引用許多人的文章,由於來源太多且我也不知道看的是否為原創,因此不細細列出來了。