https://github.com/chkr1011/MQTTnet/wiki/Server#intercepting-subscriptions
引用自:https://www.cnblogs.com/zhaoqm999/p/13020836.html
FinitelyFailed edited this page on 28 Apr · 4月28日編輯本頁失敗46 revisions 46個修訂版
Preparation 制作方法
Creating a MQTT server is similar to creating a MQTT client. The following code shows the most simple way of creating a new MQTT server with a TCP endpoint which is listening at the default port 1883.
創建 MQTT 服務器類似於創建 MQTT 客戶機。下面的代碼展示了創建一個新的 MQTT 服務器的最簡單的方法,該服務器的 TCP 端點在缺省端口1883處監聽。
// Start a MQTT server. var mqttServer = new MqttFactory().CreateMqttServer(); await mqttServer.StartAsync(new MqttServerOptions()); Console.WriteLine("Press any key to exit."); Console.ReadLine(); await mqttServer.StopAsync();
Setting several options for the MQTT server is possible by setting the property values of the MqttServerOptions directly or via using the MqttServerOptionsBuilder (which is recommended). The following code shows how to use the MqttServerOptionsBuilder.
為 MQTT 服務器設置幾個選項是可能的,可以直接設置 MqttServerOptions 的屬性值,也可以通過使用 MqttServerOptionsBuilder (建議使用)設置。下面的代碼演示如何使用 MqttServerOptionsBuilder。
// Configure MQTT server. var optionsBuilder = new MqttServerOptionsBuilder() .WithConnectionBacklog(100) .WithDefaultEndpointPort(1884); var mqttServer = new MqttFactory().CreateMqttServer(); await mqttServer.StartAsync(optionsBuilder.Build());
Validating MQTT clients 驗證 MQTT 客戶機
The following code shows how to validate an incoming MQTT client connection request:
下面的代碼展示了如何驗證傳入的 MQTT 客戶端連接請求:
// Setup client validator. var optionsBuilder = new MqttServerOptionsBuilder() .WithConnectionValidator(c => { if (c.ClientId.Length < 10) { c.ReasonCode = MqttConnectReasonCode.ClientIdentifierNotValid; return; } if (c.Username != "mySecretUser") { c.ReasonCode = MqttConnectReasonCode.BadUserNameOrPassword; return; } if (c.Password != "mySecretPassword") { c.ReasonCode = MqttConnectReasonCode.BadUserNameOrPassword; return; } c.ReasonCode = MqttConnectReasonCode.Success; });
Using a certificate 使用證書
In order to use an encrypted connection a certificate including the private key is required. The following code shows how to start a server using a certificate for encryption:
為了使用加密連接,需要包含私鑰的證書。下面的代碼演示如何使用加密證書啟動服務器:
using System.Reflection; using System.Security.Authentication; using System.Security.Cryptography.X509Certificates; ... var currentPath = Path.GetDirectoryName(Assembly.GetExecutingAssembly().Location); var certificate = new X509Certificate2(Path.Combine(currentPath, "certificate.pfx"),"yourPassword", X509KeyStorageFlags.Exportable); var optionsBuilder = new MqttServerOptionsBuilder() .WithoutDefaultEndpoint() // This call disables the default unencrypted endpoint on port 1883 .WithEncryptedEndpoint() .WithEncryptedEndpointPort(config.Port) .WithEncryptionCertificate(certificate.Export(X509ContentType.Pfx)) .WithEncryptionSslProtocol(SslProtocols.Tls12)
But also other overloads getting a valid certificate blob (byte array) can be used.
但是也可以使用獲得有效證書 blob (字節數組)的其他重載。
For creating a self-signed certificate for testing the following command can be used (Windows SDK must be installed):
要創建用於測試的自簽名證書,可以使用以下命令(必須安裝 Windows SDK) :
makecert.exe -sky exchange -r -n "CN=selfsigned.crt" -pe -a sha1 -len 2048 -ss My "test.cer"
2048-ss My“ test.cer”
OpenSSL can also be used to create a self-signed PFX certificate as described here.
OpenSSL 還可用於創建自簽名的 PFX 證書,如本文所述。
Example: 例子:
openssl req -x509 -newkey rsa:4096 -keyout key.pem -out cert.pem -days 365 openssl pkcs12 -export -out certificate.pfx -inkey key.pem -in cert.pem
Publishing messages 發布信息
The server is also able to publish MQTT application messages. The object is the same as for the client implementation. Due to the fact that the server is able to publish its own messages it is not required having a loopback connection in the same process.
服務器還能夠發布 MQTT 應用程序消息。該對象與客戶端實現相同。由於服務器能夠發布自己的消息,因此在相同的進程中不需要環回連接。
This allows also running the server in a Windows IoT Core UWP app. This platform has a network isolation which makes it impossible to communicate via localhost etc.
這也允許在 Windows IoT Core UWP 應用程序中運行服務器。這個平台有一個網絡隔離,這使得它不可能通過本地主機等通信。
Examples for publishing a message are described at the client section of this Wiki.
發布消息的示例在 Wiki 的客戶端部分進行了描述。
Consuming messages 消費信息
The server is also able to process every application message which was published by any client. The event ApplicationMessageReceived will be fired for every processed message. It has the same format as for the client but additionally has the ClientId.
服務器還能夠處理任何客戶機發布的每個應用程序消息。事件 ApplicationMessageReceived 將為每個處理的消息觸發。它具有與客戶機相同的格式,但另外還具有 ClientId。
Details for consuming a application messages are described at the client section of this Wiki.
使用應用程序消息的詳細信息在這個 Wiki 的客戶端部分進行了描述。
Saving retained application messages 保存保留的應用程序消息
The server supports retained MQTT messages. Those messages are kept and send to clients when they connect and subscribe to them. It is also supported to save all retained messages and loading them after the server has started. This required implementing an interface. The following code shows how to serialize retained messages as JSON:
服務器支持保留的 MQTT 消息。當客戶端連接並訂閱這些消息時,這些消息將被保存並發送給客戶端。它還支持保存所有保留的消息,並在服務器啟動后加載這些消息。這需要實現一個接口。下面的代碼展示了如何將保留的消息序列化為 JSON:
// Setting the options options.Storage = new RetainedMessageHandler(); // The implementation of the storage: // This code uses the JSON library "Newtonsoft.Json". public class RetainedMessageHandler : IMqttServerStorage { private const string Filename = "C:\\MQTT\\RetainedMessages.json"; public Task SaveRetainedMessagesAsync(IList<MqttApplicationMessage> messages) { File.WriteAllText(Filename, JsonConvert.SerializeObject(messages)); return Task.FromResult(0); } public Task<IList<MqttApplicationMessage>> LoadRetainedMessagesAsync() { IList<MqttApplicationMessage> retainedMessages; if (File.Exists(Filename)) { var json = File.ReadAllText(Filename); retainedMessages = JsonConvert.DeserializeObject<List<MqttApplicationMessage>>(json); } else { retainedMessages = new List<MqttApplicationMessage>(); } return Task.FromResult(retainedMessages); } }
Intercepting application messages 攔截應用程序消息
A custom interceptor can be set at the server options. This interceptor is called for every application message which is received by the server. This allows extending application messages before they are persisted (in case of a retained message) and before being dispatched to subscribers. This allows use cases like adding a time stamp to every application message if the hardware device does not know the time or time zone etc. The following code shows how to use the interceptor:
可以在服務器選項中設置自定義攔截器。對於服務器接收到的每個應用程序消息,都會調用這個攔截器。這允許在持久化應用程序消息(在保留消息的情況下)之前以及發送到訂閱者之前擴展應用程序消息。這允許在硬件設備不知道時間或時區等情況下向每個應用程序消息添加時間戳之類的用例。下面的代碼展示了如何使用攔截器:
var optionsBuilder = new MqttServerOptionsBuilder() .WithApplicationMessageInterceptor(context => { if (context.ApplicationMessage.Topic == "my/custom/topic") { context.ApplicationMessage.Payload = Encoding.UTF8.GetBytes("The server injected payload."); } // It is possible to disallow the sending of messages for a certain client id like this: if (context.ClientId != "Someone") { context.AcceptPublish = false; return; } // It is also possible to read the payload and extend it. For example by adding a timestamp in a JSON document. // This is useful when the IoT device has no own clock and the creation time of the message might be important. }) .Build();
If you want to stop processing an application message completely (like a delete) then the property context.ApplicationMessage.Payload must be set to null.
如果您希望完全停止處理應用程序消息(如 delete) ,則使用屬性上下文。應用信息。有效負載必須設置為空。
Intercepting subscriptions 攔截訂閱
A custom interceptor can be set to control which topics can be subscribed by a MQTT client. This allows moving private API-Topics to a protected area which is only available for certain clients. The following code shows how to use the subscription interceptor.
可以將自定義攔截器設置為控制 MQTT 客戶機可以訂閱哪些主題。這允許將私有 api 主題移動到只對某些客戶機可用的受保護區域。下面的代碼演示如何使用訂閱攔截器。
// Protect several topics from being subscribed from every client. var optionsBuilder = new MqttServerOptionsBuilder() .WithSubscriptionInterceptor(context => { if (context.TopicFilter.Topic.StartsWith("admin/foo/bar") && context.ClientId != "theAdmin") { context.AcceptSubscription = false; } if (context.TopicFilter.Topic.StartsWith("the/secret/stuff") && context.ClientId != "Imperator") { context.AcceptSubscription = false; context.CloseConnection = true; } }) .Build();
It is also supported to use an async method instead of a synchronized one like in the above example.
還支持使用異步方法,而不是像上面示例中那樣使用同步方法。
Storing data in the session 在會話中存儲數據
From version 3.0.6 and up, there is a Dictionary<object, object>
called SessionItems
. It allows to store custom data in the session and is available in all interceptors:
從版本3.0.6及以上,有一個 Dictionary < object,object > called SessionItems。它允許在會話中存儲自定義數據,並且在所有攔截器中都可用:
var optionsBuilder = new MqttServerOptionsBuilder() .WithConnectionValidator(c => { c.SessionItems.Add("SomeData", true); } .WithSubscriptionInterceptor(c => { c.SessionItems.Add("YourData", new List<string>{"a", "b"}); } .WithApplicationMessageInterceptor(c => { c.SessionItems.Add("Test", 123); }
ASP.NET Core Integration
ASP.NET Core 2.0
This library also has support for a WebSocket based server which is integrated into ASP.NET Core 2.0. This functionality requires an additional library called MQTTnet.AspNetCore. After adding this library a MQTT server can be added to a Kestrel HTTP server.
該庫還支持一個基於 WebSocket 的服務器,該服務器集成到 ASP.NET Core 2.0中。這個功能需要一個稱為 MQTTnet 的附加庫。AspNetCore.添加這個庫之后,可以將 MQTT 服務器添加到 kestrelhttp 服務器。
// In class _Startup_ of the ASP.NET Core 2.0 project. public void ConfigureServices(IServiceCollection services) { // This adds a hosted mqtt server to the services services.AddHostedMqttServer(builder => builder.WithDefaultEndpointPort(1883)); // This adds TCP server support based on System.Net.Socket services.AddMqttTcpServerAdapter(); // This adds websocket support services.AddMqttWebSocketServerAdapter(); } public void Configure(IApplicationBuilder app, IHostingEnvironment env) { // This maps the websocket to an MQTT endpoint app.UseMqttEndpoint(); // Other stuff }
ASP.NET Core 2.1+
MQTTnet.AspNetCore is compatible with the abstractions present in ASP.NET Core 2.0 but it also offers a new TCP transport based on ASP.NET Core 2.1 Microsoft.AspNetCore.Connections.Abstractions. This transport is mutual exclusive with the old TCP transport so you may only add and use one of them. Our benchmark indicates that the new transport is up to 30 times faster.
MQTTnet.與 ASP.NET Core 2.0中的抽象兼容,但是它也提供了一個基於 ASP.NET Core 2.1 Microsoft 的新的 TCP 傳輸協議。AspNetCore.聯系。抽象概念。這種傳輸與舊的 TCP 傳輸是相互排斥的,因此您只能添加和使用其中一種傳輸。我們的基准測試表明,新的傳輸速度要快30倍。
// In class _Program_ of the ASP.NET Core 2.1 or 2.2 project. private static IWebHost BuildWebHost(string[] args) => WebHost.CreateDefaultBuilder(args) .UseKestrel(o => { o.ListenAnyIP(1883, l => l.UseMqtt()); // MQTT pipeline o.ListenAnyIP(5000); // Default HTTP pipeline }) .UseStartup<Startup>() .Build(); // In class _Startup_ of the ASP.NET Core 2.1 or 2.2 project. public void ConfigureServices(IServiceCollection services) { //this adds a hosted mqtt server to the services services.AddHostedMqttServer(builder => builder.WithDefaultEndpointPort(1883)); //this adds tcp server support based on Microsoft.AspNetCore.Connections.Abstractions services.AddMqttConnectionHandler(); //this adds websocket support services.AddMqttWebSocketServerAdapter(); }
ASP.NET Core 3.0+ (Since MQTT version 3.0.9)
In ASP.NET Core 3.0+, the server can be configured like this. Remember, that the TLS middleware connection is not yet available, so this will only work for WebSocket connections (Check https://github.com/chkr1011/MQTTnet/issues/464).
在 ASP.NET Core 3.0 + 中,服務器可以這樣配置。請記住,TLS 中間件連接還不可用,因此這只適用於 WebSocket 連接(請檢查 https://github.com/chkr1011/mqttnet/issues/464連接)。
// In class _Program_ of the ASP.NET Core 3.0+ project. private static IWebHost BuildWebHost(string[] args) => WebHost.CreateDefaultBuilder(args) .UseKestrel(o => { o.ListenAnyIP(1883, l => l.UseMqtt()); // MQTT pipeline o.ListenAnyIP(5000); // Default HTTP pipeline }) .UseStartup<Startup>() .Build(); // In class _Startup_ of the ASP.NET Core 3.0+ project. public void ConfigureServices(IServiceCollection services) { services .AddHostedMqttServer(mqttServer => mqttServer.WithoutDefaultEndpoint()) .AddMqttConnectionHandler() .AddConnections(); } public void Configure(IApplicationBuilder app, IWebHostEnvironment env) { app.UseEndpoints(endpoints => { endpoints.MapMqtt("/mqtt"); }); app.UseMqttServer(server => { // Todo: Do something with the server }); }
Windows IoT Core and UWP localhost loopback addresses 和 UWP 本地主機回送地址
In Windows IoT Core as well as in UWP, loopback connections (127.0.0.1) are not allowed. If you try to connect to a locally running server (broker), this will fail. See Communicating with localhost (loopback) for enable loopback in Windows 10 IoT Core and UWP-apps.
在 Windows IoT Core 以及 UWP 中,loopback 連接(127.0.0.1)是不允許的。如果嘗試連接到本地運行的服務器(代理) ,則會失敗。請參閱與本地主機通信(loopback)以便在 Windows 10的物聯網核心和 UWP-apps 中啟用 loopback。
Special notice for using the server project in Android 在 Android 中使用服務器項目的特別注意事項
Under Android, there is an issue with the default bound IP address. So you have to use the actual address of the device. Check the example below.
在 Android 系統下,缺省綁定 IP 地址有一個問題。所以你必須使用設備的實際地址。查看下面的例子。
IPHostEntry ipHostInfo = Dns.GetHostEntry(Dns.GetHostName()); IPAddress ipAddress = ipHostInfo.AddressList[0]; var server = new MqttFactory().CreateMqttServer(); server.StartAsync(new MqttServerOptionsBuilder() .WithDefaultEndpointBoundIPAddress(ipAddress) .WithDefaultEndpointBoundIPV6Address(IPAddress.None) .Build()).GetAwaiter().GetResult();
Accessing the MQTT server in an ASP.NET MVC controller
If we have an ASP.NET Core application that needs to send MQTT messages from an MVC controller, the MqttService
singleton needs to be registered with dependency injection. The trick is to have two methods to correctly setup the MQTT part:
如果我們有一個 ASP.NET Core 應用程序需要從一個 MVC 控制器發送 MQTT 消息,那么 MqttService 單例應用程序需要向依賴注入注冊。關鍵在於有兩種方法可以正確設置 MQTT 部分:
-
Have
MqttService
implement all the interfaces needed to hook with MqttServer (likeIMqttServerClientConnectedHandler
,IMqttServerApplicationMessageInterceptor
, etc.)讓 MqttService 實現與 MqttServer 掛接所需的所有接口(比如 IMqttServerClientConnectedHandler、 IMqttServerApplicationMessageInterceptor 等)
-
Write a
ConfigureMqttServerOptions(AspNetMqttServerOptionsBuilder options)
method that sets up the current object as callback for the needed methods:編寫一個 ConfigureMqttServerOptions (aspnetmqttserveroptionsbilder options)方法,將當前對象設置為所需方法的回調:
public void ConfigureMqttServerOptions(AspNetMqttServerOptionsBuilder options) { options.WithConnectionValidator(this); options.WithApplicationMessageInterceptor(this); }
- Write a 寫一個
ConfigureMqttServer(IMqttServer mqtt)
that stores the reference to the MQTT server for later use and setup the handlers: 存儲對 MQTT 服務器的引用,以供以后使用,並設置處理程序:
public void ConfigureMqttServer(IMqttServer mqtt) { this.mqtt = mqtt; mqtt.ClientConnectedHandler = this; mqtt.ClientDisconnectedHandler = this; }
Then, in your Startup
class configure and use the service.
然后,在啟動類中配置和使用該服務。
In ConfigureServices
:
配置服務:
services.AddSingleton<MqttService>(); services.AddHostedMqttServerWithServices(options => { var s = options.ServiceProvider.GetRequiredService<MqttService>(); s.ConfigureMqttServerOptions(options); }); services.AddMqttConnectionHandler(); services.AddMqttWebSocketServerAdapter();
In Configure
:
配置:
app.UseMqttEndpoint();
app.UseMqttServer(server => app.ApplicationServices.GetRequiredService<MqttService().ConfigureMqttServer(server));