1、環境
MQTTnet(3.1.2)、net6.0
Client · dotnet/MQTTnet Wiki · GitHub
2、Form1.cs
using MQTTnet; using MQTTnet.Client; using MQTTnet.Client.Connecting; using MQTTnet.Client.Disconnecting; using MQTTnet.Client.Options; using MQTTnet.Client.Receiving; using MQTTnet.Protocol; using System.Text; using WinFormsMq.core; namespace WinFormsMq { public partial class Form1 : Form { private MqttClient mqttClient = null; public Form1() { InitializeComponent(); } public void Init() { cmbQos.SelectedIndex = 0; cmbRetain.SelectedIndex = 0; } ///<summary> ///連接服務器 /// </summary> /// private async Task ConnectMqttServerAsync() { try { if (mqttClient == null||!mqttClient.IsConnected) { var mqttFactory = new MqttFactory(); mqttClient = mqttFactory.CreateMqttClient() as MqttClient; mqttClient.ConnectedHandler = new MqttClientConnectedHandlerDelegate(OnMqttClientConnected); mqttClient.DisconnectedHandler = new MqttClientDisconnectedHandlerDelegate(OnMqttClientDisConnected); mqttClient.ApplicationMessageReceivedHandler = new MqttApplicationMessageReceivedHandlerDelegate(OnSubscriberMessageReceived); var tcpServer = txtIPAddr.Text;//mqtt服務器地址 var tcpPort = int.Parse(txtPort.Text.Trim()); var mqttUser = txtUserName.Text.Trim(); var mqttPassword = txtPWD.Text.Trim(); var options = new MqttClientOptions { ClientId = txtClientID.Text.Trim(), ProtocolVersion = MQTTnet.Formatter.MqttProtocolVersion.V311, ChannelOptions = new MqttClientTcpOptions { Server = tcpServer, Port = tcpPort, }, WillDelayInterval = 100, WillMessage = new MqttApplicationMessage() { Topic = $"LastWill/{txtClientID.Text.Trim()}", Payload = Encoding.UTF8.GetBytes("I lost the connection!"), QualityOfServiceLevel = MQTTnet.Protocol.MqttQualityOfServiceLevel.ExactlyOnce } }; if (options.ChannelOptions == null) { throw new InvalidOperationException(); } if (!string.IsNullOrEmpty(mqttUser)) { options.Credentials = new MqttClientCredentials { Username = mqttUser, Password = Encoding.UTF8.GetBytes(mqttPassword) }; } options.CleanSession = true; options.KeepAlivePeriod = TimeSpan.FromSeconds(5); await mqttClient.ConnectAsync(options); //客戶端嘗試連接 } } catch(Exception ex) { //客戶端嘗試連接出錯 this.Invoke(new Action(() => { txtReceiveMessage.AppendText($"MQTT服務器失敗!" + Environment.NewLine+ex.Message+Environment.NewLine); })); } } public void OnMqttClientConnected(MqttClientConnectedEventArgs e) { this.Invoke(new Action(() => { txtReceiveMessage.AppendText("已連接到MQTT服務器!" + Environment.NewLine); })); } public void OnMqttClientDisConnected(MqttClientDisconnectedEventArgs e) { this.Invoke(new Action(() => { txtReceiveMessage.AppendText("客戶機已斷開!" + Environment.NewLine); })); } public void OnSubscriberMessageReceived(MqttApplicationMessageReceivedEventArgs e) { this.Invoke(new Action(() => { txtReceiveMessage.AppendText($">>{Encoding.UTF8.GetString(e.ApplicationMessage.Payload)}{Environment.NewLine}"); })); } ///<summary> ///客戶機斷開 /// </summary> private async Task ClientStop() { try { if(mqttClient != null) { await mqttClient.DisconnectAsync(); mqttClient=null; } else { return; } }catch (Exception ex) { //客戶端嘗試斷開server出錯 } } /// <summary> /// 發布消息 /// </summary> public async void ClientPublishMqttTopic(string topic,string payload) { try { var message = new MqttApplicationMessage() { Topic = topic, Payload = Encoding.UTF8.GetBytes(payload), QualityOfServiceLevel = (MqttQualityOfServiceLevel)cmbQos.SelectedIndex, Retain = bool.Parse(cmbRetain.SelectedItem.ToString()), }; await mqttClient.PublishAsync(message); //客戶端發送成工 mqttClient.Options.ClientId topic }catch(Exception ex) { //客戶端發送異常 this.Invoke(new Action(() => { txtReceiveMessage.AppendText(Logger.TraceLog(Logger.LogLevel.Info, String.Format($"發布消息失敗{{1}}!{Environment.NewLine}", ex.Message))); })); } } /// <summary> /// 傳入消息主題 訂閱消息 /// </summary> /// <param name="topic"></param> public async void ClientSubscribeTopic(string topic) { await mqttClient.SubscribeAsync(topic); //訂閱成功 this.Invoke(new Action(() => { txtReceiveMessage.AppendText(Logger.TraceLog(Logger.LogLevel.Info,String.Format($"客戶端{{0}}訂閱主題{{1}}成功!{Environment.NewLine}", mqttClient.Options.ClientId,topic))); })); } public async void ClientUnsubscribeTopic(string topic) { await mqttClient.UnsubscribeAsync(topic); //取消訂閱 //訂閱成功 this.Invoke(new Action(() => { txtReceiveMessage.AppendText(Logger.TraceLog(Logger.LogLevel.Info, String.Format($"客戶端{{0}}取消訂閱主題{{1}}成功!{Environment.NewLine}", mqttClient.Options.ClientId, topic))); })); } private void butCon_Click(object sender, EventArgs e) { Task.Run(async () => { await ConnectMqttServerAsync(); }); } private void Form1_Load(object sender, EventArgs e) { Init(); } /// <summary> /// 訂閱 /// </summary> /// <param name="sender"></param> /// <param name="e"></param> private void BtnSubscribe_Click(object sender, EventArgs e) { string topic=txtSubTopic.Text.Trim(); if (string.IsNullOrEmpty(topic)) { MessageBox.Show("訂閱主題不能為空!"); return; } else if (!mqttClient.IsConnected) { MessageBox.Show("MQTT客戶端尚未連接"); return; } else { ClientSubscribeTopic(topic); } } private void BtnPublish_Click(object sender, EventArgs e) { string pubtopic=txtPubTopic.Text.Trim(); if(string.IsNullOrEmpty(pubtopic)) { MessageBox.Show("發布主題不能為空!"); return; } string inputString = txtSendMessage.Text.Trim(); ClientPublishMqttTopic(pubtopic, inputString); } private void BtnUnSub_Click(object sender, EventArgs e) { string topic = txtSubTopic.Text.Trim(); if (string.IsNullOrEmpty(topic)) { MessageBox.Show("取消訂閱主題不能為空!"); return; } if (!mqttClient.IsConnected) { MessageBox.Show("MQTT客戶端尚未連接"); return; } ClientUnsubscribeTopic(topic); } private void butUnCon_Click(object sender, EventArgs e) { ClientStop(); } } }
