使用MQTTnet部署MQTT服務(轉)


使用MQTTnet部署MQTT服務

下載地址:https://github.com/chkr1011/MQTTnet

一. 服務端

1. 創建配置參數

可以使用 `var options = new MqttServerOptions();` 直接構建一個options。你也可以通過參數構建器 `var options = new MqttServerOptionsBuilder();` 使代碼更簡潔美觀。

構建器的函數說明:

函數名 功能說明
Build 構建配置參數
WithApplicationMessageInterceptor 允許處理來自客戶端的所有已發布消息
WithClientId 服務端發布消息時使用的ClientId
WithConnectionBacklog 設置要保留的連接數
WithConnectionValidator 驗證連接
WithDefaultCommunicationTimeout 設置默認的通信超時
WithDefaultEndpoint 使用默認端點
WithDefaultEndpointBoundIPAddress 使用默認端點IPv4地址
WithDefaultEndpointBoundIPV6Address 使用默認端點IPv6地址
WithDefaultEndpointPort 使用默認端點端口
WithEncryptedEndpoint 使用加密的端點
WithEncryptedEndpointBoundIPAddress 使用加密的端點IPv4地址
WithEncryptedEndpointBoundIPV6Address 使用加密的端點IPv6地址
WithEncryptedEndpointPort 使用加密的端點端口
WithEncryptionCertificate 使用證書進行SSL連接
WithEncryptionSslProtocol 使用SSL協議級別
WithMaxPendingMessagesPerClient 每個客戶端允許最多未決消息
WithPersistentSessions 保持會話
WithStorage 使用存儲
WithSubscriptionInterceptor 允許處理來自客戶端的所有訂閱
WithoutDefaultEndpoint 禁用默認端點
WithoutEncryptedEndpoint 禁用默認(SSL)端點

驗證賬號密碼

  1. options.WithConnectionValidator(c =>
  2. {
  3. if (c.Username != "seven")
  4. {
  5. c.ReasonCode = MqttConnectReasonCode.BadUserNameOrPassword;
  6. }
  7. });

2. 啟動服務端

  1. var server = new MqttFactory().CreateMqttServer();
  2. server.StartAsync(options.Build());

服務啟動事件

  1. server.StartedHandler = new MqttServerStartedHandlerDelegate(Started);
  2. static async Task Started(EventArgs e)
  3. {
  4. Console.WriteLine("Started");
  5. }

服務關閉事件

  1. server.StoppedHandler = new MqttServerStoppedHandlerDelegate(Stopped);
  2. static async Task Stopped(EventArgs e)
  3. {
  4. Console.WriteLine("Stopped");
  5. }

客戶端連接事件

  1. server.ClientConnectedHandler = new MqttServerClientConnectedHandlerDelegate(Connected);
  1. server.UseClientConnectedHandler(c => Connected(c));
  2. static async Task Connected(MqttServerClientConnectedEventArgs e)
  3. {
  4. Console.WriteLine($"{e.ClientId} connected");
  5. }

客戶端斷開事件

  1. server.ClientDisconnectedHandler = new MqttServerClientDisconnectedHandlerDelegate(Disconnected);
  1. server.UseClientDisconnectedHandler(c => Disconnected(c));
  2. static async Task Disconnected(MqttServerClientDisconnectedEventArgs e)
  3. {
  4. Console.WriteLine($"{e.ClientId} disconnected");
  5. }

客戶端訂閱Topic

  1. server.ClientSubscribedTopicHandler = new MqttServerClientSubscribedHandlerDelegate(Subscribed);
  2. static async Task Subscribed(MqttServerClientSubscribedTopicEventArgs e)
  3. {
  4. Console.WriteLine($"{e.ClientId} subscribed {e.TopicFilter.Topic}");
  5. }

客戶端取消訂閱Topic

  1. server.ClientUnsubscribedTopicHandler = new MqttServerClientUnsubscribedTopicHandlerDelegate(c => Unsubscribed(c));
  2. static async Task Unsubscribed(MqttServerClientUnsubscribedTopicEventArgs e)
  3. {
  4. Console.WriteLine($"{e.ClientId} unsubscribed {e.TopicFilter}");
  5. }

消息接收

  1. server.ApplicationMessageReceivedHandler = new MqttApplicationMessageReceivedHandlerDelegate(MessageReceived);
  1. server.UseApplicationMessageReceivedHandler(c => MessageReceived(c));
  2. static async Task MessageReceived(MqttApplicationMessageReceivedEventArgs e)
  3. {
  4. Console.WriteLine($"{e.ClientId} get {e.ApplicationMessage.Topic}");
  5. }

3. 操作

發布消息

  1. var message = new MqttApplicationMessage()
  2. {
  3. Topic = "testTopic",
  4. Payload = Encoding.UTF8.GetBytes("hello seven")
  5. };
  6. server.PublishAsync(message);

查詢客戶端狀態

  1. var list = server.GetClientStatusAsync().Result;

獲取會話信息

  1. var sessions = server.GetSessionStatusAsync().Result;

查詢Retain的消息

  1. var messages = server.GetRetainedApplicationMessagesAsync().Result;

清空Retain消息

  1. server.ClearRetainedApplicationMessagesAsync();

停止服務

  1. server.StopAsync();

4. 其他

自帶的日志跟蹤

  1. var logger = new MqttNetLogger();
  2. logger.LogMessagePublished += LogMessagePublished;
  3. private static void LogMessagePublished(object sender, MqttNetLogMessagePublishedEventArgs e)
  4. {
  5. Console.WriteLine(e.LogMessage);
  6. Console.WriteLine(e.TraceMessage);
  7. }

二. 客戶端

1. 創建配置參數

可以使用 `var options = new MqttClientOptions();` 直接構建一個options。你也可以通過參數構建器 `var options = new MqttClientOptionsBuilder();` 使代碼更簡潔美觀。

構建器的函數說明:

函數名 功能說明
Build 構建配置參數
WithAuthentication 允許使用不同的身份驗證模式
WithCleanSession 將客戶端與MQTT干凈會話支持一起使用
WithClientId 設置客戶端ID
WithCommunicationTimeout 設置通信超時
WithCredentials 設置登錄憑證
WithExtendedAuthenticationExchangeHandler 以自定義方式處理身份驗證
WithKeepAlivePeriod 設置保持有效期
WithKeepAliveSendInterval 設置保活的發送間隔
WithMaximumPacketSize 設置最大數據包大小
WithNoKeepAlive 不要使用保持活動狀態
WithProtocolVersion 設置MQTT協議版本
WithProxy 設置代理
WithTls 客戶端使用SSL/TLS
WithTopicAliasMaximum 允許最大數量的主題別名
WithReceiveMaximum 允許最大數量的已接收數據包
WithRequestProblemInformation 顯示請求問題信息
WithRequestResponseInformation 顯示請求響應問題信息
WithSessionExpiryInterval 一段時間后終止會話
WithTcpServer 告訴客戶端(通過TCP)連接到哪個MQTT代理。
WithWebSocketServer 告訴客戶端(通過WebSocket)連接到哪個MQTT代理
WithWillMessage 告訴客戶端最后一條消息將被發送。
WithWillDelayInterval 告訴客戶端最后一個消息得延遲間隔

2. 連接服務端

  1. var client = new MqttFactory().CreateMqttClient();
  2. client.ConnectAsync(options.Build());

客戶端連接完成事件

  1. client.UseConnectedHandler(e => Connected(e));
  2. static async Task Connected(MqttClientConnectedEventArgs e)
  3. {
  4. Console.WriteLine($"Connected");
  5. }

客戶端斷開事件

  1. client.UseDisconnectedHandler(e => Disconnected(e));
  2. static async Task Disconnected(MqttClientDisconnectedEventArgs e)
  3. {
  4. Console.WriteLine($"Disconnected");
  5. }

訂閱主題(必須在成功連接以后才生效)

  1. client.UseApplicationMessageReceivedHandler(e => MessageReceived(e));
  2. static async Task MessageReceived(MqttApplicationMessageReceivedEventArgs e)
  3. {
  4. Console.WriteLine($"{e.ClientId}");
  5. }

發布消息( 必須在成功連接以后才生效 )

  1. var message = new MqttApplicationMessageBuilder()
  2. .WithTopic("myTopic")
  3. .WithPayload("seven365.cn")
  4. .WithExactlyOnceQoS()
  5. .WithRetainFlag()
  6. .Build();
  7. client.PublishAsync(message, CancellationToken.None);

3. 操作

客戶端重連

  1. client.ReconnectAsync();

客戶端斷開

  1. client.DisconnectAsync


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM