揭開Future的神秘面紗——結果獲取


前言

  在前面的兩篇博文中,已經介紹利用FutureTask任務的執行流程,以及利用其實現的cancel方法取消任務的情況。本篇就來介紹下,線程任務的結果獲取。

系列目錄

利用get方法獲取程序運行結果

  我們知道利用Future接口的最重要的操作就是要獲取任務的結果,而此操作對應的方法就是get。但是問題來了,如果我調用get方法的時候,任務還沒有完成呢?答案就是,等它完成,當前線程將被阻塞,直到任務完成(注意,這里說的完成,指的是任務結束,因為異常而結束也算),get方法返回。主線程(不是執行任務的線程)才被喚醒,然后繼續運行。

 

靈活的get方法(帶超時時間的get)

  有人可能會問,如果我調用get方法的時候,任務離完成還需要很長時間,那么我主線程不是會浪費一些時間?是的,如果主線程比較忙的話,這樣確實主線程的效率。不過還有一個有參的get方法,此方法以等待時長為參數,如果時長結束,任務還沒完成,主線程將繼續執行,然后會在之后的某個時間再來獲取任務結果。(當然如果主線程依賴這個任務結果才能繼續執行,那么只能老老實實地等了

FutureTask的阻塞模型

  要想了解get方法的具體實現,必須先弄清楚,它是如何阻塞的。前篇博文已經提到,FutureTask有類型為WaitNode字段waiters,實際上這個waiters引用的是一個以WaitNode為節點的單向鏈表的頭節點。如圖所示:

waitNode類代碼如下:

static final class WaitNode {
    volatile Thread thread; //線程
    volatile WaitNode next; //下一個節點
    //構造函數獲取當前執行線程的引用
    WaitNode() { thread = Thread.currentThread(); }
}

WaitNode保留線程引用的作用是什么?

答案是用於任務完成后喚醒等待線程。當FutureTask執行完callable的run方法后,將執行finishCompletion方法通知所有等待線程

private void finishCompletion() {
    //遍歷等待節點
    for (WaitNode q; (q = waiters) != null;) {
        //將FutureTask的waiters引用置null
        if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) { 
            //喚醒所有等待線程
            for (;;) {
                //取出節點對應的線程
                Thread t = q.thread;
                if (t != null) {
                    q.thread = null;
                    LockSupport.unpark(t); //喚醒對應線程
                }
                //獲取下一個節點
                WaitNode next = q.next;
                if (next == null)
                    break;
                q.next = null; // unlink to help gc
                q = next;
            }
            break;
        }
    }

    //調用鈎子函數done,此為空方法,子類可根據需求進行實現
    done();

    callable = null;       
}

線程的阻塞方式——park和unPark

  park/unPark也是用於控制線程等待狀態的。我們熟悉的,用於控制線程等待狀態的還有wait/notify。wait/notify是某個對象的條件隊列,要阻塞線程,或者說要加入等待隊列,必須先獲取對象的鎖。

       與wait()/notify不同的是,park和unpark直接操作線程,無需獲取對象的鎖,個人認為這是這里使用park/unPark,而不是wait/notifyAll的原因,因為獲取鎖需要額外的開銷

get方法的具體實現

以下是FutureTask中get方法的實現

public V get() throws InterruptedException, ExecutionException {
    //獲取當前任務狀態
    int s = state;
    //如果是NEW或者COMPLETING,也就是還沒有結束,就調用awaitDone進行阻塞
    if (s <= COMPLETING)
        s = awaitDone(false, 0L); //注意,這里的參數,表示非超時等待,如果任務未結束,程序將一直卡在這里
    //如果awaitDone返回,也就是任務已經結束,根據任務狀態,返回結果
    return report(s);
}

以下是get方法中調用到的awaitDone的實現

private int awaitDone(boolean timed, long nanos) throws InterruptedException {
    //根據超時時間,計算結束時間點
    final long deadline = timed ? System.nanoTime() + nanos : 0L;
    //等待節點
    WaitNode q = null;
    //是否加入等待隊列
    boolean queued = false;
    //這里並不是通過自旋,使方法無法返回。而是利用自旋CAS, 改變狀態。如果成功,一次就夠了
    for (;;) {
        //如果此線程被中斷,把從節點從等待隊列中移除
        if (Thread.interrupted()) {
            removeWaiter(q);
            throw new InterruptedException();
        }

        int s = state;
        //如果狀態大於COMPLETING,也就是任務已結束,返回任務狀態
        if (s > COMPLETING) {
            if (q != null)
                q.thread = null;
            return s;
        }
        else if (s == COMPLETING) // cannot time out yet
            Thread.yield();
        //第一次循環,q是null,創建節點
        else if (q == null)
            q = new WaitNode();
        //如果還未加入等待隊列,就加入。加入等待隊列的目的是,當任務完成的時候能夠通過句柄及時喚醒正在等待的線程。注意:加入隊列的時候,還沒有掛起。
        else if (!queued)
            //q.next = waiters 表達式的返回值 是左側的值,也就是waiters
            //意思是,如果當前對象的waiters的值是waiters, 就將他賦值為q
            queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
                                                 q.next = waiters, q); 
        //如果是超時等待,則調用parkNanos, 線程將在指定時間后被喚醒。目的是先掛起線程,時間到了再喚醒出來,此時還在for循環中,將再次執行符合條件的if塊
        else if (timed) {
            nanos = deadline - System.nanoTime();
//程序被喚醒后,通過這里跳出循環
if (nanos <= 0L) { removeWaiter(q); return state; } LockSupport.parkNanos(this, nanos); } //如果不是超時等待,且已經加入等待隊列,這時候利用park將當前線程掛起 else LockSupport.park(this); } }

很多人可能會覺得這個循環體,看着有點迷糊,我剛開始也看得頭大。但是我們可以根據幾種情境,來查看這幾種情境下代碼的執行情況。

注:第二個for循環內,第二個if-else塊是一個大塊,每次只執行一個。

幾種執行情境

一、當前線程成功加入等待隊列,且被阻塞,一段時間后任務完成,線程被喚醒

 

 

二、當前線程加入隊列后,還沒被阻塞,任務就已經完成了

三、因為其他線程加入等待隊列的影響,當前線程未能加入等待隊列

這里說明一下,如果其他線程在此線程之前,比較接近的時間,加入了等待隊列,由於內存可見性的原因,當前線程看到的waiters值沒有及時改變,故與其實際值不同,CAS操作就將失敗。

為什么一定要CAS成功?答案是,如果不成功,出現線程安全問題,鏈表的結構就會一塌糊塗。這里不細談。

 

根據任務狀態獲取結果

   我們已經知道,FutureTask有一個Object字段的outcome,也就是任務執行的結果。當任務完成后,會將結果賦值給它。以下是FutureTask的run方法:

public void run() {
    //任務開始執行后,設置FutureTask的runner字段,指明執行它的線程
    if (state != NEW ||!UNSAFE.compareAndSwapObject(this, runnerOffset,null, Thread.currentThread()))
        return;
    try {
        //獲取具體任務
        Callable<V> c = callable;
        if (c != null && state == NEW) {
            V result;
            boolean ran; //任務是否已被運行完
            try {
                //運行任務
                result = c.call();
                ran = true;
            } catch (Throwable ex) {
                result = null;
                //如果運行任務過程中出現異常,則ran=false 表示沒有運行完成
                ran = false;
                //設置異常 => 將任務狀態設置為異常,並將異常信息賦值給outcome, 也就是任務結果
                //這個方法會調用finishCompletion
                setException(ex);
            }
            //如果運行完成,把結果賦值給outcome
            if (ran)
                set(result); //這個方法會調用finishCompletion
        }
    } finally {
        //既然線程已經"完成"當前任務,就放棄引用,防止影響它執行其他任務
        runner = null; 
        //重新獲取任務狀態
        int s = state;
        if (s >= INTERRUPTING)
            handlePossibleCancellationInterrupt(s);
    }
}

由前文可知,當任務"完成"的時候,獲取結果的線程將被喚醒。回到get方法,它將獲取到任務的狀態,並根據任務狀態獲取結果。也就是report方法:

private V report(int s) throws ExecutionException {
    //獲取結果
    Object x = outcome;
    //如果任務正常完成
    if (s == NORMAL)
        //強制轉換為對應類型並返回
        return (V)x;
    //如果任務狀態為CANCELLED、INTERRUPTING、INTERRUPTED表明是通過cacel方法取消了
    //返回已取消異常
    if (s >= CANCELLED)
        throw new CancellationException();
    //如果是因為異常中斷的話,拋出具體異常信息
    throw new ExecutionException((Throwable)x);
}

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM