ByteBuf
ByteBuf是什么
為了平衡數據傳輸時CPU與各種IO設備速度的差異性,計算機設計者引入了緩沖區這一重要抽象。jdkNIO庫提供了java.nio.Buffer接口,並且提供了7種默認實現,常見的實現類為ByteBuffer。不過netty並沒有直接使用nio的ByteBuffer,這主要是由於jdk的Buffer有以下幾個缺點:
- 當調用allocate方法分配內存時,Buffer的長度就固定了,不能動態擴展和收縮,當寫入數據大於緩沖區的capacity時會發生數組越界錯誤
- Buffer只有一個位置標志位屬性position,讀寫切換時,必須先調用flip或rewind方法。不僅如此,因為flip的切換
- Buffer只提供了存取、翻轉、釋放、標志、比較、批量移動等緩沖區的基本操作,想使用高級的功能(比如池化),就得自己手動進行封裝及維護,使用非常不方便。
也因此,netty實現了自己的緩沖區——ByteBuf,連名字都如此相似。那么ByteBuf是如何規避ByteBuffer的缺點的?
第一點顯然是很好解決的,由於ByteBuf底層也是數組,那么它就可以像ArrayList一樣,在寫入操作時進行容量檢查,當容量不足時進行擴容。
第二點,ByteBuf通過2個索引readerIndex,writerIndex將數組分為3部分,如下圖所示
+-------------------+------------------+------------------+
| discardable bytes | readable bytes | writable bytes |
| | (CONTENT) | |
+-------------------+------------------+------------------+
| | | |
0 <= readerIndex <= writerIndex <= capacity
初始化時,readerIndex和writerIndex都是0,隨着數據的寫入writerIndex會增加,此時readable byte部分增加,writable bytes減少。當讀取時,discardable bytes增加,readable bytes減少。由於讀操作只修改readerIndex,寫操作只修改writerIndex,讓ByteBuf的使用更加容易理解,避免了由於遺漏flip導致的功能異常。
此外,當調用discardReadBytes方法時,可以把discardable bytes這部分的內存釋放。總體想法是通過將readerIndex移動到0,writerIndex移動到writerIndex-readerIndex下標,具體移動下標的方式依據ByteBuf實現類有所不同。這個方法可以顯著提高緩沖區的空間復用率,避免無限度的擴容,但會發生字節數組的內存復制,屬於以時間換空間的做法。
ByteBuf重要API
read、write、set、skipBytes
前3個系列的方法及最后一個skipBytes都屬於改變指針的方法。舉例來說,readByte會移動readerIndex1個下標位,而int是4個byte的大小,所以readInt會移動readerIndex4個下標位,相應的,writeByte會移動writerIndex1個下標位,writeInt會移動writerIndex4個下標位。set系列方法比較特殊,它的參數為index和value,意即將value寫入指定的index位置,但這個操作不會改變readerIndex和writerIndex。skipBytes比較簡單粗暴,直接將readerIndex移動指定長度。
mark和reset
markReaderIndex和markWriterIndex可以將對應的指針做一個標記,當需要重新操作這部分數據時,再使用resetReaderIndex或resetWriterIndex,將對應指針復位到mark的位置。
duplicate、slice、copy
這3種方法都可以復制一份字節數組,不同之處在於duplicate和slice兩個方法返回的新ByteBuf和原有的老ByteBuf之間的內容會互相影響,而copy則不會。duplicate和slice的區別在於前者復制整個ByteBuf的字節數組,而后者默認僅復制可讀部分,但可以通過slice(index, length)分割指定的區間。
retain、release
這是ByteBuf接口繼承自ReferenceCounted接口的方法,用於引用計數,以便在不使用對象時及時釋放。實現思路是當需要使用一個對象時,計數加1;不再使用時,計數減1。考慮到多線程場景,一般也多采用AtomicInteger實現。netty卻另辟蹊徑,選擇了volatile + AtomicIntegerFieldUpdater這樣一種更節省內存的方式。
ByteBuf擴容
在ByteBuf寫入數據時會檢查可寫入的容量,若容量不足會進行擴容。
final void ensureWritable0(int minWritableBytes) {
if (minWritableBytes <= writableBytes()) {
return;
}
int minNewCapacity = writerIndex + minWritableBytes;
int newCapacity = alloc().calculateNewCapacity(minNewCapacity, maxCapacity);
int fastCapacity = writerIndex + maxFastWritableBytes();
if (newCapacity > fastCapacity && minNewCapacity <= fastCapacity) {
newCapacity = fastCapacity;
}
capacity(newCapacity);
}
忽略一些檢驗性質的代碼后,可以看到擴容時先嘗試將現有寫索引加上需要寫入的容量大小作為最小新容量,並調用ByteBufAllocate的calculateNewCapacity方法進行計算。跟入這個方法:
public int calculateNewCapacity(int minNewCapacity, int maxCapacity) {
final int threshold = CALCULATE_THRESHOLD; // 4 MiB page
if (minNewCapacity == threshold) {
return threshold;
}
if (minNewCapacity > threshold) {
int newCapacity = minNewCapacity / threshold * threshold;
if (newCapacity > maxCapacity - threshold) {
newCapacity = maxCapacity;
} else {
newCapacity += threshold;
}
return newCapacity;
}
int newCapacity = 64;
while (newCapacity < minNewCapacity) {
newCapacity <<= 1;
}
return Math.min(newCapacity, maxCapacity);
}
可以看到這個方法的目的則是計算比可寫容量稍大的2的冪次方。minNewCapacity由上一個方法傳入,而maxCapacity則為Integer.MAX_VALUE。具體步驟是首先判斷新容量minNewCapacity是否超過了計算限制CALCULATE_THRESHOLD,默認為4M,如果沒有超過4MB,那么從64B開始不斷以2的冪次方形式擴容,直到newCapacity超過minNewCapacity。而若一開始新容量就超過了4M,則調整新容量到4M的倍數+1。比如newCapacity為6M,因為6/4 = 1,所以調整為(1+1)*4M=8M。
在計算完容量之后會調用capacity方法。這是一個抽象方法,這里以UnpooledHeapByteBuf為例。
public ByteBuf capacity(int newCapacity) {
checkNewCapacity(newCapacity);
byte[] oldArray = array;
int oldCapacity = oldArray.length;
if (newCapacity == oldCapacity) {
return this;
}
int bytesToCopy;
if (newCapacity > oldCapacity) {
bytesToCopy = oldCapacity;
} else {
trimIndicesToCapacity(newCapacity);
bytesToCopy = newCapacity;
}
byte[] newArray = allocateArray(newCapacity);
System.arraycopy(oldArray, 0, newArray, 0, bytesToCopy);
setArray(newArray);
freeArray(oldArray);
return this;
}
首先檢查newCapacity是否大於0且小於最大容量。之后准備好老數組要復制的長度。trimIndicesToCapacity(newCapacity)是縮容時調用的,它將readerIndex和newCapacity的較小值設置為新的readerIndex,將newCapacity設置為新的writerIndex。
之后便分配一個新數組,並開始復制舊數組的元素。復制成功后,將新數組保存為成員變量,將老數組釋放掉。
ByteBuf種類
出於性能和空間的多方考慮,netty從3個維度定義了各種不同的ByteBuf實現類,主要是池化、堆內堆外、可否使用Unsafe類這3個維度,從而演化出8種不同的ByteBuf,它們分別是PooledUnsafeHeapBytebuf、PooledHeapByteBuf、PooledUnsafeDirectByteBuf、PooledDirectBytebuf、UnpooledUnsafeHeapByteBuf、UnpooledHeapByteBuf、UnpooledUnsafeDirectByteBuf、UnpooledDirectByteBuf。
ByteBuf接口之下有一個抽象類AbstractByteBuf,實現了接口定義的read、write、set相關的方法,但在實現時只做了檢查,而具體邏輯則定義一系列以_開頭的proteced方法,留待子類實現。
ByteBufAllocate
不同於一般形式的創建對象,ByteBuf需要通過內存分配器ByteBufAllocate分配,對應於不同的ByteBuf也會有不同的BtteBufferAllocate。netty將之抽象為ByteBufAllocate接口。我們看一下有哪些方法:
- buffer()、buffer(initialCapacity)、buffer(initialCapacity、maxCapacity),分配ByteBuf的方法,具體分配的Buffer是堆內還是堆外則由實現類決定。2個重載方法分別以給定初始容量、最大容量的方式分配內存
- ioBuffer()、ioBuffer(initialCapacity)、ioBuffer(initialCapacity、maxCapacity)更傾向於分配堆外內存的方法,因為堆外內存更適合用於IO操作。重載方法同上
- heapBuffer()、heapBuffer(initialCapacity)、heapBuffer(initialCapacity、maxCapacity)分配堆內內存的方法。
- directBuffer()、directBuffer(initialCapacity)、directBuffer(initialCapacity、maxCapacity)分配堆外內存的方法。
- compositeBuffer()。可以將多個ByteBuf合並為一個ByteBuf,多個ByteBuf可以部分是堆內內存,部分是堆外內存。
ByteBufAllocate接口定義了heap和direct這一個維度,其他維度則交由子類來定義。
UnPooledByteBufAllocate
ByteBufAllocate有一個直接實現類AbstractByteBufAllocate,它實現了大部分方法,只留下2個抽象方法newHeapBuffer和newDirectBuffer交由子類實現。AbstractByteBufAllocate有2個子類PooledByteBufAllocate和UnpooledByteBufAllocate,在這里定義了pooled池化維度的分配方式。
看看UnpooledByteBufAllocate如何實現2個抽象方法:
newHeapBuffer
protected ByteBuf newHeapBuffer(int initialCapacity, int maxCapacity) {
return PlatformDependent.hasUnsafe() ?
new UnpooledUnsafeHeapByteBuf(this, initialCapacity, maxCapacity) :
new UnpooledHeapByteBuf(this, initialCapacity, maxCapacity);
}
可以看到實現類根據PlatformDependent.hasUnsafe()方法自動判定是否使用unsafe維度,這個方法通過在靜態代碼塊中嘗試初始化sun.misc.Unsafe來判斷Unsafe類是否在當前平台可用,在juc中,這個類使用頗多,作為與高並發打交道的netty,出現這個類不令人意外。UnpooledUnsafeHeapByteBuf與UnpooledHeapByteBuf並不是平級關系,事實上前者繼承了后者,在構造方法上也直接調用UnpooledHeapByteBuf的構造方法。構造方法比較簡單,初始化byte數組、初始容量、最大容量,將讀寫指針的設置為0,並將子類傳入的this指針保存到alloc變量中。
兩種Bytebuf的區別在於unsafe會嘗試通過反射的方式創建byte數組,並將數組的地址保存起來,之后再獲取數據時也會調用Unsafe的getByte方法,通過數組在內存中的地址+偏移量的形式直接獲取,而普通的SafeByteBuf則是保存byte數組,通過數組索引即array[index]訪問。
// UnsafeHeapByteBuf初始化數組
protected byte[] allocateArray(int initialCapacity) {
return PlatformDependent.allocateUninitializedArray(initialCapacity);
}
// HeapByteBuf初始化數組
protected byte[] allocateArray(int initialCapacity) {
return new byte[initialCapacity];
}
// UnsafeHeapByteBuf通過UnsafeByteBufUtil獲取字節
static byte getByte(byte[] data, int index) {
return UNSAFE.getByte(data, BYTE_ARRAY_BASE_OFFSET + index);
}
// HeapByteBuf獲取字節
static byte getByte(byte[] memory, int index) {
return memory[index];
}
newDirectBuffer
protected ByteBuf newDirectBuffer(int initialCapacity, int maxCapacity) {
return PlatformDependent.hasUnsafe() ?
new UnpooledUnsafeDirectByteBuf(this, initialCapacity, maxCapacity) :
new UnpooledDirectByteBuf(this, initialCapacity, maxCapacity);
}
DirectByteBuf構造方法大致與heap的類似,只是保存數據的容器由字節數組變為了jdk的ByteBuffer。相應的,分配與釋放內存的方法也變成調用jdk的ByteBuffer方法。而UnsafeByteBuf更是直接用long類型記錄內存地址。
// DirectByteBuf獲取字節
protected byte _getByte(int index) {
return buffer.get(index);
}
// UnsafeDirectByteBuf獲取字節
protected byte _getByte(int index) {
return UnsafeByteBufUtil.getByte(addr(index));
}
// 獲取內存地址
final long addr(int index) {
return memoryAddress + index;
}
// UnsafeByteBufUtil獲取字節
static byte getByte(long address) {
return UNSAFE.getByte(address);
}
由於PooledByteBufAllocate內容較為龐大,放入下一節講述。
未完待續···