MQTT學習筆記(C#)-MQTTnet


  代碼地址:https://gitee.com/qq28069933146_admin/csharp_networkprotocol_research

  演示地址:C#-MQTT調用示例演示

一,什么是MQTT:

  MQTT(消息隊列遙測傳輸)是IBM開發的即時通訊協議,是一個基於客戶端-服務器的消息發布/訂閱極其輕量級的消息傳輸協議。

  它工作在 TCP/IP協議族上,專門為網絡受限設備、低寬帶以及高延遲和不可靠的網絡而設計的(傳輸特點:至多一次、至少一次、只有一次)。使其在物聯網、小型設備、移動應用等方面有較廣泛的應用。

 1.MQTT報文:

  詳情見:https://www.cnblogs.com/hayasi/p/7743356.html

 2.MQTT三種身份:發布者(客戶端)、代理(服務器)、訂閱者(客戶端)。

 

二,示例

  可以選擇MQTTnet包或者DotNetty.Codecs.Mqtt包進行學習,這里我們選用Mqtt進行學習。

  VS2022

  Net5

  MQTTnet 4.1.4.563

 1.服務端:

  ①服務器對象初始化(new MqttFactory().CreateMqttServer(new MqttServerOptionsBuilder().Build());)

  MqttServerOptionsBuilder mqttServerOptionsBuilder = new MqttServerOptionsBuilder();  // MQTT服務器配置
  mqttServerOptionsBuilder.WithDefaultEndpoint();
  mqttServerOptionsBuilder.WithDefaultEndpointBoundIPAddress(IPAddress.Parse(ip));  // 設置Server的IP
  mqttServerOptionsBuilder.WithDefaultEndpointPort(port);                           // 設置Server的端口號
  //mqttServerOptionsBuilder.WithEncryptedEndpointPort(port);                        // 使用加密的端點端口
  mqttServerOptionsBuilder.WithPersistentSessions(withPersistentSessions);  // 持續會話
  mqttServerOptionsBuilder.WithConnectionBacklog(2000);                     // 最大連接數
  //mqttServerOptionsBuilder.WithConnectionValidator(c =>  // 鑒權-方法失效
  //{
  //    if (c.Username != uName || c.Password != uPwd)
  //    {
  //        c.ReasonCode = MqttConnectReasonCode.BadUserNameOrPassword;
  //    }
  //})
 
  MqttServerOptions mqttServerOptions = mqttServerOptionsBuilder.Build();
  _MqttServer = new MqttFactory().CreateMqttServer(mqttServerOptions);  // 創建服務(配置)

   補充:MqttServerOptionsBuilder的屬性(注:有些方法4.1.4.563版本不支持了)

函數名 功能說明
Build() 構建配置參數
WithApplicationMessageInterceptor() 允許處理來自客戶端的所有已發布消息
WithClientId() 服務端發布消息時使用的ClientId
WithConnectionBacklog() 設置要保留的連接數
WithConnectionValidator() 驗證連接
WithDefaultCommunicationTimeout() 設置默認的通信超時
WithDefaultEndpoint() 使用默認端點
WithDefaultEndpointBoundIPAddress() 使用默認端點IPv4地址
WithDefaultEndpointBoundIPV6Address() 使用默認端點IPv6地址
WithDefaultEndpointPort() 使用默認端點端口
WithEncryptedEndpoint() 使用加密的端點
WithEncryptedEndpointBoundIPAddress() 使用加密的端點IPv4地址
WithEncryptedEndpointBoundIPV6Address() 使用加密的端點IPv6地址
WithEncryptedEndpointPort() 使用加密的端點端口
WithEncryptionCertificate() 使用證書進行SSL連接
WithEncryptionSslProtocol() 使用SSL協議級別
WithMaxPendingMessagesPerClient() 每個客戶端允許最多未決消息
WithPersistentSessions() 保持會話
WithStorage() 使用存儲
WithSubscriptionInterceptor() 允許處理來自客戶端的所有訂閱
WithoutDefaultEndpoint() 禁用默認端點
WithoutEncryptedEndpoint() 禁用默認(SSL)端點

  ②服務器開啟(await mqttServer.StartAsync())

  await _MqttServer.StartAsync();  // 開啟服務

  ③服務器關閉(await _MqttServer.StopAsync())

  foreach (var clientStatus in _MqttServer.GetClientsAsync().Result)
  {
      await clientStatus.DisconnectAsync();
  }
  await _MqttServer.StopAsync();
  _MqttServer = null;

  ④是否對客戶端的進行驗證(賬號密碼-原方法失效)

  //mqttServerOptionsBuilder.WithConnectionValidator(c =>  // 鑒權-方法失效
  //{
  //    if (c.Username != uName || c.Password != uPwd)
  //    {
  //        c.ReasonCode = MqttConnectReasonCode.BadUserNameOrPassword;
  //    }
  //})

  ⑤給客戶端發送數據(原方法失效)

  /// <summary>
  /// 發送消息-未寫(原方法失效)
  /// </summary>
  /// <param name="Topic">主題</param>
  /// <param name="msg">消息</param>
  /// <returns></returns>
  public Task SedMessage(string Topic, string msg)
  {
      try
      {
          //var clients = _MqttServer.GetClientsAsync().Result;

          //foreach (var client in clients)
          //{

          //}
      }
      catch { }
      return Task.CompletedTask;
  }

  ⑥獲取所有的客戶端(_MqttServer.GetClientsAsync().Result.ToList())

  _MqttServer.GetClientsAsync().Result.ToList()

  ⑦服務器開啟/關閉事件(StartedAsync與StoppedAsync)

  _MqttServer.StartedAsync += StartedHandle;  // 服務器開啟事件

  /// <summary>
  /// 開啟Server的處理程序
  /// </summary>
  private Task StartedHandle(EventArgs arg)
  {
      return Task.CompletedTask;
  }

  _MqttServer.StoppedAsync += StoppedHandle;  // 服務器關閉事件

  /// <summary>
  /// 關閉Server的處理程序
  /// </summary>
  private Task StoppedHandle(EventArgs arg)
  {
      return Task.CompletedTask;
  }

  ⑧客戶端連接/斷開的處理事件

  _MqttServer.ClientConnectedAsync += ClientConnectedHandle;        // 設置客戶端連接成功后的處理程序

  /// <summary>
  /// 設置客戶端連接成功后的處理程序
  /// </summary>
  private Task ClientConnectedHandle(ClientConnectedEventArgs arg)
  {
      return Task.CompletedTask;
  }

  _MqttServer.ClientDisconnectedAsync += ClientDisconnectedHandle;  // 設置客戶端斷開后的處理程序

  /// <summary>
  /// 設置客戶端斷開后的處理程序
  /// </summary>
  private Task ClientDisconnectedHandle(ClientDisconnectedEventArgs arg)
  {
      return Task.CompletedTask;
  }

  ⑨消息被訂閱/被退訂事件

  _MqttServer.ClientSubscribedTopicAsync += ClientSubscribedTopicHandle;      // 設置消息訂閱通知
  _MqttServer.ClientUnsubscribedTopicAsync += ClientUnsubscribedTopicHandle;  // 設置消息退訂通知

  /// <summary>
  /// 設置消息訂閱通知
  /// </summary>
  private Task ClientSubscribedTopicHandle(ClientSubscribedTopicEventArgs arg)
  {
      //if (!arg.Equals("admin"))
      //{
      //    var client = clients.Where(a => a.Id == arg.ClientId).FirstOrDefault();
      //    client?.DisconnectAsync();

      //    return Task.CompletedTask;
      //}
      return Task.CompletedTask;
  }

  /// <summary>
  /// 設置消息退訂通知
  /// </summary>
  private Task ClientUnsubscribedTopicHandle(ClientUnsubscribedTopicEventArgs arg)
  {
      return Task.CompletedTask;
  }

  ⑩接收到消息時的處理程序

  _MqttServer.ApplicationMessageNotConsumedAsync += ApplicationMessageNotConsumedHandle;  // 設置消息處理程序

  /// <summary>
  /// 設置消息處理程序
  /// </summary>
  private Task ApplicationMessageNotConsumedHandle(ApplicationMessageNotConsumedEventArgs arg)
  {
      _Callback?.Invoke(new ResultData_MQTT()
      {
          ResultCode = -1,
          ResultMsg = DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss.fff") + $">>>客戶端:'{arg.SenderId}'發布了消息:主題:'{arg.ApplicationMessage.Topic}'!內容:'{Encoding.UTF8.GetString(arg.ApplicationMessage.Payload)}';服務質量:{arg.ApplicationMessage.QualityOfServiceLevel};保留:{arg.ApplicationMessage.Retain}"
      });

      return Task.CompletedTask;
  }

 2.客戶端:

  ①客戶端對象初始化並連接(_MqttClient.ConnectAsync(new MqttClientOptionsBuilder().Build()))

  MqttClientOptionsBuilder mqttClientOptionsBuilder = new MqttClientOptionsBuilder();
  mqttClientOptionsBuilder.WithTcpServer(mqttServerUrl, port);          // 設置MQTT服務器地址
  if (!string.IsNullOrEmpty(userName))
  {
      mqttClientOptionsBuilder.WithCredentials(userName, userPassword);  // 設置鑒權參數
  }
  mqttClientOptionsBuilder.WithClientId(Guid.NewGuid().ToString("N"));  // 設置客戶端序列號
  MqttClientOptions options = mqttClientOptionsBuilder.Build();
 
  _MqttClient = new MqttFactory().CreateMqttClient();
  _MqttClient.ConnectedAsync += ConnectedHandle;        // 服務器連接事件
  _MqttClient.DisconnectedAsync += DisconnectedHandle;  // 服務器斷開事件(可以寫入重連事件)
  _MqttClient.ApplicationMessageReceivedAsync += ApplicationMessageReceivedHandle;  // 發送消息事件
  await _MqttClient.ConnectAsync(options);  // 連接

  ②與服務器斷開連接(await _MqttClient.DisconnectAsync())

  await _MqttClient.DisconnectAsync();
  _MqttClient.Dispose();
  _MqttClient = null;

  ③與服務器重新連接(await _MqttClient.ReconnectAsync())

  await _MqttClient.ReconnectAsync();

  ④訂閱與退訂主題()

  MqttTopicFilter topicFilter = new MqttTopicFilterBuilder().WithTopic(topic).Build();
  await _MqttClient.SubscribeAsync(topicFilter, CancellationToken.None);  // 訂閱

  await MqttClientExtensions.UnsubscribeAsync(_MqttClient, topic, CancellationToken.None);  // 退訂

  ⑤發布消息

  MqttApplicationMessageBuilder mqttApplicationMessageBuilder = new MqttApplicationMessageBuilder();  // 設置內容
  mqttApplicationMessageBuilder.WithTopic(topic);          // 主題
  mqttApplicationMessageBuilder.WithPayload(msg);          // 信息
  mqttApplicationMessageBuilder.WithRetainFlag(retained);  // 保留
  MqttApplicationMessage messageObj = mqttApplicationMessageBuilder.Build();

  await _MqttClient.PublishAsync(messageObj, CancellationToken.None);  // 發送

  ⑥與服務器連接/斷開事件

  _MqttClient.ConnectedAsync += ConnectedHandle;        // 服務器連接事件
  _MqttClient.DisconnectedAsync += DisconnectedHandle;  // 服務器斷開事件(可以寫入重連事件)

  /// <summary>
  /// 服務器連接事件
  /// </summary>
  private Task ConnectedHandle(MqttClientConnectedEventArgs arg)
  {
      return Task.CompletedTask;
  }

  /// <summary>
  /// 服務器斷開事件(可以寫入重連事件)
  /// </summary>
  private Task DisconnectedHandle(MqttClientDisconnectedEventArgs arg)
  {
      return Task.CompletedTask;
  }

  ⑦訂閱/退訂事件

  // 略-原方法失效

  ⑧發送消息事件

  _MqttClient.ApplicationMessageReceivedAsync += ApplicationMessageReceivedHandle;  // 發送消息事件

  /// <summary>
  /// 發送消息事件
  /// </summary>
  private Task ApplicationMessageReceivedHandle(MqttApplicationMessageReceivedEventArgs arg)
  {
      string resultMsg = DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss.fff") + $">>>MQTTClient'{arg.ClientId}'內容:'{Encoding.UTF8.GetString(arg.ApplicationMessage.Payload)}';主題:'{arg.ApplicationMessage.Topic}',消息等級Qos:[{arg.ApplicationMessage.QualityOfServiceLevel}],是否保留:[{arg.ApplicationMessage.Retain}]",

      return Task.CompletedTask;
  }

三、MQTTHelper

/**
*┌──────────────────────────────────────────────────────────────┐
*│ 描    述:MQTT通訊相關的工具類(MQTTnet 4.1.4.563)
*│ 作    者:執筆小白                                              
*│ 版    本:1.0                                       
*│ 創建時間:2023-3-18 10:40:56                            
*└──────────────────────────────────────────────────────────────┘
*┌──────────────────────────────────────────────────────────────┐
*│ 命名空間: MqttnetServerWin                               
*│ 類    名:MQTTHelper                                     
*└──────────────────────────────────────────────────────────────┘
*/
using MQTTnet;
using MQTTnet.Client;
using MQTTnet.Packets;
using MQTTnet.Server;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Net;
using System.Text;
using System.Threading;
using System.Threading.Tasks;

namespace MqttnetServerWin
{
    /// <summary>
    /// MQTT通訊相關的工具類
    /// </summary>
    public class MQTTHelper
    {
        #region 變量
        /// <summary>
        /// 記錄日志、輸出、保存等操作
        /// </summary>
        private Action<ResultData_MQTT>? _Callback = null;
        #endregion 變量

        #region Server
        /// <summary>
        /// MQTT服務
        /// </summary>
        MqttServer _MqttServer = null;

        /// <summary>
        /// 創建MQTTServer並運行
        /// </summary>
        public async Task<ResultData_MQTT> CreateMQTTServerAndStart(MqttServerOptionsBuilder mqttServerOptionsBuilder, Action<ResultData_MQTT>? callback)
        {
            ResultData_MQTT resultData_MQTT = new ResultData_MQTT();

            _Callback = callback;
            try
            {
                MqttServerOptions mqttServerOptions = mqttServerOptionsBuilder.Build();
                _MqttServer = new MqttFactory().CreateMqttServer(mqttServerOptions);  // 創建服務(配置)

                _MqttServer.StartedAsync += StartedHandle;  // 服務器開啟事件
                _MqttServer.StoppedAsync += StoppedHandle;  // 服務器關閉事件
                _MqttServer.ClientConnectedAsync += ClientConnectedHandle;        // 設置客戶端連接成功后的處理程序
                _MqttServer.ClientDisconnectedAsync += ClientDisconnectedHandle;  // 設置客戶端斷開后的處理程序
                _MqttServer.ClientSubscribedTopicAsync += ClientSubscribedTopicHandle;      // 設置消息訂閱通知
                _MqttServer.ClientUnsubscribedTopicAsync += ClientUnsubscribedTopicHandle;  // 設置消息退訂通知
                _MqttServer.ApplicationMessageNotConsumedAsync += ApplicationMessageNotConsumedHandle;  // 設置消息處理程序

                await _MqttServer.StartAsync();  // 開啟服務

                if (_MqttServer.IsStarted)
                {
                    resultData_MQTT = new ResultData_MQTT()
                    {
                        ResultCode = 1,
                        ResultMsg = DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss.fff") + ">>>執行了開啟MQTTServer_成功!"
                    };
                }
                else
                {
                    resultData_MQTT = new ResultData_MQTT()
                    {
                        ResultCode = -1,
                        ResultMsg = DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss.fff") + ">>>執行了開啟MQTTServer_失敗!"
                    };
                }
            }
            catch (Exception ex)
            {
                resultData_MQTT = new ResultData_MQTT()
                {
                    ResultCode = -1,
                    ResultMsg = DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss.fff") + ">>>執行了開啟MQTTServer_失敗!錯誤信息:" + ex.Message
                };
            }

            _Callback?.Invoke(resultData_MQTT);
            return resultData_MQTT;
        }

        /// <summary>
        /// 簡易創建MQTTServer並運行-不使用加密
        /// </summary>
        /// <param name="ip">IP</param>
        /// <param name="port">端口</param>
        /// <param name="withPersistentSessions">是否保持會話</param>
        /// <param name="callback">處理方法</param>
        /// <returns></returns>
        public async Task<ResultData_MQTT> CreateMQTTServerAndStart(string ip, int port, bool withPersistentSessions, Action<ResultData_MQTT>? callback)
        {
            ResultData_MQTT resultData_MQTT = new ResultData_MQTT();
            _Callback = callback;

            try
            {
                MqttServerOptionsBuilder mqttServerOptionsBuilder = new MqttServerOptionsBuilder();  // MQTT服務器配置
                mqttServerOptionsBuilder.WithDefaultEndpoint();
                mqttServerOptionsBuilder.WithDefaultEndpointBoundIPAddress(IPAddress.Parse(ip));  // 設置Server的IP
                mqttServerOptionsBuilder.WithDefaultEndpointPort(port);                           // 設置Server的端口號
                //mqttServerOptionsBuilder.WithEncryptedEndpointPort(port);                        // 使用加密的端點端口
                mqttServerOptionsBuilder.WithPersistentSessions(withPersistentSessions);  // 持續會話
                mqttServerOptionsBuilder.WithConnectionBacklog(2000);                     // 最大連接數
                //mqttServerOptionsBuilder.WithConnectionValidator(c =>  // 鑒權-方法失效
                //{
                //    if (c.Username != uName || c.Password != uPwd)
                //    {
                //        c.ReasonCode = MqttConnectReasonCode.BadUserNameOrPassword;
                //    }
                //})

                MqttServerOptions mqttServerOptions = mqttServerOptionsBuilder.Build();
                _MqttServer = new MqttFactory().CreateMqttServer(mqttServerOptions);  // 創建服務(配置)

                _MqttServer.StartedAsync += StartedHandle;  // 服務器開啟事件
                _MqttServer.StoppedAsync += StoppedHandle;  // 服務器關閉事件
                _MqttServer.ClientConnectedAsync += ClientConnectedHandle;        // 設置客戶端連接成功后的處理程序
                _MqttServer.ClientDisconnectedAsync += ClientDisconnectedHandle;  // 設置客戶端斷開后的處理程序
                _MqttServer.ClientSubscribedTopicAsync += ClientSubscribedTopicHandle;      // 設置消息訂閱通知
                _MqttServer.ClientUnsubscribedTopicAsync += ClientUnsubscribedTopicHandle;  // 設置消息退訂通知
                _MqttServer.ValidatingConnectionAsync += ValidatingConnectionHandle;                    // 鑒權-未完
                _MqttServer.ApplicationMessageNotConsumedAsync += ApplicationMessageNotConsumedHandle;  // 設置消息處理程序

                await _MqttServer.StartAsync();  // 開啟服務

                if (_MqttServer.IsStarted)
                {
                    resultData_MQTT = new ResultData_MQTT()
                    {
                        ResultCode = 1,
                        ResultMsg = DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss.fff") + ">>>執行了開啟MQTTServer_成功!"
                    };
                }
                else
                {
                    resultData_MQTT = new ResultData_MQTT()
                    {
                        ResultCode = -1,
                        ResultMsg = DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss.fff") + ">>>執行了開啟MQTTServer_失敗!"
                    };
                }
            }
            catch (Exception ex)
            {
                resultData_MQTT = new ResultData_MQTT()
                {
                    ResultCode = -1,
                    ResultMsg = DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss.fff") + ">>>執行了開啟MQTTServer_失敗!錯誤信息:" + ex.Message
                };
            }

            _Callback?.Invoke(resultData_MQTT);
            return resultData_MQTT;
        }

        /// <summary>
        /// 關閉MQTTServer
        /// </summary>
        public async Task<ResultData_MQTT> StopMQTTServer()
        {
            ResultData_MQTT resultData_MQTT = new ResultData_MQTT();

            try
            {
                if (_MqttServer == null)
                {
                    resultData_MQTT = new ResultData_MQTT()
                    {
                        ResultCode = -1,
                        ResultMsg = DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss.fff") + ">>>執行了關閉MQTTServer_出錯!MQTTServer未在運行。"
                    };
                }
                else
                {
                    foreach (var clientStatus in _MqttServer.GetClientsAsync().Result)
                    {
                        await clientStatus.DisconnectAsync();
                    }
                    await _MqttServer.StopAsync();
                    _MqttServer = null;

                    resultData_MQTT = new ResultData_MQTT()
                    {
                        ResultCode = 1,
                        ResultMsg = DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss.fff") + ">>>執行了關閉MQTTServer_成功!"
                    };
                }
            }
            catch (Exception ex)
            {
                resultData_MQTT = new ResultData_MQTT()
                {
                    ResultCode = -1,
                    ResultMsg = DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss.fff") + ">>>執行了關閉MQTTServer_失敗!錯誤信息:" + ex.Message
                };
            }

            _Callback?.Invoke(resultData_MQTT);
            return resultData_MQTT;
        }

        /// <summary>
        /// 獲取所有的客戶端
        /// </summary>
        public List<MqttClientStatus> GetClientsAsync()
        {
            return _MqttServer.GetClientsAsync().Result.ToList();
        }

        /// <summary>
        /// 發送消息-未寫
        /// </summary>
        /// <param name="Topic">主題</param>
        /// <param name="msg">消息</param>
        /// <returns></returns>
        public Task SedMessage(string Topic, string msg)
        {
            try
            {
                //var clients = _MqttServer.GetClientsAsync().Result;

                //foreach (var client in clients)
                //{

                //}
            }
            catch { }
            return Task.CompletedTask;
        }
        #region 處理事件
        /// <summary>
        /// 開啟Server的處理程序
        /// </summary>
        private Task StartedHandle(EventArgs arg)
        {
            _Callback?.Invoke(new()
            {
                ResultCode = 1,
                ResultMsg = DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss.fff") + ">>>MQTTServer已開啟!"
            });
            return Task.CompletedTask;
        }

        /// <summary>
        /// 關閉Server的處理程序
        /// </summary>
        private Task StoppedHandle(EventArgs arg)
        {
            _Callback?.Invoke(new()
            {
                ResultCode = 1,
                ResultMsg = DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss.fff") + ">>>MQTTServer已關閉!"
            });
            return Task.CompletedTask;
        }

        /// <summary>
        /// 設置客戶端連接成功后的處理程序
        /// </summary>
        private Task ClientConnectedHandle(ClientConnectedEventArgs arg)
        {
            var clients = _MqttServer.GetClientsAsync().Result;

            _Callback?.Invoke(new()
            {
                ResultCode = 1,
                ResultMsg = DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss.fff") + $">>>客戶端'{arg.ClientId}'已成功連接!當前客戶端連接數:{clients?.Count}個。"
            });
            return Task.CompletedTask;
        }

        /// <summary>
        /// 設置客戶端斷開后的處理程序
        /// </summary>
        private Task ClientDisconnectedHandle(ClientDisconnectedEventArgs arg)
        {
            var clients = _MqttServer.GetClientsAsync().Result;
            _Callback?.Invoke(new ResultData_MQTT()
            {
                ResultCode = 1,
                ResultMsg = DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss.fff") + $">>>客戶端'{arg.ClientId}'已斷開連接!當前客戶端連接數:{clients?.Count}個。"
            });

            return Task.CompletedTask;
        }

        /// <summary>
        /// 設置消息訂閱通知
        /// </summary>
        private Task ClientSubscribedTopicHandle(ClientSubscribedTopicEventArgs arg)
        {
            //if (!arg.Equals("admin"))
            //{
            //    var client = clients.Where(a => a.Id == arg.ClientId).FirstOrDefault();
            //    client?.DisconnectAsync();

            //    return Task.CompletedTask;
            //}

            _Callback?.Invoke(new ResultData_MQTT()
            {
                ResultCode = 1,
                ResultMsg = DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss.fff") + $">>>客戶端'{arg.ClientId}'訂閱了主題'{arg.TopicFilter.Topic}',主題服務質量:'{arg.TopicFilter.QualityOfServiceLevel}'!"
            });

            return Task.CompletedTask;
        }

        /// <summary>
        /// 設置消息退訂通知
        /// </summary>
        private Task ClientUnsubscribedTopicHandle(ClientUnsubscribedTopicEventArgs arg)
        {
            _Callback?.Invoke(new ResultData_MQTT()
            {
                ResultCode = 1,
                ResultMsg = DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss.fff") + $">>>客戶端{arg.ClientId}退訂了主題{arg.TopicFilter}!"
            });

            return Task.CompletedTask;
        }

        /// <summary>
        /// 鑒權-未寫完
        /// </summary>
        /// <returns></returns>
        private Task ValidatingConnectionHandle(ValidatingConnectionEventArgs arg)  // 鑒權
        {
            if (arg.UserName != "Admin" || arg.Password != "Admin123")
            {

            }
            return Task.CompletedTask;
        }

        /// <summary>
        /// 設置消息處理程序
        /// </summary>
        private Task ApplicationMessageNotConsumedHandle(ApplicationMessageNotConsumedEventArgs arg)
        {
            _Callback?.Invoke(new ResultData_MQTT()
            {
                ResultCode = -1,
                ResultMsg = DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss.fff") + $">>>客戶端:'{arg.SenderId}'發布了消息:主題:'{arg.ApplicationMessage.Topic}'!內容:'{Encoding.UTF8.GetString(arg.ApplicationMessage.Payload)}';服務質量:{arg.ApplicationMessage.QualityOfServiceLevel};保留:{arg.ApplicationMessage.Retain}"
            });

            return Task.CompletedTask;
        }
        #endregion 處理事件
        #endregion Server

        #region Client
        /// <summary>
        /// 客戶端
        /// </summary>
        IMqttClient _MqttClient = null;

        /// <summary>
        /// 創建MQTTClient並運行
        /// </summary>
        /// <param name="mqttClientOptionsBuilder">MQTTClient連接配置</param>
        /// <param name="callback">信息處理邏輯</param>
        /// <returns></returns>
        public async Task<ResultData_MQTT> CreateMQTTClientAndStart(MqttClientOptionsBuilder mqttClientOptionsBuilder, Action<ResultData_MQTT>? callback)
        {
            ResultData_MQTT resultData_MQTT = new ResultData_MQTT();

            _Callback = callback;
            try
            {
                MqttClientOptions options = mqttClientOptionsBuilder.Build();

                _MqttClient = new MqttFactory().CreateMqttClient();
                _MqttClient.ConnectedAsync += ConnectedHandle;        // 服務器連接事件
                _MqttClient.DisconnectedAsync += DisconnectedHandle;  // 服務器斷開事件(可以寫入重連事件)
                _MqttClient.ApplicationMessageReceivedAsync += ApplicationMessageReceivedHandle;  // 發送消息事件
                await _MqttClient.ConnectAsync(options);  // 連接

                if (_MqttClient.IsConnected)
                {
                    resultData_MQTT = new ResultData_MQTT()
                    {
                        ResultCode = 1,
                        ResultMsg = DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss.fff") + ">>>執行了開啟MQTTClient_成功!"
                    };
                }
                else
                {
                    resultData_MQTT = new ResultData_MQTT()
                    {
                        ResultCode = -1,
                        ResultMsg = DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss.fff") + ">>>執行了開啟MQTTClient_失敗!"
                    };
                }
            }
            catch (Exception ex)
            {
                resultData_MQTT = new ResultData_MQTT()
                {
                    ResultCode = -1,
                    ResultMsg = DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss.fff") + ">>>執行了開啟MQTTClient_失敗!錯誤信息:" + ex.Message
                };
            }

            _Callback?.Invoke(resultData_MQTT);
            return resultData_MQTT;
        }

        /// <summary>
        /// 簡易創建MQTTClient並運行
        /// </summary>
        /// <param name="mqttServerUrl">mqttServer的Url</param>
        /// <param name="port">mqttServer的端口</param>
        /// <param name="userName">認證用用戶名</param>
        /// <param name="userPassword">認證用密碼</param>
        /// <param name="callback">信息處理邏輯</param>
        /// <returns></returns>
        public async Task<ResultData_MQTT> CreateMQTTClientAndStart(string mqttServerUrl, int port, string userName, string userPassword, Action<ResultData_MQTT>? callback)
        {
            ResultData_MQTT resultData_MQTT = new ResultData_MQTT();

            _Callback = callback;
            try
            {
                MqttClientOptionsBuilder mqttClientOptionsBuilder = new MqttClientOptionsBuilder();
                mqttClientOptionsBuilder.WithTcpServer(mqttServerUrl, port);          // 設置MQTT服務器地址
                if (!string.IsNullOrEmpty(userName))
                {
                    mqttClientOptionsBuilder.WithCredentials(userName, userPassword);  // 設置鑒權參數
                }
                mqttClientOptionsBuilder.WithClientId(Guid.NewGuid().ToString("N"));  // 設置客戶端序列號
                MqttClientOptions options = mqttClientOptionsBuilder.Build();

                _MqttClient = new MqttFactory().CreateMqttClient();
                _MqttClient.ConnectedAsync += ConnectedHandle;        // 服務器連接事件
                _MqttClient.DisconnectedAsync += DisconnectedHandle;  // 服務器斷開事件(可以寫入重連事件)
                _MqttClient.ApplicationMessageReceivedAsync += ApplicationMessageReceivedHandle;  // 發送消息事件
                await _MqttClient.ConnectAsync(options);  // 連接

                if (_MqttClient.IsConnected)
                {
                    resultData_MQTT = new ResultData_MQTT()
                    {
                        ResultCode = 1,
                        ResultMsg = DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss.fff") + ">>>執行了開啟MQTTClient_成功!"
                    };
                }
                else
                {
                    resultData_MQTT = new ResultData_MQTT()
                    {
                        ResultCode = -1,
                        ResultMsg = DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss.fff") + ">>>執行了開啟MQTTClient_失敗!"
                    };
                }
            }
            catch (Exception ex)
            {
                resultData_MQTT = new ResultData_MQTT()
                {
                    ResultCode = -1,
                    ResultMsg = DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss.fff") + ">>>執行了開啟MQTTClient_失敗!錯誤信息:" + ex.Message
                };
            }
            _Callback?.Invoke(resultData_MQTT);
            return resultData_MQTT;
        }

        /// <summary>
        /// 關閉MQTTClient
        /// </summary>
        public async Task DisconnectAsync_Client()
        {
            ResultData_MQTT resultData_MQTT = new ResultData_MQTT();
            try
            {
                if (_MqttClient != null && _MqttClient.IsConnected)
                {
                    await _MqttClient.DisconnectAsync();
                    _MqttClient.Dispose();
                    _MqttClient = null;

                    resultData_MQTT = new ResultData_MQTT()
                    {
                        ResultCode = 1,
                        ResultMsg = DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss.fff") + ">>>執行了關閉MQTTClient_成功!"
                    };
                }
                else
                {
                    resultData_MQTT = new ResultData_MQTT()
                    {
                        ResultCode = -1,
                        ResultMsg = DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss.fff") + ">>>執行了關閉MQTTClient_失敗!MQTTClient未開啟連接!"
                    };
                }
            }
            catch (Exception ex)
            {
                resultData_MQTT = new ResultData_MQTT()
                {
                    ResultCode = -1,
                    ResultMsg = DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss.fff") + ">>>執行了關閉MQTTClient_失敗!錯誤信息:" + ex.Message
                };
            }
            _Callback?.Invoke(resultData_MQTT);
        }

        /// <summary>
        /// 重連
        /// </summary>
        /// <returns></returns>
        public async Task ReconnectAsync_Client()
        {
            ResultData_MQTT resultData_MQTT = new ResultData_MQTT();
            try
            {
                if (_MqttClient != null)
                {
                    await _MqttClient.ReconnectAsync();
                    resultData_MQTT = new ResultData_MQTT()
                    {
                        ResultCode = 1,
                        ResultMsg = DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss.fff") + ">>>執行了MQTTClient重連_成功!"
                    };
                }
                else
                {
                    resultData_MQTT = new ResultData_MQTT()
                    {
                        ResultCode = -1,
                        ResultMsg = DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss.fff") + ">>>執行了MQTTClient重連_失敗!未設置MQTTClient連接!"
                    };
                }
            }
            catch (Exception ex)
            {
                resultData_MQTT = new ResultData_MQTT()
                {
                    ResultCode = -1,
                    ResultMsg = DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss.fff") + ">>>執行了MQTTClient重連_失敗!錯誤信息:" + ex.Message
                };
            }
            _Callback?.Invoke(resultData_MQTT);
        }

        /// <summary>
        /// 訂閱
        /// </summary>
        /// <param name="topic">主題</param>
        public async void SubscribeAsync_Client(string topic)
        {
            ResultData_MQTT resultData_MQTT = new ResultData_MQTT();
            try
            {
                MqttTopicFilter topicFilter = new MqttTopicFilterBuilder().WithTopic(topic).Build();
                await _MqttClient.SubscribeAsync(topicFilter, CancellationToken.None);

                resultData_MQTT = new ResultData_MQTT()
                {
                    ResultCode = 1,
                    ResultMsg = DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss.fff") + $">>>MQTTClient執行了訂閱'{topic}'_成功!"
                };
            }
            catch (Exception ex)
            {
                resultData_MQTT = new ResultData_MQTT()
                {
                    ResultCode = -1,
                    ResultMsg = DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss.fff") + $">>>MQTTClient執行了訂閱'{topic}'_失敗!錯誤信息:" + ex.Message
                };
            }
            _Callback?.Invoke(resultData_MQTT);
        }
        /// <summary>
        /// 退訂閱
        /// </summary>
        /// <param name="topic">主題</param>
        public async void UnsubscribeAsync_Client(string topic)
        {
            ResultData_MQTT resultData_MQTT = new ResultData_MQTT();
            try
            {
                await MqttClientExtensions.UnsubscribeAsync(_MqttClient, topic, CancellationToken.None);
                resultData_MQTT = new ResultData_MQTT()
                {
                    ResultCode = 1,
                    ResultMsg = DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss.fff") + $">>>MQTTClient執行了退訂'{topic}'_成功!"
                };
            }
            catch (Exception ex)
            {
                resultData_MQTT = new ResultData_MQTT()
                {
                    ResultCode = -1,
                    ResultMsg = DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss.fff") + $">>>MQTTClient執行退訂'{topic}'_失敗!錯誤信息:" + ex.Message
                };
            }
            _Callback?.Invoke(resultData_MQTT);
        }

        /// <summary>
        /// 發布消息( 必須在成功連接以后才生效 )
        /// </summary>
        /// <param name="topic">主題</param>
        /// <param name="msg">信息</param>
        /// <param name="retained">是否保留</param>
        /// <returns></returns>
        public async Task PublishAsync_Client(string topic, string msg, bool retained)
        {
            ResultData_MQTT resultData_MQTT = new ResultData_MQTT();

            try
            {
                MqttApplicationMessageBuilder mqttApplicationMessageBuilder = new MqttApplicationMessageBuilder();
                mqttApplicationMessageBuilder.WithTopic(topic);          // 主題
                mqttApplicationMessageBuilder.WithPayload(msg);          // 信息
                mqttApplicationMessageBuilder.WithRetainFlag(retained);  // 保留

                MqttApplicationMessage messageObj = mqttApplicationMessageBuilder.Build();

                if (_MqttClient.IsConnected)
                {
                    await _MqttClient.PublishAsync(messageObj, CancellationToken.None);

                    resultData_MQTT = new ResultData_MQTT()
                    {
                        ResultCode = 1,
                        ResultMsg = DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss.fff") + $">>>執行了發布信息_成功!主題:'{topic}',信息:'{msg}',是否保留:'{retained}'"
                    };
                }
                else
                {
                    // 未連接
                    resultData_MQTT = new ResultData_MQTT()
                    {
                        ResultCode = -1,
                        ResultMsg = DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss.fff") + ">>>執行了發布信息_失敗!MQTTClient未開啟連接!"
                    };
                }
            }
            catch (Exception ex)
            {
                resultData_MQTT = new ResultData_MQTT()
                {
                    ResultCode = -1,
                    ResultMsg = DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss.fff") + ">>>執行了發布信息_失敗!錯誤信息:" + ex.Message
                };
            }

            _Callback?.Invoke(resultData_MQTT);
        }

        #region 事件
        /// <summary>
        /// 服務器連接事件
        /// </summary>
        private Task ConnectedHandle(MqttClientConnectedEventArgs arg)
        {
            _Callback?.Invoke(new ResultData_MQTT()
            {
                ResultCode = 1,
                ResultMsg = DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss.fff") + ">>>已連接到MQTT服務器!"
            });
            return Task.CompletedTask;
        }

        /// <summary>
        /// 服務器斷開事件(可以寫入重連事件)
        /// </summary>
        private Task DisconnectedHandle(MqttClientDisconnectedEventArgs arg)
        {
            _Callback?.Invoke(new ResultData_MQTT()
            {
                ResultCode = 1,
                ResultMsg = DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss.fff") + $">>>已斷開與MQTT服務器連接!"
            });
            return Task.CompletedTask;
        }

        /// <summary>
        /// 發送消息事件
        /// </summary>
        private Task ApplicationMessageReceivedHandle(MqttApplicationMessageReceivedEventArgs arg)
        {
            _Callback?.Invoke(new ResultData_MQTT()
            {
                ResultCode = 1,
                ResultMsg = DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss.fff") + $">>>MQTTClient'{arg.ClientId}'內容:'{Encoding.UTF8.GetString(arg.ApplicationMessage.Payload)}';主題:'{arg.ApplicationMessage.Topic}',消息等級Qos:[{arg.ApplicationMessage.QualityOfServiceLevel}],是否保留:[{arg.ApplicationMessage.Retain}]",
                ResultObject1 = arg.ApplicationMessage.Topic,
                ResultObject2 = Encoding.UTF8.GetString(arg.ApplicationMessage.Payload)
            });
            return Task.CompletedTask;
        }
        #endregion 事件
        #endregion Client
    }

    /// <summary>
    /// 信息載體
    /// </summary>
    public class ResultData_MQTT
    {
        /// <summary>
        /// 結果Code
        /// 正常1,其他為異常;0不作為回復結果
        /// </summary>
        public int ResultCode { get; set; } = 0;

        /// <summary>
        /// 結果信息
        /// </summary>
        public string ResultMsg { get; set; } = string.Empty;

        /// <summary>
        /// 擴展1
        /// </summary>
        public object? ResultObject1 { get; set; } = string.Empty;

        /// <summary>
        /// 擴展2
        /// </summary>
        public object? ResultObject2 { get; set; } = string.Empty;
    }
}

四、分布式MQTT消息服務器推薦

  EMQ X(簡稱 EMQ):是一款完全開源,高度可伸縮,高可用的分布式 MQTT 消息服務器,同時也支持 CoAP/LwM2M 一站式 IoT 協議接入。可處理千萬級別的並發客戶端。

 

參考:

  https://www.cnblogs.com/hayasi/p/7743356.html

  https://www.cnblogs.com/dathlin/p/11631894.html

  https://www.cnblogs.com/sxkgeek/p/9140180.html

  https://www.shangmayuan.com/a/67b2f4a9f2c440e9b2db3a97.html

  https://zhuanlan.zhihu.com/p/419561816

  https://blog.csdn.net/qq_37258787/article/details/80183923

  https://www.cnblogs.com/dathlin/p/11631894.html

  https://www.jianshu.com/p/a371c6ac076b


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM