本文內容主要參考<<Netty In Action>>,偏筆記向.
網絡編程中,字節緩沖區是一個比較基本的組件.Java NIO提供了ByteBuffer,但是使用過的都知道ByteBuffer對於讀寫數據操作還是有些麻煩的,切換讀寫狀態需要flip().Netty框架對字節緩沖區進行了封裝,名稱是ByteBuf,相較於ByteBuffer更靈活.
1.ByteBuf特點概覽
- 用戶可以自定義緩沖區類型對其擴展
- 通過內置的符合緩沖區類型實現了透明的零拷貝
- 容量可以按需增長(類似
StringBuilder) - 切換讀寫模式不用調用
flip()方法 - 讀寫使用各自的索引
- 支持方法的鏈式調用
- 支持引用計數
- 支持池化
2.ByteBuf類介紹
2.1工作模式
ByteBuf維護了兩個指針,一個用於讀取(readerIndex),一個用於寫入(writerIndex).
使用ByteBuf的API中的read*方法讀取數據時,readerIndex會根據讀取字節數向后移動,但是get*方法不會移動readerIndex;使用write*數據時,writerIndex會根據字節數移動,但是set*方法不會移動writerIndex.(read*表示read開頭的方法,其余意義相同)
讀取數據時,如果readerIndex超過了writerIndex會觸發IndexOutOfBoundsException.
可以指定ByteBuf容量最大值,capacity(int)或ensureWritable(int),當超出容量時會拋出異常.
2.2使用模式
2.2.1堆緩沖區
將ByteBuf存入JVM的堆空間.能夠在沒有池化的情況下提供快速的分配和釋放.
除此之外,ByteBuf的堆緩沖區還提供了一個后備數組(backing array).后備數組和ByteBuf中的數據是對應的,如果修改了backing array中的數據,ByteBuf中的數據是同步的.
public static void main(String[] args) {
ByteBuf heapBuf = Unpooled.buffer(1024);
if(heapBuf.hasArray()){
heapBuf.writeBytes("Hello,heapBuf".getBytes());
System.out.println("數組第一個字節在緩沖區中的偏移量:"+heapBuf.arrayOffset());
System.out.println("緩沖區中的readerIndex:"+heapBuf.readerIndex());
System.out.println("writerIndex:"+heapBuf.writerIndex());
System.out.println("緩沖區中的可讀字節數:"+heapBuf.readableBytes());//等於writerIndex-readerIndex
byte[] array = heapBuf.array();
for(int i = 0;i < heapBuf.readableBytes();i++){
System.out.print((char) array[i]);
if(i==5){
array[i] = (int)'.';
}
}
//不會修改readerIndex位置
System.out.println("\n讀取數據后的readerIndex:"+heapBuf.readerIndex());
//讀取緩沖區的數據,查看是否將逗號改成了句號
while (heapBuf.isReadable()){
System.out.print((char) heapBuf.readByte());
}
}
輸出:
數組第一個字節在緩沖區中的偏移量:0
緩沖區中的readerIndex:0
writerIndex:13
緩沖區中的可讀字節數:13
Hello,heapBuf
讀取數據后的readerIndex:0
Hello.heapBuf
如果
hasArray()返回false,嘗試訪問backing array會報錯
2.2.2直接緩沖區
直接緩沖區存儲於JVM堆外的內存空間.這樣做有一個好處,當你想把JVM中的數據寫給socket,需要將數據復制到直接緩沖區(JVM堆外內存)再交給socket.如果使用直接緩沖區,將減少復制這一過程.
但是直接緩沖區也是有不足的,與JVM堆的緩沖區相比,他們的分配和釋放是比較昂貴的.而且還有一個缺點,面對遺留代碼的時候,可能不確定ByteBuf使用的是直接緩沖區還是堆緩沖區,你可能需要進行一次額外的復制.如代碼示例.
與自帶后備數組的堆緩沖區來講,這要多做一些工作.所以,如果確定容器中的數據會被作為數組來訪問,你可能更願意使用堆內存.
//實際上你不知道從哪獲得的引用,這可能是一個直接緩沖區的ByteBuf
//忽略Unpooled.buffer方法,當做不知道從哪獲得的directBuf
ByteBuf directBuf = Unpooled.buffer(1024);
//如果想要從數組中訪問數據,需要將直接緩沖區中的數據手動復制到數組中
if (!directBuf.hasArray()) {
int length = directBuf.readableBytes();
byte[] array = new byte[length];
directBuf.getBytes(directBuf.readerIndex(), array);
handleArray(array, 0, length);
}
2.2.3符合緩沖區(CompositeByteBuf)
聚合緩沖區是個非常好用的東西,是多個ByteBuf的聚合視圖,可以添加或刪除ByteBuf實例.
CompositeByteBuf中的ByteBuf實例可能同事包含直接內存分配和非直接內存分配.如果其中只有一個實例,那么調用CompositeByteBuf中的
hasArray()方法將返回該組件上的hasArray()方法的值,否則返回false
多個ByteBuf組成一個完整的消息是很常見的,比如header和body組成的HTTP協議傳輸的消息.消息中的body有時候可能能重用,我們不想每次都創建重復的body,我們可以通過CompositeByteBuf來復用body.
對比一下JDK中的ByteBuffer實現復合緩沖區和Netty中的CompositeByteBuf.
//JDK版本實現復合緩沖區
public static void byteBufferComposite(ByteBuffer header, ByteBuffer body) {
//使用一個數組來保存消息的各個部分
ByteBuffer[] message = new ByteBuffer[]{ header, body };
// 創建一個新的ByteBuffer來復制合並header和body
ByteBuffer message2 =
ByteBuffer.allocate(header.remaining() + body.remaining());
message2.put(header);
message2.put(body);
message2.flip();
}
//Netty中的CompositeByteBuf
public static void byteBufComposite() {
CompositeByteBuf messageBuf = Unpooled.compositeBuffer();
ByteBuf headerBuf = Unpooled.buffer(1024); // 可能是直接緩存也可能是堆緩存中的
ByteBuf bodyBuf = Unpooled.buffer(1024); // 可能是直接緩存也可能是堆緩存中的
messageBuf.addComponents(headerBuf, bodyBuf);
//...
messageBuf.removeComponent(0); // remove the header
for (ByteBuf buf : messageBuf) {
System.out.println(buf.toString());
}
}
CompositeByteBuf不支持訪問其后備數組,所以訪問CompositeByteBuf中的數據類似於訪問直接緩沖區
CompositeByteBuf compBuf = Unpooled.compositeBuffer();
int length = compBuf.readableBytes();
byte[] array = new byte[length];
//將CompositeByteBuf中的數據復制到數組中
compBuf.getBytes(compBuf.readerIndex(), array);
//處理一下數組中的數據
handleArray(array, 0, array.length);
Netty使用CompositeByteBuf來優化socket的IO操作,避免了JDK緩沖區實現所導致的性能和內存使用率的缺陷.內存使用率的缺陷是指對可復用對象大量的復制,Netty對其在內部做了優化,雖然沒有暴露出來,但是應該知道CompositeByteBuf的優勢和JDK自帶工具的弊端.
JDK的NIO包中提供了Scatter/Gather I/O技術,字面意思是打散和聚合,可以理解為把單個ByteBuffer切分成多個或者把多個ByteBuffer合並成一個.
3.字節級操作
ByteBuf的索引從0開始,最后一個索引是capacity()-1.
遍歷演示
ByteBuf buffer = Unpooled.buffer(1024);
for (int i = 0; i < buffer.capacity(); i++) {
byte b = buffer.getByte(i);//這種方法不會移動readerIndex指針
System.out.println((char) b);
}
3.1readerIndex和writerIndex
JDK中的ByteBuffer只有一個索引,需要通過flip()來切換讀寫操作,Netty中的ByteBuf既有讀索引,也有寫索引,通過兩個索引把ByteBuf划分了三部分.

可以調用discardReadBytes() 方法可丟棄可丟棄字節並回收空間.
調用discardReadBytes() 方法之后

使用read*或skip*方法都會增加readerIndex.
移動readerIndex讀取可讀數據的方式
ByteBuf buffer = ...;
while (buffer.isReadable()) {
System.out.println(buffer.readByte());
}
write*方法寫入ByteBuf時會增加writerIndex,如果超過容量會拋出IndexOutOfBoundException .
writeableBytes()可以返回可寫字節數.
ByteBuf buffer = ...;
while (buffer.writableBytes() >= 4) {
buffer.writeInt(random.nextInt());
}
3.2索引管理
JDK 的
InputStream定義了mark(int readlimit)和reset()方法,這些方法分別被用來將流中的當前位置標記為指定的值,以及將流重置到該位置。
同樣,可以通過調用markReaderIndex()、markWriterIndex()、resetWriterIndex()和resetReaderIndex()來標記和重置ByteBuf的readerIndex和writerIndex。這些和InputStream上的調用類似,只是沒有readlimit參數來指定標記什么時候失效。
如果將索引設置到一個無效位置會拋出IndexOutOfBoundsException.
可以通過clear()歸零索引,歸零索引不會清除數據.
3.3查找
ByteBuf中很多方法可以確定值的索引,如indexOf().
復雜查找可以通過那些需要一個ByteBufProcessor作為參數的方法完成.這個接口應該可以使用lambda表達式(但是我現在使用的Netty4.1.12已經廢棄了該接口,應該使用ByteProcessor).
ByteBuf buffer = ...;
int index = buffer.forEachByte(ByteProcessor.FIND_CR);
3.4派生緩沖區
派生緩沖區就是,基於原緩沖區一頓操作生成新緩沖區.比如復制,切分等等.
duplicate();slice(); slice(int, int) ;Unpooled.unmodifiableBuffer(…) ;order(ByteOrder); readSlice(int) .
每個這些方法都將返回一個新的 ByteBuf 實例,它具有自己的讀索引、寫索引和標記
索引。 其內部存儲和 JDK 的 ByteBuffer 一樣也是共享的。這使得派生緩沖區的創建成本
是很低廉的,但是這也意味着,如果你修改了它的內容,也同時修改了其對應的源實例,所
以要小心
//復制
public static void byteBufCopy() {
Charset utf8 = Charset.forName("UTF-8");
ByteBuf buf = Unpooled.copiedBuffer("Netty in Action rocks!", utf8);
ByteBuf copy = buf.copy(0, 15);
System.out.println(copy.toString(utf8));
buf.setByte(0, (byte)'J');
assert buf.getByte(0) != copy.getByte(0);
}
//切片
public static void byteBufSlice() {
Charset utf8 = Charset.forName("UTF-8");
ByteBuf buf = Unpooled.copiedBuffer("Netty in Action rocks!", utf8);
ByteBuf sliced = buf.slice(0, 15);
System.out.println(sliced.toString(utf8));
buf.setByte(0, (byte)'J');
assert buf.getByte(0) == sliced.getByte(0);
}
還有一些讀寫操作的API,留在文末展示吧.
4.ByteBufHolder接口
我們經常發現, 除了實際的數據負載之外, 我們還需要存儲各種屬性值。 HTTP 響應便是一個很好的例子, 除了表示為字節的內容,還包括狀態碼、 cookie 等。
為了處理這種常見的用例, Netty 提供了 ByteBufHolder。 ByteBufHolder 也為 Netty 的高級特性提供了支持,如緩沖區池化,其中可以從池中借用 ByteBuf, 並且在需要時自動釋放。ByteBufHolder 只有幾種用於訪問底層數據和引用計數的方法。

5.ByteBuf的分配
我們可以通過ByteBufAllocator來分配一個ByteBuf實例.ByteBufAllocator接口實現了ByteBuf的池化.
可以通過 Channel(每個都可以有一個不同的 ByteBufAllocator 實例)或者綁定到ChannelHandler 的 ChannelHandlerContext 獲取一個到ByteBufAllocator的引用。
//從Channel獲取一個ByteBufAllocator的引用
Channel channel = ...;
ByteBufAllocator allocator = channel.alloc();
....
//從ChannelHandlerContext獲取ByteBufAllocator 的引用
ChannelHandlerContext ctx = ...;
ByteBufAllocator allocator2 = ctx.alloc();
Netty提供了兩種ByteBufAllocator的實現: PooledByteBufAllocator和UnpooledByteBufAllocator。前者池化了ByteBuf的實例以提高性能並最大限度地減少內存碎片。 后者的實現不 池化ByteBuf實例, 並且在每次它被調用時都會返回一個新的實例。
默認使用的是PooledByteBufAllocator ,可以通過ChannelConfig修改.
Unpooled緩沖區
可能有時候拿不到ByteBufAllocator引用的話,可以使用Unpooled工具類來創建未持化ByteBuf實例.
ByteBufUtil類
ByteBufUtil 提供了用於操作 ByteBuf 的靜態的輔助方法。因為這個 API 是通用的, 並且和池化無關,所以這些方法已然在分配類的外部實現。
這些靜態方法中最有價值的可能就是 hexdump()方法, 它以十六進制的表示形式打印ByteBuf 的內容。這在各種情況下都很有用,例如, 出於調試的目的記錄 ByteBuf 的內容。十六進制的表示通常會提供一個比字節值的直接表示形式更加有用的日志條目,此外,十六進制的版本還可以很容易地轉換回實際的字節表示。
另一個有用的方法是 boolean equals(ByteBuf, ByteBuf), 它被用來判斷兩個 ByteBuf實例的相等性。如果你實現自己的 ByteBuf 子類,你可能會發現 ByteBufUtil 的其他有用方法。
6.引用計數
引用計數是一種通過在某個對象所持有的資源不再被其他對象引用時釋放該對象所持有的資源來優化內存使用和性能的技術。 它們都實現了 interface ReferenceCounted。 引用計數背后的想法並不是特別的復雜;它主要涉及跟蹤到某個特定對象的活動引用的數量。一個 ReferenceCounted 實現的實例將通常以活動的引用計數為 1 作為開始。只要引用計數大於 0, 就能保證對象不會被釋放。當活動引用的數量減少到 0 時,該實例就會被釋放。注意,雖然釋放的確切語義可能是特定於實現的,但是至少已經釋放的對象應該不可再用了。
//從Channel獲取ByteBufAllocator
Channel channel = ...;
ByteBufAllocator allocator = channel.alloc();
....
//從ByteBufAllocator分配一個ByteBuf
ByteBuf buffer = allocator.directBuffer();
assert buffer.refCnt() == 1;//引用計數是否為1
7.API
ByteBuf





ByteBufAllocator

Unpooled

