mqttnet3.0用法


.net常用的mqtt類庫有2個,m2mqtt和mqttnet兩個類庫

當然了,這兩個庫的教程網上一搜一大把

但mqttnet搜到的教程全是2.7及以下版本的,但3.0版語法卻不再兼容,升級版本會導致很多問題,今天進行了成功的升級,現記錄下來

參考文檔地址:https://github.com/chkr1011/MQTTnet/wiki/Client

上代碼:

///開源庫地址:https://github.com/chkr1011/MQTTnet
///對應文檔:https://github.com/chkr1011/MQTTnet/wiki/Client

using MQTTnet;
using MQTTnet.Client;
using MQTTnet.Client.Options;
using System;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using System.Windows.Forms;

namespace MqttServerTest
{
    public partial class mqtt測試工具 : Form
    {
        private IMqttClient mqttClient = null;
        private bool isReconnect = true;

        public mqtt測試工具()
        {
            InitializeComponent();
        }

        private void Form1_Load(object sender, EventArgs e)
        {

        }

        private async void BtnPublish_Click(object sender, EventArgs e)
        {
            await Publish();
        }

        private async void BtnSubscribe_ClickAsync(object sender, EventArgs e)
        {
            await Subscribe();
        }

        private async Task Publish()
        {
            string topic = txtPubTopic.Text.Trim();

            if (string.IsNullOrEmpty(topic))
            {
                MessageBox.Show("發布主題不能為空!");
                return;
            }

            string inputString = txtSendMessage.Text.Trim();
            try
            {

                var message = new MqttApplicationMessageBuilder()
        .WithTopic(topic)
        .WithPayload(inputString)
        .WithExactlyOnceQoS()
        .WithRetainFlag()
        .Build();

                await mqttClient.PublishAsync(message);
            }
            catch (Exception ex)
            {

                Invoke((new Action(() =>
                {
                    txtReceiveMessage.AppendText($"發布主題失敗!" + Environment.NewLine + ex.Message + Environment.NewLine);
                })));
            }




        }

        private async Task Subscribe()
        {
            string topic = txtSubTopic.Text.Trim();

            if (string.IsNullOrEmpty(topic))
            {
                MessageBox.Show("訂閱主題不能為空!");
                return;
            }

            if (!mqttClient.IsConnected)
            {
                MessageBox.Show("MQTT客戶端尚未連接!");
                return;
            }

            // Subscribe to a topic
            await mqttClient.SubscribeAsync(new TopicFilterBuilder()
                .WithTopic(topic)
                .WithAtMostOnceQoS()
                .Build()
                );
            Invoke((new Action(() =>
            {
                txtReceiveMessage.AppendText($"已訂閱[{topic}]主題{Environment.NewLine}");
            })));

        }

        private async Task ConnectMqttServerAsync()
        {
            // Create a new MQTT client.

            if (mqttClient == null)
            {
                try
                {
                    var factory = new MqttFactory();
                    mqttClient = factory.CreateMqttClient();

                    var options = new MqttClientOptionsBuilder()
                        .WithTcpServer(txtIp.Text, Convert.ToInt32(txtPort.Text)).WithCredentials(txtUsername.Text, txtPsw.Text).WithClientId(txtClientId.Text) // Port is optional
                        .Build();


                    await mqttClient.ConnectAsync(options, CancellationToken.None);
                    Invoke((new Action(() =>
                    {
                        txtReceiveMessage.AppendText($"連接到MQTT服務器成功!" + txtIp.Text);
                    })));
                    mqttClient.UseApplicationMessageReceivedHandler(e =>
                    {

                        Invoke((new Action(() =>
                        {
                            txtReceiveMessage.AppendText($"收到訂閱消息!" + Encoding.UTF8.GetString(e.ApplicationMessage.Payload));
                        })));
                   
                    });
                }
                catch (Exception ex)
                {

                    Invoke((new Action(() =>
                    {
                        txtReceiveMessage.AppendText($"連接到MQTT服務器失敗!" + Environment.NewLine + ex.Message + Environment.NewLine);
                    })));
                }
            }
        }

        private void MqttClient_Connected(object sender, EventArgs e)
        {
            Invoke((new Action(() =>
            {
                txtReceiveMessage.Clear();
                txtReceiveMessage.AppendText("已連接到MQTT服務器!" + Environment.NewLine);
            })));
        }

        private void MqttClient_Disconnected(object sender, EventArgs e)
        {
            Invoke((new Action(() =>
            {
                txtReceiveMessage.Clear();
                DateTime curTime = new DateTime();
                curTime = DateTime.UtcNow;
                txtReceiveMessage.AppendText($">> [{curTime.ToLongTimeString()}]");
                txtReceiveMessage.AppendText("已斷開MQTT連接!" + Environment.NewLine);
            })));

            //Reconnecting
            if (isReconnect)
            {
                Invoke((new Action(() =>
                {
                    txtReceiveMessage.AppendText("正在嘗試重新連接" + Environment.NewLine);
                })));

                var options = new MqttClientOptionsBuilder()
                    .WithClientId(txtClientId.Text)
                    .WithTcpServer(txtIp.Text, Convert.ToInt32(txtPort.Text))
                    .WithCredentials(txtUsername.Text, txtPsw.Text)
                    //.WithTls()
                    .WithCleanSession()
                    .Build();
                Invoke((new Action(async () =>
                {
                    await Task.Delay(TimeSpan.FromSeconds(5));
                    try
                    {
                        await mqttClient.ConnectAsync(options);
                    }
                    catch
                    {
                        txtReceiveMessage.AppendText("### RECONNECTING FAILED ###" + Environment.NewLine);
                    }
                })));
            }
            else
            {
                Invoke((new Action(() =>
                {
                    txtReceiveMessage.AppendText("已下線!" + Environment.NewLine);
                })));
            }
        }

        private void MqttClient_ApplicationMessageReceived(object sender, MqttApplicationMessageReceivedEventArgs e)
        {
            Invoke((new Action(() =>
            {
                txtReceiveMessage.AppendText($">> {"### RECEIVED APPLICATION MESSAGE ###"}{Environment.NewLine}");
            })));
            Invoke((new Action(() =>
            {
                txtReceiveMessage.AppendText($">> Topic = {e.ApplicationMessage.Topic}{Environment.NewLine}");
            })));
            Invoke((new Action(() =>
            {
                txtReceiveMessage.AppendText($">> Payload = {Encoding.UTF8.GetString(e.ApplicationMessage.Payload)}{Environment.NewLine}");
            })));
            Invoke((new Action(() =>
            {
                txtReceiveMessage.AppendText($">> QoS = {e.ApplicationMessage.QualityOfServiceLevel}{Environment.NewLine}");
            })));
            Invoke((new Action(() =>
            {
                txtReceiveMessage.AppendText($">> Retain = {e.ApplicationMessage.Retain}{Environment.NewLine}");
            })));
        }

        private void btnLogIn_Click(object sender, EventArgs e)
        {
            isReconnect = true;
            Task.Run(async () => { await ConnectMqttServerAsync(); });
        }

        private void btnLogout_Click(object sender, EventArgs e)
        {
            isReconnect = false;
            Task.Run(async () => { await mqttClient.DisconnectAsync(); });
        }

    }
}

參考:https://www.cnblogs.com/bjjjunjie/p/mqtt.html


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM