在Netty中,還有另外一個比較常見的對象ByteBuf,它其實等同於Java Nio中的ByteBuffer,但是ByteBuf對Nio中的ByteBuffer的功能做了很多增強,下面介紹一下ByteBuf。
下面這段代碼演示了ByteBuf的創建以及內容的打印,這里顯示出了和普通ByteBuffer最大的區別之一,就是ByteBuf可以自動擴容,默認長度是256,如果內容長度超過閾值時,會自動觸發擴容
public class ByteBufExample {
public static void main(String[] args) {
ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer(); //可自動擴容
log(buffer);
StringBuilder sb = new StringBuilder();
for (int i = 0; i < 128; i++) {
sb.append(" - " + i);
}
buffer.writeBytes(sb.toString().getBytes());
log(buffer);
}
private static void log(ByteBuf buf) {
StringBuilder builder = new StringBuilder()
.append(" read index:").append(buf.readerIndex())//獲取讀索引
.append(" write index:").append(buf.writerIndex()) //獲取寫索引
.append(" capacity:").append(buf.capacity())//獲取容量
.append(StringUtil.NEWLINE);
//把ByteBuf中的內容,dump到StringBuilder中
ByteBufUtil.appendPrettyHexDump(builder, buf);
System.out.println(builder.toString());
}
}
ByteBuf創建的方法有兩種
-
第一種,創建基於堆內存的ByteBuf
ByteBuf buffer = ByteBufAllocator.DEFAULT.heapBuffer(10);
-
第二種,創建基於直接內存(堆外內存)的ByteBuf(默認情況下用的是這種)
Java中的內存分為兩個部分,一部分是不需要jvm管理的直接內存,也被稱為堆外內存。堆外內存就是把內存對象分配在JVM堆以外的內存區域,這部分內存不是虛擬機管理,而是由操作系統來管理,這樣可以減少垃圾回收對應用程序的影響
ByteBufAllocator.DEFAULT.directBuffer(10);
直接內存的好處是讀寫性能會高一些,如果數據存放在堆中,此時需要把Java堆空間的數據發送到遠程服務器,首先需要把堆內部的數據拷貝到直接內存(堆外內存),然后再發送,如果是把數據直接存儲到堆外內存中,發送的時候就少了一個復制步驟。
但是它也有缺點,由於缺少了JMM的內存管理,所以需要我們自己來維護堆外內存,防止內存溢出。
另外,需要注意的是,ByteBuf默認采用了池化技術來創建。它的核心思想是實現對象的復用,從而減少對象頻繁創建銷毀帶來的性能開銷。
池化功能是否開啟,可以通過下面的環境變量來控制,其中unpooled表示不開啟。
-Dio.netty.allocator.type={unpooled | pooled}
public static void main(String[] args) {
ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer(); //可自動擴容
System.out.println(buffer);
}
//PooledUnsafeDirectByteBuf(ridx: 0, widx: 0, cap: 256)
ByteBuf的存儲結構
ByteBuf的存儲結構下圖所示,從這個圖中可以看到ByteBuf其實是一個字節容器,該容器中包含三個部分
- 已經丟棄的字節,這部分數據是無效的
- 可讀字節,這部分數據是ByteBuf的主體數據,從ByteBuf里面讀取的數據都來自這部分;可寫字節,所有寫到ByteBuf的數據都會存儲到這一段
- 可擴容字節,表示ByteBuf最多還能擴容多少容量。
在ByteBuf中,有兩個指針:
- readerIndex:讀指針,每讀取一個字節,readerIndex自增加1。ByteBuf里面總共有witelndex-readerlndex個字節可讀,當readerlndex和writeIndex相等的時候,ByteBuf不可讀
- writelndex:寫指針,每寫入一個字節,writeIndex自增加1,直到增加到capacity后,可以觸發擴容后繼續寫入。
- ByteBuf中還有一個maxCapacity最大容量,默認的值是Integer.MAX_VALUE,當ByteBuf寫入數據時,如果容量不足時,會觸發擴容,直到capacity擴容到maxCapacity。
ByteBuf中常用的方法
對於ByteBuf來說,常見的方法就是寫入和讀取
Write相關方法
對於write方法來說,ByteBuf提供了針對各種不同數據類型的寫入,比如
- writeChar,寫入char類型
- writelnt,寫入int類型
- writeFloat,寫入float類型
- writeBytes,寫入nio的ByteBuffer
- writeCharSequence,寫入字符串
public class ByteBufExample {
public static void main(String[] args) {
ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer(); //可自動擴容
buffer.writeBytes(new byte[]{1,2,3,4});//寫入四個字節
log(buffer);
buffer.writeInt(5); //寫入一個int類型,也是4個字節
log(buffer);
}
private static void log(ByteBuf buf) {
StringBuilder builder = new StringBuilder()
.append(" read index:").append(buf.readerIndex())//獲取讀索引
.append(" write index:").append(buf.writerIndex()) //獲取寫索引
.append(" capacity:").append(buf.capacity())//獲取容量
.append(StringUtil.NEWLINE);
//把ByteBuf中的內容,dump到StringBuilder中
ByteBufUtil.appendPrettyHexDump(builder, buf);
System.out.println(builder.toString());
}
}
擴容
當向ByteBuf寫入數據時,發現容量不足時,會觸發擴容,而具體的擴容規則是
假設ByteBuf初始容量是10。
- 如果寫入后數據大小未超過512個字節,則選擇下一個16的整數倍進行擴容。比如寫入數據后大小為12,則擴容后的capacity是16。
- 如果寫入后數據大小超過512個字節,則選擇下一個\(2^n\)。比如寫入后大小是512字節,則擴容后的capacity是\(2^{10}\)=1024。(因為\(2^9\)=512,長度已經不夠了)
- 擴容不能超過max capacity,否則會報錯。
Reader相關方法
reader方法也同樣針對不同數據類型提供了不同的操作方法,
- readByte,讀取單個字節
- readInt,讀取一個int類型
- readFloat,讀取一個float類型
public class ByteBufExample {
public static void main(String[] args) {
ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer(); //可自動擴容
buffer.writeBytes(new byte[]{1,2,3,4});//寫入四個字節
log(buffer);
System.out.println(buffer.readByte());
log(buffer);
}
private static void log(ByteBuf buf) {
StringBuilder builder = new StringBuilder()
.append(" read index:").append(buf.readerIndex())//獲取讀索引
.append(" write index:").append(buf.writerIndex()) //獲取寫索引
.append(" capacity:").append(buf.capacity())//獲取容量
.append(StringUtil.NEWLINE);
//把ByteBuf中的內容,dump到StringBuilder中
ByteBufUtil.appendPrettyHexDump(builder, buf);
System.out.println(builder.toString());
}
}
從下面結果中可以看到,讀完一個字節后,這個字節就變成了廢棄部分,再次讀取的時候只能讀取未讀取的部分數據。
另外,如果想重復讀取哪些已經讀完的數據,這里提供了兩個方法來實現標記和重置
public static void main(String[] args) {
ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer(); //可自動擴容
buffer.writeBytes(new byte[]{1,2,3,4});//寫入四個字節
log(buffer);
buffer.markReaderIndex();//標記讀取的索引位置
System.out.println("開始進行讀取操作");
System.out.println(buffer.readByte());
log(buffer);
buffer.resetReaderIndex();//重置到標記位
System.out.println("重置到標記位");
log(buffer);
}
另外,如果想不改變讀指針位置來獲得數據,在ByteBuf中提供了get開頭的方法,這個方法基於索引位置讀取,並且允許重復讀取的功能。
ByteBuf的零拷貝機制
需要說明一下,ByteBuf的零拷貝機制和我們之前提到的操作系統層面的零拷貝不同,操作系統層面的零拷貝,是我們要把一個文件發送到遠程服務器時,需要從內核空間拷貝到用戶空間,再從用戶空間拷貝到內核空間的網卡緩沖區發送,導致拷貝次數增加。
而ByteBuf中的零拷貝思想也是相同,都是減少數據復制提升性能。如圖3-2所示,假設有一個原始ByteBuf,我們想對這個ByteBuf其中的兩個部分的數據進行操作。按照正常的思路,我們會創建兩個新的ByteBuf,然后把原始ByteBuf中的部分數據拷貝到兩個新的ByteBuf中,但是這種會涉及到數據拷貝,在並發量較大的情況下,會影響到性能。
ByteBuf中提供了一個slice方法,這個方法可以在不做數據拷貝的情況下對原始ByteBuf進行拆分,使用方法如下
public static void main(String[] args) {
ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer(); //可自動擴容
buffer.writeBytes(new byte[]{1, 2, 3, 4, 5, 6, 7, 8, 9, 10});//寫入四個字節
log(buffer);
ByteBuf b1 = buffer.slice(0, 5);
ByteBuf b2 = buffer.slice(5, 5);
log(b1);
log(b2);
System.out.println("修改原始數據");
buffer.setByte(2, 5); //修改原始buf數據
log(b1); //再次打印b1的結果。發現數據發生了變化
}
在上面的代碼中,通過slice對原始buf進行切片,每個分片是5個字節。
為了證明slice是沒有數據拷貝,我們通過修改原始buf的索引2所在的值,然后再打印第一個分片b1,可以發現b1的結果發生了變化。說明兩個分片和原始buf指向的數據是同一個。
Unpooled
Unpooled工具類,它是同了非池化的ByteBuf的創建、組合、復制等操作。
假設有一個協議數據,它有頭部和消息體組成,這兩個部分分別放在兩個ByteBuf中
ByteBuf header = ...
ByteBuf body = ...
我們希望把header和body合並成一個ByteBuf,通常的做法是
ByteBuf allBuf = Unpooled.buffer(header.readableBytes() + body.readableBytes());
allBuf.writeBytes(header);
allBuf.writeBytes(body);
在這個過程中,我們把header和body拷貝到了新的allBuf中,這個過程在無形中增加了兩次數據拷貝操作。那有沒有更高效的方法減少拷貝次數來達到相同目的呢?
在Netty中,提供了一個CompositeByteBuf
組件,它提供了這個功能。
public static void main(String[] args) {
ByteBuf header = ByteBufAllocator.DEFAULT.buffer(); //可自動擴容
header.writeCharSequence("header", CharsetUtil.UTF_8);
ByteBuf body = ByteBufAllocator.DEFAULT.buffer(); //可自動擴容
body.writeCharSequence("body",CharsetUtil.UTF_8);
CompositeByteBuf compositeByteBuf = Unpooled.compositeBuffer();
//其中第一個參數是true,表示當添加新的ByteBuf時,自動遞增CompositeByteBuf的writeIndex
//默認是false,就是writeIndex=0,這樣的話我們不可能從CompositeByteBuf中讀到數據
compositeByteBuf.addComponents(true,header,body);
log(compositeByteBuf);
ByteBuf allBuf = Unpooled.buffer(header.readableBytes() + body.readableBytes());
allBuf.writeBytes(header);
allBuf.writeBytes(body);
}
之所以CompositeByteBuf能夠實現零拷貝,是因為在組合header和body時,並沒有對這兩個數據進行復制,而是通過CompositeByteBuf構建了一個邏輯整體,里面仍然是兩個真實對象,也就是有一個指針指向了同一個對象,所以這里類似於淺拷貝的實現。
wrappedBuffer
在Unpooled工具類中,提供了一個wrappedBuffer方法,來實現CompositeByteBuf零拷貝功能。使用方法如下。
copiedBuffer
copiedBuffer,和wrappedBuffer最大的區別是,該方法會實現數據復制,下面代碼演示了
copiedBuffer和wrappedbuffer的區別,可以看到在case
標注的位置中,修改了原始ByteBuf的值,並沒有影響到原來的值。
public static void main(String[] args) {
ByteBuf header= ByteBufAllocator.DEFAULT.buffer();
header.writeBytes(new byte[]{1,2,3,4,5});
ByteBuf body=ByteBufAllocator.DEFAULT.buffer();
body.writeBytes(new byte[]{6,7,8,9,10});
ByteBuf total=Unpooled.wrappedBuffer(header,body);
log(total);
header.setByte(2,9);
log(total);
System.out.println("===============================");
ByteBuf byteBuf = Unpooled.copiedBuffer(header, body);
log(byteBuf);
header.setByte(2,8);
log(byteBuf);
}
case:
/**
* read index:0 write index:10 capacity :10 +-------------------------------------------------+
* | 0 1 2 3 4 5 6 7 8 9 a b c d e f |
* +--------+-------------------------------------------------+----------------+
* |00000000| 01 02 03 04 05 06 07 08 09 0a |.......... |
* +--------+-------------------------------------------------+----------------+
* read index:0 write index:10 capacity :10 +-------------------------------------------------+
* | 0 1 2 3 4 5 6 7 8 9 a b c d e f |
* +--------+-------------------------------------------------+----------------+
* |00000000| 01 02 09 04 05 06 07 08 09 0a |.......... |
* +--------+-------------------------------------------------+----------------+
* ===============================
* read index:0 write index:10 capacity :10 +-------------------------------------------------+
* | 0 1 2 3 4 5 6 7 8 9 a b c d e f |
* +--------+-------------------------------------------------+----------------+
* |00000000| 01 02 09 04 05 06 07 08 09 0a |.......... |
* +--------+-------------------------------------------------+----------------+
* read index:0 write index:10 capacity :10 +-------------------------------------------------+
* | 0 1 2 3 4 5 6 7 8 9 a b c d e f |
* +--------+-------------------------------------------------+----------------+
* |00000000| 01 02 09 04 05 06 07 08 09 0a |.......... |
* +--------+-------------------------------------------------+----------------+
*/
內存釋放
針對不同的ByteBuf創建,內存釋放的方法不同。
-
UnpooledHeapByteBuf,使用JVM內存,只需要等待GC回收即可
-
UnpooledDirectByteBuf,使用堆外內存,需要特殊方法來回收內存
-
PooledByteBuf和它的子類使用了池化機制,需要更復雜的規則來回收
內存如果ByteBuf是使用堆外內存來創建,那么盡量手動釋放內存,那怎么釋放呢?
Netty采用了引用計數方法來控制內存回收,每個ByteBuf都實現了ReferenceCounted接口。
- 每個ByteBuf對象的初始計數為1
- 調用release方法時,計數器減一,如果計數器為0,ByteBuf被回收
- 調用retain方法時,計數器加一,表示調用者沒用完之前,其他handler即時調用了release也不會造成回收。
- 當計數器為0時,底層內存會被回收,這時即使ByteBuf對象還存在,但是它的各個方法都無法正常使用