.NET 常用的 MQTT 類庫有兩個:M2Mqtt 和 MQTTnet。
當然了,這兩個庫的教程網上一搜一大把
但網上能搜到的 MQTTnet 教程全是 2.7 及以下版本的,而 3.0 版的語法已不再兼容舊版,直接升級會導致很多問題。今天我成功完成了升級,現記錄下來。
參考文檔地址:https://github.com/chkr1011/MQTTnet/wiki/Client
上代碼:
// Open-source library: https://github.com/chkr1011/MQTTnet
// Client documentation: https://github.com/chkr1011/MQTTnet/wiki/Client
using MQTTnet;
using MQTTnet.Client;
using MQTTnet.Client.Options;
using System;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using System.Windows.Forms;

namespace MqttServerTest
{
    /// <summary>
    /// WinForms test tool for an MQTT broker, written against the MQTTnet 3.0 client API.
    /// Lets the user connect, subscribe to a topic, publish a message and disconnect,
    /// and writes everything that happens into <c>txtReceiveMessage</c>.
    /// </summary>
    public partial class mqtt測試工具 : Form
    {
        /// <summary>Seconds to wait after a disconnect before one reconnect attempt is made.</summary>
        private const int ReconnectDelaySeconds = 5;

        private IMqttClient mqttClient = null;

        // Options captured on the UI thread at login time. The reconnect path reuses
        // them so that it never has to read text boxes from a background thread.
        private IMqttClientOptions clientOptions = null;

        // Written by the UI thread (login/logout) and read from MQTT callbacks.
        private volatile bool isReconnect = true;

        public mqtt測試工具()
        {
            InitializeComponent();
        }

        private void Form1_Load(object sender, EventArgs e)
        {
        }

        /// <summary>Click handler: publishes the message currently typed into the form.</summary>
        private async void BtnPublish_Click(object sender, EventArgs e)
        {
            await Publish();
        }

        /// <summary>Click handler: subscribes to the topic currently typed into the form.</summary>
        private async void BtnSubscribe_ClickAsync(object sender, EventArgs e)
        {
            await Subscribe();
        }

        /// <summary>
        /// Publishes the content of <c>txtSendMessage</c> to the topic in <c>txtPubTopic</c>
        /// with QoS "exactly once" and the retain flag set.
        /// Failures are written to the log instead of being thrown, because the only
        /// caller is an <c>async void</c> event handler.
        /// </summary>
        private async Task Publish()
        {
            string topic = txtPubTopic.Text.Trim();
            if (string.IsNullOrEmpty(topic))
            {
                MessageBox.Show("發布主題不能為空!");
                return;
            }

            // Same precondition as Subscribe(): there must be a connected client.
            if (mqttClient == null || !mqttClient.IsConnected)
            {
                MessageBox.Show("MQTT客戶端尚未連接!");
                return;
            }

            string inputString = txtSendMessage.Text.Trim();
            try
            {
                var message = new MqttApplicationMessageBuilder()
                    .WithTopic(topic)
                    .WithPayload(inputString)
                    .WithExactlyOnceQoS()
                    .WithRetainFlag()
                    .Build();

                await mqttClient.PublishAsync(message);
            }
            catch (Exception ex)
            {
                AppendLog("發布主題失敗!" + Environment.NewLine + ex.Message + Environment.NewLine);
            }
        }

        /// <summary>
        /// Subscribes to the topic in <c>txtSubTopic</c> with QoS "at most once".
        /// Requires a connected client; failures are written to the log.
        /// </summary>
        private async Task Subscribe()
        {
            string topic = txtSubTopic.Text.Trim();
            if (string.IsNullOrEmpty(topic))
            {
                MessageBox.Show("訂閱主題不能為空!");
                return;
            }

            // mqttClient stays null until the first login, so check that before IsConnected.
            if (mqttClient == null || !mqttClient.IsConnected)
            {
                MessageBox.Show("MQTT客戶端尚未連接!");
                return;
            }

            try
            {
                await mqttClient.SubscribeAsync(new TopicFilterBuilder()
                    .WithTopic(topic)
                    .WithAtMostOnceQoS()
                    .Build());

                AppendLog($"已訂閱[{topic}]主題{Environment.NewLine}");
            }
            catch (Exception ex)
            {
                // Without this catch the exception would escape through the async void click handler.
                AppendLog("訂閱主題失敗!" + Environment.NewLine + ex.Message + Environment.NewLine);
            }
        }

        /// <summary>
        /// Connects to the broker described by the form's input fields.
        /// The client object is created once and reused, so this method can be called
        /// again after a failed attempt or after a logout. It does nothing when the
        /// client is already connected. Must be called on the UI thread because it
        /// reads control values.
        /// </summary>
        private async Task ConnectMqttServerAsync()
        {
            if (mqttClient != null && mqttClient.IsConnected)
            {
                return;
            }

            try
            {
                IMqttClientOptions options;
                if (!TryBuildClientOptions(out options))
                {
                    return;
                }

                if (mqttClient == null)
                {
                    mqttClient = CreateClient();
                }

                // Remember the options before connecting so the reconnect path can use them.
                clientOptions = options;

                await mqttClient.ConnectAsync(options, CancellationToken.None);

                AppendLog("連接到MQTT服務器成功!" + txtIp.Text + Environment.NewLine);
            }
            catch (Exception ex)
            {
                AppendLog("連接到MQTT服務器失敗!" + Environment.NewLine + ex.Message + Environment.NewLine);
            }
        }

        /// <summary>
        /// Creates the MQTT client and registers its handlers before any connection
        /// attempt, so that no message or disconnect notification can be missed.
        /// </summary>
        private IMqttClient CreateClient()
        {
            IMqttClient client = new MqttFactory().CreateMqttClient();

            client.UseApplicationMessageReceivedHandler(e =>
            {
                AppendLog("收到訂閱消息!" + DecodePayload(e.ApplicationMessage.Payload) + Environment.NewLine);
            });

            client.UseDisconnectedHandler(e => MqttClient_Disconnected(client, e));

            return client;
        }

        /// <summary>
        /// Builds the connection options from the form's input fields.
        /// Shows a message box and returns <c>false</c> when the port is not a valid
        /// TCP port number. Must be called on the UI thread.
        /// </summary>
        private bool TryBuildClientOptions(out IMqttClientOptions options)
        {
            options = null;

            int port;
            if (!int.TryParse(txtPort.Text.Trim(), out port) || port < 1 || port > 65535)
            {
                MessageBox.Show("端口號無效!");
                return false;
            }

            options = new MqttClientOptionsBuilder()
                .WithTcpServer(txtIp.Text.Trim(), port)
                .WithCredentials(txtUsername.Text, txtPsw.Text)
                .WithClientId(txtClientId.Text)
                //.WithTls()
                .WithCleanSession()
                .Build();
            return true;
        }

        /// <summary>Decodes a message payload as UTF-8; a missing payload yields an empty string.</summary>
        private static string DecodePayload(byte[] payload)
        {
            return payload == null ? string.Empty : Encoding.UTF8.GetString(payload);
        }

        /// <summary>
        /// Runs <paramref name="action"/> on the UI thread. Calls that arrive after the
        /// form has been closed are dropped, since there is no longer a log to write to.
        /// </summary>
        private void RunOnUiThread(Action action)
        {
            if (IsDisposed || !IsHandleCreated)
            {
                return;
            }

            if (!InvokeRequired)
            {
                action();
                return;
            }

            try
            {
                Invoke(action);
            }
            catch (InvalidOperationException)
            {
                // The form was closed between the check above and the Invoke call
                // (ObjectDisposedException derives from InvalidOperationException).
                // Deliberately ignored: this is best-effort logging only.
            }
        }

        /// <summary>Appends <paramref name="text"/> to the log text box from any thread.</summary>
        private void AppendLog(string text)
        {
            RunOnUiThread(() => txtReceiveMessage.AppendText(text));
        }

        /// <summary>
        /// Clears the log and reports a successful connection.
        /// Kept with its original event-style signature; this file does not register it.
        /// </summary>
        private void MqttClient_Connected(object sender, EventArgs e)
        {
            RunOnUiThread(() =>
            {
                txtReceiveMessage.Clear();
                txtReceiveMessage.AppendText("已連接到MQTT服務器!" + Environment.NewLine);
            });
        }

        /// <summary>
        /// Disconnect handler registered in <see cref="CreateClient"/>. Clears the log,
        /// reports the disconnect and, while <c>isReconnect</c> is set, schedules one
        /// reconnect attempt.
        /// </summary>
        private void MqttClient_Disconnected(object sender, EventArgs e)
        {
            RunOnUiThread(() =>
            {
                txtReceiveMessage.Clear();
                txtReceiveMessage.AppendText($">> [{DateTime.UtcNow.ToLongTimeString()}]");
                txtReceiveMessage.AppendText("已斷開MQTT連接!" + Environment.NewLine);
            });

            if (isReconnect)
            {
                AppendLog("正在嘗試重新連接" + Environment.NewLine);

                // Started without awaiting so this handler returns immediately.
                // ReconnectAfterDelayAsync handles its own exceptions.
                ReconnectAfterDelayAsync();
            }
            else
            {
                AppendLog("已下線!" + Environment.NewLine);
            }
        }

        /// <summary>
        /// Waits <see cref="ReconnectDelaySeconds"/> seconds and then tries once to
        /// reconnect with the options captured at login. Never throws; the outcome is
        /// written to the log.
        /// </summary>
        private async Task ReconnectAfterDelayAsync()
        {
            try
            {
                await Task.Delay(TimeSpan.FromSeconds(ReconnectDelaySeconds));

                IMqttClient client = mqttClient;
                IMqttClientOptions options = clientOptions;

                // The user may have logged out, or logged in again, while we were waiting.
                if (!isReconnect || client == null || options == null || client.IsConnected)
                {
                    return;
                }

                await client.ConnectAsync(options, CancellationToken.None);
                AppendLog("已連接到MQTT服務器!" + Environment.NewLine);
            }
            catch (Exception ex)
            {
                AppendLog("### RECONNECTING FAILED ###" + Environment.NewLine + ex.Message + Environment.NewLine);
            }
        }

        /// <summary>
        /// Writes topic, payload, QoS and retain flag of a received message to the log.
        /// Kept with its original event-style signature; this file does not register it.
        /// </summary>
        private void MqttClient_ApplicationMessageReceived(object sender, MqttApplicationMessageReceivedEventArgs e)
        {
            string payloadText = DecodePayload(e.ApplicationMessage.Payload);

            // One UI-thread hop for the whole record instead of one per line.
            RunOnUiThread(() =>
            {
                txtReceiveMessage.AppendText($">> {"### RECEIVED APPLICATION MESSAGE ###"}{Environment.NewLine}");
                txtReceiveMessage.AppendText($">> Topic = {e.ApplicationMessage.Topic}{Environment.NewLine}");
                txtReceiveMessage.AppendText($">> Payload = {payloadText}{Environment.NewLine}");
                txtReceiveMessage.AppendText($">> QoS = {e.ApplicationMessage.QualityOfServiceLevel}{Environment.NewLine}");
                txtReceiveMessage.AppendText($">> Retain = {e.ApplicationMessage.Retain}{Environment.NewLine}");
            });
        }

        /// <summary>Click handler: enables automatic reconnect and connects to the broker.</summary>
        private async void btnLogIn_Click(object sender, EventArgs e)
        {
            isReconnect = true;

            // Awaited directly rather than through Task.Run: the connect is asynchronous
            // I/O, and the form's controls must be read on the UI thread.
            await ConnectMqttServerAsync();
        }

        /// <summary>Click handler: disables automatic reconnect and disconnects from the broker.</summary>
        private async void btnLogout_Click(object sender, EventArgs e)
        {
            isReconnect = false;

            if (mqttClient == null)
            {
                // Never logged in, so there is nothing to disconnect.
                return;
            }

            try
            {
                await mqttClient.DisconnectAsync();
            }
            catch (Exception ex)
            {
                AppendLog("斷開MQTT連接失敗!" + Environment.NewLine + ex.Message + Environment.NewLine);
            }
        }
    }
}