Netty 框架學習 —— ByteBuf



概述

網絡數據的基本單位總是字節,Java NIO 提供了 ByteBuffer 作為它的字節容器,但這個類的使用過於復雜。Netty 的 ByteBuf 具有卓越的功能性和靈活性,可以作為 ByteBuffer 的替代品

Netty 的數據處理 API 通過兩個組件暴露 —— abstract class ByteBuf 和 interface ByteBufHolder,下面是 ByteBuf API 的優點:

  • 可以被用戶自定義的緩沖區類型擴展
  • 通過內置的復合緩沖區類型實現透明的零拷貝
  • 容量可以按需增長
  • 在讀和寫這兩種模式之間切換不需要調用 ByteBuffer 的 flip() 方法
  • 在讀和寫使用了不同的索引
  • 支持方法的鏈式調用
  • 支持引用計數
  • 支持池化

ByteBuf

1. 工作原理

ByteBuf 維護了兩個不同的索引:一個用於讀取,一個用於寫入,當你從 ByteBuf 讀取時,readIndex 會遞增已經被讀取的字節數。同樣的,當你寫入 ByteBuf 時,它的 writeIndex 也會遞增。readIndex 和 writeIndex 的起始位置都為 0

如果 readIndex 和 writeIndex 的值相等,也即此時已經到了可讀取數據的末尾,就如同達到數組末尾一樣,試圖讀取超出該點的數據將觸發一個 IndexOutOfBoundsException

名稱以 read 或 write 開頭的 ByteBuf 方法,將會推進其對應的索引,而名稱以 set 或 get 開頭的操作則不會

2. ByteBuf 的使用模式

2.1 堆緩沖區

最常用的 ByteBuf 模式是將數據存儲在 JVM 的堆空間中,這種模式被稱為支撐數組(backing array)它能在沒有使用池化的情況下提供快速的分配和釋放,適合於有遺留的數據需要處理的情況

ByteBuf heapBuf = ...;
// 檢查 ByteBuf 是否有一個支撐數組
if(heapBuf.hasArray()) {
    // 獲取對該數組的引用
    byte[] array = heapBuf.array();
    // 計算第一個字節的偏移量
    int offset = heapBuf.arrayOffset() + heapBuf.readerIndex();
    // 獲得可讀字節數
    int length = heapBuf.readableBytes();
    // 使用數組、偏移量和長度作為參數調用你的方法
    handleArray(array, offset, length);
}
2.2 直接緩沖區

直接緩沖區使用本地內存存儲數據,更適合用於網絡傳輸,但相對於堆緩沖區,其分配和釋放都較為昂貴。另外,如果你正在處理遺留代碼,處理直接緩沖區內容時,你必須將其內容進行一次復制

ByteBuf directBuf = ...;
// 不是支撐數組就是直接緩沖區
if(!directBuf.hasArray()) {
    // 獲取可讀字節數
    int length = directBuf.readableBytes();
    // 分配一個新的數組來保存具有該長度的字節數組
    byte[] array = new byte[length];
    // 將字節復制到該數組
    directBuf.getBytes(directBuf.readerIndex(), array);
    // 使用數組、偏移量和長度作為參數調用你的方法
    handleArray(array, 0, length);
}
2.3 復合緩沖區

復合緩沖區為多個 ByteBuf 提供了一個聚合視圖,可以根據需要添加或刪除 ByteBuf 實例。Netty 通過一個 ByteBuf 子類 —— CompositeByteBuf 實現這個模式,它提供了一個將多個緩沖區表示為單個合並緩沖區的虛擬表示

CompositeByteBuf 中的 ByteBuf 實例可能同時包含直接內存和非直接內存分配,如果其中只有一個實例,那么對 CompositeByteBuf 上的 hasArray() 方法的調用將返回該數組上的 hasArray() 方法的值,否則返回 false

CompositeByteBuf messageBuf = Unpooled.compositeBuffer();
ByteBuf headerBuf = ...;
ByteBuf bodyBuf = ...;
// 將 ByteBuf 實例追加到 CompositeByteBuf
messageBuf.addComponents(headerBuf, bodyBuf);
...
// 刪除第位於索引位置為 0 的 ByteBuf
messageBuf.removeComponent(0);
// 循環遍歷所有的 ByteBuf 實例
for(ByteBuf buf : messageBuf) {
    System.out.println(buf.toString());
}

字節級操作

1. 隨機訪問索引

如同普通的 Java 字節數組一樣,ByteBuf 的索引是從零開始的:第一個字節的索引是 0,最后一個字節的索引總是 capacity() - 1

ByteBuf buffer = ...;
for(int i = 0; i < buffer.capacity(); i++) {
    byte b = buffer.getByte(i);
    System.out.println((char) b)
}

這種需要一個索引值參數的方法訪問數據不會改變 readerIndex 也不會改變 writerIndex。如果需要改變,也可以通過調用 readerIndex(index) 或者 writerIndex(index) 來手動移動這兩者

2. 順序訪問索引

雖然 ByteBuf 同時具有讀索引和寫索引,但是 JDK 的 ByteBuf 卻只有一個索引,這也就是為什么必須調用 flip() 方法來在讀模式和寫模式之間進行切換的原因

3. 可丟棄字節

可丟棄字節的分段包含了已經被讀過的字節,通過調用 discardReadBytes() 方法,可以丟棄它們並回收空間。這個分段的初始大小為 0,存儲在 readerIndex 中,會隨着 read 操作的執行而增加

可能你會想到頻繁調用 discardReadBytes() 方法以確保可寫分段的最大化,但這極有可能會導致內存復制,因為可讀字段必須被移動到緩沖區的開始位置

4. 可讀字節

ByteBuf 的可讀字節分段存儲了實際數據,新分配的、包裝的或者復制的緩沖區的默認的 readerIndex 值為 0。任何名稱以 read 或者 skip 開頭的操作都將檢索或者跳過位於當前 readerIndex 的數據,並且將它增加已讀字節數

如果嘗試在緩沖區的可讀字節數已經耗盡時從中讀取數據,那么將會引發一個 IndexOutOfBoundsException

ByteBuf buffer = ...;
while(buffer.isReadable()) {
    System.out.println(buffer.readByte());
}

5. 可寫字節

可寫字節分段是指一個擁有未定義內容、寫入就緒的內存區域。新分配的緩沖區的 writerIndex 的默認值為 0.任何名稱以 write 開頭的操作都將從當前的 writerIndex 處開始寫數據,並將它增加已經寫入的字節數。如果寫操作的目標是 ByteBuf,並且沒有指定源索引的值,則緩沖區的 readerIndex 也同樣會被增加相同的大小

writeBytes(ByteBuf dest)

如果嘗試往目標寫入超過目標容量的數據,將會引發一個 IndexOutOfBoundException

ByteBuf buffer = ...;
while(buffer.writableBytes() >= 4) {
    buffer.writeInt(random.nextInt());
}

6. 索引管理

JDK 的 InputStream 定義了 mark(int readlimit) 和 reset() 方法,這些方法分別被用來將流中的當前位置標記為指定的值,以及將流重置到該位置

同樣,可以通過 markReaderIndex()、markWriterIndex()、resetWriterIndex() 和 resetReaderIndex() 來標記和重置 ByteBuf 的 readerIndex 和 writerIndex

也可以通過 readerIndex(int) 或者 writerIndex(int) 來將索引移動到指定位置。任何試圖將索引設置到無效位置都將導致 IndexOutOfBoundsException

可以通過調用 clear() 方法來將 readerIndex 和 writerIndex 都設置為 0,這樣並不會清除內存中的內容。調用 clear() 比調用 discardReadBytes() 輕量得多,因為它只是重置索引

7. 查找操作

在 ByteBuf 中有多種可以用來確定指定值的索引的方法,最簡單的是 indexOf() 方法。較為復雜的查找可以通過那些需要一個 ByteBufProcessor 作為參數的方法達成,這個接口只定義了一個方法

boolean process(byte value);

它將檢查輸入值是否是正在查找的值,ByteBufProcessor 針對一些常見的值定義了許多便利方法

ByteBuf buffer = ...;
// 查找回車符 \r
int index = buffer.forEachByte(ByteBufProcessor.FIND_CR);

8. 派生緩沖區

派生緩沖區為 ByteBuf 提供了以專門的方式來呈現其內容的視圖,這些視圖通過以下方法被創建

  • duplicate()
  • slice()
  • slice(int, int)
  • Unpooled.unmodifiableBuffer(...)
  • order(ByteOrder)
  • readSlice(int)

這些方法都將返回一個新的 ByteBuf 實例,其內部存儲和 JDK 的 ByteBuffer 共享,這也意味着,如果你修改了它的內容,也即同時修改了其對應的源實例。如果需要一個現有緩沖區的真實副本,請使用 copy() 或 copy(int, int) 方法

// 對 ByteBuf 進行切片
Charset utf8 = Charset.forName(StandardCharsets.UTF_8);
ByteBuf buf = Unpooled.copiedBuffer("Netty in Action rocks!", urf8);
// 創建該 ByteBuf 從索引 0 到 15 結束的一個新切片
ByteBuf sliced = buf.slice(0, 15);
// 更新索引 0 處的字節
buf.setByte(0, (byte) 'J');
// 成功,因為數據是共享的
assert buf.getByte(0) == sliced.getByte(0);
// 對 ByteBuf 進行切片
Charset utf8 = Charset.forName(StandardCharsets.UTF_8);
ByteBuf buf = Unpooled.copiedBuffer("Netty in Action rocks!", urf8);
// 創建該 ByteBuf 從索引 0 到 15 結束的一個新副本
ByteBuf sliced = buf.copy(0, 15);
// 更新索引 0 處的字節
buf.setByte(0, (byte) 'J');
// 成功,因為數據不是共享的
assert buf.getByte(0) != sliced.getByte(0);

9. 讀/寫操作

有兩種類別的讀/寫操作:

  • get() / set() 操作,從給定的索引開始,並且索引不會改變
  • read() / write() 操作,從給定的索引開始,並且會根據已經訪問過的字節數對索引進行調整
方法 描述
setBoolean (int , boolean) 設定給定索引處的 Boolean 值
getBoolean(int) 返回給定索引處的 Boolean 值
setByte(int index, int value) 設定給定索引處的字節值
getByte(int) 返回給定索引處的字節
getUnsignedByte(int ) 將給定索引處的無符號字節值作為 short 返回
setMedium(int index , int value) 設定給定索引處的 24 位的中等 int值
getMedium(int) 返回給定索引處的 24 位的中等 int 值
getUnsignedMedium (int) 返回給定索引處的無符號的 24 位的中等 int 值
setint(int index , int value) 設定給定索引處的 int 值
getint (int) 返回給定索引處的 int 值
getUnsignedint(int) 將給定索引處的無符號 int 值作為 long 返回
setLong(int index, long value) 設定給定索引處的 long 值
getLong(int) 返回給定索引處的 long 值
setShort(int index, int value) 設定給定索引處的 short 值
getShort(int) 返回給定索引處的 short 值
getUnsignedShort(int) 將給定索引處的無符號 short 值作為 int 返回
getBytes (int, …) 將該緩沖區中從給定索引開始的數據傳送到指定的目的地

read/write 操作的 API 和 set/get 大同小異,只不過會增加索引值

ByteBuf 還提供了其他有用的操作

方法 描述
isReadable () 如果至少有一個字節可供讀取,則返回 true
isWritable () 如果至少有一個字節可被寫入,則返回 true
readableBytes() 返回可被讀取的字節數
writableBytes() 返回可被寫入的字節數
capacity() 返回 ByteBuf 可容納的字節數 。在此之后,它會嘗試再次擴展直到達到maxCapacity ()
maxCapacity() 返問 ByteBuf 可以容納的最大字節數
hasArray() 如果 ByteBuf 由一個字節數組支撐,則返回 true
array () 如果 ByteBuf 由一個字節數組支撐則返問該數組;否則,它將拋出 一個 UnsupportedOperat工onException 異常

ByteBuf 分配

1. 按需分配

為了降低分配和釋放內存的開銷,Netty 通過 interface ByteBufAllocator 實現了 ByteBuf 的池化,用於分配 ByteBuf 實例

下面是 ByteBufAllocator 的一些 API

方法 描述
buffer()buffer(int initialCapacity);buffer(int initialCapacity, int maxCapacity); 返回一個基於堆或者直接內存存儲的 ByteBuf
heapBuffer ()heapBuffer(int initialCapacity)heapBuffer(int initialCapacity, int maxCapacity) 返回一個基於堆內存存儲的 ByteBuf
directBuffer()directBuffer(int initialCapacity)directBuffer(int initialCapacity , int maxCapacity) 返回一個基於直接內存存儲的 ByteBuf
compositeBuffer()compositeBuffer(int maxNumComponents) compositeDirectBuffer()compositeDirectBuffer (int maxNumComponents); compositeHeapBuffer()compositeHeapBuffer(int maxNumComponents); 返回一個可以通過添加最大到指定數目的基於堆的或者直接內存存儲的緩沖區來擴展的 CompositeByteBuf
ioBuffer() 返回一個用於套接字的 I/O 操作的 ByteBuf。默認地, 當所運行的環境具有 sun.misc.Unsafe支持時,返回基於直接內存存儲的 ByteBuf,否則返回基於堆內存存儲的 ByteBuf;當指定使用 PreferHeapByteBufAllocator 時,則只會返回基於堆內存存儲的 ByteBuf

可以通過 Channel 或者綁定到 ChannelHandler 的 ChannelHandlerContext 獲取一個 ByteBufAllocator 的引用

Channel channel = ...;
ByteBufAllocator allocator = channel.alloc();
...
ChannelHandlerContext ctx = ...;
ByteBufAllocator allocator = ctx.alloc();

Netty 提供了兩種 ByteBufAllocator 的實現:PooledByteBufAllocator 和 UnpooledByteBufAllocator ,前者池化了 ByteBuf 實例以提供性能,最大限度減少內存碎片。后者不池化 ByteBuf 實例,每次調用都會返回一個新的實例

2. Unpooled 緩沖區

如果你未能獲取 ByteBufAllocator 實例,Netty 也提供了名為 Unpooled 的工具類,它提供了靜態的輔助方法來創建未池化的 ByteBuf 實例

方法 描述
buffer()buffer(int 工nitialCapacity)buffer(int initialCapacity, int maxCapacity) 返回一個未池化的基於堆內存存儲的ByteBuf
directBuffer()directBuffer(int initialCapacity)directBuffer(int initialCapacity, int maxCapacity) 返回一個未池化的基於直接內存存儲ByteBuf
wrappedBuffer() 返回一個包裝了給定數據的ByteBuf
copiedBuffer() 返回一個復制了給定數據的 ByteBuf

3. ByteBufUtil 類

ByteBufUtil 提供了用於操作 ByteBuf 的靜態的輔助方法。因為這個 API 是通用的,並且和池化無關,所以這些方法已然在分配類的外部實現

這些靜態方法中最有價值的可能就是 hexdump() 方法,它以十六進制的表示形式打印 ByteBuf 的內容。 這在各種情況下都很有用,例如,出於調試 的目的記錄 ByteBuf 的內容。十六進制的表示通常會提供一個比字節值的直接表示形式更加有用的日志條目,此外,十六進制的版本還可以很容易地轉換回實際的字節表示

另一個有用的方法是 boolean equals(ByteBuf , ByteBuf),它被用來判斷兩個 ByteBuf 實例的相等性。 如果你實現自己的 ByteBuf 子類,你可能會發現 ByteBufUtil 的其他有用方法



免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM