(先埋怨一下微軟大大)我們做NET開發,十分羡慕JAVA上能有NETTY, SPRING, STRUTS, DUBBO等等優秀框架,而我們NET就只有干瞪眼,哎,無賴之前生態圈沒做好,恨鐵不成鋼啊。不過由於近來Net Core的發布,慢慢也拉回了一小部分屬於微軟的天下,打住,閑話扯到這兒。
DotNetty是Azure團隊仿照(幾乎可以這么說)JAVA的Netty而出來的(目前已實現Netty的一部分),目前在Github上的Star有1.8K+,地址:https://github.com/Azure/DotNetty,沒有任何文檔,和代碼中少量的注釋。雖然比Netty出來晚了很多年,不過我們NET程序員們也該慶幸了,在自己的平台上終於能用上類似Netty這樣強大的通信框架了。
傳統通訊的問題:
我們使用通用的應用程序或者類庫來實現互相通訊,比如,我們經常使用一個 HTTP 客戶端庫來從 web 服務器上獲取信息,或者通過 web 服務來執行一個遠程的調用。
然而,有時候一個通用的協議或他的實現並沒有很好的滿足需求。比如我們無法使用一個通用的 HTTP 服務器來處理大文件、電子郵件以及近實時消息,比如金融信息和多人游戲數據。我們需要一個高度優化的協議來處理一些特殊的場景。例如你可能想實現一個優化了的 Ajax 的聊天應用、媒體流傳輸或者是大文件傳輸器,你甚至可以自己設計和實現一個全新的協議來准確地實現你的需求。
另一個不可避免的情況是當你不得不處理遺留的專有協議來確保與舊系統的互操作性。在這種情況下,重要的是我們如何才能快速實現協議而不犧牲應用的穩定性和性能。
解決:
Netty 是一個提供 asynchronous event-driven (異步事件驅動)的網絡應用框架,是一個用以快速開發高性能、可擴展協議的服務器和客戶端。
換句話說,Netty 是一個 NIO 客戶端服務器框架,使用它可以快速簡單地開發網絡應用程序,比如服務器和客戶端的協議。Netty 大大簡化了網絡程序的開發過程比如 TCP 和 UDP 的 socket 服務的開發。
“快速和簡單”並不意味着應用程序會有難維護和性能低的問題,Netty 是一個精心設計的框架,它從許多協議的實現中吸收了很多的經驗比如 FTP、SMTP、HTTP、許多二進制和基於文本的傳統協議.因此,Netty 已經成功地找到一個方式,在不失靈活性的前提下來實現開發的簡易性,高性能,穩定性。
有一些用戶可能已經發現其他的一些網絡框架也聲稱自己有同樣的優勢,所以你可能會問是 Netty 和它們的不同之處。答案就是 Netty 的哲學設計理念。Netty 從開始就為用戶提供了用戶體驗最好的 API 以及實現設計。正是因為 Netty 的哲學設計理念,才讓您得以輕松地閱讀本指南並使用 Netty。
(DotNetty的框架和實現是怎么回事,筆者不太清楚,但完全可參考Netty官方的文檔來學習和使用DotNetty相關的API接口)
DotNetty中幾個重要的庫(程序集):
DotNetty.Buffers: 對內存緩沖區管理的封裝。
DotNetty.Codecs: 對編解碼是封裝,包括一些基礎基類的實現,我們在項目中自定義的協議,都要繼承該項目的特定基類和實現。
DotNetty.Codecs.Mqtt: MQTT(消息隊列遙測傳輸)編解碼是封裝,包括一些基礎基類的實現。
DotNetty.Codecs.Protobuf: Protobuf 編解碼是封裝,包括一些基礎基類的實現。
DotNetty.Codecs.ProtocolBuffers: ProtocolBuffers編解碼是封裝,包括一些基礎基類的實現。
DotNetty.Codecs.Redis: Redis 協議編解碼是封裝,包括一些基礎基類的實現。
DotNetty.Common: 公共的類庫項目,包裝線程池,並行任務和常用幫助類的封裝。
DotNetty.Handlers: 封裝了常用的管道處理器,比如Tls編解碼,超時機制,心跳檢查,日志等。
DotNetty.Transport: DotNetty核心的實現,Socket基礎框架,通信模式:異步非阻塞。
DotNetty.Transport.Libuv: DotNetty自己實現基於Libuv (高性能的,事件驅動的I/O庫) 核心的實現。
常用的庫有Codecs, Common, Handlers, Buffers, Transport,目前Azure團隊正在實現其他Netty中的API(包括非公共Netty的API),讓我們拭目以待吧。
直接上點對點之間通訊的栗子
DotNetty的Example文件夾下有許多官方提供的實例,有拋棄服務實例(Discard),有應答服務實例(echo),有Telnet服務實例等等,為了實現直接點對點通訊,筆者采用了Echo的demo,此后的RPC調用也會基於Echo而實現,注釋詳細,直接上接收端(Server)的代碼:
/*
* Netty 是一個半成品,作用是在需要基於自定義協議的基礎上完成自己的通信封裝
* Netty 大大簡化了網絡程序的開發過程比如 TCP 和 UDP 的 socket 服務的開發。
* “快速和簡單”並不意味着應用程序會有難維護和性能低的問題,
* Netty 是一個精心設計的框架,它從許多協議的實現中吸收了很多的經驗比如 FTP、SMTP、HTTP、許多二進制和基於文本的傳統協議。
* 因此,Netty 已經成功地找到一個方式,在不失靈活性的前提下來實現開發的簡易性,高性能,穩定性。
*/
namespace Echo.Server
{
using System;
using System.Threading.Tasks;
using DotNetty.Codecs;
using DotNetty.Handlers.Logging;
using DotNetty.Transport.Bootstrapping;
using DotNetty.Transport.Channels;
using DotNetty.Transport.Libuv;
using Examples.Common;
static class Program
{
static async Task RunServerAsync()
{
ExampleHelper.SetConsoleLogger();
// 申明一個主回路調度組
var dispatcher = new DispatcherEventLoopGroup();
/*
Netty 提供了許多不同的 EventLoopGroup 的實現用來處理不同的傳輸。
在這個例子中我們實現了一個服務端的應用,因此會有2個 NioEventLoopGroup 會被使用。
第一個經常被叫做‘boss’,用來接收進來的連接。第二個經常被叫做‘worker’,用來處理已經被接收的連接,一旦‘boss’接收到連接,就會把連接信息注冊到‘worker’上。
如何知道多少個線程已經被使用,如何映射到已經創建的 Channel上都需要依賴於 IEventLoopGroup 的實現,並且可以通過構造函數來配置他們的關系。
*/
// 主工作線程組,設置為1個線程
IEventLoopGroup bossGroup = dispatcher; // (1)
// 子工作線程組,設置為1個線程
IEventLoopGroup workerGroup = new WorkerEventLoopGroup(dispatcher);
try
{
// 聲明一個服務端Bootstrap,每個Netty服務端程序,都由ServerBootstrap控制,通過鏈式的方式組裝需要的參數
var serverBootstrap = new ServerBootstrap(); // (2)
// 設置主和工作線程組
serverBootstrap.Group(bossGroup, workerGroup);
if (ServerSettings.UseLibuv)
{
// 申明服務端通信通道為TcpServerChannel
serverBootstrap.Channel<TcpServerChannel>(); // (3)
}
serverBootstrap
// 設置網絡IO參數等
.Option(ChannelOption.SoBacklog, 100) // (5)
// 在主線程組上設置一個打印日志的處理器
.Handler(new LoggingHandler("SRV-LSTN"))
// 設置工作線程參數
.ChildHandler(
/*
* ChannelInitializer 是一個特殊的處理類,他的目的是幫助使用者配置一個新的 Channel。
* 也許你想通過增加一些處理類比如DiscardServerHandler 來配置一個新的 Channel 或者其對應的ChannelPipeline 來實現你的網絡程序。
* 當你的程序變的復雜時,可能你會增加更多的處理類到 pipline 上,然后提取這些匿名類到最頂層的類上。
*/
new ActionChannelInitializer<IChannel>( // (4)
channel =>
{
/*
* 工作線程連接器是設置了一個管道,服務端主線程所有接收到的信息都會通過這個管道一層層往下傳輸,
* 同時所有出棧的消息 也要這個管道的所有處理器進行一步步處理。
*/
IChannelPipeline pipeline = channel.Pipeline;
// 添加日志攔截器
pipeline.AddLast(new LoggingHandler("SRV-CONN"));
// 添加出棧消息,通過這個handler在消息頂部加上消息的長度。
// LengthFieldPrepender(2):使用2個字節來存儲數據的長度。
pipeline.AddLast("framing-enc", new LengthFieldPrepender(2));
/*
入棧消息通過該Handler,解析消息的包長信息,並將正確的消息體發送給下一個處理Handler
1,InitialBytesToStrip = 0, //讀取時需要跳過的字節數
2,LengthAdjustment = -5, //包實際長度的糾正,如果包長包括包頭和包體,則要減去Length之前的部分
3,LengthFieldLength = 4, //長度字段的字節數 整型為4個字節
4,LengthFieldOffset = 1, //長度屬性的起始(偏移)位
5,MaxFrameLength = int.MaxValue, //最大包長
*/
pipeline.AddLast("framing-dec", new LengthFieldBasedFrameDecoder(ushort.MaxValue, 0, 2, 0, 2));
// 業務handler
pipeline.AddLast("echo", new EchoServerHandler());
}));
// bootstrap綁定到指定端口的行為就是服務端啟動服務,同樣的Serverbootstrap可以bind到多個端口
IChannel boundChannel = await serverBootstrap.BindAsync(ServerSettings.Port); // (6)
Console.WriteLine("wait the client input");
Console.ReadLine();
// 關閉服務
await boundChannel.CloseAsync();
}
finally
{
// 釋放指定工作組線程
await Task.WhenAll( // (7)
bossGroup.ShutdownGracefullyAsync(TimeSpan.FromMilliseconds(100), TimeSpan.FromSeconds(1)),
workerGroup.ShutdownGracefullyAsync(TimeSpan.FromMilliseconds(100), TimeSpan.FromSeconds(1))
);
}
}
static void Main() => RunServerAsync().Wait();
}
}
-
IEventLoopGroup 是用來處理I/O操作的多線程事件循環器,DotNetty 提供了許多不同的 EventLoopGroup 的實現用來處理不同的傳輸。在這個例子中我們實現了一個服務端的應用,因此會有2個 IEventLoopGroup 會被使用。第一個經常被叫做‘boss’,用來接收進來的連接。第二個經常被叫做‘worker’,用來處理已經被接收的連接,一旦‘boss’接收到連接,就會把連接信息注冊到‘worker’上。
-
ServerBootstrap 是一個啟動 Transport 服務的輔助啟動類。你可以在這個服務中直接使用 Channel,但是這會是一個復雜的處理過程,在很多情況下你並不需要這樣做。
-
這里我們指定使用 TcpServerChannel類來舉例說明一個新的 Channel 如何接收進來的連接。
-
ChannelInitializer 是一個特殊的處理類,他的目的是幫助使用者配置一個新的 Channel,當你的程序變的復雜時,可能你會增加更多的處理類到 pipline 上,然后提取這些匿名類到最頂層的類上。
-
你可以設置這里指定的 Channel 實現的配置參數。我們正在寫一個TCP/IP 的服務端,因此我們被允許設置 socket 的參數選項比如tcpNoDelay 和 keepAlive。
-
綁定端口然后啟動服務,這里我們在機器上綁定了機器網卡上的設置端口,當然現在你可以多次調用 bind() 方法(基於不同綁定地址)。
-
使用完成后,優雅的釋放掉指定的工作組線程,當然,你可以選擇關閉程序,但這並不推薦。
Server端的事件處理代碼:
上一部分代碼中加粗地方的實現
namespace Echo.Server
{
using System;
using System.Text;
using DotNetty.Buffers;
using DotNetty.Transport.Channels;
/// <summary>
/// 服務端處理事件函數
/// </summary>
public class EchoServerHandler : ChannelHandlerAdapter // ChannelHandlerAdapter 業務繼承基類適配器 // (1)
{
/// <summary>
/// 管道開始讀
/// </summary>
/// <param name="context"></param>
/// <param name="message"></param>
public override void ChannelRead(IChannelHandlerContext context, object message) // (2)
{
if (message is IByteBuffer buffer) // (3)
{
Console.WriteLine("Received from client: " + buffer.ToString(Encoding.UTF8));
}
context.WriteAsync(message); // (4)
}
/// <summary>
/// 管道讀取完成
/// </summary>
/// <param name="context"></param>
public override void ChannelReadComplete(IChannelHandlerContext context) => context.Flush(); // (5)
/// <summary>
/// 出現異常
/// </summary>
/// <param name="context"></param>
/// <param name="exception"></param>
public override void ExceptionCaught(IChannelHandlerContext context, Exception exception)
{
Console.WriteLine("Exception: " + exception);
context.CloseAsync();
}
}
}
-
DiscardServerHandler 繼承自 ChannelInboundHandlerAdapter,這個類實現了IChannelHandler接口,IChannelHandler提供了許多事件處理的接口方法,然后你可以覆蓋這些方法。現在僅僅只需要繼承 ChannelInboundHandlerAdapter 類而不是你自己去實現接口方法。
-
這里我們覆蓋了 chanelRead() 事件處理方法。每當從客戶端收到新的數據時,這個方法會在收到消息時被調用,這個例子中,收到的消息的類型是 ByteBuf。
-
為了響應或顯示客戶端發來的信息,為此,我們將在控制台中打印出客戶端傳來的數據。
-
然后,我們將客戶端傳來的消息通過context.WriteAsync寫回到客戶端。
-
當然,步驟4只是將流緩存到上下文中,並沒執行真正的寫入操作,通過執行Flush將流數據寫入管道,並通過context傳回給傳來的客戶端。
Client端代碼:
重點看注釋的地方,其他地方跟Server端沒有任何區別
namespace Echo.Client
{
using System;
using System.Net;
using System.Text;
using System.Threading.Tasks;
using DotNetty.Buffers;
using DotNetty.Codecs;
using DotNetty.Handlers.Logging;
using DotNetty.Transport.Bootstrapping;
using DotNetty.Transport.Channels;
using DotNetty.Transport.Channels.Sockets;
using Examples.Common;
static class Program
{
static async Task RunClientAsync()
{
ExampleHelper.SetConsoleLogger();
var group = new MultithreadEventLoopGroup();
try
{
var bootstrap = new Bootstrap();
bootstrap
.Group(group)
.Channel<TcpSocketChannel>()
.Option(ChannelOption.TcpNodelay, true)
.Handler(
new ActionChannelInitializer<ISocketChannel>(
channel =>
{
IChannelPipeline pipeline = channel.Pipeline;
pipeline.AddLast(new LoggingHandler());
pipeline.AddLast("framing-enc", new LengthFieldPrepender(2));
pipeline.AddLast("framing-dec", new LengthFieldBasedFrameDecoder(ushort.MaxValue, 0, 2, 0, 2));
pipeline.AddLast("echo", new EchoClientHandler());
}));
IChannel clientChannel = await bootstrap.ConnectAsync(new IPEndPoint(ClientSettings.Host, ClientSettings.Port));
// 建立死循環,類同於While(true)
for (;;) // (4)
{
Console.WriteLine("input you data:");
// 根據設置建立緩存區大小
IByteBuffer initialMessage = Unpooled.Buffer(ClientSettings.Size); // (1)
string r = Console.ReadLine();
// 將數據流寫入緩沖區
initialMessage.WriteBytes(Encoding.UTF8.GetBytes(r ?? throw new InvalidOperationException())); // (2)
// 將緩沖區數據流寫入到管道中
await clientChannel.WriteAndFlushAsync(initialMessage); // (3)
if(r.Contains("bye"))
break;
}
Console.WriteLine("byebye");
await clientChannel.CloseAsync();
}
finally
{
await group.ShutdownGracefullyAsync(TimeSpan.FromMilliseconds(100), TimeSpan.FromSeconds(1));
}
}
static void Main() => RunClientAsync().Wait();
}
}
-
初始化一個緩沖區的大小。
-
默認緩沖區接受的數據類型為bytes[],當然這樣也更加便於序列化成流。
-
將緩沖區的流直接數據寫入到Channel管道中。該管道一般為鏈接通訊的另一端(C端)。
-
建立死循環,這樣做的目的是為了測試每次都必須從客戶端輸入的數據,通過服務端回路一次后,再進行下一次的輸入操作。
Client端的事件處理代碼:
namespace Echo.Client
{
using System;
using System.Text;
using DotNetty.Buffers;
using DotNetty.Transport.Channels;
public class EchoClientHandler : ChannelHandlerAdapter
{
readonly IByteBuffer initialMessage;
public override void ChannelActive(IChannelHandlerContext context) => context.WriteAndFlushAsync(this.initialMessage);
public override void ChannelRead(IChannelHandlerContext context, object message)
{
if (message is IByteBuffer byteBuffer)
{
Console.WriteLine("Received from server: " + byteBuffer.ToString(Encoding.UTF8));
}
}
public override void ChannelReadComplete(IChannelHandlerContext context) => context.Flush();
public override void ExceptionCaught(IChannelHandlerContext context, Exception exception)
{
Console.WriteLine("Exception: " + exception);
context.CloseAsync();
}
}
}
非常簡單,將數據流顯示到控制台。
實現結果
至此,我們使用DotNetty框架搭建簡單的應答服務器就這樣做好了,很簡單,實現效果如下:
C端主動向S端主動發送數據后,S端收到數據,在控制台打印出數據,並回傳給C端,當然,S端還可以做很多很多的事情。
DotNetty內部調試記錄分析
雖然DotNetty官方沒有提供任何技術文檔,但官方卻提供了詳細的調試記錄,很多時候,我們學習者其實也可以通過調試記錄來分析某一個功能的實現流程。我們可以通過將DotNetty的內部輸入輸出記錄打印到控制台上。
InternalLoggerFactory.DefaultFactory.AddProvider(new ConsoleLoggerProvider((s, level) => true, false));
可以看到服務端的打印記錄一下多出來了許多許多,有大部分是屬於DotNetty內部調試時的打印記錄,我們只着重看如下的部分。
dbug: SRV-LSTN[0]
[id: 0x3e8afca1] HANDLER_ADDED
dbug: SRV-LSTN[0]
[id: 0x3e8afca1] REGISTERED (1)
dbug: SRV-LSTN[0]
[id: 0x3e8afca1] BIND: 0.0.0.0:8007 (2)
wait the client input
dbug: SRV-LSTN[0]
[id: 0x3e8afca1, 0.0.0.0:8007] ACTIVE (3)
dbug: SRV-LSTN[0]
[id: 0x3e8afca1, 0.0.0.0:8007] READ (4)
dbug: SRV-LSTN[0]
[id: 0x3e8afca1, 0.0.0.0:8007] RECEIVED: [id: 0x7bac2775, 127.0.0.1:64073 :> 127.0.0.1:8007] (5)
dbug: SRV-LSTN[0]
[id: 0x3e8afca1, 0.0.0.0:8007] RECEIVED_COMPLETE (6)
dbug: SRV-LSTN[0]
[id: 0x3e8afca1, 0.0.0.0:8007] READ (7)
dbug: SRV-CONN[0]
[id: 0x7bac2775, 127.0.0.1:64073 => 127.0.0.1:8007] HANDLER_ADDED (8)
dbug: SRV-CONN[0]
[id: 0x7bac2775, 127.0.0.1:64073 => 127.0.0.1:8007] REGISTERED (9)
dbug: SRV-CONN[0]
[id: 0x7bac2775, 127.0.0.1:64073 => 127.0.0.1:8007] ACTIVE (10)
dbug: SRV-CONN[0]
[id: 0x7bac2775, 127.0.0.1:64073 => 127.0.0.1:8007] READ (11)
dbug: DotNetty.Buffers.AbstractByteBuffer[0] (12)
-Dio.netty.buffer.bytebuf.checkAccessible: True
dbug: SRV-CONN[0]
[id: 0x7bac2775, 127.0.0.1:64073 => 127.0.0.1:8007] RECEIVED: 14B (13)
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|100000000| 00 0C 68 65 6C 6C 6F 20 77 6F 72 6C 64 21 |..hello world! |
+--------+-------------------------------------------------+----------------+
Received from client: hello world!
dbug: SRV-CONN[0] (14)
[id: 0x7bac2775, 127.0.0.1:64073 => 127.0.0.1:8007] WRITE: 2B
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|100000000| 00 0C |.. |
+--------+-------------------------------------------------+----------------+
dbug: SRV-CONN[0] (15)
[id: 0x7bac2775, 127.0.0.1:64073 => 127.0.0.1:8007] WRITE: 12B
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|100000000| 68 65 6C 6C 6F 20 77 6F 72 6C 64 21 |hello world! |
+--------+-------------------------------------------------+----------------+
dbug: SRV-CONN[0] (16)
[id: 0x7bac2775, 127.0.0.1:64073 => 127.0.0.1:8007] RECEIVED_COMPLETE
dbug: SRV-CONN[0] (17)
[id: 0x7bac2775, 127.0.0.1:64073 => 127.0.0.1:8007] FLUSH
dbug: SRV-CONN[0] (18)
[id: 0x7bac2775, 127.0.0.1:64073 => 127.0.0.1:8007] READ
咋一看,有18個操作,好像有點太多了,其實不然,還有很多很多的內部調試細節並沒打印到控制台上。
-
通過手動建立的工作線程組,並將這組線程注冊到管道中,這個管道可以是基於SOCKER,可以基於IChannel(1);
-
綁定自定的IP地址和端口號到自定義管道上(2);
-
激活自定義管道(3);
-
開始讀取(其實也是開始監聽)(4);
-
收到來自id為0x7bac2775的客戶端連接請求,建立連接,並繼續開始監聽(5)(6)(7);
-
從第8步開始,日志已經變成id為0x7bac2775的記錄了,當然一樣包含注冊管道,激活管道,開始監聽等等與S端一模一樣的操作(8)(9)(10)(11)
-
當筆者輸入一條"hello world!"數據后,DotNetty.Buffers.AbstractByteBuffer會進行數據類型檢查,以便確認能將數據放入到管道中。(12)
-
將數據發送到S端,數據大小為14B,hello world前有兩個點,代表這是數據頭,緊接着再發送兩個點,但沒有任何數據,代表數據已經結束。DotNetty將數據的十六進制存儲位用易懂的方式表現了出來,很人性化。(13)(14)
-
S端收到數據沒有任何加工和處理,馬上將數據回傳到C端。(15)(16)
-
最后,當這個過程完成后,需要將緩存區的數據強制寫入到管道中,所以會執行一次Flush操作,整個傳輸完成。接下來,不管是C端還是S端,繼續將自己的狀態改成READ,用於監聽管道中的各種情況,比如連接狀態,數據傳輸等等(17)。
總結
對於剛開始接觸Socket編程的朋友而言,這是個噩夢,因為Socket編程的復雜性不會比多線程容易,甚至會更復雜。協議,壓縮,傳輸,多線程,監聽,流控制等等一系列問題擺在面前,因此而誕生了Netty這樣優秀的開源框架,但是Netty是個半成品,因為你需要基於他來實現自己想要的協議,傳輸等等自定義操作,而底層的內容,你完全不用關心。不像某些框架,比如Newtonsoft.Json這樣的功能性框架,不用配置,不用自定義,直接拿來用就可以了。
雖然DotNetty幫我們實現了底層大量的操作,但如果不熟悉或者一點也不懂網絡通信,同樣對上面的代碼是一頭霧水,為何?行情需要,我們程序員天天都在趕業務,哪有時間去了解和學習更多的細節...通過將調試記錄打印出來,並逐行挨個的對照代碼進行分析,就會慢慢開始理解最簡單的通信流程了。
本篇只是實現了基於DotNetty最簡單的通訊過程,也只是將數據做了一下回路,並沒做到任何與RPC有關的調用,下一篇我們開始講這個例子深入,介紹基於DotNetty的RPC調用。
原文地址: https://www.cnblogs.com/SteveLee/p/9860507.html