PooledByteBufAllocator buffer allocation
The entry point for buffer allocation:
io.netty.buffer.PooledByteBufAllocator.newDirectBuffer(int, int)
The allocation call stack in a real Netty application:
| CLASS_NAME | METHOD_NAME | LINE_NUM |
| --- | --- | --- |
| io/netty/buffer/PooledByteBufAllocator | newDirectBuffer | 339 |
| io/netty/buffer/AbstractByteBufAllocator | directBuffer | 185 |
| io/netty/buffer/AbstractByteBufAllocator | directBuffer | 176 |
| io/netty/buffer/AbstractByteBufAllocator | ioBuffer | 139 |
| io/netty/channel/DefaultMaxMessagesRecvByteBufAllocator$MaxMessageHandle | allocate | 114 |
| io/netty/channel/nio/AbstractNioByteChannel$NioByteUnsafe | read | 186 |
| io/netty/channel/nio/NioEventLoop | processSelectedKey | 682 |
| io/netty/channel/nio/NioEventLoop | processSelectedKeysOptimized | 628 |
| io/netty/channel/nio/NioEventLoop | processSelectedKeys | 533 |
| io/netty/channel/nio/NioEventLoop | run | 511 |
| io/netty/util/concurrent/SingleThreadEventExecutor$5 | run | 956 |
Test case code:
package io.netty.buffer;

import org.junit.Assert;

public class PooledByteBufTest {

    public static void main(String[] args) {
        final PooledByteBufAllocator allocator = new PooledByteBufAllocator(
                false, // preferDirect
                0,     // nHeapArena
                1,     // nDirectArena
                8192,  // pageSize
                11,    // maxOrder
                3,     // tinyCacheSize
                3,     // smallCacheSize
                3,     // normalCacheSize
                true   // useCacheForAllThreads
        );

        // create tiny buffer
        final ByteBuf b1 = allocator.directBuffer(24);
        // create small buffer
        final ByteBuf b2 = allocator.directBuffer(800);
        // create normal buffer
        final ByteBuf b3 = allocator.directBuffer(8192 * 2);

        Assert.assertNotNull(b1);
        Assert.assertNotNull(b2);
        Assert.assertNotNull(b3);

        // release the buffers so the memory is deallocated; the allocations
        // counter must then equal the deallocations counter
        Assert.assertTrue(b1.release());
        Assert.assertTrue(b2.release());
        Assert.assertTrue(b3.release());
    }
}
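For context: the three buffers above land in netty's tiny, small, and normal size classes. A minimal sketch of how a normalized request size maps to a class, using the thresholds from Netty 4.1's PoolArena (sizeClassOf is a hypothetical helper name, not Netty code):

static String sizeClassOf(int normCapacity, int pageSize, int chunkSize) {
    if (normCapacity < 512) {
        return "tiny";    // e.g. the 24-byte request (normalized up to 32)
    }
    if (normCapacity < pageSize) {
        return "small";   // e.g. the 800-byte request (normalized up to 1024)
    }
    if (normCapacity <= chunkSize) {
        return "normal";  // e.g. the 16 KiB request
    }
    return "huge";        // allocated unpooled, outside the arenas
}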
PoolChunk
PoolChunk's data structure and design are described in its class comment:
/**
* Description of algorithm for PageRun/PoolSubpage allocation from PoolChunk
*
* Notation: The following terms are important to understand the code
* > page - a page is the smallest unit of memory chunk that can be allocated
* > chunk - a chunk is a collection of pages
* > in this code chunkSize = 2^{maxOrder} * pageSize
*   (maxOrder is the height of the allocation tree: 11 by default, so chunkSize = 2^11 * 8 KiB = 16 MiB)
*
* To begin we allocate a byte array of size = chunkSize
* Whenever a ByteBuf of given size needs to be created we search for the first position
* in the byte array that has enough empty space to accommodate the requested size and
* return a (long) handle that encodes this offset information, (this memory segment is then
* marked as reserved so it is always used by exactly one ByteBuf and no more)
*
* For simplicity all sizes are normalized according to PoolArena#normalizeCapacity method
* This ensures that when we request for memory segments of size >= pageSize the normalizedCapacity
* equals the next nearest power of 2
*
* To search for the first offset in chunk that has at least requested size available we construct a
* complete balanced binary tree and store it in an array (just like heaps) - memoryMap
*
* The tree looks like this (the size of each node being mentioned in the parenthesis)
*
* depth=0 1 node (chunkSize)
* depth=1 2 nodes (chunkSize/2)
* ..
* ..
* depth=d 2^d nodes (chunkSize/2^d)
* ..
* depth=maxOrder 2^maxOrder nodes (chunkSize/2^{maxOrder} = pageSize)
*   (pages sit on the bottom level; the root is chunkSize, and each level down halves the node size)
*
* depth=maxOrder is the last level and the leafs consist of pages
*
* With this tree available searching in chunkArray translates like this:
* To allocate a memory segment of size chunkSize/2^k we search for the first node (from left) at height k
* which is unused
*
* Algorithm:
* ----------
* Encode the tree in memoryMap with the notation
* memoryMap[id] = x => in the subtree rooted at id, the first node that is free to be allocated
* is at depth x (counted from depth=0) i.e., at depths [depth_of_id, x), there is no node that is free
*
* As we allocate & free nodes, we update values stored in memoryMap so that the property is maintained
*
* Initialization -
* In the beginning we construct the memoryMap array by storing the depth of a node at each node
* i.e., memoryMap[id] = depth_of_id
*
* Observations:
* -------------
* 1) memoryMap[id] = depth_of_id => it is free / unallocated
* 2) memoryMap[id] > depth_of_id => at least one of its child nodes is allocated, so we cannot allocate it, but
* some of its children can still be allocated based on their availability
* 3) memoryMap[id] = maxOrder + 1 => the node is fully allocated & thus none of its children can be allocated, it
* is thus marked as unusable
*
* Algorithm: [allocateNode(d) => we want to find the first node (from left) at height h that can be allocated]
* ----------
* 1) start at root (i.e., depth = 0 or id = 1)
* 2) if memoryMap[1] > d => cannot be allocated from this chunk
* 3) if left node value <= h; we can allocate from left subtree so move to left and repeat until found
* 4) else try in right subtree
*
* Algorithm: [allocateRun(size)]
* ----------
* 1) Compute d = log_2(chunkSize/size)
* 2) Return allocateNode(d)
*
* Algorithm: [allocateSubpage(size)]
* ----------
* 1) use allocateNode(maxOrder) to find an empty (i.e., unused) leaf (i.e., page)
* 2) use this handle to construct the PoolSubpage object or if it already exists just call init(normCapacity)
* note that this PoolSubpage object is added to subpagesPool in the PoolArena when we init() it
*
* Note:
* -----
* In the implementation for improving cache coherence,
* we store 2 pieces of information depth_of_id and x as two byte values in memoryMap and depthMap respectively
*
* memoryMap[id]= depth_of_id is defined above
* depthMap[id]= x indicates that the first node which is free to be allocated is at depth x (from root)
*/
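A simplified, self-contained sketch of the allocateNode(d) search and the parent update described in the comment above. This follows the comment's algorithm, not Netty's exact implementation (which adds masking tricks and packs the bytes differently):

final class BuddyTreeSketch {
    private final int maxOrder;
    private final byte[] memoryMap; // memoryMap[id] = depth of the first free node in id's subtree
    private final byte[] depthMap;  // depthMap[id]  = depth of id itself (never changes)

    BuddyTreeSketch(int maxOrder) {
        this.maxOrder = maxOrder;
        int nodes = 1 << (maxOrder + 1);
        memoryMap = new byte[nodes];
        depthMap = new byte[nodes];
        for (int d = 0, id = 1; d <= maxOrder; d++) {       // ids are heap-style, root = 1
            for (int n = 0; n < (1 << d); n++, id++) {
                memoryMap[id] = depthMap[id] = (byte) d;     // initially every node is free
            }
        }
    }

    /** Find and mark the leftmost free node at depth d; returns -1 if none fits. */
    int allocateNode(int d) {
        int id = 1;
        if (memoryMap[id] > d) {
            return -1;                              // no free run of this size in the chunk
        }
        while (depthMap[id] < d) {
            id <<= 1;                               // try the left child first
            if (memoryMap[id] > d) {
                id ^= 1;                            // left subtree too full: take the right sibling
            }
        }
        memoryMap[id] = (byte) (maxOrder + 1);      // mark as fully allocated / unusable
        updateParents(id);
        return id;
    }

    private void updateParents(int id) {
        while (id > 1) {
            int parent = id >>> 1;
            byte left = memoryMap[parent << 1];
            byte right = memoryMap[(parent << 1) + 1];
            memoryMap[parent] = left < right ? left : right; // parent = min of its children
            id = parent;
        }
    }
}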
final class PoolChunk<T> implements PoolChunkMetric {
io.netty.buffer.PoolArena.findSubpagePoolHead(int) computes the index of the subpage pool head within the pool array; smaller element sizes come first.
// trace database: jdbc:h2:/Users/simon/twice-cooked-pork/trace-data/基於netty4做的resetserver的一次http請求trace/tracer.data.h2db
PoolChunk has to solve two problems:
- quickly find an unallocated region and allocate it
- keep fragmentation low, which you can read as: allocate as compactly and adjacently as possible
The overall structure of a chunk:
                                        +-----+   chunkSize (16 MiB when maxOrder = 11)
L=0                                     |  0  |
                       +----------------+-----+----------------+
                       |                                       |
                       |                                       |
                    +--v--+                                 +--v--+
L=1                 |  1  |                                 |  2  |
             +------+-----+------+                   +------+-----+------+
             |                   |                   |                   |
             |                   |                   |                   |
          +--v--+             +--v--+             +--v--+             +--v--+
L=2       |  3  |             |  4  |             |  5  |             |  6  |
        +-+-----+-+         +-+-----+-+         +-+-----+-+         +-+-----+-+
        |         |         |         |         |         |         |         |
        |         |         |         |         |         |         |         |
     +--v--+   +--v--+   +--v--+   +--v--+   +--v--+   +--v--+   +--v--+   +--v--+
L=3  |  7  |   |  8  |   |  9  |   | 10  |   | 11  |   | 12  |   | 13  |   | 14  |
     +-----+   +-----+   +-----+   +-----+   +-----+   +-----+   +-----+   +-----+
                          the leaves are 8 KiB each, i.e. the page size
It is a complete binary tree. The tree height is configurable, currently capped at 14 levels, and defaults to 11.
The bottom level describes the actual chunk memory: each leaf is a page of 8 KiB. With 11 levels the chunk size is 16 MiB, because the bottom level then has 2^11 leaves, and 2^11 * 8 KiB is exactly 16 MiB.
Each node in the tree also records whether memory of its corresponding size has been allocated. What does "corresponding size" mean? Each level represents a size class of allocation requests (let's stick with that term). The top class is 16 MiB, the bottom class is 8 KiB, and moving down one level halves the class size.
On every allocation request, netty first normalizes the size: normalization rounds the request up to the nearest power of two. For example, a request for 900 bytes normalizes to 1 KiB. After normalization, netty marks the path on the tree, say nodes 0 1 3 7, as used. The next 8 KiB request must then avoid that path and can only take 0 1 3 8, because the capacity under node 7 is exhausted; other sizes work the same way. So the tree nodes record what has been used, keeping allocations compact and as far left as possible, which reduces fragmentation. For the waste inside a single page, netty adds a bitmap layer so sub-page space can be used as well, and for looking up chunk objects it adds a caching layer; both are covered below.
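A sketch of the power-of-two rounding done by the normalization step, for requests of at least 512 bytes (tiny requests below 512 are instead rounded up to a multiple of 16). The bit smearing below is the standard trick PoolArena#normalizeCapacity uses:

// valid for 0 < reqCapacity <= 2^30
static int normalizeToPowerOfTwo(int reqCapacity) {
    int n = reqCapacity - 1;  // so an exact power of two stays unchanged
    n |= n >>> 1;             // smear the highest set bit downwards...
    n |= n >>> 2;
    n |= n >>> 4;
    n |= n >>> 8;
    n |= n >>> 16;            // ...until every lower bit is 1
    return n + 1;             // e.g. 900 -> 1024, 8192 -> 8192
}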
The actual data lives in the io.netty.buffer.PoolChunk.memory field; in a debug session it shows up as java.nio.DirectByteBuffer[pos=0 lim=16777216 cap=16777216], and 16777216 bytes is 16 MiB.
Exercise
Carefully step through several allocations and releases of 1 KiB, 2 KiB, 3 KiB, 8 KiB, and 11 KiB buffers.
Allocating 24 bytes
PooledUnsafeDirectByteBuf uses the object-pool facility io.netty.buffer.PooledUnsafeDirectByteBuf.RECYCLER.
PoolArena
The PoolArena layer creates and maintains PoolChunks; it keeps the chunks currently being allocated from in PoolChunkList lists.
PoolChunkList is a linked structure.
Moreover, PoolArena files each PoolChunk into the PoolChunkList matching its usage ratio.
// abstract class PoolArena<T> implements PoolArenaMetric {
    private final PoolChunkList<T> q050;
    private final PoolChunkList<T> q025;
    private final PoolChunkList<T> q000;
    private final PoolChunkList<T> qInit;
    private final PoolChunkList<T> q075;
    private final PoolChunkList<T> q100;
These PoolChunkLists are themselves chained together in order of usage (see the PoolArena constructor, where these list fields are initialized). When a chunk's usage reaches the current list's limit, it is added to the next list; for example once usage reaches 25% the chunk moves into the 50% list (see io.netty.buffer.PoolChunkList.add(PoolChunk<T>)):
void add(PoolChunk<T> chunk) {
    if (chunk.usage() >= maxUsage) {
        nextList.add(chunk);
        return;
    }
    add0(chunk);
}
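For reference, the constructor chains the lists roughly as below; the usage ranges are the minUsage/maxUsage pairs from Netty 4.1's PoolArena (exact constructor argument lists vary slightly across 4.1.x releases, so treat this as a sketch):

q100  = new PoolChunkList<T>(null,  100, Integer.MAX_VALUE);
q075  = new PoolChunkList<T>(q100,   75, 100);
q050  = new PoolChunkList<T>(q075,   50, 100);
q025  = new PoolChunkList<T>(q050,   25, 75);
q000  = new PoolChunkList<T>(q025,    1, 50);
qInit = new PoolChunkList<T>(q000, Integer.MIN_VALUE, 25);
// nextList order: qInit -> q000 -> q025 -> q050 -> q075 -> q100;
// a chunk whose usage exceeds its list's maxUsage moves one list to the right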
PoolArena also maintains two PoolSubpage arrays, tinySubpagePools and smallSubpagePools. Their elements are initialized in the PoolArena constructor; right after initialization every PoolSubpage element's prev and next both point to itself (PoolSubpage is a structure that supports linking into a list).
io.netty.buffer.PoolSubpage.addToPool(PoolSubpage<T>) later links a newly created subpage in right behind the head node, which is what happens inside allocateSubpage below:
private long allocateSubpage(int normCapacity) {
    // Obtain the head of the PoolSubPage pool that is owned by the PoolArena and synchronize on it.
    // This is needed as we may add it back and so alter the linked-list structure.
    PoolSubpage<T> head = arena.findSubpagePoolHead(normCapacity); // looks up the head in PoolArena's PoolSubpage array
    int d = maxOrder; // subpages are only be allocated from pages i.e., leaves
    synchronized (head) {
        int id = allocateNode(d);
        if (id < 0) {
            return id;
        }

        final PoolSubpage<T>[] subpages = this.subpages;
        final int pageSize = this.pageSize;

        freeBytes -= pageSize;

        int subpageIdx = subpageIdx(id);
        PoolSubpage<T> subpage = subpages[subpageIdx];
        if (subpage == null) {
            // the newly constructed PoolSubpage is linked in as head's next node (via addToPool)
            subpage = new PoolSubpage<T>(head, this, id, runOffset(id), pageSize, normCapacity);
            subpages[subpageIdx] = subpage;
        } else {
            subpage.init(head, normCapacity);
        }
        return subpage.allocate();
    }
}
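The subpageIdx(id) mapping used above is a small bit trick: leaves occupy memoryMap ids 2^maxOrder through 2^(maxOrder+1) - 1, so XOR-ing with maxSubpageAllocs (which equals 2^maxOrder) strips the top bit and yields a 0-based index into the subpages array (as in Netty 4.1's PoolChunk):

private int subpageIdx(int memoryMapIdx) {
    return memoryMapIdx ^ maxSubpageAllocs; // e.g. maxOrder=11: id 2048 -> 0, id 2049 -> 1, ...
}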
PoolArena's cache and the Recycler object pool
PooledByteBuf relies on PoolThreadCache for a layer of PoolChunk caching. PoolThreadCache implements the cache with MemoryRegionCache, which in turn caches PoolChunk handles in a queue (see code 1 below). When a buf is released, MemoryRegionCache's add method wraps the released PoolChunk object and nioBuffer in an io.netty.buffer.PoolThreadCache.MemoryRegionCache.Entry and enqueues it (see stack 1 below).
The Entry objects themselves are allocated through (fetched from) a Recycler object pool of already-released instances. The Recycler is essentially a Stack reached through a FastThreadLocal: allocating pops an object off the stack, releasing pushes it back. See code 2 below.
Recycler
An object pool built from a ThreadLocal combined with a stack; the RECYCLER above is exactly this kind of pool for PooledUnsafeDirectByteBuf. Recycling an object pushes it onto the stack; taking one pops it off.
Acquire an object: io.netty.util.Recycler.get()
Recycle an object: io.netty.util.Recycler.DefaultHandle.recycle(Object)
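A minimal usage sketch of the Recycler API, mirroring the Entry pattern shown in code 2 below (PooledThing is a made-up class for illustration):

import io.netty.util.Recycler;

final class PooledThing {
    private static final Recycler<PooledThing> RECYCLER = new Recycler<PooledThing>() {
        @Override
        protected PooledThing newObject(Handle<PooledThing> handle) {
            return new PooledThing(handle); // called only when the stack is empty
        }
    };

    private final Recycler.Handle<PooledThing> handle;
    Object payload;

    private PooledThing(Recycler.Handle<PooledThing> handle) {
        this.handle = handle;
    }

    static PooledThing newInstance() {
        return RECYCLER.get();  // pop from the thread-local stack, or create
    }

    void recycle() {
        payload = null;         // clear state before returning to the pool
        handle.recycle(this);   // push back onto the stack
    }
}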
Code 1: queue initialization
Queue<Entry<T>> queue = PlatformDependent.newFixedMpscQueue(this.size);
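The add path then simply offers a wrapped Entry to this fixed-size MPSC queue; abridged from Netty 4.1's MemoryRegionCache (if the queue is full, the Entry is recycled and the caller falls through to a real free):

public final boolean add(PoolChunk<T> chunk, ByteBuffer nioBuffer, long handle) {
    Entry<T> entry = newEntry(chunk, nioBuffer, handle);
    boolean queued = queue.offer(entry);
    if (!queued) {
        // the queue is full, so recycle the Entry right away;
        // the memory itself will then be freed for real
        entry.recycle();
    }
    return queued;
}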
Stack 1: when a buf is released, MemoryRegionCache's add is called to wrap the released PoolChunk and enqueue it:
Thread [main] (Suspended (breakpoint at line 393 in PoolThreadCache$MemoryRegionCache))
PoolThreadCache$SubPageMemoryRegionCache<T>(PoolThreadCache$MemoryRegionCache<T>).add(PoolChunk<T>, ByteBuffer, long) line: 393
PoolThreadCache.add(PoolArena<?>, PoolChunk, ByteBuffer, long, int, SizeClass) line: 209
PoolArena$DirectArena(PoolArena<T>).free(PoolChunk<T>, ByteBuffer, long, int, PoolThreadCache) line: 273
PooledUnsafeDirectByteBuf(PooledByteBuf<T>).deallocate() line: 171
PooledUnsafeDirectByteBuf(AbstractReferenceCountedByteBuf).release0(int) line: 136
PooledUnsafeDirectByteBuf(AbstractReferenceCountedByteBuf).release() line: 124
PooledByteBufTest.main(String[]) line: 43
Code 2: Entry objects use the object pool
private static final Recycler<Entry> RECYCLER = new Recycler<Entry>() {
    @SuppressWarnings("unchecked")
    @Override
    protected Entry newObject(Handle<Entry> handle) {
        return new Entry(handle);
    }
};

private static Entry newEntry(PoolChunk<?> chunk, ByteBuffer nioBuffer, long handle) {
    Entry entry = RECYCLER.get();
    entry.chunk = chunk;
    entry.nioBuffer = nioBuffer;
    entry.handle = handle;
    return entry;
}

@Override
public void recycle(Object object) {
    if (object != value) {
        throw new IllegalArgumentException("object does not belong to handle");
    }
    Stack<?> stack = this.stack;
    if (lastRecycledId != recycleId || stack == null) {
        throw new IllegalStateException("recycled already");
    }
    stack.push(this);
}
PooledByteBufAllocator construction and related details
- PooledByteBufAllocator.validateAndCalculateChunkSize validates that the tree height does not exceed 14, and computes chunkSize from pageSize (which can be specified externally) and the tree height.
- PooledByteBufAllocator.validateAndCalculatePageShifts validates that pageSize is no smaller than 4 KiB and is a power of two, rejecting it when (pageSize & pageSize - 1) != 0. Why does that expression work? A power of two in binary is a single 1 followed by n zeros; subtracting 1 turns it into a 0 followed by n ones, so ANDing the two gives 0. A number that is not a power of two keeps its highest 1 bit after subtracting 1 (only a number whose lower bits are all zero, i.e. a power of two, borrows out of the top bit), so the two top 1 bits AND to a nonzero result. See the sketch after this list.
- Creates the tinySubpagePools array, 32 elements by default, and initializes its elements; each element is a PoolSubpage, and PoolSubpage supports being linked into a chain (it has prev and next pointers).
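A sketch of the power-of-two check from the second bullet (isPowerOfTwo is a hypothetical helper; Netty inlines the expression):

static boolean isPowerOfTwo(int pageSize) {
    // 8192 = 0b10000000000000,  8191 = 0b01111111111111  -> AND == 0
    // 8200 = 0b10000000001000,  8199 = 0b10000000000111  -> AND != 0 (top bit survives)
    return pageSize > 0 && (pageSize & pageSize - 1) == 0;
}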
PoolChunk: allocating and freeing bufs smaller than pageSize
io.netty.buffer.PoolArena.free(PoolChunk<T>, ByteBuffer, long, int, PoolThreadCache)
Bitmap-related:
// the handle is a 64-bit long; take its high 32 bits as an int
private static int bitmapIdx(long handle) {
    return (int) (handle >>> Integer.SIZE);
}
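The full 64-bit handle for a subpage allocation is built in PoolSubpage#toHandle (Netty 4.1): the low 32 bits carry memoryMapIdx (which leaf page), the high 32 bits carry bitmapIdx (which slot inside the page), and bit 62 is set so that a subpage handle with bitmapIdx 0 can be told apart from a plain page-run handle (that 0x40000000 part is masked off again before the bitmap is indexed):

private long toHandle(int bitmapIdx) {
    return 0x4000000000000000L | (long) bitmapIdx << 32 | memoryMapIdx;
}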
PoolSubpage bitmap support
Take a page of 8192 bytes and an element size of 32: the page splits into 256 elements, and each allocation decrements numAvail by one.
The long[] bitmap has 8 elements: 8192 / 16 / 64 = 8, where 64 is the bit width of a long and 16 is the smallest possible element size.
During allocation the bitmap words fill up bit by bit; taking the first word as an example, its value grows as 1, 3, 7, 15, 31, 63, 127, ... On release the corresponding bit is cleared again, and nextAvail is recorded at release time so the freed slot is preferred on the next allocation.
In this example 4 longs (bitmapLength) of the bitmap are enough to track the usage of the 256 elements (maxNumElems = pageSize / elemSize).
final class PoolSubpage<T> implements PoolSubpageMetric {

    final PoolChunk<T> chunk;
    private final int memoryMapIdx;
    private final int runOffset;
    private final int pageSize;
    // the bitmap; 8 elements by default: length = pageSize >>> 10, i.e. pageSize / 16 / 64,
    // where 64 is the bit width of a long and 16 is the smallest element size.
    // With elemSize = 32 only 4 of the 8 longs are actually used (see bitmapLength)
    private final long[] bitmap;

    PoolSubpage<T> prev;
    PoolSubpage<T> next;

    boolean doNotDestroy;
    int elemSize;
    private int maxNumElems;  // the page is split into maxNumElems elements; 256 when elemSize is 32
    private int bitmapLength; // 4 here: 256 >>> 6 = 4
    private int nextAvail;    // set when a buf is released, so that slot is preferred on the next allocation
    private int numAvail;

    long allocate() {
        if (elemSize == 0) {
            return toHandle(0);
        }

        if (numAvail == 0 || !doNotDestroy) {
            return -1;
        }

        final int bitmapIdx = getNextAvail();
        int q = bitmapIdx >>> 6;
        int r = bitmapIdx & 63;
        assert (bitmap[q] >>> r & 1) == 0;
        bitmap[q] |= 1L << r; // the first word grows as 1, 3, 7, 15, 31, 63, 127, ...

        if (-- numAvail == 0) {
            removeFromPool();
        }

        return toHandle(bitmapIdx);
    }
    private int findNextAvail() {
        final long[] bitmap = this.bitmap;
        final int bitmapLength = this.bitmapLength;
        for (int i = 0; i < bitmapLength; i ++) {
            long bits = bitmap[i];
            if (~bits != 0) { // ~bits == 0 would mean every bit of this long is already taken
                return findNextAvail0(i, bits);
            }
        }
        return -1;
    }
    private int findNextAvail0(int i, long bits) {
        final int maxNumElems = this.maxNumElems;
        final int baseVal = i << 6;

        for (int j = 0; j < 64; j ++) {
            if ((bits & 1) == 0) { // lowest bit clear => this slot is free
                int val = baseVal | j;
                if (val < maxNumElems) {
                    return val;
                } else {
                    break;
                }
            }
            bits >>>= 1; // shift right to examine the next bit
        }
        return -1;
    }
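A standalone rendition of that bit scan, to make the arithmetic concrete (firstFreeSlot is a hypothetical helper with the same logic):

// find the index of the first zero bit in `bits`, offset by word index i
// (64 slots per bitmap word)
static int firstFreeSlot(int i, long bits, int maxNumElems) {
    int baseVal = i << 6;       // i * 64
    for (int j = 0; j < 64; j++) {
        if ((bits & 1) == 0) {  // slot j of word i is free
            int val = baseVal | j;
            return val < maxNumElems ? val : -1;
        }
        bits >>>= 1;
    }
    return -1;
}
// e.g. firstFreeSlot(0, 0b0111L, 256) == 3: slots 0..2 taken, slot 3 free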
free does not always release memory immediately: the code below first tries to cache the released handle in a MemoryRegionCache queue, and only frees for real when the queue cannot take it:
// PoolArena.class
void free(PoolChunk<T> chunk, ByteBuffer nioBuffer, long handle, int normCapacity, PoolThreadCache cache) {
    if (chunk.unpooled) {
        int size = chunk.chunkSize();
        destroyChunk(chunk);
        activeBytesHuge.add(-size);
        deallocationsHuge.increment();
    } else {
        SizeClass sizeClass = sizeClass(normCapacity);
        if (cache != null && cache.add(this, chunk, nioBuffer, handle, normCapacity, sizeClass)) {
            // cached so not free it.
            return;
        }

        freeChunk(chunk, handle, sizeClass, nioBuffer);
    }
}
