.Net5開發MQTT服務器


.Net5開發MQTT服務器主要借助MQTTnet包,自主開發MQTT服務器,經測試,非常穩定。

 

  

using IoT;
using JieYun.Admin.Net5;
using JieYun.IoT.Common.Models;
using JieYun.IoT.Server.Services;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using MQTTnet;
using MQTTnet.Protocol;
using MQTTnet.Server;
using System;
using System.Text;
using System.Text.Json;
using System.Threading;
using System.Threading.Tasks;
using static IoT.IoTRpc;

namespace JieYun.IoT.Server
{
    public class ServerWorker : BackgroundService
    {
        public static IMqttServer mqttServer;

        private readonly ILogger<ServerWorker> _logger;
        private readonly IoTRpcClient _client;

        public ServerWorker(ILogger<ServerWorker> logger, IoTRpcClient client, IServiceProvider provider)
        {
            _logger = logger;
            _client = client;
        }

        protected override async Task ExecuteAsync(CancellationToken stoppingToken)
        {
            await StartMqttServer();
        }

        //啟動Mqtt服務器
        private async Task StartMqttServer()
        {
            try
            {
                //驗證客戶端信息
                string hostIp = AppConfigProvider.AppConfig.IoTServerAddress;//IP地址
                int hostPort = AppConfigProvider.AppConfig.IoTServerPort;//端口號
                int timeout = 5;//超時時間
                string username = "admin";//用戶名
                string password = "admin";//密碼

                var optionBuilder = new MqttServerOptionsBuilder()
                  // .WithDefaultEndpointBoundIPAddress(System.Net.IPAddress.Parse(hostIp))
                   .WithDefaultEndpointPort(hostPort)
                   .WithDefaultCommunicationTimeout(TimeSpan.FromSeconds(timeout))
                   .WithConnectionValidator(t =>
                   {
                       if (t.Username != username || t.Password != password)
                       {
                           t.ReasonCode = MQTTnet.Protocol.MqttConnectReasonCode.BadUserNameOrPassword;
                       }
                       t.ReasonCode = MqttConnectReasonCode.Success;
                   });
                var options = optionBuilder.Build();

                //創建Mqtt服務器
                mqttServer = new MqttFactory().CreateMqttServer();

                //開啟訂閱事件
                mqttServer.ClientSubscribedTopicHandler = new MqttServerClientSubscribedHandlerDelegate(MqttNetServer_SubscribedTopic);

                //取消訂閱事件
                mqttServer.ClientUnsubscribedTopicHandler = new MqttServerClientUnsubscribedTopicHandlerDelegate(MqttNetServer_UnSubscribedTopic);

                //客戶端消息事件
                mqttServer.UseApplicationMessageReceivedHandler(MqttServe_ApplicationMessageReceivedAsync);

                //客戶端連接事件
                mqttServer.UseClientConnectedHandler(MqttNetServer_ClientConnected);

                //客戶端斷開事件
                mqttServer.UseClientDisconnectedHandler(MqttNetServer_ClientDisConnected);

                //啟動服務器
                await mqttServer.StartAsync(options);

                _logger.LogInformation("MQTT服務器已啟動.");
            }
            catch (Exception e)
            {
                _logger.LogError($"MQTT服務啟動失敗:{e}");
            }
        }

        /// <summary>
        /// 客戶訂閱
        /// </summary>
        private void MqttNetServer_SubscribedTopic(MqttServerClientSubscribedTopicEventArgs e)
        {
            //客戶端Id
            var ClientId = e.ClientId;
            var Topic = e.TopicFilter.Topic;
            _logger.LogInformation($"客戶端【{ClientId}】訂閱:{Topic}");
        }

        /// <summary>
        /// 客戶取消訂閱
        /// </summary>
        private void MqttNetServer_UnSubscribedTopic(MqttServerClientUnsubscribedTopicEventArgs e)
        {
            //客戶端Id
            var ClientId = e.ClientId;
            var Topic = e.TopicFilter;
            _logger.LogInformation($"客戶端【{ClientId}】取消訂閱:{Topic}");
        }

        /// <summary>
        /// 接收消息
        /// </summary>
        private async Task MqttServe_ApplicationMessageReceivedAsync(MqttApplicationMessageReceivedEventArgs e)
        {
            _logger.LogInformation(e.ApplicationMessage.ToConsoleMessage());

            //轉發消息到WebClient
            var msgStr = Encoding.UTF8.GetString(e.ApplicationMessage.Payload);
            var msg = JsonSerializer.Deserialize<MQTTMessage>(msgStr);

            if(msg.To == "SERVER"&&e.ApplicationMessage.Topic == MQTTTOPIC.UPDATE)
            {
                var msgSend = new MQTTMessage()
                {
                    From = msg.From,
                    To = "WebClient",
                    Msg = msg.Msg
                };
                await SendMessageToWebClient(msgSend);
            }
        }

        /// <summary>
        /// 客戶連接
        /// </summary>
        private async Task MqttNetServer_ClientConnected(MqttServerClientConnectedEventArgs e)
        {
            var ClientId = e.ClientId;
            _logger.LogInformation($"{DateTime.Now} 客戶端【{ClientId}】已連接");
           
            //通知服務器,客戶端連接了
            var msg = new MQTTMessage()
            {
                From = ClientId,
                To = "SERVER",
                Msg = ClientStatus.Connected
            };
            await SendMessageToWebClient(msg);
        }

        /// <summary>
        /// 客戶連接斷開
        /// </summary>
        private async Task MqttNetServer_ClientDisConnected(MqttServerClientDisconnectedEventArgs e)
        {
            var ClientId = e.ClientId;
            
            _logger.LogInformation($"{DateTime.Now} 客戶端【{ClientId}】已斷開");

            //通知服務器,客戶端斷開了
            var msg = new MQTTMessage()
            {
                From = ClientId,
                To = "SERVER",
                Msg = ClientStatus.Disconnected
            };
            await SendMessageToWebClient(msg);
        }

        private async Task SendMessageToWebClient(MQTTMessage msg)
        {
            var msgStr = JsonSerializer.Serialize(msg);
            var payload = Encoding.UTF8.GetBytes(msgStr);
            MqttApplicationMessage mm = new MqttApplicationMessage()
            {
                Topic = MQTTTOPIC.UPDATE,
                Payload = payload
            };
            await mqttServer.PublishAsync(mm);
        }
    }
}
消息類MQTTMessage
using System;
using System.Collections.Generic;
using System.Text;
using System.Text.Json;

namespace JieYun.IoT.Common.Models
{
    public class MQTTMessage
    {
        public string Msg { get; set; }
        public string From { get; set; } = "SERVER";
        public string To { get; set; } = "e098060e71ef";

        public override string ToString()
        {
            return JsonSerializer.Serialize(this);
        }
    }
}
using Microsoft.AspNetCore.Hosting;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Threading.Tasks;

namespace JieYun.IoT.Server
{
    public class Program
    {
        public static void Main(string[] args)
        {
            var config = new ConfigurationBuilder()
            .AddCommandLine(args)
            .Build();


            var host = new WebHostBuilder()
              .UseConfiguration(config)
              .UseKestrel()
              .UseContentRoot(Directory.GetCurrentDirectory())
              .UseStartup<Startup>()
              .ConfigureServices((hostContext, services) =>
                {
                    services.AddHostedService<ServerWorker>();
                })
              .ConfigureLogging(logging => {
                  logging.ClearProviders();
                  logging.SetMinimumLevel(LogLevel.Trace);
                  logging.AddConsole();
              })
              .Build();

            host.Run();
        }
    }
}

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM