SubPage 級別的內存分配:
通過之前的學習我們知道, 如果我們分配一個緩沖區大小遠小於page, 則直接在一個page 上進行分配則會造成內存浪費, 所以需要將page 繼續進行切分成多個子塊進行分配, 子塊分配的個數根據你要分配的緩沖區大小而定, 比如只需要分配1KB 的內存, 就會將一個page 分成8 等分。簡單起見, 我們這里僅僅以16 字節為例, 跟蹤其分配邏輯。在分析其邏輯前, 首先看PoolArean 的一個屬性:
private final PoolSubpage<T>[] tinySubpagePools;
這個屬性是一個PoolSubpage 的數組, 有點類似於一個subpage 的緩存, 我們創建一個subpage 之后, 會將創建的subpage 與該屬性其中每個關聯, 下次在分配的時候可以直接通過該屬性的元素去找關聯的subpage。我們其中是在構造方法中初始化的, 看構造方法中其初始化代碼:
tinySubpagePools = newSubpagePoolArray(numTinySubpagePools); for (int i = 0; i < tinySubpagePools.length; i ++) { tinySubpagePools[i] = newSubpagePoolHead(pageSize); }
這里為numTinySubpagePools 為32,跟到newSubpagePoolArray(numTinySubpagePools)方法中:
private PoolSubpage<T>[] newSubpagePoolArray(int size) { return new PoolSubpage[size]; }
這里直接創建了一個PoolSubpage 數組, 長度為32,在構造方法中創建完畢之后, 會通過循環為其賦值。繼續跟到newSubpagePoolHead()方法中:
private PoolSubpage<T> newSubpagePoolHead(int pageSize) { PoolSubpage<T> head = new PoolSubpage<T>(pageSize); head.prev = head; head.next = head; return head; }
在newSubpagePoolHead()方法中創建了一個PoolSubpage 對象head。這種寫法我們知道Subpage 其實也是個雙向鏈表, 這里的將head 的上一個節點和下一個節點都設置為自身, 有關PoolSubpage 的關聯關系, 我們稍后分析。這樣通過循環創建PoolSubpage, 總共會創建出32 個subpage, 其中每個subpage 實際代表一塊內存大小:
tinySubPagePools 的結構就有點類似緩存數組tinySubPageDirectCaches 的結構。了解了tinySubpagePools屬性, 我們看PoolArean 的allocate 方法, 也就是緩沖區的入口方法:
private void allocate(PoolThreadCache cache, PooledByteBuf<T> buf, final int reqCapacity) { //規格化 reqCapacity=256 final int normCapacity = normalizeCapacity(reqCapacity); if (isTinyOrSmall(normCapacity)) { // capacity < pageSize int tableIdx; PoolSubpage<T>[] table; //判斷是不是tiny boolean tiny = isTiny(normCapacity); if (tiny) { // < 512//緩存分配 if (cache.allocateTiny(this, buf, reqCapacity, normCapacity)) { // was able to allocate out of the cache so move on return; }//通過tinyIdx 拿到tableIdx tableIdx = tinyIdx(normCapacity); //subpage 的數組 table = tinySubpagePools; } else { if (cache.allocateSmall(this, buf, reqCapacity, normCapacity)) { // was able to allocate out of the cache so move on return; } tableIdx = smallIdx(normCapacity); table = smallSubpagePools; } //拿到對應的節點 final PoolSubpage<T> head = table[tableIdx]; synchronized (head) { final PoolSubpage<T> s = head.next; //默認情況下, head 的next 也是自身 if (s != head) { assert s.doNotDestroy && s.elemSize == normCapacity; long handle = s.allocate(); assert handle >= 0; s.chunk.initBufWithSubpage(buf, handle, reqCapacity); if (tiny) { allocationsTiny.increment(); } else { allocationsSmall.increment(); } return; } } allocateNormal(buf, reqCapacity, normCapacity); return; } if (normCapacity <= chunkSize) { //首先在緩存上進行內存分配 if (cache.allocateNormal(this, buf, reqCapacity, normCapacity)) { // was able to allocate out of the cache so move on return; }//分配不成功, 做實際的內存分配 allocateNormal(buf, reqCapacity, normCapacity); } else {//大於這個值, 就不在緩存上分配 // Huge allocations are never served via the cache so just call allocateHuge allocateHuge(buf, reqCapacity); } }
之前我們最這個方法剖析過在page 級別相關內存分配邏輯, 先在我們來看subpage 級別分配的相關邏輯。假設我們分配16 字節的緩沖區, isTinyOrSmall(normCapacity)就會返回true, 進入if 塊,同樣if (tiny)這里會返回true, 繼續跟到if (tiny)中的邏輯。首先會在緩存中分配緩沖區, 如果分配不到, 就開辟一塊內存進行內存分配,先看這一步:
tableIdx = tinyIdx(normCapacity);
這里通過normCapacity 拿到tableIdx, 我們跟進去:
static int tinyIdx(int normCapacity) { return normCapacity >>> 4; }
這里將normCapacity 除以16, 其實也就是1。我們回到PoolArena 的allocate()方法繼續看:
table = tinySubpagePools;
這里將tinySubpagePools 賦值到局部變量table 中, 繼續往下看:final PoolSubpage<T> head = table[tableIdx] 這步時通過下標拿到一個PoolSubpage, 因為我們以16 字節為例, 所以我們拿到下標為1 的PoolSubpage, 對應的內存大小也就是16Byte。再看final PoolSubpage<T> s = head.next 這一步, 跟我們剛才了解的的tinySubpagePools 屬性, 默認情況下head.next 也是自身, 所以if (s != head)會返回false,我們繼續往下看,會走到allocateNormal(buf, reqCapacity, normCapacity)這個方法:
private synchronized void allocateNormal(PooledByteBuf<T> buf, int reqCapacity, int normCapacity) { //首先在原來的chunk 上進行內存分配(1) if (q050.allocate(buf, reqCapacity, normCapacity) || q025.allocate(buf, reqCapacity, normCapacity) || q000.allocate(buf, reqCapacity, normCapacity) || qInit.allocate(buf, reqCapacity, normCapacity) || q075.allocate(buf, reqCapacity, normCapacity)) { ++allocationsNormal; return; } //創建chunk 進行內存分配(2) // Add a new chunk. PoolChunk<T> c = newChunk(pageSize, maxOrder, pageShifts, chunkSize); long handle = c.allocate(normCapacity); ++allocationsNormal; assert handle > 0; //初始化byteBuf(3) c.initBuf(buf, handle, reqCapacity); qInit.add(c); }
這里的邏輯我們之前的已經剖析過, 首先在原來的chunk 中分配, 如果分配不成功, 則會創建chunk 進行分配。我們看這一步long handle = c.allocate(normCapacity) ,跟到allocate(normCapacity)方法中:
long allocate(int normCapacity) { if ((normCapacity & subpageOverflowMask) != 0) { // >= pageSize return allocateRun(normCapacity); } else { return allocateSubpage(normCapacity); } }
我們分析page 級別分配的時候, 剖析的是allocateRun(normCapacity)方法。因為這里我們是以16 字節舉例,所以這次我們剖析allocateSubpage(normCapacity)方法, 也就是在subpage 級別進行內存分配。
private long allocateSubpage(int normCapacity) { PoolSubpage<T> head = arena.findSubpagePoolHead(normCapacity); synchronized (head) { int d = maxOrder; //表示在第11 層分配節點 int id = allocateNode(d); if (id < 0) { return id; } //獲取初始化的subpage final PoolSubpage<T>[] subpages = this.subpages; final int pageSize = this.pageSize; freeBytes -= pageSize; //表示第幾個subpageIdx int subpageIdx = subpageIdx(id); PoolSubpage<T> subpage = subpages[subpageIdx]; if (subpage == null) { //如果subpage 為空 subpage = new PoolSubpage<T>(head, this, id, runOffset(id), pageSize, normCapacity); //則將當前的下標賦值為subpage subpages[subpageIdx] = subpage; } else { subpage.init(head, normCapacity); }
//取出一個子page return subpage.allocate(); } }
首先, 通過PoolSubpage<T> head = arena.findSubpagePoolHead(normCapacity) 這種方式找到head 節點, 實際上這里head, 就是我們剛才分析的tinySubpagePools 屬性的第一個節點, 也就是對應16B 的那個節點。int d =maxOrder 是將11 賦值給d, 也就是在內存樹的第11 層取節點, 這部分在Page分配時剖析過了。int id = allocateNode(d) 這里獲取的是分析過的, 字節數組memoryMap 的下標, 這里指向一個page, 如果第一次分配, 指向的是0-8k 的那個page, 上一小節對此進行詳細的剖析這里不再贅述。final PoolSubpage<T>[] subpages = this.subpages這一步, 是拿到PoolChunk 中成員變量subpages 的值, 也是個PoolSubpage 的數組, 在PoolChunk 進行初始化的時候, 也會初始化該數組, 長度為2048。也就是說每個chunk 都維護着一個subpage 的列表, 如果每一個page 級別的內存都需要被切分成子page, 則會將這個這個page 放入該列表中, 專門用於分配子page, 所以這個列表中的subpage, 其實就是一個用於切分的page。
int subpageIdx = subpageIdx(id) 這一步是通過id 拿到這個PoolSubpage 數組的下標, 如果id 對應的page 是0-8k的節點, 這里拿到的下標就是0。在if (subpage == null) 中, 因為默認subpages 只是創建一個數組, 並沒有往數組中賦值, 所以第一次走到這里會返回true, 跟到if 塊中:
subpage = new PoolSubpage<T>(head, this, id, runOffset(id), pageSize, normCapacity);
subpages[subpageIdx] = subpage;
這里通過new PoolSubpage 創建一個新的subpage 之后, 通過subpages[subpageIdx] = subpage 這種方式將新創建的subpage 根據下標賦值到subpages 中的元素中。在new PoolSubpage 的構造方法中, 傳入head, 就是我們剛才提到過的tinySubpagePools 屬性中的節點, 如果我們分配的16 字節的緩沖區, 則這里對應的就是第一個節點,我們跟到PoolSubpage 的構造方法中:
PoolSubpage(PoolSubpage<T> head, PoolChunk<T> chunk, int memoryMapIdx, int runOffset, int pageSize, int elemSize) { this.chunk = chunk; this.memoryMapIdx = memoryMapIdx; this.runOffset = runOffset; this.pageSize = pageSize; bitmap = new long[pageSize >>> 10]; // pageSize / 16 / 64 init(head, elemSize); }
這里重點關注屬性bitmap, 這是一個long 類型的數組, 初始大小為8, 這里只是初始化的大小, 真正的大小要根據將page 切分多少塊而確定,這里將屬性進行了賦值, 我們跟到init()方法中:
void init(PoolSubpage<T> head, int elemSize) { doNotDestroy = true; this.elemSize = elemSize; if (elemSize != 0) { maxNumElems = numAvail = pageSize / elemSize; nextAvail = 0; bitmapLength = maxNumElems >>> 6; if ((maxNumElems & 63) != 0) { bitmapLength ++; } for (int i = 0; i < bitmapLength; i ++) {
//bitmap 標識哪個子page 被分配
//0 標識未分配, 1 表示已分配 bitmap[i] = 0; } }/加到arena 里面 addToPool(head);
}
this.elemSize = elemSize 表示保存當前分配的緩沖區大小, 這里我們以16 字節舉例, 所以這里是16。maxNumElems= numAvail = pageSize / elemSize 這里初始化了兩個屬性maxNumElems, numAvail, 值都為pageSize / elemSize,表示一個page 大小除以分配的緩沖區大小, 也就是表示當前page 被划分了多少分。numAvail 則表示剩余可用的塊數, 由於第一次分配都是可用的, 所以numAvail=maxNumElems;bitmapLength 表示bitmap 的實際大小, 剛才我們分析過, bitmap 初始化的大小為8, 但實際上並不一定需要8 個元素,元素個數要根據page 切分的子塊而定, 這里的大小是所切分的子塊數除以64。再往下看, if ((maxNumElems & 63) != 0) 判斷maxNumElems 也就是當前配置所切分的子塊是不是64 的倍數, 如果不是, 則bitmapLength 加1,最后通過循環, 將其分配的大小中的元素賦值為0。
這里詳細說明一下bitmap, 這里是個long 類型的數組, long 數組中的每一個值, 也就是long 類型的數字, 其中的每一個比特位, 都標記着page 中每一個子塊的內存是否已分配, 如果比特位是1, 表示該子塊已分配, 如果比特位是0,表示該子塊未分配, 標記順序是其二進制數從低位到高位進行排列。我們應該知道為什么bitmap 大小要設置為子塊數量除以64, 因為long 類型的數字是64 位, 每一個元素能記錄64 個子塊的數量, 這樣就可以通過子page 個數除以64的方式決定bitmap 中元素的數量。如果子塊不能整除64, 則通過元素數量+1 方式, 除以64 之后剩余的子塊通過long中比特位由低到高進行排列記錄,其邏輯結構如下圖所示:
進入PoolSubpage 的addToPool(head)方法:
private void addToPool(PoolSubpage<T> head) { assert prev == null && next == null; prev = head; next = head.next; next.prev = this; head.next = this; }
這里的head 我們剛才講過, 是Arena 中數組tinySubpagePools 中的元素, 通過以上邏輯, 就會將新創建的Subpage通過雙向鏈表的方式關聯到tinySubpagePools 中的元素, 我們以16 字節為例, 關聯關系如圖:
這樣, 下次如果還需要分配16 字節的內存, 就可以通過tinySubpagePools 找到其元素關聯的subpage 進行分配了。我們再回到PoolChunk 的allocateSubpage()方法,創建完了一個subpage, 我們就可以通過subpage.allocate()方法進行內存分配了。我們跟到allocate()方法中:
long allocate() { if (elemSize == 0) { return toHandle(0); } if (numAvail == 0 || !doNotDestroy) { return -1; }
//取一個bitmap 中可用的id(絕對id) final int bitmapIdx = getNextAvail(); //除以64(bitmap 的相對下標) int q = bitmapIdx >>> 6; //除以64 取余, 其實就是當前絕對id 的偏移量 int r = bitmapIdx & 63; assert (bitmap[q] >>> r & 1) == 0; //當前位標記為1 bitmap[q] |= 1L << r; //如果可用的子page 為0 //可用的子page-1 if (-- numAvail == 0) { //則移除相關子page removeFromPool(); }
//bitmapIdx 轉換成handler return toHandle(bitmapIdx); }
這里的邏輯看起來比較復雜, 我們一點點來剖析,首先看:
final int bitmapIdx = getNextAvail();
其中bitmapIdx 表示從bitmap 中找到一個可用的bit 位的下標, 注意, 這里是bit 的下標, 並不是數組的下標, 我們之前分析過, 因為每一比特位代表一個子塊的內存分配情況, 通過這個下標就可以知道那個比特位是未分配狀態,我們跟進去:
private int getNextAvail() { int nextAvail = this.nextAvail; if (nextAvail >= 0) {
//一個子page 被釋放之后, 會記錄當前子page 的bitmapIdx 的位置, 下次分配可以直接通過bitmapIdx 拿到一個子page this.nextAvail = -1; return nextAvail; } return findNextAvail(); }
上述代碼片段中的nextAvail, 表示下一個可用的bitmapIdx, 在釋放的時候的會被標記, 標記被釋放的子塊對應bitmapIdx 的下標, 如果<0 則代表沒有被釋放的子塊, 則通過findNextAvail 方法進行查找,繼續跟進findNextAvail()方法:
private int findNextAvail() { //當前long 數組 final long[] bitmap = this.bitmap; //獲取其長度 final int bitmapLength = this.bitmapLength; for (int i = 0; i < bitmapLength; i ++) { //第i 個 long bits = bitmap[i]; //!=-1 說明64 位沒有全部占滿 if (~bits != 0) { //找下一個節點 return findNextAvail0(i, bits); } }
return -1; }
這里會遍歷bitmap 中的每一個元素, 如果當前元素中所有的比特位並沒有全部標記被使用, 則通過findNextAvail0(i,bits)方法一個一個往后找標記未使用的比特位。再繼續跟findNextAvail0():
private int findNextAvail0(int i, long bits) { //多少份 final int maxNumElems = this.maxNumElems; //乘以64, 代表當前long 的第一個下標 final int baseVal = i << 6; //循環64 次(指代當前的下標) for (int j = 0; j < 64; j ++) { //第一位為0(如果是2 的倍數, 則第一位就是0) if ((bits & 1) == 0) { //這里相當於加, 將i*64 之后加上j, 獲取絕對下標 int val = baseVal | j; //小於塊數(不能越界) if (val < maxNumElems) { return val; } else { break; } }
//當前下標不為0 //右移一位 bits >>>= 1; }
return -1; }
這里從當前元素的第一個比特位開始找, 直到找到一個標記為0 的比特位, 並返回當前比特位的下標, 大致流程如下圖所示:
我們回到allocate()方法中:
long allocate() { if (elemSize == 0) { return toHandle(0); } if (numAvail == 0 || !doNotDestroy) { return -1; } //取一個bitmap 中可用的id(絕對id) final int bitmapIdx = getNextAvail(); //除以64(bitmap 的相對下標) int q = bitmapIdx >>> 6; //除以64 取余, 其實就是當前絕對id 的偏移量 int r = bitmapIdx & 63; assert (bitmap[q] >>> r & 1) == 0; //當前位標記為1 bitmap[q] |= 1L << r; //如果可用的子page 為0 //可用的子page-1 if (-- numAvail == 0) { //則移除相關子page removeFromPool(); } //bitmapIdx 轉換成handler return toHandle(bitmapIdx); }
找到可用的bitmapIdx 之后, 通過int q = bitmapIdx >>> 6 獲取bitmap 中bitmapIdx 所屬元素的數組下標。int r =bitmapIdx & 63 表示獲取bitmapIdx 的位置是從當前元素最低位開始的第幾個比特位。bitmap[q] |= 1L << r 是將bitmap 的位置設置為不可用, 也就是比特位設置為1, 表示已占用。然后將可用子配置的數量numAvail 減1。如果沒有可用子page 的數量, 則會將PoolArena 中的數組tinySubpagePools 所關聯的subpage 進行移除。最后通過toHandle(bitmapIdx)獲取當前子塊的handle, 上一小節我們知道handle 指向的是當前chunk 中的唯一的一塊內存, 我們跟進toHandle(bitmapIdx)中:
private long toHandle(int bitmapIdx) { return 0x4000000000000000L | (long) bitmapIdx << 32 | memoryMapIdx; }
(long) bitmapIdx << 32 是將bitmapIdx 右移32 位, 而32 位正好是一個int 的長度, 這樣, 通過(long) bitmapIdx <<32 | memoryMapIdx 計算, 就可以將memoryMapIdx, 也就是page 所屬的下標的二進制數保存在(long) bitmapIdx<< 32 的低32 位中。0x4000000000000000L 是一個最高位是1 並且所有低位都是0 的二進制數, 這樣通過按位或的方式可以將(long) bitmapIdx << 32 | memoryMapIdx 計算出來的結果保存在0x4000000000000000L 的所有低位中,這樣, 返回對的數字就可以指向chunk 中唯一的一塊內存,我們回到PoolArena 的allocateNormal 方法中:
private synchronized void allocateNormal(PooledByteBuf<T> buf, int reqCapacity, int normCapacity) { //首先在原來的chunk 上進行內存分配(1) if (q050.allocate(buf, reqCapacity, normCapacity) || q025.allocate(buf, reqCapacity, normCapacity) || q000.allocate(buf, reqCapacity, normCapacity) || qInit.allocate(buf, reqCapacity, normCapacity) || q075.allocate(buf, reqCapacity, normCapacity)) { ++allocationsNormal; return; } //創建chunk 進行內存分配(2) PoolChunk<T> c = newChunk(pageSize, maxOrder, pageShifts, chunkSize); long handle = c.allocate(normCapacity); ++allocationsNormal; assert handle > 0; //初始化byteBuf(3) c.initBuf(buf, handle, reqCapacity); qInit.add(c); }
分析完了long handle = c.allocate(normCapacity)這步, 這里返回的handle 就指向chunk 中的某個page 中的某個子塊所對應的連續內存。最后, 通過iniBuf 初始化之后, 將創建的chunk 加到ChunkList 里面,我們跟到initBuf 方法中:
void initBuf(PooledByteBuf<T> buf, long handle, int reqCapacity) { int memoryMapIdx = memoryMapIdx(handle); //bitmapIdx 是后面分配subpage 時候使用到的 int bitmapIdx = bitmapIdx(handle); if (bitmapIdx == 0) { byte val = value(memoryMapIdx); assert val == unusable : String.valueOf(val); //runOffset(memoryMapIdx):偏移量 //runLength(memoryMapIdx):當前節點的長度 buf.init(this, handle, runOffset(memoryMapIdx), reqCapacity, runLength(memoryMapIdx), arena.parent.threadCache()); } else { initBufWithSubpage(buf, handle, bitmapIdx, reqCapacity); } }
這部分在前面我們分析過, 相信大家不會陌生, 這里有區別的是if (bitmapIdx == 0) 的判斷, 這里的bitmapIdx 不會是0, 這樣, 就會走到initBufWithSubpage(buf, handle, bitmapIdx, reqCapacity)方法中,跟到initBufWithSubpage()方法:
private void initBufWithSubpage(PooledByteBuf<T> buf, long handle, int bitmapIdx, int reqCapacity) { assert bitmapIdx != 0; int memoryMapIdx = memoryMapIdx(handle); PoolSubpage<T> subpage = subpages[subpageIdx(memoryMapIdx)]; assert subpage.doNotDestroy; assert reqCapacity <= subpage.elemSize; buf.init( this, handle, runOffset(memoryMapIdx) + (bitmapIdx & 0x3FFFFFFF) * subpage.elemSize, reqCapacity, subpage.elemSize, arena.parent.threadCache()); }
首先拿到memoryMapIdx, 這里會將我們之前計算handle 傳入, 跟進去:
private static int memoryMapIdx(long handle) { return (int) handle; }
這里將其強制轉化為int 類型, 也就是去掉高32 位, 這樣就得到memoryMapIdx,回到initBufWithSubpage 方法中:我們注意在buf 調用init 方法中的一個參數: runOffset(memoryMapIdx) + (bitmapIdx & 0x3FFFFFFF) *subpage.elemSize,這里的偏移量就是, 原來page 的偏移量+子塊的偏移量:bitmapIdx & 0x3FFFFFFF 代表當前分配的子page 是屬於第幾個子page。(bitmapIdx & 0x3FFFFFFF) * subpage.elemSize 表示在當前page 的偏移量。這樣, 分配的ByteBuf 在內存讀寫的時候, 就會根據偏移量進行讀寫。最后,我們跟到init()方法中:
void init(PoolChunk<T> chunk, long handle, int offset, int length, int maxLength, PoolThreadCache cache) { //初始化 assert handle >= 0; assert chunk != null; //在哪一塊內存上進行分配的 this.chunk = chunk; //這一塊內存上的哪一塊連續內存 this.handle = handle; memory = chunk.memory; this.offset = offset; this.length = length; this.maxLength = maxLength; tmpNioBuf = null; this.cache = cache; }
這里又是我們熟悉的邏輯, 初始化了屬性之后, 一個緩沖區分配完成,以上就是Subpage 級別的緩沖區分配邏輯。