Netty源碼分析 (九)----- 拆包器的奧秘


Netty 的解碼器有很多種,比如基於長度的,基於分割符的,私有協議的。但是,總體的思路都是一致的。

拆包思路:當數據滿足了 解碼條件時,將其拆開。放到數組。然后發送到業務 handler 處理。

半包思路: 當讀取的數據不夠時,先存起來,直到滿足解碼條件后,放進數組。送到業務 handler 處理。

拆包的原理

在沒有netty的情況下,用戶如果自己需要拆包,基本原理就是不斷從TCP緩沖區中讀取數據,每次讀取完都需要判斷是否是一個完整的數據包

1.如果當前讀取的數據不足以拼接成一個完整的業務數據包,那就保留該數據,繼續從tcp緩沖區中讀取,直到得到一個完整的數據包
2.如果當前讀到的數據加上已經讀取的數據足夠拼接成一個數據包,那就將已經讀取的數據拼接上本次讀取的數據,夠成一個完整的業務數據包傳遞到業務邏輯,多余的數據仍然保留,以便和下次讀到的數據嘗試拼接

netty中拆包的基類

netty 中的拆包也是如上這個原理,在每個SocketChannel中會一個 pipeline ,pipeline 內部會加入解碼器,解碼器都繼承基類 ByteToMessageDecoder,其內部會有一個累加器,每次從當前SocketChannel讀取到數據都會不斷累加,然后嘗試對累加到的數據進行拆包,拆成一個完整的業務數據包,下面我們先詳細分析下這個類

看名字的意思是:將字節轉換成消息的解碼器。人如其名。而他本身也是一個入站 handler,所以,我們還是從他的 channelRead 方法入手。

channelRead 方法

我們先看看基類中的屬性,cumulation是此基類中的一個 ByteBuf 類型的累積區,每次從當前SocketChannel讀取到數據都會不斷累加,然后嘗試對累加到的數據進行拆包,拆成一個完整的業務數據包,如果不夠一個完整的數據包,則等待下一次從TCP的數據到來,繼續累加到此cumulation中

public abstract class ByteToMessageDecoder extends ChannelInboundHandlerAdapter {
    //累積區
    ByteBuf cumulation;
    private ByteToMessageDecoder.Cumulator cumulator;
    private boolean singleDecode;
    private boolean decodeWasNull;
    private boolean first;
    private int discardAfterReads;
    private int numReads;
    .
    .
    .
}

channelRead方法是每次從TCP緩沖區讀到數據都會調用的方法,觸發點在AbstractNioByteChannelread方法中,里面有個while循環不斷讀取,讀取到一次就觸發一次channelRead

 1 @Override
 2 public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
 3     if (msg instanceof ByteBuf) {
 4         // 從對象池中取出一個List
 5         CodecOutputList out = CodecOutputList.newInstance();
 6         try {
 7             ByteBuf data = (ByteBuf) msg;
 8             first = cumulation == null;
 9             if (first) {
10                 // 第一次解碼
11                 cumulation = data;//直接賦值
12             } else {
13                  // 第二次解碼,就將 data 向 cumulation 追加,並釋放 data
14                 cumulation = cumulator.cumulate(ctx.alloc(), cumulation, data);
15             }
16             // 得到追加后的 cumulation 后,調用 decode 方法進行解碼
17             // 主要目的是將累積區cumulation的內容 decode 到 out數組中
18             callDecode(ctx, cumulation, out);
19         } catch (DecoderException e) {
20             throw e;
21         } catch (Throwable t) {
22             throw new DecoderException(t);
23         } finally {
24             // 如果累計區沒有可讀字節了,有可能在上面callDecode方法中已經將cumulation全部讀完了,此時writerIndex==readerIndex
25             // 每讀一個字節,readerIndex會+1
26             if (cumulation != null && !cumulation.isReadable()) {
27                 // 將次數歸零
28                 numReads = 0;
29                 // 釋放累計區,因為累計區里面的字節都全部讀完了
30                 cumulation.release();
31                 // 便於 gc
32                 cumulation = null;
33             // 如果超過了 16 次,還有字節沒有讀完,就將已經讀過的數據丟棄,將 readIndex 歸零。
34             } else if (++ numReads >= discardAfterReads) {
35                 numReads = 0;
36                 //將已經讀過的數據丟棄,將 readIndex 歸零。
37                 discardSomeReadBytes();
38             }
39 
40             int size = out.size();
41             decodeWasNull = !out.insertSinceRecycled();
42             //循環數組,向后面的 handler 發送數據
43             fireChannelRead(ctx, out, size);
44             out.recycle();
45         }
46     } else {
47         ctx.fireChannelRead(msg);
48     }
49 }
  1. 從對象池中取出一個空的數組。
  2. 判斷成員變量是否是第一次使用,將 unsafe 中傳遞來的數據寫入到這個 cumulation 累積區中。
  3. 寫到累積區后,在callDecode方法中調用子類的 decode 方法,嘗試將累積區的內容解碼,每成功解碼一個,就調用后面節點的 channelRead 方法。若沒有解碼成功,什么都不做。
  4. 如果累積區沒有未讀數據了,就釋放累積區。
  5. 如果還有未讀數據,且解碼超過了 16 次(默認),就對累積區進行壓縮。將讀取過的數據清空,也就是將 readIndex 設置為0.
  6. 調用 fireChannelRead 方法,將數組中的元素發送到后面的 handler 中。
  7. 將數組清空。並還給對象池。

下面來說說詳細的步驟。

寫入累積區

如果當前累加器沒有數據,就直接跳過內存拷貝,直接將字節容器的指針指向新讀取的數據,否則,調用累加器累加數據至字節容器

ByteBuf data = (ByteBuf) msg;
first = cumulation == null;
if (first) {
    cumulation = data;
} else {
    cumulation = cumulator.cumulate(ctx.alloc(), cumulation, data);
}

我們看看構造方法

protected ByteToMessageDecoder() {
    this.cumulator = MERGE_CUMULATOR; this.discardAfterReads = 16;
    CodecUtil.ensureNotSharable(this);
}

可以看到 this.cumulator = MERGE_CUMULATOR;,那我們接下來看看 MERGE_CUMULATOR

public static final ByteToMessageDecoder.Cumulator MERGE_CUMULATOR = new ByteToMessageDecoder.Cumulator() {
    public ByteBuf cumulate(ByteBufAllocator alloc, ByteBuf cumulation, ByteBuf in) {
        ByteBuf buffer;
        if (cumulation.writerIndex() <= cumulation.maxCapacity() - in.readableBytes() && cumulation.refCnt() <= 1) {
            buffer = cumulation;
        } else {
            buffer = ByteToMessageDecoder.expandCumulation(alloc, cumulation, in.readableBytes());
        }

        buffer.writeBytes(in);
        in.release();
        return buffer;
    }
};

MERGE_CUMULATOR是基類ByteToMessageDecoder中的一個靜態常量,其重寫了cumulate方法,下面我們看一下 MERGE_CUMULATOR 是如何將新讀取到的數據累加到字節容器里的

netty 中ByteBuf的抽象,使得累加非常簡單,通過一個簡單的api調用 buffer.writeBytes(in); 便將新數據累加到字節容器中,為了防止字節容器大小不夠,在累加之前還進行了擴容處理

static ByteBuf expandCumulation(ByteBufAllocator alloc, ByteBuf cumulation, int readable) {
        ByteBuf oldCumulation = cumulation;
        cumulation = alloc.buffer(oldCumulation.readableBytes() + readable);
        cumulation.writeBytes(oldCumulation);
        oldCumulation.release();
        return cumulation;
}

擴容也是一個內存拷貝操作,新增的大小即是新讀取數據的大小

將累加到的數據傳遞給業務進行拆包

當數據追加到累積區之后,需要調用 decode 方法進行解碼,代碼如下:

public boolean isReadable() {
    //寫的坐標大於讀的坐標則說明還有數據可讀
    return this.writerIndex > this.readerIndex;
}
protected void callDecode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
    // 如果累計區還有可讀字節,循環解碼,因為這里in有可能是粘包,即多次完整的數據包粘在一起,通過換行符連接
    // 下面的decode方法只能處理一個完整的數據包,所以這里循環處理粘包
    while (in.isReadable()) {
        int outSize = out.size();
        // 上次循環成功解碼
        if (outSize > 0) {
            // 處理一個粘包就 調用一次后面的業務 handler 的  ChannelRead 方法
            fireChannelRead(ctx, out, outSize);
            // 將 size 置為0
            out.clear();//
            if (ctx.isRemoved()) {
                break;
            }
            outSize = 0;
        }
        // 得到可讀字節數
        int oldInputLength = in.readableBytes();
        // 調用 decode 方法,將成功解碼后的數據放入道 out 數組中
        decode(ctx, in, out);
        if (ctx.isRemoved()) {
            break;
        }
        if (outSize == out.size()) {
            if (oldInputLength == in.readableBytes()) {
                break;
            } else {
                continue;
            }
        }
        if (isSingleDecode()) {
            break;
        }
    }
}

我們看看 fireChannelRead

static void fireChannelRead(ChannelHandlerContext ctx, List<Object> msgs, int numElements) {
    if (msgs instanceof CodecOutputList) {
        fireChannelRead(ctx, (CodecOutputList)msgs, numElements);
    } else {
        //將所有已解碼的數據向下業務hadder傳遞
        for(int i = 0; i < numElements; ++i) {
            ctx.fireChannelRead(msgs.get(i));
        }
    }

}

該方法主要邏輯:只要累積區還有未讀數據,就循環進行讀取。

  1. 調用 decodeRemovalReentryProtection 方法,內部調用了子類重寫的 decode 方法,很明顯,這里是個模板模式。decode 方法的邏輯就是將累積區的內容按照約定進行解碼,如果成功解碼,就添加到數組中。同時該方法也會檢查該 handler 的狀態,如果被移除出 pipeline 了,就將累積區的內容直接刷新到后面的 handler 中。

  2. 如果 Context 節點被移除了,直接結束循環。如果解碼前的數組大小和解碼后的數組大小相等,且累積區的可讀字節數沒有變化,說明此次讀取什么都沒做,就直接結束。如果字節數變化了,說明雖然數組沒有增加,但確實在讀取字節,就再繼續讀取。

  3. 如果上面的判斷過了,說明數組讀到數據了,但如果累積區的 readIndex 沒有變化,則拋出異常,說明沒有讀取數據,但數組卻增加了,子類的操作是不對的。

  4. 如果是個單次解碼器,解碼一次就直接結束了,如果數據包一次就解碼完了,則下一次循環時 in.isReadable()就為false,因為 writerIndex = this.readerIndex 了

所以,這段代碼的關鍵就是子類需要重寫 decode 方法,將累積區的數據正確的解碼並添加到數組中。每添加一次成功,就會調用 fireChannelRead 方法,將數組中的數據傳遞給后面的 handler。完成之后將數組的 size 設置為 0.

所以,如果你的業務 handler 在這個地方可能會被多次調用。也可能一次也不調用。取決於數組中的值。

解碼器最主要的邏輯:

將 read 方法的數據讀取到累積區,使用解碼器解碼累積區的數據,解碼成功一個就放入到一個數組中,並將數組中的數據一次次的傳遞到后面的handler。

清理字節容器

業務拆包完成之后,只是從累積區中取走了數據,但是這部分空間對於累積區來說依然保留着,而字節容器每次累加字節數據的時候都是將字節數據追加到尾部,如果不對累積區做清理,那么時間一長就會OOM,清理部分的代碼如下

finally {
    // 如果累計區沒有可讀字節了,有可能在上面callDecode方法中已經將cumulation全部讀完了,此時writerIndex==readerIndex
    // 每讀一個字節,readerIndex會+1
    if (cumulation != null && !cumulation.isReadable()) {
        // 將次數歸零
        numReads = 0;
        // 釋放累計區,因為累計區里面的字節都全部讀完了
        cumulation.release();
        // 便於 gc
        cumulation = null;
    // 如果超過了 16 次,還有字節沒有讀完,就將已經讀過的數據丟棄,將 readIndex 歸零。
    } else if (++ numReads >= discardAfterReads) {
        numReads = 0;
        //將已經讀過的數據丟棄,將 readIndex 歸零。
        discardSomeReadBytes();
    }

    int size = out.size();
    decodeWasNull = !out.insertSinceRecycled();
    //循環數組,向后面的 handler 發送數據
    fireChannelRead(ctx, out, size);
    out.recycle();
}
  1. 如果累積區沒有可讀數據了,將計數器歸零,並釋放累積區。
  2. 如果不滿足上面的條件,且計數器超過了 16 次,就壓縮累積區的內容,壓縮手段是刪除已讀的數據。將 readIndex 置為 0。還記得 ByteBuf 的指針結構嗎?

public ByteBuf discardSomeReadBytes() {
    this.ensureAccessible();
    if (this.readerIndex == 0) {
        return this;
    } else if (this.readerIndex == this.writerIndex) {
        this.adjustMarkers(this.readerIndex);
        this.writerIndex = this.readerIndex = 0;
        return this;
    } else {
        //讀指針超過了Buffer容量的一半時做清理工作
        if (this.readerIndex >= this.capacity() >>> 1) {
            //拷貝,從readerIndex開始,拷貝this.writerIndex - this.readerIndex 長度
            this.setBytes(0, this, this.readerIndex, this.writerIndex - this.readerIndex);
            //writerIndex=writerIndex-readerIndex
            this.writerIndex -= this.readerIndex;
            this.adjustMarkers(this.readerIndex);
            //將讀指針重置為0
            this.readerIndex = 0;
        }

        return this;
    }
}

我們看到discardSomeReadBytes 主要是將未讀的數據拷貝到原Buffer,重置 readerIndex 和 writerIndex 

我們看到最后還調用 fireChannelRead 方法,嘗試將數組中的數據發送到后面的 handler。為什么要這么做。按道理,到這一步的時候,數組不可能是空,為什么這里還要這么謹慎的再發送一次?

如果是單次解碼器,就需要發送了,因為單詞解碼器是不會在 callDecode 方法中發送的。

總結

可以說,ByteToMessageDecoder 是解碼器的核心所做,Netty 在這里使用了模板模式,留給子類擴展的方法就是 decode 方法。

主要邏輯就是將所有的數據全部放入累積區,子類從累積區取出數據進行解碼后放入到一個 數組中,ByteToMessageDecoder 會循環數組調用后面的 handler 方法,將數據一幀幀的發送到業務 handler 。完成這個的解碼邏輯。

使用這種方式,無論是粘包還是拆包,都可以完美的實現。

Netty 所有的解碼器,都可以在此類上擴展,一切取決於 decode 的實現。只要遵守 ByteToMessageDecoder 的約定即可。

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM