【Java多線程】Executor框架的詳解


在Java中,使用線程來異步執行任務。Java線程的創建與銷毀需要一定的開銷,如果我們為每一個任務創建一個新線程來執行,這些線程的創建與銷毀將消耗大量的計算資源。同時,為每一個任務創建一個新線程來執行,這種策略可能會使處於高負荷狀態的應用最終崩潰。

Java線程既是工作單元,也是執行單元。從JDK1.5開始,把工作單元與執行機制分離開來。工作單元包括Runnable 和 Callable,而執行機制由Executor框架提供。

Executor框架簡介

Executor框架的兩級調度模型

在HotSpot VM的線程模型中,Java線程被一對一映射為本地操作系統線程。Java線程啟動時會創建一個本地操作系統線程;當Java線程終止時,這個操作系統線程也會被回收。操作系統會調用所有線程並將他們分配給可用的CPU。

可以將此種模式分為兩層,在上層,Java多線程程序通常把應用程序分解為若干任務,然后使用用戶級的調度器(Executor框架)講這些任務映射為固定數量的線程;在底層,操作系統內核將這些線程映射到硬件處理器上。

兩級調度模型的示意圖:

這里寫圖片描述

從圖中可以看出,該框架用來控制應用程序的上層調度(下層調度由操作系統內核控制,不受應用程序的控制)。

 

Executor框架的結構和成員

Executor框架的結構

1. 任務

包括被執行任務需要實現的接口:Runnable接口和Callable接口

2. 任務的執行

包括任務執行機制的核心接口Executor,以及繼承自Executor的ExecutorService接口。

Executor框架有兩個關鍵類實現了ExecutorService接口:ThreadPoolExecutor 和 ScheduledThreadPoolExecutor

3. 異步計算的結果

包括Future和實現Future接口的FutureTask類。

Executor框架的類與接口

示意圖

  • Executor是一個接口,他是Executor框架的基礎,它將任務的提交與任務的執行分離。
  • ThreadPoolExecutor是線程池的核心實現類,用來執行被提交的任務。
  • ScheduledThreadPoolExecutor是一個實現類,可以在給定的延遲后運行命令,或者定期執行命令。ScheduledThreadPoolExecutor 比 Timer 更靈活,功能更強大。
  • Future接口和它的實現FutureTask類,代表異步計算的結果。
  • Runnable和Callable接口的實現類,都可以被ThreadPoolExecutor 或 ScheduledThreadPoolExecutor 執行。

 

Executor框架的使用

 先來看個圖:

  1. 主線程首先要創建實現 Runnable接口或者Callable接口的任務對象。工具類Executors可以把一個Runnable對象封裝為一個Callable對象
    Executors.callable(Runnale task); 
    或
    Executors.callable(Runnable task, Object resule); 
  2. 然后可以把Runnable對象直接交給ExecutorService執行
    ExecutorServicel.execute(Runnable command);
    或者也可以把Runnable對象或Callable對象提交給ExecutorService執行
    ExecutorService.submit(Runnable task);

    如果執行ExecutorService.submit(...),ExecutorService將返回一個實現Future接口的對象(到目前為止的JDK中,返回的是FutureTask對象)。由於FutureTask實現了Runnable接口,我們也可以創建FutureTask類,然后直接交給ExecutorService執行。  

  3. 最后,主線程可以執行FutureTask.get()方法來等待任務執行完成。主線程也可以執行FutureTask.cancel(boolean mayInterruptIfRunning)來取消此任務的執行。

 

 ThreadPoolExecutor詳解

 Executor框架最核心的類是ThreadPoolExecutor

 ThreadPoolExecutor的組件構成

  • corePool:核心線程池的大小
  • maximumPool:最大線程池的大小
  • BlockingQueue:用來暫時保存任務的工作隊列
  • RejectedExecutionHandler:當ThreadPoolExecutor已經關閉或ThreadPoolExecutor已經飽和時(達到了最大線程池的大小且工作隊列已滿),execute()方法將要調用的Handler。

 

Executor 可 以 創 建 3 種 類 型 的 ThreadPoolExecutor 線 程 池:

 1. FixedThreadPool

創建固定長度的線程池,每次提交任務創建一個線程,直到達到線程池的最大數量,線程池的大小不再變化。

這個線程池可以創建固定線程數的線程池。特點就是可以重用固定數量線程的線程池。它的構造源碼如下:

public static ExecutorService newFixedThreadPool(int nThreads) {  
        return new ThreadPoolExecutor(nThreads, nThreads, 0L, 
                                      TimeUnit.MILLISECONDS,  
                                      new LinkedBlockingQueue<Runnable>());  
}  
  • FixedThreadPool的corePoolSize和maxiumPoolSize都被設置為創建FixedThreadPool時指定的參數nThreads。
  • 0L則表示當線程池中的線程數量操作核心線程的數量時,多余的線程將被立即停止
  • 最后一個參數表示FixedThreadPool使用了無界隊列LinkedBlockingQueue作為線程池的做工隊列,由於是無界的,當線程池的線程數達到corePoolSize后,新任務將在無界隊列中等待,因此線程池的線程數量不會超過corePoolSize,同時maxiumPoolSize也就變成了一個無效的參數,並且運行中的線程池並不會拒絕任務。

FixedThreadPool運行圖如下

執行過程如下:

1.如果當前工作中的線程數量少於corePool的數量,就創建新的線程來執行任務。

2.當線程池的工作中的線程數量達到了corePool,則將任務加入LinkedBlockingQueue。

3.線程執行完1中的任務后會從隊列中去任務。

注意LinkedBlockingQueue是無界隊列,所以可以一直添加新任務到線程池。

 

2. SingleThreadExecutor  

SingleThreadExecutor是使用單個worker線程的Executor。特點是使用單個工作線程執行任務。它的構造源碼如下:

public static ExecutorService newSingleThreadExecutor() {
        return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>()));
}

  

SingleThreadExecutor的corePoolSize和maxiumPoolSize都被設置1。
其他參數均與FixedThreadPool相同,其運行圖如下:

 

執行過程如下:

1.如果當前工作中的線程數量少於corePool的數量,就創建一個新的線程來執行任務。

2.當線程池的工作中的線程數量達到了corePool,則將任務加入LinkedBlockingQueue。

3.線程執行完1中的任務后會從隊列中去任務。

注意:由於在線程池中只有一個工作線程,所以任務可以按照添加順序執行。

 

 3. CachedThreadPool

 CachedThreadPool是一個”無限“容量的線程池,它會根據需要創建新線程。特點是可以根據需要來創建新的線程執行任務,沒有特定的corePool。下面是它的構造方法:

public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
}

  

CachedThreadPool的corePoolSize被設置為0,即corePool為空;maximumPoolSize被設置為Integer.MAX_VALUE,即maximum是無界的。這里keepAliveTime設置為60秒,意味着空閑的線程最多可以等待任務60秒,否則將被回收。
 
CachedThreadPool使用沒有容量的SynchronousQueue作為主線程池的工作隊列,它是一個沒有容量的阻塞隊列。每個插入操作必須等待另一個線程的對應移除操作。這意味着,如果主線程提交任務的速度高於線程池中處理任務的速度時,CachedThreadPool會不斷創建新線程。極端情況下,CachedThreadPool會因為創建過多線程而耗盡CPU資源。其運行圖如下:

 

執行過程如下:

1.首先執行SynchronousQueue.offer(Runnable task)。如果在當前的線程池中有空閑的線程正在執行SynchronousQueue.poll(),那么主線程執行的offer操作與空閑線程執行的poll操作配對成功,主線程把任務交給空閑線程執行。,execute()方法執行成功,否則執行步驟2

2.當線程池為空(初始maximumPool為空)或沒有空閑線程時,配對失敗,將沒有線程執行SynchronousQueue.poll操作。這種情況下,線程池會創建一個新的線程執行任務。

3.在創建完新的線程以后,將會執行poll操作。當步驟2的線程執行完成后,將等待60秒,如果此時主線程提交了一個新任務,那么這個空閑線程將執行新任務,否則被回收。因此長時間不提交任務的CachedThreadPool不會占用系統資源。

SynchronousQueue是一個不存儲元素阻塞隊列,每次要進行offer操作時必須等待poll操作,否則不能繼續添加元素。

 

參考書籍:《Java並發編程的藝術》,《Java並發編程實戰》,《Java高並發程序設計》

更多內容:http://www.cnblogs.com/study-everyday/

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM