FutureTask解析(轉)


站在使用者的角度,future是一個經常在多線程環境下使用的Runnable,使用它的好處有兩個:
1. 線程執行結果帶有返回值
2. 提供了一個線程超時的功能,超過超時時間拋出異常后返回。

那,怎么實現future這種超時控制呢?來看看代碼:

F1

FutureTask的實現只是依賴了一個內部類Sync實現的,Sync是AQS (AbstractQueuedSynchronizer)的子類,這個類承擔了所有future的功能,AbstractQueuedSynchronizer的作者是大名鼎鼎的並發編程大師Doug Lea,它的作用遠遠不止實現一個Future這么簡單,后面在說。

下面,我們從一個future提交到線程池開始,直到future超時或者執行結束來看看future都做了些什么。怎么做的。
首先,向線程池ThreadPoolExecutor提交一個future:

F2

ThreadPoolExecutor將提交的任務用FutureTask包裝一下:

F3

F4

然后嘗試將包裝后的Future用Thread類包裝下后啟動,

紅色標記的地方表示,當當前線程池的大小小於corePoolSize時,將任務提交,否則將該任務加入到workQueue中去,如果workQueue裝滿了,則嘗試在線程數小於MaxPoolSize的條件下提交該任務。

F5

順便說明下,我們使用線程池時,常常看到有關有界隊列,無界隊列作為工作隊列的字眼:使用無界隊列時,線程池的大小永遠不大於corePoolSize,使用有界隊列時的maxPoolSize才有效,原因就在這里,如果是
無界隊列,紅框中的add永遠為true 下方的addIfUnderMaximumPoolSize怎么也走不到了,也就不會有線程數量大於MaxPoolSize的情況。

言歸正傳,看看addIfUnderCorePoolSize 中做了什么事:
new了一個Thread,將我們提交的任務包裝下后就直接啟動了

F6

我們知道,線程的start方法會調用我們runnable接口的run方法,因此不難猜測FutureTask也是實現了Runnable接口的

F7

F8

FutureTask的run()方法中是這么寫:

F9

innerRun方法先使用原子方式更改了一下自己的一個標志位state(用於標示任務的執行情況)
然后紅色框的方法 實現回調函數call的調用,並且將返回值作為參數傳遞下去,放置在一個叫做result的泛型變量中,
然后future只管等待一段時間后去拿result這個變量的值就可以了。 至於怎么實現的“等待一段時間再去拿” 后面馬上說明。

F10

innerSet在經過一系列的狀態判斷后,最終將V這個call方法返回的值賦值給了result

F11

說到這里,我們知道,future是通過將call方法的返回值放在一個叫做result的變量中,經過一段時間的等待后再去拿出來返回就可以了。

怎么實現這個 “等一段時間”呢?

要從Sync的父類AbstractQueuedSynchronizer這個類說起:

我們知道AbstractQueuedSynchronizer 后者的中文名字叫做 同步器,顧名思義,是用來控制資源占用的一種方式。對於FutureTask來說,“資源”就是result,線程執行的結果。思路就是通過控制對result這個資源的訪問來決定是否需要馬上去取得result這個結果,當超時時間未到,或者線程未執行結束時,是不能去取result的。當線程正常執行結束后,一系列的標志位會被修改,並告訴等待future執行結果的各個線程,可以來獲取result了。

這里會涉及到 獨占鎖和共享鎖的概念。

獨占鎖:同一時間只有一個線程獲取鎖。再有線程嘗試加鎖,將失敗。 典型例子 reentrantLock
共享鎖:同一時間可以有多個線程獲取鎖。 典型例子,本例中的FutureTask

為什么說他們?因為Sync本質上就是想完成一個共享鎖的功能,所以Sync繼承了AbstractQueuedSynchronizer 所以Sync的方法使用的是AbstractQueuedSynchronizer的共享鎖的API

首先,我們明白,future結束有兩種狀態:
1. 線程正常執行完畢,通知等待結果的主線程對應於future.get()方法。
2. 線程還未執行完畢,等待結果的主線程已經等不到了(超時),拋出一個TimeOutException后不再等待。對應於future.get(long timeout, TimeUnit unit)

下面我們依次看看對於這兩種狀態,我們是怎么處理的:
從上圖中可以得知,線程在執行完畢后會將執行的結果放到result中, 紅色框中同時提到了releaseShared 方法,我們從這里進入AbstractQueuedSynchronizer

F12

當result已經被賦值,或者FutureTask為cancel狀態時,FutureTask會嘗試去釋放共享鎖(可以同時有多個線程調用future.get() 方法,也就是會有多個線程在等待future執行結果,而furue在執行完畢后會依次喚醒各個線程)
如果嘗試成功,則開始真正的釋放鎖,這里是AbstractQueuedSynchronizer 比較精妙的地方, “嘗試”動作都定義為抽象方法,交個各個子類去定義“嘗試成功的含義” 而真正的釋放則自己實現,這種復雜規則交個子類,流程交給自己的思路很值得借鑒。

F13

再看FutureTask的 “嘗試釋放”的規則:

沒啥好說,怎么嘗試都成功

F14

接着AbstractQueuedSynchronizer 開始了真正的釋放喚醒工作:

private void doReleaseShared() {
  /*
* Ensure that a release propagates, even if there are other
* in-progress acquires/releases. This proceeds in the usual
* way of trying to unparkSuccessor of head if it needs
* signal. But if it does not, status is set to PROPAGATE to
* ensure that upon release, propagation continues.
* Additionally, we must loop in case a new node is added
* while we are doing this. Also, unlike other uses of
* unparkSuccessor, we need to know if CAS to reset status
* fails, if so rechecking.
*/ for (;;) {
      Node h = head; //把頭元素取出來,保持頭元素的引用,防止head被更改      if (h != null && h != tail) {
         int ws = h.waitStatus;
         if (ws == Node.SIGNAL) { //如果狀態位為:需要一個信號去喚醒 注釋原話:/** waitStatus value to                  indicate successor's thread needs unparking */         if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0 )) //修改狀態位              continue ; // loop to recheck cases         unparkSuccessor(h); //如果修改成功,則通過頭元素找到一個線程,並且喚醒它(喚醒動作是通過JNI方法去調用的)         }
        else if (ws == 0 &&
              !compareAndSetWaitStatus(h, 0 , Node.PROPAGATE))
        continue ; // loop on failed CAS      }
      if (h == head) // loop if head changed          break ;
    }
}

循環遍歷后,知道已經沒有結點需要喚醒則返回,依次return后,future的run方法執行完畢。

以上是針對future線程的,我們知道,FutureTask已經將執行結果放在了result中,並且按等的先后順序依喚醒了等待隊列上的線程。
那,猜測future.get方法就不難了,對於帶超時的get方法:最大的可能性就是不斷的檢查future的一個狀態位,看它是否執行完畢,執行完則獲取結果返回,否則,再阻塞自己一段時間。
對於不待超時的,就上來就先嘗試獲取結果,拿不到就阻塞自己,直到上述的innerSet方法喚醒它。
究竟是不是這樣呢?一起來看看:

因為innerGet(long nanosTimeout) 和innerGet()流程大致相同,所以我們重點講解innerGet(long nanosTimeout) ,在唯一一個有區別的地方說明下即可。

如下圖所示,對於innerGet(long nanosTimeout) 方法,FutureTask采用的方法是直接加鎖或者每隔一段時間嘗試加鎖,如果成功,則返回true,則如上圖所示,直接返回result,主線程拿到執行結果。
否則,拋出超時異常。

對於tryAcquireShared 方法,比較簡單,直接看future是否執行完畢

如果沒有結束,則進入doAcquireSharedNanos方法:

private boolean doAcquireSharedNanos( int arg, long nanosTimeout) throws InterruptedException {
  
     long lastTime = System.nanoTime();
     final Node node = addWaiter(Node.SHARED); //在隊列尾部增加一個結點,我的理解是,用來標明這個隊列是共享者隊列還是獨占隊列     try {
         for (;;) {
             final Node p = node.predecessor(); //拿出剛才新增結點的前一個結點:實際有效的隊尾結點。             if (p == head) {
                 int r = tryAcquireShared(arg); //嘗試獲取鎖。                 if (r >= 0 ) { //                     setHeadAndPropagate(node, r); //返回值大於1 對於FutureTask代表任務已經被cancel了,則更改隊列頭部結點。                 p.next = null ; // help GC 將p結點脫離隊列,幫助GC             return true ; //返回true后 上述中可以知道當前線成會拋出超時異常 確定下會不會喚醒其他節點?         }
         }
         if (nanosTimeout <= 0 ) { //如果設置的超時時間小於等於0 則取消獲取鎖 cancelAcquire(node); return             false; } if (nanosTimeout > spinForTimeoutThreshold && //等待的時間必須大於一個自旋鎖的周期時間             shouldParkAfterFailedAcquire(p, node)) // 遍歷隊列,找到需要沉睡的第一個節點             LockSupport.parkNanos( this , nanosTimeout); // 調用JNI方法,沉睡當前線程             long now = System.nanoTime();
             nanosTimeout -= now - lastTime; // 更新等待時間 循環遍歷             lastTime = now;
             if (Thread.interrupted())
                 break ;
         }
     } catch (RuntimeException ex) {
         cancelAcquire(node);
         throw ex;
     }
         // Arrive here only if interrupted         cancelAcquire(node);
         throw new InterruptedException();
     }

這樣通過AQS的協作,所有調用future.get(long timeout, TimeUnit unit)的線程都會按順序等待,直到線成執行完被喚醒或者超時時間到 主動拋出異常。

總結

至此為止FutureTask的解析已經基本結束了,可以看到。它依靠AQS的共享鎖實現了對線程執行結果的訪問控制。和我們通常意義上的訪問控制(並發訪問某個資源,獲取失敗時,沉睡自己等待喚醒或者超時后返回)基本是一致的,不外乎維護了一個等待資源的列表。將等待資源的線程通過鏈表的方式串了起來。

當然AQS的功能遠不僅如此,它還提供了一套獨占鎖的API,幫助使用者實現獨占鎖的功能。
最常用的Reentrantlock就是使用這套API做的。
有機會的話再和大家分享下它的實現。

http://www.liuinsect.com/2014/02/17/futuretask-%e6%ba%90%e7%a0%81%e8%a7%a3%e6%9e%90/

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM