一、概要
本章主要內容就是講解如何在dotnetty的框架中進行網絡通訊以及編解碼對象、數據包分包拆包的相關知識點。
后續會專門開一篇避坑的文章,主要會描述在使用dotnetty的框架時會遇到的哪些問題幫助各位開發者在使用過程當中出現問題,會不斷的收集問題不斷的更新肯定是附帶問題的解決方案的。
希望有興趣的小伙伴可以提供相關的“坑”一起更新一起解決困難,讓dotnetty的框架更容易使用。
二、簡介
1.什么是編碼、解碼
2.解碼器Decoder講解
3.編碼器Encoder講解
4.編解碼器類Codec講解
5.網絡傳輸TCP粘包拆包
6.核心模塊緩沖ByteBuffer
7.實戰環節
8.Dotnetty所用到的設計模式
三、詳細內容
1.什么是編碼、解碼
前面說的:高性能RPC框架的3個要素:IO模型、數據協議、線程模型
最開始接觸的編碼碼:序列化/反序列化(就是編解碼)、url編碼、base64編解碼
業界里面也有其他編碼框架: google的 protobuf(PB)、Facebook的Trift、json等
DotNetty里面的編解碼:
解碼器:負責處理入站 InboundHandler”數據
編碼器:負責出站 OutboundHandler” 數據
DotNetty里面提供默認的編解碼器,也支持自定義編解碼器
Encoder:編碼器
Decoder:解碼器
Codec:編解碼器
2.解碼器Decoder講解
Decoder對應的就是ChannelInboundHandler,主要就是字節數組轉換為消息對象
主要是兩個方法 decode decodeLast
抽象解碼器
- ByteToMessageDecoder用於將字節轉為消息,需要檢查緩沖區是否有足夠的字節
- ReplayingDecoder繼承ByteToMessageDecoder,不需要檢查緩沖區是否有足夠的字節,但是ReplayingDecoder速度略滿於ByteToMessageDecoder,不是所有的ByteBuf都支持。
- 選擇:項目復雜性高則使用ReplayingDecoder,否則使用 ByteToMessageDecoder
- MessageToMessageDecoder用於從一種消息解碼為另外一種消息(例如POJO到POJO)
解碼器具體的實現,用的比較多的是(更多是為了解決TCP底層的粘包和拆包問題)
- DelimiterBasedFrameDecoder: 指定消息分隔符的解碼器
- LineBasedFrameDecoder: 以換行符為結束標志的解碼器
- FixedLengthFrameDecoder:固定長度解碼器
- LengthFieldBasedFrameDecoder:message = header+body, 基於長度解碼的通用解碼器
- StringDecoder:文本解碼器,將接收到的對象轉化為字符串,一般會與上面的進行配合,然后在后面添加業務handle
3.編碼器Encoder講解
Encoder對應的就是ChannelOutboundHandler,消息對象轉換為字節數組
Netty本身未提供和解碼一樣的編碼器,是因為場景不同,兩者非對等的
- MessageToByteEncoder消息轉為字節數組,調用write方法,會先判斷當前編碼器是否支持需要發送的消息類
型,如果不支持,則透傳;
- MessageToMessageEncoder用於從一種消息編碼為另外一種消息
4.編解碼器類Codec講解
組合解碼器和編碼器,以此提供對於字節和消息都相同的操作
優點:成對出現,編解碼都是在一個類里面完成
缺點:耦合在一起,拓展性不佳
Codec:組合編解碼
1)ByteToMessageCodec
2)MessageToMessageCodec
decoder:解碼
1)ByteToMessageDecoder
2)MessageToMessageDecoder
encoder:編碼
1)ByteToMessageEncoder
2)MessageToMessageEncoder
5.網絡傳輸TCP粘包拆包
- TCP拆包: 一個完整的包可能會被TCP拆分為多個包進行發送
- TCP粘包: 把多個小的包封裝成一個大的數據包發送, client發送的若干數據包 Server接收時粘成一包發送方和接收方都可能出現這個原因
- 發送方的原因:TCP默認會使用Nagle算法
- 接收方的原因: TCP接收到數據放置緩存中,應用程序從緩存中讀取
- UDP: 是沒有粘包和拆包的問題,有邊界協議
應用層解決半包讀寫的辦法:
1.設置定長消息 (10字符)
123456789 123456789 123456789 123456789
2.設置消息的邊界 ( | | 切割)
123456789||123456789||123456789||
3.使用帶消息頭的協議,消息頭存儲消息開始標識及消息的長度信息
DelimiterBasedFrameDecoder: 指定消息分隔符的解碼器 LineBasedFrameDecoder: 以換行符為結束標志的解碼器
FixedLengthFrameDecoder:固定長度解碼器 LengthFieldBasedFrameDecoder:message = header+body, 基於長
度解碼的通用解碼器。
使用解碼器LineBasedFrameDecoder解決半包讀寫
1)LineBaseFrameDecoder 以換行符為結束標志的解碼器 ,構造函數里面的數字表示最長遍歷的幀數
2)StringDecoder解碼器將對象轉成字符串。
- 自定義分隔符解決TCP讀寫
MaxLength:表示一行最大的長度,如果超過這個長度依然沒有檢測自定義分隔符,將會拋出
TooLongFrameException
FailFast:如果為true,則超出maxLength后立即拋出TooLongFrameException,不進行繼續解碼.如果為
False,則等到完整的消息被解碼后,再拋出TooLongFrameException異常
StripDelimiter:解碼后的消息是否去除掉分隔符
Delimiters:分隔符,ByteBuf類型
- 自定義長度半包讀寫器LengthFieldBasedFrameDecoder
MaxFrameLength 數據包的最大長度
LengthFieldOffset 長度字段的偏移位,長度字段開始的地方,意思是跳過指定長度個字節之后的才是消息體字段
LengthFieldLength 長度字段占的字節數, 幀數據長度的字段本身的長度
LengthAdjustment
一般 Header + Body,添加到長度字段的補償值,如果為負數,開發人員認為這個 Header的長度字段是整個消息
包的長度,則Netty應該減去對應的數字
InitialBytesToStrip 從解碼幀中第一次去除的字節數, 獲取完一個完整的數據包之后,忽略前面的指定位數的長度字節,
應用解碼器拿到的就是不帶長度域的數據包
6.核心模塊緩沖ByteBuffer
ByteBuf:傳遞字節數據的容器
ByteBuf的創建方法
1)ByteBufAllocator
池化( PooledByteBufAllocator提高性能並且最大程度減少內存碎片
非池化UnpooledByteBufAllocator: 每次返回新的實例
2)Unpooled: 提供靜態方法創建未池化的ByteBuf,可以創建堆內存和直接內存緩沖區
ByteBuf使用模式
堆緩存區HEAP BUFFER:
優點:存儲在的堆空間中,可以快速的分配和釋放
缺點:每次使用前會拷貝到直接緩存區(也叫堆外內存)
直接緩存區DIRECR BUFFER:
優點:存儲在堆外內存上,堆外分配的直接內存,不會占用堆空間
缺點:內存的分配和釋放,比在堆緩沖區更復雜
復合緩沖區COMPOSITE BUFFER:
可以創建多個不同的ByteBuf,然后放在一起,但是只是一個視圖
選擇:大量IO數據讀寫,用“直接緩存區”; 業務消息編解碼用“堆緩存區”
四、實戰環節
實戰環節使用的編解碼器是
- ByteToMessageDecoder
- MessageToByteEncoder
數據包結構定義(https://www.cnblogs.com/justzhuzhu/p/12129328.html)之前已經在其他文章里寫過了,所以這里直接開始編解碼的操作。
解碼
/// <summary> /// Decoder Packet /// </summary> public class DecoderHandler : ByteToMessageDecoder { private readonly PacketParser packetParser = new PacketParser(); protected override void Decode(IChannelHandlerContext context, IByteBuffer input, List<object> output) { var outputBufferList = new List<byte[]>(); var resultByte = new byte[input.ReadableBytes]; input.ReadBytes(resultByte); packetParser.TryParsing(ref resultByte,ref outputBufferList); output.AddRange(outputBufferList); input.Clear(); } }
編碼
/// <summary> /// Encoder Packet /// </summary> public class EncoderHandler : MessageToByteEncoder<RpcResponse<Gimind.Infrastructure.Common.Packet.IMessage>> { protected override void Encode(IChannelHandlerContext context, RpcResponse<Gimind.Infrastructure.Common.Packet.IMessage> message, IByteBuffer output) { var arry = SerializePacket(message.Length, message.Header, message.Body); output.WriteBytes(arry); } public byte[] SerializeHeader(RespHeader header) { header.Checkbit = Header.Checkbit; var headerArry = new byte[25]; try { BytesWriter.Write(header.Checkbit, ref headerArry, 0); //4 BytesWriter.Write(header.RequestId, ref headerArry, 4); //8 BytesWriter.Write(header.Code, ref headerArry, 12); //4 BytesWriter.Write(header.IsEncrypt, ref headerArry, 16);//1 BytesWriter.Write(header.CommandId, ref headerArry, 17);//4 BytesWriter.Write(header.Ext1, ref headerArry, 21); //4 } catch (Exception ex) { NLogger.Error("SerializeHeader",ex.Message,ex); } return headerArry; } private byte[] SerializePacket(int length,RespHeader header,IMessage body) { try { var Header = SerializeHeader(header); length += Header.Length; byte[] Body = null; var protobytes = SerializerUtilitys.Serialize(body); if (protobytes != null) { Body = protobytes; length += Body.Length; } var packageArry = new byte[4 + length]; BytesWriter.Write(length, ref packageArry, 0); BytesWriter.Write(Header, ref packageArry, 4); if (body != null) { BytesWriter.Write(Body, ref packageArry, 4 + RespHeader.Length); } return packageArry; } catch (Exception ex) { NLogger.Error("SerializeHeader", ex.Message, ex); } return null; } }
分包拆包邏輯
public class PacketParser { private readonly List<byte[]> _bufferList = new List<byte[]>(); public void TryParsing(ref byte[] inBytes, ref List<byte[]> outBytes) { try { _bufferList.Add(inBytes); var tempBuffer = new byte[_bufferList.Sum(item => item.Length)]; var size = 0; foreach (var item in _bufferList) { item.CopyTo(tempBuffer, size); size += item.Length; } if (tempBuffer.Length < 4) return; var packetLen = BytesReader.ReadInt32(ref tempBuffer, 0); if (tempBuffer.Length < (4 + packetLen)) { return; } if (tempBuffer.Length == (4 + packetLen)) { _bufferList.Clear(); outBytes.Add(tempBuffer); } if (tempBuffer.Length > (4 + packetLen)) { var left = new byte[4 + packetLen]; Array.Copy(tempBuffer, 0, left, 0, left.Length); var right = new byte[tempBuffer.Length - left.Length]; Array.Copy(tempBuffer, left.Length, right, 0, right.Length); _bufferList.Clear(); outBytes.Add(left); TryParsing(ref right, ref outBytes); } } catch (Exception ex) { NLogger.Error("PacketParser Error", ex.Message, ex); } } }
Protobuffer
using ProtoBuf; using System; using System.IO; namespace Protobuffer.Utilities { public class SerializerUtilitys { /// <summary> /// 序列化 /// </summary> /// <typeparam name="T"></typeparam> /// <param name="serializeObj">序列化對象</param> /// <returns></returns> public static byte[] Serialize<T>(T serializeObj) { try { using (var stream = new MemoryStream()) { ProtoBuf.Serializer.Serialize<T>(stream, serializeObj); var result = new byte[stream.Length]; stream.Position = 0L; stream.Read(result, 0, result.Length); return result; } } catch (Exception e) { return null; } } /// <summary> /// 反序列化 /// </summary> /// <typeparam name="T"></typeparam> /// <param name="bytes">二進制對象數組</param> /// <returns></returns> public static T DeSerialize<T>(byte[] bytes) { try { using (var stream = new MemoryStream()) { stream.Write(bytes, 0, bytes.Length); stream.Position = 0L; return ProtoBuf.Serializer.Deserialize<T>(stream); } } catch (Exception e) { Console.WriteLine(e); return default(T); } } } }
如果你看到這里,可能會有意外的收獲。在DotNetty里面的應用里用到如下設計模式:
- Builder構造器模式:ServerBootstap
- 責任鏈設計模式:pipeline的事件傳播
- 工廠模式: 創建Channel
- 適配器模式:HandlerAdapter
- 推薦書籍:《大話設計模式》《Head First設計模式》《CLR VIA C#》《大型網站技術架構 核心原理與案例分析》《.net 框架設計》《.net 性能優化》《編寫高性能的.net代碼》
希望大家多多支持。不勝感激。
- E-Mail:zhuzhen723723@outlook.com
- QQ: 580749909(個人群)
- Blog: https://www.cnblogs.com/justzhuzhu/
- Git: https://github.com/JusterZhu
- 微信公眾號