Netty內存池ByteBuf 內存回收


內存池ByteBuf 內存回收:

  在前面的章節中我們有提到, 堆外內存是不受JVM 垃圾回收機制控制的, 所以我們分配一塊堆外內存進行ByteBuf 操作時, 使用完畢要對對象進行回收, 本節就以PooledUnsafeDirectByteBuf 為例講解有關內存分配的相關邏輯。PooledUnsafeDirectByteBuf 中內存釋放的入口方法是其父類AbstractReferenceCountedByteBuf 中的release()方法:

public boolean release() {
   return release0(1);
}
private boolean release0(int decrement) {
for (;;) {
int refCnt = this.refCnt;
if (refCnt < decrement) {
throw new IllegalReferenceCountException(refCnt, -decrement);
}

if (refCntUpdater.compareAndSet(this, refCnt, refCnt - decrement)) {
if (refCnt == decrement) {
deallocate();
return true;
}
return false;
}
}
}

  if (refCnt == decrement) 中判斷當前byteBuf 是否沒有被引用了, 如果沒有被引用, 則通過deallocate()方法進行釋放。因為我們是以PooledUnsafeDirectByteBuf 為例, 所以這里會調用其父類PooledByteBuf 的deallocate()方法:

protected final void deallocate() {
    if (handle >= 0) {
        final long handle = this.handle;
     //表示當前的ByteBuf 不再指向任何一塊內存
this.handle = -1;
     //這里將memory 也設置為null memory
= null;
     //這一步是將ByteBuf 的內存進行釋放 chunk.arena.free(chunk, handle, maxLength, cache);
     //將對象放入的對象回收站, 循環利用 recycle(); } }

  跟進 free 方法:

void free(PoolChunk<T> chunk, long handle, int normCapacity, PoolThreadCache cache) {
  //是否為unpooled
  if (chunk.unpooled) {
    int size = chunk.chunkSize();
    destroyChunk(chunk);
    activeBytesHuge.add(-size);
    deallocationsHuge.increment();
  } else {
    //那種級別的Size
    SizeClass sizeClass = sizeClass(normCapacity);
    //加到緩存里
    if (cache != null && cache.add(this, chunk, handle, normCapacity, sizeClass)) {
      return;
    }
    //將緩存對象標記為未使用     freeChunk(chunk, handle, sizeClass);   } }

  首先判斷是不是unpooled, 我們這里是Pooled, 所以會走到else 塊中:sizeClass(normCapacity)計算是哪種級別的size, 我們按照tiny 級別進行分析;cache.add(this, chunk, handle, normCapacity, sizeClass)是將當前當前ByteBuf 進行緩存;我們之前講過, 再分配ByteBuf 時首先在緩存上分配, 而這步, 就是將其緩存的過程, 繼續跟進去:

boolean add(PoolArena<?> area, PoolChunk chunk, long handle, int normCapacity, SizeClass sizeClass) {
  //拿到MemoryRegionCache 節點
  MemoryRegionCache<?> cache = cache(area, normCapacity, sizeClass);
  if (cache == null) {
    return false;
  }
  //將chunk, 和handle 封裝成實體加到queue 里面   return cache.add(chunk, handle); }

  首先根據根據類型拿到相關類型緩存節點, 這里會根據不同的內存規格去找不同的對象, 我們簡單回顧一下, 每個緩存對象都包含一個queue, queue 中每個節點是entry, 每一個entry 中包含一個chunk 和handle, 可以指向唯一的連續的內存,我們跟到cache 中:

 

private MemoryRegionCache<?> cache(PoolArena<?> area, int normCapacity, SizeClass sizeClass) {
  switch (sizeClass) {
    case Normal:
        return cacheForNormal(area, normCapacity);
    case Small:
        return cacheForSmall(area, normCapacity);
    case Tiny:
        return cacheForTiny(area, normCapacity);
    default:
        throw new Error();
  }
}

  假設我們是tiny 類型, 這里就會走到cacheForTiny(area, normCapacity)方法中, 跟進去:

private MemoryRegionCache<?> cacheForTiny(PoolArena<?> area, int normCapacity) {
   int idx = PoolArena.tinyIdx(normCapacity);
   if (area.isDirect()) {
       return cache(tinySubPageDirectCaches, idx);
    }
    return cache(tinySubPageHeapCaches, idx);
}

  這個方法我們之前剖析過, 就是根據大小找到第幾個緩存中的第幾個緩存, 拿到下標之后, 通過cache 去超相對應的緩存對象:

private static <T> MemoryRegionCache<T> cache(MemoryRegionCache<T>[] cache, int idx) {
    if (cache == null || idx > cache.length - 1) {
        return null;
    }
    return cache[idx];
}

  我們這里看到, 是直接通過下標拿的緩存對象,回到add()方法中:

boolean add(PoolArena<?> area, PoolChunk chunk, long handle, int normCapacity, SizeClass sizeClass) {
  //拿到MemoryRegionCache 節點
  MemoryRegionCache<?> cache = cache(area, normCapacity, sizeClass);
  if (cache == null) {
    return false;
  }
  //將chunk, 和handle 封裝成實體加到queue 里面
  return cache.add(chunk, handle);
}

  這里的cache 對象調用了一個add 方法, 這個方法就是將chunk 和handle 封裝成一個entry 加到queue 里面,我們跟到add()方法中:

public final boolean add(PoolChunk<T> chunk, long handle) {
       Entry<T> entry = newEntry(chunk, handle);
       boolean queued = queue.offer(entry);
       if (!queued) {
            // If it was not possible to cache the chunk, immediately recycle the entry
            entry.recycle();
        }
        return queued;
}

  我們之前介紹過, 從在緩存中分配的時候從queue 彈出一個entry, 會放到一個對象池里面, 而這里Entry<T> entry =newEntry(chunk, handle) 就是從對象池里去取一個entry 對象, 然后將chunk 和handle 進行賦值, 然后通過queue.offer(entry)加到queue,我們回到free()方法中:

void free(PoolChunk<T> chunk, long handle, int normCapacity, PoolThreadCache cache) {
  //是否為unpooled
  if (chunk.unpooled) {
    int size = chunk.chunkSize();
    destroyChunk(chunk);
    activeBytesHuge.add(-size);
    deallocationsHuge.increment();
  } else {
    //那種級別的Size
    SizeClass sizeClass = sizeClass(normCapacity);
    //加到緩存里
    if (cache != null && cache.add(this, chunk, handle, normCapacity, sizeClass)) {
      return;
    }
    //將緩存對象標記為未使用
    freeChunk(chunk, handle, sizeClass);
  }
}

  這里加到緩存之后, 如果成功, 就會return, 如果不成功, 就會調用freeChunk(chunk, handle, sizeClass)方法, 這個方法的意義是, 將原先給ByteBuf 分配的內存區段標記為未使用,跟進freeChunk()簡單分析下:

void freeChunk(PoolChunk<T> chunk, long handle, SizeClass sizeClass) {
        final boolean destroyChunk;//銷毀標志
        synchronized (this) {
            switch (sizeClass) {//區分大小類型
            case Normal:
                ++deallocationsNormal;
                break;
            case Small:
                ++deallocationsSmall;
                break;
            case Tiny:
                ++deallocationsTiny;
                break;
            default:
                throw new Error();
            }//執行銷毀
            destroyChunk = !chunk.parent.free(chunk, handle);
        }
        if (destroyChunk) {
            // destroyChunk not need to be called while holding the synchronized lock.
            destroyChunk(chunk);
        }
}

  我們再跟到free()方法中:

boolean free(PoolChunk<T> chunk, long handle) {
    chunk.free(handle);
    if (chunk.usage() < minUsage) {
        remove(chunk);
        // Move the PoolChunk down the PoolChunkList linked-list.
        return move0(chunk);
    }
    return true;
}

  chunk.free(handle)的意思是通過chunk 釋放一段連續的內存,再跟到free()方法中:

void free(long handle) {
    int memoryMapIdx = memoryMapIdx(handle);
    int bitmapIdx = bitmapIdx(handle);

    if (bitmapIdx != 0) { // free a subpage
        PoolSubpage<T> subpage = subpages[subpageIdx(memoryMapIdx)];
        assert subpage != null && subpage.doNotDestroy;

        // Obtain the head of the PoolSubPage pool that is owned by the PoolArena and synchronize on it.
        // This is need as we may add it back and so alter the linked-list structure.
        PoolSubpage<T> head = arena.findSubpagePoolHead(subpage.elemSize);
        synchronized (head) {
            if (subpage.free(head, bitmapIdx & 0x3FFFFFFF)) {
                return;
            }
        }
    }
    freeBytes += runLength(memoryMapIdx);
    setValue(memoryMapIdx, depth(memoryMapIdx));
    updateParentsFree(memoryMapIdx);
}

  if (bitmapIdx != 0)這里判斷是當前緩沖區分配的級別是Page 還是Subpage, 如果是Subpage, 則會找到相關的Subpage 將其位圖標記為0,如果不是subpage, 這里通過分配內存的反向標記, 將該內存標記為未使用。這段邏輯大家可以自行分析, 如果之前分配相關的知識掌握扎實的話, 這里的邏輯也不是很難。回到PooledByteBuf 的deallocate方法中:

protected final void deallocate() {
    if (handle >= 0) {
        final long handle = this.handle;
     //表示當前的ByteBuf 不再指向任何一塊內存
        this.handle = -1;
     //這里將memory 也設置為null
        memory = null;
     //這一步是將ByteBuf 的內存進行釋放
        chunk.arena.free(chunk, handle, maxLength, cache);
     //將對象放入的對象回收站, 循環利用
        recycle();
    }
}

  最后, 通過recycle()將釋放的ByteBuf 放入對象回收站,以上就是內存回收的大概邏輯。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM