使用 LinkedBlockingQueue 實現簡易版線程池


前一陣子在做聯系人的導入功能,使用POI組件解析Excel文件后獲取到聯系人列表,校驗之后批量導入。單從技術層面來說,導入操作通常情況下是一個比較耗時的操作,而且如果聯系人達到幾萬、幾十萬級別,必須拆分成為子任務來執行。綜上,可以使用線程池來解決問題。技術選型上,沒有采用已有的 ThreadPoolExecutor 框架,而使用了自制的簡易版線程池。該簡易版的線程池,其實也是一個簡易版的【生產者-消費者】模型,任務的加入就像是生產的過程,任務的處理就像是消費的過程。我們在這里不去討論方案的合理性,只是從技術層面總結一下在實現簡易版線程池的過程中,我所學到的知識。
 
代碼放在Github上,分享一下: https://github.com/Julius-Liu/threadpool

 

一、線程池設計

我們首先使用數組 ArrayList 來作為線程池的存儲結構,例如數組大小為10就意味着這是一個大小為10的線程池。然后我們使用 LinkedBlockingQueue(鏈式阻塞隊列)來存放線程的參數。示意圖如下:

 

當線程池里的線程初始化完成后,我們希望線程都處於【飢餓】狀態,隨時等待參數傳入,然后執行。所以,此時線程應該處於阻塞狀態,如下圖所示:
 
當我們將一個執行任務(一個參數)交給線程池以后,線程池會安排一個線程接收參數,這個線程會進入運行狀態。線程執行完以后,線程又會因為參數隊列為空而進入阻塞狀態。某線程的執行狀態如下圖所示,執行完的阻塞態,如上圖所示。

 

假設線程池中有3個線程,我們連續扔了3個參數給線程池,線程池會輪詢獲取線程,將參數塞給他們,然后這些線程會進入運行狀態。運行完成后回歸阻塞狀態。如下圖所示:

 

如下圖所示,假設線程池中只有3個線程,我們連續發8個參數給線程池,那么池會輪流分配參數。線程在收到參數后就會執行。“消耗”掉一個參數后,會繼續消耗下一個參數,直到參數列表為空為止。

 

二、為什么使用 LinkedBlockingQueue

1. BlockingQueue

我們必須先來說說為什么使用阻塞隊列 BlockingQueue。BlockingQueue 隊列為空時,嘗試獲取隊頭元素的操作會阻塞,一直等到隊列中有元素時再返回。這個阻塞的特性,正是我們需要的,我們可以讓線程一直等待元素插入,一旦插入立即執行。BlockingQueue 也支持在添加元素時,如果隊列已滿,那么等到隊列可以放入新元素時再放入。如此一來,我們交給線程池的任務就不會丟失,哪怕超過了隊列的容量。
 
所以我們定下方案,采用阻塞隊列來作為數據結構,然后我們來調研阻塞隊列常用的5種實現,看看選擇哪種實現來完成線程池。
 

2. ArrayBlockingQueue

ArrayBlockingQueue 是一個用數組實現的有界阻塞隊列,其內部按先進先出的原則對元素進行排序,其中put方法和take方法為添加和刪除的阻塞方法。可以說 ArrayBlockingQueue 是 阻塞隊列的最直觀的實現。
 

3. DelayQueue

DelayQueue是一個無界阻塞隊列,延遲隊列提供了在指定時間才能獲取隊列元素的功能,隊列頭元素是最接近過期的元素。沒有過期元素的話,使用poll()方法會返回null值,超時判定是通過getDelay(TimeUnit.NANOSECONDS)方法的返回值小於等於0來判斷。
 
DelayQueue阻塞隊列在我們系統開發中也常常會用到,例如緩存系統的設計。緩存中的對象,超過了空閑時間,需要從緩存中移出;例如任務調度系統,需要准確的把握任務的執行時間。我們可能需要通過線程處理很多時間上要求很嚴格的數據,如果使用普通的線程,我們就需要遍歷所有的對象,一個個檢查看數據是否過期。首先這樣在執行上的效率不會太高,其次就是這種設計的風格也大大的影響了數據的精度。一個需要12:00點執行的任務可能12:01 才執行,這樣對數據要求很高的系統有更大的弊端。使用 DelayQueue 可以做到精准觸發。
 
由上可知,延遲隊列不是我們需要的阻塞隊列實現。
 

4. LinkedBlockingQueue

LinkedBlockingQueue是一個由鏈表實現的有界隊列阻塞隊列,但大小默認值為Integer.MAX_VALUE,也可以在初始化的時候指定 capacity。和 ArrayBlockingQueue 一樣,其中put方法和take方法為添加和刪除的阻塞方法。
 

5. PriorityBlockingQueue

優先級阻塞隊列通過使用堆這種數據結構實現將隊列中的元素按照某種排序規則進行排序,從而改變先進先出的隊列順序,提供開發者改變隊列中元素的順序的能力。隊列中的元素必須是可比較的,即實現Comparable接口,或者在構建函數時提供可對隊列元素進行比較的Comparator對象。不可以放null,會報空指針異常,也不可放置無法比較的元素;add方法添加元素時,是自下而上的調整堆,取出元素時,是自上而下的調整堆順序。
 
我們放入參數隊列中的參數都是平級的,不涉及優先級,因此我們不考慮優先級阻塞隊列。
 

6. SynchronousQueue

同步隊列實際上不是一個真正的隊列,因為它不會為隊列中元素維護存儲空間。與其他隊列不同的是,它維護一組線程,這些線程在等待着把元素加入或移出隊列。同步隊列是輕量級的,不具有任何內部容量,我們可以用來在線程間安全的交換單一元素。
因為同步隊列沒有存儲功能,因此put和take會一直阻塞,直到有另一個線程已經准備好參與到交付過程中。僅當有足夠多的消費者,並且總是有一個消費者准備好獲取交付的工作時,才適合使用同步隊列。
 
應用場景,我們來看一下Java並發包里的 newCachedThreadPool 方法:
 1 package java.util.concurrent;
 2 
 3 /**
 4  * 帶有緩存的線程池
 5  */
 6 public static ExecutorService newCachedThreadPool() {
 7     return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
 8                                   60L, TimeUnit.SECONDS,
 9                                   new SynchronousQueue<Runnable>());
10 }

 

Executors.newCachedThreadPool() 方法返回的 ThreadPoolExecutor 實例,其內部的阻塞隊列使用的就是同步隊列。由於ThreadPoolExecutor內部實現任務提交的時候調用的是工作隊列的非阻塞式入隊列方法(offer方法),因此,在使用同步隊列作為工作隊列的前提下,客戶端代碼向線程池提交任務時,而線程池中又沒有空閑的線程能夠從同步隊列隊列實例中取一個任務,那么相應的offer方法調用就會失敗(即任務沒有被存入工作隊列)。此時,ThreadPoolExecutor會新建一個新的工作者線程用於對這個入隊列失敗的任務進行處理(假設此時線程池的大小還未達到其最大線程池大小)。
 
如上所述,同步隊列沒有內部容量來存放參數,因此我們不選擇同步隊列。
 

7. 阻塞隊列選擇

研究了阻塞隊列的5中實現以后,候選者就在 ArrayBlockingQueue 和 LinkedBlockingQueue 兩者中。其實要實現本文的簡易版線程池,使用數組阻塞隊列和鏈接阻塞隊列都可以,如果你要考慮一些極端情況下的性能問題,那么透徹的研究兩者的使用場景就非常有必要。數組阻塞隊列和鏈接阻塞隊列的成員變量和方法都很相似,相同點我們就先不說了。下面我們來看看兩者的不同點:
  1. 隊列大小有所不同,ArrayBlockingQueue是有界的初始化必須指定大小,而LinkedBlockingQueue可以是有界的也可以是無界的(Integer.MAX_VALUE)。對於后者而言,當添加速度大於移除速度時,在無界的情況下,可能會造成內存溢出等問題。
  2. 數據存儲容器不同,ArrayBlockingQueue采用的是數組作為數據存儲容器,而LinkedBlockingQueue采用的則是以Node節點作為連接對象的鏈表。
  3. 由於ArrayBlockingQueue采用的是數組的存儲容器,因此在插入或刪除元素時不會產生或銷毀任何額外的對象實例,而LinkedBlockingQueue則會生成一個額外的Node對象。這可能在長時間內需要高效並發地處理大批量數據的時,對於GC可能存在較大影響。
  4. 實現隊列添加或移除的鎖不一樣,ArrayBlockingQueue實現的隊列中的鎖是沒有分離的,即添加操作和移除操作采用的同一個ReentrantLock鎖,而LinkedBlockingQueue實現的隊列中的鎖是分離的,其添加采用的是putLock,移除采用的則是takeLock,這樣能大大提高隊列的吞吐量,也意味着在高並發的情況下生產者和消費者可以並行地操作隊列中的數據,以此來提高整個隊列的並發性能。
 

三、LinkedBlockingQueue 底層方法

我們來調研一下 LinkedBlockingQueue,看看哪些變量和方法可以使用。
先來看一下 LinkedBlockingQueue 的數據結構,有一個直觀的了解:

 

說明:
  1. LinkedBlockingQueue繼承於AbstractQueue,它本質上是一個FIFO(先進先出)的隊列。
  2. LinkedBlockingQueue實現了BlockingQueue接口,它支持多線程並發。當多線程競爭同一個資源時,某線程獲取到該資源之后,其它線程需要阻塞等待。
  3. LinkedBlockingQueue是通過單鏈表實現的。
    • head是鏈表的表頭。取出數據時,都是從表頭head處獲取。
    • last是鏈表的表尾。新增數據時,都是從表尾last處插入。
    • count是鏈表的實際大小,即當前鏈表中包含的節點個數。
    • capacity是列表的容量,它是在創建鏈表時指定的。
    • putLock是插入鎖,takeLock是取出鎖;notEmpty是“非空條件”,notFull是“未滿條件”。通過它們對鏈表進行並發控制。
 
我們來看一下 LinkedBlockingQueue 常用的變量:

 1 // 容量
 2 private final int capacity;
 3 
 4 // 當前數量
 5 private final AtomicInteger count = new AtomicInteger(0);
 6 
 7 // 鏈表的表頭
 8 transient Node<E> head; 
 9 
10 // 鏈表的表尾
11 private transient Node<E> last; 
12 
13 // 用於控制刪除元素的【取出鎖】和鎖對應的【非空條件】
14 private final ReentrantLock takeLock = new ReentrantLock();
15 private final Condition notEmpty = takeLock.newCondition();
16 
17 // 用於控制添加元素的【插入鎖】和鎖對應的【非滿條件】
18 private final ReentrantLock putLock = new ReentrantLock();
19 private final Condition notFull = putLock.newCondition();

 

這里的兩把鎖,takeLock 和 putLock,和兩個條件,notEmpty 和 notFull 是我們考察的重點。
LinkedBlockingQueue在實現“多線程對競爭資源的互斥訪問”時,對於“插入”和“取出(刪除)”操作分別使用了不同的鎖
  • 對於插入操作,通過 putLock(插入鎖)進行同步
  • 對於取出操作,通過 takeLock(取出鎖)進行同步
 
此外,插入鎖putLock和notFull(非滿條件)相關聯,取出鎖takeLock和notEmpty(非空條件)相關聯。通過notFull條件和notEmpty條件更細膩的控制putLock 和 takeLock。
 
舉例說明,若某線程(線程A)要取出數據時,隊列正好為空,則該線程會執行notEmpty.await()進行等待;當其它某個線程(線程B)向隊列中插入了數據之后,會調用notEmpty.signal()喚醒“notEmpty上的等待線程”。此時,線程A會被喚醒從而得以繼續運行。 此外,線程A在執行取數據前,會獲取takeLock,在取數據執行完畢再釋放takeLock。
 
若某線程(線程H)要插入數據時(put操作),隊列已滿,則該線程會它執行notFull.await()進行等待;當其它某個線程(線程I)取出數據之后,會調用notFull.signal()喚醒“notFull上的等待線程”。此時,線程H就會被喚醒從而得以繼續運行。 此外,線程H在執行插入操作前,會獲取putLock,在插入操作執行完畢才釋放putLock。
 

LinkedBlockingQueue 常用函數

 1 // 創建一個容量為 Integer.MAX_VALUE 的 LinkedBlockingQueue
 2 LinkedBlockingQueue()
 3 
 4 // 創建一個容量是 Integer.MAX_VALUE 的 LinkedBlockingQueue,最初包含給定 collection 的元素,元素按該 collection 迭代器的遍歷順序添加
 5 LinkedBlockingQueue(Collection<? extends E> c)
 6 
 7 // 創建一個具有給定(固定)容量的 LinkedBlockingQueue
 8 LinkedBlockingQueue(int capacity)
 9 
10 // 從隊列徹底移除所有元素
11 void clear()
12 
13 // 將指定元素插入到此隊列的尾部(如果立即可行且不會超出此隊列的容量),在成功時返回 true,如果此隊列已滿,則返回 false
14 boolean offer(E e)
15 
16 // 將指定元素插入到此隊列的尾部,如有必要,則等待指定的時間以使空間變得可用
17 boolean offer(E e, long timeout, TimeUnit unit)
18 
19 // 獲取但不移除此隊列的頭;如果此隊列為空,則返回 null
20 E peek()
21 
22 // 獲取並移除此隊列的頭,如果此隊列為空,則返回 null
23 E poll()
24 
25 // 獲取並移除此隊列的頭部,在指定的等待時間前等待可用的元素(如果有必要)
26 E poll(long timeout, TimeUnit unit)
27 
28 // 將指定元素插入到此隊列的尾部,如有隊列滿,則等待空間變得可用
29 void put(E e)
30 
31 // 返回理想情況下(沒有內存和資源約束)此隊列可接受並且不會被阻塞的附加元素數量
32 int remainingCapacity()
33 
34 // 從此隊列移除指定元素的單個實例(如果存在)
35 boolean remove(Object o)
36 
37 // 返回隊列中的元素個數
38 int size()
39 
40 // 獲取並移除此隊列的頭部,在元素變得可用之前一直等待(如果有必要)
41 E take()

 

我們看到 offer(E e) 和 put(E e) 都是往隊尾插入元素,poll() 和 take() 都是取出隊頭的元素,但是它們之間還是有細微的差別,我們接下來重點看看這4個方法的源碼。
 
下面來看一下 offer(E e) 的源碼:

 1 /**
 2  * 將指定元素插入到此隊列的尾部(如果立即可行且不會超出此隊列的容量)
 3  * 在成功時返回 true,如果此隊列已滿,則返回 false
 4  * 如果使用了有容量限制的隊列,推薦使用add方法,add方法在失敗的時候只是拋出異常
 5  */
 6 public boolean offer(E e) {
 7     if (e == null) throw new NullPointerException();
 8     final AtomicInteger count = this.count;
 9     if (count.get() == capacity)
10         // 如果隊列已滿,則返回false,表示插入失敗
11         return false;
12     int c = -1;
13     Node<E> node = new Node<E>(e);
14     final ReentrantLock putLock = this.putLock;
15     // 獲取 putLock
16     putLock.lock();
17     try {
18         // 再次對【隊列是不是滿】的進行判斷,如果不是滿的,則插入節點
19         if (count.get() < capacity) {
20             enqueue(node);                 // 在隊尾插入節點
21             c = count.getAndIncrement();   // 當前節點數量+1,並返回插入之前節點數量
22             if (c + 1 < capacity)
23                 // 如果在插入元素之后,隊列仍然未滿,則喚醒notFull上的等待線程
24                 notFull.signal();
25         }
26     } finally {
27         // 釋放 putLock
28         putLock.unlock();
29     }
30     if (c == 0)
31         // 如果在插入節點前,隊列為空,那么插入節點后,喚醒notEmpty上的等待線程
32         signalNotEmpty();
33     return c >= 0;
34 }

 

下面來看看 put(E e) 的源碼:

 1 /**
 2  * 將指定元素插入到此隊列的尾部,如有隊列滿,則等待空間變得可用
 3  *
 4  * @throws InterruptedException {@inheritDoc}
 5  * @throws NullPointerException {@inheritDoc}
 6  */
 7 public void put(E e) throws InterruptedException {
 8     if (e == null) throw new NullPointerException();
 9     
10     int c = -1;
11     Node<E> node = new Node<E>(e);
12     final ReentrantLock putLock = this.putLock;
13     final AtomicInteger count = this.count;
14     putLock.lockInterruptibly();    // 可中斷地獲取 putLock
15     try {
16         // count 變量是被 putLock 和 takeLock 保護起來的,所以可以真實反映隊列當前的容量情況
17         while (count.get() == capacity) {
18             notFull.await();
19         }
20         enqueue(node);                // 在隊尾插入節點
21         c = count.getAndIncrement();  // 當前節點數量+1,並返回插入之前節點數量
22         if (c + 1 < capacity)
23             // 如果在插入元素之后,隊列仍然未滿,則喚醒notFull上的等待線程
24             notFull.signal();
25     } finally {
26         putLock.unlock();             // 釋放 putLock
27     }
28     if (c == 0)
29         // 如果在插入節點前,隊列為空,那么插入節點后,喚醒notEmpty上的等待線程
30         signalNotEmpty();
31 }

 

兩者都用到了 signalNotEmpty(),下面來看一下源碼:

 1 /**
 2  * 通知一個等待的take。該方法應該僅僅從put/offer調用,否則一般很難鎖住takeLock
 3  */
 4 private void signalNotEmpty() {
 5     final ReentrantLock takeLock = this.takeLock;
 6     takeLock.lock();           // 獲取 takeLock
 7     try {
 8         notEmpty.signal();     // 喚醒notEmpty上的等待線程,意味着現在可以獲取元素了
 9     } finally {
10         takeLock.unlock();    // 釋放 takeLock
11     }
12 }

 

下面來看看 poll() 的源碼:

 1 /**
 2  * 獲取並移除此隊列的頭,如果此隊列為空,則返回 null
 3  */
 4 public E poll() {
 5     final AtomicInteger count = this.count;
 6     if (count.get() == 0)
 7         return null;
 8     E x = null;
 9     int c = -1;
10     final ReentrantLock takeLock = this.takeLock;
11     takeLock.lock();            // 獲取 takeLock
12     try {
13         if (count.get() > 0) {
14             x = dequeue();                 // 獲取隊頭元素,並移除
15             c = count.getAndDecrement();   // 當前節點數量-1,並返回移除之前節點數量
16             if (c > 1)
17                 // 如果在移除元素之后,隊列中仍然有元素,則喚醒notEmpty上的等待線程
18                 notEmpty.signal();
19         }
20     } finally {
21         takeLock.unlock();      // 釋放 takeLock
22     }
23     if (c == capacity)
24         // 如果在移除節點前,隊列是滿的,那么移除節點后,喚醒notFull上的等待線程
25         signalNotFull();
26     return x;
27 }

 

下面來看看 take() 的源碼:

 1 /**
 2  * 取出並返回隊列的頭。若隊列為空,則一直等待
 3  */
 4 public E take() throws InterruptedException { 
 5     E x; 
 6     int c = -1; 
 7     final AtomicInteger count = this.count; 
 8     final ReentrantLock takeLock = this.takeLock; 
 9     // 獲取 takeLock,若當前線程是中斷狀態,則拋出InterruptedException異常 
10     takeLock.lockInterruptibly(); 
11     try { 
12         // 若隊列為空,則一直等待
13        while (count.get() == 0) { 
14            notEmpty.await(); 
15        } 
16        x = dequeue();                  // 從隊頭取出元素 
17        c = count.getAndDecrement();    // 取出元素之后,節點數量-1;並返回移除之前的節點數量
18        if (c > 1) 
19            // 如果在移除元素之后,隊列中仍然有元素,則喚醒notEmpty上的等待線程
20            notEmpty.signal();
21     } finally { 
22         takeLock.unlock();             // 釋放 takeLock
23     } 
24     
25     if (c == capacity) 
26         // 如果在取出元素之前,隊列是滿的,就在取出元素之后,喚醒notFull上的等待線程
27         signalNotFull(); 
28     return x;
29 }

 

兩者都用到了signalNotFull(),signalNotFull()的源碼如下:

 1 /**
 2  * 喚醒notFull上的等待線程,只能從 poll 或 take 調用
 3  */
 4 private void signalNotFull() { 
 5     final ReentrantLock putLock = this.putLock; 
 6     putLock.lock();           // putLock 上鎖
 7     try { 
 8         notFull.signal();     // 喚醒notFull上的等待線程,意味着可以插入元素了
 9     } finally { 
10         putLock.unlock();    // putLock 解鎖
11     }
12 }

 

 
從上面的4個常用函數來看,我們想要在隊列為空的時候,將獲取這個動作阻塞,因此我們選擇【take方法】而不是【poll方法】。值得注意的是帶有參數的poll方法可以更精細地控制當隊列為空時,獲取動作阻塞多久。在本文中我們不考慮這種做法,直接讓獲取操作在 notEmpty 上等待。對於插入操作,我們采用【offer方法】而不是【put方法】,前者在隊列滿的時候返回false,后者在隊列滿的時候會在 notFull 上等待。在本文中,我們把線程池做的簡單一些,如果隊列滿就提示重試。
 

四、簡易版線程池代碼實現

具備了 LinkedBlockingQueue 的底層代碼解讀以后,我們來實現簡易版線程池。
其實在簡易版線程池初期,由於對 LinkedBlockingQueue 的底層方法不熟悉,因此對線程手動 wait 和上鎖。具體來說,根據隊列size的情況來決定線程是否要進入wait方法,然后在插入參數的時候,使用 synchronized 關鍵字鎖住整個隊列,再offer。這種做法,完全沒有考慮已有的 takeLock,putLock,notEmpty條件和notFull條件。所以后來仔細研究了鏈接阻塞隊列的特性,修改了線程池的實現,算是做了正確的事。
 

1. 注冊成為 Spring Bean

我們希望在Springboot 程序啟動的時候,將線程池初始化。可以使用 Spring 提供的 InitializingBean 接口的 afterPropertiesSet 方法,在所有基礎屬性初始化完成后,進行線程池的初始化。
 1 package cn.com.gkmeteor.threadpool.utils;
 2 
 3 @Component
 4 public class ThreadPoolUtil implements InitializingBean {
 5 
 6     public static int POOL_SIZE = 10;
 7 
 8     @Autowired
 9     private ThreadExecutorService threadExecutorService;   // 具體的線程處理類
10 
11     private List<ThreadWithQueue> threadpool = new ArrayList<>();
12 
13     /**
14      * 在所有基礎屬性初始化完成后,初始化當前類
15      *
16      * @throws Exception
17      */
18     @Override
19     public void afterPropertiesSet() throws Exception {
20         for (int i = 0; i < POOL_SIZE; i++) {
21             ThreadWithQueue threadWithQueue = new ThreadWithQueue(i, threadExecutorService);
22             this.threadpool.add(threadWithQueue);
23         }
24     }
25 }

 

2. 輪詢獲取一個線程

我們希望將任務輪流分給線程池中的線程。要實現這個比較簡單,直接兩行代碼搞定。
1 public static int POOL_SIZE = 10;  // 線程池容量
2 index = (++index) % POOL_SIZE;     // index 是當前選中的線程下標

 

3. 參數入隊和出隊,線程運行和阻塞

主要使用 queue.offer(param) 和 String param = queue.take() 這兩個方法,具體來看下面的代碼:

 1 package cn.com.gkmeteor.threadpool.utils;
 2 
 3 import cn.com.gkmeteor.threadpool.service.ThreadExecutorService;
 4 import org.slf4j.Logger;
 5 import org.slf4j.LoggerFactory;
 6 
 7 import java.util.concurrent.BlockingQueue;
 8 
 9 /**
10  * 帶有【參數阻塞隊列】的線程
11  */
12 public class ThreadWithQueue extends Thread {
13 
14     public static int CAPACITY = 10;
15 
16     private Logger logger = LoggerFactory.getLogger(ThreadWithQueue.class);
17 
18     private BlockingQueue<String> queue;
19 
20     private ThreadExecutorService threadExecutorService;    // 線程運行后的業務邏輯處理
21 
22     private String threadName;
23 
24     public String getThreadName() {
25         return threadName;
26     }
27 
28     public void setThreadName(String threadName) {
29         this.threadName = threadName;
30     }
31 
32     /**
33      * 構造方法
34      *
35      * @param i                     第幾個線程
36      * @param threadExecutorService 線程運行后的業務邏輯處理
37      */
38     public ThreadWithQueue(int i, ThreadExecutorService threadExecutorService) {
39         queue = new java.util.concurrent.LinkedBlockingQueue<>(CAPACITY);
40         threadName = "Thread(" + i + ")";
41 
42         this.threadExecutorService = threadExecutorService;
43 
44         this.start();
45     }
46 
47     /**
48      * 將參數放到線程的參數隊列中
49      *
50      * @param param 參數
51      * @return
52      */
53     public String paramAdded(String param) {
54         String result = "";
55         if(queue.offer(param)) {
56             logger.info("參數已入隊,{} 目前參數個數 {}", this.getThreadName(), queue.size());
57             result = "參數已加入線程池,等待處理";
58         } else {
59             logger.info("隊列已達最大容量,請稍后重試");
60             result = "線程池已滿,請稍后重試";
61         }
62         return result;
63     }
64 
65     public synchronized int getQueueSize() {
66         return queue.size();
67     }
68 
69     @Override
70     public void run() {
71         while (true) {
72             try {
73                 String param = queue.take();
74                 logger.info("{} 開始運行,參數隊列中還有 {} 個在等待", this.getThreadName(), this.getQueueSize());
75                 if (param.startsWith("contact")) {
76                     threadExecutorService.doContact(param);
77                 } else if (param.startsWith("user")) {
78                     threadExecutorService.doUser(param);
79                 } else {
80                     logger.info("參數無效,不做處理");
81                 }
82                 logger.info("{} 本次處理完成", this.getThreadName());
83             } catch (Exception e) {
84                 e.printStackTrace();
85             }
86         }
87     }
88 }

 

了解了鏈接阻塞隊列的底層方法后,使用起來就底氣十足。具體來說:

offer方法會往隊尾添加元素,如果隊列已滿,那么就會返回false,我在這時告訴調用者,線程池已滿,請稍后重試。
take方法會取出隊首元素,如果隊列為空則一直等待。所以當所有線程初始化完成后,第一次運行的時候都會阻塞在 String param = queue.take(),一旦有參數入隊,才會繼續執行。又因為 while(true) 循環,會不斷地take,根據隊列中參數的情況來運行或阻塞。
 

五、總結

本文使用 LinkedBlockingQueue 實現了一個簡易版的線程池,該線程池使用在聯系人導入的任務中。同時閱讀了鏈接阻塞隊列和數組阻塞隊列的源碼,對阻塞隊列有所了解,僅僅做到了會使用阻塞隊列。
 

六、參考資料

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM