[netty4][netty-buffer] Netty's pooled buffers


PooledByteBufAllocator buffer allocation

Entry point for buffer allocation:
io.netty.buffer.PooledByteBufAllocator.newDirectBuffer(int, int)
Allocation call stack in a real Netty application:

CLASS_NAME METHOD_NAME LINE_NUM
io/netty/buffer/PooledByteBufAllocator newDirectBuffer 339
io/netty/buffer/AbstractByteBufAllocator directBuffer 185
io/netty/buffer/AbstractByteBufAllocator directBuffer 176
io/netty/buffer/AbstractByteBufAllocator ioBuffer 139
io/netty/channel/DefaultMaxMessagesRecvByteBufAllocator$MaxMessageHandle allocate 114
io/netty/channel/nio/AbstractNioByteChannel$NioByteUnsafe read 186
io/netty/channel/nio/NioEventLoop processSelectedKey 682
io/netty/channel/nio/NioEventLoop processSelectedKeysOptimized 628
io/netty/channel/nio/NioEventLoop processSelectedKeys 533
io/netty/channel/nio/NioEventLoop run 511
io/netty/util/concurrent/SingleThreadEventExecutor$5 run 956

Test case code

package io.netty.buffer;

import org.junit.Assert;

public class PooledByteBufTest {

    public static void main(String[] args) {
        final PooledByteBufAllocator allocator = new PooledByteBufAllocator(
                false,  // preferDirect
                0,      // nHeapArena
                1,      // nDirectArena
                8192,   // pageSize
                11,     // maxOrder
                3,      // tinyCacheSize
                3,      // smallCacheSize
                3,      // normalCacheSize
                true    // useCacheForAllThreads
        );

        // create tiny buffer
        final ByteBuf b1 = allocator.directBuffer(24);
        // create small buffer
        final ByteBuf b2 = allocator.directBuffer(800);
        // create normal buffer
        final ByteBuf b3 = allocator.directBuffer(8192 * 2);

        Assert.assertNotNull(b1);
        Assert.assertNotNull(b2);
        Assert.assertNotNull(b3);

        // then release the buffers to deallocate the memory; with the thread-local cache disabled,
        // the allocations counter must equal the deallocations counter
        Assert.assertTrue(b1.release());
        Assert.assertTrue(b2.release());
        Assert.assertTrue(b3.release());
    }
}

PoolChunk

PoolChunk's data structure and design are described in the PoolChunk javadoc:

/**
 * Description of algorithm for PageRun/PoolSubpage allocation from PoolChunk
 *
 * Notation: The following terms are important to understand the code
 * > page  - a page is the smallest unit of memory chunk that can be allocated
 * > chunk - a chunk is a collection of pages
 * > in this code chunkSize = 2^{maxOrder} * pageSize
 *   (maxOrder is the height of the allocation tree; see the depth diagram below)
 *
 * To begin we allocate a byte array of size = chunkSize
 * Whenever a ByteBuf of given size needs to be created we search for the first position
 * in the byte array that has enough empty space to accommodate the requested size and
 * return a (long) handle that encodes this offset information, (this memory segment is then
 * marked as reserved so it is always used by exactly one ByteBuf and no more)
 *
 * For simplicity all sizes are normalized according to PoolArena#normalizeCapacity method
 * This ensures that when we request for memory segments of size >= pageSize the normalizedCapacity
 * equals the next nearest power of 2
 *
 * To search for the first offset in chunk that has at least requested size available we construct a
 * complete balanced binary tree and store it in an array (just like heaps) - memoryMap
 *
 * The tree looks like this (the size of each node being mentioned in the parenthesis)
 *
 * depth=0        1 node (chunkSize)
 * depth=1        2 nodes (chunkSize/2)
 * ..
 * ..
 * depth=d        2^d nodes (chunkSize/2^d)
 * ..
 * depth=maxOrder 2^maxOrder nodes (chunkSize/2^{maxOrder} = pageSize)
 *                (pageSize sits at the bottom level, chunkSize at the top; each level down halves the node size)
 *
 * depth=maxOrder is the last level and the leafs consist of pages
 *
 * With this tree available searching in chunkArray translates like this:
 * To allocate a memory segment of size chunkSize/2^k we search for the first node (from left) at height k
 * which is unused
 *
 * Algorithm:
 * ----------
 * Encode the tree in memoryMap with the notation
 *   memoryMap[id] = x => in the subtree rooted at id, the first node that is free to be allocated
 *   is at depth x (counted from depth=0) i.e., at depths [depth_of_id, x), there is no node that is free
 *
 *  As we allocate & free nodes, we update values stored in memoryMap so that the property is maintained
 *
 * Initialization -
 *   In the beginning we construct the memoryMap array by storing the depth of a node at each node
 *     i.e., memoryMap[id] = depth_of_id
 *
 * Observations:
 * -------------
 * 1) memoryMap[id] = depth_of_id  => it is free / unallocated
 * 2) memoryMap[id] > depth_of_id  => at least one of its child nodes is allocated, so we cannot allocate it, but
 *                                    some of its children can still be allocated based on their availability
 * 3) memoryMap[id] = maxOrder + 1 => the node is fully allocated & thus none of its children can be allocated, it
 *                                    is thus marked as unusable
 *
 * Algorithm: [allocateNode(d) => we want to find the first node (from left) at height h that can be allocated]
 * ----------
 * 1) start at root (i.e., depth = 0 or id = 1)
 * 2) if memoryMap[1] > d => cannot be allocated from this chunk
 * 3) if left node value <= h; we can allocate from left subtree so move to left and repeat until found
 * 4) else try in right subtree
 *
 * Algorithm: [allocateRun(size)]
 * ----------
 * 1) Compute d = log_2(chunkSize/size)
 * 2) Return allocateNode(d)
 *
 * Algorithm: [allocateSubpage(size)]
 * ----------
 * 1) use allocateNode(maxOrder) to find an empty (i.e., unused) leaf (i.e., page)
 * 2) use this handle to construct the PoolSubpage object or if it already exists just call init(normCapacity)
 *    note that this PoolSubpage object is added to subpagesPool in the PoolArena when we init() it
 *
 * Note:
 * -----
 * In the implementation for improving cache coherence,
 * we store 2 pieces of information depth_of_id and x as two byte values in memoryMap and depthMap respectively
 *
 * memoryMap[id]= depth_of_id  is defined above
 * depthMap[id]= x  indicates that the first node which is free to be allocated is at depth x (from root)
 */
final class PoolChunk<T> implements PoolChunkMetric {
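The allocateNode algorithm described in the javadoc can be sketched as a standalone toy model. This is my own simplified version, not Netty's PoolChunk code: the descent loop and the parent-update loop are condensed, and the class name is invented.

```java
// Toy model of the memoryMap buddy allocation: ids are heap-style (root = 1,
// children of id are 2*id and 2*id+1); memoryMap[id] holds the shallowest
// depth at which a free node exists in id's subtree.
public class BuddyTree {
    final int maxOrder;
    final byte unusable;      // maxOrder + 1 marks a fully allocated subtree
    final byte[] memoryMap;

    BuddyTree(int maxOrder) {
        this.maxOrder = maxOrder;
        this.unusable = (byte) (maxOrder + 1);
        this.memoryMap = new byte[2 << maxOrder];   // 1-based heap array
        for (int id = 1; id < memoryMap.length; id++) {
            memoryMap[id] = (byte) (31 - Integer.numberOfLeadingZeros(id)); // depth_of_id
        }
    }

    /** Leftmost free node at depth d, marked allocated; -1 if the chunk cannot satisfy it. */
    int allocateNode(int d) {
        if (memoryMap[1] > d) {
            return -1;                              // no free node of depth d anywhere
        }
        int id = 1;
        for (int depth = 0; depth < d; depth++) {
            id <<= 1;                               // descend, trying the left child first
            if (memoryMap[id] > d) {
                id ^= 1;                            // left subtree exhausted -> go right
            }
        }
        int allocated = id;
        memoryMap[id] = unusable;                   // this node and its subtree are taken
        while (id > 1) {                            // restore the invariant upwards:
            id >>= 1;                               // parent = min of its two children
            memoryMap[id] = (byte) Math.min(memoryMap[2 * id], memoryMap[2 * id + 1]);
        }
        return allocated;
    }
}
```

With maxOrder = 2 (four leaf "pages"), repeated page allocations walk left to right across the leaves, and a half-chunk allocation is steered away from subtrees that already contain allocated pages.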

io.netty.buffer.PoolArena.findSubpagePoolHead(int) computes the index of the page header in the page table; smaller pages come first.


PoolChunk has to solve two problems:

  1. quickly find an unallocated region and allocate it
  2. minimize fragmentation, which can be understood as allocating as tightly packed as possible

The structure of a whole chunk looks like this:

                                                    +------+   chunkSize (16 MB when L=11)
L=0                                                 |   0  |
                                   +----------------+------+------------------+
                                   |                                          |
                                   |                                          |
                                   |                                          |
                               +---v--+                                   +---v--+
L=1                            |   1  |                                   |   2  |
                        +------+------+------+                     +------+------+-------+
                        |                    |                     |                     |
                        |                    |                     |                     |
                        |                    |                     |                     |
                    +---v--+             +---v--+              +---v--+              +---v--+
L=2                 |   3  |             |   4  |              |   5  |              |   6  |
                 +--+------+-+         +-+------+--+        +--+------+--+         +-+------+--+
                 |           |         |           |        |            |         |           |
                 |           |         |           |        |            |         |           |
                 |           |         |           |        |            |         |           |
              +--v---+   +---v--+   +--v---+   +---v--+   +-v----+   +---v--+   +--v---+   +---v--+
L=3           |  7   |   |   8  |   |  9   |   |  10  |   |  11  |   |  12  |   |  13  |   |  14  |
              +------+   +------+   +------+   +------+   +------+   +------+   +------+   +------+
                each leaf is 8 KB, i.e., the page size

This is a complete binary tree. Its height is configurable, currently capped at 14 levels, with a default of 11.
The bottom level describes the actual chunk memory: each leaf is a page of 8 KB. With 11 levels the chunk size is therefore 16 MB, because the bottom level then has 2^11 leaves, and 2^11 * 8 KB is exactly 16 MB.
Each node in the tree also records whether its corresponding size has been allocated. What is a node's "corresponding size"? Each level represents a size class: the top is the 16 MB class, the bottom the 8 KB class, and each step down the tree halves the class size.
On every allocation request Netty first normalizes the size, i.e., rounds it up to the nearest power of 2 above the request. For example, a request for 900 bytes normalizes to 1 KB. After normalization, Netty marks the nodes along the path, say 0, 1, 3, 7, as used. A later 8 KB request must then avoid that path and take 0, 1, 3, 8 instead, because node 7's side no longer has room; other sizes work the same way. The tree nodes thus track usage so that allocations stay compact toward the left and fragmentation is reduced. For the memory wasted inside a single page, Netty adds a bitmap layer (PoolSubpage) to make it usable. For looking up chunk objects, Netty also adds a caching layer, described below.
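The normalization step can be sketched like this. This is a simplified take on PoolArena.normalizeCapacity covering only requests of 512 bytes and up; smaller "tiny" requests are instead rounded up to a multiple of 16, which is omitted here.

```java
// Round a requested capacity up to the next power of two using bit smearing:
// subtract 1, copy the highest set bit into every lower position, then add 1.
public class Normalize {
    static int normalizeCapacity(int reqCapacity) {
        int n = reqCapacity - 1;
        n |= n >>> 1;
        n |= n >>> 2;
        n |= n >>> 4;
        n |= n >>> 8;
        n |= n >>> 16;   // n is now all-ones below the highest set bit
        return n + 1;    // next power of two >= reqCapacity
    }
}
```

So the 900-byte example above normalizes to 1024, and an exact power of two such as 8192 is returned unchanged.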

The actual data lives in the io.netty.buffer.PoolChunk.memory field; in a debugger it shows as java.nio.DirectByteBuffer[pos=0 lim=16777216 cap=16777216]
(16777216 bytes = 16 MB)

Exercise

Carefully step through repeated allocation and release of 1K, 2K, 3K, 8K and 11K buffers.

Allocating 24 bytes

PooledUnsafeDirectByteBuf uses the object-pool facility io.netty.buffer.PooledUnsafeDirectByteBuf.RECYCLER

PoolArena

The PoolArena layer creates and maintains PoolChunks; it does so by putting the PoolChunks currently used for allocation into PoolChunkList lists.
PoolChunkList is a linked structure.
Moreover, PoolArena keeps each PoolChunk in the PoolChunkList that matches its usage ratio.

// abstract class PoolArena<T> implements PoolArenaMetric {
private final PoolChunkList<T> q050;
private final PoolChunkList<T> q025;
private final PoolChunkList<T> q000;
private final PoolChunkList<T> qInit;
private final PoolChunkList<T> q075;
private final PoolChunkList<T> q100;

These PoolChunkLists are themselves chained together ordered by usage (see the PoolArena constructor, where these list fields are initialized). When a chunk's usage reaches the current list's threshold, it is added to the next list; e.g., once usage reaches 25%, the chunk moves into the 50% list (see io.netty.buffer.PoolChunkList.add(PoolChunk)).

void add(PoolChunk<T> chunk) {
    if (chunk.usage() >= maxUsage) {
        nextList.add(chunk);
        return;
    }
    add0(chunk);
}
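The promotion between usage lists can be modeled in miniature. This is a toy sketch with invented names, not Netty's PoolChunkList; a chunk is represented by nothing more than its usage percentage.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model of PoolArena's usage-ordered chunk lists: a chunk whose usage has
// reached this list's maxUsage threshold is handed on to the next list in the chain.
class ToyChunkList {
    final int maxUsage;          // promotion threshold in percent
    ToyChunkList next;           // next list in the usage chain (null for the last one)
    final List<Integer> chunks = new ArrayList<>(); // chunk usage percentages

    ToyChunkList(int maxUsage) { this.maxUsage = maxUsage; }

    void add(int usage) {
        if (next != null && usage >= maxUsage) {
            next.add(usage);     // already too full for this list: promote it
            return;
        }
        chunks.add(usage);       // stands in for add0(chunk)
    }
}
```

A chunk added to a 50%-threshold list with 60% usage ends up in the next list; one at 30% stays put, mirroring the add() method quoted above.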

PoolArena also maintains two PoolSubpage arrays. The instances in each array are created in the PoolArena constructor; right after initialization, each PoolSubpage element's predecessor and successor both point to itself (PoolSubpage is a structure that supports linking into a list).
In io.netty.buffer.PoolSubpage.addToPool(PoolSubpage), the PoolSubpage instance newly built during io.netty.buffer.PoolChunk.allocateSubpage(int) is attached as the head's next node (its successor). The code:

    private long allocateSubpage(int normCapacity) {
        // Obtain the head of the PoolSubPage pool that is owned by the PoolArena and synchronize on it.
        // This is needed as we may add it back and so alter the linked-list structure.
        PoolSubpage<T> head = arena.findSubpagePoolHead(normCapacity); // looks up the PoolSubpage array in the PoolArena
        int d = maxOrder; // subpages are only be allocated from pages i.e., leaves
        synchronized (head) {
            int id = allocateNode(d);
            if (id < 0) {
                return id;
            }

            final PoolSubpage<T>[] subpages = this.subpages;
            final int pageSize = this.pageSize;

            freeBytes -= pageSize;

            int subpageIdx = subpageIdx(id);
            PoolSubpage<T> subpage = subpages[subpageIdx];
            if (subpage == null) {
                subpage = new PoolSubpage<T>(head, this, id, runOffset(id), pageSize, normCapacity); // the newly built PoolSubpage is attached here as head's next node
                subpages[subpageIdx] = subpage;
            } else {
                subpage.init(head, normCapacity);
            }
            return subpage.allocate();
        }
    }

PoolArena's cache and the Recycler object pool

PooledByteBuf relies on PoolThreadCache for a layer of caching over PoolChunk. PoolThreadCache implements the cache via MemoryRegionCache, which caches PoolChunks in a queue (see code 1 below). When a buffer is released, MemoryRegionCache's add method wraps the released PoolChunk and nioBuffer in an io.netty.buffer.PoolThreadCache.MemoryRegionCache.Entry object and offers it to the queue (see stack 1 below). On io.netty.buffer.PoolThreadCache.MemoryRegionCache.allocate(PooledByteBuf, int) the entry is simply polled back out of the queue, which achieves the caching. The optimization does not stop there: the Entry objects used to wrap PoolChunks are themselves allocated from (and released back to) a Recycler object pool. That pool is essentially a Stack held in a FastThreadLocal: allocation pops, release pushes. See code 2 below.

Recycler

Recycler is an object-pool data structure built on a ThreadLocal combined with a stack; the RECYCLER mentioned above is exactly such a pool for PooledUnsafeDirectByteBuf. Recycling pushes onto the stack, obtaining pops.
Obtain an object: io.netty.util.Recycler.get()
Recycle an object: io.netty.util.Recycler.DefaultHandle.recycle(Object)

Code 1: queue initialization

Queue<Entry<T>> queue = PlatformDependent.newFixedMpscQueue(this.size);

Stack 1: on buffer release, MemoryRegionCache's add method wraps the released PoolChunk and enqueues it:

Thread [main] (Suspended (breakpoint at line 393 in PoolThreadCache$MemoryRegionCache))	
	PoolThreadCache$SubPageMemoryRegionCache<T>(PoolThreadCache$MemoryRegionCache<T>).add(PoolChunk<T>, ByteBuffer, long) line: 393	
	PoolThreadCache.add(PoolArena<?>, PoolChunk, ByteBuffer, long, int, SizeClass) line: 209	
	PoolArena$DirectArena(PoolArena<T>).free(PoolChunk<T>, ByteBuffer, long, int, PoolThreadCache) line: 273	
	PooledUnsafeDirectByteBuf(PooledByteBuf<T>).deallocate() line: 171	
	PooledUnsafeDirectByteBuf(AbstractReferenceCountedByteBuf).release0(int) line: 136	
	PooledUnsafeDirectByteBuf(AbstractReferenceCountedByteBuf).release() line: 124	
	PooledByteBufTest.main(String[]) line: 43	

Code 2: Entry objects come from an object pool

private static final Recycler<Entry> RECYCLER = new Recycler<Entry>() {
    @SuppressWarnings("unchecked")
    @Override
    protected Entry newObject(Handle<Entry> handle) {
        return new Entry(handle);
    }
};

private static Entry newEntry(PoolChunk<?> chunk, ByteBuffer nioBuffer, long handle) {
    Entry entry = RECYCLER.get();
    entry.chunk = chunk;
    entry.nioBuffer = nioBuffer;
    entry.handle = handle;
    return entry;
}

@Override
public void recycle(Object object) {
    if (object != value) {
        throw new IllegalArgumentException("object does not belong to handle");
    }

    Stack<?> stack = this.stack;
    if (lastRecycledId != recycleId || stack == null) {
        throw new IllegalStateException("recycled already");
    }

    stack.push(this);
}
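The idea behind Recycler can be reduced to a few lines. This is a minimal sketch with invented names, not Netty's implementation, which adds capacity limits, recycle-id checks, and cross-thread recycling via WeakOrderQueues.

```java
import java.util.ArrayDeque;
import java.util.function.Supplier;

// Minimal thread-local object pool in the spirit of Recycler:
// recycle() pushes onto a per-thread stack, get() pops or creates a fresh instance.
public class SimpleRecycler<T> {
    private final Supplier<T> factory;
    private final ThreadLocal<ArrayDeque<T>> stack =
            ThreadLocal.withInitial(ArrayDeque::new);

    public SimpleRecycler(Supplier<T> factory) { this.factory = factory; }

    public T get() {
        T obj = stack.get().pollFirst();   // pop a recycled instance if one exists
        return obj != null ? obj : factory.get();
    }

    public void recycle(T obj) {
        stack.get().offerFirst(obj);       // push back for reuse on this thread
    }
}
```

Because the stack is thread-local, get and recycle need no synchronization on the hot path, which is the point of the design.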

PooledByteBufAllocator creation and related details

  1. PooledByteBufAllocator.validateAndCalculateChunkSize checks that the tree height does not exceed 14 and computes the chunkSize from the pageSize (which can be supplied externally) and the tree height.
  2. PooledByteBufAllocator.validateAndCalculatePageShifts checks that pageSize is at least 4K and that pageSize is a power of 2, rejecting it when (pageSize & pageSize - 1) != 0. Why does this test work? A power of 2 in binary is a leading 1 followed by n zeros; subtracting 1 turns that into a 0 followed by n ones, so ANDing the two values must give 0. A number that is not a power of 2 has lower bits that are not all zero, so subtracting 1 never clears its highest set bit; that bit is 1 in both operands, the AND keeps it, and the result is nonzero. The test therefore cleanly separates the two cases.
  3. It creates the tinySubpagePools array (32 elements by default) and initializes its elements. The elements are PoolSubpages, and PoolSubpage also supports being linked into a chain (it has a predecessor and a successor).
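Step 2's power-of-two test, extracted into a standalone helper (the name is hypothetical; pageSize is assumed positive, since 0 would also pass the bit test):

```java
// A positive value is a power of two iff clearing its lowest set bit yields 0.
public class PageSizeCheck {
    static boolean isPowerOfTwo(int pageSize) {
        return (pageSize & pageSize - 1) == 0;  // see the explanation above
    }
}
```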

PoolChunk: allocating and freeing buffers smaller than pageSize

io.netty.buffer.PoolArena.free(PoolChunk, ByteBuffer, long, int, PoolThreadCache)
Bitmap-related:

// take the high 32 bits of the 64-bit long as an int
private static int bitmapIdx(long handle) {
    return (int) (handle >>> Integer.SIZE);
}
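The handle decoded by bitmapIdx above is produced on the subpage allocation path (PoolSubpage.toHandle), which packs the bitmap index, together with a marker bit that distinguishes subpage handles, into the high 32 bits and the memoryMap index into the low 32 bits. A sketch of the round trip, modeled on that layout:

```java
// Sketch of the subpage handle layout:
// high 32 bits = 0x40000000 marker | bitmapIdx, low 32 bits = memoryMapIdx.
public class HandleDemo {
    static long toHandle(int bitmapIdx, int memoryMapIdx) {
        return 0x4000000000000000L | (long) bitmapIdx << 32 | memoryMapIdx;
    }

    static int bitmapIdx(long handle) {
        return (int) (handle >>> Integer.SIZE);  // high 32 bits, marker bit included
    }

    static int memoryMapIdx(long handle) {
        return (int) handle;                     // low 32 bits
    }
}
```

Note that the decoded high word still carries the marker bit, so consumers mask it off (with 0x3FFFFFFF) before using it as an index.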

PoolSubpage supports a bitmap.
A page is 8192 bytes; with an element size of 32 bytes, a page splits into 256 elements, and every allocation decrements numAvail by 1.
The long bitmap array has 8 elements: 8192 / 16 / 64 = 8, where 64 is the number of bits in a long and 16 is the minimum element size (so the array is sized for the worst case of 8192 / 16 = 512 elements).

During allocation the bit pattern of a bitmap element (taking the first element as an example) grows as 1, 3, 7, 15, 31, 63, 127, ...; on release the corresponding bit is cleared again, and nextAvail is recorded so the freed slot is preferred on the next allocation.
bitmapLength longs (4 here) of the bitmap track the usage of the maxNumElems = pageSize / elemSize = 256 elements.

final class PoolSubpage<T> implements PoolSubpageMetric {

    final PoolChunk<T> chunk;
    private final int memoryMapIdx;
    private final int runOffset;
    private final int pageSize;
    private final long[] bitmap;  // the bitmap; 8 elements by default: count = pageSize >>> 10 = pageSize / 16 / 64, where 64 is the number of bits in a long and 16 the minimum element size. With 256 elements only 4 of the 8 longs are actually used.

    PoolSubpage<T> prev;
    PoolSubpage<T> next;

    boolean doNotDestroy;
    int elemSize;
    private int maxNumElems; // the page is split into maxNumElems elements; 256 in this example
    private int bitmapLength; // 4 in this example: 256 >>> 6 = 4
    private int nextAvail; // set when a buffer is freed, so that slot is preferred on the next allocation
    private int numAvail;
    
    long allocate() {
        if (elemSize == 0) {
            return toHandle(0);
        }

        if (numAvail == 0 || !doNotDestroy) {
            return -1;
        }

        final int bitmapIdx = getNextAvail();
        int q = bitmapIdx >>> 6;
        int r = bitmapIdx & 63;
        assert (bitmap[q] >>> r & 1) == 0;
        bitmap[q] |= 1L << r;  // bit pattern grows as 1, 3, 7, 15, 31, ...

        if (-- numAvail == 0) {
            removeFromPool();
        }

        return toHandle(bitmapIdx);
    }
    
    
    private int findNextAvail() {
        final long[] bitmap = this.bitmap;
        final int bitmapLength = this.bitmapLength;
        for (int i = 0; i < bitmapLength; i ++) {
            long bits = bitmap[i];
            if (~bits != 0) { // nonzero means at least one bit in this long is still free
                return findNextAvail0(i, bits);
            }
        }
        return -1;
    }
    
    private int findNextAvail0(int i, long bits) {
        final int maxNumElems = this.maxNumElems;
        final int baseVal = i << 6;

        for (int j = 0; j < 64; j ++) {
            if ((bits & 1) == 0) { // lowest bit clear => this slot is free
                int val = baseVal | j;
                if (val < maxNumElems) {
                    return val;
                } else {
                    break;
                }
            }
            bits >>>= 1; // shift right to examine the next bit
        }
        return -1;
    }
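The scan above, condensed into a standalone method for experimentation (a simplified sketch: the nextAvail fast path and the surrounding PoolSubpage state are omitted):

```java
// Scan each long of the bitmap for a clear bit and return its global element
// index, or -1 when all maxNumElems slots are taken.
public class BitmapScan {
    static int findNextAvail(long[] bitmap, int maxNumElems) {
        for (int i = 0; i < bitmap.length; i++) {
            long bits = bitmap[i];
            if (~bits != 0) {                  // at least one free bit in this word
                int baseVal = i << 6;          // i * 64: index of this word's first slot
                for (int j = 0; j < 64; j++) {
                    if ((bits & 1) == 0) {     // lowest bit clear -> slot free
                        int val = baseVal | j;
                        return val < maxNumElems ? val : -1;
                    }
                    bits >>>= 1;               // examine the next bit
                }
            }
        }
        return -1;
    }
}
```

For example, with word 0 completely full and bits 0-2 of word 1 set, the first free slot is element 64 + 3 = 67.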

free does not always actually release the memory: below, the freed buffer is first added to a MemoryRegionCache queue to be cached, and only when the queue cannot take it is it truly freed. The code:

// PoolArena.class
void free(PoolChunk<T> chunk, ByteBuffer nioBuffer, long handle, int normCapacity, PoolThreadCache cache) {
    if (chunk.unpooled) {
        int size = chunk.chunkSize();
        destroyChunk(chunk);
        activeBytesHuge.add(-size);
        deallocationsHuge.increment();
    } else {
        SizeClass sizeClass = sizeClass(normCapacity);
        if (cache != null && cache.add(this, chunk, nioBuffer, handle, normCapacity, sizeClass)) {
            // cached so not free it.
            return;
        }

        freeChunk(chunk, handle, sizeClass, nioBuffer);
    }
}

