Netty源碼—五、內存分配概述


Netty中的內存管理應該是借鑒了FreeBSD內存管理的思想——jemalloc。Netty內存分配過程中總體遵循以下規則:

  • 優先從緩存中分配
  • 如果緩存中沒有的話,從內存池看看有沒有剩余可用的
  • 如果已申請的沒有的話,再真正申請內存
  • 分段管理,每個內存大小范圍使用不同的分配策略

我們先總體上看下Netty內存分配的策略,然后再結合對應的數據結構來看看每種策略的具體實現。

總體分配策略

netty根據需要分配內存的大小使用不同的分配策略,主要分為以下幾種情況(pageSize默認是8K, chunkSize默認是16m):

tiny: allocateSize<512,allocateSubpage

small: pageSize>=allocateSize >=512,allocateSubpage

normal: chunkSize >= allocateSize > pageSize ,allocateRun

huge: allocateSize > chunkSize

內存分配的調用堆棧

結合上面內存分配的調用堆棧看看內存分配的主要過程:

  1. new一個ByteBuf,如果是direct則new:PooledUnsafeDirectByteBuf
  2. 從緩存中查找,沒有可用的緩存進行下一步
  3. 從內存池中查找可用的內存,查找的方式如上所述(tiny、small、normal)
  4. 如果找不到則重新申請內存,並將申請到的內存放入內存池
  5. 使用申請到的內存初始化ByteBuf

基本數據結構

PoolSubpage:一個內存頁,默認是8k

PoolChunk:有多個PoolSubpage組成,默認包含2048個subpage,即默認大小是16m

chunk內部包含一個byte數組memoryMap,默認包含4096個元素,memoryMap實際上是一棵完全二叉樹,共有12層,也就是maxOrder默認是11(從0開始),所以這棵樹總共有2048個葉子結點,每個葉子節點對應一個subpage,樹中非葉子節點的內存大小由左子節點的內存大小加上右子節點的內存大小,memoryMap數組中存儲的值是byte類型,其實就是該樹節點在樹中的深度(深度從0開始)。樹的基本結構如下圖:

PoolSubpage表示一個內存頁大小,還可以繼續划分成更小的內存塊,以便能充分利用每一個page。

所以在分配內存的時候,如果分配的內存小於pageSIze(默認8k)大小,則會從PoolSubpage中分配;

如果需要分配的內存大於pageSize且小於chunkSize(默認16m)的內存從chunk中分配

如果大於chunkSize的內存則直接分配,Netty不做進一步管理。

normal內存申請

先看下分配內存的入口方法allocate

// 這個方法進行具體申請內存的操作
private void allocate(PoolThreadCache cache, PooledByteBuf<T> buf, final int reqCapacity) {
    final int normCapacity = normalizeCapacity(reqCapacity);
    // 小於pageSize(默認是8K)
    if (isTinyOrSmall(normCapacity)) { // capacity < pageSize
        int tableIdx;
        PoolSubpage<T>[] table;
        boolean tiny = isTiny(normCapacity);
        if (tiny) { // < 512
            // 使用緩存
            if (cache.allocateTiny(this, buf, reqCapacity, normCapacity)) {
                // was able to allocate out of the cache so move on
                return;
            }
            tableIdx = tinyIdx(normCapacity);
            table = tinySubpagePools;
        } else {
            // 512-8K
            if (cache.allocateSmall(this, buf, reqCapacity, normCapacity)) {
                // was able to allocate out of the cache so move on
                return;
            }
            tableIdx = smallIdx(normCapacity);
            table = smallSubpagePools;
        }

        final PoolSubpage<T> head = table[tableIdx];

        /**
             * Synchronize on the head. This is needed as {@link PoolChunk#allocateSubpage(int)} and
             * {@link PoolChunk#free(long)} may modify the doubly linked list as well.
             */
        synchronized (head) {
            final PoolSubpage<T> s = head.next;
            if (s != head) {
                assert s.doNotDestroy && s.elemSize == normCapacity;
                // 這里為什么一定可以找到可用的內存塊(handle>=0)呢?
                // 因為在io.netty.buffer.PoolSubpage#allocate的時候,如果可用內存塊為0了會將該page從鏈表中remove,所以保證了head.next一定有可用的內存
                long handle = s.allocate();
                assert handle >= 0;
                s.chunk.initBufWithSubpage(buf, handle, reqCapacity);

                if (tiny) {
                    allocationsTiny.increment();
                } else {
                    allocationsSmall.increment();
                }
                return;
            }
        }
        allocateNormal(buf, reqCapacity, normCapacity);
        return;
    }
    if (normCapacity <= chunkSize) {
        if (cache.allocateNormal(this, buf, reqCapacity, normCapacity)) {
            // was able to allocate out of the cache so move on
            return;
        }
        allocateNormal(buf, reqCapacity, normCapacity);
    } else {
        // Huge allocations are never served via the cache so just call allocateHuge
        allocateHuge(buf, reqCapacity);
    }
}

可以看到除了huge內存之外,其他內存申請都可能會調用到allocateNormal,我們先來看下normal內存的申請。

上文一直提到tiny、small、normal、huge,這里我們看下這些內存划分的標准是什么

// normCapacity < 512
static boolean isTiny(int normCapacity) {
    // 小於512的是tiny
    return (normCapacity & 0xFFFFFE00) == 0;
}

// capacity < pageSize
// subpageOverflowMask = ~(pageSize - 1);
// 所以小於8k的是small或者tiny,結合tiny的范圍,small的范圍就是:512-8192
boolean isTinyOrSmall(int normCapacity) {
    return (normCapacity & subpageOverflowMask) == 0;
}

大於8k但是小於chunkSize(16m)的使用allocateNormal來申請內存,所以我們暫且把這個范圍的稱為normal

大於chunkSize使用allocateHuge來申請內存,我們暫且把這個范圍的稱為huge。

還需要一個鋪墊,申請內存大小的規整。Netty並不是申請多少就分配多少,會根據一定的規則分配大於等於需要內存的規整過的值。上面在allocate方法剛開始就會先將reqCapacity規范化為normCapacity,使用下面的方法,規范的過程也是進行了分類,不同的內存大小類型規范化的方式不一樣

// 規范化申請的內存大小為2的指數次
// io.netty.buffer.PoolArena#normalizeCapacity
int normalizeCapacity(int reqCapacity) {
    if (reqCapacity < 0) {
        throw new IllegalArgumentException("capacity: " + reqCapacity + " (expected: 0+)");
    }
    if (reqCapacity >= chunkSize) {
        return reqCapacity;
    }

    if (!isTiny(reqCapacity)) { // >= 512
        // Doubled
		// 如果申請的內存大於512則規范化為大於reqCapacity的最近的2的指數次的值
        int normalizedCapacity = reqCapacity;
        // 當normalizedCapacity本身就是2的指數次的時候,取其本身
        // 這個時候防止下面的算法再向后查找,先將normalizedCapacity-1
        normalizedCapacity --;
        normalizedCapacity |= normalizedCapacity >>>  1;
        normalizedCapacity |= normalizedCapacity >>>  2;
        normalizedCapacity |= normalizedCapacity >>>  4;
        normalizedCapacity |= normalizedCapacity >>>  8;
        normalizedCapacity |= normalizedCapacity >>> 16;
        normalizedCapacity ++;

        // 如果上面的計算結果溢出了(如果reqCapacity是Integer.MAX_VALUE),則去掉最高位
        if (normalizedCapacity < 0) {
            normalizedCapacity >>>= 1;
        }

        return normalizedCapacity;
    }

    // 下面之所以是16的倍數是因為用來管理tiny內存tinySubpagePools數組的大小剛好是512>>>4,32個元素
    // 每個元素PoolSubpage本身會構成鏈表,也就是說每個元素(PoolSubpage)對應的鏈表內每個元素的內存塊大小(elemSize)是相同的,數組內每個鏈表的elemSize依次是:
    // 16,32,48......480,496,512
    // Quantum-spaced
    // 剛好是16的倍數(這個時候reqCapacity<512)
    if ((reqCapacity & 15) == 0) {
        return reqCapacity;
    }
	// 找到距離reqCapacity最近的下一個16的倍數
    return (reqCapacity & ~15) + 16;
}

這里解釋下上面if (!isTiny(reqCapacity))里面的位運算,由於要尋找的數是2的指數次,所以二進制表示除了最高位是1,后面的位都應該是0,假設尋找的是x // x-1的二進制所有位都是1,所以變成了尋找比x少一位的二進制全1的數

normalizedCapacity二進制表示的第一位肯定是1,右移1位之后,第二位變為了1,兩者進行邏輯或的時候,前兩位一定是1同理

  • 繼續右移2位之后,前4位肯定是1
  • 繼續右移4位之后,前8位肯定是1
  • 繼續右移8位之后,前16位肯定是1
  • 繼續右移16位之后,前32位一定是1

這樣就找到了全是1的數,然后再加上1就是2的指數次。

總結上面的代碼邏輯,規范化的過程是:

  • 如果是huge,大於chunkSize直接返回reqCapacity
  • 如果是small或者normal,大於512,則規范化為大於reqCapacity的最近的2的指數次的值
  • 如果是tiny,小於512,則規范為16的倍數

上面這個規范化的原因和每類內存申請的數據結構有密切的關系,我們這里先只關心normal類型的被規范化為2的指數次。

下面接着看normal內存的申請

// io.netty.buffer.PoolArena#allocateNormal
private void allocateNormal(PooledByteBuf<T> buf, int reqCapacity, int normCapacity) {
    // 先從內存池中獲取需要的內存
    if (q050.allocate(buf, reqCapacity, normCapacity) || q025.allocate(buf, reqCapacity, normCapacity) ||
        q000.allocate(buf, reqCapacity, normCapacity) || qInit.allocate(buf, reqCapacity, normCapacity) ||
        q075.allocate(buf, reqCapacity, normCapacity)) {
        return;
    }

    // 如果從現有內存池中沒有找到可用的內存,則重新申請一個chunk
    // Add a new chunk.
    PoolChunk<T> c = newChunk(pageSize, maxOrder, pageShifts, chunkSize);
    long handle = c.allocate(normCapacity);
    assert handle > 0;
    // 用申請的內存初始化buffer
    c.initBuf(buf, handle, reqCapacity);
    // 剛剛初始化的chunk放在init鏈表中
    qInit.add(c);
}

先看下上面用到的幾個數據結構

// 每個鏈表存放的是已經被分配過的chunk,不同使用率的chunk被存放在不同的鏈表中
// 初始情況鏈表都是空的,剛開始從init開始,依次向后尋找,找到合適范圍的list,然后add
// qinit - q000 - q025 - q050 - q075 - q100 
// 注意qinit和q000之間是單向,也就是說qinit的chunk可以move到q000,但是q000的chunk不能再向前move了
// 使用率在50%-100%
private final PoolChunkList<T> q050;
// 使用率在25%-75%
private final PoolChunkList<T> q025;
// 使用率在1%-50%
private final PoolChunkList<T> q000;
// 使用率在0%-25%
private final PoolChunkList<T> qInit;
// 使用率在75%-100%
private final PoolChunkList<T> q075;
// 使用率100%
private final PoolChunkList<T> q100;

allocateNormal一開始先從已經申請過的chunk中查找有無可用內存,上面幾個鏈表查找的順序是q050、q025、q000、qinit、q075,關於這個順序問題可以參考網上的一篇文章。還有一個問題,既然有了q000,為什么需要qinit?前面這篇文章也有介紹,這結合我的理解再說下:

首先qinit的內存使用率是0-25%,q000的內存使用率是1-50%,q000沒有使用率是0的chunk;其次在構成鏈表的時候qinit.next = q000,但是q000.prev是null,如果是一個chunk的使用率變為0以后,調用io.netty.buffer.PoolChunkList#free方法來釋放,由於當前使用率是0,而q000的最小使用率是1%,所以會執行下面代碼的remove和move,將這個chunk移除出鏈表等待被回收。

// io.netty.buffer.PoolChunkList#free
boolean free(PoolChunk<T> chunk, long handle) {
    chunk.free(handle);
    if (chunk.usage() < minUsage) {
        // 如果小於當前鏈表的最小利用率,將chunk從鏈表中移除
        remove(chunk);
        // Move the PoolChunk down the PoolChunkList linked-list.
        // 將chunk添加到當前鏈表的上一個鏈表,,如果是當前鏈表是q000,沒有上一個,也就是這個chunk不能再被使用,等待被GC護回收內存
        return move0(chunk);
    }
    return true;
}

基本數據結構說了,接下來看看具體的邏輯。allocateNormal一開始是調用的PoolChunkList#allocate

// io.netty.buffer.PoolChunkList#allocate
boolean allocate(PooledByteBuf<T> buf, int reqCapacity, int normCapacity) {
    if (head == null || normCapacity > maxCapacity) {
        // Either this PoolChunkList is empty or the requested capacity is larger then the capacity which can
        // be handled by the PoolChunks that are contained in this PoolChunkList.
        // 當前鏈表為空或者申請內存的大小大於當前鏈表的利用率直接返回false
        return false;
    }

    // 從鏈表頭開始依次查找可用的內存
    for (PoolChunk<T> cur = head;;) {
        // 針對每個chunk申請內存,如果返回handle小於0表示沒有符合條件的內存,繼續查找下一個
        long handle = cur.allocate(normCapacity);
        if (handle < 0) {
            cur = cur.next;
            if (cur == null) {
                return false;
            }
        } else {
            // 如果找到可用的內存,用來初始化buffer
            cur.initBuf(buf, handle, reqCapacity);
            if (cur.usage() >= maxUsage) {
                // 如果當前chunk的利用率大於當前鏈表的最大利用率需要移動到下一個鏈表
                remove(cur);
                nextList.add(cur);
            }
            return true;
        }
    }
}

接下來看看最終chunk是怎么分配內存的,下面這個方法其實就是計算出chunk中符合條件的內存的memoryMap數組中的index

// io.netty.buffer.PoolChunk#allocate
long allocate(int normCapacity) {
    if ((normCapacity & subpageOverflowMask) != 0) { // >= pageSize
        // 大於等於pageSize,8k
        return allocateRun(normCapacity);
    } else {
        // 小於pageSize
        return allocateSubpage(normCapacity);
    }
}

這個里面就根據申請內存的大小使用不同的分配策略,這里先看下大於pageSize的內存分配

大於pageSize的內存申請

// io.netty.buffer.PoolChunk#allocateRun
private long allocateRun(int normCapacity) {
    // 計算出所申請的內存位於樹的哪一層
    // Integer.numberOfLeadingZeros(pageSize)表示pageSize二進制表示的時候,前面有多個0,也就是第一個1出現的位置
    // pageShift = Integer.SIZE - 1 - Integer.numberOfLeadingZeros(pageSize) = 13
    // pageShift就是pageSize的最高位相對於右邊的位偏移
    int d = maxOrder - (log2(normCapacity) - pageShifts);
    // 在d層找到可用的內存並返回對應的memoryMap數組元素的index
    int id = allocateNode(d);
    if (id < 0) {
        // 如果小於0說明沒有找到
        return id;
    }
    // 如果找到了則更新當前chunk可用內存大小
    freeBytes -= runLength(id);
    return id;
}

上面代碼中d的含義:要申請內存位於memoryMap數的第幾層(下面是第maxOrder層)

maxOrder:表示memoryMap數總共有maxOrder層,從上到下依次是0,1,2,3,4,5,6,7,8,9,10,11,也即是默認12層

log2(normCapacity):要申請內存大小二進制最高位相對於右邊的偏移

pageShift:pageSize二進制最高位相對於右邊的偏移

(log2(normCapacity) - pageShifts):要申請內存的大小在樹中的層數與最底層的層數(最底層的內存大小是pageSIze,8K)差,memoryMap樹中,兩層之間表示的內存是2倍關系,比如第10層是第11層的2倍,第11層是8k,那么第10層就是16k

maxOrder - (log2(normCapacity) - pageShifts):表示要申請的內存大小位於數的哪一層

舉個例子,假如申請的內存是32k,log2(normalCapacity) = 15,32k距離8k中間隔了15-13=2層,所以要申請的內存位於第11-2=9層,驗證一下,葉子節點內存大小是一個page8k,父節點16k,父節點的父節點是32k,所以申請的內存應該位於上一層的上一層,也就是第9層

那么怎么從樹中找到合適的內存呢,下面這個方法就是在指定的d層找到符合條件的內存,由於每個節點的值本來是該節點所在的深度depth,如果該節點的內存(包括子節點)已經被分配過,則會被標記為不可用,所以只要從根節點開始,依次向下查找,如果當前節點沒有被分配則找左子節點,如果該節點已經被分配了則找當前節點的兄弟節點,直到找到第d層依然沒有可用內存的時候,

/**
* Algorithm to allocate an index in memoryMap when we query for a free node
* at depth d
*
* @param d depth 樹的深度
* @return index in memoryMap
*/
// 在d層查找一個可用的節點,根據所在層數來查找蓋層可用的內存的memoryMap的index
private int allocateNode(int d) {
    int id = 1;
    int initial = - (1 << d); // has last d bits = 0 and rest all = 1
    byte val = value(id);
    if (val > d) { // unusable
        return -1;
    }
    // id < 2^d 的時候 id & initial = 0,也就是說d層所有的id&inital都是大於0的
    // 所以這兩個條件就限定了尋找的節點在d層,並且可用free
    // 這里就是查找第d層的可用節點,為什么不直接從d層開始查找呢?
    while (val < d || (id & initial) == 0) { // id & initial == 1 << d for all ids at depth d, for < d it is 0
        id <<= 1;
        val = value(id);
        // 如果該節點的子節點已經被分配過了,那么找兄弟節點
        if (val > d) {
            // id = id + 1
            id ^= 1;
            val = value(id);
        }
    }
    byte value = value(id);
    assert value == d && (id & initial) == 1 << d : String.format("val = %d, id & initial = %d, d = %d",
                                                                  value, id & initial, d);
    // 將當前內存標記為不可用,也就是將該節點的數組值更新maxOrder+1
    setValue(id, unusable); // mark as unusable
    updateParentsAlloc(id);
    return id;
}

前面說過chunk的數據結構memoryMap,申請的內存的時候也是和這個數據結構密切相關,其實就是找到這棵樹中可用內存的節點的index,然后更新memoryMap中內存的使用情況。

再看一個問題:為什么不直接從d層開始查找呢,而是從根節點開始?

為了更快的查找。如果直接從d層開始查找,需要的時間是最壞是2^d(要找的節點在最右側),最好是1(要找的節點在最左側),因為可能存在前面的節點已經被分配的情況。使用從上往下查找的時候,如果發現子節點被分配過了,就直接查找兄弟節點,時間是log2d

上面是大於等於pageSize的內存申請,接下來看看另外一種小於pageSize的內存申請

小於pageSize的內存申請

// io.netty.buffer.PoolChunk#allocateSubpage
private long allocateSubpage(int normCapacity) {
    // Obtain the head of the PoolSubPage pool that is owned by the PoolArena and synchronize on it.
    // This is need as we may add it back and so alter the linked-list structure.
    // 從tinySubpagePools或者smallSubpagePools中查找
    PoolSubpage<T> head = arena.findSubpagePoolHead(normCapacity);
    synchronized (head) {
        // 由於是小於pageSize的,所以直接在最后一層,也就是樹的最底層查找,因為樹的最底層的節點的內存大小事pageSize
        int d = maxOrder; // subpages are only be allocated from pages i.e., leaves
        int id = allocateNode(d);
        if (id < 0) {
            return id;
        }

        final PoolSubpage<T>[] subpages = this.subpages;
        final int pageSize = this.pageSize;
		// 更新當前chunk的剩余內存大小,本次申請的是一個page,所以減去pageSize
        freeBytes -= pageSize;
		// memoryIdx - 2048就是page的index
        int subpageIdx = subpageIdx(id);
        PoolSubpage<T> subpage = subpages[subpageIdx];
        if (subpage == null) {
            // 該page尚未初始化過,第一次初始化
            // 初始化的過程會將subpage放到鏈表頭
            subpage = new PoolSubpage<T>(head, this, id, runOffset(id), pageSize, normCapacity);
            subpages[subpageIdx] = subpage;
        } else {
            // 重新init
            subpage.init(head, normCapacity);
        }
        // 返回的是申請到內存的handle,包含了bitmap和memoryMapIdx信息
        return subpage.allocate();
    }
}

/**
* Returns the bitmap index of the subpage allocation.
* 返回的是申請到的可用內存塊在bitmap數組中對應bit的index
*/
long allocate() {
    if (elemSize == 0) {
        return toHandle(0);
    }

    if (numAvail == 0 || !doNotDestroy) {
        return -1;
    }

    // 獲取下一個可用內存塊的位置
    final int bitmapIdx = getNextAvail();
    // 右移6位,得到的是該bit對應bitmap數組中的下標
    int q = bitmapIdx >>> 6;
    // 邏輯與63,得到的是該bit在long型數據中的第幾個bit
    int r = bitmapIdx & 63;
    assert (bitmap[q] >>> r & 1) == 0;
    // 將該bit置為已使用
    bitmap[q] |= 1L << r;
    // 將可用的內存塊總數減1
    if (-- numAvail == 0) {
        removeFromPool();
    }

    return toHandle(bitmapIdx);
}

private long toHandle(int bitmapIdx) {
    // 2^62,高位是bitmapIdx,低位是memoryMapIdx
    return 0x4000000000000000L | (long) bitmapIdx << 32 | memoryMapIdx;
}

private int getNextAvail() {
    int nextAvail = this.nextAvail;
    if (nextAvail >= 0) {
        this.nextAvail = -1;
        return nextAvail;
    }
    return findNextAvail();
}

private int findNextAvail() {
    final long[] bitmap = this.bitmap;
    final int bitmapLength = this.bitmapLength;
    // 從實際使用到的標志位bitmapLength中查找
    for (int i = 0; i < bitmapLength; i ++) {
        long bits = bitmap[i];
        // bits不全為1,表示有空余
        if (~bits != 0) {
            return findNextAvail0(i, bits);
        }
    }
    return -1;
}

private int findNextAvail0(int i, long bits) {
    final int maxNumElems = this.maxNumElems;
    // i為數組的下標,表示第i個long數
    final int baseVal = i << 6;
    // 判斷long的每一位是否時可用的
    for (int j = 0; j < 64; j ++) {
        if ((bits & 1) == 0) {
            // 找到可用的后,記錄可用內存塊對應於bitmap中的index,j為該long數據的第j位
            int val = baseVal | j;
            if (val < maxNumElems) {
                return val;
            } else {
                break;
            }
        }
        // 查找下一位
        bits >>>= 1;
    }
    return -1;
}


首先是從內存池中獲取內存,PoolArena維護了以下兩個數組,arena.findSubpagePoolHead就是從以下兩個數組中查找可用的chunk的head,接下來就是從找到的chunk中查找可用內存,類似上面大於8k申請的查找方式,實際上是調用的同一個方法:io.netty.buffer.PoolChunk#allocateNode

// small內存池,數組大小是log2(pageSize>>>10),默認就是4,所以數組index的計算方式也是這樣log2(reqCapacity>>>10)
// 每個元素保存的是一個PoolSubpage鏈表,一個PoolSubpage鏈表里面的subpage都是相同的塊大小(8k分成小童的份數)
// 每個元素的elemSize依次是:1k,2k,4k,8k
private final PoolSubpage<T>[] smallSubpagePools
// tiny內存池,數組大小是512 >>> 4,也就是32,數組index的計算方式也就是一樣:reqCapacity除以16
// 每個元素的elemSize依次是:16,32,48......480,496,512,每個PoolSubpage又是一個鏈表,該鏈表上的elemSize相同
private final PoolSubpage<T>[] tinySubpagePools;

這里也可以看出之前將申請內存的大小規范化的目的了,也能理解為什么tiny規范化為16的倍數,而small規范化為2的指數次。因為

small的內存范圍是:8192 > size > 512,大於512並且是2的指數次,正好是1k、2k、4k、8k

tiny的內存范圍是:size <= 512,16的倍數正好是16、32、48...

init buffer

上面一系列的方法都是計算出可用內存在memoryMap數組中的index,接下來看看怎么使用這個index初始化buffer

// io.netty.buffer.PoolChunk#initBuf
// 找到chunk信息后調用這個方法初始化buf,用分配好的內存初始化buf
void initBuf(PooledByteBuf<T> buf, long handle, int reqCapacity) {
    int memoryMapIdx = memoryMapIdx(handle);
    int bitmapIdx = bitmapIdx(handle);
    // 如果分配的內存大於一個page,不需要使用bitmapIdx,這個時候bitmapIdx為0
    if (bitmapIdx == 0) {
        byte val = value(memoryMapIdx);
        assert val == unusable : String.valueOf(val);
        buf.init(this, handle, runOffset(memoryMapIdx), reqCapacity, runLength(memoryMapIdx),
                 arena.parent.threadCache());
    } else {
        initBufWithSubpage(buf, handle, bitmapIdx, reqCapacity);
    }
}

// io.netty.buffer.PoolChunk#initBufWithSubpage(io.netty.buffer.PooledByteBuf<T>, long, int, int)
// 使用PoolSubpage初始化buf
// handle構成:2^62 | long(bitmapIdx) << 32 | memoryMapIdx
private void initBufWithSubpage(PooledByteBuf<T> buf, long handle, int bitmapIdx, int reqCapacity) {
    assert bitmapIdx != 0;
	// memoryMapIdx方法是直接將handler強轉為int,也就是截去了高32位,低32位就是memoryMapIdx
    int memoryMapIdx = memoryMapIdx(handle);

    PoolSubpage<T> subpage = subpages[subpageIdx(memoryMapIdx)];
    assert subpage.doNotDestroy;
    assert reqCapacity <= subpage.elemSize;
	// 0x3FFFFFFF是2^31-1,此刻的bitmapIdx是2^30 | bitmapIdx
    // 所以bitmapIdx & 0x3FFFFFFF得到的就是原始的bitmapIdx
    buf.init(
        this, handle,
        runOffset(memoryMapIdx) + (bitmapIdx & 0x3FFFFFFF) * subpage.elemSize, reqCapacity, subpage.elemSize,
        arena.parent.threadCache());
}

// 根據memoryIdx獲取對應的subpage的序號,memoryIdx一定是位於2048-4097之間才能使用此方法
private int subpageIdx(int memoryMapIdx) {
    // memoryIdx位於2048-4097之間,和2048即2^11進行異或運算,相當於memoryIdx-2048
    return memoryMapIdx ^ maxSubpageAllocs; // remove highest set bit, to get offset
}

上面根據不同的handle使用不同的初始化buffer的策略

  • 大於pageSize返回的handle,是一個memoryMap數組的下標
  • 小於pageSize返回的是handle,包含了bitmap和memoryMapIdx數組元素的下標信息

這里再說下bitmap是什么意思,由於一個PoolSubpage的大小默認是8k,如果要分割成更小的內存塊需要使用一個標記來記錄這個page划分成的內存塊哪些已經使用,那些是可用的,io.netty.buffer.PoolSubpage#bitmap就是這個作用,PoolSubpage中相關的字段有

// 記錄subpage分割后的內存塊的使用情況,一個bit對應一個內存塊
private final long[] bitmap;

PoolSubpage<T> prev;
PoolSubpage<T> next;

boolean doNotDestroy;
// 內存塊的大小
int elemSize;
// 划分后的內存塊的大小
private int maxNumElems;
// 實際使用到的bitmap數組的長度
private int bitmapLength;
// 下一個可用的內存塊的位置
private int nextAvail;
private int numAvail;

allocateNormal終於申請到了內存,也初始化了buffer,但是這個方法不僅包含了分配normal內存的方法也包含了tiny和small內存的分配方法。

tiny內存分配

tiny內存分配流程:

  1. 如果申請的是tiny類型,會先從tiny緩存中嘗試分配,如果緩存分配成功則返回

  2. 否則從tinySubpagePools中嘗試分配

  3. 如果上面沒有分配成功則使用allocateNormal進行分配

從緩存中分配

這里以啟用了緩存為例來說明,使用到的緩存類是PoolThreadCache,緩存是通過隊列實現的,一個隊列中存儲的內存大小都是相同的

// io.netty.buffer.PoolArena#allocate(io.netty.buffer.PoolThreadCache, io.netty.buffer.PooledByteBuf<T>, int)
// 這里的cache是PoolThreadCache
if (cache.allocateTiny(this, buf, reqCapacity, normCapacity)) {
    // was able to allocate out of the cache so move on
    return;
}

boolean allocateTiny(PoolArena<?> area, PooledByteBuf<?> buf, int reqCapacity, int normCapacity) {
    // 緩存維護了一個隊列,這個隊列中存儲的內存塊大小都相同
    // 找到緩存之后,從隊列中取出一個內存塊用來初始化buffer
    return allocate(cacheForTiny(area, normCapacity), buf, reqCapacity);
}

// 查找緩存數組中緩存的內存
private MemoryRegionCache<?> cacheForTiny(PoolArena<?> area, int normCapacity) {
    // 計算出申請內存位於緩存數組中的位置,即數組下標
    int idx = PoolArena.tinyIdx(normCapacity);
    if (area.isDirect()) {
        // 使用直接內存的緩存
        return cache(tinySubPageDirectCaches, idx);
    }
    // 使用堆內存的緩存
    return cache(tinySubPageHeapCaches, idx);
}

static int tinyIdx(int normCapacity) {
    // 由於tiny緩存數組大小是32,依次對應的內存大小是16、32...,512,所以數組的下標應該是申請內存的大小除以16
    // normCapacity = normCapacity / 16
    return normCapacity >>> 4;
}

從緩存中分配內存的過程

  1. 尋找緩存tinySubPageDirectCaches
  2. 使用緩存中的chunk初始化buf

關於tiny緩存的數據結構

// tiny緩存
// 是一個SubPageMemoryRegionCache數組,默認緩存數組長度:32,依次存儲的內存大小是16、32、48...,512
io.netty.buffer.PoolThreadCache#tinySubPageHeapCaches

// 初始化tiny緩存數組,數組大小是32
// static final int numTinySubpagePools = 512 >>> 4;
tinySubPageHeapCaches = createSubPageCaches(
                    tinyCacheSize, PoolArena.numTinySubpagePools, SizeClass.Tiny);


// 初始化tiny和small緩存數組
private static <T> MemoryRegionCache<T>[] createSubPageCaches(
    int cacheSize, int numCaches, SizeClass sizeClass) {
    if (cacheSize > 0) {
        @SuppressWarnings("unchecked")
        MemoryRegionCache<T>[] cache = new MemoryRegionCache[numCaches];
        for (int i = 0; i < cache.length; i++) {
            // TODO: maybe use cacheSize / cache.length
            cache[i] = new SubPageMemoryRegionCache<T>(cacheSize, sizeClass);
        }
        return cache;
    } else {
        return null;
    }
}

緩存使用隊列實現,一個隊列最大元素個數

DEFAULT_TINY_CACHE_SIZE = SystemPropertyUtil.getInt("io.netty.allocator.tinyCacheSize", 512);

從tinySubpagePools中分配

上面緩存中如果沒有分配到內存的話,會向內存池tinySubpagePools申請,主要邏輯是:

  1. 計算tinySubpagePools數組的index,右移4,除以16(該數組長度是512>>>4)
  2. 取出index出的subpage,也就是這個index處head
  3. 從head.next查找合適的內存
  4. 找到可用內存后使用io.netty.buffer.PoolChunk#initBufWithSubpage(io.netty.buffer.PooledByteBuf , long, int)初始化buffer

前面已經介紹過tinySubpagePools,是一個數組,數組大小是32,每個元素是一個PoolSubpage,PoolSubpage本身是一個鏈表,所以要在這個里面查找可用內存,先要計算出數組下表,然后找到該位置的PoolSubpage,取出這個鏈表的頭,然后分配內存。關鍵代碼如下

// io.netty.buffer.PoolArena#allocate(io.netty.buffer.PoolThreadCache, io.netty.buffer.PooledByteBuf<T>, int)
private void allocate(PoolThreadCache cache, PooledByteBuf<T> buf, final int reqCapacity) {
    final int normCapacity = normalizeCapacity(reqCapacity);
    if (isTinyOrSmall(normCapacity)) { // capacity < pageSize
        int tableIdx;
        PoolSubpage<T>[] table;
        boolean tiny = isTiny(normCapacity);
        if (tiny) { // < 512
            // 省略中間代碼...
            tableIdx = tinyIdx(normCapacity);
            table = tinySubpagePools;
        } else {
            // 省略中間代碼...
        }

        final PoolSubpage<T> head = table[tableIdx];

        /**
             * Synchronize on the head. This is needed as {@link PoolChunk#allocateSubpage(int)} and
             * {@link PoolChunk#free(long)} may modify the doubly linked list as well.
             */
        synchronized (head) {
            final PoolSubpage<T> s = head.next;
            if (s != head) {
                assert s.doNotDestroy && s.elemSize == normCapacity;
                long handle = s.allocate();
                assert handle >= 0;
                s.chunk.initBufWithSubpage(buf, handle, reqCapacity);
                incTinySmallAllocation(tiny);
                return;
            }
        }
        // 省略中間代碼... 
}


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM