深入理解 Netty-解碼器架構與常用解碼器


任何數據類型想在網絡中進行傳輸,都得經過編解碼轉換成字節流

在netty中,服務端和客戶端進行通信的其實是下面這樣的

程序 ---編碼--> 網絡

網絡 ---解碼--> 程序

對應服務端:

  • 入站數據, 經過解碼器解碼后給后續的handler使用
  • 出站數據, 經過編碼器編碼成字節流給在網絡上傳播

在netty中的編碼器其實就是一個handler,回想一下,無論是編寫服務端的代碼,還是客戶端的代碼,總會通過一個channelIniteializer往pipeline中動態的添加多個處理器,在添加我們自定義的處理器之前,往往會添加編解碼器,其實說白了,編解碼器其實就是特定功能的handler

我們這樣做是有目的的,因為第一步就得需要把字節流轉換成我們后續的handler中能處理的常見的數據類型

Netty中的編解碼器太多了,下面就用常用的ByteToMessageDecoder介紹他的體系

編碼器的模板基類ByteToMessageDecoder

ByteToMessageDecoder繼承了ChannelInboundHandlerAdapter 說明它是處理入站方向數據的編碼器,而且它也因此是一個不折不扣的Handler,再回想,其實In開頭的handler都是基於事件驅動的,被動的處理器,當客戶端發生某種事件時,它對應有不同的動作回調,而且它的特色就是 fireXXX往下傳遞事件, 待會我們就能看到,netty用它把處理好的數據往下傳遞

架構概述

ByteToMessageDecoder本身是一個抽象類,但是它只有一個抽象方法 decode()

netty中的解碼器的工作流程如下:

  • 累加字節流
  • 調用子類的decode()方法進行解碼
  • 將解析完成的ByteBuf往后傳遞

既然是入棧處理器,有了新的數據,channelRead()就會被回調,我們去看一下它的channelRead()

下面是它的源碼,

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
if (msg instanceof ByteBuf) { // todo 在這里判斷, 是否是 ByteBuf類型的,如果是,進行解碼,不是的話,簡單的往下傳播下去
    CodecOutputList out = CodecOutputList.newInstance();
    try {
        ByteBuf data = (ByteBuf) msg;
        // todo 進入查看 cumulation是類型  累加器,其實就是往 ByteBuf中 write數據,並且,當ByteBuf 內存不夠時進行擴容
        first = cumulation == null; // todo 如果為空, 則說明這是第一次進來的數據, 從沒累加過
        if (first) {
            cumulation = data;  // todo 如果是第一次進來,直接用他將累加器初始化
        } else {
            cumulation = cumulator.cumulate(ctx.alloc(), cumulation, data); // todo 非第一次進來,就進行累加
        }
        // todo , 這是第二部, 調用子類的decode()進行解析
        callDecode(ctx, cumulation, out);
    } catch (DecoderException e) {
        throw e;
    } catch (Throwable t) {
        throw new DecoderException(t);
    } finally {
        if (cumulation != null && !cumulation.isReadable()) {
            numReads = 0;
            cumulation.release();
            cumulation = null;
        } else if (++ numReads >= discardAfterReads) {
            // We did enough reads already try to discard some bytes so we not risk to see a OOME.
            // See https://github.com/netty/netty/issues/4275
            numReads = 0;
            discardSomeReadBytes();
        }

        int size = out.size();
        decodeWasNull = !out.insertSinceRecycled();
        // todo 調用 fireChannelRead,向后船舶channelRead事件, 前面的學習也知道,  她會從當前節點,挨個回調pipeline中處理器的CHannelRead方法
        fireChannelRead(ctx, out, size);
        out.recycle();
    }
} else {
    ctx.fireChannelRead(msg);
}

其實三步工作流程就在上面的代碼中

  • 累加字節流 cumulation = cumulator.cumulate(ctx.alloc(), cumulation, data);
  • 調用子類的decode()進行解析 callDecode(ctx, cumulation, out);
  • 將解析完成的ByteBuf往后傳遞 fireChannelRead(ctx, out, size);

它的設計很清晰, 由ByteToMessageDecoder完成整個編碼器的模板,規定好具體的處理流程,首先它負責字節流的累加工作,但是具體如何進行解碼,由不同的子類去實現,因此它設及成了唯一的抽象方法,在他的模板中,子類將數據解碼完成后,它再將數據傳播下去

什么是累加器cumulation?

源碼如下:我們可以看到,其實他就是一個輔助對象, 里面維護了一個 ByteBuf的引用

  • 所謂累加,就是往ByteBuf中write數據
  • 所謂維護,就是 動態判斷ByteBuf中可寫入的區域大小和將寫入的字節的關系
  • 最后,為了防止內存泄露,將收到的ByteBuf 釋放
// todo 創建一個累加器
public static final Cumulator MERGE_CUMULATOR = new Cumulator() {
@Override
public ByteBuf cumulate(ByteBufAllocator alloc, ByteBuf cumulation, ByteBuf in) {
final ByteBuf buffer;
// todo 如果 writerIndex + readableBytes > cumulation.maxCapacity 說明已經無法繼續累加了
if (cumulation.writerIndex() > cumulation.maxCapacity() - in.readableBytes()
        || cumulation.refCnt() > 1 || cumulation.isReadOnly()) {
    // todo 擴容
    buffer = expandCumulation(alloc, cumulation, in.readableBytes());
} else {
    buffer = cumulation;
}
// todo 往 ByteBuf中寫入數據 完成累加
buffer.writeBytes(in);
// todo 累加完成之后,原數據 釋放掉
in.release();
return buffer;
}
};

第二步, callDecode(ctx, cumulation, out)

我們直接跟進源碼: 可以看到,在把ByteBuf真正通過下面的 decodeRemovalReentryProtection(ctx, in, out);的子類進行解碼時, 它記錄下來了當時ByteBuf中可讀的字節數, 它用這個標記和經過子類處理之后的ByteBuf的可讀的字節數進行比對,從而判斷出子類是否真的讀取成功

protected void callDecode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
try {
while (in.isReadable()) {
    int outSize = out.size();

    if (outSize > 0) {// todo 如果盛放解析完成后的數據的 out集合中有數據
        fireChannelRead(ctx, out, outSize); /// todo 傳播channelRead事件,數據也傳遞進去
        out.clear();  // todo 清空out 集合

        if (ctx.isRemoved()) {
            break;
        }
        outSize = 0;
    }

    // todo 記錄 子類使用in之前, in中的可讀的字節
    int oldInputLength = in.readableBytes();

    //todo 調用子類重寫的 decode()
    decodeRemovalReentryProtection(ctx, in, out);
    if (ctx.isRemoved()) {
        break;
    }

    if (outSize == out.size()) { // todo 0 = 經過上面的decode解析后的 out.size()==0 , 說明沒解析出任何東西
        if (oldInputLength == in.readableBytes()) { // todo 第一種情況就是 可能字節數據不夠, 根本沒從in中讀
            break;
        } else {
            continue;  // todo 情況2: 從in中讀了, 但是沒來得及繼續出 內容
        }
    }
    // todo 來到這里就說明,已經解析出數據了 ,
    // todo  解析出數據了  就意味着in中的readIndex被子類改動了, 即 oldInputLength != in.readableBytes()
    // todo 如下現在還相等, 肯定是出問題了
    if (oldInputLength == in.readableBytes()) {
        throw new DecoderException(
                StringUtil.simpleClassName(getClass()) +
                        ".decode() did not read anything but decoded a message.");
    }
    if (isSingleDecode()) {
        break;
    }
}
} catch (DecoderException e) {
throw e;
} catch (Throwable cause) {
throw new DecoderException(cause);
}
}

如何實現自己的解碼器?

實現自己的解碼器, 就得了解這三個參數分別是什么

  • ctx: 當前的hander所在的 Context
  • cumulation: 累加器,其實就是ByteBuf
  • out: 她其實是個容器, 用來盛放 經過編碼之后的數據,也就是可以被后續的處理器使用 類型

實現的思路就是繼承ByteToMessageDecoder然后重寫它唯一的抽象方法,decode(), 實現的邏輯如下:

protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
    System.out.println("MyDeCoderHandler invoke...");
    System.out.println(in.readableBytes());
    if (in.readableBytes()>=8){
        out.add(in.readLong());
    }
}

常用的編解碼器

固定長度的解碼器FixedLengthFrameDecoder

他里面只維護着一個private final int frameLength;
使用時,我們通過構造函數傳遞給他,他就會按照下面的方式解碼

我們看一下它的javaDoc

 原始數據
 * +---+----+------+----+
 * | A | BC | DEFG | HI |
 * +---+----+------+----+
 
 如果frameLength==3
 * +-----+-----+-----+
 * | ABC | DEF | GHI |
 * +-----+-----+-----+

它的decode() 實現如下

protected Object decode(
    @SuppressWarnings("UnusedParameters") ChannelHandlerContext ctx, ByteBuf in) throws Exception {
if (in.readableBytes() < frameLength) {
    return null;
} else {
// 從in中截取 frameLength 長度的 字節流
    return in.readRetainedSlice(frameLength);
}
}

行解碼器LineBasedFrameDecoder

她會根據換行符進行解碼, 無論用戶發送過來的數據是以 \r\n 還是 \n 類型的換行符LineBasedFrameDecoder

使用:


   public LineBasedFrameDecoder(final int maxLength) {
        this(maxLength, true, false);
    }
    
  public LineBasedFrameDecoder(final int maxLength, final boolean stripDelimiter, final boolean failFast) {
        this.maxLength = maxLength;
        this.failFast = failFast;
        this.stripDelimiter = stripDelimiter;
    }

第一個構造函數

  • 入參位置是我們指定的每一行最大的字節數, 超過了這個大小的所有行,將全部被丟棄
  • 默認跳過分隔符
  • 出現了超過最大值的行,不報異常

第二個構造函數

  • 入參1 是我們指定的每一行最大的字節數, 超過了這個大小的所有行,將全部被丟棄
  • 入參2 指定每次解析是否跳過換行符
  • 入參3 指定出現大於規定的最大字節數時是否報異常

看它重寫的decode()的實現邏輯如下:

它總起來分成四種情況

  • 非丟棄模式
    • 找到了換行符
      • 如 readIndex + 換行符的位置 < maxLength 的關系 --> 解碼
      • 如 readIndex + 換行符的位置 > maxLength的關系 --> 丟棄
    • 未找到換行符
      • 如果可解析的長度 > maxLength --> 丟棄
  • 丟棄模式
    • 找到了換行符
      • 丟棄
    • 未找到換行符
      • 丟棄

基於分隔符的解碼器DelimiterBasedFrameDecoder

它主要有這幾個成員變量, 根據這幾個成員變量,可以選出使用它哪個構造函數

private final ByteBuf[] delimiters;  分隔符,數組
private final int maxFrameLength;    每次能允許的最大解碼長度
private final boolean stripDelimiter;  是否跳過分隔符
private final boolean failFast;      超過最大解碼長度時,是否拋出異常
private boolean discardingTooLongFrame;  是否丟棄超過最大限度的幀
private int tooLongFrameLength;      記錄超過最大范圍的字節數值

分三步

  • 第一, 判斷我們傳遞進入的分隔符是否是\n \r\n 如果是的話,就是用上面的, 行解碼器
  • 第二步, 按照最細的力度進行解碼, 比如, 我們有兩個解碼器, AB, 當前的readIndex 到A, 有2個字節, 到B有3個字節, 就會按照A進行解碼
  • 解碼

基於長度域的解碼器LengthFieldBasedFrameDecoder

通常我們在對特定的網絡協議進行解碼時會用到它,比如說,最典型的http協議, 雖然http協議看起來, 又有請求頭,又有請求體,挺麻煩的,它在網絡中依然是以字節流的方式進行傳輸

基於長度域,指的是在傳輸的協議中有一個 length字段,這個十六進制的字段記錄的可能是整個協議的長度,也可能是消息體的長度, 我們根據具體情況使用不同的構造函數

如何使用呢? 最常用它下面的這個構造函數

public LengthFieldBasedFrameDecoder(
    int maxFrameLength,
    int lengthFieldOffset,
    int lengthFieldLength,
    int lengthAdjustment,
    int initialBytesToStrip) {
this(
        maxFrameLength,
        lengthFieldOffset, lengthFieldLength, lengthAdjustment,
        initialBytesToStrip, true);
}

使用它的前提是,知道這五個參數的意思

  • maxFrameLength 每次解碼所能接受的最大幀的長度
  • lengthFieldOffset 長度域的偏移量

聽着挺高大尚的, 偏移量, 說白了,就是在現有的這段字節數據中找個開始解碼的位置, 大多數設為0, 意為,從o位置 開始解碼

  • lengthFieldLength 字段域的長度, 根據lengthFieldOffset的初始值往后數lengthFieldLength個字節,這段范圍解析出來的數值 可能是 長度域的大小,也可能是整個協議的大小(包括header,body...) 根據不同的協議不同
  • lengthAdjustment 矯正長度
  • initialBytesToStrip 需要取出的長度

下面是javaDoc給的例子

基於長度的拆包

 * BEFORE DECODE (14 bytes)         AFTER DECODE (12 bytes)
 * +--------+----------------+      +--------+----------------+
 * | Length | Actual Content |----->| Length | Actual Content |
 * | 0x000C | "HELLO, WORLD" |      | 0x000C | "HELLO, WORLD" |
 * +--------+----------------+      +--------+----------------+
 這是最簡單的情況, 假定 Length的長度就是后面的 真正需要解碼的內容
 
 現在的字節全部解碼后是這樣的  12HELLO, WORLD
 我們要做的就是區分出  12和HELLO, WORLD
 
 * lengthFieldOffset   = 0
 * lengthFieldLength   = 2 // todo 每兩個字節 表示一個數據包
 * lengthAdjustment    = 0
 * initialBytesToStrip = 0

 意思就是:
  字節數組[lengthFieldOffset,lengthFieldLength]之間的內容轉換成十進制,就是后面的字段域的長度
    00 0C ==> 12 
  這個12 意思就是 長度域的長度, 說白了 就是我們想要的 HELLO, WORLD 的長度
  
  這樣一算,就分開了
基於長度的階段拆包
 
  * BEFORE DECODE (14 bytes)         AFTER DECODE (12 bytes)
 * +--------+----------------+      +----------------+
 * | Length | Actual Content |----->| Actual Content |
 * | 0x000C | "HELLO, WORLD" |      | "HELLO, WORLD" |
 * +--------+----------------+      +----------------+
 情況2: 
 
 * lengthFieldOffset   = 0
 * lengthFieldLength   = 2 // todo 每兩個字節 表示一個數據包
 * lengthAdjustment    = 0
 * initialBytesToStrip = 2
 
  意思就是
   字節數組[lengthFieldOffset,lengthFieldLength]之間的內容轉換成十進制,就是后面的字段域的長度是
   00 0C ==> 12 
   這個12 意思就是 長度域的長度, 說白了 就是我們想要的 HELLO, WORLD 的長度
  
  然后,  從0開始 忽略 initialBytesToStrip, 就去除了 length ,只留下 HELLO, WORLD
  
  
  

 有時, 在某些其他協議中, length field 可能代表是整個消息的長度, 包括消息頭
       在這種情況下,我們就得指定一個 非零的 lengthAdjustment 去調整
  
  
   * BEFORE DECODE (14 bytes)         AFTER DECODE (14 bytes)
 * +--------+----------------+      +--------+----------------+
 * | Length | Actual Content |----->| Length | Actual Content |
 * | 0x000E | "HELLO, WORLD" |      | 0x000E | "HELLO, WORLD" |
 * +--------+----------------+      +--------+----------------+
  
 * lengthFieldOffset   = 0
 * lengthFieldLength   = 2 // todo 每兩個字節 表示一個數據包
 * lengthAdjustment    = -2
 * initialBytesToStrip = 0

    意思就是
    
    字節數組[lengthFieldOffset,lengthFieldLength]之間的內容轉換成十進制,表示整個協議的長度
    00 0C ==> 14  意味,協議全長 14
    現在還是不能區分開  Length 和 Actual Content
  
    公式: 數據包的長度 = 長度域 + lengthFieldOffset + lengthFieldLength +lengthAdjustment
    
    通過他可以算出 lengthAdjustment = -2

基於偏移長度的拆包
    
  
 * BEFORE DECODE (17 bytes)                      AFTER DECODE (17 bytes)
 * +----------+----------+----------------+      +----------+----------+----------------+
 * | Header 1 |  Length  | Actual Content |----->| Header 1 |  Length  | Actual Content |
 * |  0xCAFE  | 0x00000C | "HELLO, WORLD" |      |  0xCAFE  | 0x00000C | "HELLO, WORLD" |
 * +----------+----------+----------------+      +----------+----------+----------------+
  這個例子和第一個例子很像,但是多了頭
  
  我們想拿到后面消息長度的信息,就偏移過header
  
 * lengthFieldOffset   = 2
 * lengthFieldLength   = 3 // todo 每兩個字節 表示一個數據包
 * lengthAdjustment    = 0
 * initialBytesToStrip = 0
  
  
  字節數組[lengthFieldOffset,lengthFieldLength]之間的內容轉換成十進制, 表示長度域的長度
               
  在這里 整好跳過了 header 1,   0x00 00 0C 是三個字節
  也就是  字節數組[lengthFieldOffset,lengthFieldLength]=>[0,3]
  0x00 00 0C == 12 表示長度域是 12
  
  現在也成功區分開了 Header 1 和  Length 和 Actual Content
  分別是 2 3 12
  
基於可調整長度的拆包
  
  
  BEFORE DECODE (17 bytes)                      AFTER DECODE (17 bytes)
 * +----------+----------+----------------+      +----------+----------+----------------+
 * |  Length  | Header 1 | Actual Content |----->|  Length  | Header 1 | Actual Content |
 * | 0x00000C |  0xCAFE  | "HELLO, WORLD" |      | 0x00000C |  0xCAFE  | "HELLO, WORLD" |
 * +----------+----------+----------------+      +----------+----------+----------------+
  
 * lengthFieldOffset   = 0
 * lengthFieldLength   = 3 // todo 每兩個字節 表示一個數據包
 * lengthAdjustment    = 2
 * initialBytesToStrip = 0
  
  
  字節數組[lengthFieldOffset,lengthFieldLength]之間的內容轉換成十進制, 表示長度域的長度

  也就是  字節數組[lengthFieldOffset,lengthFieldLength]=>[0,3]
  0x00 00 0C 是三個字節  
  0x00 00 0C == 12 表示長度域是 12 == 長度域的長度 就是 HELLO, WORLD的長度
  但是上面的圖多了一個 兩個字節長度的 Header 1
  下一步進行調整 
  
  公式: 數據包的長度 = 長度域 + lengthFieldOffset + lengthFieldLength +lengthAdjustment
  
  lengthAdjustment= 17-12-0-3=2

基於偏移可調整長度的截斷拆包

 * BEFORE DECODE (16 bytes)                       AFTER DECODE (13 bytes)
 * +------+--------+------+----------------+      +------+----------------+
 * | HDR1 | Length | HDR2 | Actual Content |----->| HDR2 | Actual Content |
 * | 0xCA | 0x000C | 0xFE | "HELLO, WORLD" |      | 0xFE | "HELLO, WORLD" |
 * +------+--------+------+----------------+      +------+----------------+
  
 
 * lengthFieldOffset   = 1
 * lengthFieldLength   = 2 // todo 每兩個字節 表示一個數據包
 * lengthAdjustment    = 1
 * initialBytesToStrip = 3
 
 lengthFieldOffset =1 偏移1字節 跨過 HDR1
 
 lengthFieldLength =2 從[1,2] ==> 0x000C =12 表示長度域的值
 
 看拆包后的結果,后面明顯還多了個 HDR2 ,進行調整
 公式:  數據包值 = 長度域  + lengthFieldOffset+ lengthFieldLength + lengthAdjustment
 算出 lengthAdjustment = 16 - 12 - 1 - 2 = 1
 
 結果值只有 HDR2 和  Actual Content , 說明,前面通過 initialBytesToStrip 進行忽略
 initialBytesToStrip =3
 
基於偏移可調整長度的 變種 截斷拆包
 
  * BEFORE DECODE (16 bytes)                       AFTER DECODE (13 bytes)
 * +------+--------+------+----------------+      +------+----------------+
 * | HDR1 | Length | HDR2 | Actual Content |----->| HDR2 | Actual Content |
 * | 0xCA | 0x0010 | 0xFE | "HELLO, WORLD" |      | 0xFE | "HELLO, WORLD" |
 * +------+--------+------+----------------+      +------+----------------+
 
 
 * lengthFieldOffset   = 1
 * lengthFieldLength   = 2 // todo 每兩個字節 表示一個數據包
 * lengthAdjustment    = -3
 * initialBytesToStrip = 3
 
 同樣
 看結果,保留 HDR2 和 Actual Content
 
 lengthFieldOffset   = 1 表示跳過開頭的 HDR1
 [1,2] ==> 00 10 , 算出的 長度域的值==10 很顯然這不對
 
 10 < 13
 
 我們要想拆出后面的數據包就得在現有的基礎上往左移動三個字節 -3個調整量


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM