1. 概念
Java NIO API自帶的緩沖區類功能相當有限,沒有經過優化,使用JDK的ByteBuffer操作更復雜。故而Netty的作者Trustin Lee為了實現高效率的網絡傳輸,重新造輪子,Netty中的ByteBuf實際上就相當於JDK中的ByteBuffer,其作用是在Netty中通過Channel傳輸數據。
2. 優勢
- 可以自定義緩沖類型;
- 通過內置的復合緩沖類型,實現透明的零拷貝(zero-copy);
- 不需要調用flip()來切換讀/寫模式;
- 讀取和寫入索引分開;
- 方法鏈;
- 引用計數;
- Pooling(池)。
3. 實現機制
ByteBuf實際上是在一個抽象的字節數組byte[]
上進行讀/寫操作的集合。它提供了兩個指針變量用來支持讀寫操作:readerIndex
和writerIndex
。下圖展現了如何將一個buffer利用兩個指針來划分為三個區域。
由此可見,ByteBuf真正可讀取的內容長度是writerIndex - readerIndex
。
圍繞着讀和寫操作,接下來分析ByteBuf的實現邏輯。
3.1 讀操作
讀操作主要提供以下功能:
- readByte:取1字節的內容;
- readBoolean:取1字節的內容,返回
readByte() != 0
; - readUnsignedByte:取1字節的內容,返回(
(short) (readByte() & 0xFF)
);(能把負數轉換為無符號嗎?) - readShort:取2字節的內容,返回轉換后的
short
類型; - readUnsignedShort:取2字節的內容,返回
readShort() & 0xFFFF
; - readMedium:取3字節的內容,返回轉換后的
int
類型; - readUnsignedMedium:取3字節的內容,返回轉換后的
int
類型; - readInt:取4字節的內容;
- readUnsignedInt:取4字節的內容,返回
readInt() & 0xFFFFFFFFL
; - readLong:取8字節的內容;
- readChar:取1字節的內容;
- readFloat:取4字節的int內容,轉換為
float
類型; - readDouble:取8字節的long內容,轉換為
double
類型; - readBytes:取指定長度的內容,返回
ByteBuf
類型; - readSlice:取指定長度的內容,返回
ByteBuf
類型; - readBytes:取指定長度的內容到目標容器。
3.2 寫操作
寫操作提供的功能主要是往ByteBuf中寫入byte內容,不再一一贅述。主要區別在於寫入前根據類型轉換為相對應長度的byte數組。
主要函數是:writeBoolean、writeByte、writeShort、writeMedium、writeInt、writeLong、writeChar、writeFloat、writeDouble、writeBytes、writeZero。
3.3 邊界值安全
不論讀或寫,肯定會存在ByteBuf數據為空或滿的情形,作為數據容器,要存在邊界值檢查,確保讀寫安全。
可讀檢查
首先調用ensureAccessible()
方法來檢查ByteBuf對象是否被引用,如果其引用計數器為0,代表該對象將要被釋放,已失效,那么拋出IllegalReferenceCountException
異常,確保不能夠執行接下來的操作;
然后進行讀取參數合法性檢查,如不合法,則拋出響應的異常。
實現代碼框架如下:
protected final void checkReadableBytes(int minimumReadableBytes) {
ensureAccessible();
if (minimumReadableBytes < 0) {//參數為負,拋出參數非法異常
throw new IllegalArgumentException();
}
if (readerIndex > writerIndex - minimumReadableBytes) {
//容器中可讀字節不夠,拋出越界異常
throw new IndexOutOfBoundsException();
}
}
可寫檢查
進行待寫入長度的合法性檢查,如不合法,這拋出相應的異常。如果待寫入小於可寫的長度,則正常返回然;否則,進行容量的擴展,並確保容量是2的指數冪。
實現代碼框架如下:
public ByteBuf ensureWritable(int minWritableBytes) {
if (minWritableBytes < 0) {
throw new IllegalArgumentException();
}
if (minWritableBytes <= writableBytes()) {
return this;
}
if (minWritableBytes > maxCapacity - writerIndex) {
throw new IndexOutOfBoundsException();
}
// 擴展現在的容量大小直到2的指數冪大小
int newCapacity = calculateNewCapacity(writerIndex + minWritableBytes);
// 調整容器到新的容量大小
capacity(newCapacity);
return this;
}
在函數calculateNewCapacity()中,主要判斷參數minNewCapacity與閾值threshold(4M)關系:
- 相等,則返回threshold;
- 大於,則按閾值(4MB)遞增,返回增加后新容量newCapacity;
- 小於,newCapacity的初始大小設置為64字節,每次翻倍,直到newCapacity大於minNewCapacity為止,返回
Math.min(newCapacity, maxCapacity)
。
注:maxCapacity
是使用者自己設定的ByteBuf容量上限。
4. 繼承層次
ByteBuf的類結構圖如下所示:
4.1 AbstractByteBuf
實現ByteBuf的一個骨架,提供容器的上層操作實現,具體的讀寫內容需要依賴於具體的ByteBuf實現類。
AbstractByteBuf有兩個實現類:AbstractDerivedByteBuf
和AbstractReferenceCountedByteBuf
。
4.1.1 AbstractDerivedByteBuf
ByteBuf的抽象基類,實現了包裝另一個ByteBuf功能。在AbstractByteBuf的基礎上提供了一下功能:
- refCnt:獲得該對象的引用計數;
- retain:增加該對象的引用計數(無參數:+1;有參數:+指定的increment);
- release:減少該對象的引用計數(無參數:-1;有參數:-指定的increment),當引用計數減少到0時,釋放該對象。返回值為true,當且僅當引用計數變為0和該對象已釋放。
- internalNioBuffer:內部實現就是簡單的調用
nioBuffer(index, length)
; - nioBuffer:得到內部buffer的一個區域包裝,即得到buffer的子區域作為NIO ByteBuffer,返回的ByteBuffer內容不會再受到原buffer索引或內容改變的影響。
AbstractDerivedByteBuf的派生類如下所示:
下面逐個分析下派生類的具體功能實現:
DuplicatedByteBuf
派生類buffer,簡單的把所有的數據訪問請求發送給內部的buffer。推薦使用ByteBuf.duplicate()
來創建該對象,而不是直接調用本身的構造函數。
對象與內部的buffer共享該buffer整個區域的緩沖數據,只不過它們餓單獨保持自己的索引標記。
ReadOnlyByteBuf
派生類buffer,將原有的ByteBuf包裝為制度的ByteBuf,所有的寫請求都將被禁止。推薦使用Unpooled.unmodifiableBuffer(ByteBuf)
來創建該對象,而不是直接調用本身的構造函數。
對象與內部的buffer使用相同的索引標記,即共享readerIndex
和writerIndex
。
SlicedByteBuf
派生類buffer,僅暴露內部buffer的一個子區域,即切片。推薦使用ByteBuf.slice()
或ByteBuf.slice(int, int)
來創建該對象,而不是直接調用本身的構造函數。
4.1.2 AbstractReferenceCountedByteBuf
ByteBuf的抽象基類,實現了引用計數功能。
提供一個volatile
類型的整型變量refCnt
來記錄引用次數。
主要的功能函數是refCnt、retain、release,用來更新引用次數refCnt
。
AbstractReferenceCountedByteBuf的派生類如下所示:
下面逐個分析下派生類的具體功能實現:
CompositeByteBuf
它是一個虛擬的buffer,將多個buffer合並為一個buffer,默認最大可以合並的buffer個數為DEFAULT_MAX_COMPONENTS
= 16。推薦使用ByteBufAllocator.compositeBuffer()
或Unpooled.wrappedBuffer(ByteBuf...)
,而不是直接調用本身的構造函數。
(合並操作還需仔細閱讀源碼,待完成)
FixedCompositeByteBuf
功能和CompositeByteBuf
相似,只是以只讀的方式合並一個ByteBuf數組。
在功能實現中,對於所有的可能更改buffers數組的set操作,均拋出ReadOnlyBufferException
異常。
PooledByteBuf
Pooled的基類, 提供Pool的基本實現。
PooledByteBuf有三個衍生類:
- PooledDirectByteBuf:提供池化的直接內存支持,基於 NIO ByteBuffer;
- PooledHeapByteBuf:提供池化的堆內存支持, 基於byte[];
- PooledUnsafeDirectByteBuf:提供池化的直接內存支持, 基於 NIO ByteBuffer。讀寫數據依賴於PlatformDependent,為了得到最佳性能,通過final類
Unsafe
來進行數據的讀寫。
ReadOnlyByteBufferBuf
包裝一個只讀的ByteBuffer,所有的set方法均拋出ReadOnlyBufferException
異常。
它具有一個衍生類ReadOnlyUnsafeDirectByteBuf
:
在ReadOnlyByteBufferBuf
的基礎上提供對direct ByteBuffer的支持,為了得到最佳性能,通過final類Unsafe
來進行數據的讀寫。
UnpooledDirectByteBuf
提供非池化的直接內存支持, 基於 NIO ByteBuffer。推薦使用Unpooled.directBuffer(int)
和Unpooled.wrappedBuffer(ByteBuffer)
來替代本身的構造函數來生成對象。
它具有一個衍生類ThreadLocalPooledByteBuf
。
UnpooledHeapByteBuf
提供非池化的堆內存支持, 基於byte[]。采用大端序來存儲數據。
UnpooledUnsafeDirectByteBuf
提供非池化的直接內存支持,基於 NIO ByteBuffer。讀寫數據依賴於PlatformDependent,為了得到最佳性能,通過final類Unsafe
來進行數據的讀寫。推薦使用Unpooled.directBuffer(int)
和Unpooled.wrappedBuffer(ByteBuffer)
來替代本身的構造函數來生成對象。
4.2 EmptyByteBuf
ByteBuf的直接實現類,構建一個容量和最大容量均為0的空ByteBuf。
它不能容納數據,對於所有的get、set、read和write操作均拋出IndexOutOfBoundsException
異常。
4.3 ReplayingDecoderBuffer
ByteBuf的直接實現類,buffer的數據讀取采用ReplayingDecoder
機制,它的原理是阻塞IO,當沒有讀取到足夠的數據時,會拋出REPLY異常,然后進入循環 while (in.isReadable())
不斷檢查是否有足夠的數據,直到讀取到足夠的數據放入到ReplayingDecoderBuffer對象中。
4.4 SwappedByteBuf
轉換字節序的ByteBuf包裝類,主要功能是交換內部ByteBuf的字節序列,即大端序BIG_ENDIAN
和小端序LITTLE_ENDIAN
之間的轉換。
4.5 WrappedByteBuf
ByteBuf的包裝類,將所有方法調用委派給被包裝的ByteBuf對象。
其派生類如下圖所示:
4.5.1 AdvancedLeakAwareByteBuf
為了方便監控ByteBuf的泄露,AdvancedLeakAwareByteBuf的所有方法都添加了leak.record()
,用來記錄調用者當前棧的蹤跡,從而ResourceLeakDetector
就可以發現最后被訪問的泄漏資源是哪個。
4.5.2 SimpleLeakAwareByteBuf
實現方式和AdvancedLeakAwareByteBuf
非常相似,但只對order
方法添加了資源泄露的檢查動作leak.record()
。
4.5.3 UnreleasableByteBuf
包裝其它的buffer到一個ByteBuf,用於阻止用戶增加或減少這個包裝buffer的引用計數,防止他人對ByteBuf的銷毀動作。
一般的使用場景就是定義特殊的常量ByteBuf,然后包裝成unreleasableBuffer()后就不怕被其他人錯誤的銷毀掉:
public abstract class HttpObjectEncoder<H extends HttpMessage> extends MessageToMessageEncoder<Object> {
private static final ByteBuf CRLF_BUF = unreleasableBuffer(directBuffer(CRLF.length).writeBytes(CRLF));
}
(END)