C#中使用M2Mqtt連接mqtt server


1、創建控制台程序

環境
net core 3.1
M2MqttDotnetCore 版本1.0.0

2、配置連接信息進行連接

   選擇ca根證書及客戶端pfx格式證書(pfx證書由pem格式及key格式證書合成)
   設置ssl通訊版本(MqttSslProtocols.TLSv1_2)、
   設置mqtt版本(MqttProtocolVersion.Version_3_1)
   注冊證書驗證回調事件(cafileValidCallback)、
   注冊消息接收觸發事件(messageReceive)、
   注冊連接關閉事件(connectionClosed)、
   注冊消息推送事件(msgPublished)
        public static void MqttClientConnection(string clientId, string ip,int port, string name, string pwd)
        {
            X509Certificate2 caCert = new X509Certificate2("ca.crt");
            X509Certificate2 clientCert= new X509Certificate2("client.pfx", certificate);

            //單向SSL通信
            client = new MqttClient(ip, port, true, caCert, clientCert, MqttSslProtocols.TLSv1_2, new RemoteCertificateValidationCallback(cafileValidCallback));
            client.ProtocolVersion = MqttProtocolVersion.Version_3_1;
            //消息接受
            client.MqttMsgPublishReceived += new MqttClient.MqttMsgPublishEventHandler(messageReceive);
            client.ConnectionClosed += new MqttClient.ConnectionClosedEventHandler(connectionClosed);
            client.MqttMsgPublished += new MqttClient.MqttMsgPublishedEventHandler(msgPublished);
            //連接Broker
            client.Connect(clientId, name, pwd);
            Console.WriteLine("連接成功");
        }

3、消息接收事件

        /// <summary>
        /// 消息接收事件
        /// </summary>
        /// <param name="sender"></param>
        /// <param name="e"></param>
        static void messageReceive(object sender, MqttMsgPublishEventArgs e)
        {
            string msg = "接收消息[Topic:" + e.Topic + "   Message:" + Encoding.Default.GetString(e.Message)+"]";
            Console.WriteLine(msg);
        }

4、連接斷開事件

        /// <summary>
        /// 連接斷開事件
        /// </summary>
        /// <param name="sender"></param>
        /// <param name="e"></param>
        static void connectionClosed(object sender, EventArgs e)
        {
            string msg = "連接已斷開";
            Console.WriteLine(msg);
        }

5、消息推送事件

        /// <summary>
        /// 
        /// </summary>
        /// <param name="sender"></param>
        /// <param name="e"></param>
        static void msgPublished(object sender, MqttMsgPublishedEventArgs e)
        {
            string msg = "連接斷開";
            Console.WriteLine(msg);
        }

6、證書校驗回調事件

     /// <summary>
        /// 證書校驗
        /// </summary>
        /// <param name="sender"></param>
        /// <param name="certificate"></param>
        /// <param name="chain"></param>
        /// <param name="sslPolicyErrors"></param>
        /// <returns></returns>
        static bool cafileValidCallback(object sender, X509Certificate certificate, X509Chain chain, SslPolicyErrors sslPolicyErrors)
        {
            string msg = "X509 鏈狀態:";
            foreach (X509ChainStatus status in chain.ChainStatus)
            {
                msg += status.StatusInformation;
            }
            msg += "SSL策略問題:" + (int)sslPolicyErrors;

            Console.WriteLine(msg);

            if (sslPolicyErrors != SslPolicyErrors.None)
                return false;
            return true;
        }

7、主題訂閱

 /// <summary>
        /// 主題訂閱
        /// </summary>
        /// <param name="topic">主題</param>
        /// <param name="qosLevels">消息質量</param>
        public static void MsgSubscribe(string[] topic,byte[] qosLevels)
        {
            if (client.IsConnected)
            {
                client.Subscribe(topic, qosLevels);
                foreach (string item in topic)
                {
                    Console.WriteLine("訂閱主題[" + item.ToString() + "]");
                }
            }
            else
            {
                Console.WriteLine("未連接!");
            }
        }

8、消息推送

        /// <summary>
        /// 消息推送
        /// </summary>
        /// <param name="topic"></param>
        /// <param name="msg"></param>
        public static void MsgPublish(string topic,string pubmsg)
        {
            client.Publish(topic, Encoding.Default.GetBytes(pubmsg));
            string msg = "推送消息[Topic:" + topic + "   Message:" + pubmsg + "]";
            Console.WriteLine(msg);
        }

9、測試連接

            string clientId = "test";
            string ip = "127.0.0.1";
            int port = 8883;
            string name = "";
            string pwd = "";

            //連接
            MqttClientConnection(clientId, ip, port, name, pwd);

10、測試訂閱

            string[] topic = { "test" };
            byte[] qosLevels = { 0, 1 };
            //訂閱
            MqttClientHelper.MsgSubscribe(topic, new byte[] { 1 });

11、測試發布

            Task.Run(() =>
            {
                while (true)
                {
                    try
                    {
                        //推送
                        string msg = "當前時間:" + DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss");
                        MqttClientHelper.MsgPublish("test", msg);
                        Thread.Sleep(1000);
                    }
                    catch (Exception err)
                    {
                        Console.WriteLine(err.Message);
                    }
                }
            });


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM