前言
在前面的兩篇博文中,已經介紹利用FutureTask任務的執行流程,以及利用其實現的cancel方法取消任務的情況。本篇就來介紹下,線程任務的結果獲取。
系列目錄
- 揭開Future的神秘面紗——任務取消
- 揭開Future的神秘面紗——任務執行
- 揭開Future的神秘面紗——結果獲取
利用get方法獲取程序運行結果
我們知道利用Future接口的最重要的操作就是要獲取任務的結果,而此操作對應的方法就是get。但是問題來了,如果我調用get方法的時候,任務還沒有完成呢?答案就是,等它完成,當前線程將被阻塞,直到任務完成(注意,這里說的完成,指的是任務結束,因為異常而結束也算),get方法返回。主線程(不是執行任務的線程)才被喚醒,然后繼續運行。
靈活的get方法(帶超時時間的get)
有人可能會問,如果我調用get方法的時候,任務離完成還需要很長時間,那么我主線程不是會浪費一些時間?是的,如果主線程比較忙的話,這樣確實主線程的效率。不過還有一個有參的get方法,此方法以等待時長為參數,如果時長結束,任務還沒完成,主線程將繼續執行,然后會在之后的某個時間再來獲取任務結果。(當然如果主線程依賴這個任務結果才能繼續執行,那么只能老老實實地等了)
FutureTask的阻塞模型
要想了解get方法的具體實現,必須先弄清楚,它是如何阻塞的。前篇博文已經提到,FutureTask有類型為WaitNode字段waiters,實際上這個waiters引用的是一個以WaitNode為節點的單向鏈表的頭節點。如圖所示:
waitNode類代碼如下:
static final class WaitNode { volatile Thread thread; //線程 volatile WaitNode next; //下一個節點 //構造函數獲取當前執行線程的引用 WaitNode() { thread = Thread.currentThread(); } }
WaitNode保留線程引用的作用是什么?
答案是用於任務完成后喚醒等待線程。當FutureTask執行完callable的run方法后,將執行finishCompletion方法通知所有等待線程
private void finishCompletion() { //遍歷等待節點 for (WaitNode q; (q = waiters) != null;) { //將FutureTask的waiters引用置null if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) { //喚醒所有等待線程 for (;;) { //取出節點對應的線程 Thread t = q.thread; if (t != null) { q.thread = null; LockSupport.unpark(t); //喚醒對應線程 } //獲取下一個節點 WaitNode next = q.next; if (next == null) break; q.next = null; // unlink to help gc q = next; } break; } } //調用鈎子函數done,此為空方法,子類可根據需求進行實現 done(); callable = null; }
線程的阻塞方式——park和unPark
park/unPark也是用於控制線程等待狀態的。我們熟悉的,用於控制線程等待狀態的還有wait/notify。wait/notify是某個對象的條件隊列,要阻塞線程,或者說要加入等待隊列,必須先獲取對象的鎖。
與wait()/notify不同的是,park和unpark直接操作線程,無需獲取對象的鎖,個人認為這是這里使用park/unPark,而不是wait/notifyAll的原因,因為獲取鎖需要額外的開銷。
get方法的具體實現
以下是FutureTask中get方法的實現
public V get() throws InterruptedException, ExecutionException { //獲取當前任務狀態 int s = state; //如果是NEW或者COMPLETING,也就是還沒有結束,就調用awaitDone進行阻塞 if (s <= COMPLETING) s = awaitDone(false, 0L); //注意,這里的參數,表示非超時等待,如果任務未結束,程序將一直卡在這里 //如果awaitDone返回,也就是任務已經結束,根據任務狀態,返回結果 return report(s); }
以下是get方法中調用到的awaitDone的實現
private int awaitDone(boolean timed, long nanos) throws InterruptedException { //根據超時時間,計算結束時間點 final long deadline = timed ? System.nanoTime() + nanos : 0L; //等待節點 WaitNode q = null; //是否加入等待隊列 boolean queued = false; //這里並不是通過自旋,使方法無法返回。而是利用自旋CAS, 改變狀態。如果成功,一次就夠了 for (;;) { //如果此線程被中斷,把從節點從等待隊列中移除 if (Thread.interrupted()) { removeWaiter(q); throw new InterruptedException(); } int s = state; //如果狀態大於COMPLETING,也就是任務已結束,返回任務狀態 if (s > COMPLETING) { if (q != null) q.thread = null; return s; } else if (s == COMPLETING) // cannot time out yet Thread.yield(); //第一次循環,q是null,創建節點 else if (q == null) q = new WaitNode(); //如果還未加入等待隊列,就加入。加入等待隊列的目的是,當任務完成的時候能夠通過句柄及時喚醒正在等待的線程。注意:加入隊列的時候,還沒有掛起。 else if (!queued) //q.next = waiters 表達式的返回值 是左側的值,也就是waiters //意思是,如果當前對象的waiters的值是waiters, 就將他賦值為q queued = UNSAFE.compareAndSwapObject(this, waitersOffset, q.next = waiters, q); //如果是超時等待,則調用parkNanos, 線程將在指定時間后被喚醒。目的是先掛起線程,時間到了再喚醒出來,此時還在for循環中,將再次執行符合條件的if塊 else if (timed) { nanos = deadline - System.nanoTime();
//程序被喚醒后,通過這里跳出循環 if (nanos <= 0L) { removeWaiter(q); return state; } LockSupport.parkNanos(this, nanos); } //如果不是超時等待,且已經加入等待隊列,這時候利用park將當前線程掛起 else LockSupport.park(this); } }
很多人可能會覺得這個循環體,看着有點迷糊,我剛開始也看得頭大。但是我們可以根據幾種情境,來查看這幾種情境下代碼的執行情況。
注:第二個for循環內,第二個if-else塊是一個大塊,每次只執行一個。
幾種執行情境
一、當前線程成功加入等待隊列,且被阻塞,一段時間后任務完成,線程被喚醒
二、當前線程加入隊列后,還沒被阻塞,任務就已經完成了
三、因為其他線程加入等待隊列的影響,當前線程未能加入等待隊列
這里說明一下,如果其他線程在此線程之前,比較接近的時間,加入了等待隊列,由於內存可見性的原因,當前線程看到的waiters值沒有及時改變,故與其實際值不同,CAS操作就將失敗。
為什么一定要CAS成功?答案是,如果不成功,出現線程安全問題,鏈表的結構就會一塌糊塗。這里不細談。
根據任務狀態獲取結果
我們已經知道,FutureTask有一個Object字段的outcome,也就是任務執行的結果。當任務完成后,會將結果賦值給它。以下是FutureTask的run方法:
public void run() { //任務開始執行后,設置FutureTask的runner字段,指明執行它的線程 if (state != NEW ||!UNSAFE.compareAndSwapObject(this, runnerOffset,null, Thread.currentThread())) return; try { //獲取具體任務 Callable<V> c = callable; if (c != null && state == NEW) { V result; boolean ran; //任務是否已被運行完 try { //運行任務 result = c.call(); ran = true; } catch (Throwable ex) { result = null; //如果運行任務過程中出現異常,則ran=false 表示沒有運行完成 ran = false; //設置異常 => 將任務狀態設置為異常,並將異常信息賦值給outcome, 也就是任務結果 //這個方法會調用finishCompletion setException(ex); } //如果運行完成,把結果賦值給outcome if (ran) set(result); //這個方法會調用finishCompletion } } finally { //既然線程已經"完成"當前任務,就放棄引用,防止影響它執行其他任務 runner = null; //重新獲取任務狀態 int s = state; if (s >= INTERRUPTING) handlePossibleCancellationInterrupt(s); } }
由前文可知,當任務"完成"的時候,獲取結果的線程將被喚醒。回到get方法,它將獲取到任務的狀態,並根據任務狀態獲取結果。也就是report方法:
private V report(int s) throws ExecutionException { //獲取結果 Object x = outcome; //如果任務正常完成 if (s == NORMAL) //強制轉換為對應類型並返回 return (V)x; //如果任務狀態為CANCELLED、INTERRUPTING、INTERRUPTED表明是通過cacel方法取消了 //返回已取消異常 if (s >= CANCELLED) throw new CancellationException(); //如果是因為異常中斷的話,拋出具體異常信息 throw new ExecutionException((Throwable)x); }