Executor框架完整解讀


1 前言

Java的線程既是工作單元,也是執行機制。從JDK 5開始,把工作單元與執行機制分離開來。工作單元包括Runnable和Callable,而執行機制由Executor框架提供.

在HotSpot VM的線程模型中,Java線程被一對一映射為本地操作系統線程。Java線程啟動時會創建一個本地操作系統線程;當該Java線程終止時,這個操作系統線程也會被回收。操作系統會調度所有線程並將它們分配給可用的CPU。在上層,Java多線程程序通常把應用分解為若干個任務,然后使用用戶級的調度器(Executor框架)將這些任務映射為固定數量的線程;在底層,操作系統內核將這些線程映射到硬件處理器上。應用程序通過Executor框架控制上層的調度;而下層的調度由操作系統內核控制,下層的調度不受應用程序的控制。

2 Executor框架組成

Executor框架主要由3大部分組成如下:

  • ①任務: RunnableCallable接口及其實現類

  • ②任務執行器: 主要是Executor及擴展Executor的ExecutorService接口的一些實現類。Executor框架有兩個重要的實現類,一個是線程池執行器ThreadPoolExecutor、另一個是定時任務執行器ScheduledThreadPoolExecutor .

  • ③任務的結果: Future接口及其默認實現FutureTask

說明:

Runnable接口(無返回值)和Callable接口(有返回值)的實現類,都可以被ThreadPoolExecutorScheduledThreadPoolExecutor執行。

Executor是一個接口,它是Executor框架的基礎,它將任務的提交與任務的執行分離開來。ThreadPoolExecutor是線程池的核心實現類,用來執行被提交的任務。ScheduledThreadPoolExecutor是一個實現類,可以在給定的延遲后運行命令,或者定期執行命令 。ScheduledThreadPoolExecutor比Timer更靈活,功能更強大。 Future接口和實現Future接口的FutureTask類,代表異步任務的結果。

3 Runnable和Callable

Runnable接口和Callable接口的實現類,都可以被ThreadPoolExecutor或ScheduledThreadPoolExecutor執行。它們之間的區別是Runnable沒有返回值,無法判斷任務是否完成,而Callable有一個返回結果。

除了可以自己創建實現Callable接口的對象外,還可以使用工廠類Executors將一個Runnable包裝成一個Callable.

callable(Runnable )方法包裝不需要結果的Runnable任務,任務完成后Future.get()會獲取一個null值;而callable(Runnable,T)可以包裝一個需要結果的Runnable任務,結果可以通過參數result指定,任務完成后Future.get()可以獲得這個結果result.

    public static Callable<Object> callable(Runnable task) 
    public static <T> Callable<T> callable(Runnable task, T result)

 

4 ThreadPoolExecutor

ThreadPoolExecutorExecutorService的最重要的實現類,ThreadPoolExecutor不直接實現ExecutorService接口,它直接繼承於AbstractExecutorService抽象類, AbstractExecutorServiceExecutorSerivice接口中的一些方法做過的默認實現 。

 

 之前的文章線程池ThreadPoolExecutor簡介對使用構造方法創建線程池已做詳細說明,這里介紹使用工廠類Executors來創建線程池。ThreadPoolExecutor可以使用工廠類Executors提供了一些靜態工廠方法,可以以此方便地創建一些常用配置的線程池。Executors可以創建3種類型的ThreadPoolExecutor .

1) 固定線程池

newFixedThreadPool系列方法創建固定線程數的線程池。它適用於為了滿足資源管理的需求,而需要限制當前線程數量的應用場景,它適用於負載比較重的服務器。

    //創建固定線程數的線程池
 public static ExecutorService newFixedThreadPool(int nThreads) {
    return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS,
            new LinkedBlockingQueue<Runnable>());
  }
 public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) {
    return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS,
            new LinkedBlockingQueue<Runnable>(), threadFactory);
  }

這種線程池的corePoolSize和maximumPoolSize都被設置為參數nThreads。當線程池中的線程數大於corePoolSize時,keepAliveTime為非核心線程等待新任務的最長時間,超過這個時間后的這些線程將被終止。corePoolSize和maximumPoolSize參數設置為相同值,表示線程池中所有線程均是核心線程, 那么keepAliveTime參數就是無意義的.

 

 處理任務流程說明:

①當線程池中的線程數小於corePoolSize時,線程池會創建新線程去執行任務。

②在經過一段時間預熱后,線程數達到了corePoolSize(因為maximumPoolSize與corePoolSize相同,此時也達到了最大線程數,以后不會再創建線程),開始將任務放入工作隊列中。

③此后有新任務到達就向工作隊列中放入(LinkedBlockingQueue無參構造方法,創建的隊列容量是Integer.MAX_VALUE ,這種隊列幾乎不可能容量爆滿,不會拒絕任務,拒絕策略不起作用),若有線程處於空閑狀態則從工作隊列中獲取任務並執行。

2) 單線程池

newSingleThreadExecutor系列方法創建單個線程的線程池 它適用於保證任務按順序執行,並且在任何時候最多只有一個活動(正在執行)的任務。

  //創建單個線程的線程池
  public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory) {
    return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS,  new LinkedBlockingQueue<Runnable>(),threadFactory));
  }
  public static ExecutorService newSingleThreadExecutor() {
    return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,  0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()));
  }

newSingleThreadExecutor方法不是直接返回ThreadPoolExecutor對象,它將ThreadPoolExecutor對象進行包裝成FinalizableDelegatedExecutorService對象,但實際的業務處理還是委托給ThreadPoolExecutor去實現。

這類線程池的corePoolSize和maximumPoolSize都被設為1,線程池中最多有一個線程,同樣地這里的keepAliveTime參數是無意義的,它使用一個無限大容量的阻塞隊列作為存放任務的容器。

 

 處理任務流程說明:

①當線程池中的無任何線程時,線程池會創建一個線程去執行任務。

②當線程池中有一個線程時,開始將任務放入工作隊列中。

③此后有新任務到達就向工作隊列中放入(LinkedBlockingQueue無參構造方法,創建的隊列容量是Integer.MAX_VALUE ,這種隊列幾乎不可能容量爆滿,不會拒絕任務,拒絕策略不起作用),若線程池中的唯一線程處於空閑狀態則從工作隊列中取出任務並執行。

3) 緩存線程池

newCachedThreadPool系列方法會根據需要創建新線程的線程池(但若之前創建的線程可用,則將復用這些線程) . 它對線程數沒有限制,會按需創建新線程,適用於執行很多的短期異步任務的小任務,或者是負載較輕的服務器。

    //根據需要創建新線程的線程池
  public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS,new SynchronousQueue<Runnable>(),threadFactory);
  }
  public static ExecutorService newCachedThreadPool() {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,60L, TimeUnit.SECONDS,new SynchronousQueue<Runnable>());
  }

這種線程池的corePoolSize為零、maximumPoolSize為Integer.MAX_VALUE,表明線程池中沒有核心線程、所有線程均是非核心線程,且可允許的最大線程數是近乎無限大。keepAliveTime參數設為60,表明空閑線程最多等待60秒就被終止。

這里使用SynchronousQueue作為工作隊列,這種隊列是沒有容量的(當嘗試排隊時,只有正好有空閑線程正在等待接受任務時才會入隊成功),但可允許創建的最大線程數是無限大的。這意味着主線程提交任務的速度大於任務處理的速度,線程池就會創不斷建新線程,這樣可能導致創建的線程過多,系統資源被耗盡、程序崩潰。

 

 處理任務流程說明:

①因為核心線程數為0,所以線程池在啟動時核心線程池就已經滿了。在主線程提交第一個任務時,線程池就要將嘗試此任務入隊,由於SynchronousQueue的特殊性,只有當此時空閑線程也正在出隊,入隊與出隊兩者恰好匹配時,主線程會把任務交給空閑線程去執行。否則將進入下一步。

②當線程池中無任何線程或無空閑線程時,將沒有線程執行出隊操作。此時線程池會創建建一個新線程執行任務

③上一步中創建的新線程在執行完的任務后,會調用SynchronousQueue.poll等待任務出隊。這個空閑線程最多等待60秒時間,若主線程在60秒內提交了一個新任務,此空閑線程將獲取到這個任務並執行。若等待60秒后,還沒等到新任務到達,這個線程將被終止。

5 ScheduledThreadPoolExecutor

ScheduledThreadPoolExecutor是定時任務執行器,它可以通過構造方法創建,也可通過工廠類Executors的靜態方法創建。

ScheduledThreadPoolExecutor的構造方法邏輯十分簡單,它們直接調用父類實現。

public ScheduledThreadPoolExecutor(int corePoolSize) {
    super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
          new DelayedWorkQueue());
}
public ScheduledThreadPoolExecutor(int corePoolSize, ThreadFactory threadFactory) {
    super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,new DelayedWorkQueue(), threadFactory);
}
public ScheduledThreadPoolExecutor(int corePoolSize,  RejectedExecutionHandler handler) {
    super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,  new DelayedWorkQueue(), handler);
}
public ScheduledThreadPoolExecutor(int corePoolSize, ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
    super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS, new DelayedWorkQueue(), threadFactory, handler);
} 

 

Executors可創建兩種類型的ScheduledThreadPoolExecutor

①newSingleThreadScheduledExecutor系列方法用於創建單個線程的定時任務執行器。它適用於按照固定順序執行周期性的定時任務,且最多同時執行一個任務的情況。

②newScheduledThreadPool系列方法用於創建給定個數線程的定時任務執行器。它適用於需要多個線程執行周期任務,同時又要限制線程數、防止創建線程過多耗費資源的情況。

   //單線程的定時任務執行器
     public static ScheduledExecutorService newSingleThreadScheduledExecutor(ThreadFactory threadFactory) {
        return new DelegatedScheduledExecutorService
            (new ScheduledThreadPoolExecutor(1, threadFactory));
    }
    public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
        return new ScheduledThreadPoolExecutor(corePoolSize);
    }
    //多線程的定時任務執行器
     public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
        return new ScheduledThreadPoolExecutor(corePoolSize);
    }
    public static ScheduledExecutorService newScheduledThreadPool( int corePoolSize, ThreadFactory threadFactory) {
        return new ScheduledThreadPoolExecutor(corePoolSize, threadFactory);
    }

 

ScheduledThreadPoolExecutor繼承自ThreadPoolExecutor,並實現了表示定時執行器的接口ScheduledExecutorService。它主要用來在給定的延遲之后運行任務,或者定期執行任務.

 

 DelayedWorkQueue是ScheduledThreadPoolExecutor的一個靜態內部類,它是一個無界隊列,所以父類ThreadPoolExecutor中的maximumPoolSize、keepAliveTime這兩個參數無意義、沒有任何效果。

執行任務基本流程:

①當調用ScheduledThreadPoolExecutor的scheduleAtFixedRate()方法或者scheduleWithFixedDelay()方法時,會向的DelayedWorkQueue添加一個實現了RunnableScheduledFuture接口的ScheduledFutureTask類型任務。

②線程池中的線程從DelayedWorkQueue中嘗試獲取到期任務,若沒有任務到期此線程將阻塞等待,直到真正獲取到一個任務,然后執行此任務。

 

 ScheduledThreadPoolExecutor會把待調度的任務ScheduledFutureTask放到一個DelayedWorkQueue中,我們來了解一下ScheduledFutureTask。

 ScheduledFutureTask是ScheduledThreadPoolExecutor的一個成員內部類,它繼承了FutureTask類,另外還實現了表示周期性任務的ScheduledFutureTask接口,ScheduledFutureTask有3 個重要的成員變量。

  • long型成員變量time,表示這個任務將要被執行的具體時間。

  • long型成員變量sequenceNumber,表示這個任務被添加到ScheduledThreadPoolExecutor中的序號。

  • long型成員變量period,表示任務執行的間隔周期

DelayedWorkQueue封裝了一個RunnableScheduledFuture數組,它利用這個數組實現一個基於堆排序的優先級隊列,其原理與PriorityQueue類似。ScheduledFutureTask任務會放入這個數組中,DelayedWorkQueue會對數組中的任務按照優先級排序。排序的基本原則是: time小的排在前面,如果兩個任務的time相同,就比較sequenceNumber,sequenceNumber小的排在前面。換句話說,時間早的任務先執行,若幾個任務同樣早,就看誰先提交,先提交的任務先執行。

  執行一個任務的完整步驟:

①某空閑線程從DelayedWorkQueue中獲取已到期的ScheduledFutureTask任務(到期任務是指ScheduledFutureTask的time大於等於當前時間)。

②此線程執行這個ScheduledFutureTask任務。

③此線程修改ScheduledFutureTask的time變量為下次將要被執行的時間。

④此線程把這個time被修改后的ScheduledFutureTask重新放回DelayedWorkQueue中。

6 Future

Future接口和FutureTask類用來表示異步任務的結果。當我們把Runnable接口或Callable接口的實現類提交(使用ExecutorServicesubmit系列方法提交任務)給ThreadPoolExecutor或ScheduledThreadPoolExecutor時,它們將返回一個Future類型的對象(實際返回FutureTask類型對象) , 在調用Future.get()方法時會阻塞等待任務完成后再返回任務的結果。

submit方法提交任務

    public Future<?> submit(Runnable task) 
    public <T> Future<T> submit(Runnable task, T result)
    public <T> Future<T> submit(Callable<T> task)

 

get方法等待任務執行完成並獲取結果

 public V get() throws InterruptedException, ExecutionException
 public V get(long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException

 

Future、FutureTask的詳細分析在之前的文章FutureTask源碼完整解讀已有說明,這里不再贅述。

參考:《Java並發編程的藝術》《Java的邏輯》 

 

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM