DotNetty實現WebSocket的簡單使用


 工作中項目是物聯網項目的,管理平台又是bs架構。

如果用 Socket 的話,Web 端還需要轉發,就全部統一采用了 WebSocket 。

DotNet 平台上的 WebSocket 實現有很多種,這里介紹一下用 DotNetty 來實現的方式。

只完成基本使用功能:

  管理連接、

  服務端接收消息、

  服務端主動向指定連接發送消息、

  服務端主動端口某連接、

  客戶端連接斷開響應。

 

本地環境 .net core 2.2

1.創建控制台應用

2.安裝NuGet包

DotNetty.Buffers

DotNetty.Codecs

DotNetty.Codecs.Http

DotNetty.Common

DotNetty.Handlers

DotNetty.Transport

DotNetty.Transport.Libuv

 

 

3.創建輔助解析的工具類

 

 新建類庫 :Examples.Common

同步引用 NuGet 包。並安裝以下幾個。

Microsoft.Extensions.Configuration

Microsoft.Extensions.Configuration.FileExtensions

Microsoft.Extensions.Configuration.Json

Microsoft.Extensions.Logging.Console

 

 Examples.Common.csproj

<Project Sdk="Microsoft.NET.Sdk">

  <PropertyGroup>
    <TargetFramework>netcoreapp2.2</TargetFramework>
  </PropertyGroup>

  <ItemGroup>
    <PackageReference Include="DotNetty.Buffers" Version="0.6.0" />
    <PackageReference Include="DotNetty.Codecs" Version="0.6.0" />
    <PackageReference Include="DotNetty.Codecs.Http" Version="0.6.0" />
    <PackageReference Include="DotNetty.Common" Version="0.6.0" />
    <PackageReference Include="DotNetty.Handlers" Version="0.6.0" />
    <PackageReference Include="DotNetty.Transport" Version="0.6.0" />
    <PackageReference Include="DotNetty.Transport.Libuv" Version="0.6.0" />
    <PackageReference Include="Microsoft.Extensions.Configuration" Version="2.2.0" />
    <PackageReference Include="Microsoft.Extensions.Configuration.FileExtensions" Version="2.2.0" />
    <PackageReference Include="Microsoft.Extensions.Configuration.Json" Version="2.2.0" />
    <PackageReference Include="Microsoft.Extensions.Logging.Console" Version="1.1.1" />
  </ItemGroup>

</Project>
View Code

安裝完了,記得在主控制台程序里面添加對該類庫的引用。

 

4.添加解析輔助類

創建 ExampleHelper.cs

namespace Examples.Common
{
    using System;
    using DotNetty.Common.Internal.Logging;
    using Microsoft.Extensions.Configuration;
    using Microsoft.Extensions.Logging.Console;

    public static class ExampleHelper
    {
        static ExampleHelper()
        {
            Configuration = new ConfigurationBuilder()
                .SetBasePath(ProcessDirectory)
                .AddJsonFile("appsettings.json")
                .Build();
        }

        public static string ProcessDirectory
        {
            get
            {
#if NETSTANDARD1_3
                return AppContext.BaseDirectory;
#else
                return AppDomain.CurrentDomain.BaseDirectory;
#endif
            }
        }

        public static IConfigurationRoot Configuration { get; }

        public static void SetConsoleLogger() => InternalLoggerFactory.DefaultFactory.AddProvider(new ConsoleLoggerProvider((s, level) => true, false));
    }
}
View Code

 

創建 ServerSettings.cs

namespace Examples.Common
{
    public static class ServerSettings
    {
        public static bool IsSsl
        {
            get
            {
                string ssl = ExampleHelper.Configuration["ssl"];
                return !string.IsNullOrEmpty(ssl) && bool.Parse(ssl);
            }
        }

        public static int Port => int.Parse(ExampleHelper.Configuration["port"]);

        public static bool UseLibuv
        {
            get
            {
                string libuv = ExampleHelper.Configuration["libuv"];
                return !string.IsNullOrEmpty(libuv) && bool.Parse(libuv);
            }
        }
    }
}
View Code

 

創建 ClientSettings.cs

namespace Examples.Common
{
    using System.Net;

    public class ClientSettings
    {
        public static bool IsSsl
        {
            get
            {
                string ssl = ExampleHelper.Configuration["ssl"];
                return !string.IsNullOrEmpty(ssl) && bool.Parse(ssl);
            }
        }

        public static IPAddress Host => IPAddress.Parse(ExampleHelper.Configuration["host"]);

        public static int Port => int.Parse(ExampleHelper.Configuration["port"]);

        public static int Size => int.Parse(ExampleHelper.Configuration["size"]);

        public static bool UseLibuv
        {
            get
            {
                string libuv = ExampleHelper.Configuration["libuv"];
                return !string.IsNullOrEmpty(libuv) && bool.Parse(libuv);
            }
        }
    }
}
View Code

 

 5.完成WebSocket的服務端代碼

 JSON 配置文件 appsettings.json

設置文件屬性,始終復制。

{
  "port": "8080",
  "libuv": "true",
  "ssl": "false"
}

 

程序啟動 Program.cs

namespace DotNettyWebSocket
{
    using System;
    using System.IO;
    using System.Net;
    using System.Runtime;
    using System.Runtime.InteropServices;
    using System.Security.Cryptography.X509Certificates;
    using System.Threading.Tasks;
    using DotNetty.Codecs.Http;
    using DotNetty.Common;
    using DotNetty.Handlers.Tls;
    using DotNetty.Transport.Bootstrapping;
    using DotNetty.Transport.Channels;
    using DotNetty.Transport.Channels.Sockets;
    using DotNetty.Transport.Libuv;
    using Examples.Common;

    class Program
    {
        static Program()
        {
            ResourceLeakDetector.Level = ResourceLeakDetector.DetectionLevel.Disabled;
        }

        static async Task RunServerAsync()
        {
            Console.WriteLine(
                $"\n{RuntimeInformation.OSArchitecture} {RuntimeInformation.OSDescription}"
                + $"\n{RuntimeInformation.ProcessArchitecture} {RuntimeInformation.FrameworkDescription}"
                + $"\nProcessor Count : {Environment.ProcessorCount}\n");

            bool useLibuv = ServerSettings.UseLibuv;
            Console.WriteLine("Transport type : " + (useLibuv ? "Libuv" : "Socket"));

            if (!RuntimeInformation.IsOSPlatform(OSPlatform.Windows))
            {
                GCSettings.LatencyMode = GCLatencyMode.SustainedLowLatency;
            }

            Console.WriteLine($"Server garbage collection : {(GCSettings.IsServerGC ? "Enabled" : "Disabled")}");
            Console.WriteLine($"Current latency mode for garbage collection: {GCSettings.LatencyMode}");
            Console.WriteLine("\n");

            /*
             Netty 提供了許多不同的 EventLoopGroup 的實現用來處理不同的傳輸。
             在這個例子中我們實現了一個服務端的應用,因此會有2個 NioEventLoopGroup 會被使用。
             第一個經常被叫做‘boss’,用來接收進來的連接。第二個經常被叫做‘worker’,用來處理已經被接收的連接,一旦‘boss’接收到連接,就會把連接信息注冊到‘worker’上。
             如何知道多少個線程已經被使用,如何映射到已經創建的 Channel上都需要依賴於 IEventLoopGroup 的實現,並且可以通過構造函數來配置他們的關系。
             */

            // 主工作線程組,設置為1個線程
            // Boss線程:由這個線程池提供的線程是boss種類的,用於創建、連接、綁定socket, (有點像門衛)然后把這些socket傳給worker線程池。
            // 在服務器端每個監聽的socket都有一個boss線程來處理。在客戶端,只有一個boss線程來處理所有的socket。
            IEventLoopGroup bossGroup;

            // 子工作線程組,----默認為內核數*2的線程數
            // Worker線程:Worker線程執行所有的異步I/O,即處理操作
            IEventLoopGroup workGroup;
            if (useLibuv)
            {
                var dispatcher = new DispatcherEventLoopGroup();
                bossGroup = dispatcher;
                workGroup = new WorkerEventLoopGroup(dispatcher);
            }
            else
            {
                bossGroup = new MultithreadEventLoopGroup(1);
                workGroup = new MultithreadEventLoopGroup();
            }

            X509Certificate2 tlsCertificate = null;
            if (ServerSettings.IsSsl)
            {
                tlsCertificate = new X509Certificate2(Path.Combine(ExampleHelper.ProcessDirectory, "dotnetty.com.pfx"), "password");
            }
            try
            {
                // 聲明一個服務端Bootstrap,每個Netty服務端程序,都由ServerBootstrap控制,通過鏈式的方式組裝需要的參數
                // ServerBootstrap 啟動NIO服務的輔助啟動類,負責初始話netty服務器,並且開始監聽端口的socket請求
                var bootstrap = new ServerBootstrap();

                // 設置主和工作線程組
                bootstrap.Group(bossGroup, workGroup);

                if (useLibuv)
                {
                    // 申明服務端通信通道為TcpServerChannel
                    // 設置非阻塞,用它來建立新accept的連接,用於構造serversocketchannel的工廠類
                    bootstrap.Channel<TcpServerChannel>();
                    if (RuntimeInformation.IsOSPlatform(OSPlatform.Linux)
                        || RuntimeInformation.IsOSPlatform(OSPlatform.OSX))
                    {
                        bootstrap
                            .Option(ChannelOption.SoReuseport, true)
                            .ChildOption(ChannelOption.SoReuseaddr, true);
                    }
                }
                else
                {
                    bootstrap.Channel<TcpServerSocketChannel>();
                }

                // ChildChannelHandler 對出入的數據進行的業務操作,其繼承ChannelInitializer
                bootstrap
                    // 設置網絡IO參數等
                    .Option(ChannelOption.SoBacklog, 8192)
                    /*
                    * ChannelInitializer 是一個特殊的處理類,他的目的是幫助使用者配置一個新的 Channel。
                    * 也許你想通過增加一些處理類比如DiscardServerHandler 來配置一個新的 Channel 或者其對應的ChannelPipeline 來實現你的網絡程序。
                    * 當你的程序變的復雜時,可能你會增加更多的處理類到 pipline 上,然后提取這些匿名類到最頂層的類上。
                    */
                    // 設置工作線程參數
                    .ChildHandler(new ActionChannelInitializer<IChannel>(channel =>
                    {
                        /*
                        * 工作線程連接器是設置了一個管道,服務端主線程所有接收到的信息都會通過這個管道一層層往下傳輸,
                        * 同時所有出棧的消息 也要這個管道的所有處理器進行一步步處理。
                        */
                        IChannelPipeline pipeline = channel.Pipeline;
                        if (tlsCertificate != null)
                        {
                            pipeline.AddLast(TlsHandler.Server(tlsCertificate));
                        }
                        pipeline.AddLast(new HttpServerCodec());
                        pipeline.AddLast(new HttpObjectAggregator(65536));

                        //業務handler ,這里是實際處理業務的Handler
                        //pipeline.AddLast(new WebSocketServerHandler());
                        //自己寫的業務類
                        pipeline.AddLast(new SendFunction());
                    }));

                // bootstrap綁定到指定端口的行為 就是服務端啟動服務,同樣的Serverbootstrap可以bind到多個端口
                int port = ServerSettings.Port;
                IChannel bootstrapChannel = await bootstrap.BindAsync(IPAddress.Loopback, port);
                // 似乎沒有成功阻塞 而是連接服務端后 就馬上執行下一句了 導致連接一次就關閉   (是成功進入 ChannelActive 判斷的)也就是無法保持長連接
                // 添加長連接即可,參考EchoClient

                Console.WriteLine("Open your web browser and navigate to "
                    + $"{(ServerSettings.IsSsl ? "https" : "http")}"
                    + $"://127.0.0.1:{port}/");
                Console.WriteLine("Listening on "
                    + $"{(ServerSettings.IsSsl ? "wss" : "ws")}"
                    + $"://127.0.0.1:{port}/websocket");
                Console.ReadLine();

                // 關閉服務
                await bootstrapChannel.CloseAsync();
            }
            finally
            {
                // 釋放工作組線程
                workGroup.ShutdownGracefullyAsync().Wait();
                bossGroup.ShutdownGracefullyAsync().Wait();
            }
        }

        static void Main() => RunServerAsync().Wait();
    }
}
View Code

 

業務處理類 SendFunction.cs

using DotNetty.Buffers;
using DotNetty.Codecs.Http;
using DotNetty.Codecs.Http.WebSockets;
using DotNetty.Common.Utilities;
using DotNetty.Transport.Channels;
using DotNetty.Transport.Channels.Groups;
using Examples.Common;
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Text;
using System.Threading.Tasks;
using static DotNetty.Codecs.Http.HttpResponseStatus;
using static DotNetty.Codecs.Http.HttpVersion;

namespace DotNettyWebSocket
{
    public class SendFunction : SimpleChannelInboundHandler<object> 
    {
        const string WebsocketPath = "/websocket";

        WebSocketServerHandshaker handshaker;

        static volatile IChannelGroup groups;

        public override void ChannelReadComplete(IChannelHandlerContext context) => context.Flush();

        //客戶端連接異常
        public override void ExceptionCaught(IChannelHandlerContext context, Exception exception)
        {
            WebSocketClose(context);
            Console.WriteLine(" SendFunction Exception: " + exception);
            context.CloseAsync();
        }

        protected override void ChannelRead0(IChannelHandlerContext ctx, object msg)
        {
            if (msg is IFullHttpRequest request)
            {
                this.HandleHttpRequest(ctx, request);
            }
            else if (msg is WebSocketFrame frame)
            {
                this.HandleWebSocketFrame(ctx, frame);
            }
        }

        void HandleHttpRequest(IChannelHandlerContext ctx, IFullHttpRequest req)
        {
            if (!req.Result.IsSuccess)
            {
                SendHttpResponse(ctx, req, new DefaultFullHttpResponse(Http11, BadRequest));
                return;
            }

            if (!Equals(req.Method, HttpMethod.Get))
            {
                SendHttpResponse(ctx, req, new DefaultFullHttpResponse(Http11, Forbidden));
                return;
            }

            var wsFactory = new WebSocketServerHandshakerFactory(
                GetWebSocketLocation(req), null, true, 5 * 1024 * 1024);
            this.handshaker = wsFactory.NewHandshaker(req);
            if (this.handshaker == null)
            {
                WebSocketServerHandshakerFactory.SendUnsupportedVersionResponse(ctx.Channel);
            }
            else
            {
                this.handshaker.HandshakeAsync(ctx.Channel, req);
            }

            base.HandlerAdded(ctx);
            IChannelGroup g = groups;
            if (g == null)
            {
                lock (this)
                {
                    if (groups == null)
                    {
                        g = groups = new DefaultChannelGroup(ctx.Executor);
                    }
                }
            }
            g.Add(ctx.Channel);

            //主動向當前連接的客戶端發送信息
            TextWebSocketFrame tst = new TextWebSocketFrame($"歡迎{ctx.Channel.RemoteAddress}加入111.");
            TextWebSocketFrame tstId = new TextWebSocketFrame($"歡迎{ctx.Channel.Id}加入222.");
            groups.WriteAndFlushAsync(tst);
            groups.WriteAndFlushAsync(tstId);

            //保存連接對象
            lock (ConnChannelList)
            {
                if (ConnChannelList.Count > 0)
                {
                    if (ConnChannelList.ContainsKey(ctx.Channel.Id.ToString()))
                    {
                        ConnChannelList.Remove(ctx.Channel.Id.ToString());
                    }
                }
                ConnChannelList.Add(ctx.Channel.Id.ToString(), ctx.Channel.Id);
                Console.WriteLine($"當前在線數:{ConnChannelList.Count}");
            }


            Console.WriteLine("---------首次到達----------");
            Console.WriteLine("連接成功");
            Console.WriteLine($"歡迎{ctx.Channel.RemoteAddress}加入");
            Console.WriteLine("---------首次到達----------");
        }

        public static volatile Dictionary<string, IChannelId> ConnChannelList = new Dictionary<string, IChannelId>();
        void HandleWebSocketFrame(IChannelHandlerContext ctx, WebSocketFrame frame)
        {
            //客戶端關閉連接
            if (frame is CloseWebSocketFrame)
            {
                WebSocketClose(ctx);

                Console.WriteLine($"連接關閉     {ctx.Channel.RemoteAddress}");
                this.handshaker.CloseAsync(ctx.Channel, (CloseWebSocketFrame)frame.Retain());

                return;
            }

            if (frame is PingWebSocketFrame)
            {
                ctx.WriteAsync(new PongWebSocketFrame((IByteBuffer)frame.Content.Retain()));
                return;
            }

            if (frame is TextWebSocketFrame textFrame)
            {
                Console.WriteLine("---------消息到達----------");
                Console.WriteLine("Received from client: " + frame.Content.ToString(Encoding.UTF8));
                Console.WriteLine("---------消息到達----------");


                //發送信息到指定連接
                string[] strArg = textFrame.Text().Split(',');
                if (strArg.Length > 1)
                {
                    lock (ConnChannelList)
                    {
                        if (ConnChannelList.ContainsKey(strArg[0]))
                        {
                            var connChannel = groups.Find(ConnChannelList[strArg[0]]);//null

                            if (connChannel != null)
                            {
                                //主動向當前連接的客戶端發送信息
                                TextWebSocketFrame tst = new TextWebSocketFrame(strArg[1]);
                                connChannel.WriteAndFlushAsync(tst);

                                //服務端斷開指定客戶端連接
                                if (strArg[1] == "close")
                                {
                                    connChannel.CloseAsync();
                                }
                            }
                        }
                    }
                }

                ctx.WriteAsync(frame.Retain());

                return;
            }

            if (frame is BinaryWebSocketFrame)
            {
                ctx.WriteAsync(frame.Retain());
            }
        }

        static void SendHttpResponse(IChannelHandlerContext ctx, IFullHttpRequest req, IFullHttpResponse res)
        {
            if (res.Status.Code != 200)
            {
                IByteBuffer buf = Unpooled.CopiedBuffer(Encoding.UTF8.GetBytes(res.Status.ToString()));
                res.Content.WriteBytes(buf);
                buf.Release();
                HttpUtil.SetContentLength(res, res.Content.ReadableBytes);
            }

            Task task = ctx.Channel.WriteAndFlushAsync(res);
            if (!HttpUtil.IsKeepAlive(req) || res.Status.Code != 200)
            {
                task.ContinueWith((t, c) => ((IChannelHandlerContext)c).CloseAsync(),
                    ctx, TaskContinuationOptions.ExecuteSynchronously);
            }
        }

        static string GetWebSocketLocation(IFullHttpRequest req)
        {
            bool result = req.Headers.TryGet(HttpHeaderNames.Host, out ICharSequence value);
            Debug.Assert(result, "Host header does not exist.");
            string location = value.ToString() + WebsocketPath;

            if (ServerSettings.IsSsl)
            {
                return "wss://" + location;
            }
            else
            {
                return "ws://" + location;
            }
        }


        /// <summary>
        /// 關閉ws連接
        /// </summary>
        /// <param name="ctx"></param>
        static void WebSocketClose(IChannelHandlerContext ctx)
        {
            lock (ConnChannelList)
            {
                string channelId = ctx.Channel.Id.ToString();
                if (ConnChannelList.ContainsKey(channelId))
                {
                    ConnChannelList.Remove(channelId);
                }
                Console.WriteLine($"當前在線數:{ConnChannelList.Count}");
            }
        }

    }
}
View Code

 

6.測試HTML腳本

<!DOCTYPE html>
<html>

<head>
    <meta charset="UTF-8">
    <meta name="viewport" content="width=device-width, initial-scale=1.0">
    <meta http-equiv="X-UA-Compatible" content="ie=edge">
    <title>Document</title>
</head>

<body>

    <input type="text" id="message">
    <div id="msgBox"></div>

    <input type="button" onclick="sendText()" value="發送信息">

    <script>

        var ws = '';

        window.onload = function () {
            connect();
        }

        function connect() {
            var address = "ws://127.0.0.1:8080/websocket";
            // address = "wss://127.0.0.1:8080/websocket";
            ws = new WebSocket(address);

            ws.onopen = function (e) {

            };

            //收到信息時
            ws.onmessage = function (e) {
                var Html = '<p>' + e.data + '</p>';
                document.getElementById("msgBox").innerHTML += Html;
            };

            //發生錯誤時
            ws.onerror = function (e) {

            };

            //連接關閉時
            ws.onclose = function (e) {
                document.getElementById("msgBox").innerHTML += "<p>與服務器的連接已斷開。</p>";
            };

        }

        function sendText() {
            ws.send(document.getElementById("message").value);
        }

    </script>

</body>

</html>
View Code

 

7.運行測試

 

 運行第一個HTML頁面

 

 

運行第二個

 

 

 

發送消息

 

 

給第二個HTML發送消息,要拿一些特征。

 

 

 

 

 

 

 

關閉第二個頁面

 

 

基礎功能都已經完成。

在 Ubuntu 上測試也 OK。

從 SuperWebSocket 換到 DotNetty 主要原因就是想上 Linux 。

 

 

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM