ASP.NET Core 實現 MQTT通訊協議 Demo(開源庫:MQTTnet)


一、什么是MQTT

  MQTT(message queuing telemetry transport)是IBM開發的即時通訊協議,是一種發布/訂閱極其輕量級的消息傳輸協議,專門為網絡受限設備、低寬帶以及高延遲和不可靠的網絡而設計的。由於以上輕量級的特點,是實現智能家居的首選傳輸協議,相比於XMPP,更加輕量級而且占用寬帶低。 簡單來說MQTT是一種通信協議,要實現發布/訂閱就必須遵循這個協議。

二、實現MQTT通訊協議.NET開源庫有哪些?

  MQTTnet、MqttDotNet、nMQTT、M2MQTT等,這里我們使用MQTTnet(但MQTTnet搜到的教程基本都是2.7及以下版本的,我們使用的是3.0.9版本)
  官網項目網址:https://github.com/chkr1011/MQTTnet

三、展示MQTT實現效果圖

  

  例:客戶端1只要訂閱了positon主題,客戶端2、客戶端3、客戶端4.....同樣訂閱了position主題則他們之間就能共享position主題的所發的內容了

  如果客戶端1訂閱了position主題,客戶端2訂閱了beautiful主題,1發給消息2是收不到的。

四、創建.NETCore項目(Server和Client)

  

五、服務器

  添加Nuget包:安裝MQTTnet

  

class Program
    {
        public static IMqttServer mqttServer;
        static void Main(string[] args)
        {
            StartMqttServer();
        }

        //啟動Mqtt服務器
        private static async void StartMqttServer()
        {
            try
            {
                //驗證客戶端信息
                var options = new MqttServerOptions
                {
                    //連接驗證
                    ConnectionValidator = new MqttServerConnectionValidatorDelegate(p =>
                    {
                        if (p.ClientId == "SpecialClient")
                        {
                            if (p.Username != "USER" || p.Password != "PASS")
                            {
                                p.ReasonCode = MqttConnectReasonCode.BadUserNameOrPassword;
                            }
                        }
                    })
                };

                //設置端口號
                options.DefaultEndpointOptions.Port = 8031;

                //創建Mqtt服務器
                mqttServer = new MqttFactory().CreateMqttServer();

                //開啟訂閱事件
                mqttServer.ClientSubscribedTopicHandler = new MqttServerClientSubscribedHandlerDelegate(MqttNetServer_SubscribedTopic);

                //取消訂閱事件
                mqttServer.ClientUnsubscribedTopicHandler = new MqttServerClientUnsubscribedTopicHandlerDelegate(MqttNetServer_UnSubscribedTopic);

                //客戶端消息事件
                mqttServer.UseApplicationMessageReceivedHandler(MqttServe_ApplicationMessageReceived);

                //客戶端連接事件
                mqttServer.UseClientConnectedHandler(MqttNetServer_ClientConnected);

                //客戶端斷開事件
                mqttServer.UseClientDisconnectedHandler(MqttNetServer_ClientDisConnected);

                //啟動服務器
                await mqttServer.StartAsync(options);

                Console.WriteLine("服務器啟動成功!輸入任意內容並回車停止服務!");
                Console.ReadLine();

                await mqttServer.StopAsync();
            }
            catch (Exception e)
            {
                Console.Write($"服務器啟動失敗 Msg:{e}");
            }

        }

        /// <summary>
        /// 客戶訂閱
        /// </summary>
        private static void MqttNetServer_SubscribedTopic(MqttServerClientSubscribedTopicEventArgs e)
        {
            //客戶端Id
            var ClientId = e.ClientId;
            var Topic = e.TopicFilter.Topic;
            Console.WriteLine($"客戶端[{ClientId}]已訂閱主題:{Topic}");
        }

        /// <summary>
        /// 客戶取消訂閱
        /// </summary>
        private static void MqttNetServer_UnSubscribedTopic(MqttServerClientUnsubscribedTopicEventArgs e)
        {
            //客戶端Id
            var ClientId = e.ClientId;
            var Topic = e.TopicFilter;
            Console.WriteLine($"客戶端[{ClientId}]已取消訂閱主題:{Topic}");
        }

        /// <summary>
        /// 接收消息
        /// </summary>
        private static void MqttServe_ApplicationMessageReceived(MqttApplicationMessageReceivedEventArgs e)
        {
            var ClientId = e.ClientId;
            var Topic = e.ApplicationMessage.Topic;
            var Payload = Encoding.UTF8.GetString(e.ApplicationMessage.Payload);
            var Qos = e.ApplicationMessage.QualityOfServiceLevel;
            var Retain = e.ApplicationMessage.Retain;
            Console.WriteLine($"客戶端[{ClientId}]>> 主題:[{Topic}] 負載:[{Payload}] Qos:[{Qos}] 保留:[{Retain}]");
        }

        /// <summary>
        /// 客戶連接
        /// </summary>
        private static void MqttNetServer_ClientConnected(MqttServerClientConnectedEventArgs e)
        {
            var ClientId = e.ClientId;
            Console.WriteLine($"客戶端[{ClientId}]已連接");
        }

        /// <summary>
        /// 客戶連接斷開
        /// </summary>
        private static void MqttNetServer_ClientDisConnected(MqttServerClientDisconnectedEventArgs e)
        {
            var ClientId = e.ClientId;
            Console.WriteLine($"客戶端[{ClientId}]已斷開連接");
        }
    }

六、客戶端

  也要添加Nuget包:安裝MQTTnet

 public static IMqttClient mqttClient;

        static void Main(string[] args)
        {
            ConnectMqttServerAsync();
            ImportData();
        }
        private static async void ConnectMqttServerAsync()
        {
            try
            {

                var factory = new MqttFactory();

                mqttClient = factory.CreateMqttClient();

                var options = new MqttClientOptionsBuilder()
                    .WithTcpServer("127.0.0.1", 8031)
                    .WithCredentials("test", "test")
                    .WithClientId(Guid.NewGuid().ToString().Substring(0, 5))
                    .Build();

                //消息
                mqttClient.UseApplicationMessageReceivedHandler(e =>
                {
                    Console.WriteLine("### 收到的信息 ###");
                    Console.WriteLine($"+ Topic = {e.ApplicationMessage.Topic}");//主題
                    Console.WriteLine($"+ Payload = {Encoding.UTF8.GetString(e.ApplicationMessage.Payload)}");//頁面信息
                    Console.WriteLine($"+ QoS = {e.ApplicationMessage.QualityOfServiceLevel}");//消息等級
                    Console.WriteLine($"+ Retain = {e.ApplicationMessage.Retain}");//是否保留
                    Console.WriteLine();
                });

                //重連機制
                mqttClient.UseDisconnectedHandler(async e =>
                {
                    Console.WriteLine("與服務器斷開連接!");
                    await Task.Delay(TimeSpan.FromSeconds(5));
                    try
                    {
                        await mqttClient.ConnectAsync(options);
                    }
                    catch (Exception exp)
                    {
                        Console.Write($"重新連接服務器失敗 Msg:{exp}");
                    }
                });

               await mqttClient.ConnectAsync(options);

               Console.Write("連接服務器成功!輸入任意內容並回車進入菜單頁面!");
            }
            catch (Exception exp)
            {
                Console.Write($"連接服務器失敗 Msg:{exp}");
            }
        }

        private static void ImportData()
        {
            Console.ReadLine();
            bool isExit = false;
            while (!isExit)
            {
                Console.WriteLine(@"請輸入
                    1.訂閱主題
                    2.取消訂閱
                    3.發送消息
                    4.退出");
                var input = Console.ReadLine();

                switch (input)
                {
                    case "1":
                        Console.WriteLine(@"請輸入主題名稱:");
                        var topicName = Console.ReadLine();
                        Subscribe(topicName);
                        break;
                    case "2":
                        Console.WriteLine(@"請輸入需要取消訂閱主題名稱:");
                        topicName = Console.ReadLine();
                        Unsubscribe(topicName);
                        break;
                    case "3":
                        Console.WriteLine("請輸入需要發送的主題名稱");
                        topicName = Console.ReadLine();
                        Console.WriteLine("請輸入需要發送的消息");
                        var message = Console.ReadLine();
                        Publish(topicName, message);
                        break;
                    case "4":
                        isExit = true;
                        break;
                    default:
                        Console.WriteLine("請輸入正確指令!");
                        break;
                }
            }
        }

        /// <summary>
        /// 訂閱
        /// </summary>
        /// <param name="topicName"></param>
        private static async void Subscribe(string topicName)
        {
            string topic = topicName.Trim();
            if (string.IsNullOrEmpty(topic))
            {
                Console.Write("訂閱主題不能為空!");
                return;
            }

            if (!mqttClient.IsConnected)
            {
                Console.Write("MQTT客戶端尚未連接!");
                return;
            }
            await mqttClient.SubscribeAsync(new TopicFilterBuilder().WithTopic(topic).Build());
        }

        /// <summary>
        /// 取消訂閱
        /// </summary>
        /// <param name="topicName"></param>
        private static async void Unsubscribe(string topicName)
        {
            string topic = topicName.Trim();
            if (string.IsNullOrEmpty(topic))
            {
                Console.Write("訂閱主題不能為空!");
                return;
            }

            if (!mqttClient.IsConnected)
            {
                Console.Write("MQTT客戶端尚未連接!");
                return;
            }
            await mqttClient.UnsubscribeAsync(topic);
        }

        /// <summary>
        /// 發送消息
        /// </summary>
        /// <param name="message"></param>
        private static async void Publish(string topicName, string message)
        {
            string topic = topicName.Trim();
            string msg = message.Trim();

            if (string.IsNullOrEmpty(topic))
            {
                Console.Write("主題不能為空!");
                return;
            }
            if (!mqttClient.IsConnected)
            {
                Console.Write("MQTT客戶端尚未連接!");
                return;
            }

            var MessageBuilder = new MqttApplicationMessageBuilder()
                .WithTopic(topic)
                .WithPayload(msg)
                .WithExactlyOnceQoS()
                .WithRetainFlag()
                .Build();

            await mqttClient.PublishAsync(MessageBuilder);

        }

  源代碼:

  鏈接:https://pan.baidu.com/s/1rxTBZHHAmkDVcO6XmJPXng
  提取碼:05qr
  后續會陸續更新其他資料,喜歡請關注哦!

  我的博客:https://www.cnblogs.com/duhaoran/


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM