Netty源碼分析第五章: ByteBuf
第四節: PooledByteBufAllocator簡述
上一小節簡單介紹了ByteBufAllocator以及其子類UnPooledByteBufAllocator的緩沖區分類的邏輯, 這一小節開始帶大家剖析更為復雜的PooledByteBufAllocator, 我們知道PooledByteBufAllocator是通過自己取一塊連續的內存進行ByteBuf的封裝, 所以這里更為復雜, 在這一小節簡單講解有關PooledByteBufAllocator分配邏輯
友情提示: 從這一節開始難度開始加大, 請各位戰友做好心理准備
PooledByteBufAllocator同樣也重寫了AbstractByteBuf的newDirectBuffer和newHeapBuffer兩個抽象方法, 我們這一小節以newDirectBuffer為例, 先簡述一下其邏輯
首先看UnPooledByteBufAllocator中newDirectBuffer這個方法:
protected ByteBuf newDirectBuffer(int initialCapacity, int maxCapacity) { PoolThreadCache cache = threadCache.get(); PoolArena<ByteBuffer> directArena = cache.directArena; ByteBuf buf; if (directArena != null) { buf = directArena.allocate(cache, initialCapacity, maxCapacity); } else { if (PlatformDependent.hasUnsafe()) { buf = UnsafeByteBufUtil.newUnsafeDirectByteBuf(this, initialCapacity, maxCapacity); } else { buf = new UnpooledDirectByteBuf(this, initialCapacity, maxCapacity); } } return toLeakAwareBuffer(buf); }
首先 PoolThreadCache cache = threadCache.get() 這一步是拿到一個線程局部緩存對象, 線程局部緩存, 顧明思議, 就是同一個線程共享的一個緩存
threadCache是PooledByteBufAllocator類的一個成員變量, 類型是PoolThreadLocalCache(這兩個非常容易混淆, 切記):
private final PoolThreadLocalCache threadCache;
再看其類型PoolThreadLocalCache的定義:
final class PoolThreadLocalCache extends FastThreadLocal<PoolThreadCache> { @Override protected synchronized PoolThreadCache initialValue() { final PoolArena<byte[]> heapArena = leastUsedArena(heapArenas); final PoolArena<ByteBuffer> directArena = leastUsedArena(directArenas); return new PoolThreadCache( heapArena, directArena, tinyCacheSize, smallCacheSize, normalCacheSize, DEFAULT_MAX_CACHED_BUFFER_CAPACITY, DEFAULT_CACHE_TRIM_INTERVAL); } //代碼省略
}
這里繼承了一個FastThreadLocal類, 這個類相當於jdk的ThreadLocal, 只是性能更快, 有關FastThreadLocal, 我們在后面的章節會詳細剖析, 這里我們只要知道, 繼承FastThreadLocal類並且重寫了initialValue方法, 則通過其get方法就能獲得initialValue返回的對象, 並且這個對象是線程共享的
在這里我們看到, 在重寫的initialValue方法中, 初始化了heapArena和directArena兩個屬性之后, 通過new PoolThreadCache()這種方式創建了PoolThreadCache對象
這里注意, PoolThreadLocalCache是一個FastThreadLocal, 而PoolThreadCache才是線程局部緩存, 這兩個類名非常非常像, 千萬別搞混了(我當初讀這段代碼時因為搞混所以懵逼了)
其中heapArena和directArena是分別是用來分配堆和堆外內存用的兩個對象, 以directArena為例, 我們看到是通過leastUsedArena(directArenas)這種方式獲得的, directArenas是一個directArena類型的數組, leastUsedArena(directArenas)這個方法是用來獲取數組中一個使用最少的directArena對象
directArenas是PooledByteBufAllocator的成員變量, 是在其構造方法中初始化的:
public PooledByteBufAllocator(boolean preferDirect, int nHeapArena, int nDirectArena, int pageSize, int maxOrder, int tinyCacheSize, int smallCacheSize, int normalCacheSize) { //代碼省略
if (nDirectArena > 0) { directArenas = newArenaArray(nDirectArena); List<PoolArenaMetric> metrics = new ArrayList<PoolArenaMetric>(directArenas.length); for (int i = 0; i < directArenas.length; i ++) { PoolArena.DirectArena arena = new PoolArena.DirectArena( this, pageSize, maxOrder, pageShifts, chunkSize); directArenas[i] = arena; metrics.add(arena); } directArenaMetrics = Collections.unmodifiableList(metrics); } else { directArenas = null; directArenaMetrics = Collections.emptyList(); } }
我們看到這里通過directArenas = newArenaArray(nDirectArena)初始化了directArenas, 其中nDirectArena, 默認是cpu核心數的2倍, 這點我們可以跟蹤構造方法的調用鏈可以分析到
這樣保證了每一個線程會有一個獨享的arena
我們看newArenaArray(nDirectArena)這個方法:
private static <T> PoolArena<T>[] newArenaArray(int size) { return new PoolArena[size]; }
這里只是創建了一個數組, 默認長度為nDirectArena
繼續跟PooledByteBufAllocator的構造方法, 創建完了數組, 后面在for循環中為數組賦值:
首先通過new PoolArena.DirectArena創建一個DirectArena實例, 然后再為新創建的directArenas數組賦值
再回到PoolThreadLocalCache的構造方法中:
final class PoolThreadLocalCache extends FastThreadLocal<PoolThreadCache> { @Override protected synchronized PoolThreadCache initialValue() { final PoolArena<byte[]> heapArena = leastUsedArena(heapArenas); final PoolArena<ByteBuffer> directArena = leastUsedArena(directArenas); return new PoolThreadCache( heapArena, directArena, tinyCacheSize, smallCacheSize, normalCacheSize, DEFAULT_MAX_CACHED_BUFFER_CAPACITY, DEFAULT_CACHE_TRIM_INTERVAL); } //代碼省略
}
方法最后, 創建PoolThreadCache的一個對象, 我們跟進構造方法中:
PoolThreadCache(PoolArena<byte[]> heapArena, PoolArena<ByteBuffer> directArena, int tinyCacheSize, int smallCacheSize, int normalCacheSize, int maxCachedBufferCapacity, int freeSweepAllocationThreshold) { //代碼省略 //保存成兩個成員變量
this.heapArena = heapArena; this.directArena = directArena; //代碼省略
}
這里省略了大段代碼, 只需要關注這里將兩個值保存在PoolThreadCache的成員變量中
我們回到newDirectBuffer中:
protected ByteBuf newDirectBuffer(int initialCapacity, int maxCapacity) { PoolThreadCache cache = threadCache.get(); PoolArena<ByteBuffer> directArena = cache.directArena; ByteBuf buf; if (directArena != null) { buf = directArena.allocate(cache, initialCapacity, maxCapacity); } else { if (PlatformDependent.hasUnsafe()) { buf = UnsafeByteBufUtil.newUnsafeDirectByteBuf(this, initialCapacity, maxCapacity); } else { buf = new UnpooledDirectByteBuf(this, initialCapacity, maxCapacity); } } return toLeakAwareBuffer(buf); }
簡單分析的線程局部緩存初始化相關邏輯, 我們再往下跟:
PoolArena<ByteBuffer> directArena = cache.directArena;
通過上面的分析, 這步我們應該不陌生, 在PoolThreadCache構造方法中將directArena和heapArena中保存在成員變量中, 這樣就可以直接通過cache.directArena這種方式拿到其成員變量的內容
從以上邏輯, 我們可以大概的分析一下流程, 通常會創建和線程數量相等的arena, 並以數組的形式存儲在PooledByteBufAllocator的成員變量中, 每一個PoolThreadCache創建的時候, 都會在當前線程拿到一個arena, 並保存在自身的成員變量中
5-4-1
PoolThreadCache除了維護了一個arena之外, 還維護了一個緩存列表, 我們在重復分配ByteBuf的時候, 並不需要每次都通過arena進行分配, 可以直接從緩存列表中拿一個ByteBuf
有關緩存列表, 我們循序漸進的往下看:
在PooledByteBufAllocator中維護了三個值:
1. tinyCacheSize
2. smallCacheSize
3. normalCacheSize
tinyCacheSize代表tiny類型的ByteBuf能緩存多少個
smallCacheSize代表small類型的ByteBuf能緩存多少個
normalCacheSize代表normal類型的ByteBuf能緩存多少個
具體tiny類型, small類型, normal是什么意思, 我們會在后面講解
我們回到PoolThreadLocalCache類中看其構造方法:
final class PoolThreadLocalCache extends FastThreadLocal<PoolThreadCache> { @Override protected synchronized PoolThreadCache initialValue() { final PoolArena<byte[]> heapArena = leastUsedArena(heapArenas); final PoolArena<ByteBuffer> directArena = leastUsedArena(directArenas); return new PoolThreadCache( heapArena, directArena, tinyCacheSize, smallCacheSize, normalCacheSize, DEFAULT_MAX_CACHED_BUFFER_CAPACITY, DEFAULT_CACHE_TRIM_INTERVAL); } //代碼省略
}
我們看到這三個屬性是在PoolThreadCache的構造方法中傳入的
這三個屬性是通過PooledByteBufAllocator的構造方法中初始化的, 跟隨構造方法的調用鏈會走到這個構造方法:
public PooledByteBufAllocator(boolean preferDirect, int nHeapArena, int nDirectArena, int pageSize, int maxOrder) { this(preferDirect, nHeapArena, nDirectArena, pageSize, maxOrder, DEFAULT_TINY_CACHE_SIZE, DEFAULT_SMALL_CACHE_SIZE, DEFAULT_NORMAL_CACHE_SIZE); }
這里仍然調用了一個重載的構造方法, 這里我們關注這幾個參數:
DEFAULT_TINY_CACHE_SIZE, DEFAULT_SMALL_CACHE_SIZE, DEFAULT_NORMAL_CACHE_SIZE
這里對應着幾個靜態的成員變量:
private static final int DEFAULT_TINY_CACHE_SIZE; private static final int DEFAULT_SMALL_CACHE_SIZE; private static final int DEFAULT_NORMAL_CACHE_SIZE;
我們在static塊中看其初始化過程:
static{ //代碼省略
DEFAULT_TINY_CACHE_SIZE = SystemPropertyUtil.getInt("io.netty.allocator.tinyCacheSize", 512); DEFAULT_SMALL_CACHE_SIZE = SystemPropertyUtil.getInt("io.netty.allocator.smallCacheSize", 256); DEFAULT_NORMAL_CACHE_SIZE = SystemPropertyUtil.getInt("io.netty.allocator.normalCacheSize", 64); //代碼省略
}
在這里我們看到, 這三個屬性分別初始化的大小是512, 256, 64, 這三個屬性就對應了PooledByteBufAllocator另外的幾個成員變量, tinyCacheSize, smallCacheSize, normalCacheSize
也就是說, tiny類型的ByteBuf在每個緩存中默認緩存的數量是512個, small類型的ByteBuf在每個緩存中默認緩存的數量是256個, normal類型的ByteBuf在每個緩存中默認緩存的數量是64個
我們再到PooledByteBufAllocator中重載的構造方法中:
public PooledByteBufAllocator(boolean preferDirect, int nHeapArena, int nDirectArena, int pageSize, int maxOrder, int tinyCacheSize, int smallCacheSize, int normalCacheSize) { super(preferDirect); threadCache = new PoolThreadLocalCache(); this.tinyCacheSize = tinyCacheSize; this.smallCacheSize = smallCacheSize; this.normalCacheSize = normalCacheSize; //代碼省略
}
篇幅原因, 這里也省略了大段代碼, 大家可以通過構造方法參數找到源碼中相對的位置進行閱讀
我們關注這段代碼:
this.tinyCacheSize = tinyCacheSize; this.smallCacheSize = smallCacheSize; this.normalCacheSize = normalCacheSize;
在這里將將參數的DEFAULT_TINY_CACHE_SIZE, DEFAULT_SMALL_CACHE_SIZE, DEFAULT_NORMAL_CACHE_SIZE的三個值保存到了成員變量tinyCacheSize, smallCacheSize, normalCacheSize
PooledByteBufAllocator中將這三個成員變量初始化之后, 在PoolThreadLocalCache的initialValue方法中就可以使用這三個成員變量的值了
我們再次跟到initialValue方法中:
final class PoolThreadLocalCache extends FastThreadLocal<PoolThreadCache> { @Override protected synchronized PoolThreadCache initialValue() { final PoolArena<byte[]> heapArena = leastUsedArena(heapArenas); final PoolArena<ByteBuffer> directArena = leastUsedArena(directArenas); return new PoolThreadCache( heapArena, directArena, tinyCacheSize, smallCacheSize, normalCacheSize, DEFAULT_MAX_CACHED_BUFFER_CAPACITY, DEFAULT_CACHE_TRIM_INTERVAL); } //代碼省略
}
這里就可以在創建PoolThreadCache對象的的構造方法中傳入tinyCacheSize, smallCacheSize, normalCacheSize這三個成員變量了
我們再跟到PoolThreadCache的構造方法中:
PoolThreadCache(PoolArena<byte[]> heapArena, PoolArena<ByteBuffer> directArena, int tinyCacheSize, int smallCacheSize, int normalCacheSize, int maxCachedBufferCapacity, int freeSweepAllocationThreshold) { //代碼省略
this.freeSweepAllocationThreshold = freeSweepAllocationThreshold; this.heapArena = heapArena; this.directArena = directArena; if (directArena != null) { tinySubPageDirectCaches = createSubPageCaches( tinyCacheSize, PoolArena.numTinySubpagePools, SizeClass.Tiny); smallSubPageDirectCaches = createSubPageCaches( smallCacheSize, directArena.numSmallSubpagePools, SizeClass.Small); numShiftsNormalDirect = log2(directArena.pageSize); normalDirectCaches = createNormalCaches( normalCacheSize, maxCachedBufferCapacity, directArena); directArena.numThreadCaches.getAndIncrement(); } else { //代碼省略
} //代碼省略
ThreadDeathWatcher.watch(thread, freeTask); }
其中tinySubPageDirectCaches, smallSubPageDirectCaches, 和normalDirectCaches就代表了三種類型的緩存數組, 數組元素是MemoryRegionCache類型的對象, MemoryRegionCache就代表一個ByeBuf的緩存
以tinySubPageDirectCaches為例, 我們看到tiny類型的緩存是通過createSubPageCaches這個方法創建的
這里傳入了三個參數tinyCacheSize我們之前分析過是512, PoolArena.numTinySubpagePools這里是32(這里不同類型的緩存大小不一樣, small類型是4, normal類型是3) , SizeClass.Tiny代表其類型是tiny類型
我們跟到createSubPageCaches這個方法中:
private static <T> MemoryRegionCache<T>[] createSubPageCaches( int cacheSize, int numCaches, SizeClass sizeClass) { if (cacheSize > 0) { //創建數組, 長度為32
@SuppressWarnings("unchecked") MemoryRegionCache<T>[] cache = new MemoryRegionCache[numCaches]; for (int i = 0; i < cache.length; i++) { //每一個節點是ubPageMemoryRegionCache對象
cache[i] = new SubPageMemoryRegionCache<T>(cacheSize, sizeClass); } return cache; } else { return null; } }
這里首先創建了MemoryRegionCache, 長度是我們剛才分析過的32
然后通過for循環, 為數組賦值, 賦值的對象是SubPageMemoryRegionCache類型的, SubPageMemoryRegionCache就是MemoryRegionCache類型的子類, 同樣也是一個緩存對象, 構造方法中, cacheSize, 就是其中緩存對象的數量, 如果是tiny類型就是512, sizeClass, 代表其類型, 比如tiny, small或者normal
再簡單跟到其構造方法:
SubPageMemoryRegionCache(int size, SizeClass sizeClass) { super(size, sizeClass); }
這里調用了父類的構造方法, 我們繼續跟進去:
MemoryRegionCache(int size, SizeClass sizeClass) { //size會進行規格化
this.size = MathUtil.safeFindNextPositivePowerOfTwo(size); //隊列大小
queue = PlatformDependent.newFixedMpscQueue(this.size); this.sizeClass = sizeClass; }
首先會對其進行規格化, 其實就是查找大於等於當前size的2的冪次方的數, 這里如果是512那么規格化之后還是512, 然后初始化一個隊列, 隊列大小就是傳入的大小, 如果是tiny, 這里大小就是512
最后並保存其類型
這里我們不難看出, 其實每個緩存的對象, 里面是通過一個隊列保存的, 有關緩存隊列和ByteBuf之間的邏輯, 后面的小節會進行剖析
從上面剖析我們不難看出, PoolThreadCache中維護了三種類型的緩存數組, 每個緩存數組中的每個值中, 又通過一個隊列進行對象的存儲
5-4-2
當然這里只舉了Direct類型的對象關系, heap類型其實都是一樣的, 這里不再贅述
這一小節邏輯較為復雜, 同學們可以自己在源碼中跟蹤一遍加深印象