1. 背景與簡介
Future是Java執行異步任務時的常用接口。我們通常會往ExecutorService中提交一個Callable/Runnable並得到一個Future對象,Future對象表示異步計算的結果,支持獲取結果,取消計算等操作。在Java提供的Executor框架中,Future的默認實現為java.util.concurrent.FutureTask。本文針對FutureTask的源碼進行分析與解讀。
可以看到,FutureTask實現了RunnableFuture, 而RunnableFuture的JavaDoc對Runnable接口的run方法有了更精確的描述:run方法將該Future設置為計算的結果,除非計算被取消。
2. 源碼解讀
下面開始針對FutureTask的實現源碼進行解讀。
2.1 生命周期狀態
FutureTask內置一個被volatile修飾的state變量。
按照生命周期的階段可以分為:
- NEW 初始狀態
- COMPLETING 任務已經執行完(正常或者異常),准備賦值結果
- NORMAL 任務已經正常執行完,並已將任務返回值賦值到結果
- EXCEPTIONAL 任務執行失敗,並將異常賦值到結果
- CANCELLED 取消
- INTERRUPTING 准備嘗試中斷執行任務的線程
- INTERRUPTED 對執行任務的線程進行中斷(未必中斷到)
這里先給出自制的狀態流轉圖。
可以看到NEW為起始狀態,而NORMAL, EXCEPTIONAL, CANCELLED, INTERRUPTED這些狀態為終止狀態,而COMPLETING和INTERRUPTING為中間暫時狀態。
2.2 內部結構
- Callable callable
內部封裝的Callable對象。如果通過構造函數傳的是Runnable對象,FutureTask會通過調用Executors#callable,把Runnable對象封裝成一個callable。 - Object outcome
用於保存計算結果或者異常信息。 - volatile Thread runner
用來運行callable的線程。 - volatile WaitNode waiters
FutureTask中用了Trieber Stack來保存等待的線程。
2.3 run方法
public void run() {
/*
* state為NEW且對runner變量CAS成功。
* 對state的判斷寫在前面,是一種優化。
*/
if (state != NEW ||
!UNSAFE.compareAndSwapObject(this, runnerOffset,
null, Thread.currentThread()))
return;
try {
Callable<V> c = callable;
if (c != null && state == NEW) {
V result;
/*
* 是否成功運行。
* 之所以用了這樣一個標志位,而不是把set方法寫在try中call調用的后一句,
* 是為了不想捕獲set方法出現的異常。
*
* 舉例來說,子類覆蓋了FutureTask的done方法,
* set -> finishCompletion -> done會拋出異常,
* 然而實際上提交的任務是有正常的結果的。
*/
boolean ran;
try {
result = c.call();
ran = true;
} catch (Throwable ex) {
result = null;
ran = false;
setException(ex);
}
if (ran)
set(result);
}
} finally {
/*
*
* 要清楚,即便在runner被清為null后,仍然有可能有線程會進入到run方法的外層try塊。
* 舉例:線程A和B都在執行第一行的if語句讀到state == NEW,線程A成功cas了runner,並執行到此處。
* 在此過程中線程B都沒拿到CPU時間片。此時線程B一旦拿到時間片就能進到外層try塊。
*
* 為了避免線程B重復執行任務,必須在set/setException方法被調用,才能把runner清為null。
* 這時候其他線程即便進入到了外層try塊,也一定能夠讀到state != NEW,從而避免任務重復執行。
*/
runner = null;
/*
* 因為任務執行過程中由於cancel方法的調用,狀態為INTERRUPTING,
* 令當前線程等待INTERRUPTING狀態變為INTERRUPTED。
* 這是為了不想讓中斷操作逃逸出run方法以至於線程在執行后續操作時被中斷。
*/
int s = state;
if (s >= INTERRUPTING)
handlePossibleCancellationInterrupt(s);
}
}
protected void set(V v) {
// 通過CAS狀態來確認計算沒有被取消,結果也沒有被設置過。
if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
outcome = v;
UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state
finishCompletion();
}
}
protected void setException(Throwable t) {
// 通過CAS狀態來確認計算沒有被取消,結果也沒有被設置過。
if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
outcome = t;
UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // final state
finishCompletion();
}
}
private void finishCompletion() {
for (WaitNode q; (q = waiters) != null;) {
// 必須將棧頂CAS為null,否則重讀棧頂並重試。
if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {
// 遍歷並喚醒棧中節點對應的線程。
for (;;) {
Thread t = q.thread;
if (t != null) {
q.thread = null;
LockSupport.unpark(t);
}
WaitNode next = q.next;
if (next == null)
break;
// 將next域置為null,這樣對GC友好。
q.next = null;
q = next;
}
break;
}
}
/*
* done方法是暴露給子類的一個鈎子方法。
*
* 這個方法在ExecutorCompletionService.QueueingFuture中的override實現是把結果加到阻塞隊列里。
* CompletionService誰用誰知道,奧秘全在這。
*/
done();
/*
* callable置為null主要為了減少內存開銷,
* 更多可以了解JVM memory footprint相關資料。
*/
callable = null;
}
private void handlePossibleCancellationInterrupt(int s) {
/*
* 這里的主要目的就是等調用cancel方法的線程完成中斷。
*
* 以防止中斷操作逃逸出run或者runAndReset方法,影響后續操作。
*
* 實際上,當前調用cancel方法的線程不一定能夠中斷到本線程。
* 有可能cancel方法里讀到runner是null,甚至有可能是其它並發調用run/runAndReset方法的線程。
* 但是也沒辦法判斷另一個線程在cancel方法中讀到的runner到底是什么,所以索性自旋讓出CPU時間片也沒事。
*/
if (s == INTERRUPTING)
while (state == INTERRUPTING)
Thread.yield();
/*
* 下面的代碼在JDK8中已經被注釋掉了。
* 因為在原來的設計中,是想把cancel方法設置的中斷位給清除的。
* 但是實際上也應該允許調用FutureTask的線程使用中斷作為線程間通信的機制,
* 這里沒辦法區分中斷位到底是不是來自cancel方法的調用。
*/
// assert state == INTERRUPTED;
// We want to clear any interrupt we may have received from
// cancel(true). However, it is permissible to use interrupts
// as an independent mechanism for a task to communicate with
// its caller, and there is no way to clear only the
// cancellation interrupt.
//
// Thread.interrupted();
}
2.4 get方法
public V get() throws InterruptedException, ExecutionException {
int s = state;
// NEW或者COMPLETING。
if (s <= COMPLETING)
s = awaitDone(false, 0L);
return report(s);
}
private int awaitDone(boolean timed, long nanos) throws InterruptedException {
final long deadline = timed ? System.nanoTime() + nanos : 0L;
WaitNode q = null;
boolean queued = false;
for (;;) {
if (Thread.interrupted()) {
removeWaiter(q);
throw new InterruptedException();
}
int s = state;
// 完成賦值
if (s > COMPLETING) {
// 如果q已經被初始化了,為了GC需要清q.thread。
if (q != null)
q.thread = null;
return s;
}
// COMPLETING是一個很短暫的狀態,調用Thread.yield期望讓出時間片,之后重試循環。
else if (s == COMPLETING)
Thread.yield();
// 初始化節點,重試一次循環。
else if (q == null)
q = new WaitNode();
// queued記錄是否已經入棧,此處准備將節點壓棧。
else if (!queued)
/*
* 這是Treiber Stack算法入棧的邏輯。
* Treiber Stack是一個基於CAS的無鎖並發棧實現,
* 更多可以參考https://en.wikipedia.org/wiki/Treiber_Stack
*/
queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
q.next = waiters, q);
// 如果有時限,判斷是否超時,未超時則park剩下的時間。
else if (timed) {
nanos = deadline - System.nanoTime();
// 超時,移除棧中節點。
if (nanos <= 0L) {
removeWaiter(q);
return state;
}
LockSupport.parkNanos(this, nanos);
}
else
LockSupport.park(this);
}
}
/**
* 清理用於保存等待線程棧里的節點。
* 所謂節點無效就是內部的thread為null,
* 一般有以下幾種情況:
* 1. 節點調用get超時。
* 2. 節點調用get中斷。
* 3. 節點調用get拿到task的狀態值(> COMPLETING)。
*
* 此方法干了兩件事情:
* 1. 置標記參數node的thread為null。
* 2. 清理棧中的無效節點。
*
* 如果在遍歷過程中發現有競爭則重新遍歷棧。
*/
private void removeWaiter(WaitNode node) {
if (node != null) {
node.thread = null;
retry:
for (;;) { // restart on removeWaiter race
for (WaitNode pred = null, q = waiters, s; q != null; q = s) {
s = q.next;
// 如果當前節點仍有效,則置pred為當前節點,繼續遍歷。
if (q.thread != null)
pred = q;
/*
* 當前節點已無效且有前驅,則將前驅的后繼置為當前節點的后繼實現刪除節點。
* 如果前驅節點已無效,則重新遍歷waiters棧。
*/
else if (pred != null) {
pred.next = s;
if (pred.thread == null)
continue retry;
}
/*
* 當前節點已無效,且當前節點沒有前驅,則將棧頂置為當前節點的后繼。
* 失敗的話重新遍歷waiters棧。
*/
else if (!UNSAFE.compareAndSwapObject(this, waitersOffset,
q, s))
continue retry;
}
break;
}
}
}
/**
* 導出結果。
*/
private V report(int s) throws ExecutionException {
Object x = outcome;
// 正常執行完計算任務。
if (s == NORMAL)
return (V)x;
// 取消。
if (s >= CANCELLED)
throw new CancellationException();
// 執行計算任務時發生異常。
throw new ExecutionException((Throwable)x);
}
2.5 cancel方法
public boolean cancel(boolean mayInterruptIfRunning) {
/*
* 在狀態還為NEW的時候,根據參數中的是否允許傳遞,
* 將狀態流轉到INTERRUPTING或者CANCELLED。
*/
if (!(state == NEW &&
UNSAFE.compareAndSwapInt(this, stateOffset, NEW,
mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))
return false;
try {
if (mayInterruptIfRunning) {
try {
// 中斷runner線程。
Thread t = runner;
if (t != null)
t.interrupt();
} finally {
UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED);
}
}
} finally {
// 該方法上文已經分析過。
finishCompletion();
}
return true;
}
3. FutureTask存在的問題
至此已經將FutureTask的源碼解讀分析完畢,在讀過源碼之后,我個人認為JDK8u111的FutureTask源碼存在兩個問題,目前還需要進一步確認。
3.1 cancel(true)調用interrupt的線程對象
FutureTask的run方法的進入條件是
state == NEW && UNSAFE.compareAndSwapObject(this, runnerOffset, null, Thread.currentThread()))
假設有兩個線程A和B調用run方法,線程C調用cancel方法。
時刻1: 線程A和B同時讀到state == NEW。
時刻2: 線程A成功對runner變量CAS進入run方法主體。
時刻3: 線程C調用cancel方法,成功將狀態CAS為CANCELLED。
時刻4: 線程A調用finally中的runner = null。
時刻5: 線程B開始執行run方法第一句if的后半句,成功將runner變量CAS到線程B。
時刻6: 線程C讀到runner為線程B,准備對線程B進行interrupt()
時刻7: 線程A調用handlePossibleCancellationInterrupt等待狀態從INTERRUPTING流轉至INTERRUPTED。
時刻8: 線程B被中斷。
這里的問題是,調用cancel方法的線程C中斷的是實質上沒有對callable進行call調用的線程B,而線程A還試圖防止中斷操作逃逸出run方法。
這個東西在Future的JavaDoc上說了很含糊,如下所示:
* @param mayInterruptIfRunning {@code true} if the thread executing this
* task should be interrupted; otherwise, in-progress tasks are allowed
* to complete
上面的情況到底線程A和B哪個算是the thread executing this task
說不清。
3.2 內存占用問題
通過閱讀源碼,發現FutureTask還是存在一個隱形的內存占用問題的,或者按照《Effective Java》上說的應該叫無意識的對象保留。
這個問題就是在FutureTask計算完成后,可能內部用於保存等待線程的棧留有一些已經無用的等待節點。
時刻1: 某線程調用get,已經入等待棧,此時waiters為該線程對應節點。
時刻2: 有大量線程通過調用get試圖獲取計算結果,get -> awaitDone方法中,經過兩輪循環都讀到狀態是NEW的話,此時它們節點已經被初始化過了,但還沒開始入隊。
時刻3: 有線程調用run方法,通過run -> set -> finishCompletion,將waiters置為null,並喚醒了已經入棧的那個線程。
時刻4: 調用awaitDone方法的那些線程再試圖入隊的話,后面循環會發現狀態已經是NORMAL了,但是waiters棧此時不為空,而且再也沒法被清掉了。
這樣下來,該FutureTask內部可能會留有一些的無效節點。具體會留多少實際上取決於那個瞬間有多少線程准備執行以及多少能夠成功CAS。
queued = UNSAFE.compareAndSwapObject(this, waitersOffset, q.next = waiters, q);