netty源碼解析(4.0)-24 ByteBuf基於內存池的內存管理


 io.netty.buffer.PooledByteBuf<T>使用內存池中的一塊內存作為自己的數據內存,這個塊內存是PoolChunk<T>的一部分。PooledByteBuf<T>是一個抽象類型,它有4個派生類:

  • PooledHeapByteBuf, PooledUnsafeHeapByteBuf 使用堆內存的PooledByteBuffer<byte[]>。
  • PooledDirectByteBuf, PooledUnsafeDirectByteBuf 使用直接內存的PooledByteBuf<ByteBuffer>。

初始化

  PooledByteBuf的初始化過程分為兩個步驟:創建實例;初始化內存。這兩個步驟的代碼如下:

    protected PooledByteBuf(Recycler.Handle recyclerHandle, int maxCapacity) {
        super(maxCapacity);
        this.recyclerHandle = recyclerHandle;
    }

    void init(PoolChunk<T> chunk, long handle, int offset, int length, int maxLength, PoolThreadCache cache) {
        init0(chunk, handle, offset, length, maxLength, cache);
    }

    private void init0(PoolChunk<T> chunk, long handle, int offset, int length, int maxLength, PoolThreadCache cache) {
        assert handle >= 0;
        assert chunk != null;

        this.chunk = chunk;
        memory = chunk.memory;
        allocator = chunk.arena.parent;
        this.cache = cache;
        this.handle = handle;
        this.offset = offset;
        this.length = length;
        this.maxLength = maxLength;
        tmpNioBuf = null;
    }

  創建實例時調用的構造方法只是為maxCapacity和recyclerHandler屬性賦值,構造方法是protected,不打算暴露到外面。派生類都提供了newInstance方法創建實例,以PooledHeapByteBuf為例,它的newInstance方法實現如下:

 1     private static final Recycler<PooledHeapByteBuf> RECYCLER = new Recycler<PooledHeapByteBuf>() {
 2         @Override
 3         protected PooledHeapByteBuf newObject(Handle handle) {
 4             return new PooledHeapByteBuf(handle, 0);
 5         }
 6     };
 7 
 8     static PooledHeapByteBuf newInstance(int maxCapacity) {
 9         PooledHeapByteBuf buf = RECYCLER.get();
10         buf.reuse(maxCapacity);
11         return buf;
12     }

  這里的newInstance使用RECYCLER創建實例對象。Recycler<T>是一個輕量級的,支持循環使用的對象池。當對象池中沒有可用對象時,會在第4行使用構造方法創建一個新的對象。

 

  init調用init0初始化數據內存,init0方法為幾個內存相關的關鍵屬性賦值:

  • chunk:  PoolChunk<T>對象,這個PooledByteBuf使用的內存就是它的一部分。
  • memory: 內存對象。更准確地說,PooledByteBuf使用的內存是它的一部分。
  • allocator: 創建這個PooledByteBuf的PooledByteBufAllocator對象。
  • cache:  線程專用的內存緩存。分配內存時會優先從這個緩存中尋找合適的內存塊。
  • handle:  內存在chunk中node的句柄。chunk使用handle可以計算出它對應內存的起始位置offset。
  • offset:  分配內存的起始位置。
  • length: 分配內存的長度,也是這個PooledByteBuf的capacity。
  • maxLength: 這塊內存node的最大長度。當調用capacity(int newCapacity)方法增加capacity時,只要newCapacity不大於這個值,就不用從新分配內存。

  內存初始化完成之后,這個PooledByteBuf可使用的內存范圍是memory內存中[offset, offset+length)。idx方法可以把ByteBuf的索引轉換成memory的索引:

1     protected final int idx(int index) {
2         return offset + index;
3     }

 

重新分配內存

  和前面講過的ByteBuf實現一樣,PooledByteBuf也需要使用capacity(int newCapacity)改變內存大小,也會涉及到把數據從舊內存中復制到新內存的問題。也就是說,要解決的問題是一樣的,只是具體實現的差異。

 1     @Override
 2     public final ByteBuf capacity(int newCapacity) {
 3         checkNewCapacity(newCapacity);
 4 
 5         // If the request capacity does not require reallocation, just update the length of the memory.
 6         if (chunk.unpooled) {
 7             if (newCapacity == length) {
 8                 return this;
 9             }
10         } else {
11             if (newCapacity > length) {
12                 if (newCapacity <= maxLength) {
13                     length = newCapacity;
14                     return this;
15                 }
16             } else if (newCapacity < length) {
17                 if (newCapacity > maxLength >>> 1) {
18                     if (maxLength <= 512) {
19                         if (newCapacity > maxLength - 16) {
20                             length = newCapacity;
21                             setIndex(Math.min(readerIndex(), newCapacity), Math.min(writerIndex(), newCapacity));
22                             return this;
23                         }
24                     } else { // > 512 (i.e. >= 1024)
25                         length = newCapacity;
26                         setIndex(Math.min(readerIndex(), newCapacity), Math.min(writerIndex(), newCapacity));
27                         return this;
28                     }
29                 }
30             } else {
31                 return this;
32             }
33         }
34 
35         // Reallocation required.
36         chunk.arena.reallocate(this, newCapacity, true);
37         return this;
38     }

  這個方法處理兩大類情況: 不重新分配內存;重新分配內存並復制ByteBuf中的數據和狀態。

  不重新分配內存: 

  8行: chunk不需要回收到內存池中,且newCapacity沒有變化。

  11-32行: chunk需要回收到內存池中。

    13-14行:內存增大,且newcapacity不大於maxLength。把容量修改成newCapacity即可。

    20-22行: 內存減小,  newCapacity 大於maxLength的一半,maxLength<=512, newCapacity >maxLength - 16。 把容量修改成newCapacity, 調整readerIndex, writerIndex。

    25-27行: 內存減小,newCapacity大於maxLength的一半,  maxLength > 512。把容量修改成newCapacity, 調整readerIndex, writerIndex。

    31行: 內存不變,不做任何操作。

  需要重新分配內存:

  36行: 任何不滿足以上情況的都要重新分配內存。這里使用Arena的reallocate方法重新分配內存,並把舊內存釋放調,代碼如下:

 1     //io.netty.buffer.PoolArena#reallocate, 
 2     void reallocate(PooledByteBuf<T> buf, int newCapacity, boolean freeOldMemory) {
 3         if (newCapacity < 0 || newCapacity > buf.maxCapacity()) {
 4             throw new IllegalArgumentException("newCapacity: " + newCapacity);
 5         }
 6 
 7         int oldCapacity = buf.length;
 8         if (oldCapacity == newCapacity) {
 9             return;
10         }
11 
12         PoolChunk<T> oldChunk = buf.chunk;
13         long oldHandle = buf.handle;
14         T oldMemory = buf.memory;
15         int oldOffset = buf.offset;
16         int oldMaxLength = buf.maxLength;
17         int readerIndex = buf.readerIndex();
18         int writerIndex = buf.writerIndex();
19 
20         allocate(parent.threadCache(), buf, newCapacity);
21         if (newCapacity > oldCapacity) {
22             memoryCopy(
23                     oldMemory, oldOffset,
24                     buf.memory, buf.offset, oldCapacity);
25         } else if (newCapacity < oldCapacity) {
26             if (readerIndex < newCapacity) {
27                 if (writerIndex > newCapacity) {
28                     writerIndex = newCapacity;
29                 }
30                 memoryCopy(
31                         oldMemory, oldOffset + readerIndex,
32                         buf.memory, buf.offset + readerIndex, writerIndex - readerIndex);
33             } else {
34                 readerIndex = writerIndex = newCapacity;
35             }
36         }
37 
38         buf.setIndex(readerIndex, writerIndex);
39 
40         if (freeOldMemory) {
41             free(oldChunk, oldHandle, oldMaxLength, buf.cache);
42         }
43     }

  7-9行: 內存大小沒變化,返回。

  12-18行: 記錄下舊內存的信息,readerIndex, writerIndex。

  20行: 為PooledByteBuf分配新內存。

  21-38行: 把舊內存中數據復制到新內存,並把readerIndex,writerIndex設置在正確。

  41行: 釋放就內存。

 

釋放內存

  內存釋放代碼在deallocate中:

 1     @Override
 2     protected final void deallocate() {
 3         if (handle >= 0) {
 4             final long handle = this.handle;
 5             this.handle = -1;
 6             memory = null;
 7             tmpNioBuf = null;
 8             chunk.arena.free(chunk, handle, maxLength, cache);
 9             chunk = null;
10             recycle();
11         }
12     }

  關鍵是第8行代碼,使用PoolArena的free方法釋放內存。然后是recycle把當前PooledByteBuf對象放到RECYCLER中循環使用。

 

PooledByteBufAllocator創建內存管理模塊

  在前面分析PooledByteBuf內存初始化,重新分配及釋放時,看到了內存管理的三個核心模塊: PoolArena(chunk.arena),  PoolChunk(chunk),  PoolThreadCache(cache)。PooledByteBuf的內存管理能力都是使用這三模塊實現的,它本身沒有實現內存管理算法。當需要為PooledByteBuf分配一塊內存時,先從一個線程專用的PoolThreadCache中得到一個PoolArena,  使用PoolArena的allocate找到一個滿足要求內存塊PoolChunk,  從這個內存塊中分配一塊連續的內存handle,計算出這塊內存起始位置的偏移量offset, 最后調用PooledByteBuf的init方法初始化內存完成內存分配。 釋放內存調用PoolArena的free方法。在內存分配時,會優先從PoolThreadCache中尋找合適的內存塊。在內存釋放時會把內存塊暫時放在PoolThreadCache中,等使用頻率過低時才會還給PoolChunk。這三個模塊中PoolArena,  PoolThreadCache由PooledByteBufAllocator創建,PoolChunk由PoolArean維護。

  PooledByteBufAllocator維護了相關的幾個屬性:

  PoolArena<byte[]>[] heapArenas

  PoolArena<ByteBuffer>[] directArenas

  PoolThreadLocalCache threadCache

  headArenas和directArenas分別維護了多個PoolArena, 他們分別用來分配堆內存和直接內存。 如果使用得當,可以讓每個線程持有一個專用的PoolArena,  避免線程間數據同步的開銷。PoolThreadLocalCache會為每個線程創建一個專用的PoolThreadCache實例,這個實例分別持有一個heapArena和directArena。

  接下來的的幾個章節會詳細分析PoolArena和PoolThreadCache的實現代碼。

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM