ByteBufAllocator, the memory allocator:
The top-level abstraction for memory allocation in Netty is ByteBufAllocator, which is responsible for allocating every kind of ByteBuf. Its surface is fairly small; the important APIs are the following:
public interface ByteBufAllocator {

    /**
     * Allocate a {@link ByteBuf}. If it is a direct or heap buffer depends on the actual implementation.
     * (Allocates a block of memory and automatically decides between heap and direct memory.)
     */
    ByteBuf buffer();

    /**
     * Allocate a {@link ByteBuf}, preferably a direct buffer which is suitable for I/O.
     * (Allocates direct memory if possible; falls back to heap memory if the platform does not support it.)
     */
    ByteBuf ioBuffer();

    /**
     * Allocate a heap {@link ByteBuf}.
     */
    ByteBuf heapBuffer();

    /**
     * Allocate a direct {@link ByteBuf}.
     */
    ByteBuf directBuffer();

    /**
     * Allocate a {@link CompositeByteBuf}. If it is a direct or heap buffer depends on the actual implementation.
     * (Composite allocation: combines several ByteBufs into one logical buffer.)
     */
    CompositeByteBuf compositeBuffer();
}
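To make the interface concrete before we dig into its implementations, here is a minimal usage sketch (not part of the original text). It assumes Netty 4.x on the classpath and uses ByteBufAllocator.DEFAULT, Netty's default allocator; the capacities are arbitrary example values.

import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.buffer.CompositeByteBuf;

public class AllocatorApiDemo {
    public static void main(String[] args) {
        ByteBufAllocator alloc = ByteBufAllocator.DEFAULT;    // Netty's default allocator

        ByteBuf any    = alloc.buffer(256);       // heap or direct, decided by the implementation
        ByteBuf io     = alloc.ioBuffer(256);     // prefers direct memory, suitable for I/O
        ByteBuf heap   = alloc.heapBuffer(256);   // always heap memory
        ByteBuf direct = alloc.directBuffer(256); // always direct memory

        CompositeByteBuf composite = alloc.compositeBuffer();
        composite.addComponents(true, heap, direct); // the composite takes ownership of both buffers

        // ByteBufs are reference counted: release everything you still own.
        any.release();
        io.release();
        composite.release(); // also releases the two components it now owns
    }
}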
At this point you might wonder why the API above does not expose allocation methods for the eight ByteBuf types mentioned earlier. To answer that, let's look at ByteBufAllocator's base implementation, AbstractByteBufAllocator, and walk through its main APIs, starting with the buffer() method:
public abstract class AbstractByteBufAllocator implements ByteBufAllocator {

    @Override
    public ByteBuf buffer() {
        // Check whether direct buffers are the default.
        if (directByDefault) {
            return directBuffer();
        }
        return heapBuffer();
    }
}
The buffer() method simply checks whether direct buffers are the default: if so it allocates a direct buffer, otherwise a heap buffer. The directBuffer() and heapBuffer() methods are implemented almost identically, so let's look at directBuffer():
@Override
public ByteBuf directBuffer() {
    // Default initial capacity is 256; default maximum capacity is Integer.MAX_VALUE.
    return directBuffer(DEFAULT_INITIAL_CAPACITY, DEFAULT_MAX_CAPACITY);
}

@Override
public ByteBuf directBuffer(int initialCapacity, int maxCapacity) {
    if (initialCapacity == 0 && maxCapacity == 0) {
        return emptyBuf;
    }
    // Validate the initial capacity and the maximum capacity.
    validate(initialCapacity, maxCapacity);
    return newDirectBuffer(initialCapacity, maxCapacity);
}

/**
 * Create a direct {@link ByteBuf} with the given initialCapacity and maxCapacity.
 */
protected abstract ByteBuf newDirectBuffer(int initialCapacity, int maxCapacity);
directBuffer() has several overloads, all of which eventually call newDirectBuffer(). newDirectBuffer() is an abstract method, so the actual work is left to the subclasses of AbstractByteBufAllocator. Likewise, heapBuffer() eventually calls newHeapBuffer(), which is also abstract and implemented by the subclasses. AbstractByteBufAllocator has two main subclasses, PooledByteBufAllocator and UnpooledByteBufAllocator; the class diagram below shows how these subclasses are organized.
So far we only know how directBuffer/heapBuffer and pooled/unpooled are chosen. How are unsafe and non-unsafe told apart? Netty makes that decision for us automatically: if the underlying platform supports Unsafe, reads and writes go through Unsafe, otherwise the non-unsafe path is used. We can verify this in the source of UnpooledByteBufAllocator:
public final class UnpooledByteBufAllocator extends AbstractByteBufAllocator implements ByteBufAllocatorMetricProvider {
@Override
protected ByteBuf newHeapBuffer(int initialCapacity, int maxCapacity) {
return PlatformDependent.hasUnsafe() ? new UnpooledUnsafeHeapByteBuf(this, initialCapacity, maxCapacity)
: new UnpooledHeapByteBuf(this, initialCapacity, maxCapacity);
}
@Override
protected ByteBuf newDirectBuffer(int initialCapacity, int maxCapacity) {
ByteBuf buf = PlatformDependent.hasUnsafe() ?
UnsafeByteBufUtil.newUnsafeDirectByteBuf(this, initialCapacity, maxCapacity) :
new UnpooledDirectByteBuf(this, initialCapacity, maxCapacity);
return disableLeakDetector ? buf : toLeakAwareBuffer(buf);
}
}
In both newHeapBuffer() and newDirectBuffer(), the allocation first asks PlatformDependent whether Unsafe is supported: if so, an Unsafe-based buffer is created, otherwise a non-Unsafe one. Netty makes this decision for us automatically.
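A quick way to see these decisions on your own machine is to print the concrete classes an allocator hands back. The sketch below is illustrative only: PlatformDependent lives in the internal package io.netty.util.internal and may change between versions, and if the resource-leak detector is enabled the returned objects may be wrapped in leak-aware classes, so the printed names can differ.

import io.netty.buffer.ByteBuf;
import io.netty.buffer.PooledByteBufAllocator;
import io.netty.buffer.UnpooledByteBufAllocator;
import io.netty.util.internal.PlatformDependent;

public class WhichBufferDemo {
    public static void main(String[] args) {
        System.out.println("hasUnsafe = " + PlatformDependent.hasUnsafe());

        UnpooledByteBufAllocator unpooled = UnpooledByteBufAllocator.DEFAULT;
        PooledByteBufAllocator pooled = PooledByteBufAllocator.DEFAULT;

        ByteBuf a = unpooled.heapBuffer(16);   // UnpooledUnsafeHeapByteBuf or UnpooledHeapByteBuf
        ByteBuf b = unpooled.directBuffer(16); // an unpooled direct variant, unsafe or not
        ByteBuf c = pooled.directBuffer(16);   // a pooled variant, e.g. PooledUnsafeDirectByteBuf

        System.out.println(a.getClass().getSimpleName());
        System.out.println(b.getClass().getSimpleName());
        System.out.println(c.getClass().getSimpleName());

        a.release();
        b.release();
        c.release();
    }
}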
Unpooled (non-pooled) memory allocation:
Allocating heap memory:
Let's now look at how UnpooledByteBufAllocator allocates memory, starting with the heap-buffer path in newHeapBuffer():
public final class UnpooledByteBufAllocator extends AbstractByteBufAllocator implements ByteBufAllocatorMetricProvider {
protected ByteBuf newHeapBuffer(int initialCapacity, int maxCapacity) {
//Whether Unsafe is usable is determined by the underlying JDK; if the Unsafe object can be obtained, use it.
return PlatformDependent.hasUnsafe() ? new UnpooledUnsafeHeapByteBuf(this, initialCapacity, maxCapacity)
: new UnpooledHeapByteBuf(this, initialCapacity, maxCapacity);
}
}
PlatformDependent.hasUnsafe() is called to determine whether the platform supports Unsafe: if it does, an UnpooledUnsafeHeapByteBuf is created, otherwise an UnpooledHeapByteBuf. Let's start with the UnpooledUnsafeHeapByteBuf constructor and see what it does:
final class UnpooledUnsafeHeapByteBuf extends UnpooledHeapByteBuf {
    ....
    UnpooledUnsafeHeapByteBuf(ByteBufAllocator alloc, int initialCapacity, int maxCapacity) {
        super(alloc, initialCapacity, maxCapacity); // parent constructor, see below
    }
    ....
}

public UnpooledHeapByteBuf(ByteBufAllocator alloc, int initialCapacity, int maxCapacity) {
    super(maxCapacity); // parent constructor, see below
    checkNotNull(alloc, "alloc");
    if (initialCapacity > maxCapacity) {
        throw new IllegalArgumentException(String.format(
                "initialCapacity(%d) > maxCapacity(%d)", initialCapacity, maxCapacity));
    }
    this.alloc = alloc;
    // Assign the newly allocated array (new byte[initialCapacity]) to the member variable 'array'.
    setArray(allocateArray(initialCapacity));
    setIndex(0, 0);
}

protected AbstractReferenceCountedByteBuf(int maxCapacity) {
    super(maxCapacity); // parent constructor, see below
}

protected AbstractByteBuf(int maxCapacity) {
    checkPositiveOrZero(maxCapacity, "maxCapacity");
    this.maxCapacity = maxCapacity;
}
The key call here is setArray(), whose implementation is trivial: it assigns the freshly allocated array, new byte[initialCapacity], to the member variable array. Right after that, setIndex() is called, which ultimately initializes readerIndex and writerIndex in setIndex0():
@Override
public ByteBuf setIndex(int readerIndex, int writerIndex) {
    if (checkBounds) {
        // Validate the relationship between readerIndex, writerIndex and capacity.
        checkIndexBounds(readerIndex, writerIndex, capacity());
    }
    setIndex0(readerIndex, writerIndex);
    return this;
}

// AbstractByteBuf: set the read and write pointers.
final void setIndex0(int readerIndex, int writerIndex) {
    this.readerIndex = readerIndex;
    this.writerIndex = writerIndex;
}
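Since readerIndex and writerIndex drive every subsequent read and write, a small illustration of how they move may help. This is a usage sketch with the default allocator; the values are arbitrary.

import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;

public class IndexDemo {
    public static void main(String[] args) {
        ByteBuf buf = ByteBufAllocator.DEFAULT.heapBuffer(8);
        // Freshly allocated: setIndex(0, 0) has run, so readerIndex == writerIndex == 0.
        System.out.println(buf.readerIndex() + " / " + buf.writerIndex()); // 0 / 0

        buf.writeByte(1);
        buf.writeByte(2);
        System.out.println(buf.readerIndex() + " / " + buf.writerIndex()); // 0 / 2

        buf.readByte();
        System.out.println(buf.readerIndex() + " / " + buf.writerIndex()); // 1 / 2

        buf.release();
    }
}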
Since both UnpooledUnsafeHeapByteBuf and UnpooledHeapByteBuf go through UnpooledHeapByteBuf's constructor, what is actually different between them? The fundamental difference lies in how I/O reads and writes are performed, which we can see by comparing their getByte() methods. First, UnpooledHeapByteBuf's getByte():
@Override
public byte getByte(int index) {
ensureAccessible();
return _getByte(index);
}
@Override
protected byte _getByte(int index) {
return HeapByteBufUtil.getByte(array, index);
}
final class HeapByteBufUtil {
//Read the value directly via array indexing.
static byte getByte(byte[] memory, int index) {
return memory[index];
}
}
Now compare UnpooledUnsafeHeapByteBuf's getByte() implementation:
@Override
public byte getByte(int index) {
checkIndex(index);
return _getByte(index);
}
@Override
protected byte _getByte(int index) {
return UnsafeByteBufUtil.getByte(array, index);
}
// UnsafeByteBufUtil: delegates to the underlying Unsafe to access the data.
static byte getByte(byte[] array, int index) {
return PlatformDependent.getByte(array, index);
}
This comparison makes the difference between UnpooledUnsafeHeapByteBuf and UnpooledHeapByteBuf clear: one indexes into the array directly, the other goes through PlatformDependent and ultimately Unsafe.
Allocating direct (off-heap) memory:
Back to UnpooledByteBufAllocator's newDirectBuffer() method:
@Override
protected ByteBuf newDirectBuffer(int initialCapacity, int maxCapacity) {
ByteBuf buf = PlatformDependent.hasUnsafe() ?
UnsafeByteBufUtil.newUnsafeDirectByteBuf(this, initialCapacity, maxCapacity) :
new UnpooledDirectByteBuf(this, initialCapacity, maxCapacity);
return disableLeakDetector ? buf : toLeakAwareBuffer(buf);
}
Just like heap allocation, if Unsafe is supported, UnsafeByteBufUtil.newUnsafeDirectByteBuf() is called; otherwise an UnpooledDirectByteBuf is created. Let's look at the UnpooledDirectByteBuf constructor first:
protected UnpooledDirectByteBuf(ByteBufAllocator alloc, int initialCapacity, int maxCapacity) {
    super(maxCapacity); // parent constructor, see below
    // ... parameter checks ...
    this.alloc = alloc;
    // ByteBuffer.allocateDirect() asks the JDK to allocate a direct buffer; the rest is just an assignment.
    setByteBuffer(ByteBuffer.allocateDirect(initialCapacity));
}

private void setByteBuffer(ByteBuffer buffer) {
    ByteBuffer oldBuffer = this.buffer;
    if (oldBuffer != null) {
        if (doNotFree) {
            doNotFree = false;
        } else {
            freeDirect(oldBuffer);
        }
    }
    this.buffer = buffer;
    tmpNioBuf = null;
    // remaining() of a freshly allocated buffer equals its capacity, which becomes this ByteBuf's capacity.
    capacity = buffer.remaining();
}

protected AbstractReferenceCountedByteBuf(int maxCapacity) {
    super(maxCapacity); // parent constructor, see below
}

protected AbstractByteBuf(int maxCapacity) {
    if (maxCapacity < 0) {
        throw new IllegalArgumentException("maxCapacity: " + maxCapacity + " (expected: >= 0)");
    }
    this.maxCapacity = maxCapacity;
}
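The capacity = buffer.remaining() assignment is easy to misread: for a ByteBuffer that has just been created, position is 0 and limit equals capacity, so remaining() is simply the allocated size. A tiny JDK-only sketch:

import java.nio.ByteBuffer;

public class RemainingDemo {
    public static void main(String[] args) {
        ByteBuffer nio = ByteBuffer.allocateDirect(256);
        // position = 0, limit = 256, capacity = 256
        System.out.println(nio.position() + " " + nio.limit() + " " + nio.capacity());
        System.out.println(nio.remaining()); // 256: exactly what UnpooledDirectByteBuf records as its capacity
    }
}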
Next, let's continue with UnsafeByteBufUtil.newUnsafeDirectByteBuf() and its logic:
static UnpooledUnsafeDirectByteBuf newUnsafeDirectByteBuf(
        ByteBufAllocator alloc, int initialCapacity, int maxCapacity) {
    if (PlatformDependent.useDirectBufferNoCleaner()) {
        return new UnpooledUnsafeNoCleanerDirectByteBuf(alloc, initialCapacity, maxCapacity);
    }
    return new UnpooledUnsafeDirectByteBuf(alloc, initialCapacity, maxCapacity);
}

protected UnpooledUnsafeDirectByteBuf(ByteBufAllocator alloc, int initialCapacity, int maxCapacity) {
    super(maxCapacity); // parent constructor, see below
    // ... parameter checks ...
    this.alloc = alloc;
    setByteBuffer(allocateDirect(initialCapacity), false);
}

protected AbstractReferenceCountedByteBuf(int maxCapacity) {
    super(maxCapacity); // parent constructor, see below
}

protected AbstractByteBuf(int maxCapacity) {
    if (maxCapacity < 0) {
        throw new IllegalArgumentException("maxCapacity: " + maxCapacity + " (expected: >= 0)");
    }
    // Record the maximum capacity of this ByteBuf.
    this.maxCapacity = maxCapacity;
}
Its logic is similar to the UnpooledDirectByteBuf constructor, so let's focus on the setByteBuffer() method:
final void setByteBuffer(ByteBuffer buffer, boolean tryFree) {
    if (tryFree) {
        ByteBuffer oldBuffer = this.buffer;
        if (oldBuffer != null) {
            if (doNotFree) {
                doNotFree = false;
            } else {
                freeDirect(oldBuffer);
            }
        }
    }
    this.buffer = buffer;
    memoryAddress = PlatformDependent.directBufferAddress(buffer);
    tmpNioBuf = null;
    // remaining() of a freshly allocated buffer equals its capacity, which becomes this ByteBuf's capacity.
    capacity = buffer.remaining();
}
As before, the buffer created by the JDK is stored first. The important extra step is the call to PlatformDependent.directBufferAddress(), which obtains the buffer's real memory address and stores it in the memoryAddress field. Let's step into PlatformDependent.directBufferAddress() to see how:
public static long directBufferAddress(ByteBuffer buffer) {
return PlatformDependent0.directBufferAddress(buffer);
}
static long directBufferAddress(ByteBuffer buffer) {
return getLong(buffer, ADDRESS_FIELD_OFFSET);
}
private static long getLong(Object object, long fieldOffset) {
return UNSAFE.getLong(object, fieldOffset);
}
As you can see, it calls UNSAFE's getLong() method, a native method that fetches a long directly from the given object at the given field offset, here the ByteBuffer's internal address field. At this point the difference between UnpooledUnsafeDirectByteBuf and UnpooledDirectByteBuf is essentially clear: the non-unsafe version accesses data through indices, while the unsafe version operates on raw memory addresses directly, which is naturally more efficient.
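Conceptually the two read paths differ only in how an index is resolved: an array subscript versus an offset added to a native base address. The sketch below illustrates this; it is not Netty source. It assumes PlatformDependent.hasUnsafe() is true on your platform, and it uses the internal helpers PlatformDependent.directBufferAddress() (shown above) and PlatformDependent.getByte(long), which are assumed here to be available in Netty 4.1 but are internal API and may change.

import java.nio.ByteBuffer;

import io.netty.util.internal.PlatformDependent;

public class AccessPathDemo {
    public static void main(String[] args) {
        // Non-unsafe heap path: the index is just an array subscript.
        byte[] array = new byte[] { 10, 20, 30 };
        byte fromArray = array[1]; // 20

        // Unsafe direct path: the index is an offset added to the buffer's native address.
        ByteBuffer direct = ByteBuffer.allocateDirect(3);
        direct.put(new byte[] { 10, 20, 30 });
        long memoryAddress = PlatformDependent.directBufferAddress(direct);
        byte fromAddress = PlatformDependent.getByte(memoryAddress + 1); // 20, read straight from native memory

        System.out.println(fromArray + " " + fromAddress);
    }
}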
Pooled memory allocation:
Now let's analyze how pooled memory allocation works. Again we start from AbstractByteBufAllocator's subclass, PooledByteBufAllocator, and its two allocation methods newDirectBuffer() and newHeapBuffer(); we take newDirectBuffer() as the example:
public class PooledByteBufAllocator extends AbstractByteBufAllocator {
......
@Override
protected ByteBuf newDirectBuffer(int initialCapacity, int maxCapacity) {
PoolThreadCache cache = threadCache.get();
PoolArena<ByteBuffer> directArena = cache.directArena;
ByteBuf buf;
if (directArena != null) {
buf = directArena.allocate(cache, initialCapacity, maxCapacity);
} else {
if (PlatformDependent.hasUnsafe()) {
buf = UnsafeByteBufUtil.newUnsafeDirectByteBuf(this, initialCapacity, maxCapacity);
} else {
buf = new UnpooledDirectByteBuf(this, initialCapacity, maxCapacity);
}
}
return toLeakAwareBuffer(buf);
    }
    ......
}
First, threadCache.get() returns a cache object of type PoolThreadCache; from that cache we obtain the directArena object, and finally directArena.allocate() is called to allocate the ByteBuf. This can look a little confusing, so let's take it apart. The threadCache field is actually of type PoolThreadLocalCache; here is its source:
final class PoolThreadLocalCache extends FastThreadLocal<PoolThreadCache> {
@Override
protected synchronized PoolThreadCache initialValue() {
final PoolArena<byte[]> heapArena = leastUsedArena(heapArenas);
final PoolArena<ByteBuffer> directArena = leastUsedArena(directArenas);
return new PoolThreadCache(
heapArena, directArena, tinyCacheSize, smallCacheSize, normalCacheSize,
DEFAULT_MAX_CACHED_BUFFER_CAPACITY, DEFAULT_CACHE_TRIM_INTERVAL);
    }
    .......
}
About the PoolArena seen here: Netty's overall memory pool is an array, and each element of that array is an independent memory pool. Think of it as a country (Netty) with several provinces (PoolArena), each administering its own region.
As the name suggests, PoolThreadLocalCache's initialValue() method initializes the PoolThreadLocalCache. It first calls leastUsedArena() to obtain a heapArena and a directArena, both of type PoolArena, and then passes them into the PoolThreadCache constructor. So where are heapArena and directArena initialized? Searching around, we find that the PooledByteBufAllocator constructor calls newArenaArray() to populate heapArenas and directArenas:
public PooledByteBufAllocator(boolean preferDirect, int nHeapArena, int nDirectArena, int pageSize, int maxOrder,
int tinyCacheSize, int smallCacheSize, int normalCacheSize) {
......
if (nHeapArena > 0) {
heapArenas = newArenaArray(nHeapArena);
......
} else {
heapArenas = null;
heapArenaMetrics = Collections.emptyList();
}
if (nDirectArena > 0) {
directArenas = newArenaArray(nDirectArena);
......
} else {
directArenas = null;
directArenaMetrics = Collections.emptyList();
}
}
private static <T> PoolArena<T>[] newArenaArray(int size) {
return new PoolArena[size];
}
It simply creates a fixed-size PoolArena array whose size is determined by the nHeapArena and nDirectArena parameters. Going back to the PooledByteBufAllocator constructors to see how nHeapArena and nDirectArena are initialized, we find the overloaded constructors:
public PooledByteBufAllocator(boolean preferDirect) {
    // Delegates to the overloaded constructor below.
    this(preferDirect, DEFAULT_NUM_HEAP_ARENA, DEFAULT_NUM_DIRECT_ARENA, DEFAULT_PAGE_SIZE, DEFAULT_MAX_ORDER);
}

public PooledByteBufAllocator(boolean preferDirect, int nHeapArena, int nDirectArena, int pageSize, int maxOrder) {
    this(preferDirect, nHeapArena, nDirectArena, pageSize, maxOrder,
            DEFAULT_TINY_CACHE_SIZE, DEFAULT_SMALL_CACHE_SIZE, DEFAULT_NORMAL_CACHE_SIZE);
}
So nHeapArena and nDirectArena default to the constants DEFAULT_NUM_HEAP_ARENA and DEFAULT_NUM_DIRECT_ARENA. Following those constants into the static initializer:
final int defaultMinNumArena = runtime.availableProcessors() * 2;
final int defaultChunkSize = DEFAULT_PAGE_SIZE << DEFAULT_MAX_ORDER;
DEFAULT_NUM_HEAP_ARENA = Math.max(0,
        SystemPropertyUtil.getInt(
                "io.netty.allocator.numHeapArenas",
                (int) Math.min(
                        defaultMinNumArena,
                        runtime.maxMemory() / defaultChunkSize / 2 / 3)));
DEFAULT_NUM_DIRECT_ARENA = Math.max(0,
        SystemPropertyUtil.getInt(
                "io.netty.allocator.numDirectArenas",
                (int) Math.min(
                        defaultMinNumArena,
                        PlatformDependent.maxDirectMemory() / defaultChunkSize / 2 / 3)));
Only now do we see the default values of nHeapArena and nDirectArena: by default they are twice the number of CPU cores (capped by the available memory, via Math.min above), i.e. defaultMinNumArena is assigned to nHeapArena and nDirectArena. Twice the number of CPU cores should sound familiar: EventLoopGroup also defaults to CPU cores * 2 threads. Why does Netty do this? The main goal is that every worker thread can own its own Arena, so no locking is needed when a thread allocates memory.
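Both defaults can be overridden with the system properties that appear in the snippet above (io.netty.allocator.numHeapArenas and io.netty.allocator.numDirectArenas). A sketch, with two assumptions: the properties must be set before PooledByteBufAllocator's static initializer runs (command-line -D flags are the safer route), and the metric() accessor used to read back the arena counts is the Netty 4.1 metrics API.

import io.netty.buffer.PooledByteBufAllocator;

public class ArenaCountDemo {
    public static void main(String[] args) {
        // Must happen before the PooledByteBufAllocator class is initialized.
        System.setProperty("io.netty.allocator.numHeapArenas", "4");
        System.setProperty("io.netty.allocator.numDirectArenas", "4");

        PooledByteBufAllocator alloc = new PooledByteBufAllocator(true);
        System.out.println(alloc.metric().numHeapArenas() + " heap arenas, "
                + alloc.metric().numDirectArenas() + " direct arenas");
    }
}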
Based on the analysis above, there are heap arenas and direct arenas, which we will simply call Arenas. Suppose we have four threads; four Arenas will then be used. When a ByteBuf is created, an Arena is first obtained through the PoolThreadCache and stored in its member variable, so each thread calling get() on PoolThreadCache receives its own underlying Arena: EventLoop1 gets Arena1, EventLoop2 gets Arena2, and so on, as the diagram below illustrates.
Besides allocating from the Arena, PoolThreadCache can also allocate from the ByteBuf cache lists it maintains internally. For example: we create a 1024-byte ByteBuf through PooledByteBufAllocator and release it once we are done; if somewhere else we then allocate another 1024-byte ByteBuf, there is no need to go back to the Arena at all; the memory can be handed back directly from the cache list kept in PoolThreadCache. PooledByteBufAllocator maintains cache lists for three size classes, governed by the three values tinyCacheSize, smallCacheSize and normalCacheSize:
public class PooledByteBufAllocator extends AbstractByteBufAllocator {

    private final int tinyCacheSize;
    private final int smallCacheSize;
    private final int normalCacheSize;

    static {
        DEFAULT_TINY_CACHE_SIZE = SystemPropertyUtil.getInt("io.netty.allocator.tinyCacheSize", 512);
        DEFAULT_SMALL_CACHE_SIZE = SystemPropertyUtil.getInt("io.netty.allocator.smallCacheSize", 256);
        DEFAULT_NORMAL_CACHE_SIZE = SystemPropertyUtil.getInt("io.netty.allocator.normalCacheSize", 64);
    }

    public PooledByteBufAllocator(boolean preferDirect) {
        this(preferDirect, DEFAULT_NUM_HEAP_ARENA, DEFAULT_NUM_DIRECT_ARENA, DEFAULT_PAGE_SIZE, DEFAULT_MAX_ORDER);
    }

    public PooledByteBufAllocator(int nHeapArena, int nDirectArena, int pageSize, int maxOrder) {
        this(false, nHeapArena, nDirectArena, pageSize, maxOrder);
    }

    public PooledByteBufAllocator(boolean preferDirect, int nHeapArena, int nDirectArena, int pageSize, int maxOrder) {
        this(preferDirect, nHeapArena, nDirectArena, pageSize, maxOrder,
                DEFAULT_TINY_CACHE_SIZE, DEFAULT_SMALL_CACHE_SIZE, DEFAULT_NORMAL_CACHE_SIZE);
    }

    public PooledByteBufAllocator(boolean preferDirect, int nHeapArena, int nDirectArena, int pageSize, int maxOrder,
            int tinyCacheSize, int smallCacheSize, int normalCacheSize) {
        super(preferDirect);
        threadCache = new PoolThreadLocalCache();
        this.tinyCacheSize = tinyCacheSize;
        this.smallCacheSize = smallCacheSize;
        this.normalCacheSize = normalCacheSize;
        final int chunkSize = validateAndCalculateChunkSize(pageSize, maxOrder);
        ......
    }
}
In the PooledByteBufAllocator constructor the defaults work out to tinyCacheSize = 512, smallCacheSize = 256 and normalCacheSize = 64. By pre-creating caches for fixed size classes in this way, Netty greatly improves allocation performance.
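If the defaults do not fit, the cache sizes can also be passed explicitly through the long constructor shown above. A sketch; 8192 and 11 are the usual pageSize/maxOrder defaults (giving 16 MiB chunks) but are spelled out here only as example values.

import io.netty.buffer.ByteBuf;
import io.netty.buffer.PooledByteBufAllocator;

public class CustomPooledAllocator {
    public static void main(String[] args) {
        PooledByteBufAllocator alloc = new PooledByteBufAllocator(
                true,  // preferDirect
                8, 8,  // nHeapArena, nDirectArena
                8192,  // pageSize
                11,    // maxOrder -> chunkSize = 8192 << 11 = 16 MiB
                512,   // tinyCacheSize
                256,   // smallCacheSize
                64);   // normalCacheSize

        ByteBuf buf = alloc.directBuffer(1024);
        // ... use buf ...
        buf.release();
    }
}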
The DirectArena allocation flow:
An Arena allocates memory in three basic steps:
- Take a PooledByteBuf from the object pool and reuse it;
- Allocate memory from the cache;
- Allocate memory from the memory heap (the arena's chunks).
In code terms these steps correspond to newByteBuf() (recycling a PooledByteBuf object), the PoolThreadCache lookup, and the arena's allocateNormal()/allocateHuge() fallback, all of which appear below. Taking direct buffers as the example, let's first look at reusing a PooledByteBuf from the object pool, again starting from PooledByteBufAllocator's newDirectBuffer():
@Override
protected ByteBuf newDirectBuffer(int initialCapacity, int maxCapacity) {
PoolThreadCache cache = threadCache.get();
PoolArena<ByteBuffer> directArena = cache.directArena;
ByteBuf buf;
if (directArena != null) {
buf = directArena.allocate(cache, initialCapacity, maxCapacity);
} else {
.......
}
return toLeakAwareBuffer(buf);
}
We already understand where the PoolArena comes from, so let's go straight into PoolArena's allocate() method:
PooledByteBuf<T> allocate(PoolThreadCache cache, int reqCapacity, int maxCapacity) {
PooledByteBuf<T> buf = newByteBuf(maxCapacity);
allocate(cache, buf, reqCapacity);
return buf;
}
This is quite clear: newByteBuf() is called first to obtain a PooledByteBuf object, then allocate() carves out a piece of memory using the thread-private PoolThreadCache and initializes the buffer's fields, such as its memory address. Let's follow newByteBuf(), looking at the DirectArena implementation:
@Override
protected PooledByteBuf<ByteBuffer> newByteBuf(int maxCapacity) {
    if (HAS_UNSAFE) {
        return PooledUnsafeDirectByteBuf.newInstance(maxCapacity);
    } else {
        return PooledDirectByteBuf.newInstance(maxCapacity);
    }
}
It first checks whether Unsafe is supported, which it normally is by default, so let's continue into PooledUnsafeDirectByteBuf's newInstance() method:
final class PooledUnsafeDirectByteBuf extends PooledByteBuf<ByteBuffer> {
private static final Recycler<PooledUnsafeDirectByteBuf> RECYCLER = new Recycler<PooledUnsafeDirectByteBuf>() {
@Override
protected PooledUnsafeDirectByteBuf newObject(Handle<PooledUnsafeDirectByteBuf> handle) {
return new PooledUnsafeDirectByteBuf(handle, 0);
}
};
static PooledUnsafeDirectByteBuf newInstance(int maxCapacity) {
PooledUnsafeDirectByteBuf buf = RECYCLER.get();
buf.reuse(maxCapacity);
return buf;
    }
    .......
}
As the name suggests, the first step is to get a buf from the RECYCLER (essentially an object recycling bin) via its get() method. In the snippet above, the RECYCLER implements newObject(), which creates a new buf whenever the bin has none available. Because the buf we obtain may have been taken out of the bin, it has to be reset before reuse, which is why buf.reuse() is called next:
final void reuse(int maxCapacity) {
maxCapacity(maxCapacity);
setRefCnt(1);
setIndex0(0, 0);
discardMarks();
}
reuse() simply resets all of the buffer's state to its initial values. By now the full process of obtaining a buf object from the object pool should be clear. Next, back in PoolArena's allocate() method, how is the real memory allocated? There are two main cases: allocating from the cache and allocating from the memory heap. Let's look at the concrete logic inside allocate():
private void allocate(PoolThreadCache cache, PooledByteBuf<T> buf, final int reqCapacity) {
    ...
    if (normCapacity <= chunkSize) {
        if (cache.allocateNormal(this, buf, reqCapacity, normCapacity)) {
            // was able to allocate out of the cache so move on
            return;
        }
        allocateNormal(buf, reqCapacity, normCapacity);
    } else {
        // Huge allocations are never served via the cache so just call allocateHuge
        allocateHuge(buf, reqCapacity);
    }
}
The full method looks quite involved, but the logic we elided is essentially a series of size-class checks that try to serve the request from the matching cache. If the request fits none of the size classes, allocateHuge() is called to perform a real memory allocation.
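To tie the whole flow together, here is a minimal usage sketch of the allocate/release/allocate-again cycle described earlier. Whether the second request is really served from the PoolThreadCache depends on the calling thread, the size class and the cache configuration, so the comments describe the typical expectation rather than a guarantee.

import io.netty.buffer.ByteBuf;
import io.netty.buffer.PooledByteBufAllocator;

public class PooledReuseDemo {
    public static void main(String[] args) {
        PooledByteBufAllocator alloc = PooledByteBufAllocator.DEFAULT;

        ByteBuf first = alloc.directBuffer(1024);
        first.writeBytes(new byte[1024]);
        first.release(); // memory goes back to the thread-local cache / arena, the object back to the RECYCLER

        // Same thread, same size class: this request can typically be satisfied
        // from the PoolThreadCache without touching the arena again.
        ByteBuf second = alloc.directBuffer(1024);
        second.release();
    }
}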