一、概要
在上一篇文章講到Dotnetty的基本認識,本文這次會講解dotnetty非常核心的模塊是屬於比較硬核的干貨了,然后繼續往下講解如何根據自己的需求或者自己的喜好去配置Dotnetty而不是生搬硬套官網的示例源碼。如果看了本文有收獲的話麻煩關注一下文章尾部的公眾號和技術討論群。各位的支持是對我莫大的幫助。
二、簡介
主要講解一下幾個知識點:
- EventLoopGroup & EventLoop
- Bootstrap
- Channel
- ChannelPipeline & ChannelHandler
- ChannelHandlerContext
- ChannelHandler
三、詳細內容
1.EventLoopGroup & EventLoop
- 高性能RPC框架的3個要素:IO模型、數據協議、線程模型
- EventLoop好比一個線程,1個EventLoop可以服務多個Channel,1個Channel只有一個EventLoop可以創建多個 EventLoop 來優化資源利用,也就是EventLoopGroup。
- EventLoopGroup 負責分配 EventLoop 到新創建的 Channel,里面包含多個EventLoop
- EventLoopGroup →多個 EventLoop ,EventLoop →維護一個 Selector。
2.服務器啟動引導類:ServerBootstrap
Group :設置線程組模型,Reactor線程模型對比EventLoopGroup
-
單線程
-
多線程
-
主從線程
Channel:設置channel通道類型NioServerSocketChannel、OioServerSocketChannel
Option: 作用於每個新建立的channel,設置TCP連接中的一些參數,如下:
-
ChannelOption.SO_BACKLOG: 存放已完成三次握手的請求的等待隊列的最大長度;
-
ChannelOption.TCP_NODELAY: 為了解決Nagle的算法問題,默認是false, 要求高實時性,有數據時馬上發送,就將該選項設置為true關閉Nagle算法;如果要減少發送次數,就設置為false,會累積一定大小后再發送。
-
ChildOption: 作用於被accept之后的連接
-
ChildHandler: 用於對每個通道里面的數據處理
3.連接通道類:Channel
Channel: 客戶端和服務端建立的一個連接通道(可以理解為一個channel就是一個socket連接) ChannelHandler: 負責Channel的邏輯處理 ChannelPipeline: 負責管理ChannelHandler的有序容器
關系: 一個Channel包含一個ChannelPipeline,所有ChannelHandler都會順序加入到ChannelPipeline中 創建 Channel時會自動創建一個ChannelPipeline,每個Channel都有一個管理它的pipeline,這關聯是永久 性的Channel當狀態出現變化,就會觸發對應的事件。
生命周期:
-
ChannelRegistered: channel注冊到一個EventLoop
-
ChannelActive: 變為活躍狀態(連接到了遠程主機),可以接受和發送數據
-
ChannelInactive: channel處於非活躍狀態,沒有連接到遠程主機
-
ChannelUnregistered: channel已經創建,但是未注冊到一個EventLoop里面,也就是沒有和Selector綁定
4.頻道的內部實現 ChannelHandler & ChannelPipeline
-
ChannelInboundHandler:(入站) 處理輸入數據和Channel狀態類型改變,適配器。
-
ChannelInboundHandlerAdapter(適配器設計模式) 常用的:SimpleChannelInboundHandler
-
ChannelOutboundHandler:(出站) 處理輸出數據,適配器 ChannelOutboundHandlerAdapter
-
ChannelPipeline: 好比廠里的流水線一樣,可以在上面添加多個ChannelHanler,也可看成是一串
-
ChannelHandler 實例,攔截穿過 Channel 的輸入輸出 event, ChannelPipeline 實現了攔截器的一種高級形 式,使得用戶可以對事件的處理以及ChannelHanler之間交互獲得完全的控制權。
5.頻道的內部實現 ChannelHandler & ChannelPipeline
ChannelHandlerContext是連接ChannelHandler和ChannelPipeline的橋梁,ChannelHandlerContext部分方法和Channel及ChannelPipeline重合。
- 好比調用write方法Channel、ChannelPipeline、ChannelHandlerContext 都可以調用此方法,前兩者都會在整個管道流里 傳播,而ChannelHandlerContext就只會在后續的Handler里面傳播。
- AbstractChannelHandlerContext類雙向鏈表結構,next/prev分別是后繼節點,和前驅節點。
- DefaultChannelHandlerContext 是實現類,但是大部分都是父類那邊完成,這個只是簡單的實現一些方法 主要就是判斷Handler的類型。
- ChannelInboundHandler之間的傳遞,主要通過調用ctx里面的FireXXX()方法來實現下個handler的調用。
6.Handler執行順序
一般的項目中,inboundHandler和outboundHandler有多個,在Pipeline中的執行順序?
InboundHandler順序執行,OutboundHandler逆序執行
- InboundHandler順序執行,OutboundHandler逆序執行
- InboundHandler之間傳遞數據,通過context.fireChannelRead(message)
- InboundHandler通過context.write(message),則會傳遞到outboundHandler
- 使用context.write(msg)傳遞消息,Inbound需要放在結尾,在Outbound之后,不然outboundhandler會不執行; 但是使用channel.write(msg)、pipline.write(msg)情況會不一致,都會執行。
- OutBound和Inbound誰先執行,針對客戶端和服務端而言,客戶端是發起請求再接受數據,先outbound再 inbound,服務端則相反。
四、實戰環節
以上概念性的東西介紹完了之后開始編寫本章實戰代碼(完整的案例代碼將在qq群文件共享里上傳,文章末尾有QQ群二維碼和聯系方式)。接下來我們先看一下項目結構。
Handlers - 主要存放所有處理相關類。
Initializer - 存放初始化tcp服務的相關內容。
appsetting.json - 主要存放的內容為,服務端的相關配置例如:ip地址、端口號等。
dotnetty - 安全證書
Program - 啟動類
項目結構介紹完畢之后,我大致將這個demo分為5個部分來實現具體根據自己需求去設計搭建結構都是可以的,這里的內容僅供參考。
-
第一步,配置構建引導類
1 //主要工作組,設置為2個線程 2 private static readonly IEventLoopGroup bossGroup = new MultithreadEventLoopGroup(4); 3 //子工作組,默認為內核數*2的線程數 4 private static readonly IEventLoopGroup workerGroup = new MultithreadEventLoopGroup(); 5 6 static async Task RunAsync() { 7 /* 8 *初始化服務端引導對象。 9 *聲明一個服務端Bootstrap,每個Netty服務端程序,都由ServerBootstrap控制, 10 *通過鏈式的方式組裝需要的參數 11 */ 12 ServerBootstrap bootstrap = new ServerBootstrap(); 13 //添加工作組 14 bootstrap.Group(bossGroup, workerGroup); 15 //初始化工作頻道 16 bootstrap.Channel<TcpServerSocketChannel>(); 17 bootstrap 18 //存放已完成三次握手的請求的等待隊列的最大長度; 19 .Option(ChannelOption.SoBacklog, 1024) 20 //ByteBuf的分配器(重用緩沖區)大小 21 .Option(ChannelOption.Allocator, UnpooledByteBufferAllocator.Default) 22 //接收字符的長度 23 .Option(ChannelOption.RcvbufAllocator, new FixedRecvByteBufAllocator(1024 * 8)) 24 //保持長連接 25 .ChildOption(ChannelOption.SoKeepalive, true) 26 //取消延遲發送 27 .ChildOption(ChannelOption.TcpNodelay, true) 28 //端口復用 29 .ChildOption(ChannelOption.SoReuseport, true) 30 //初始化日志攔截器,可以不加 31 .Handler(new LoggingHandler("SRV-LSTN")) 32 //自定義初始化Tcp服務 33 .ChildHandler(new EchoServerInitializer()); 34 35 //綁定服務端,端口號。IP地址默認讀取項目配置文件。 36 await bootstrap.BindAsync(ServerSettings.Port); 37 }
- 第二步,初始化Channel相關處理類
1 /// <summary> 2 /// 初始化 3 /// </summary> 4 public class EchoServerInitializer : ChannelInitializer<ISocketChannel> 5 { 6 /// <summary> 7 /// No interaction time.300s 8 /// </summary> 9 public const int AllTimeOut = 60 * 5; 10 11 /// <summary> 12 /// Read Time Out.60s 13 /// </summary> 14 public const int ReadTimeOut = 60; 15 16 /// <summary> 17 /// Recive Time Out.60s 18 /// </summary> 19 public const int WriterTimeOut = 60; 20 21 protected override void InitChannel(ISocketChannel channel) 22 { 23 /* 24 * 工作線程連接器是設置了一個頻道,服務端主線程所有接收到的信息都會通過這個管道一層層往下傳輸 25 * 同時所有出棧的消息 也要這個頻道的所有處理器進行一步步處理 26 */ 27 IChannelPipeline pipeline = channel.Pipeline; 28 //初始化Dotnetty日志攔截器 29 pipeline.AddLast(new LoggingHandler("SRV-CONN")); 30 //心跳超時時間配置 31 pipeline.AddLast(new IdleStateHandler( 32 ReadTimeOut, 33 WriterTimeOut, 34 AllTimeOut)); 35 //消息內容編碼邏輯處理類 36 pipeline.AddLast("encoder", new EncoderHandler()); 37 //解碼邏輯處理類 38 pipeline.AddLast("decoder", new DecoderHandler()); 39 //心跳邏輯處理 40 pipeline.AddLast(new HeartBeatHandler()); 41 //每個頻道請求消息處理類 42 pipeline.AddLast(new ServerHandler()); 43 } 44 }
- 第三步,配置、實現心跳處理機制
1 public class HeartBeatHandler : ChannelHandlerAdapter 2 { 3 /// <summary> 4 /// 每個頻道都有自己的心跳管理,如果頻道長時間不操作踢掉線的邏輯可以寫在這里 5 /// </summary> 6 /// <param name="context"></param> 7 /// <param name="evt"></param> 8 public override void UserEventTriggered(IChannelHandlerContext context, object evt) 9 { 10 var eventState = evt as IdleStateEvent; 11 if (eventState != null) 12 { 13 String type = string.Empty; 14 if (eventState.State == IdleState.ReaderIdle) 15 { 16 type = "read idle";//沒有任何接受 17 } 18 else if (eventState.State == IdleState.WriterIdle) 19 { 20 type = "write idle";//沒有任何寫入 21 } 22 else if (eventState.State == IdleState.AllIdle) 23 { 24 type = "all idle"; 25 context.CloseAsync();//5分鍾內無任何交互則斷開該客戶端連接 26 } 27 } 28 else 29 { 30 base.UserEventTriggered(context, evt); 31 } 32 } 33 }
- 第四步,編碼、解碼
1 /// <summary> 2 /// 解碼 3 /// </summary> 4 public class DecoderHandler : ByteToMessageDecoder 5 { 6 protected override void Decode(IChannelHandlerContext context, IByteBuffer input, List<object> output) 7 { 8 throw new NotImplementedException(); 9 } 10 } 11 12 13 14 public class EncoderHandler : MessageToByteEncoder<byte[]> 15 { 16 /// <summary> 17 /// 編碼 18 /// </summary> 19 /// <param name="context"></param> 20 /// <param name="message"></param> 21 /// <param name="output"></param> 22 protected override void Encode(IChannelHandlerContext context, byte[] message, IByteBuffer output) 23 { 24 throw new NotImplementedException(); 25 } 26 }
- 第五步,Channel邏輯處理實現
1 public class ServerHandler : ChannelHandlerAdapter 2 { 3 /* 4 * Channel的生命周期 5 * 1.ChannelRegistered 先注冊 6 * 2.ChannelActive 再被激活 7 * 3.ChannelRead 客戶端與服務端建立連接之后的會話(數據交互) 8 * 4.ChannelReadComplete 讀取客戶端發送的消息完成之后 9 * error. ExceptionCaught 如果在會話過程當中出現dotnetty框架內部異常都會通過Caught方法返回給開發者 10 * 5.ChannelInactive 使當前頻道處於未激活狀態 11 * 6.ChannelUnregistered 取消注冊 12 */ 13 14 /// <summary> 15 /// 頻道注冊 16 /// </summary> 17 /// <param name="context"></param> 18 public override void ChannelRegistered(IChannelHandlerContext context) 19 { 20 base.ChannelRegistered(context); 21 } 22 23 /// <summary> 24 /// socket client 連接到服務端的時候channel被激活的回調函數 25 /// </summary> 26 /// <param name="context"></param> 27 public override void ChannelActive(IChannelHandlerContext context) 28 { 29 //一般可用來記錄連接對象信息 30 base.ChannelActive(context); 31 } 32 33 /// <summary> 34 /// socket接收消息方法具體的實現 35 /// </summary> 36 /// <param name="context">當前頻道的句柄,可使用發送和接收方法</param> 37 /// <param name="message">接收到的客戶端發送的內容</param> 38 public override void ChannelRead(IChannelHandlerContext context, object message) 39 { 40 var buffer = message as IByteBuffer; 41 if (buffer != null) 42 { 43 Console.WriteLine("Received from client: " + buffer.ToString(Encoding.UTF8)); 44 } 45 context.WriteAsync(message);//發送給客戶端方法 46 } 47 48 /// <summary> 49 /// 該次會話讀取完成后回調函數 50 /// </summary> 51 /// <param name="context"></param> 52 public override void ChannelReadComplete(IChannelHandlerContext context) => context.Flush();// 53 54 /// <summary> 55 /// 異常捕獲 56 /// </summary> 57 /// <param name="context"></param> 58 /// <param name="exception"></param> 59 public override void ExceptionCaught(IChannelHandlerContext context, Exception exception) 60 { 61 Console.WriteLine("Exception: " + exception); 62 context.CloseAsync(); 63 } 64 65 /// <summary> 66 /// 當前頻道未激活狀態 67 /// </summary> 68 /// <param name="context"></param> 69 public override void ChannelInactive(IChannelHandlerContext context) 70 { 71 base.ChannelInactive(context); 72 } 73 74 /// <summary> 75 /// 取消注冊當前頻道,可理解為銷毀當前頻道 76 /// </summary> 77 /// <param name="context"></param> 78 public override void ChannelUnregistered(IChannelHandlerContext context) 79 { 80 base.ChannelUnregistered(context); 81 } 82 }
希望大家多多支持。不勝感激。
- E-Mail:zhuzhen723723@outlook.com
- QQ: 580749909(個人群)
- Blog: https://www.cnblogs.com/justzhuzhu/
- Git: https://github.com/JusterZhu
- 微信公眾號