本文部分摘自《Java 並發編程的藝術》
Excutor 框架
1. 兩級調度模型
在 HotSpot VM 的線程模型中,Java 線程被一對一映射為本地操作系統線程。在上層,Java 多線程程序通常應用分解成若干個任務,然后使用用戶級的調度器(Executor)將這些任務映射為固定數量的線程;在底層,操作系統內核將這些線程映射到硬件處理器。這種兩級調度模型的示意圖如圖所示:
從圖中可以看出,應用程序通過 Executor 框架控制上層調度,下層的調度則由操作系統內核控制
2. 框架結構
Executor 框架主要由三大部分組成:
-
任務
包括被執行任務需要實現的接口:Runnable 接口或 Callable 接口
-
任務的執行
包括任務執行機制的核心接口 Executor,以及繼承自 Executor 的 ExecutorService 接口。Executor 框架有兩個關鍵類實現了 ExecutorService 接口,分別是 ThreadPoolExecutor 和 ScheduleThreadPoolExecutor,它們都是線程池的實現類,可以執行被提交的任務
-
異步計算的結果
包括接口 Future 和實現 Future 接口的 FutureTask 類
3. 執行過程
主線程首先要創建實現 Runnable 或 Callable 接口的任務對象,可以使用工具類 Executors 把一個 Runnable 對象封裝為一個 Callable 對象
// 返回結果為 null
Executors.callable(Runnable task);
// 返回結果為 result
Executors.callable(Runnable task, T result);
然后把 Runnable 對象直接交給 ExecutorService 執行
ExecutorService.execute(Runnable command);
或者把 Runnable 對象或 Callbale 對象提交給 ExecutorService 執行
ExecutorService.submit(Runnable task);
ExecutorService.submit(Callable<T> task);
如果執行 ExecutorService.submit 方法,將會返回一個實現 Future 接口的對象 FutureTask。最后,主線程可以執行 FutureTask.get() 方法來等待任務執行完成,也可以執行 FutureTask.cancel(boolean mayInterruptIfRunning) 來取消此任務的執行
ThreadPoolExecutor
Executor 框架最核心的類是 ThreadPoolExecutor,它是線程池的實現類,有關介紹可以參考之前寫過的一篇文章
下面分別介紹三種 ThreadPoolExecutor
1. FixedThreadPool
FixedThreadPool 被稱為可重用固定線程數的線程池,下面是 FixedThreadPool 的源代碼實現
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
FixedThreadPool 的 corePoolSize 和 maximumPoolSize 都被設置為創建 FixedThreadPool 時指定的參數 nThreads。當線程池中的線程數大於 corePoolSize 時,keepAliveTime 為多余的空閑線程等待新任務的最長時間,超過這個時間后多余的線程將被終止。這里把 keepAliveTime 設置為 0L,意味着多余的空閑線程會被立即終止
FixedThreadPool 的 execute() 運行示意圖如下所示
對上圖說明如下:
- 如果當前運行的線程少於 corePoolSize,則創建新線程來執行任務
- 線程池完成預熱之后(當前運行的線程數等於 corePoolSize),將任務加入 LinkedBlockingQueue
- 線程執行完 1 中的任務后,會在循環中反復從 LinkedBlockingQueue 獲取任務來執行
FixedThreadPool 使用無界隊列 LinkedBlockingQueue 作為線程池的工作隊列(隊列的容量為 Integer.MAX_VALUE),使用無界隊列作為工作隊列會對線程池帶來如下影響:當線程池中的線程數達到 corePoolSize 后,新任務將在無界隊列中等待,而無界隊列幾乎可以容納無限多的新任務,因此線程池中的線程數永遠不會超過 corePoolSize,因此 maximumPoolSize 就成了無效參數,keepAliveTime 也是無效參數,運行中的 FixThreadPool 不會拒絕任務
2. SingleThreadExecutor
SingleThreadExecutor 是使用單個 worker 線程的 Executor,下面是 SingleThreadExecutor 的源代碼實現
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
SingleThreadExecutor 的 corePoolSize 和 maximumPoolSize 被設置為 1,其他參數與 FixedThreadPool 相同。SingleThreadExecutor 使用無界隊列 LinkedBlockingQueue 作為線程池的工作隊列,其帶來的影響與 FixedThreadPool 相同,這里就不再贅述了
對上圖說明如下:
- 如果當前運行的線程數少於 corePoolSize(即線程池中無運行的線程),則創建一個新線程來執行任務
- 在線程池完成預熱之后(當前線程池中有一個運行的線程),將任務加入 LinkedBlockingQueue
- 線程執行完 1 中的任務后,會在一個無限循環中反復從 LinkedBlockingQueue 獲取任務來執行
3. CachedThreadPool
CachedThreadPool 是一個會根據需要創建新線程的線程池,下面是創建 CachedThreadPool 的源代碼
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
CachedThreadPool 的 corePoolSize 被設置為 0,即 corePool 為空。maximumPoolSize 被設置為 Integer.MAX_VALUE,即 maximumPool 是無界的。這里把 keepAliveTime 設置為 60L,意味着 CachedThreadPool 中的空閑線程等待新任務的最長時間為 60 秒,空閑線程超過 60 秒后將會被終止
CachedThreadPool 使用沒有容量的 SynchronousQueue 作為線程池的工作隊列,但 CachedThreadPool 的 maximumPool 是無界的。這意味着,如果主線程提交任務的速度高於 maximumPool 中線程處理任務的速度,CachedThreadPool 會不斷創建新線程。極端情況下,CachedThreadPool 會因為創建過多線程而耗盡 CPU 和內存資源
ScheduledThreadPoolExecutor
ScheduledThreadPoolExecutor 會把待調度的任務(ScheduledFutureTask)放到一個 DelayQueue 中。ScheduledFutureTask 主要包含三個成員變量
- long 型成員變量 time,表示這個任務將要被執行的具體時間
- long 型成員變量 sequenceNumber,表示這個任務被添加到 ScheduledThreadPoolExecutor 中的序號
- long 型成員變量 period,表示任務執行的間隔周期
DelayQueue 封裝了一個 PriorityQueue,這個 PriorityQueue 會對隊列中的 ScheduledFutureTask 進行排序。排序時,time 小的排在前面(時間早的任務將被先執行)。如果兩個 ScheduledFutureTask 的 time 相同,就比較 sequenceNumber,sequenceNumber 小的排在前面(如果兩個任務的執行時間相同,先提交的任務先執行)
下圖是 ScheduledThreadPoolExecutor 中的線程執行周期任務的過程
- 線程 1 從 DelayQueue 獲取已到期的 ScheduledFutureTask,到期任務是指 ScheduledFutureTask 的 time 大於等於當前時間
- 線程 1 執行這個 ScheduledFutureTask
- 線程 1 修改 ScheduledFutureTask 的 time 變量為下次將要被執行的時間
- 線程 1 把修改 time 之后的 ScheduledFutureTask 放回 DelayQueue 中
接下來我們看一下上圖中線程獲取任務的過程,源代碼如下:
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
for (;;) {
E first = q.peek();
if (first == null) {
available.await();
} else {
long delay = first.getDelay(TimeUnit.NANOSECONDS);
if (delay > 0) {
long tl = available.awaitNanos(delay);
} else {
E x = q.poll();
assert x != null;
if (q.size() != 0)
available.signalAll();
return x;
}
}
}
} finally {
lock.unlock();
}
}
獲取任務分為三大步驟:
- 獲取 Lock
- 獲取周期任務
- 如果 PriorityQueue 為空,當前線程到等待隊列中等待,否則執行下面的步驟
- 如果 PriorityQueue 的頭元素的 time 時間比當前時間大,到等待隊列等待 time 時間,否則執行下面的步驟
- 獲取 PriorityQueue 的頭元素,如果 PriorityQueue 不為空,則喚醒在等待隊列中等待的所有線程
- 釋放 Lock
ScheduledThreadPoolExecutor 在一個循環中執行步驟二,直到線程從 PriorityQueue 獲取到一個元素之后才會退出無限循環
最后我們再看把任務放入 DelayQueue 的過程,下面是源碼實現
public boolean offer(E e) {
final ReentrantLock lock = this.lock;
lock.lock();
try {
E first = q.peek();
q.offer(e);
if (first == null || e.compareTo(first) < 0) {
available.signalAll();
}
return true;
} finally {
lock.unlock();
}
}
添加任務分為三大步驟:
- 獲取 Lock
- 添加任務
- 向 PriorityQueue 添加任務
- 如果添加的任務是 PriorityQueue 的頭元素,喚醒在等待隊列中等待的所有線程
- 釋放 Lock
FutureTask
1. 簡介
Future 接口和實現 Future 接口的 FutureTask 類,代表異步計算的結果。FutureTask 除了實現 Future 接口外,還實現了 Runnable 接口。因此,FutureTask 可以交給 Executor 執行,也可以由調用線程直接執行 FutureTask.run()。根據 FutureTask.run() 方法被執行的時機,FutureTask可以處於下面三種狀態:
-
未啟動
FutureTask.run() 方法還沒有被執行之前,FutureTask 處於未啟動狀態,當創建一個 FutureTask,且沒有執行 FutureTask.run() 方法之前,這個 FutureTask 處於未啟動狀態
-
已啟動
FutureTask.run() 方法被執行的過程中,FutureTask 處於已啟動狀態
-
已完成
FutureTask.run() 方法執行完后正常結束,或被取消 FutureTask.cancel(…),或執行 FutureTask.run() 方法時拋出異常而結束,FutureTask 處於已完成狀態
下圖是 FutureTask 的狀態遷移圖
下圖是 get 方法和 cancel 方法的執行示意圖
- 當 FutureTask 處於未啟動或已啟動狀態時,執行 FutureTask.get() 方法將導致調用線程阻塞
- 當 FutureTask 處於已完成狀態時,執行 FutureTask.get() 方法將導致調用線程立即返回結果或拋出異常
- 當 FutureTask 處於未啟動狀態時,執行 FutureTask.cancel() 方法將導致此任務永遠不會被執行
- 當 FutureTask 處於已啟動狀態時,執行 FutureTask.cancel(true) 方法將以中斷執行此任務線程的方式來試圖停止任務
- 當 FutureTask 處於已啟動狀態時,執行 FutureTask.cancel(false) 方法將不會對正在執行此任務的線程產生影響(讓正在執行的任務運行完成)
- 當 FutureTask 處於已完成狀態時,執行 FutureTask.cancel(…) 方法將返回 false
2. 使用
可以把 FutureTask 交給 Executor 執行,也可以通過 ExecutorService.submit(...) 方法返回一個 FutureTask,然后執行 FutureTask.get() 方法或 FutureTask.cancel(...) 方法,還可以單獨使用 FutureTask
當一個線程需要等待另一個線程把某個任務執行完后它才能繼續執行,此時可以使用 FutureTask。假設有多個線程執行若干任務,每個任務最多只能被執行一次。當多個線程試圖同時執行同一個任務時,只允許一個線程執行任務,其他線程需要等待這個任務執行完后才能繼續執行
private final ConcurrentMap<Object, Future<String>> taskCache = new ConcurrentHashMap<>();
private String executionTask(final String taskName)
throws ExecutionException, InterruptedException {
while (true) {
Future<String> future = taskCache.get(taskName); // 1.1, 2.1
if (future == null) {
Callable<String> task = new Callable<String>() {
@Override
public String call() throws InterruptedException {
return taskName;
}
};
FutureTask<String> futureTask = new FutureTask<String>(task);
future = taskCache.putIfAbsent(taskName, futureTask); // 1.3
if (future == null) {
future = futureTask;
futureTask.run(); // 1.4 執行任務
}
}
try {
return future.get(); // 1.5, 2.2
} catch (CancellationException e) {
taskCache.remove(taskName, future);
}
}
}
上述代碼的執行示意圖如圖所示:
- 兩個線程試圖同時執行同一個任務,這里使用了線程安全的 ConcurrentHashMap 作為任務緩存可能到了注釋
- 兩個線程都執行到
// 1.1, 2.1
這行時,假設線程一首先得到 future,根據接下來的代碼可得知,線程一創建任務放入緩存,並執行,而線程二獲取線程一創建的任務,不需創建 - 兩個線程都在
// 1.5, 2.2
處等待結果,只有線程一執行完任務后,線程二才能從 future.get() 返回
3. 實現
FutureTask 的實現基於 AbstractQueuedSynchronizer(AQS)
FutureTask 聲明了一個內部私有的繼承 AQS 的子類 Sync,對 FutureTask 所有公有方法的調用都會委托給這個內部子類,FutureTask 的設計示意圖如下所示:
FutureTask.get() 方法會調用 AQS.acquireSharedInterruptibly(int arg) 方法,這個方法的執行過程如下:
- 調用 AQS.acquireSharedInterruptibly(int arg) 方法,該方法會回調在子類 Sync 中實現的 tryAcquireShared() 方法來判斷 acquire 操作是否可以成功。acquire 操作可以成功的條件為:state 為執行完成狀態 RAN 或已取消狀態 CANCELLED,且 runner 不為 null
- 如果成功,get() 方法立即返回,否則線程等待隊列中去等待其他線程執行 release 操作
- 當其他線程執行 release 操作(FutureTask.run() 或 FutureTask.cancel(…))喚醒當前線程后,當前線程再次執行 tryAcquireShared() 將返回正值 1,當前線程將離開線程等待隊列並喚醒它的后繼線程
- 最后返回計算的結果或拋出異常
FutureTask.run() 的執行過程如下:
- 執行在構造函數中指定的任務
- 以原子方式來更新同步狀態(調用 AQS.compareAndSetState(int expect,int update),設置 state 為執行完成狀態 RAN)。如果這個原子操作成功,就設置代表計算結果的變量 result 的值為 Callable.call() 的返回值,然后調用 AQS.releaseShared(int arg)
- AQS.releaseShared(int arg) 首先會回調在子類 Sync 中實現的 tryReleaseShared(arg) 來執行 release 操作(設置運行任務的線程 runner 為 null,然會返回 true),然后喚醒線程等待隊列中的第一個線程
- 調用 FutureTask.done()
當執行 FutureTask.get() 方法時,如果 FutureTask 不是處於執行完成狀態 RAN 或已取消狀態 CANCELLED,當前執行線程將到 AQS 的線程等待隊列中等待(見下圖的線程 A、B、C、D)。當某個線程執行 FutureTask.run() 方法或 FutureTask.cancel(...) 方法時,會喚醒線程等待隊列的第一個線程
假設開始時 FutureTask 處於未啟動狀態或已啟動狀態,等待隊列中已經有3個線程(A、B、C)在等待。此時,線程 D 執行 get() 方法將導致線程 D 也到等待隊列中去等待
當線程 E 執行 run() 方法時,會喚醒隊列中的第一個線程 A,線程 A 被喚醒后,首先把自己從隊列中刪除,然后喚醒它的后繼線程 B,最后線程 A 從 get() 方法返回。線程 B、C、D 重復 A 線程的處理流程。最終,在隊列中等待的所有線程都被級聯喚醒並從 get() 方法返回