Netty輕量級對象池實現分析


什么是對象池技術?對象池應用在哪些地方?

對象池其實就是緩存一些對象從而避免大量創建同一個類型的對象,類似線程池的概念。對象池緩存了一些已經創建好的對象,避免需要時才創建對象,同時限制了實例的個數。池化技術最終要的就是重復的使用池內已經創建的對象。從上面的內容就可以看出對象池適用於以下幾個場景:

  1. 創建對象的開銷大
  2. 會創建大量的實例
  3. 限制一些資源的使用

如果創建一個對象的開銷特別大,那么提前創建一些可以使用的並且緩存起來(池化技術就是重復使用對象,提前創建並緩存起來重復使用就是池化)可以降低創建對象時的開銷。

會大量創建實例的場景,重復的使用對象可減少創建的對象數量,降低GC的壓力(如果這些對象的生命周期都很短暫,那么可以降低YoungGC的頻率;如果生命周期很長,那么可以避免掉這些對象被FullGC——生命周期長,且大量創建,這里就要結合系統的TPS等考慮池的大小了)。

對於限制資源的使用更多的是一種保護策略,比如數據庫鏈接池。除去這些對象本身的開銷外,他們對外部系統也會造成壓力,比如大量創建鏈接對DB也是有壓力的。那么池化除了優化資源以外,本身限制了資源數,對外部系統也起到了一層保護作用。

 

如何實現對象池?

開源實現Apache Commons Pool

自己實現:Netty輕量級對象池實現

Apache Commons Pool開源軟件庫提供了一個對象池API和一系列對象池的實現,支持各種配置,比如活躍對象數或者閑置對象個數等。DBCP數據庫連接池基於Apache Commons Pool實現。

Netty自己實現了一套輕量級的對象池。在Netty中,通常會有多個IO線程獨立工作,基於NioEventLoop的實現,每個IO線程輪詢單獨的Selector實例來檢索IO事件,並在IO來臨時開始處理。最常見的IO操作就是讀寫,具體到NIO就是從內核緩沖區拷貝數據到用戶緩沖區或者從用戶緩沖區拷貝數據到內核緩沖區。這里會涉及到大量的創建和回收BufferNetty對Buffer進行了池化從而降低系統開銷。

 

Netty對象池實現分析 

上面提到了IO操作中會涉及到大量的緩沖區操作,NIO提供了兩種Buffer最為緩沖區:DirectByteBufferHeapByteBuffer。Netty在兩種緩沖區的基礎上進行了池化進而提升性能。

 

DirectByteBuffer

DirectByteBuffer顧名思義是直接內存(Direct Memory)上的Byte緩存區,直接內存不是JVM Runtime數據區域的一部分,也不是Java虛擬機規范中定義的內存區域。簡單的說這部分就是機器內存,分配的大小等都和虛擬機限制無關。JDK1.4中開始我們可以使用native方法在直接內存上來分配內存,並在JVM堆內存上維持一個引用來進行訪問,當JVM堆內存上的引用被回收后,這塊內存被操作系統回收。

HeapByteBuffer

HeapByteBuffer是在JVM堆內存上分配的Byte緩沖區,可以簡單的理解為byte[]數組的一種封裝。基於HeapByteBuffer的寫流程通常要先在直接內存上分配一個臨時的緩沖區,將數據從Heap拷貝到直接內存,然后再將直接內存的數據發送到IO設備的緩沖區,之后回收直接內存。讀流程也類似。使用DirectByteBuffer避免了不必要的拷貝工作,所以在性能上會有提升。

DirectByteBuffer的缺點在於分配和回收的的代價相對較大,因此DirectByteBuffer適用於緩沖區可以重復使用的場景。

 

Netty的池化實現

Buffer為例,對應直接內存和堆內存,Netty的池化分別為PooledDirectByteBufferPolledHeapByteBuffer

 

通過PooledDirectByteBuffer的API定義可以看到,它的構造方法是私有的,而創建一個實例的入口是:

1  static PooledDirectByteBuf newInstance(int maxCapacity) {
2 
3         PooledDirectByteBuf buf = RECYCLER.get();
4 
5         buf.reuse(maxCapacity);
6 
7         return buf;
8 
9

可見RECYCLER是池化的核心,創建對象時都通過RECYCLER.get來獲得一個實例(Recycler就是Netty實輕量級池化技術的核心)。

 

Recycler實現分析(源碼分析)

 1 /**
 2 
 3  * Light-weight object pool based on a thread-local stack.
 4 
 5  *
 6 
 7  * @param <T> the type of the pooled object
 8 
 9  */
10 
11 public abstract class Recycler<T>

從注釋可以看出Netty基於thread-local實現了輕量級的對象池。

 

Recycler的API非常簡單:

get():獲取一個實例

recycle(T, Handle<T>):回收一個實例

newObject(Handle<T>)創建一個實例

 

get流程

 1     public final T get() {
 2 
 3         if (maxCapacity == 0) {
 4 
 5             return newObject((Handle<T>) NOOP_HANDLE);
 6 
 7         }
 8 
 9         Stack<T> stack = threadLocal.get();
10 
11         DefaultHandle<T> handle = stack.pop();
12 
13         if (handle == null) {
14 
15             handle = stack.newHandle();
16 
17             handle.value = newObject(handle);
18 
19         }
20 
21         return (T) handle.value;
22 
23     }

get的簡化流程(這里先不深究細節):

  1. 拿到當前線程對應的stack
  2. 從stack中pop出一個元素
  3. 如果不為空則返回,否則創建一個新的實例

可以大概明白Stack是對象池化背后存儲實例的數據結構:如果能從stack中拿到可用的實例就不再創建新的實例。

recycle流程

一個“池子”最核心的就是做兩件事情,第一個是上面的Get,即從池子中拿出一個可用的實例。另一個就是在用完后將數據放回到池子中(線程池、連接池都是這樣)。

 1 public final boolean recycle(T o, Handle<T> handle) {
 2 
 3         if (handle == NOOP_HANDLE) {
 4 
 5             return false;
 6 
 7         }
 8 
 9  
10 
11         DefaultHandle<T> h = (DefaultHandle<T>) handle;
12 
13         if (h.stack.parent != this) {
14 
15             return false;
16 
17         }
18 
19  
20 
21         h.recycle(o);
22 
23         return true;
24 
25     }
26 
27  
28 
29 ----------------------------------------------------------------------------------------
30 
31 public void recycle(Object object) {
32 
33         if (object != value) {
34 
35             throw new IllegalArgumentException("object does not belong to handle");
36 
37         }
38 
39         Thread thread = Thread.currentThread();
40 
41         if (thread == stack.thread) {
42 
43             stack.push(this);
44 
45             return;
46 
47         }
48 
49         // we don't want to have a ref to the queue as the value in our weak map
50 
51         // so we null it out; to ensure there are no races with restoring it later
52 
53         // we impose a memory ordering here (no-op on x86)
54 
55         Map<Stack<?>, WeakOrderQueue> delayedRecycled = DELAYED_RECYCLED.get();
56 
57         WeakOrderQueue queue = delayedRecycled.get(stack);
58 
59         if (queue == null) {
60 
61             delayedRecycled.put(stack, queue = new WeakOrderQueue(stack, thread));
62 
63         }
64 
65         queue.add(this);
66 
67     }

回收一個實例核心的步驟由以上兩個方法組成:Recycler的recycle方法和DefaultHandle的recycle方法。

Recycler的recycle方法主要做了一些參數驗證。

DefaultHandle的recycle方法流程如下:

  1. 如果當前線程是當前stack對象的線程,那么將實例放入stack中,否則:
  2. 獲取當前線程對應的Map<Stack, WeakOrderQueue>,並將實例加入到Stack對應的Queue中。

 

從獲取實例和回收實例的代碼可以看出,整個對象池的核心實現由ThreadLocal和Stack及WrakOrderQueue構成,接着來看Stack和WrakOrderQueue的具體實現,最后概括整體實現。

 

Stack實體

Stack<T>

  parent:Recycler                                    // 關聯對應的Recycler

  thread:Thread                                      // 對應的Thread

  elements:DefaultHandle<?>[]           // 存儲DefaultHandle的數組

  head:WeakOrderQueue                     // 指向WeakOrderQueue元素組成的鏈表的頭部“指針”

  cursor,prev:WrakOrderQueue          // 當前游標和前一元素的“指針”

 

pop實現

 1 DefaultHandle<T> pop() {
 2 
 3         int size = this.size;
 4 
 5         if (size == 0) {
 6 
 7             if (!scavenge()) {
 8 
 9                 return null;
10 
11             }
12 
13             size = this.size;
14 
15         }
16 
17         size --;
18 
19         DefaultHandle ret = elements[size];
20 
21         if (ret.lastRecycledId != ret.recycleId) {
22 
23             throw new IllegalStateException("recycled multiple times");
24 
25         }
26 
27         ret.recycleId = 0;
28 
29         ret.lastRecycledId = 0;
30 
31         this.size = size;
32 
33         return ret;
34 
35     }
  1. 如果size0(這里的size表示stack中可用的元素),嘗試進行scavenge。
  2. 返回elements中的最后一個元素。
 1 boolean scavenge() {
 2             // continue an existing scavenge, if any
 3             if (scavengeSome()) {
 4                 return true;
 5             }
 6 
 7             // reset our scavenge cursor
 8             prev = null;
 9             cursor = head;
10             return false;
11         }
12 
13         boolean scavengeSome() {
14             WeakOrderQueue cursor = this.cursor;
15             if (cursor == null) {
16                 cursor = head;
17                 if (cursor == null) {
18                     return false;
19                 }
20             }
21 
22             boolean success = false;
23             WeakOrderQueue prev = this.prev;
24             do {
25                 if (cursor.transfer(this)) {
26                     success = true;
27                     break;
28                 }
29 
30                 WeakOrderQueue next = cursor.next;
31                 if (cursor.owner.get() == null) {
32                     // If the thread associated with the queue is gone, unlink it, after
33                     // performing a volatile read to confirm there is no data left to collect.
34                     // We never unlink the first queue, as we don't want to synchronize on updating the head.
35                     if (cursor.hasFinalData()) {
36                         for (;;) {
37                             if (cursor.transfer(this)) {
38                                 success = true;
39                             } else {
40                                 break;
41                             }
42                         }
43                     }
44                     if (prev != null) {
45                         prev.next = next;
46                     }
47                 } else {
48                     prev = cursor;
49                 }
50 
51                 cursor = next;
52 
53             } while (cursor != null && !success);
54 
55             this.prev = prev;
56             this.cursor = cursor;
57             return success;
58         }

簡要概括上面的流程就是Stack從“背后”的Queue中獲取可用的實例,如果Queue中沒有可用實例就遍歷到下一個Queue(Queue組成了一個鏈表)。

 push實現

 1 void push(DefaultHandle<?> item) {
 2             if ((item.recycleId | item.lastRecycledId) != 0) {
 3                 throw new IllegalStateException("recycled already");
 4             }
 5             item.recycleId = item.lastRecycledId = OWN_THREAD_ID;
 6 
 7             int size = this.size;
 8             if (size >= maxCapacity) {
 9                 // Hit the maximum capacity - drop the possibly youngest object.
10                 return;
11             }
12             if (size == elements.length) {
13                 elements = Arrays.copyOf(elements, Math.min(size << 1, maxCapacity));
14             }
15 
16             elements[size] = item;
17             this.size = size + 1;
18         }

push相對pop流程要更加簡單,直接將回收的元素放到隊尾(實際是一個數組)。

 

WeakOrderQueue實體

WeakOrderQueue

  head,tail:Link                       // 內部元素的指針(WeakOrderQueue內部存儲的是一個Link的鏈表)

  next:WeakOrderQueue     // 指向下一個WeakOrderQueue的指針

  owner:Thread                      // 對應的線程

WeakOrderQueue核心包含兩個方法,add方法將元素添加到自身的“隊列”中,transfer方法將自己擁有的元素“傳輸”到Stack中。

 

Linke結構如下

1 private static final class Link extends AtomicInteger {
2             private final DefaultHandle<?>[] elements = new DefaultHandle[LINK_CAPACITY];
3 
4             private int readIndex;
5             private Link next;
6         }

Link內部包含了一個數組用於存放實例,同時標記了讀取位置的索引和下一個Link元素的指針。

結合Link的結構,Weak的結構如下:

add方法

 1 void add(DefaultHandle<?> handle) {
 2 
 3         handle.lastRecycledId = id;
 4 
 5  
 6 
 7         Link tail = this.tail;
 8 
 9         int writeIndex;
10 
11         if ((writeIndex = tail.get()) == LINK_CAPACITY) {
12 
13             this.tail = tail = tail.next = new Link();
14 
15             writeIndex = tail.get();
16 
17         }
18 
19         tail.elements[writeIndex] = handle;
20 
21         handle.stack = null;
22 
23         // we lazy set to ensure that setting stack to null appears before we unnull it in the owning thread;
24 
25         // this also means we guarantee visibility of an element in the queue if we see the index updated
26 
27         tail.lazySet(writeIndex + 1);
28 
29     }

 

add操作將元素添加到tail指向的Link對象中,如果Link已滿則創建一個新的Link實例。

 

transfer方法

 1 boolean transfer(Stack<?> dst) {
 2 
 3             Link head = this.head;
 4             if (head == null) {
 5                 return false;
 6             }
 7 
 8             if (head.readIndex == LINK_CAPACITY) {
 9                 if (head.next == null) {
10                     return false;
11                 }
12                 this.head = head = head.next;
13             }
14 
15             final int srcStart = head.readIndex;
16             int srcEnd = head.get();
17             final int srcSize = srcEnd - srcStart;
18             if (srcSize == 0) {
19                 return false;
20             }
21 
22             final int dstSize = dst.size;
23             final int expectedCapacity = dstSize + srcSize;
24 
25             if (expectedCapacity > dst.elements.length) {
26                 final int actualCapacity = dst.increaseCapacity(expectedCapacity);
27                 srcEnd = Math.min(srcStart + actualCapacity - dstSize, srcEnd);
28             }
29 
30             if (srcStart != srcEnd) {
31                 final DefaultHandle[] srcElems = head.elements;
32                 final DefaultHandle[] dstElems = dst.elements;
33                 int newDstSize = dstSize;
34                 for (int i = srcStart; i < srcEnd; i++) {
35                     DefaultHandle element = srcElems[i];
36                     if (element.recycleId == 0) {
37                         element.recycleId = element.lastRecycledId;
38                     } else if (element.recycleId != element.lastRecycledId) {
39                         throw new IllegalStateException("recycled already");
40                     }
41                     element.stack = dst;
42                     dstElems[newDstSize ++] = element;
43                     srcElems[i] = null;
44                 }
45                 dst.size = newDstSize;
46 
47                 if (srcEnd == LINK_CAPACITY && head.next != null) {
48                     this.head = head.next;
49                 }
50 
51                 head.readIndex = srcEnd;
52                 return true;
53             } else {
54                 // The destination stack is full already.
55                 return false;
56             }
57         }

 

transfer方法收件根據stack的容量和自身擁有的實例數,計算出最終需要轉移的實例數。之后就是數組的拷貝和指標的調整。

基本上所有的流程有個大致的了解,下面從整體的角度回顧一下Netty對象池的實現。

 

整體實現

結構

整個設計上核心的幾點:

  1. Stack相當於是一級緩存,同一個線程內的使用和回收都將使用一個Stack
  2. 每個線程都會有一個自己對應的Stack,如果回收的線程不是Stack的線程,將元素放入到Queue中
  3. 所有的Queue組合成一個鏈表,Stack可以從這些鏈表中回收元素(實現了多線程之間共享回收的實例)

 

 引用自網上的一幅圖。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM