Netty源碼解析 -- 內存池與PoolArena


我們知道,Netty使用直接內存實現Netty零拷貝以提升性能,
但直接內存的創建和釋放可能需要涉及系統調用,是比較昂貴的操作,如果每個請求都創建和釋放一個直接內存,那性能肯定是不能滿足要求的。
這時就需要使用內存池。
即從系統中申請一大塊內存,再在上面分配每個請求所需的內存。

Netty中的內存池主要涉及PoolArena,PoolChunk與PoolSubpage。
本文主要分析PoolArena的作用與實現。
源碼分析基於Netty 4.1.52

接口關系
ByteBufAllocator,內存分配器,負責為ByteBuf分配內存, 線程安全。
PooledByteBufAllocator,池化內存分配器,默認的ByteBufAllocator,預先從操作系統中申請一大塊內存,在該內存上分配內存給ByteBuf,可以提高性能和減小內存碎片。
UnPooledByteBufAllocator,非池化內存分配器,每次都從操作系統中申請內存。

RecvByteBufAllocator,接收內存分配器,為Channel讀入的IO數據分配一塊大小合理的buffer空間。具體功能交由內部接口Handle定義。
它主要是針對Channel讀入場景添加一些操作,如guess,incMessagesRead,lastBytesRead等等。
ByteBuf,分配好的內存塊,可以直接使用。

下面只關注PooledByteBufAllocator,它是Netty中默認的內存分配器,也是理解Netty內存機制的難點。

內存分配

前面文章《ChannelPipeline機制與讀寫過程》中分析了數據讀取過程,
NioByteUnsafe#read

public final void read() {
    ...
    final RecvByteBufAllocator.Handle allocHandle = recvBufAllocHandle();
    allocHandle.reset(config);

    ByteBuf byteBuf = null;

    ...
    byteBuf = allocHandle.allocate(allocator);
    allocHandle.lastBytesRead(doReadBytes(byteBuf));
    ...
}

recvBufAllocHandle方法返回AdaptiveRecvByteBufAllocator.HandleImpl。(AdaptiveRecvByteBufAllocator,PooledByteBufAllocator都在DefaultChannelConfig中初始化)

AdaptiveRecvByteBufAllocator.HandleImpl#allocate -> AbstractByteBufAllocator#ioBuffer -> PooledByteBufAllocator#directBuffer -> PooledByteBufAllocator#newDirectBuffer

protected ByteBuf newDirectBuffer(int initialCapacity, int maxCapacity) {
    // #1
    PoolThreadCache cache = threadCache.get();
    PoolArena<ByteBuffer> directArena = cache.directArena;

    final ByteBuf buf;
    if (directArena != null) {
        // #2
        buf = directArena.allocate(cache, initialCapacity, maxCapacity);
    } else {
        // #3
        buf = PlatformDependent.hasUnsafe() ? UnsafeByteBufUtil.newUnsafeDirectByteBuf(this, initialCapacity, maxCapacity) : new UnpooledDirectByteBuf(this, initialCapacity, maxCapacity);
    }
    return toLeakAwareBuffer(buf);
}

AbstractByteBufAllocator#ioBuffer方法會判斷當前系統是否支持unsafe。支持時使用直接內存,不支持則使用堆內存。這里只關注直接內存的實現。
#1 從當前線程緩存中獲取對應內存池PoolArena
#2 在當前線程內存池上分配內存
#3 內存池不存在,只能使用非池化內存分配內存了

PooledByteBufAllocator#threadCache是一個PoolThreadLocalCache實例,PoolThreadLocalCache繼承於FastThreadLocal,FastThreadLocal這里簡單理解為對ThreadLocal的優化,它為每個線程維護了一個PoolThreadCache,PoolThreadCache上關聯了內存池。
當PoolThreadLocalCache上某個線程的PoolThreadCache不存在時,通過initialValue方法構造。

PoolThreadLocalCache#initialValue

protected synchronized PoolThreadCache initialValue() {
    // #1
    final PoolArena<byte[]> heapArena = leastUsedArena(heapArenas);
    final PoolArena<ByteBuffer> directArena = leastUsedArena(directArenas);
    // #2
    final Thread current = Thread.currentThread();
    if (useCacheForAllThreads || current instanceof FastThreadLocalThread) {
        final PoolThreadCache cache = new PoolThreadCache(
                heapArena, directArena, smallCacheSize, normalCacheSize,
                DEFAULT_MAX_CACHED_BUFFER_CAPACITY, DEFAULT_CACHE_TRIM_INTERVAL);

        ...
    }
    // No caching so just use 0 as sizes.
    return new PoolThreadCache(heapArena, directArena, 0, 0, 0, 0);
}

#1 從PooledByteBufAllocator的heapArenas,directArenas中獲取使用率最小的PoolArena。
PooledByteBufAllocator構造時默認會為PooledByteBufAllocator#directArenas初始化8個PoolArena。
#2 構造PoolThreadCache。

PoolArena,可以理解為一個內存池,負責管理從操作系統中申請到的內存塊。
PoolThreadCache為每一個線程關聯一個PoolArena(PoolThreadCache#directArena),該線程的內存都在該PoolArena上分配。
Netty支持高並發系統,可能有很多線程進行同時內存分配。為了緩解線程競爭,通過創建多個PoolArena細化鎖的粒度,從而提高並發執行的效率。
注意,一個PoolArena可以會分給多個的線程,可以看到PoolArena上會有一些同步操作。

內存級別

前面分析SizeClasses的文章說過,Netty將內存池中的內存塊按大小划分為3個級別。
不同級別的內存塊管理算法不同。默認划分規則如下:
small <= 28672(28K)
normal <= 16777216(16M)
huge > 16777216(16M)

smallSubpagePools是一個PoolSubpage數組,負責維護small級別的內存塊信息。
PoolChunk負責維護normal級別的內存,PoolChunkList管理一組PoolChunk。
PoolArena按內存使用率將PoolChunk分別維護到6個PoolChunkList中,
PoolArena按內存使用率將PoolChunk分別維護到6個PoolChunkList中,
qInit->內存使用率為0~25,
q000->內存使用率為1~50,
q025->內存使用率為25~75,
q050->內存使用率為50~75,
q075->內存使用率為75~100,
q100->內存使用率為100。
注意:PoolChunk是Netty每次向操作系統申請的內存塊。
PoolSubpage需要從PoolChunk中分配,而Tiny,Small級別的內存則是從PoolSubpage中分配。

下面來看一下分配過程

private void allocate(PoolThreadCache cache, PooledByteBuf<T> buf, final int reqCapacity) {
    // #1
    final int sizeIdx = size2SizeIdx(reqCapacity);
    // #2
    if (sizeIdx <= smallMaxSizeIdx) {
        tcacheAllocateSmall(cache, buf, reqCapacity, sizeIdx);
    } else if (sizeIdx < nSizes) {
        // #3
        tcacheAllocateNormal(cache, buf, reqCapacity, sizeIdx);
    } else {
        // #4
        int normCapacity = directMemoryCacheAlignment > 0
                ? normalizeSize(reqCapacity) : reqCapacity;
        // Huge allocations are never served via the cache so just call allocateHuge
        allocateHuge(buf, normCapacity);
    }
}

#1 size2SizeIdx是父類SizeClasses提供的方法,它使用特定算法,將申請的內存大小調整為規范大小,划分到對應位置,返回對應索引,可參考《內存對齊類SizeClasses》
#2 分配small級別的內存塊
#3 分配normal級別的內存塊
#4 分配huge級別的內存塊

private void tcacheAllocateSmall(PoolThreadCache cache, PooledByteBuf<T> buf, final int reqCapacity,
                                 final int sizeIdx) {
    // #1
    if (cache.allocateSmall(this, buf, reqCapacity, sizeIdx)) {
        return;
    }

    // #2
    final PoolSubpage<T> head = smallSubpagePools[sizeIdx];
    final boolean needsNormalAllocation;
    synchronized (head) {
        // #3
        final PoolSubpage<T> s = head.next;
        needsNormalAllocation = s == head;
        if (!needsNormalAllocation) {
            assert s.doNotDestroy && s.elemSize == sizeIdx2size(sizeIdx);
            long handle = s.allocate();
            assert handle >= 0;
            s.chunk.initBufWithSubpage(buf, null, handle, reqCapacity, cache);
        }
    }
    // #4
    if (needsNormalAllocation) {
        synchronized (this) {
            allocateNormal(buf, reqCapacity, sizeIdx, cache);
        }
    }

    incSmallAllocation();
}

#1 首先嘗試在線程緩存上分配。
除了PoolArena,PoolThreadCache#smallSubPageHeapCaches還為每個線程維護了Small級別的內存緩存
#2 使用前面SizeClasses#size2SizeIdx方法計算的索引,獲取對應PoolSubpage
#3 注意,head是一個占位節點,並不存儲數據,s==head表示當前存在可以用的PoolSubpage,因為已經耗盡的PoolSubpage是會從鏈表中移除。
接着從PoolSubpage中分配內存,后面有文章解析詳細過程
注意,這里必要運行在同步機制中。
#4 沒有可用的PoolSubpage,需要申請一個Normal級別的內存塊,再在上面分配所需內存

normal級別的內存也是先嘗試在線程緩存中分配,分配失敗后再調用allocateNormal方法申請
PoolArena#allocate -> allocateNormal

private void allocateNormal(PooledByteBuf<T> buf, int reqCapacity, int sizeIdx, PoolThreadCache threadCache) {
    if (q050.allocate(buf, reqCapacity, sizeIdx, threadCache) ||
        q025.allocate(buf, reqCapacity, sizeIdx, threadCache) ||
        q000.allocate(buf, reqCapacity, sizeIdx, threadCache) ||
        qInit.allocate(buf, reqCapacity, sizeIdx, threadCache) ||
        q075.allocate(buf, reqCapacity, sizeIdx, threadCache)) {
        return;
    }

    // Add a new chunk.
    PoolChunk<T> c = newChunk(pageSize, nPSizes, pageShifts, chunkSize);
    boolean success = c.allocate(buf, reqCapacity, sizeIdx, threadCache);
    assert success;
    qInit.add(c);
}

#1 依次從q050,q025,q000,qInit,q075上申請內存
為什么要是這個順序呢?

PoolArena中的PoolChunkList之間也組成一個“雙向”鏈表

qInit ---> q000 <---> q025 <---> q050 <---> q075 <---> q100

PoolChunkList中還維護了minUsage,maxUsage,即當一個PoolChunk使用率大於maxUsage,它將被移動到下一個PoolChunkList,使用率小於minUsage,則被移動到前一個PoolChunkList。
注意:q000沒有前置節點,它的minUsage為1,即上面的PoolChunk內存完全釋放后,將被銷毀。
qInit的前置節點是它自己,但它的minUsage為Integer.MIN_VALUE,即使上面的PoolChunk內存完全釋放后,也不會被銷毀,而是繼續保留在內存。

不優先從q000分配,正是因為q000上的PoolChunk內存完全釋放后要被銷毀,如果在上面分配,則會延遲內存的回收進度。
而q075上由於內存利用率太高,導致內存分配的成功率大大降低,因此放到最后。
所以從q050是一個不錯的選擇,這樣大部分情況下,Chunk的利用率都會保持在一個較高水平,提高整個應用的內存利用率;

在PoolChunkList上申請內存,PoolChunkList會遍歷鏈表上PoolChunk節點,直到分配成功或到達鏈表末尾。
PoolChunk分配后,如果內存使用率高於maxUsage,它將被移動到下一個PoolChunkList。

newChunk方法負責構造一個PoolChunk,這里是內存池向操作系統申請內存。
DirectArena#newChunk

protected PoolChunk<ByteBuffer> newChunk(int pageSize, int maxPageIdx,
    int pageShifts, int chunkSize) {
    if (directMemoryCacheAlignment == 0) {
        return new PoolChunk<ByteBuffer>(this,
                allocateDirect(chunkSize), pageSize, pageShifts,
                chunkSize, maxPageIdx, 0);
    }
    final ByteBuffer memory = allocateDirect(chunkSize
            + directMemoryCacheAlignment);
    return new PoolChunk<ByteBuffer>(this, memory, pageSize,
            pageShifts, chunkSize, maxPageIdx,
            offsetCacheLine(memory));
}

allocateDirect方法向操作系統申請內存,獲得一個(jvm)ByteBuffer,
PoolChunk#memory維護了該ByteBuffer,PoolChunk的內存實際上都是在該ByteBuffer上分配。

最后是huge級別的內存申請

private void allocateHuge(PooledByteBuf<T> buf, int reqCapacity) {
    PoolChunk<T> chunk = newUnpooledChunk(reqCapacity);
    activeBytesHuge.add(chunk.chunkSize());
    buf.initUnpooled(chunk, reqCapacity);
    allocationsHuge.increment();
}

比較簡單,沒有使用內存池,直接向操作系統申請內存。

內存釋放

void free(PoolChunk<T> chunk, ByteBuffer nioBuffer, long handle, int normCapacity, PoolThreadCache cache) {
    if (chunk.unpooled) {
        // #1
        int size = chunk.chunkSize();
        destroyChunk(chunk);
        activeBytesHuge.add(-size);
        deallocationsHuge.increment();
    } else {
        // #2
        SizeClass sizeClass = sizeClass(handle);
        if (cache != null && cache.add(this, chunk, nioBuffer, handle, normCapacity, sizeClass)) {
            // cached so not free it.
            return;
        }

        freeChunk(chunk, handle, normCapacity, sizeClass, nioBuffer, false);
    }
}

#1 非池化內存,直接銷毀內存
#2 池化內存,首先嘗試加到線程緩存中,成功則不需要其他操作。失敗則調用freeChunk

void freeChunk(PoolChunk<T> chunk, long handle, int normCapacity, SizeClass sizeClass, ByteBuffer nioBuffer,
               boolean finalizer) {
    final boolean destroyChunk;
    synchronized (this) {
        ...
        destroyChunk = !chunk.parent.free(chunk, handle, normCapacity, nioBuffer);
    }
    if (destroyChunk) {
        // destroyChunk not need to be called while holding the synchronized lock.
        destroyChunk(chunk);
    }
}

chunk.parent即PoolChunkList,PoolChunkList#free會調用PoolChunk釋放內存,釋放內存后,如果內存使用率低於minUsage,則移動前一個PoolChunkList,如果前一個PoolChunkList不存在(q000),則返回false,由后面的步驟銷毀該PoolChunk。
可回顧前面解析ByteBuf文章中關於內存銷毀的內容。

如果您覺得本文不錯,歡迎關注我的微信公眾號,系列文章持續更新中。您的關注是我堅持的動力!


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM