一、什么是MQTT
MQTT(message queuing telemetry transport)是IBM開發的即時通訊協議,是一種發布/訂閱極其輕量級的消息傳輸協議,專門為網絡受限設備、低寬帶以及高延遲和不可靠的網絡而設計的。由於以上輕量級的特點,是實現智能家居的首選傳輸協議,相比於XMPP,更加輕量級而且占用寬帶低。
簡單來說MQTT是一種通信協議,要實現發布/訂閱就必須遵循這個協議。
二、實現MQTT通訊協議.NET開源庫有哪些?
MQTTnet、MqttDotNet、nMQTT、M2MQTT等,這里我們使用MQTTnet(但MQTTnet搜到的教程基本都是2.7及以下版本的,我們使用的是3.0.9版本)
三、展示MQTT實現效果圖
例:客戶端1只要訂閱了positon主題,客戶端2、客戶端3、客戶端4.....同樣訂閱了position主題則他們之間就能共享position主題的所發的內容了。
如果客戶端1訂閱了position主題,客戶端2訂閱了beautiful主題,1發給消息2是收不到的。
四、創建.NETCore項目(Server和Client)
五、服務器
添加Nuget包:安裝MQTTnet
class Program { public static IMqttServer mqttServer; static void Main(string[] args) { StartMqttServer(); } //啟動Mqtt服務器 private static async void StartMqttServer() { try { //驗證客戶端信息 var options = new MqttServerOptions { //連接驗證 ConnectionValidator = new MqttServerConnectionValidatorDelegate(p => { if (p.ClientId == "SpecialClient") { if (p.Username != "USER" || p.Password != "PASS") { p.ReasonCode = MqttConnectReasonCode.BadUserNameOrPassword; } } }) }; //設置端口號 options.DefaultEndpointOptions.Port = 8031; //創建Mqtt服務器 mqttServer = new MqttFactory().CreateMqttServer(); //開啟訂閱事件 mqttServer.ClientSubscribedTopicHandler = new MqttServerClientSubscribedHandlerDelegate(MqttNetServer_SubscribedTopic); //取消訂閱事件 mqttServer.ClientUnsubscribedTopicHandler = new MqttServerClientUnsubscribedTopicHandlerDelegate(MqttNetServer_UnSubscribedTopic); //客戶端消息事件 mqttServer.UseApplicationMessageReceivedHandler(MqttServe_ApplicationMessageReceived); //客戶端連接事件 mqttServer.UseClientConnectedHandler(MqttNetServer_ClientConnected); //客戶端斷開事件 mqttServer.UseClientDisconnectedHandler(MqttNetServer_ClientDisConnected); //啟動服務器 await mqttServer.StartAsync(options); Console.WriteLine("服務器啟動成功!輸入任意內容並回車停止服務!"); Console.ReadLine(); await mqttServer.StopAsync(); } catch (Exception e) { Console.Write($"服務器啟動失敗 Msg:{e}"); } } /// <summary> /// 客戶訂閱 /// </summary> private static void MqttNetServer_SubscribedTopic(MqttServerClientSubscribedTopicEventArgs e) { //客戶端Id var ClientId = e.ClientId; var Topic = e.TopicFilter.Topic; Console.WriteLine($"客戶端[{ClientId}]已訂閱主題:{Topic}"); } /// <summary> /// 客戶取消訂閱 /// </summary> private static void MqttNetServer_UnSubscribedTopic(MqttServerClientUnsubscribedTopicEventArgs e) { //客戶端Id var ClientId = e.ClientId; var Topic = e.TopicFilter; Console.WriteLine($"客戶端[{ClientId}]已取消訂閱主題:{Topic}"); } /// <summary> /// 接收消息 /// </summary> private static void MqttServe_ApplicationMessageReceived(MqttApplicationMessageReceivedEventArgs e) { var ClientId = e.ClientId; var Topic = e.ApplicationMessage.Topic; var Payload = Encoding.UTF8.GetString(e.ApplicationMessage.Payload); var Qos = e.ApplicationMessage.QualityOfServiceLevel; var Retain = e.ApplicationMessage.Retain; Console.WriteLine($"客戶端[{ClientId}]>> 主題:[{Topic}] 負載:[{Payload}] Qos:[{Qos}] 保留:[{Retain}]"); } /// <summary> /// 客戶連接 /// </summary> private static void MqttNetServer_ClientConnected(MqttServerClientConnectedEventArgs e) { var ClientId = e.ClientId; Console.WriteLine($"客戶端[{ClientId}]已連接"); } /// <summary> /// 客戶連接斷開 /// </summary> private static void MqttNetServer_ClientDisConnected(MqttServerClientDisconnectedEventArgs e) { var ClientId = e.ClientId; Console.WriteLine($"客戶端[{ClientId}]已斷開連接"); } }
六、客戶端
也要添加Nuget包:安裝MQTTnet
public static IMqttClient mqttClient; static void Main(string[] args) { ConnectMqttServerAsync(); ImportData(); } private static async void ConnectMqttServerAsync() { try { var factory = new MqttFactory(); mqttClient = factory.CreateMqttClient(); var options = new MqttClientOptionsBuilder() .WithTcpServer("127.0.0.1", 8031) .WithCredentials("test", "test") .WithClientId(Guid.NewGuid().ToString().Substring(0, 5)) .Build(); //消息 mqttClient.UseApplicationMessageReceivedHandler(e => { Console.WriteLine("### 收到的信息 ###"); Console.WriteLine($"+ Topic = {e.ApplicationMessage.Topic}");//主題 Console.WriteLine($"+ Payload = {Encoding.UTF8.GetString(e.ApplicationMessage.Payload)}");//頁面信息 Console.WriteLine($"+ QoS = {e.ApplicationMessage.QualityOfServiceLevel}");//消息等級 Console.WriteLine($"+ Retain = {e.ApplicationMessage.Retain}");//是否保留 Console.WriteLine(); }); //重連機制 mqttClient.UseDisconnectedHandler(async e => { Console.WriteLine("與服務器斷開連接!"); await Task.Delay(TimeSpan.FromSeconds(5)); try { await mqttClient.ConnectAsync(options); } catch (Exception exp) { Console.Write($"重新連接服務器失敗 Msg:{exp}"); } }); await mqttClient.ConnectAsync(options); Console.Write("連接服務器成功!輸入任意內容並回車進入菜單頁面!"); } catch (Exception exp) { Console.Write($"連接服務器失敗 Msg:{exp}"); } } private static void ImportData() { Console.ReadLine(); bool isExit = false; while (!isExit) { Console.WriteLine(@"請輸入 1.訂閱主題 2.取消訂閱 3.發送消息 4.退出"); var input = Console.ReadLine(); switch (input) { case "1": Console.WriteLine(@"請輸入主題名稱:"); var topicName = Console.ReadLine(); Subscribe(topicName); break; case "2": Console.WriteLine(@"請輸入需要取消訂閱主題名稱:"); topicName = Console.ReadLine(); Unsubscribe(topicName); break; case "3": Console.WriteLine("請輸入需要發送的主題名稱"); topicName = Console.ReadLine(); Console.WriteLine("請輸入需要發送的消息"); var message = Console.ReadLine(); Publish(topicName, message); break; case "4": isExit = true; break; default: Console.WriteLine("請輸入正確指令!"); break; } } } /// <summary> /// 訂閱 /// </summary> /// <param name="topicName"></param> private static async void Subscribe(string topicName) { string topic = topicName.Trim(); if (string.IsNullOrEmpty(topic)) { Console.Write("訂閱主題不能為空!"); return; } if (!mqttClient.IsConnected) { Console.Write("MQTT客戶端尚未連接!"); return; } await mqttClient.SubscribeAsync(new TopicFilterBuilder().WithTopic(topic).Build()); } /// <summary> /// 取消訂閱 /// </summary> /// <param name="topicName"></param> private static async void Unsubscribe(string topicName) { string topic = topicName.Trim(); if (string.IsNullOrEmpty(topic)) { Console.Write("訂閱主題不能為空!"); return; } if (!mqttClient.IsConnected) { Console.Write("MQTT客戶端尚未連接!"); return; } await mqttClient.UnsubscribeAsync(topic); } /// <summary> /// 發送消息 /// </summary> /// <param name="message"></param> private static async void Publish(string topicName, string message) { string topic = topicName.Trim(); string msg = message.Trim(); if (string.IsNullOrEmpty(topic)) { Console.Write("主題不能為空!"); return; } if (!mqttClient.IsConnected) { Console.Write("MQTT客戶端尚未連接!"); return; } var MessageBuilder = new MqttApplicationMessageBuilder() .WithTopic(topic) .WithPayload(msg) .WithExactlyOnceQoS() .WithRetainFlag() .Build(); await mqttClient.PublishAsync(MessageBuilder); }
源代碼:
鏈接:https://pan.baidu.com/s/1rxTBZHHAmkDVcO6XmJPXng
提取碼:05qr
后續會陸續更新其他資料,喜歡請關注哦!