JAVA線程池ThreadPoolExecutor的分析和使用(新手踩坑和推薦方案)


一、Java中的ThreadPoolExecutor類

java.uitl.concurrent.ThreadPoolExecutor類是線程池中最核心的一個類,因此如果要透徹地了解Java中的線程池,必須先了解這個類。下面我們來看一下ThreadPoolExecutor類的具體實現源碼。
在ThreadPoolExecutor類中提供了四個構造方法:

 1 public class ThreadPoolExecutor extends AbstractExecutorService {
 2     .....
 3     public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,
 4             BlockingQueue<Runnable> workQueue);
 5  
 6     public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,
 7             BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory);
 8  
 9     public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,
10             BlockingQueue<Runnable> workQueue,RejectedExecutionHandler handler);
11  
12     public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,
13         BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory,RejectedExecutionHandler handler);
14     ...
15 }
View Code

從上面的代碼可以得知,ThreadPoolExecutor繼承了AbstractExecutorService類,並提供了四個構造器,事實上,通過觀察每個構造器的源碼具體實現,發現前面三個構造器都是調用的第四個構造器進行的初始化工作。
下面解釋下一下構造器中各個參數的含義:
corePoolSize:核心池的大小,這個參數跟后面講述的線程池的實現原理有非常大的關系。在創建了線程池后,默認情況下,線程池中並沒有任何線程,而是等待有任務到來才創建線程去執行任務,除非調用了prestartAllCoreThreads()或者prestartCoreThread()方法,從這2個方法的名字就可以看出,是預創建線程的意思,即在沒有任務到來之前就創建corePoolSize個線程或者一個線程。默認情況下,在創建了線程池后,線程池中的線程數為0,當有任務來之后,就會創建一個線程去執行任務,當線程池中的線程數目達到corePoolSize后,就會把到達的任務放到緩存隊列當中;
maximumPoolSize:線程池最大線程數,這個參數也是一個非常重要的參數,它表示在線程池中最多能創建多少個線程;
keepAliveTime:表示線程沒有任務執行時最多保持多久時間會終止。默認情況下,只有當線程池中的線程數大於corePoolSize時,keepAliveTime才會起作用,直到線程池中的線程數不大於corePoolSize,即當線程池中的線程數大於corePoolSize時,如果一個線程空閑的時間達到keepAliveTime,則會終止,直到線程池中的線程數不超過corePoolSize。但是如果調用了allowCoreThreadTimeOut(boolean)方法,在線程池中的線程數不大於corePoolSize時,keepAliveTime參數也會起作用,直到線程池中的線程數為0;
unit:參數keepAliveTime的時間單位,有7種取值,在TimeUnit類中有7種靜態屬性。
workQueue:一個阻塞隊列,用來存儲等待執行的任務,這個參數的選擇也很重要,會對線程池的運行過程產生重大影響,一般來說,這里的阻塞隊列有以下幾種選擇:
    ArrayBlockingQueue  有界隊列。當使用有限的maximumPoolSizes時,有界隊列有助於防止資源耗盡,但是可能較難調整和控制。隊列大小和最大池大小可能需要相互折衷:使用大型隊列和小型池可以最大限度的降低CPU使用率、操作系統資源和上下文切換開銷,但是可能導致人工降低吞吐量。如果任務頻繁阻塞,則系統可能為超過您許可的更多線程安排時間,使用小型隊列通常要求較大的池大小,CPU使用率較高,但是可能遇到不可接受的調度開銷,這樣可會降低吞吐量。
    LinkedBlockingQueue  無界隊列。使用無界隊列將導致在所有corePoolSize線程都忙時新任務在隊列中等待。這樣,創建的線程就不會超過corePoolSize(因此,maximumPoolSize的值也就無效了)。
    SynchronousQueue  直接提交。它將任務直接提交給線程而不保持它們。在此,如果不存在可用於立即運行任務的線程,則試圖把任務加入隊列將失敗,因此會構造一個新的線程。此策略可以避免在處理可能具有內部依賴性的請求集時出現鎖。直接提交通常要求無界maximumPoolSizes以避免拒絕新提交的任務。當命令以超過隊列所能處理的平均數連續到達時,此策略允許無界線程具有增加的可能性。

threadFactory:線程工廠,主要用來創建線程;

handler:表示當拒絕處理任務時的策略,有以下四種取值:

    ThreadPoolExecutor.AbortPolicy  默認策略,新任務提交時直接拋出未檢查的異常RejectedExecutionException,該異常可由調用者捕獲。

    ThreadPoolExecutor.DiscardPolicy  新提交的任務被拋棄但不拋出異常

    ThreadPoolExecutor.DiscardOldestPolicy  丟棄隊列最前面的任務,然后重新嘗試執行任務(重復此過程)

    ThreadPoolExecutor.CallerRunsPolicy  為調節機制,既不拋棄任務也不拋出異常,而是將某些任務回退到調用者。不會在線程池的線程中執行新的任務,而是在調用exector的線程中運行新的任務。

二、ThreadPoolExecutor的坑
    1)  ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(3, 6, 60L, TimeUnit.SECONDS, new LinkedBlockingQueue<>());
    一般人會以為如果請求的任務超過3個時,就會自動創建6個線程來執行,其實此線程池最多只會產生3個線程,為什么?
    當前線程數優先與corePoolSize 比較,大於corePoolSize ,則與workQueue容量(capacity)比較,但此處LinkedBlockingQueue默認構造器的capacity值為Integer.MAX_VALUE,你產生的任務數永遠不會超過capacity值,那么新任務會一直加入workQueue,而不會創建新的線程,所以你會看到線程池里的線程數永遠為3,而不是你期望的6。

三、使用建議

    1) 服務器有超大內存,想節省CPU(線程池核心作務數為3,新任務無限加入workQueue,但是內存會一直暴漲,線程池中線程數永遠控制在3個以節省CPU)
        this.threadPoolExecutor = new ThreadPoolExecutor(3, 6, 60L, TimeUnit.SECONDS, new LinkedBlockingQueue<>());

    2) 服務器CPU資源豐富(線程池核心作務數為10,workQueue緩存的任務控制在20,超過20線程池就會創建新線程最多到100個,但是CPU使用率會暴漲。注:如果線程數量達到100,則新安排的任務根據策略CallerRunsPolicy則會在調用者線程中同步執行)
        this.threadPoolExecutor = new ThreadPoolExecutor(10, 100, 60L, TimeUnit.SECONDS, new ArrayBlockingQueue<>(20));
        this.threadPoolExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());

    3) 均衡方案--推薦(線程池核心作務數為3,workQueue緩存的任務控制在20,超過20線程池就會創建新線程最多到6個,如果線程數量達到6,則新安排的任務根據策略CallerRunsPolicy則會在調用者線程中同步執行)
        this.threadPoolExecutor = new ThreadPoolExecutor(3, 6, 60L, TimeUnit.SECONDS, new ArrayBlockingQueue<>(20));
        this.threadPoolExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM