工作中項目是物聯網項目的,管理平台又是bs架構。
如果用 Socket 的話,Web 端還需要轉發,就全部統一采用了 WebSocket 。
DotNet 平台上的 WebSocket 實現有很多種,這里介紹一下用 DotNetty 來實現的方式。
只完成基本使用功能:
管理連接、
服務端接收消息、
服務端主動向指定連接發送消息、
服務端主動端口某連接、
客戶端連接斷開響應。
本地環境 .net core 2.2
1.創建控制台應用
2.安裝NuGet包
DotNetty.Buffers
DotNetty.Codecs
DotNetty.Codecs.Http
DotNetty.Common
DotNetty.Handlers
DotNetty.Transport
DotNetty.Transport.Libuv
3.創建輔助解析的工具類
新建類庫 :Examples.Common
同步引用 NuGet 包。並安裝以下幾個。
Microsoft.Extensions.Configuration
Microsoft.Extensions.Configuration.FileExtensions
Microsoft.Extensions.Configuration.Json
Microsoft.Extensions.Logging.Console
Examples.Common.csproj

<Project Sdk="Microsoft.NET.Sdk"> <PropertyGroup> <TargetFramework>netcoreapp2.2</TargetFramework> </PropertyGroup> <ItemGroup> <PackageReference Include="DotNetty.Buffers" Version="0.6.0" /> <PackageReference Include="DotNetty.Codecs" Version="0.6.0" /> <PackageReference Include="DotNetty.Codecs.Http" Version="0.6.0" /> <PackageReference Include="DotNetty.Common" Version="0.6.0" /> <PackageReference Include="DotNetty.Handlers" Version="0.6.0" /> <PackageReference Include="DotNetty.Transport" Version="0.6.0" /> <PackageReference Include="DotNetty.Transport.Libuv" Version="0.6.0" /> <PackageReference Include="Microsoft.Extensions.Configuration" Version="2.2.0" /> <PackageReference Include="Microsoft.Extensions.Configuration.FileExtensions" Version="2.2.0" /> <PackageReference Include="Microsoft.Extensions.Configuration.Json" Version="2.2.0" /> <PackageReference Include="Microsoft.Extensions.Logging.Console" Version="1.1.1" /> </ItemGroup> </Project>
安裝完了,記得在主控制台程序里面添加對該類庫的引用。
4.添加解析輔助類
創建 ExampleHelper.cs

namespace Examples.Common { using System; using DotNetty.Common.Internal.Logging; using Microsoft.Extensions.Configuration; using Microsoft.Extensions.Logging.Console; public static class ExampleHelper { static ExampleHelper() { Configuration = new ConfigurationBuilder() .SetBasePath(ProcessDirectory) .AddJsonFile("appsettings.json") .Build(); } public static string ProcessDirectory { get { #if NETSTANDARD1_3 return AppContext.BaseDirectory; #else return AppDomain.CurrentDomain.BaseDirectory; #endif } } public static IConfigurationRoot Configuration { get; } public static void SetConsoleLogger() => InternalLoggerFactory.DefaultFactory.AddProvider(new ConsoleLoggerProvider((s, level) => true, false)); } }
創建 ServerSettings.cs

namespace Examples.Common { public static class ServerSettings { public static bool IsSsl { get { string ssl = ExampleHelper.Configuration["ssl"]; return !string.IsNullOrEmpty(ssl) && bool.Parse(ssl); } } public static int Port => int.Parse(ExampleHelper.Configuration["port"]); public static bool UseLibuv { get { string libuv = ExampleHelper.Configuration["libuv"]; return !string.IsNullOrEmpty(libuv) && bool.Parse(libuv); } } } }
創建 ClientSettings.cs

namespace Examples.Common { using System.Net; public class ClientSettings { public static bool IsSsl { get { string ssl = ExampleHelper.Configuration["ssl"]; return !string.IsNullOrEmpty(ssl) && bool.Parse(ssl); } } public static IPAddress Host => IPAddress.Parse(ExampleHelper.Configuration["host"]); public static int Port => int.Parse(ExampleHelper.Configuration["port"]); public static int Size => int.Parse(ExampleHelper.Configuration["size"]); public static bool UseLibuv { get { string libuv = ExampleHelper.Configuration["libuv"]; return !string.IsNullOrEmpty(libuv) && bool.Parse(libuv); } } } }
5.完成WebSocket的服務端代碼
JSON 配置文件 appsettings.json
設置文件屬性,始終復制。
{ "port": "8080", "libuv": "true", "ssl": "false" }
程序啟動 Program.cs

namespace DotNettyWebSocket { using System; using System.IO; using System.Net; using System.Runtime; using System.Runtime.InteropServices; using System.Security.Cryptography.X509Certificates; using System.Threading.Tasks; using DotNetty.Codecs.Http; using DotNetty.Common; using DotNetty.Handlers.Tls; using DotNetty.Transport.Bootstrapping; using DotNetty.Transport.Channels; using DotNetty.Transport.Channels.Sockets; using DotNetty.Transport.Libuv; using Examples.Common; class Program { static Program() { ResourceLeakDetector.Level = ResourceLeakDetector.DetectionLevel.Disabled; } static async Task RunServerAsync() { Console.WriteLine( $"\n{RuntimeInformation.OSArchitecture} {RuntimeInformation.OSDescription}" + $"\n{RuntimeInformation.ProcessArchitecture} {RuntimeInformation.FrameworkDescription}" + $"\nProcessor Count : {Environment.ProcessorCount}\n"); bool useLibuv = ServerSettings.UseLibuv; Console.WriteLine("Transport type : " + (useLibuv ? "Libuv" : "Socket")); if (!RuntimeInformation.IsOSPlatform(OSPlatform.Windows)) { GCSettings.LatencyMode = GCLatencyMode.SustainedLowLatency; } Console.WriteLine($"Server garbage collection : {(GCSettings.IsServerGC ? "Enabled" : "Disabled")}"); Console.WriteLine($"Current latency mode for garbage collection: {GCSettings.LatencyMode}"); Console.WriteLine("\n"); /* Netty 提供了許多不同的 EventLoopGroup 的實現用來處理不同的傳輸。 在這個例子中我們實現了一個服務端的應用,因此會有2個 NioEventLoopGroup 會被使用。 第一個經常被叫做‘boss’,用來接收進來的連接。第二個經常被叫做‘worker’,用來處理已經被接收的連接,一旦‘boss’接收到連接,就會把連接信息注冊到‘worker’上。 如何知道多少個線程已經被使用,如何映射到已經創建的 Channel上都需要依賴於 IEventLoopGroup 的實現,並且可以通過構造函數來配置他們的關系。 */ // 主工作線程組,設置為1個線程 // Boss線程:由這個線程池提供的線程是boss種類的,用於創建、連接、綁定socket, (有點像門衛)然后把這些socket傳給worker線程池。 // 在服務器端每個監聽的socket都有一個boss線程來處理。在客戶端,只有一個boss線程來處理所有的socket。 IEventLoopGroup bossGroup; // 子工作線程組,----默認為內核數*2的線程數 // Worker線程:Worker線程執行所有的異步I/O,即處理操作 IEventLoopGroup workGroup; if (useLibuv) { var dispatcher = new DispatcherEventLoopGroup(); bossGroup = dispatcher; workGroup = new WorkerEventLoopGroup(dispatcher); } else { bossGroup = new MultithreadEventLoopGroup(1); workGroup = new MultithreadEventLoopGroup(); } X509Certificate2 tlsCertificate = null; if (ServerSettings.IsSsl) { tlsCertificate = new X509Certificate2(Path.Combine(ExampleHelper.ProcessDirectory, "dotnetty.com.pfx"), "password"); } try { // 聲明一個服務端Bootstrap,每個Netty服務端程序,都由ServerBootstrap控制,通過鏈式的方式組裝需要的參數 // ServerBootstrap 啟動NIO服務的輔助啟動類,負責初始話netty服務器,並且開始監聽端口的socket請求 var bootstrap = new ServerBootstrap(); // 設置主和工作線程組 bootstrap.Group(bossGroup, workGroup); if (useLibuv) { // 申明服務端通信通道為TcpServerChannel // 設置非阻塞,用它來建立新accept的連接,用於構造serversocketchannel的工廠類 bootstrap.Channel<TcpServerChannel>(); if (RuntimeInformation.IsOSPlatform(OSPlatform.Linux) || RuntimeInformation.IsOSPlatform(OSPlatform.OSX)) { bootstrap .Option(ChannelOption.SoReuseport, true) .ChildOption(ChannelOption.SoReuseaddr, true); } } else { bootstrap.Channel<TcpServerSocketChannel>(); } // ChildChannelHandler 對出入的數據進行的業務操作,其繼承ChannelInitializer bootstrap // 設置網絡IO參數等 .Option(ChannelOption.SoBacklog, 8192) /* * ChannelInitializer 是一個特殊的處理類,他的目的是幫助使用者配置一個新的 Channel。 * 也許你想通過增加一些處理類比如DiscardServerHandler 來配置一個新的 Channel 或者其對應的ChannelPipeline 來實現你的網絡程序。 * 當你的程序變的復雜時,可能你會增加更多的處理類到 pipline 上,然后提取這些匿名類到最頂層的類上。 */ // 設置工作線程參數 .ChildHandler(new ActionChannelInitializer<IChannel>(channel => { /* * 工作線程連接器是設置了一個管道,服務端主線程所有接收到的信息都會通過這個管道一層層往下傳輸, * 同時所有出棧的消息 也要這個管道的所有處理器進行一步步處理。 */ IChannelPipeline pipeline = channel.Pipeline; if (tlsCertificate != null) { pipeline.AddLast(TlsHandler.Server(tlsCertificate)); } pipeline.AddLast(new HttpServerCodec()); pipeline.AddLast(new HttpObjectAggregator(65536)); //業務handler ,這里是實際處理業務的Handler //pipeline.AddLast(new WebSocketServerHandler()); //自己寫的業務類 pipeline.AddLast(new SendFunction()); })); // bootstrap綁定到指定端口的行為 就是服務端啟動服務,同樣的Serverbootstrap可以bind到多個端口 int port = ServerSettings.Port; IChannel bootstrapChannel = await bootstrap.BindAsync(IPAddress.Loopback, port); // 似乎沒有成功阻塞 而是連接服務端后 就馬上執行下一句了 導致連接一次就關閉 (是成功進入 ChannelActive 判斷的)也就是無法保持長連接 // 添加長連接即可,參考EchoClient Console.WriteLine("Open your web browser and navigate to " + $"{(ServerSettings.IsSsl ? "https" : "http")}" + $"://127.0.0.1:{port}/"); Console.WriteLine("Listening on " + $"{(ServerSettings.IsSsl ? "wss" : "ws")}" + $"://127.0.0.1:{port}/websocket"); Console.ReadLine(); // 關閉服務 await bootstrapChannel.CloseAsync(); } finally { // 釋放工作組線程 workGroup.ShutdownGracefullyAsync().Wait(); bossGroup.ShutdownGracefullyAsync().Wait(); } } static void Main() => RunServerAsync().Wait(); } }
業務處理類 SendFunction.cs

using DotNetty.Buffers; using DotNetty.Codecs.Http; using DotNetty.Codecs.Http.WebSockets; using DotNetty.Common.Utilities; using DotNetty.Transport.Channels; using DotNetty.Transport.Channels.Groups; using Examples.Common; using System; using System.Collections.Generic; using System.Diagnostics; using System.Text; using System.Threading.Tasks; using static DotNetty.Codecs.Http.HttpResponseStatus; using static DotNetty.Codecs.Http.HttpVersion; namespace DotNettyWebSocket { public class SendFunction : SimpleChannelInboundHandler<object> { const string WebsocketPath = "/websocket"; WebSocketServerHandshaker handshaker; static volatile IChannelGroup groups; public override void ChannelReadComplete(IChannelHandlerContext context) => context.Flush(); //客戶端連接異常 public override void ExceptionCaught(IChannelHandlerContext context, Exception exception) { WebSocketClose(context); Console.WriteLine(" SendFunction Exception: " + exception); context.CloseAsync(); } protected override void ChannelRead0(IChannelHandlerContext ctx, object msg) { if (msg is IFullHttpRequest request) { this.HandleHttpRequest(ctx, request); } else if (msg is WebSocketFrame frame) { this.HandleWebSocketFrame(ctx, frame); } } void HandleHttpRequest(IChannelHandlerContext ctx, IFullHttpRequest req) { if (!req.Result.IsSuccess) { SendHttpResponse(ctx, req, new DefaultFullHttpResponse(Http11, BadRequest)); return; } if (!Equals(req.Method, HttpMethod.Get)) { SendHttpResponse(ctx, req, new DefaultFullHttpResponse(Http11, Forbidden)); return; } var wsFactory = new WebSocketServerHandshakerFactory( GetWebSocketLocation(req), null, true, 5 * 1024 * 1024); this.handshaker = wsFactory.NewHandshaker(req); if (this.handshaker == null) { WebSocketServerHandshakerFactory.SendUnsupportedVersionResponse(ctx.Channel); } else { this.handshaker.HandshakeAsync(ctx.Channel, req); } base.HandlerAdded(ctx); IChannelGroup g = groups; if (g == null) { lock (this) { if (groups == null) { g = groups = new DefaultChannelGroup(ctx.Executor); } } } g.Add(ctx.Channel); //主動向當前連接的客戶端發送信息 TextWebSocketFrame tst = new TextWebSocketFrame($"歡迎{ctx.Channel.RemoteAddress}加入111."); TextWebSocketFrame tstId = new TextWebSocketFrame($"歡迎{ctx.Channel.Id}加入222."); groups.WriteAndFlushAsync(tst); groups.WriteAndFlushAsync(tstId); //保存連接對象 lock (ConnChannelList) { if (ConnChannelList.Count > 0) { if (ConnChannelList.ContainsKey(ctx.Channel.Id.ToString())) { ConnChannelList.Remove(ctx.Channel.Id.ToString()); } } ConnChannelList.Add(ctx.Channel.Id.ToString(), ctx.Channel.Id); Console.WriteLine($"當前在線數:{ConnChannelList.Count}"); } Console.WriteLine("---------首次到達----------"); Console.WriteLine("連接成功"); Console.WriteLine($"歡迎{ctx.Channel.RemoteAddress}加入"); Console.WriteLine("---------首次到達----------"); } public static volatile Dictionary<string, IChannelId> ConnChannelList = new Dictionary<string, IChannelId>(); void HandleWebSocketFrame(IChannelHandlerContext ctx, WebSocketFrame frame) { //客戶端關閉連接 if (frame is CloseWebSocketFrame) { WebSocketClose(ctx); Console.WriteLine($"連接關閉 {ctx.Channel.RemoteAddress}"); this.handshaker.CloseAsync(ctx.Channel, (CloseWebSocketFrame)frame.Retain()); return; } if (frame is PingWebSocketFrame) { ctx.WriteAsync(new PongWebSocketFrame((IByteBuffer)frame.Content.Retain())); return; } if (frame is TextWebSocketFrame textFrame) { Console.WriteLine("---------消息到達----------"); Console.WriteLine("Received from client: " + frame.Content.ToString(Encoding.UTF8)); Console.WriteLine("---------消息到達----------"); //發送信息到指定連接 string[] strArg = textFrame.Text().Split(','); if (strArg.Length > 1) { lock (ConnChannelList) { if (ConnChannelList.ContainsKey(strArg[0])) { var connChannel = groups.Find(ConnChannelList[strArg[0]]);//null if (connChannel != null) { //主動向當前連接的客戶端發送信息 TextWebSocketFrame tst = new TextWebSocketFrame(strArg[1]); connChannel.WriteAndFlushAsync(tst); //服務端斷開指定客戶端連接 if (strArg[1] == "close") { connChannel.CloseAsync(); } } } } } ctx.WriteAsync(frame.Retain()); return; } if (frame is BinaryWebSocketFrame) { ctx.WriteAsync(frame.Retain()); } } static void SendHttpResponse(IChannelHandlerContext ctx, IFullHttpRequest req, IFullHttpResponse res) { if (res.Status.Code != 200) { IByteBuffer buf = Unpooled.CopiedBuffer(Encoding.UTF8.GetBytes(res.Status.ToString())); res.Content.WriteBytes(buf); buf.Release(); HttpUtil.SetContentLength(res, res.Content.ReadableBytes); } Task task = ctx.Channel.WriteAndFlushAsync(res); if (!HttpUtil.IsKeepAlive(req) || res.Status.Code != 200) { task.ContinueWith((t, c) => ((IChannelHandlerContext)c).CloseAsync(), ctx, TaskContinuationOptions.ExecuteSynchronously); } } static string GetWebSocketLocation(IFullHttpRequest req) { bool result = req.Headers.TryGet(HttpHeaderNames.Host, out ICharSequence value); Debug.Assert(result, "Host header does not exist."); string location = value.ToString() + WebsocketPath; if (ServerSettings.IsSsl) { return "wss://" + location; } else { return "ws://" + location; } } /// <summary> /// 關閉ws連接 /// </summary> /// <param name="ctx"></param> static void WebSocketClose(IChannelHandlerContext ctx) { lock (ConnChannelList) { string channelId = ctx.Channel.Id.ToString(); if (ConnChannelList.ContainsKey(channelId)) { ConnChannelList.Remove(channelId); } Console.WriteLine($"當前在線數:{ConnChannelList.Count}"); } } } }
6.測試HTML腳本

<!DOCTYPE html> <html> <head> <meta charset="UTF-8"> <meta name="viewport" content="width=device-width, initial-scale=1.0"> <meta http-equiv="X-UA-Compatible" content="ie=edge"> <title>Document</title> </head> <body> <input type="text" id="message"> <div id="msgBox"></div> <input type="button" onclick="sendText()" value="發送信息"> <script> var ws = ''; window.onload = function () { connect(); } function connect() { var address = "ws://127.0.0.1:8080/websocket"; // address = "wss://127.0.0.1:8080/websocket"; ws = new WebSocket(address); ws.onopen = function (e) { }; //收到信息時 ws.onmessage = function (e) { var Html = '<p>' + e.data + '</p>'; document.getElementById("msgBox").innerHTML += Html; }; //發生錯誤時 ws.onerror = function (e) { }; //連接關閉時 ws.onclose = function (e) { document.getElementById("msgBox").innerHTML += "<p>與服務器的連接已斷開。</p>"; }; } function sendText() { ws.send(document.getElementById("message").value); } </script> </body> </html>
7.運行測試
運行第一個HTML頁面
運行第二個
發送消息
給第二個HTML發送消息,要拿一些特征。
關閉第二個頁面
基礎功能都已經完成。
在 Ubuntu 上測試也 OK。
從 SuperWebSocket 換到 DotNetty 主要原因就是想上 Linux 。