C#中使用M2Mqtt连接mqtt server


1、创建控制台程序

环境
net core 3.1
M2MqttDotnetCore 版本1.0.0

2、配置连接信息进行连接

   选择ca根证书及客户端pfx格式证书(pfx证书由pem格式及key格式证书合成)
   设置ssl通讯版本(MqttSslProtocols.TLSv1_2)、
   设置mqtt版本(MqttProtocolVersion.Version_3_1)
   注册证书验证回调事件(cafileValidCallback)、
   注册消息接收触发事件(messageReceive)、
   注册连接关闭事件(connectionClosed)、
   注册消息推送事件(msgPublished)
        public static void MqttClientConnection(string clientId, string ip,int port, string name, string pwd)
        {
            X509Certificate2 caCert = new X509Certificate2("ca.crt");
            X509Certificate2 clientCert= new X509Certificate2("client.pfx", certificate);

            //单向SSL通信
            client = new MqttClient(ip, port, true, caCert, clientCert, MqttSslProtocols.TLSv1_2, new RemoteCertificateValidationCallback(cafileValidCallback));
            client.ProtocolVersion = MqttProtocolVersion.Version_3_1;
            //消息接受
            client.MqttMsgPublishReceived += new MqttClient.MqttMsgPublishEventHandler(messageReceive);
            client.ConnectionClosed += new MqttClient.ConnectionClosedEventHandler(connectionClosed);
            client.MqttMsgPublished += new MqttClient.MqttMsgPublishedEventHandler(msgPublished);
            //连接Broker
            client.Connect(clientId, name, pwd);
            Console.WriteLine("连接成功");
        }

3、消息接收事件

        /// <summary>
        /// 消息接收事件
        /// </summary>
        /// <param name="sender"></param>
        /// <param name="e"></param>
        static void messageReceive(object sender, MqttMsgPublishEventArgs e)
        {
            string msg = "接收消息[Topic:" + e.Topic + "   Message:" + Encoding.Default.GetString(e.Message)+"]";
            Console.WriteLine(msg);
        }

4、连接断开事件

        /// <summary>
        /// 连接断开事件
        /// </summary>
        /// <param name="sender"></param>
        /// <param name="e"></param>
        static void connectionClosed(object sender, EventArgs e)
        {
            string msg = "连接已断开";
            Console.WriteLine(msg);
        }

5、消息推送事件

        /// <summary>
        /// 
        /// </summary>
        /// <param name="sender"></param>
        /// <param name="e"></param>
        static void msgPublished(object sender, MqttMsgPublishedEventArgs e)
        {
            string msg = "连接断开";
            Console.WriteLine(msg);
        }

6、证书校验回调事件

     /// <summary>
        /// 证书校验
        /// </summary>
        /// <param name="sender"></param>
        /// <param name="certificate"></param>
        /// <param name="chain"></param>
        /// <param name="sslPolicyErrors"></param>
        /// <returns></returns>
        static bool cafileValidCallback(object sender, X509Certificate certificate, X509Chain chain, SslPolicyErrors sslPolicyErrors)
        {
            string msg = "X509 链状态:";
            foreach (X509ChainStatus status in chain.ChainStatus)
            {
                msg += status.StatusInformation;
            }
            msg += "SSL策略问题:" + (int)sslPolicyErrors;

            Console.WriteLine(msg);

            if (sslPolicyErrors != SslPolicyErrors.None)
                return false;
            return true;
        }

7、主题订阅

 /// <summary>
        /// 主题订阅
        /// </summary>
        /// <param name="topic">主题</param>
        /// <param name="qosLevels">消息质量</param>
        public static void MsgSubscribe(string[] topic,byte[] qosLevels)
        {
            if (client.IsConnected)
            {
                client.Subscribe(topic, qosLevels);
                foreach (string item in topic)
                {
                    Console.WriteLine("订阅主题[" + item.ToString() + "]");
                }
            }
            else
            {
                Console.WriteLine("未连接!");
            }
        }

8、消息推送

        /// <summary>
        /// 消息推送
        /// </summary>
        /// <param name="topic"></param>
        /// <param name="msg"></param>
        public static void MsgPublish(string topic,string pubmsg)
        {
            client.Publish(topic, Encoding.Default.GetBytes(pubmsg));
            string msg = "推送消息[Topic:" + topic + "   Message:" + pubmsg + "]";
            Console.WriteLine(msg);
        }

9、测试连接

            string clientId = "test";
            string ip = "127.0.0.1";
            int port = 8883;
            string name = "";
            string pwd = "";

            //连接
            MqttClientConnection(clientId, ip, port, name, pwd);

10、测试订阅

            string[] topic = { "test" };
            byte[] qosLevels = { 0, 1 };
            //订阅
            MqttClientHelper.MsgSubscribe(topic, new byte[] { 1 });

11、测试发布

            Task.Run(() =>
            {
                while (true)
                {
                    try
                    {
                        //推送
                        string msg = "当前时间:" + DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss");
                        MqttClientHelper.MsgPublish("test", msg);
                        Thread.Sleep(1000);
                    }
                    catch (Exception err)
                    {
                        Console.WriteLine(err.Message);
                    }
                }
            });


免责声明!

本站转载的文章为个人学习借鉴使用,本站对版权不负任何法律责任。如果侵犯了您的隐私权益,请联系本站邮箱yoyou2525@163.com删除。



 
粤ICP备18138465号  © 2018-2025 CODEPRJ.COM