任何數據類型想在網絡中進行傳輸,都得經過編解碼轉換成字節流
在netty中,服務端和客戶端進行通信的其實是下面這樣的
程序 ---編碼--> 網絡
網絡 ---解碼--> 程序
對應服務端:
- 入站數據, 經過解碼器解碼后給后續的handler使用
- 出站數據, 經過編碼器編碼成字節流給在網絡上傳播
在netty中的編碼器其實就是一個handler,回想一下,無論是編寫服務端的代碼,還是客戶端的代碼,總會通過一個channelIniteializer往pipeline中動態的添加多個處理器,在添加我們自定義的處理器之前,往往會添加編解碼器,其實說白了,編解碼器其實就是特定功能的handler
我們這樣做是有目的的,因為第一步就得需要把字節流轉換成我們后續的handler中能處理的常見的數據類型
Netty中的編解碼器太多了,下面就用常用的ByteToMessageDecoder
介紹他的體系
編碼器的模板基類ByteToMessageDecoder
ByteToMessageDecoder
繼承了ChannelInboundHandlerAdapter
說明它是處理入站方向數據的編碼器,而且它也因此是一個不折不扣的Handler,再回想,其實In開頭的handler都是基於事件驅動的,被動的處理器,當客戶端發生某種事件時,它對應有不同的動作回調,而且它的特色就是 fireXXX往下傳遞事件, 待會我們就能看到,netty用它把處理好的數據往下傳遞
架構概述
ByteToMessageDecoder
本身是一個抽象類,但是它只有一個抽象方法 decode()
netty中的解碼器的工作流程如下:
- 累加字節流
- 調用子類的
decode()
方法進行解碼 - 將解析完成的
ByteBuf
往后傳遞
既然是入棧處理器,有了新的數據,channelRead()就會被回調,我們去看一下它的channelRead()
下面是它的源碼,
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
if (msg instanceof ByteBuf) { // todo 在這里判斷, 是否是 ByteBuf類型的,如果是,進行解碼,不是的話,簡單的往下傳播下去
CodecOutputList out = CodecOutputList.newInstance();
try {
ByteBuf data = (ByteBuf) msg;
// todo 進入查看 cumulation是類型 累加器,其實就是往 ByteBuf中 write數據,並且,當ByteBuf 內存不夠時進行擴容
first = cumulation == null; // todo 如果為空, 則說明這是第一次進來的數據, 從沒累加過
if (first) {
cumulation = data; // todo 如果是第一次進來,直接用他將累加器初始化
} else {
cumulation = cumulator.cumulate(ctx.alloc(), cumulation, data); // todo 非第一次進來,就進行累加
}
// todo , 這是第二部, 調用子類的decode()進行解析
callDecode(ctx, cumulation, out);
} catch (DecoderException e) {
throw e;
} catch (Throwable t) {
throw new DecoderException(t);
} finally {
if (cumulation != null && !cumulation.isReadable()) {
numReads = 0;
cumulation.release();
cumulation = null;
} else if (++ numReads >= discardAfterReads) {
// We did enough reads already try to discard some bytes so we not risk to see a OOME.
// See https://github.com/netty/netty/issues/4275
numReads = 0;
discardSomeReadBytes();
}
int size = out.size();
decodeWasNull = !out.insertSinceRecycled();
// todo 調用 fireChannelRead,向后船舶channelRead事件, 前面的學習也知道, 她會從當前節點,挨個回調pipeline中處理器的CHannelRead方法
fireChannelRead(ctx, out, size);
out.recycle();
}
} else {
ctx.fireChannelRead(msg);
}
其實三步工作流程就在上面的代碼中
- 累加字節流
cumulation = cumulator.cumulate(ctx.alloc(), cumulation, data);
- 調用子類的decode()進行解析
callDecode(ctx, cumulation, out);
- 將解析完成的
ByteBuf
往后傳遞fireChannelRead(ctx, out, size);
它的設計很清晰, 由ByteToMessageDecoder
完成整個編碼器的模板,規定好具體的處理流程,首先它負責字節流的累加工作,但是具體如何進行解碼,由不同的子類去實現,因此它設及成了唯一的抽象方法,在他的模板中,子類將數據解碼完成后,它再將數據傳播下去
什么是累加器cumulation
?
源碼如下:我們可以看到,其實他就是一個輔助對象, 里面維護了一個 ByteBuf
的引用
- 所謂累加,就是往
ByteBuf
中write數據 - 所謂維護,就是 動態判斷
ByteBuf
中可寫入的區域大小和將寫入的字節的關系 - 最后,為了防止內存泄露,將收到的
ByteBuf
釋放
// todo 創建一個累加器
public static final Cumulator MERGE_CUMULATOR = new Cumulator() {
@Override
public ByteBuf cumulate(ByteBufAllocator alloc, ByteBuf cumulation, ByteBuf in) {
final ByteBuf buffer;
// todo 如果 writerIndex + readableBytes > cumulation.maxCapacity 說明已經無法繼續累加了
if (cumulation.writerIndex() > cumulation.maxCapacity() - in.readableBytes()
|| cumulation.refCnt() > 1 || cumulation.isReadOnly()) {
// todo 擴容
buffer = expandCumulation(alloc, cumulation, in.readableBytes());
} else {
buffer = cumulation;
}
// todo 往 ByteBuf中寫入數據 完成累加
buffer.writeBytes(in);
// todo 累加完成之后,原數據 釋放掉
in.release();
return buffer;
}
};
第二步, callDecode(ctx, cumulation, out)
我們直接跟進源碼: 可以看到,在把ByteBuf
真正通過下面的 decodeRemovalReentryProtection(ctx, in, out);
的子類進行解碼時, 它記錄下來了當時ByteBuf
中可讀的字節數, 它用這個標記和經過子類處理之后的ByteBuf
的可讀的字節數進行比對,從而判斷出子類是否真的讀取成功
protected void callDecode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
try {
while (in.isReadable()) {
int outSize = out.size();
if (outSize > 0) {// todo 如果盛放解析完成后的數據的 out集合中有數據
fireChannelRead(ctx, out, outSize); /// todo 傳播channelRead事件,數據也傳遞進去
out.clear(); // todo 清空out 集合
if (ctx.isRemoved()) {
break;
}
outSize = 0;
}
// todo 記錄 子類使用in之前, in中的可讀的字節
int oldInputLength = in.readableBytes();
//todo 調用子類重寫的 decode()
decodeRemovalReentryProtection(ctx, in, out);
if (ctx.isRemoved()) {
break;
}
if (outSize == out.size()) { // todo 0 = 經過上面的decode解析后的 out.size()==0 , 說明沒解析出任何東西
if (oldInputLength == in.readableBytes()) { // todo 第一種情況就是 可能字節數據不夠, 根本沒從in中讀
break;
} else {
continue; // todo 情況2: 從in中讀了, 但是沒來得及繼續出 內容
}
}
// todo 來到這里就說明,已經解析出數據了 ,
// todo 解析出數據了 就意味着in中的readIndex被子類改動了, 即 oldInputLength != in.readableBytes()
// todo 如下現在還相等, 肯定是出問題了
if (oldInputLength == in.readableBytes()) {
throw new DecoderException(
StringUtil.simpleClassName(getClass()) +
".decode() did not read anything but decoded a message.");
}
if (isSingleDecode()) {
break;
}
}
} catch (DecoderException e) {
throw e;
} catch (Throwable cause) {
throw new DecoderException(cause);
}
}
如何實現自己的解碼器?
實現自己的解碼器, 就得了解這三個參數分別是什么
- ctx: 當前的hander所在的 Context
- cumulation: 累加器,其實就是
ByteBuf
- out: 她其實是個容器, 用來盛放 經過編碼之后的數據,也就是可以被后續的處理器使用 類型
實現的思路就是繼承ByteToMessageDecoder
然后重寫它唯一的抽象方法,decode()
, 實現的邏輯如下:
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
System.out.println("MyDeCoderHandler invoke...");
System.out.println(in.readableBytes());
if (in.readableBytes()>=8){
out.add(in.readLong());
}
}
常用的編解碼器
固定長度的解碼器FixedLengthFrameDecoder
他里面只維護着一個private final int frameLength;
使用時,我們通過構造函數傳遞給他,他就會按照下面的方式解碼
我們看一下它的javaDoc
原始數據
* +---+----+------+----+
* | A | BC | DEFG | HI |
* +---+----+------+----+
如果frameLength==3
* +-----+-----+-----+
* | ABC | DEF | GHI |
* +-----+-----+-----+
它的decode()
實現如下
protected Object decode(
@SuppressWarnings("UnusedParameters") ChannelHandlerContext ctx, ByteBuf in) throws Exception {
if (in.readableBytes() < frameLength) {
return null;
} else {
// 從in中截取 frameLength 長度的 字節流
return in.readRetainedSlice(frameLength);
}
}
行解碼器LineBasedFrameDecoder
她會根據換行符進行解碼, 無論用戶發送過來的數據是以 \r\n 還是 \n 類型的換行符LineBasedFrameDecoder
使用:
public LineBasedFrameDecoder(final int maxLength) {
this(maxLength, true, false);
}
public LineBasedFrameDecoder(final int maxLength, final boolean stripDelimiter, final boolean failFast) {
this.maxLength = maxLength;
this.failFast = failFast;
this.stripDelimiter = stripDelimiter;
}
第一個構造函數
- 入參位置是我們指定的每一行最大的字節數, 超過了這個大小的所有行,將全部被丟棄
- 默認跳過分隔符
- 出現了超過最大值的行,不報異常
第二個構造函數
- 入參1 是我們指定的每一行最大的字節數, 超過了這個大小的所有行,將全部被丟棄
- 入參2 指定每次解析是否跳過換行符
- 入參3 指定出現大於規定的最大字節數時是否報異常
看它重寫的decode()
的實現邏輯如下:
它總起來分成四種情況
- 非丟棄模式
- 找到了換行符
- 如 readIndex + 換行符的位置 < maxLength 的關系 --> 解碼
- 如 readIndex + 換行符的位置 > maxLength的關系 --> 丟棄
- 未找到換行符
- 如果可解析的長度 > maxLength --> 丟棄
- 找到了換行符
- 丟棄模式
- 找到了換行符
- 丟棄
- 未找到換行符
- 丟棄
- 找到了換行符
基於分隔符的解碼器DelimiterBasedFrameDecoder
它主要有這幾個成員變量, 根據這幾個成員變量,可以選出使用它哪個構造函數
private final ByteBuf[] delimiters; 分隔符,數組
private final int maxFrameLength; 每次能允許的最大解碼長度
private final boolean stripDelimiter; 是否跳過分隔符
private final boolean failFast; 超過最大解碼長度時,是否拋出異常
private boolean discardingTooLongFrame; 是否丟棄超過最大限度的幀
private int tooLongFrameLength; 記錄超過最大范圍的字節數值
分三步
- 第一, 判斷我們傳遞進入的分隔符是否是
\n \r\n
如果是的話,就是用上面的, 行解碼器 - 第二步, 按照最細的力度進行解碼, 比如, 我們有兩個解碼器, AB, 當前的readIndex 到A, 有2個字節, 到B有3個字節, 就會按照A進行解碼
- 解碼
基於長度域的解碼器LengthFieldBasedFrameDecoder
通常我們在對特定的網絡協議進行解碼時會用到它,比如說,最典型的http協議, 雖然http協議看起來, 又有請求頭,又有請求體,挺麻煩的,它在網絡中依然是以字節流的方式進行傳輸
基於長度域,指的是在傳輸的協議中有一個 length字段,這個十六進制的字段記錄的可能是整個協議的長度,也可能是消息體的長度, 我們根據具體情況使用不同的構造函數
如何使用呢? 最常用它下面的這個構造函數
public LengthFieldBasedFrameDecoder(
int maxFrameLength,
int lengthFieldOffset,
int lengthFieldLength,
int lengthAdjustment,
int initialBytesToStrip) {
this(
maxFrameLength,
lengthFieldOffset, lengthFieldLength, lengthAdjustment,
initialBytesToStrip, true);
}
使用它的前提是,知道這五個參數的意思
- maxFrameLength 每次解碼所能接受的最大幀的長度
- lengthFieldOffset 長度域的偏移量
聽着挺高大尚的, 偏移量, 說白了,就是在現有的這段字節數據中找個開始解碼的位置, 大多數設為0, 意為,從o位置 開始解碼
- lengthFieldLength 字段域的長度, 根據lengthFieldOffset的初始值往后數lengthFieldLength個字節,這段范圍解析出來的數值 可能是 長度域的大小,也可能是整個協議的大小(包括header,body...) 根據不同的協議不同
- lengthAdjustment 矯正長度
- initialBytesToStrip 需要取出的長度
下面是javaDoc給的例子
基於長度的拆包
* BEFORE DECODE (14 bytes) AFTER DECODE (12 bytes)
* +--------+----------------+ +--------+----------------+
* | Length | Actual Content |----->| Length | Actual Content |
* | 0x000C | "HELLO, WORLD" | | 0x000C | "HELLO, WORLD" |
* +--------+----------------+ +--------+----------------+
這是最簡單的情況, 假定 Length的長度就是后面的 真正需要解碼的內容
現在的字節全部解碼后是這樣的 12HELLO, WORLD
我們要做的就是區分出 12和HELLO, WORLD
* lengthFieldOffset = 0
* lengthFieldLength = 2 // todo 每兩個字節 表示一個數據包
* lengthAdjustment = 0
* initialBytesToStrip = 0
意思就是:
字節數組[lengthFieldOffset,lengthFieldLength]之間的內容轉換成十進制,就是后面的字段域的長度
00 0C ==> 12
這個12 意思就是 長度域的長度, 說白了 就是我們想要的 HELLO, WORLD 的長度
這樣一算,就分開了
基於長度的階段拆包
* BEFORE DECODE (14 bytes) AFTER DECODE (12 bytes)
* +--------+----------------+ +----------------+
* | Length | Actual Content |----->| Actual Content |
* | 0x000C | "HELLO, WORLD" | | "HELLO, WORLD" |
* +--------+----------------+ +----------------+
情況2:
* lengthFieldOffset = 0
* lengthFieldLength = 2 // todo 每兩個字節 表示一個數據包
* lengthAdjustment = 0
* initialBytesToStrip = 2
意思就是
字節數組[lengthFieldOffset,lengthFieldLength]之間的內容轉換成十進制,就是后面的字段域的長度是
00 0C ==> 12
這個12 意思就是 長度域的長度, 說白了 就是我們想要的 HELLO, WORLD 的長度
然后, 從0開始 忽略 initialBytesToStrip, 就去除了 length ,只留下 HELLO, WORLD
有時, 在某些其他協議中, length field 可能代表是整個消息的長度, 包括消息頭
在這種情況下,我們就得指定一個 非零的 lengthAdjustment 去調整
* BEFORE DECODE (14 bytes) AFTER DECODE (14 bytes)
* +--------+----------------+ +--------+----------------+
* | Length | Actual Content |----->| Length | Actual Content |
* | 0x000E | "HELLO, WORLD" | | 0x000E | "HELLO, WORLD" |
* +--------+----------------+ +--------+----------------+
* lengthFieldOffset = 0
* lengthFieldLength = 2 // todo 每兩個字節 表示一個數據包
* lengthAdjustment = -2
* initialBytesToStrip = 0
意思就是
字節數組[lengthFieldOffset,lengthFieldLength]之間的內容轉換成十進制,表示整個協議的長度
00 0C ==> 14 意味,協議全長 14
現在還是不能區分開 Length 和 Actual Content
公式: 數據包的長度 = 長度域 + lengthFieldOffset + lengthFieldLength +lengthAdjustment
通過他可以算出 lengthAdjustment = -2
基於偏移長度的拆包
* BEFORE DECODE (17 bytes) AFTER DECODE (17 bytes)
* +----------+----------+----------------+ +----------+----------+----------------+
* | Header 1 | Length | Actual Content |----->| Header 1 | Length | Actual Content |
* | 0xCAFE | 0x00000C | "HELLO, WORLD" | | 0xCAFE | 0x00000C | "HELLO, WORLD" |
* +----------+----------+----------------+ +----------+----------+----------------+
這個例子和第一個例子很像,但是多了頭
我們想拿到后面消息長度的信息,就偏移過header
* lengthFieldOffset = 2
* lengthFieldLength = 3 // todo 每兩個字節 表示一個數據包
* lengthAdjustment = 0
* initialBytesToStrip = 0
字節數組[lengthFieldOffset,lengthFieldLength]之間的內容轉換成十進制, 表示長度域的長度
在這里 整好跳過了 header 1, 0x00 00 0C 是三個字節
也就是 字節數組[lengthFieldOffset,lengthFieldLength]=>[0,3]
0x00 00 0C == 12 表示長度域是 12
現在也成功區分開了 Header 1 和 Length 和 Actual Content
分別是 2 3 12
基於可調整長度的拆包
BEFORE DECODE (17 bytes) AFTER DECODE (17 bytes)
* +----------+----------+----------------+ +----------+----------+----------------+
* | Length | Header 1 | Actual Content |----->| Length | Header 1 | Actual Content |
* | 0x00000C | 0xCAFE | "HELLO, WORLD" | | 0x00000C | 0xCAFE | "HELLO, WORLD" |
* +----------+----------+----------------+ +----------+----------+----------------+
* lengthFieldOffset = 0
* lengthFieldLength = 3 // todo 每兩個字節 表示一個數據包
* lengthAdjustment = 2
* initialBytesToStrip = 0
字節數組[lengthFieldOffset,lengthFieldLength]之間的內容轉換成十進制, 表示長度域的長度
也就是 字節數組[lengthFieldOffset,lengthFieldLength]=>[0,3]
0x00 00 0C 是三個字節
0x00 00 0C == 12 表示長度域是 12 == 長度域的長度 就是 HELLO, WORLD的長度
但是上面的圖多了一個 兩個字節長度的 Header 1
下一步進行調整
公式: 數據包的長度 = 長度域 + lengthFieldOffset + lengthFieldLength +lengthAdjustment
lengthAdjustment= 17-12-0-3=2
基於偏移可調整長度的截斷拆包
* BEFORE DECODE (16 bytes) AFTER DECODE (13 bytes)
* +------+--------+------+----------------+ +------+----------------+
* | HDR1 | Length | HDR2 | Actual Content |----->| HDR2 | Actual Content |
* | 0xCA | 0x000C | 0xFE | "HELLO, WORLD" | | 0xFE | "HELLO, WORLD" |
* +------+--------+------+----------------+ +------+----------------+
* lengthFieldOffset = 1
* lengthFieldLength = 2 // todo 每兩個字節 表示一個數據包
* lengthAdjustment = 1
* initialBytesToStrip = 3
lengthFieldOffset =1 偏移1字節 跨過 HDR1
lengthFieldLength =2 從[1,2] ==> 0x000C =12 表示長度域的值
看拆包后的結果,后面明顯還多了個 HDR2 ,進行調整
公式: 數據包值 = 長度域 + lengthFieldOffset+ lengthFieldLength + lengthAdjustment
算出 lengthAdjustment = 16 - 12 - 1 - 2 = 1
結果值只有 HDR2 和 Actual Content , 說明,前面通過 initialBytesToStrip 進行忽略
initialBytesToStrip =3
基於偏移可調整長度的 變種 截斷拆包
* BEFORE DECODE (16 bytes) AFTER DECODE (13 bytes)
* +------+--------+------+----------------+ +------+----------------+
* | HDR1 | Length | HDR2 | Actual Content |----->| HDR2 | Actual Content |
* | 0xCA | 0x0010 | 0xFE | "HELLO, WORLD" | | 0xFE | "HELLO, WORLD" |
* +------+--------+------+----------------+ +------+----------------+
* lengthFieldOffset = 1
* lengthFieldLength = 2 // todo 每兩個字節 表示一個數據包
* lengthAdjustment = -3
* initialBytesToStrip = 3
同樣
看結果,保留 HDR2 和 Actual Content
lengthFieldOffset = 1 表示跳過開頭的 HDR1
[1,2] ==> 00 10 , 算出的 長度域的值==10 很顯然這不對
10 < 13
我們要想拆出后面的數據包就得在現有的基礎上往左移動三個字節 -3個調整量