本文來分享Netty中的零拷貝機制以及內存緩沖區ByteBuf的實現。
源碼分析基於Netty 4.1.52
Netty中的零拷貝
Netty中零拷貝機制主要有以下幾種
1.文件傳輸類DefaultFileRegion#transferTo,調用FileChannel#transferTo,直接將文件緩沖區的數據發送到目標Channel,減少用戶緩沖區的拷貝(通過linux的sendfile函數)。
使用read 和 write過程如下
使用sendfile
可以看到,使用sendfile函數可以減少數據拷貝以及用戶態,內核態的切換
可參考: 操作系統和Web服務器那點事兒
2.Netty中提供了一些操作內存緩沖區的方法,如
Unpooled#wrappedBuffer方法,將byte數據,(jvm)ByteBuffer轉換為ByteBuf
CompositeByteBuf#addComponents方法,合並ByteBuf
ByteBuf#slice方法,提取ByteBuf中部分數據片段
ByteBuf#duplicate,復制一個內存緩沖區
這些方法都是基於對象引用的操作,並沒有內存拷貝,而是內存共享
3.使用堆外內存(jvm)ByteBuffer對Socket讀寫
如果使用JVM的堆內存讀取Socket數據,JVM會將Socket數據讀取到直接內存,再拷貝一份到堆內存中,寫入數據到Socket也需要將堆內存拷貝一份到直接內存中,然后才寫入Socket中。
因為操作系統進行io操作需要一個穩定的連續空間的字節空間, 但是java堆上的字節空間會隨着gc進行而進行移動, 如果操作系統讀取堆上的空間, 就會出錯。
使用堆外內存可以避免該拷貝操作。
注意,這里從內核緩沖區拷貝到用戶緩沖區的操作並不能省略,畢竟我們需要對數據進行操作,所以還是要拷貝到用戶態的。
可參考:
知乎--Java NIO中,關於DirectBuffer,HeapBuffer的疑問
知乎--Java NIO direct buffer的優勢在哪兒?
ByteBuf
ByteBuf是用於與Channel交互的內存緩沖區,提供順序訪問和隨機訪問。
Netty4中將ByteBuf調整為抽象類,從而提升吞吐量。
1.ByteBuffer
先了解一下ByteBuffer,ByteBuffer是JVM提供的字節內存緩沖區。ByteBuf是在ByteBuffer上進行的擴展,底層還是使用ByteBuffer。
ByteBuffer有兩個子類,DirectByteBuffer和HeapByteBuffer。
HeapByteBuffer使用ByteBuffer#hb(byte[])存儲數據。
DirectByteBuffer是堆外內存,使用的是操作系統的直接內存,它維護了一個引用address指向了底層數據,從而操作數據。(並沒有使用ByteBuffer#buff)
Buffer核心屬性
int position; //當前操作位置。
int mark; //為某一讀過的位置做標記,便於某些時候回退到該位置。
int capacity; //初始化時候的容量。
int limit; // 讀寫的限制位置,讀寫超出該位置會報錯
讀寫操作都是基於position,並以limit為限制的。mark,position,limit,capacity關系如下
0 <= mark <= position <= limit <= capacity
ByteBuffer提供了如下方法調整這些標志位置:
- clear
limit = position = 0
一般在把數據寫入Buffer前調用 - flip
limit = position
position = 0
一般在從Buffer讀出數據前調用 - rewind
position=0
limit不變
一般在把數據重寫入Buffer前調用。 - compacting
清除已經讀過的數據。任何未讀的數據都被移到緩沖區的起始處,新寫入的數據將放到緩沖區未讀數據的后面
ByteBuffer還提供了一些操作緩沖區的方法
- duplicate
創建新字節緩沖區,共享當前緩沖區內容 - slice
創建新字節緩沖區,共享當前緩沖區內容子序列。
Netty的ByteBuf使用readerIndex標志讀位置,writerIndex標志寫位置,比(jvm)ByteBuffer設計更優雅。
+-------------------+------------------+------------------+
| discardable bytes | readable bytes | writable bytes |
| | (CONTENT) | |
+-------------------+------------------+------------------+
| | | |
0 <= readerIndex <= writerIndex <= capacity
ByteBuf提供readerIndex/writerIndex等方法獲取或設置這兩個值,非常直觀。另外,ByteBuf提供了如下方法操作緩沖區
-
discardReadBytes
清除已經讀過的數據。未讀的數據都被移到緩沖區的起始處,新寫入的數據將放到緩沖區未讀數據的后面 -
duplicate
創建新字節緩沖區,共享當前緩沖區內容 -
slice(int index, int length)
創建共享內存的ByteBuf,從index開始,長度為length -
readSlice(int length)
創建共享內存的ByteBuf,從readerIndex開始,長度為length -
retainedDuplicate()
創建共享內存的ByteBuf,並且當前ByteBuf的引用計數加1
2.接口關系
AbstractByteBuf:實現一些公共邏輯,如讀寫前檢查位置。
AbstractReferenceCountedByteBuf,添加引用計數邏輯,實現引用計數回收直接內存。
PooledByteBuf:實現池化ByteBuf的公共邏輯。關於Netty中的內存池后面有文章解析。
PooledByteBuf#memory是底層的內存存儲,PooledDirectByteBuf該字段是ByteBuffer,PooledHeapByteBuf則是byte[]。
下面可以分為Unsafe,No_Unsafe兩個維度。Unsafe就是sun.misc.Unsafe。
使用Unsafe可以提高性能,但Unsafe是JDK內部的類,並非公開標准,不一定所有JDK都存在這個類, JDK以后也有可能去掉這個類,所以Netty提供了兩套實現。
3.內存分配
后面有文章解析Netty內存池,分享Netty中如何分配內存給ByteBuf。這里先不深入。
4.讀寫過程
下面看一下ByteBuf與Channel如何交互數據。
前面分享Netty讀寫過程的文章說過了,NioByteUnsafe#read方法讀取數據。
NioByteUnsafe#read -> NioSocketChannel#doReadBytes -> AbstractByteBuf#writeBytes -> PooledByteBuf#setBytes
public final int setBytes(int index, ScatteringByteChannel in, int length) throws IOException {
try {
return in.read(internalNioBuffer(index, length));
} catch (ClosedChannelException ignored) {
return -1;
}
}
index參數就是writerIndex,internalNioBuffer方法會構造一個新的ByteBuffer,並設置ByteBuffer#position為index
直接調用ReadableByteChannel#read讀取數據
在《ChannelOutboundBuffer與flush操作》中已經分享過,
ChannelOutboundBuffer#nioBuffers也是通過internalNioBuffer方法生成ByteBuffer,
作為參數調用NioSocketChannel#doWrite方法,直接將數據拷貝到Channel。
ByteBuf#internalNioBuffer -> PooledByteBuf#_internalNioBuffer
final ByteBuffer _internalNioBuffer(int index, int length, boolean duplicate) {
index = idx(index);
ByteBuffer buffer = duplicate ? newInternalNioBuffer(memory) : internalNioBuffer();
buffer.limit(index + length).position(index);
return buffer;
}
newInternalNioBuffer由子類實現,構建對應的DirectByteBuffer或者HeapByteBuffer,注意,這里的內存是共享的。
5.引用計數
由於使用了直接內存,不能依賴JVM垃圾回收器釋放內存,Netty使用引用計數算法釋放內存。
ReferenceCounted接口,代表需要顯式釋放的引用計數對象,retain方法增加引用計數,release方法減少引用計數。
AbstractReferenceCountedByteBuf實現了ReferenceCounted接口,它維護了refCnt變量作為引用計數。
構造一個AbstractReferenceCountedByteBuf時,refCnt為1。
當引用計數release到0時,調用deallocate()方法釋放內存。
PooledByteBuf#deallocate
protected final void deallocate() {
if (handle >= 0) {
final long handle = this.handle;
this.handle = -1;
memory = null;
tmpNioBuf = null;
chunk.arena.free(chunk, handle, maxLength, cache);
chunk = null;
recycle();
}
}
這里調用的是PoolArena#free。
PoolArena可以理解為一個內存池,這里free實際是將內存放回內存池中,由內存池決定是否需要銷毀底層直接內存。
PoolArena后面有對應文章解析。
6.內存銷毀
銷毀DirectByteBuf,有兩個方式
利用反射獲取Unsafe,調用Unsafe#freeMemory
利用反射獲取DirectByteBuffer#cleaner(sun.misc.Cleaner),通過反射調用cleaner#clean方法
因為Netty不確認JDK中是否存在sun.misc.Cleaner,所以它也實現了兩套機制。
PoolArenaDirect#free -> Arena#destroyChunk
protected void destroyChunk(PoolChunk<ByteBuffer> chunk) {
if (PlatformDependent.useDirectBufferNoCleaner()) {
PlatformDependent.freeDirectNoCleaner(chunk.memory);
} else {
PlatformDependent.freeDirectBuffer(chunk.memory);
}
}
從PlatformDependent中確認是否使用CLEANER
if (maxDirectMemory == 0 || !hasUnsafe() || !PlatformDependent0.hasDirectBufferNoCleanerConstructor()) {
USE_DIRECT_BUFFER_NO_CLEANER = false;
DIRECT_MEMORY_COUNTER = null;
}
滿足以下條件中一個就使用CLEANER,否則使用NO_CLEANER
- 沒有使用直接內存
- JVM不支持Unsafe
- ByteBuffer不存在無Cleaner的構造函數
如果您覺得本文不錯,歡迎關注我的微信公眾號。您的關注是我堅持的動力!