ThreadPoolExecutor詳解


一、源碼分析(基於JDK1.6)

  ThreadExecutorPool是使用最多的線程池組件,了解它的原始資料最好是從從設計者(Doug Lea)的口中知道它的來龍去脈。在Jdk1.6中,ThreadPoolExecutor直接繼承了AbstractExecutorService, 並層級實現了ExecutorService和Executor接口。

1.Executor

  Executor是用來執行提交的Runnable任務的對象,並以接口的形式定義,提供一種提交任務(submission task)與執行任務(run task)之間的解耦方式,還包含有線程使用與周期調度的詳細細節等。Executor常常用來代替早期的線程創建方式,如new Thread(new(RunnableTask())).start(),在實際中可以用如下的方式來提交任務到線程池里,Executor會自動執行 你的任務.

  1. Executor executor = anExecutor;  
  2. executor.execute(new RunnableTask1());  

  Executor接口中定義的方法如下:

  1. /** 
  2. 在接下來的某個時刻執行提交的command任務。由於Executor不同的實現,執行的時候可能在一個新線程中或由一個線程池里的線程執行,還可以是由調用者線程執行 
  3. */  
  4. void execute(Runnable command);  

  2.ExecutorService

  ExecutorService接口擴展了Executor,提供管理線程池終止的一組方法,還提供了產生Future的方法,Future是用於追蹤一個或多個異步任務的對象,並能返回異步任務的計算結果。

  ExecutorService關閉后將不再接收新的任務,ExecutorService提供了兩種不同類型的關閉方法,shutdown方法允許執行完之前提交的任務才終止,而shutdownNow將不再執行等待的任務,並試圖終止當前執行的任務。ExecutorService終止后,內部已沒有活動的任務,沒有等待的任務,也不能再提交新任務,沒有使用ExecutorService需要回收相應的資源。

  submit方法是基於Executor.execute()方法之上的,通過創建並返回一個Future對象就可以實現取消執行或者等待執行完成。invokeAny和invokeAll通常是用於批量執行,可以提交一個task集合並等待task的逐個完成。

  ExecutorService接口中定義的方法如下:

  1. //不再接收新的task,執行完之前提交的task后,開始有序的終止線程。 
  2. void shutdown();  
  3. /** 
  4. 試圖終止所有活動的執行任務,停止對等待任務的處理,並返回待執行的Runnable列表。 
  5. 但是,不能保證能夠終止掉所有的正在執行的任務。比如,在典型的實現中會調用Thread.interupt來作取消,但是一些不能響應中斷的task將永遠不會被終止 
  6. */  
  7. List<Runnable> shutdownNow();  
  8. //如果Executor調用shutdown或者shutdownNow將返回true
  9. boolean isShutdown(); 
  10. //所有的任務都關閉后,線程池才會關閉成功。屆時返回true
  11. boolean isTerminated(); 
  12. /** 
  13. 發起關閉請求后,將一直阻塞等待關閉,直到所有的task已執行完成。 
  14. 如果超時返回,或者當前線程中斷時, 則返回false 
  15. */  
  16. boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException;  
  17. //提交一個等待返回值的task,返回的Future表示task執行后的待定結果。執行成功后,Future的get方法將返回實際的結果。
  18. <T> Future<T> submit(Callable<T> task);  
  19. <T> Future<T> submit(Runnable task, T result);  
  20. //提交一個Runnable任務並返回一個Future。不過Future.get方法將返回null
  21. Future<?> submit(Runnable task);  
  22. // 執行提交的task集合。當執行完成后,返回task各自的Future。對應返回的Future集合,Future.isDone方法將返回true
  23. <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>>tasks)throws InterruptedException;  
  24. //執行提交的task集合,返回一個task成功執行后的結果,而其他沒有執行完成的task將被取消
  25. <T> T invokeAny(Collection<? extends Callable<T>> tasks)throws InterruptedException, ExecutionException;     

  3. ThreadPoolExecutor

  ThreadPoolExecutor使用線程池里的線程來執行每個提交的task,一般通過Executors的工廠方法來創建並配置相應的參數。

  線程池處理了兩個問題:1.改善了性能。當執行大量的異步任務時,減少了每次調用的開銷。2.提供了一直資源管理的方式。線程池內部的線程可以得到有效的復用。每個ThreadPoolExecutor內部還維護了一些基礎的統計量,如完成的任務總數.ThreadPoolExecutor在很多場景下都有其用武之地,它提供了很多可調整的參數與可擴展的鈎子方法。一般建議用Executors提供的便利的工廠方法來創建相應的ThreadPoolExecutor。如可以創建無限多線程newCachedThreadPool方法,創建固定大小的newFixedThreadPool方法,還有創建單個后台線程newSingleThreadExecutor的方法。這些預定義的方法能滿足大 分的使用場景,然而當需要手動配置與調整線程池時,則需要知曉內部的究竟。

  Ø 核心線程池大小(corePoolSize)與最大線程池大小(maximumPoolSize)

  ThreadPoolExecutor會自動調節池子里線程的大小。通過execute方法提交新任務時,如果當前的池子里線程的大小小於核心線程 corePoolSize時,則會創建新線程來處理新的請求,即使當前的工作者線程是空閑的。如果運行的線程數是大於corePoolSize但小於 maximumPoolSize,而且當任務隊列已經滿了時,則會創建新線程。通過設置corePoolSize等於maximumPoolSize,便 可以創建固定大小的線程池數量。而設置maximumPoolSize為一個不受限制的數量如Integer.MAX,便可以創建一個適應任意數量大的並發任務的線程池。常規情況下,可以根據構造方法來設置corePoolSize與maximumPoolSize,但也可以通過 setCorePoolSize和setMaximumPoolSize方法動態的修改其值。

  Ø 按需構造

  默認情況下,核心線程是在開始接收新任務時才初始創建,但是可以使用prestartCoreThread或prestartAllCoreThreads方法來動態的預開啟所有的線程數。

  Ø  創建新線程

  新線程是使用java.util.concurrent.ThreadFactory來創建的,如果沒有指定其他的方式,則是使用Executors.defaultThreadFactory方法,默認創建的線程擁有相同線程組與優先級且都是非后台線程。

  Ø  存活時間(Keep-alive times)

  若當前池里的線程數量超過corePoolSize,超出的線程如果是空閑的,將在存活指定的keepAliveTime時間后終止。這種機制主要是為減少資源的消耗,如果后期有新的活動任務,則又構造新的線程。該參數也可以通過setKeepAliveTime方法動態的修改,如果該參數設置為 Long.MAX_VALUE,則空閑的線程將一直存活。

  Ø  阻塞隊列

  超出一定數量的任務會轉移隊列中,隊列與池里的線程大小的關聯表現在:如運行的線程數小於corePoolSize線程數,Executor會優先添加線程來執行task,而不會添加到隊列中。如運行的線程已大於 corePoolSize,Executor會把新的任務放於隊列中,如隊列已到最大時,ThreadPoolExecutor會繼續創建線程,直到超過 maximumPoolSize。最后,線程超過maximumPoolSize時,Executor將拒絕接收新的task.

  而添加任務到隊列時,有三種常規的策略:

1. 直接傳遞。SynchronousQueue隊列的默認方式,一個存儲元素的阻塞隊列而是直接投遞到線程中。每一個入隊操作必須等到另一個線程調用移除操作,否則入隊將一直阻塞。當處理一些可能有內部依賴的任務時,這種策略避免了加鎖操作。直接傳遞一般不能限制maximumPoolSizes以避免拒絕 接收新的任務。如果新增任務的速度大於任務處理的速度就會造成增加無限多的線程的可能性。

2. 無界隊列。如LinkedBlockingQueue,當核心線程正在工作時,使用不用預先定義大小的無界隊列將使新到來的任務處理等到中,所以如果線程數是小於corePoolSize時,將不會創建有入隊操作。這種策略將很適合那些相互獨立的任務,如Web服務器。如果新增任務的速度大於任務處理的速度就會造成無界隊列一直增長的可能性。

3. 有界隊列。如ArrayBlockingQueue,當定義了maximumPoolSizes時使用有界隊列可以預防資源的耗盡,但是增加了調整和控制隊列的難度,隊列的大小和線程池的大小是相互影響的,使用很大的隊列和較小的線程池會減少CPU消耗、操作系統資源以及線程上下文開銷,但卻人為的降低了吞吐量。如果任務是頻繁阻塞型的(I/O),系統是可以把時間片分給多個線程的。而采用較小的隊列和較大的線程池,雖會造成CPU繁忙,但卻會遇到調度開銷,這也會降低吞吐量。

  Ø  飽和策略(拒絕接收任務)

  當Executor調用shutdown方法后或者達到工作隊列的最容量時,線程池則已經飽和了,此時則不會接收新的task。但無論是何種情 況,execute方法會調用RejectedExecutionHandler#rejectedExecution方法來執行飽和策略,在線程池內部預定義了幾種處理策略:

1. 終止執行(AbortPolicy)。默認策略, Executor會拋出一個RejectedExecutionException運行異常到調用者線程來完成終止。

2. 調用者線程來運行任務(CallerRunsPolicy)。這種策略會由調用execute方法的線程自身來執行任務,它提供了一個簡單的反饋機制並能降低新任務的提交頻率。

3. 丟棄策略(DiscardPolicy)。不處理,直接丟棄提交的任務。

4. 丟棄隊列里最近的一個任務(DiscardOldestPolicy)。如果Executor還未shutdown的話,則丟棄工作隊列的最近的一個任務,然后執行當前任務。

  終於可以走近代碼了,在ThreadPoolExecutor,我們直接跳躍到核心的字段和方法處。

  ThreadPoolExecutor類分析:

  1. /* 
  2. 一 個ThreadPoolExecutor管理着一系列控制字段。首先需要保證在加鎖的區域中, 執行的控制字段才能被改變,這些字段包含有 runState,poolSize, corePoolSize, and maximumPoolSize。然后這些字段被聲明為volatile類 型,因此保證了內存可見性(任何線程都有對其的內存可見性)。 
  3. 而一些其他的字段是表示用戶控制的參數,不會影響執行的結果,也聲明為volatile類型,允許用戶在執行時異步的改變。這些字段有包含有:allowCoreThreadTimeOut, keepAliveTime。除此之外,內部的飽和策略處理器、線程工廠也不會在加鎖區域內被更 改。 
  4. 大量的volatile類型聲明主要是保證在不用加鎖的條件下,很多操作的結果都具有內存的可見性,如工作隊列的任務入隊和出隊操作。 
  5. */  
  6. /** 
  7. runState提供生命周期的控制,有如下值: 
  8. RUNNING: 接收新任務並正在隊列中的任務 
  9. SHUTDOWN: 不再接收新的任務,但是仍在處理隊列中的任務 
  10. STOP:不再接收新的任務,不再處理隊列中的任務並中斷正在執行的任務 
  11. TERMINATED:同STOP一樣,同時所有的線程已被終止 
  12. runState會單一的增加,但不需要每個值都命中,他們可有如下的轉換順序: 
  13. RUNNING -> SHUTDOWN 調用shutdown方法后 
  14. (RUNNING or SHUTDOWN) -> STOP 調用shutdownNow()后 
  15. SHUTDOWN -> TERMINATED 隊列和線程池里的任務已完,線程已終止 
  16. STOP -> TERMINATED 線程池已空,線程已終止 
  17. */  
  18. volatile int runState;  
  19. static final int RUNNING= 0;  
  20. static final int SHUTDOWN= 1;  
  21. static final int STOP = 2;  
  22. static final int TERMINATED = 3;
  23. //用於存儲任務並把任務投遞給工作者線程的阻塞隊列。我們不需用workQueue.poll()方法返回null,因為此時可認為隊列已為空,因此有時候必須再作檢查
  24. private final BlockingQueue<Runnable> workQueue;
  25. /** 
  26. 更新poolSize, corePoolSize, maximumPoolSize, runState, and workers者線程的鎖 
  27. 在1.6中是一把重入鎖,在1.7時已經修改為直接繼承AQS。 
  28. */  
  29. private final ReentrantLock mainLock = new ReentrantLock();  
  30. //用於輔助awaitTermination方法的等待條件
  31. private final Condition termination = mainLock.newCondition();  
  32. //池里的工作者線程,當持有mainLock時才能讀取 
  33. private final HashSet<Worker> workers = new HashSet<Worker>();  
  34. // 以納秒為單位的超時時間,如果allowCoreThreadTimeOut設置為true,則空閑的多余corePoolSize的線程將會在存活keepAliveTime時長后終止。
  35. private volatile long  keepAliveTime;  
  36. // 默認為false, 保留空閑的核心線程
  37. private volatile boolean allowCoreThreadTimeOut;  
  38. private volatile int   corePoolSize;  
  39. private volatile int   maximumPoolSize;  
  40. private volatile int   poolSize;  
  41. //飽和執行策略或者shutdown后拒絕執行的策略 
  42. private volatile RejectedExecutionHandler handler;  
  43. //創建新線程的工廠。 
  44. private volatile ThreadFactory threadFactory;  

  接下來便是一組按需指定的構造方法

  分析一下最重要的execute方法。
  1. 如果當前池里運行的線程數量小於corePoolSize,則創建新線程(需要獲取全局鎖)   2. 如果當前的線程數量大於corePoolSize,則將任務加入BlockQueue中。   3. 如果隊列已滿,但是線程數小於最大線程數量,則繼續添加線程(需要再次獲取全局鎖)   4. 已達到最大線程數量,任務隊列也已經滿了,任務將被拒絕,並調用RejectExecutionHanlder的rejectExecution方法。   ThreadPoolExecutor采用上述步驟的總體思路為,在執行execute方法時,盡可能的避免獲取全局鎖(影響性能),在   ThreadPoolExecutor完成核心線程的開啟后,幾乎所有的execute方法都是執行步驟2,巧妙地是,步驟2不需要獲取全局鎖。

  1. public void execute(Runnable command) {  
  2.         if (command == null) //不能是空任務  
  3.             throw new NullPointerException();  
  4.     //如果還沒有達到corePoolSize,則添加新線程來執行任務  
  5.         if (poolSize >= corePoolSize || !addIfUnderCorePoolSize(command)) {  
  6.          //如果已經達到corePoolSize,則不斷的向工作隊列中添加任務  
  7.             if (runState == RUNNING && workQueue.offer(command)) {  
  8.             //線程池已經沒有任務  
  9.                 if (runState != RUNNING || poolSize == 0)   
  10.                     ensureQueuedTaskHandled(command);  
  11.             }  
  12.          //如果線程池不處於運行中或者工作隊列已經滿了,但是當前的線程數量還小於允許最大的maximumPoolSize線程數量,則繼續創建線程來執行任務  
  13.             else if (!addIfUnderMaximumPoolSize(command))  
  14.             //已達到最大線程數量,任務隊列也已經滿了,則調用飽和策略執行處理器  
  15.                 reject(command); // is shutdown or saturated  
  16.         }  
  17. }  
  18.   
  19. private boolean addIfUnderCorePoolSize(Runnable firstTask) {  
  20.         Thread t = null;  
  21.         final ReentrantLock mainLock = this.mainLock;  
  22.         mainLock.lock();  
  23.         //更改幾個重要的控制字段需要加鎖  
  24.         try {  
  25.             //池里線程數量小於核心線程數量,並且還需要是運行時  
  26.             if (poolSize < corePoolSize && runState == RUNNING)  
  27.                 t = addThread(firstTask);  
  28.         } finally {  
  29.             mainLock.unlock();  
  30.         }  
  31.         if (t == null)  
  32.             return false;  
  33.         t.start(); //創建后,立即執行該任務  
  34.         return true;  
  35.     }  
  36.   
  37. private Thread addThread(Runnable firstTask) {  
  38.         Worker w = new Worker(firstTask);  
  39.         Thread t = threadFactory.newThread(w); //委托線程工廠來創建,具有相同的組、優先級、都是非后台線程  
  40.         if (t != null) {  
  41.             w.thread = t;  
  42.             workers.add(w); //加入到工作者線程集合里  
  43.             int nt = ++poolSize;  
  44.             if (nt > largestPoolSize)  
  45.                 largestPoolSize = nt;  
  46.         }  
  47.         return t;  
  48.     }  

  再看看工作者線程的設計。

  線程池內部可以直接用一個線程集合來復用線程,但Doug先生用Worker再次封裝一下,估計其設計的初衷有:划分職責,只做單純的任務消費者,執行完成后可追尋一些統計量。Worker執行任務時,需要先獲取runLock,此處的目的是在任務的執行過程中防止worker線程被中斷。然后雙重檢查是否線程池已停止或者中斷。最好開始執行任務,此時調用用戶自定的鈎子方法,可在執行前和執行后作相應的處理。在Worker自身的run方法體中,需要先獲取任務,調用實際的runTask。在獲取任務的操作中,由於是阻塞獲取,則保證了線程的最終存活的可能。

  1. public void run() {  
  2.             try {  
  3.                 Runnable task = firstTask;  
  4.                 firstTask = null;  
  5.                 while (task != null || (task = getTask()) != null) { //調用阻塞獲取任務的方法,如果沒有則會一直阻塞於此,處理等待狀態  
  6.                     runTask(task);  
  7.                     task = null;  
  8.                 }  
  9.             } finally {  
  10.                 workerDone(this);  
  11.             }  
  12.         }  

最好看看從隊列中獲取任務的方法。

  1. Runnable getTask() {  
  2.         for (;;) {  
  3.             try {  
  4.                 int state = runState;  
  5.                 if (state > SHUTDOWN)  
  6.                     return null;  
  7.                 Runnable r;  
  8.                 if (state == SHUTDOWN)  // Help drain queue  
  9.                     r = workQueue.poll(); //poll方法不會阻塞  
  10.             //keepAliveTime的超時終止體現在此處,超時后poll方法會返回r.然后回到worker的run方法里,由於沒有可用的任務,超時返回的r會為null,因此worker的run方法會直接退出循環和導致線程結束。  
  11.                 else if (poolSize > corePoolSize || allowCoreThreadTimeOut)  
  12.                     r = workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS);  
  13.                 else  
  14.                     r = workQueue.take(); //阻塞等待  
  15.                 if (r != null)  
  16.                     return r;  
  17.                 if (workerCanExit()) {  
  18.                     if (runState >= SHUTDOWN) // Wake up others  
  19.                         interruptIdleWorkers();  
  20.                     return null;  
  21.                 }  
  22.                 // Else retry  
  23.             } catch (InterruptedException ie) {  
  24.                 // On interruption, re-check runState  
  25.             }  
  26.         }  
  27.     }  

至於此,ThreadPoolExecutor的核心源碼已經分析完成。

二、再回頭來分析原理

執行示意圖

三、線程池的使用

  1. 通過構造方法來初始化
  1. public ThreadPoolExecutor(int corePoolSize,  
  2.                               int maximumPoolSize,  
  3.                               long keepAliveTime,  
  4.                               TimeUnit unit,  
  5.                               BlockingQueue<Runnable> workQueue)   

  2.或使用Executors的工廠方法

  1. public static ExecutorService newFixedThreadPool(int nThreads) {  
  2.         return new ThreadPoolExecutor(nThreads, nThreads,  
  3.                                       0L, TimeUnit.MILLISECONDS,  
  4.                                       new LinkedBlockingQueue<Runnable>());  
  5. public static ExecutorService newCachedThreadPool() {  
  6.         return new ThreadPoolExecutor(0, Integer.MAX_VALUE,  
  7.                                       60L, TimeUnit.SECONDS,  
  8.                                       new SynchronousQueue<Runnable>());  
  3. 調用execute方法,提交Runnable
  1. executorPool.execute(new TaskAction(taskId));  

 四、之前的一些疑惑

  初步接觸線程池時,有些問題也一直得不到解答,看完源碼后,便得到了一下解答。

  1. 線程為什么能夠一直存活?

  每個Woker是一個Runnable,同時會綁定到一個線程上。在執行Worker的run方法時,會去隊列中獲取任務,但是獲取任務是阻塞的獲取,如果沒有則線程會一直等待,因此不會被終止。最終還是會轉移到重入鎖上並有內部同步器來完成執行阻塞的操作。

  2. 參數keepAliveTime的具體原理?

  在ThreadPoolExecutor的getTask方法中,如果當前池里線程的數量大於核心數量或者設置 allowCoreThreadTimeOut為true的話,則調用的是阻塞隊列的 poll(long timeout,TimeUnit unit)方法,該方法等待指定的時間后會直接返回。worker線程會獲取到返回的任務,如果為空的話,則退出循環,因此線程便結束了。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM