Java並發(基礎知識)—— Executor框架及線程池


      在Java並發(基礎知識)—— 創建、運行以及停止一個線程中講解了兩種創建線程的方式:直接繼承Thread類以及實現Runnable接口並賦給Thread,這兩種創建線程的方式在線程比較少的時候是沒有問題的,但是當需要創建大量線程時就會出現問題,因為這種使用方法把線程創建語句隨意地散落在代碼中,無法統一管理線程,我們將無法管理創建線程的數量,而過量的線程創建將直接使系統崩潰。

      從高內聚角度講,我們應該創建一個統一的創建以及運行接口,為我們管理這些線程,這個統一的創建與運行接口就是JDK 5的Executor框架。

Executor框架                                                                                  

      在Java類庫中,任務執行的主要抽象不是Thread,而是Executor,該接口定義如下:

public interface Executor {
    void execute(Runnable command);
}

  雖然Executor是一個簡單的接口,但它卻為靈活且強大的異步任務執行框架提供了基礎,該框架能夠支持多種不同類型的任務執行策略,它提供了一種標准的方法將任務的提交過程與執行過程解耦開來。

      Executor基於生產者-消費者模式,提交任務的操作相當於生產者(生成待完成的工作單元),執行任務的線程則相當於消費者(執行工作單元)。如果要在一個程序中實現一個生產者-消費者模式,那么最簡單的方式就是使用Executor。

      Executor接口定義了提交任務的方法,但卻沒有定義關閉的方法,ExecutorService接口擴展了Executor接口,添加了一些用於生命周期管理的方法:

public interface ExecutorService extends Executor {
    void shutdown();

    List<Runnable> shutdownNow();

    boolean isShutdown();

    boolean isTerminated();

    boolean awaitTermination(long timeout, TimeUnit unit)
        throws InterruptedException;
  
    ...    
}

  ExecutorService的生命周期有3種狀態:運行、關閉和已終止。ExecutorService在初始創建時處於運行狀態。shutdown方法將執行平緩的關閉過程:不再接受新的任務,同時等待已經提交的任務執行完成——包括那些還未開始執行的任務。shutdownNow方法將執行粗暴的關閉過程:它將嘗試取消所有執行中的任務,並且不再啟動隊列中尚未開始執行的任務。

      在所有任務都完成后,ExecutorService將轉入終止狀態。可以調用awaitTermination來等待ExecutorService到達終止狀態,或者通過調用isTerminated來輪詢ExecutorService是否已經終止。通常在調用awaitTermination之后會立即調用shutdown,從而產生同步地關閉ExecutorService的效果。

線程池                                                                                         

      Executor框架的核心是線程池。線程池是指管理一組同構工作線程的資源池,在"線程池中執行任務"比"為每個任務分配一個線程"優勢更多。通過重用現有的線程而不是創建新線程,可以在處理多個請求時分攤在線程創建和銷毀過程中產生的巨大開銷。另一個額外的好處是,當請求到達時,工作線程通常已經存在,因此不會由於等待創建線程而延遲任務的執行,從而提高了響應性。通過適當調整線程池大小,可以創建足夠多的線程以便使處理器保持忙碌,同時還可以防止過多線程相互競爭資源而使應用程序耗盡內存而失敗。

      ThreadPoolExecutor定義了一個線程池,該類的聲明如下:

public class ThreadPoolExecutor extends AbstractExecutorService { ... }

public abstract class AbstractExecutorService implements ExecutorService { ... }

  可以看到,ThreadPoolExecutor繼承自AbstractExecutorService,AbstractExecutorService實現了ExecutorService接口,所以ThreadPoolExecutor也間接實現了ExecutorService接口。

      ThreadPoolExecutor定義了很多構造函數,以下代碼給出了該類最重要的構造函數:

    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) { ... }

  corePoolSize、maximumPoolSize、keepAliveTime以及unit這幾個參數分別定義了線程池的基本大小、最大大小以及存活時間。corePoolSize定義了線程池的基本大小,也就是線程池的目標大小,即在沒有任務執行時線程池的大小,並且只有在工作隊列滿了的情況下才會創建超出這個數量的線程。maximumPoolSize定義了線程池的最大大小,表示線程池可同時活動線程數量上限。keepAliveTime和unit共同定義了線程的存活時間,如果某個線程的空閑時間超過了存活時間,那么將被標記為可回收的,並且當線程池的當前大小超過基本大小時,這個線程將被終止。

      workQueue參數包含Runnable的阻塞隊列,當線程池達到基本大小時,新提交的任務將放入這個阻塞隊列中,阻塞隊列的實現包含三種:無界隊列、有界隊列以及同步移交隊列。

      threadFactory參數用於設置創建線程的工廠,可以通過線程工廠給每個創建出來的線程設置更有意義的名字,方便定位問題。

      handler參數定義了線程池飽和策略。當有界隊列被填滿后,並且線程池活動線程達到最大線程數,飽和策略開始生效。JDK提供了幾種不同的RejectedExecutionHandler實現,分別是AbortPolicy、DiscardPolicy、DiscardOldestPolicy以及CallerRunsPolicy。AbortPolicy是默認的飽和策略,該策略將拋出未檢查的RejectedExecutionException。DiscardPolicy策略會把新提交的任務直接拋棄,而DiscardOldestPolicy策略會拋棄隊列首部最老的任務。CallerRunsPolicy策略實現了一種調節機制,該策略既不會拋棄任務,也不會拋出異常,而是將某些任務回退到調用者,從而降低新任務的流量,它不會在線程池中的某個線程中執行新提交的任務,而是在一個調用了execute的線程中執行該任務。

Executors靜態工廠方法                                                                      

      從上一節內容看出,ThreadPoolExecutor的新建需要傳入很多參數,使用起來極不方便。為了便於使用,Executors為我們提供了幾個靜態工廠方法,大大簡化了線程池的創建,它們分別是:

  • newFixedThreadPool:newFixedThreadPool將創建一個固定大小的線程池,每當提交一個任務就創建一個線程,直到達到線程池的最大數量,這時線程池的規模將不再變化;
  • newCachedThreadPool:newCachedThreadPool將創建一個可緩存的線程池,如果線程池的當前規模超過了處理需求,那么將回收空閑線程;而當需求增加時,可以添加新的線程,線程池的規模不存在任何限制。
  • newSingleThreadExecutor:newSingleThreadExecutor是一個單線程的Executor,它創建單個工作者線程執行任務,如果這個線程異常結束,會創建另一個線程代替。

      以newCachedThreadPool為例,我們可以看看這些靜態工廠方法的內部實現:

    public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
    }

  可以看到,這些靜態工廠方法最終還是調用的ThreadPoolExecutor的構造函數,指定了線程池基本大小為0,最大大小為Integer值上限,線程存活時間為60s,阻塞隊列是一個SynchronousQueue。從這些參數可以知道,當線程提交newCachedThreadPool的線程池時,由於基本大小為0,所以肯定大於基本大小,然后任務會進入阻塞隊列,而SynchronousQueue內部沒有任何容量,且當前線程數未達到最大線程數,所以任務將立即執行。任務執行完有60s的超時時間,如果在這段時間內有新任務調用,那么新任務將直接在這個線程上運行。

總結                                                                                                    

      線程池的使用能夠幫助我們統一管理線程,提高線程的可管理性,在寫多線程代碼時,我們應該優先使用線程池方式創建線程。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM