「java.util.concurrent並發包」之 ThreadPoolExecutor


一 異步用new Thread? 大寫的"low"!!

new Thread(new Runnable() {
@Override
public void run() {
// TODO Auto-generated method stub
}
}).start();

你還在像上面這么用嗎,太low 了。弊端多多:

1.  每次new Thread新建對象性能差。

2. 線程缺乏統一管理,可能無限制新建線程,相互之間競爭,及可能占用過多系統資源導致死機或oom。

3. 缺乏更多功能,如定時執行、定期執行、線程中斷。

 

相比new Thread,Java提供的四種線程池的好處與此相對,在於:
1. 重用存在的線程,減少對象創建、消亡的開銷,性能佳。

2. 可有效控制最大並發線程數,提高系統資源的使用率,同時避免過多資源競爭,避免堵塞。

3. 提供定時執行、定期執行、單線程、並發數控制等功能。

 

 

二 底層java.util.concurrent.ThreadPoolExecutor

無論創建哪種線程池 必須要調用ThreadPoolExecutor, 👇列舉一個構造方法 




當一個任務通過execute(Runnable)方法欲添加到線程池時: 

如果此時線程池中的數量小於corePoolSize,即使線程池中的線程都處於空閑狀態,也要創建新的線程來處理被添加的任務。 
如果此時線程池中的數量等於corePoolSize,但是緩沖隊列 workQueue未滿,那么任務被放入緩沖隊列。 
如果此時線程池中的數量大於corePoolSize,緩沖隊列workQueue滿,並且線程池中的數量小於maximumPoolSize,建新的線程來處理被添加的任務。 
如果此時線程池中的數量大於corePoolSize,緩沖隊列workQueue滿,並且線程池中的數量等於maximumPoolSize,那么通過 handler所指定的策略來處理此任務。 

也就是:處理任務的優先級為: 
核心線程corePoolSize、任務隊列workQueue、最大線程maximumPoolSize,如果三者都滿了,使用handler處理被拒絕的任務。 

當線程池中的線程數量大於 corePoolSize時,如果某線程空閑時間超過keepAliveTime,線程將被終止。這樣,線程池可以動態的調整池中的線程數。 

拒絕策略:

 

三 幾種典型線程池的源碼分析

Java通過Executors提供四種線程池,分別為(juc.Executors包下):

newCachedThreadPool      創建一個可緩存線程池,如果線程池長度超過處理需要,可靈活回收空閑線程(60s不執行任務),若無可回收,則新建線程。
newFixedThreadPool         創建一個定長線程池,可控制線程最大並發數,超出的線程會在隊列中等待。 
newScheduledThreadPool  創建一個定長線程池,支持定時及周期性任務執行。
newSingleThreadExecutor  創建一個單線程化的線程池,它只會用唯一的工作線程來串行執行任務,保證所有任務按照指定順序(FIFO, LIFO, 優先級)執行。

 1. FixedThreadPool

  • FixedThreadPool的corePoolSize和maxiumPoolSize都被設置為創建FixedThreadPool時指定的參數nThreads。
  • FixedThreadPool使用了無界隊列LinkedBlockingQueue作為線程池的工作隊列,由於是無界的,當線程池的線程數達到corePoolSize后,新任務將在無界隊列中等待,因此線程池的線程數量不會超過corePoolSize,同時maxiumPoolSize也就變成了一個無效的參數,並且運行中的線程池並不會拒絕任務。
  • FixedThreadPool運行圖如下

    執行過程如下:

    1.如果當前工作中的線程數量少於corePool的數量,就創建新的線程來執行任務。

    2.當線程池的工作中的線程數量達到了corePool,則將任務加入LinkedBlockingQueue。

    3.線程執行完1中的任務后會從隊列中取任務。

    注意LinkedBlockingQueue是無界隊列,所以可以一直添加新任務到線程池。

 

2. SingleThreadExecutor  

SingleThreadExecutor是使用單個worker線程的Executor。特點是使用單個工作線程執行任務。它的構造源碼如下:

SingleThreadExecutor的corePoolSize和maxiumPoolSize都被設置1。
其他參數均與FixedThreadPool相同,其運行圖如下:

 

執行過程如下:

1.如果當前工作中的線程數量少於corePool的數量,就創建一個新的線程來執行任務。

2.當線程池的工作中的線程數量達到了corePool,則將任務加入LinkedBlockingQueue。

3.線程執行完1中的任務后會從隊列中取任務。

注意:由於在線程池中只有一個工作線程,所以任務可以按照添加順序執行。

 

 3. CachedThreadPool

 CachedThreadPool是一個”無限“容量的線程池,它會根據需要創建新線程。特點是可以根據需要來創建新的線程執行任務,沒有特定的corePool。下面是它的構造方法:

CachedThreadPool的corePoolSize被設置為0,即corePool為空;maximumPoolSize被設置為Integer.MAX_VALUE,即maximum是無界的。這里keepAliveTime設置為60秒,意味着空閑的線程最多可以等待任務60秒,否則將被回收。
 
CachedThreadPool使用沒有容量的SynchronousQueue作為主線程池的工作隊列,它是一個沒有容量的阻塞隊列。每個插入操作必須等待另一個線程的對應移除操作。這意味着,如果主線程提交任務的速度高於線程池中處理任務的速度時,CachedThreadPool會不斷創建新線程。極端情況下,CachedThreadPool會因為創建過多線程而耗盡CPU資源。其運行圖如下:

 

執行過程如下:

1.首先執行SynchronousQueue.offer(Runnable task)。如果在當前的線程池中有空閑的線程正在執行SynchronousQueue.poll(),那么主線程執行的offer操作與空閑線程執行的poll操作配對成功,主線程把任務交給空閑線程執行。execute()方法執行成功,否則執行步驟2

2.當線程池為空(初始maximumPool為空)或沒有空閑線程時,配對失敗,將沒有線程執行SynchronousQueue.poll操作。這種情況下,線程池會創建一個新的線程執行任務。

3.在創建完新的線程以后,將會執行poll操作。當步驟2的線程執行完成后,將等待60秒,如果此時主線程提交了一個新任務,那么這個空閑線程將執行新任務,否則被回收。因此長時間不提交任務的CachedThreadPool不會占用系統資源。

SynchronousQueue是一個不存儲元素阻塞隊列,每次要進行offer操作時必須等待poll操作,否則不能繼續添加元素。

 

 

 4. 手動調用new ThreadPoolExecutor

 

 

 

四 引申的幾個問題 

1. 無限大CachedThreadPool的OOM

     ExecutorService executorService = Executors.newCachedThreadPool();
        for (int i = 1; i <= 2100; i++) {
            executorService.submit(() -> {
                try {
                    Thread.sleep(5000);
                } catch (Exception e) {
                    //ignore
                }
            });
        }
Exception in thread "main" java.lang.OutOfMemoryError: unable to create new native thread
    at java.lang.Thread.start0(Native Method)
    at java.lang.Thread.start(Thread.java:717)
    at java.util.concurrent.ThreadPoolExecutor.addWorker(ThreadPoolExecutor.java:950)
    at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1368)
    at java.util.concurrent.AbstractExecutorService.submit(AbstractExecutorService.java:112)
    at com.balfish.hotel.train.concurrent.countDownLatch.CountDownLatchMain.main(CountDownLatchMain.java:31)

可以看出來是堆外內存溢出,因為我們新建的線程都在工作(代碼中用sleep模擬在工作中),newCachedThreadPool 只會重用空閑並且可用的線程,所以上述代碼只能不停地創建新線程,

在 64-bit JDK 1.7 中 -Xss 默認是 1024k,也就是 1M,那就是需要 2100*1M = 2.1G 的堆外內存空間來給線程使用,機器內存分配不夠創建新的線程,所以就 OOM 了。

newCachedThreadPool最大值初始化時默認為Integer.MAX_VALUE,一般來說機器都沒那么大內存給它不斷使用。那么我們一般去重寫一個方法限制一下這個最大值,或者看下newFixedThreadPool是否滿足

 

2. ArrayBlockingQueue 和 LinkedBlockingQueue 的區別

(1)一把鎖 vs 分離鎖

ArrayBlockingQueue中的鎖是沒有分離的,即生產者和消費者用的是一個鎖

LinkedBlockingQueue的鎖是分離的,即生產用的是putLock,消費用的是takeLock

(2)數組 vs 鏈表

ArrayBlockingQueue基於數組,生產和消費的時候,直接將枚舉對象插入或移除,不會有額外的對象實例的空間開銷

LinkedBlockingQueue基於鏈表,生產和消費的時候,需要把枚舉轉換為Node<E>進行插入或移除,有額外的Node對象開銷。這在大批量並發處理數據時,對GC有一定影響

(3)隊列長度

ArrayBlockingQueue是有界的,必須指定隊列大小

LinkedBlockingQueue是無界的,可以不指定隊列的大小,默認是Integer.MAX_VALUE。(也可以指定隊列大小,從而成為有界的)

(4)隊列效率

ArrayBlockingQueue快。LinkedBlockingQueue用默認大小且生產速度大於消費速度時候,可能會OOM

 

3. shutdown和shutdownNow

可以調用線程池的shutdown或者shutdownNow方法來關閉線程池。他們的原理是遍歷線程池的工作線程,然后逐個調用線程的interrupt方法來中斷線程,所以無法響應中斷的任務可能永遠無法停止

區別:

shutdown方法將執行平緩的關閉過程:不再接收新的任務,同時等待已提交的任務執行完成, 包括那些還未開始執行的任務。

shutdownNow方法將執行粗暴的關閉過程:它將嘗試取消所有運行中的任務,並且不再啟動隊列中尚未開始執行的任務。

只要調用了這兩個關閉方法中的任意一個,isShutdown方法就會返回true,當所有的任務都已關閉后,才表示線程池關閉成功,這時調用isTerminated方法會返回true。至於應該調用哪一種方法來關閉線程池,應該由提交到線程池的任務特性決定,通常調用shutdown方法來關閉線程池,如果任務不一定要執行完,則可以調用shutdownNow方法。

 

4. 繼承覆寫ThreadPoolExecutor

可以通過繼承線程池來自定義線程池,重寫線程池的beforeExecute, afterExecute和terminated方法。在執行任務的線程中將調用beforeExecute和afterExecute等方法,在這些方法中還可以添加日志、計時、監視或者統計信息收集的功能。無論任務是從run中正常返回,還是拋出一個異常而返回,afterExecute都會被調用。如果任務在完成后帶有一個Error,那么就不會調用afterExecute。如果beforeExecute拋出一個RuntimeException,那么任務將不被執行,並且afterExecute也不會被調用。在線程池完成關閉時調用terminated,也就是在所有任務都已經完成並且所有工作者線程也已經關閉后,terminated可以用來釋放Executor在其生命周期里分配的各種資源,此外還可以執行發送通知、記錄日志或者finalize統計等操作


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM