站在使用者的角度,future是一個經常在多線程環境下使用的Runnable,使用它的好處有兩個:
1. 線程執行結果帶有返回值
2. 提供了一個線程超時的功能,超過超時時間拋出異常后返回。
那,怎么實現future這種超時控制呢?來看看代碼:
FutureTask的實現只是依賴了一個內部類Sync實現的,Sync是AQS (AbstractQueuedSynchronizer)的子類,這個類承擔了所有future的功能,AbstractQueuedSynchronizer的作者是大名鼎鼎的並發編程大師Doug Lea,它的作用遠遠不止實現一個Future這么簡單,后面在說。
下面,我們從一個future提交到線程池開始,直到future超時或者執行結束來看看future都做了些什么。怎么做的。
首先,向線程池ThreadPoolExecutor提交一個future:
ThreadPoolExecutor將提交的任務用FutureTask包裝一下:
然后嘗試將包裝后的Future用Thread類包裝下后啟動,
紅色標記的地方表示,當當前線程池的大小小於corePoolSize時,將任務提交,否則將該任務加入到workQueue中去,如果workQueue裝滿了,則嘗試在線程數小於MaxPoolSize的條件下提交該任務。
順便說明下,我們使用線程池時,常常看到有關有界隊列,無界隊列作為工作隊列的字眼:使用無界隊列時,線程池的大小永遠不大於corePoolSize,使用有界隊列時的maxPoolSize才有效,原因就在這里,如果是
無界隊列,紅框中的add永遠為true 下方的addIfUnderMaximumPoolSize怎么也走不到了,也就不會有線程數量大於MaxPoolSize的情況。
言歸正傳,看看addIfUnderCorePoolSize 中做了什么事:
new了一個Thread,將我們提交的任務包裝下后就直接啟動了
我們知道,線程的start方法會調用我們runnable接口的run方法,因此不難猜測FutureTask也是實現了Runnable接口的
FutureTask的run()方法中是這么寫:
innerRun方法先使用原子方式更改了一下自己的一個標志位state(用於標示任務的執行情況)
然后紅色框的方法 實現回調函數call的調用,並且將返回值作為參數傳遞下去,放置在一個叫做result的泛型變量中,
然后future只管等待一段時間后去拿result這個變量的值就可以了。 至於怎么實現的“等待一段時間再去拿” 后面馬上說明。
innerSet在經過一系列的狀態判斷后,最終將V這個call方法返回的值賦值給了result
說到這里,我們知道,future是通過將call方法的返回值放在一個叫做result的變量中,經過一段時間的等待后再去拿出來返回就可以了。
怎么實現這個 “等一段時間”呢?
要從Sync的父類AbstractQueuedSynchronizer這個類說起:
我們知道AbstractQueuedSynchronizer 后者的中文名字叫做 同步器,顧名思義,是用來控制資源占用的一種方式。對於FutureTask來說,“資源”就是result,線程執行的結果。思路就是通過控制對result這個資源的訪問來決定是否需要馬上去取得result這個結果,當超時時間未到,或者線程未執行結束時,是不能去取result的。當線程正常執行結束后,一系列的標志位會被修改,並告訴等待future執行結果的各個線程,可以來獲取result了。
這里會涉及到 獨占鎖和共享鎖的概念。
獨占鎖:同一時間只有一個線程獲取鎖。再有線程嘗試加鎖,將失敗。 典型例子 reentrantLock
共享鎖:同一時間可以有多個線程獲取鎖。 典型例子,本例中的FutureTask
為什么說他們?因為Sync本質上就是想完成一個共享鎖的功能,所以Sync繼承了AbstractQueuedSynchronizer 所以Sync的方法使用的是AbstractQueuedSynchronizer的共享鎖的API
首先,我們明白,future結束有兩種狀態:
1. 線程正常執行完畢,通知等待結果的主線程對應於future.get()方法。
2. 線程還未執行完畢,等待結果的主線程已經等不到了(超時),拋出一個TimeOutException后不再等待。對應於future.get(long timeout, TimeUnit unit)
下面我們依次看看對於這兩種狀態,我們是怎么處理的:
從上圖中可以得知,線程在執行完畢后會將執行的結果放到result中, 紅色框中同時提到了releaseShared 方法,我們從這里進入AbstractQueuedSynchronizer
當result已經被賦值,或者FutureTask為cancel狀態時,FutureTask會嘗試去釋放共享鎖(可以同時有多個線程調用future.get() 方法,也就是會有多個線程在等待future執行結果,而furue在執行完畢后會依次喚醒各個線程)
如果嘗試成功,則開始真正的釋放鎖,這里是AbstractQueuedSynchronizer 比較精妙的地方, “嘗試”動作都定義為抽象方法,交個各個子類去定義“嘗試成功的含義” 而真正的釋放則自己實現,這種復雜規則交個子類,流程交給自己的思路很值得借鑒。
再看FutureTask的 “嘗試釋放”的規則:
沒啥好說,怎么嘗試都成功
接着AbstractQueuedSynchronizer 開始了真正的釋放喚醒工作:
private
void
doReleaseShared() {
/*
* Ensure that a release propagates, even if there are other
* in-progress acquires/releases. This proceeds in the usual
* way of trying to unparkSuccessor of head if it needs
* signal. But if it does not, status is set to PROPAGATE to
* ensure that upon release, propagation continues.
* Additionally, we must loop in case a new node is added
* while we are doing this. Also, unlike other uses of
* unparkSuccessor, we need to know if CAS to reset status
* fails, if so rechecking.
*/
for
(;;) {
Node h = head;
//把頭元素取出來,保持頭元素的引用,防止head被更改
if
(h !=
null
&& h != tail) {
int
ws = h.waitStatus;
if
(ws == Node.SIGNAL) {
//如果狀態位為:需要一個信號去喚醒 注釋原話:/** waitStatus value to indicate successor's thread needs unparking */
if
(!compareAndSetWaitStatus(h, Node.SIGNAL,
0
))
//修改狀態位
continue
;
// loop to recheck cases
unparkSuccessor(h);
//如果修改成功,則通過頭元素找到一個線程,並且喚醒它(喚醒動作是通過JNI方法去調用的)
}
else
if
(ws ==
0
&&
!compareAndSetWaitStatus(h,
0
, Node.PROPAGATE))
continue
;
// loop on failed CAS
}
if
(h == head)
// loop if head changed
break
;
}
}
|
循環遍歷后,知道已經沒有結點需要喚醒則返回,依次return后,future的run方法執行完畢。
以上是針對future線程的,我們知道,FutureTask已經將執行結果放在了result中,並且按等的先后順序依喚醒了等待隊列上的線程。
那,猜測future.get方法就不難了,對於帶超時的get方法:最大的可能性就是不斷的檢查future的一個狀態位,看它是否執行完畢,執行完則獲取結果返回,否則,再阻塞自己一段時間。
對於不待超時的,就上來就先嘗試獲取結果,拿不到就阻塞自己,直到上述的innerSet方法喚醒它。
究竟是不是這樣呢?一起來看看:
因為innerGet(long nanosTimeout) 和innerGet()流程大致相同,所以我們重點講解innerGet(long nanosTimeout) ,在唯一一個有區別的地方說明下即可。
如下圖所示,對於innerGet(long nanosTimeout) 方法,FutureTask采用的方法是直接加鎖或者每隔一段時間嘗試加鎖,如果成功,則返回true,則如上圖所示,直接返回result,主線程拿到執行結果。
否則,拋出超時異常。
對於tryAcquireShared 方法,比較簡單,直接看future是否執行完畢
如果沒有結束,則進入doAcquireSharedNanos方法:
private
boolean
doAcquireSharedNanos(
int
arg,
long
nanosTimeout)
throws
InterruptedException {
long
lastTime = System.nanoTime();
final
Node node = addWaiter(Node.SHARED);
//在隊列尾部增加一個結點,我的理解是,用來標明這個隊列是共享者隊列還是獨占隊列
try
{
for
(;;) {
final
Node p = node.predecessor();
//拿出剛才新增結點的前一個結點:實際有效的隊尾結點。
if
(p == head) {
int
r = tryAcquireShared(arg);
//嘗試獲取鎖。
if
(r >=
0
) {
//
setHeadAndPropagate(node, r);
//返回值大於1 對於FutureTask代表任務已經被cancel了,則更改隊列頭部結點。
p.next =
null
;
// help GC 將p結點脫離隊列,幫助GC
return
true
;
//返回true后 上述中可以知道當前線成會拋出超時異常 確定下會不會喚醒其他節點?
}
}
if
(nanosTimeout <=
0
) {
//如果設置的超時時間小於等於0 則取消獲取鎖 cancelAcquire(node); return false; } if (nanosTimeout > spinForTimeoutThreshold && //等待的時間必須大於一個自旋鎖的周期時間
shouldParkAfterFailedAcquire(p, node))
// 遍歷隊列,找到需要沉睡的第一個節點
LockSupport.parkNanos(
this
, nanosTimeout);
// 調用JNI方法,沉睡當前線程
long
now = System.nanoTime();
nanosTimeout -= now - lastTime;
// 更新等待時間 循環遍歷
lastTime = now;
if
(Thread.interrupted())
break
;
}
}
catch
(RuntimeException ex) {
cancelAcquire(node);
throw
ex;
}
// Arrive here only if interrupted
cancelAcquire(node);
throw
new
InterruptedException();
}
|
這樣通過AQS的協作,所有調用future.get(long timeout, TimeUnit unit)的線程都會按順序等待,直到線成執行完被喚醒或者超時時間到 主動拋出異常。
總結
至此為止FutureTask的解析已經基本結束了,可以看到。它依靠AQS的共享鎖實現了對線程執行結果的訪問控制。和我們通常意義上的訪問控制(並發訪問某個資源,獲取失敗時,沉睡自己等待喚醒或者超時后返回)基本是一致的,不外乎維護了一個等待資源的列表。將等待資源的線程通過鏈表的方式串了起來。
當然AQS的功能遠不僅如此,它還提供了一套獨占鎖的API,幫助使用者實現獨占鎖的功能。
最常用的Reentrantlock就是使用這套API做的。
有機會的話再和大家分享下它的實現。
http://www.liuinsect.com/2014/02/17/futuretask-%e6%ba%90%e7%a0%81%e8%a7%a3%e6%9e%90/