最近一段時間由於要做一套智能設備系統,而有幸了解到Surging中的Mqtt broker,學習了很多東西。本篇文章基於Surging開源的.NET Core項目,有興趣的朋友可點擊此處進行了解。話不多說,我們來基於Surging中的WS與MqttClient的結合,介紹服務端MqttClient的使用。
准備工作
開發環境: Visual Studio 2017 15.9.5
.netCore版本:2.2.102(目前Surging已經升級至netcore 2.2版本)
surging項目下載地址 https://github.com/dotnetcore/surging
開始工作
接口部分
新建類庫Surging.IModuleServices.MqttWithWS
添加引用Surging.Core.Protocol.WS
新建文件夾Models,並創建類MqttClientOption.cs。此類用於讀取surgingSettings.json中配置的MqttClient相關參數,MqttClientOption的代碼如下:
/// <summary>
/// Settings for the server-side MQTT client. According to the article, an instance is
/// bound from the "MqttClient" section of surgingSettings.json.
/// </summary>
public class MqttClientOption
{
    /// <summary>Client id prefix; the client base class appends a random GUID to it.</summary>
    public string ClientID { get; set; }

    /// <summary>Host name or address of the MQTT broker. Defaults to an empty string.</summary>
    public string MqttClientConnection { get; set; } = string.Empty;

    /// <summary>User name for broker authentication; credentials are only sent when this is non-blank.</summary>
    public string MqttClientUserName { get; set; }

    /// <summary>Password sent together with <see cref="MqttClientUserName"/>.</summary>
    public string MqttClientPassword { get; set; }

    /// <summary>TCP port of the MQTT broker.</summary>
    public int Port { get; set; }

    /// <summary>Keep-alive period, in seconds.</summary>
    public int KeepAlivedTime { get; set; }

    /// <summary>Topics the client subscribes to once connected. Defaults to an empty list.</summary>
    public List<string> Topics { get; set; } = new List<string>();

    /// <summary>Whether the client requests a clean session when connecting.</summary>
    public bool CleanSession { get; set; }
}
surgingSettings.json的MqttClient配置代碼參見底部Surging.MqttClientWithWsServices.Server中的配置
根據Surging作者創建的例子來創建一個接口IChatService.cs
using Surging.Core.CPlatform.Ioc;
using Surging.Core.CPlatform.Runtime.Client.Address.Resolvers.Implementation.Selectors.Implementation;
using Surging.Core.CPlatform.Runtime.Server.Implementation.ServiceDiscovery.Attributes;
using Surging.Core.CPlatform.Support.Attributes;
using Surging.Core.Protocol.WS.Attributes;
using Surging.IModuleServices.MqttWithWS.Models;
using System;
using System.Threading.Tasks;

namespace Surging.IModuleServices.MqttWithWS
{
    /// <summary>
    /// Service contract for the WebSocket chat service that also hosts the MQTT client.
    /// </summary>
    /// <remarks>
    /// The attributes are Surging framework metadata: a route template of "Api/{Service}"
    /// and a WebSocket behavior contract with <c>IgnoreExtensions</c> enabled.
    /// NOTE(review): their exact runtime effect is defined by Surging and is not visible here.
    /// </remarks>
    [ServiceBundle("Api/{Service}"), BehaviorContract(IgnoreExtensions = true)]
    public interface IChatService : IServiceKey
    {
        /// <summary>
        /// Configures and starts the MQTT client from the supplied options.
        /// </summary>
        /// <param name="mqttClientOption">Broker address, credentials, keep-alive and topic settings.</param>
        /// <returns>A task representing the start-up operation.</returns>
        [Command(ShuntStrategy = AddressSelectorMode.HashAlgorithm)]
        Task RunMqttClient(MqttClientOption mqttClientOption);
    }
}
結構如圖所示

接口實現部分
新建類庫Surging.Modules.WSWithMqtt
添加引用Surging.IModuleServices.MqttWithWS
添加依賴項:MQTTnet
由於要獲取surgingSettings.json中的配置項,因此需要創建一個文件夾Configurations並在文件夾下創建SurgingMqttBuilderExtendsions.cs類代碼如下:
using Autofac;
using Microsoft.Extensions.Configuration;
using Surging.Core.CPlatform;
using Surging.Core.ServiceHosting.Internal;
using Surging.IModuleServices.MqttWithWS;
using Surging.IModuleServices.MqttWithWS.Models;
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading.Tasks;

namespace Surging.Modules.WSWithMqtt.Configurations
{
    /// <summary>
    /// Host-builder extension that starts the MQTT client when services are mapped.
    /// </summary>
    public static class SurgingMqttBuilderExtendsions
    {
        /// <summary>
        /// Registers a service-mapping step that reads the "MqttClient" configuration section
        /// and starts the MQTT client through <see cref="IChatService.RunMqttClient"/>.
        /// </summary>
        /// <param name="builder">The service host builder being configured.</param>
        /// <returns>The same <paramref name="builder"/>, to allow call chaining.</returns>
        public static IServiceHostBuilder UseMqttClient(this IServiceHostBuilder builder)
        {
            builder.MapServices(collection =>
            {
                // Fall back to default options when the section is missing from configuration.
                MqttClientOption mqttClientOption = new MqttClientOption();
                var section = AppConfig.GetSection("MqttClient");
                if (section.Exists())
                {
                    mqttClientOption = section.Get<MqttClientOption>();
                }

                // Start-up stays fire-and-forget so the host is not blocked, but a faulted
                // task is now observed and reported instead of being silently dropped.
                collection.Resolve<IChatService>()
                    .RunMqttClient(mqttClientOption)
                    .ContinueWith(
                        startup => Console.WriteLine("mqtt客戶端啟動失敗!" + startup.Exception?.GetBaseException().Message),
                        TaskContinuationOptions.OnlyOnFaulted);
            });
            return builder;
        }
    }
}
由於此服務基於WSServiceBase 因此我創建了MqttWSServieBase繼承自WSServiceBase用於創建MqttClient客戶端,代碼如下:
using MQTTnet;
using MQTTnet.Client;
using Surging.Core.Protocol.WS;
using Surging.IModuleServices.MqttWithWS.Models;
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading.Tasks;

namespace Surging.Modules.WSWithMqtt
{
    /// <summary>
    /// WebSocket service base class that additionally prepares an MQTT client:
    /// connection options, topic filters and the client instance itself.
    /// </summary>
    public abstract class MqttWSServieBase : WSServiceBase
    {
        /// <summary>Connection options built by <see cref="RunMqttClientBase"/>.</summary>
        protected IMqttClientOptions _mqttClientOptions;

        /// <summary>MQTT client created by <see cref="RunMqttClientBase"/>; not connected by this class.</summary>
        protected IMqttClient _mqttClient;

        /// <summary>One topic filter per configured topic.</summary>
        protected IEnumerable<TopicFilter> _topicFilters;

        /// <summary>
        /// Builds the client options and topic filters from <paramref name="mqttClientOption"/>
        /// and creates the MQTT client. No connection is opened here.
        /// </summary>
        /// <param name="mqttClientOption">
        /// Client settings. When null, the options builder is left unconfigured and no topic
        /// filters are produced.
        /// </param>
        /// <returns>An already-completed task.</returns>
        public Task RunMqttClientBase(MqttClientOption mqttClientOption)
        {
            bool hasSettings = mqttClientOption != null;

            var optionsBuilder = new MqttClientOptionsBuilder();
            if (hasSettings)
            {
                // A random suffix keeps the client id unique across instances.
                string uniqueClientId = mqttClientOption.ClientID + Guid.NewGuid().ToString("N");
                optionsBuilder.WithClientId(uniqueClientId)
                              .WithCleanSession(mqttClientOption.CleanSession);

                optionsBuilder.WithTcpServer(mqttClientOption.MqttClientConnection, mqttClientOption.Port);

                // Credentials are only supplied when a user name is configured.
                bool hasUserName = !string.IsNullOrWhiteSpace(mqttClientOption.MqttClientUserName);
                if (hasUserName)
                {
                    optionsBuilder.WithCredentials(mqttClientOption.MqttClientUserName, mqttClientOption.MqttClientPassword);
                }

                TimeSpan keepAlive = TimeSpan.FromSeconds(mqttClientOption.KeepAlivedTime);
                optionsBuilder.WithKeepAlivePeriod(keepAlive);
            }
            _mqttClientOptions = optionsBuilder.Build();

            IList<TopicFilter> subscriptions = new List<TopicFilter>();
            if (hasSettings)
            {
                foreach (string topic in mqttClientOption.Topics)
                {
                    subscriptions.Add(new TopicFilterBuilder().WithTopic(topic).Build());
                }
            }
            _topicFilters = subscriptions;

            _mqttClient = new MqttFactory().CreateMqttClient();
            return Task.CompletedTask;
        }
    }
}
接口繼承類ChatService,此類用於連接WS,並通過此連接對Surging的 Mqtt Broker進行發布訂閱。 代碼如下:
using MQTTnet;
using MQTTnet.Client;
using Surging.IModuleServices.MqttWithWS;
using Surging.IModuleServices.MqttWithWS.Models;
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading.Tasks;
using WebSocketCore;

namespace Surging.Modules.WSWithMqtt
{
    /// <summary>
    /// WebSocket chat service that also runs an MQTT client, so the server can publish and
    /// subscribe against the MQTT broker.
    /// </summary>
    public class ChatService : MqttWSServieBase, IChatService
    {
        // Maps user name -> WebSocket session id.
        private static readonly ConcurrentDictionary<string, string> _users = new ConcurrentDictionary<string, string>();
        // Maps WebSocket session id -> user name.
        private static readonly ConcurrentDictionary<string, string> _clients = new ConcurrentDictionary<string, string>();
        private string _name;
        private string _to;

        /// <summary>
        /// Handles an incoming WebSocket message from a registered session.
        /// The payload model is assembled, but forwarding it is still commented out.
        /// </summary>
        /// <param name="e">The received message.</param>
        protected override void OnMessage(MessageEventArgs e)
        {
            if (_clients.ContainsKey(ID))
            {
                Dictionary<string, object> model = new Dictionary<string, object>();
                model.Add("name", _to);
                model.Add("data", e.Data);
                //var result = ServiceLocator.GetService<IServiceProxyProvider>()
                //     .Invoke<object>(model, "api/chat/SendMessage").Result;
            }
        }

        /// <summary>
        /// Reads "name" and "to" from the query string when a session opens and, when a name
        /// is present, records the session id / user name mapping in both lookup tables.
        /// </summary>
        protected override void OnOpen()
        {
            _name = Context.QueryString["name"];
            _to = Context.QueryString["to"];
            if (!string.IsNullOrEmpty(_name))
            {
                _clients[ID] = _name;
                _users[_name] = ID;
            }
        }

        /// <summary>
        /// Sends a greeting containing <paramref name="data"/> to the session registered
        /// under <paramref name="name"/>; does nothing when no such user is registered.
        /// </summary>
        /// <param name="name">Target user name.</param>
        /// <param name="data">Text to include in the message.</param>
        /// <returns>An already-completed task.</returns>
        public Task SendMessage(string name, string data)
        {
            // A single TryGetValue avoids the check-then-read race of ContainsKey + indexer
            // on a dictionary shared by all sessions.
            string sessionId;
            if (_users.TryGetValue(name, out sessionId))
            {
                this.GetClient().SendTo($"hello,{name},{data}", sessionId);
            }
            return Task.CompletedTask;
        }

        /// <summary>
        /// Prepares the MQTT client, hooks its event handlers and connects to the broker.
        /// </summary>
        /// <param name="mqttClientOption">Broker address, credentials, keep-alive and topic settings.</param>
        /// <returns>A task that completes when the connect attempt has finished.</returns>
        public async Task RunMqttClient(MqttClientOption mqttClientOption)
        {
            await base.RunMqttClientBase(mqttClientOption);
            _mqttClient.ApplicationMessageReceived += _mqttClient_ApplicationMessageReceived;
            _mqttClient.Connected += _mqttClient_Connected;
            _mqttClient.Disconnected += _mqttClient_Disconnected;
            await _mqttClient.ConnectAsync(_mqttClientOptions);
        }

        #region mqttClient
        /// <summary>
        /// Called when an MQTT application message arrives. Placeholder: no handling yet.
        /// </summary>
        /// <param name="sender">Event source.</param>
        /// <param name="e">Received message details.</param>
        private void _mqttClient_ApplicationMessageReceived(object sender, MqttApplicationMessageReceivedEventArgs e)
        {
            if (e.ApplicationMessage != null)
            {

            }
        }

        /// <summary>
        /// Called when the MQTT connection is lost: waits five seconds, then tries to reconnect.
        /// </summary>
        /// <param name="sender">Event source.</param>
        /// <param name="e">Disconnect details.</param>
        private async void _mqttClient_Disconnected(object sender, MqttClientDisconnectedEventArgs e)
        {
            Console.WriteLine("mqtt客戶端與服務端斷開連接!");
            await Task.Delay(TimeSpan.FromSeconds(5));
            try
            {
                await _mqttClient.ConnectAsync(_mqttClientOptions);
            }
            catch (Exception ex)
            {
                // async void handler: an escaping exception would crash the process, so it is
                // caught here and reported with its cause rather than swallowed blindly.
                Console.WriteLine("mqtt客戶端與服務端嘗試連接失敗!" + ex.Message);
            }
        }

        /// <summary>
        /// Called once the MQTT connection is established: subscribes to the configured topics.
        /// </summary>
        /// <param name="sender">Event source.</param>
        /// <param name="e">Connection details.</param>
        private async void _mqttClient_Connected(object sender, MqttClientConnectedEventArgs e)
        {
            try
            {
                // Awaited so that a failed subscription is observed instead of being lost.
                await _mqttClient.SubscribeAsync(_topicFilters);
            }
            catch (Exception ex)
            {
                Console.WriteLine("mqtt客戶端訂閱主題失敗!" + ex.Message);
            }
        }
        #endregion
    }
}
結構如圖

創建控制台應用程序,用於項目啟動
新建控制台程序Surging.MqttClientWithWsServices.Server將Surging.Services.Server中的配置文件cacheSettings.json、eventBusSettings.json、log4net.config、NLog.config、以及surgingSettings.json文件都拷貝到新建的這個控制台程序中。
添加依賴項:Autofac Autofac.Extensions.DependencyInjection System.Text.Encoding.CodePages Microsoft.Extensions.Logging Microsoft.Extensions.Logging.Console
添加引用

修改Program.cs
using Autofac;
using Microsoft.Extensions.Logging;
using Surging.Core.Caching.Configurations;
using Surging.Core.CPlatform;
using Surging.Core.CPlatform.Configurations;
using Surging.Core.CPlatform.Utilities;
using Surging.Core.Nlog;
using Surging.Core.ProxyGenerator;
using Surging.Core.ServiceHosting;
using Surging.Core.ServiceHosting.Internal.Implementation;
using Surging.Modules.WSWithMqtt.Configurations;
using System;
using System.Text;

namespace Surging.MqttClientWithWsServices.Server
{
    /// <summary>
    /// Console entry point that hosts the Surging services together with the MQTT client.
    /// </summary>
    class Program
    {
        /// <summary>
        /// Configures the service host, including the MQTT client start-up step,
        /// builds it and runs it.
        /// </summary>
        /// <param name="args">Command-line arguments (unused).</param>
        static void Main(string[] args)
        {
            Encoding.RegisterProvider(CodePagesEncodingProvider.Instance);

            // The builder calls are order-sensitive; they are kept in the original sequence.
            var hostBuilder = new ServiceHostBuilder()
                .RegisterServices(container =>
                {
                    container.AddMicroService(micro =>
                    {
                        micro.AddServiceRuntime()
                             .AddClientProxy()
                             .AddRelateService()
                             .AddConfigurationWatch();
                        container.Register(p => new CPlatformContainer(ServiceLocator.Current));
                    });
                })
                .ConfigureLogging(logging =>
                {
                    logging.AddConfiguration(
                        Surging.Core.CPlatform.AppConfig.GetSection("Logging"));
                })
                .UseNLog(LogLevel.Debug, "NLog.config")
                .UseServer(options => { })
                .UseConsoleLifetime()
                .UseMqttClient() // starts the MQTT client during service mapping
                .UseProxy()
                .Configure(config => config.AddCacheFile("${cachepath}|cacheSettings.json", optional: false, reloadOnChange: true))
                .Configure(config => config.AddCPlatformFile("${surgingpath}|surgingSettings.json", optional: false, reloadOnChange: true))
                .UseStartup<Startup>();

            var host = hostBuilder.Build();

            using (host.Run())
            {
                Console.WriteLine($"服務端啟動成功,{DateTime.Now}。");
            }
        }
    }
}
注意:這裡需要額外添加一段代碼 .UseMqttClient(),用於啟動MqttClient。
添加Startup.cs 此類與Surging.Services.Server中的Startup.cs一致。
using Autofac;
using Autofac.Extensions.DependencyInjection;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;
using Surging.Core.Caching.Configurations;
using Surging.Core.CPlatform.Utilities;
using System.IO;

namespace Surging.MqttClientWithWsServices.Server
{
    /// <summary>
    /// Start-up class for the console host: sets up configuration sources and builds the
    /// Autofac container.
    /// </summary>
    public class Startup
    {
        /// <summary>
        /// Points the configuration builder at the current directory and adds the cache settings file.
        /// </summary>
        /// <param name="config">Configuration builder supplied by the host.</param>
        public Startup(IConfigurationBuilder config)
        {
            config.SetBasePath(Directory.GetCurrentDirectory());
            ConfigureCache(config);
        }

        /// <summary>
        /// Adds logging to a fresh service collection, copies it into the Autofac builder,
        /// builds the container and stores it in <c>ServiceLocator.Current</c>.
        /// </summary>
        /// <param name="builder">Autofac container builder supplied by the host.</param>
        /// <returns>The container held by <c>ServiceLocator.Current</c>.</returns>
        public IContainer ConfigureServices(ContainerBuilder builder)
        {
            var serviceCollection = new ServiceCollection();
            ConfigureLogging(serviceCollection);
            builder.Populate(serviceCollection);

            ServiceLocator.Current = builder.Build();
            return ServiceLocator.Current;
        }

        /// <summary>
        /// Post-build hook; intentionally empty.
        /// </summary>
        /// <param name="app">The built container.</param>
        public void Configure(IContainer app)
        {
        }

        #region Private methods
        /// <summary>
        /// Registers the logging services.
        /// </summary>
        /// <param name="services">Service collection to add logging to.</param>
        private void ConfigureLogging(IServiceCollection services) => services.AddLogging();

        /// <summary>
        /// Adds cacheSettings.json as a required configuration source.
        /// </summary>
        /// <param name="build">Configuration builder to extend.</param>
        private void ConfigureCache(IConfigurationBuilder build) =>
            build.AddCacheFile("cacheSettings.json", optional: false);
        #endregion
    }
}
現在基本上已經完事了,下面再說下surgingSettings.json的配置信息。為了避免與Surging.Services.Server的端口重復,配置如下:
{
"Surging": {
"Ip": "${Surging_Server_IP}",
"WatchInterval": 30,
"Port": "${Surging_Server_Port}|100",
"MappingIp": "${Mapping_ip}",
"MappingPort": "${Mapping_Port}",
"Token": "true",
"WanIp": "${Mapping_WanIp}",
"Libuv": true,
"MaxConcurrentRequests": 20,
"ExecutionTimeoutInMilliseconds": 30000,
"Protocol": "${Protocol}|None", //Http、Tcp、None
"RootPath": "${RootPath}|",
"Ports": {
"HttpPort": "${HttpPort}|2801",
"MQTTPort": "${MQTTPort}|971",
"WSPort": "${WSPort}|961"
},
"RequestCacheEnabled": false,
"Packages": [
{
"TypeName": "EnginePartModule",
"Using": "${UseEngineParts}|DotNettyModule;NLogModule;MessagePackModule;ServiceProxyModule;ConsulModule;EventBusRabbitMQModule;CachingModule;"
}
]
}, //如果引用多個同類型的組件,需要配置Packages,如果是自定義按需引用,無需配置Packages
"Consul": {
"ConnectionString": "${Register_Conn}|127.0.0.1:8500", // "127.0.0.1:8500",
"SessionTimeout": "${Register_SessionTimeout}|50",
"RoutePath": "${Register_RoutePath}",
"ReloadOnChange": true,
"EnableChildrenMonitor": false
},
"Swagger": {
"Version": "${SwaggerVersion}|V1", // "127.0.0.1:8500",
"Title": "${SwaggerTitle}|Surging Demo",
"Description": "${SwaggerDes}|surging demo",
"Contact": {
"Name": "API Support",
"Url": "https://github.com/dotnetcore/surging",
"Email": "fanliang1@hotmail.com"
},
"License": {
"Name": "MIT",
"Url": "https://github.com/dotnetcore/surging/blob/master/LICENSE"
}
},
"EventBus_Kafka": {
"Servers": "${EventBusConnection}|localhost:9092",
"MaxQueueBuffering": "${MaxQueueBuffering}|10",
"MaxSocketBlocking": "${MaxSocketBlocking}|10",
"EnableAutoCommit": "${EnableAutoCommit}|false",
"LogConnectionClose": "${LogConnectionClose}|false",
"OffsetReset": "${OffsetReset}|earliest",
"GroupID": "${EventBusGroupID}|surgingdemo"
},
"WebSocket": {
"WaitTime": 2,
"KeepClean": false,
"Behavior": {
"IgnoreExtensions": true,
"EmitOnPing": false
}
},
"EventBus": {
"EventBusConnection": "${EventBusConnection}|192.168.1.127",
"EventBusUserName": "${EventBusUserName}|guest",
"EventBusPassword": "${EventBusPassword}|guest",
"VirtualHost": "${VirtualHost}|/",
"MessageTTL": "${MessageTTL}|30000",
"RetryCount": "${RetryCount}|1",
"FailCount": "${FailCount}|3",
"prefetchCount": "${PrefetchCount}|0",
"BrokerName": "${BrokerName}|surging_demo",
"Port": "${EventBusPort}|32671"
},
"Zookeeper": {
"ConnectionString": "${Zookeeper_ConnectionString}|127.0.0.1:2181",
"SessionTimeout": 50,
"ReloadOnChange": true
},
"Logging": {
"Debug": {
"LogLevel": {
"Default": "Information"
}
},
"Console": {
"IncludeScopes": true,
"LogLevel": {
"Default": "${LogLevel}|Debug"
}
},
"LogLevel": {
"Default": "${LogLevel}|Debug"
}
},
"MqttClient": {
"ClientID": "${MqttClientID}|serverclientid",
"MqttClientConnection": "${MqttClientConnection}|127.0.0.1",
"MqttClientUserName": "admin",
"MqttClientPassword": "123456",
"Port": 97,
"KeepAlivedTime": 60,
"CleanSession": true,
"Topics": [ "test1","test2" ]
}
}
總的Server代碼如圖所示:

此時代碼開發階段結束。我們可以設置多項目啟動,啟動項目前確定你的 Consul能夠正常使用。本地啟動Consul 可以通過控制台來做測試就OK
1、Surging.Services.Server 2、Surging.MqttClientWithWsServices.Server即(紅框標注內容):

說到此處,想必大家都知道怎么使用Surging WS、MqttClient與MqttBroker進行連接了。寫出這段代碼主要是針對對Surging不了解,或者涉入不深的人,讓他們能直接快速地使用本代碼,使用戶與設備間可以正常通訊。
注意如果使用Docker編排,或者Rancher編排Surging Broker 或者 WSMqttClient 時如果涉及到多個編排我們需要進行相應的邏輯判斷。
近期由於個人需求,需要把設備在線狀態通知給各個DBMqttClient端,用於保存現有設備狀態。這樣在不處理設備連接時,我們也可以知道設備是否在線、是否有異常、異常時長等。如有此需求可在下方留言。此代碼近期可能會貢獻給Surging,讓Surging更加強大。寫這篇文章呢,主要目的是在沒有看懂作者代碼的情況下,也可以盡情使用MqttBroker的功能。此處只是引用了WS與MqttClient,其實可以以此為參考部署更多的MqttClient,比如數據保存與服務通訊分開等。在此感謝Surging作者在業余時間為我們做了這么好的開源項目,願Surging越來越好。
