【Java並發編程】Callable、Future和FutureTask的實現


啟動線程執行任務,如果需要在任務執行完畢之后得到任務執行結果,可以使用從Java 1.5開始提供的Callable和Future
下面就分析一下Callable、Future以及FutureTask的具體實現及使用方法
源碼分析基於JDK 1.7

一、Callable 與 Runnable

java.lang.Runnable是一個接口,只有一個run()方法

public interface Runnable {
    public abstract void run();
}

run()方法的返回值是void,故在執行完任務后無法返回任何結果

Callable是java.util.concurrent包下的,也是一個接口,也只有一個call()方法,類似於java.lang.Runnable的run()方法,實現Callable接口的類和實現Runnable接口的類都是可以被其它線程執行的任務

public interface Callable<V> {
    V call() throws Exception;
}

可以看到call()方法是有返回值的,可以將執行的結果返回

Callable和Runnable的區別:
1、Callable中定義的是call()方法,Runnable中定義的是run()方法
2、Callable中的call()方法可以返回執行任務后的結果,Runnable中的run()方法無法獲得返回值
3、Callable中的call()方法定義了throws Exception拋出異常,拋出的異常可以在主線程Future.get()時被主線程捕獲;Runnable中的run()方法沒有定義拋出異常,運行任務時發生異常時也會上拋,因為即使不加默認也會上拋RuntimeException,但異常無法被主線程獲取
4、運行Callable任務可以拿到一個Future對象代表異步運算的結果

二、Future

public interface Future<V> {
    boolean cancel(boolean mayInterruptIfRunning);

    boolean isCancelled();

    boolean isDone();

    V get() throws InterruptedException, ExecutionException;

    V get(long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException;
}

Future是java.util.concurrent包下的一個接口,代表着一個異步計算的結果,可以通過get()獲取線程執行的返回值,cancel()取消任務執行,isCancelled()isDone()獲得任務執行的情況

boolean cancel(boolean mayInterruptIfRunning)

嘗試取消任務的執行,取消成功返回true,取消失敗返回false
mayInterruptIfRunning表示是否允許中斷正在執行的任務
1、如果任務還未開始,cancel返回true,且任務永遠不會被執行
2、如果任務正在執行,根據mayInterruptIfRunning的值判斷是否需要中斷執行中的任務,且如果mayInterruptIfRunning為true,會調用中斷邏輯,返回true;如果mayInterruptIfRunning為false,不會調用線程中斷,只是將任務取消
3、如果任務結束(可能是正常完成、異常終止、被取消),返回false
4、如果cancel()操作返回true,后續調用isDone()、isCancelled()都返回true

boolean isCancelled()

表示任務是否被取消成功,如果在任務正常完成前被取消成功,則返回true

boolean isDone()

表示任務是否已經完成,則返回true,注意:正常完成、異常 或 取消操作都代表任務完成

V get() 和 V get(long timeout, TimeUnit unit)

get()用來獲取執行結果,這個方法會產生阻塞,會一直等到任務執行完畢才返回
get(long timeout, TimeUnit unit)用來獲取執行結果,如果在指定時間內還沒獲取到結果,會拋出TimeoutException

Future提供了三種功能:
1、獲取任務執行的結果
2、取消任務
3、判斷任務是否完成 或 是否取消

因為Future只是一個接口,所以是無法直接用來創建對象使用的,因此就有了下面的FutureTask

三、FutureTask

public class FutureTask<V> implements RunnableFuture<V>

FutureTask實現了RunnableFuture接口,那么RunnableFuture又是什么呢?

public interface RunnableFuture<V> extends Runnable, Future<V> {
    /**
     * Sets this Future to the result of its computation
     * unless it has been cancelled.
     */
    void run();
}

RunnableFuture接口繼承了RunnableFuture,所以它既是一個可以讓線程執行的Runnable任務,又是一個可以獲取Callable返回值的Future

FutureTask的屬性

/** The run state of this task */
private volatile int state;
private static final int NEW          = 0; 
private static final int COMPLETING   = 1; 
private static final int NORMAL       = 2; 
private static final int EXCEPTIONAL  = 3; 
private static final int CANCELLED    = 4; 
private static final int INTERRUPTING = 5; 
private static final int INTERRUPTED  = 6; 

/** The underlying callable; nulled out after running */
private Callable<V> callable;

/** The result to return or exception to throw from get() */
private Object outcome;

/** The thread running the callable; CASed during run() */
private volatile Thread runner;

/** Treiber stack of waiting threads */
private volatile WaitNode waiters;

state 是任務的運行狀態

  • 初始化時是NEW
  • 任務終止的狀態有NORMAL(正常結束)、EXCEPTIONAL(異常結束)、CANCELLED(被取消)、INTERRUPTED(執行中被中斷),這些狀態是通過set()setExceptioncancel()方法觸發的
  • COMPLETING 和 INTERRUPTING是兩個中間狀態,當正常結束設置outcome屬性前是COMPLETING,設置后變成NORMAL;當中斷運行中線程前是INTERRUPTING,調用thread.interrupt()后是INTERRUPTED

可能的狀態轉換:
NEW -> COMPLETING -> NORMAL
NEW -> COMPLETING -> EXCEPTIONAL
NEW -> CANCELLED
NEW -> INTERRUPTING -> INTERRUPTED

callable 是線程執行的有返回值的任務
outcome 是任務執行后的結果或異常
waiters 表示等待獲取結果的阻塞線程,鏈表結構,后等待線程的會排在鏈表前面

FutureTask的構造方法

FutureTask有兩個構造方法:
FutureTask(Callable callable)

public FutureTask(Callable<V> callable) {
    if (callable == null)
        throw new NullPointerException();
    this.callable = callable;
    this.state = NEW;       // ensure visibility of callable
}

構造方法參數是Callable定義的任務,並將state置為NEW,只有當state為NEW時,callable才能被執行

FutureTask(Runnable runnable, V result)

public FutureTask(Runnable runnable, V result) {
    this.callable = Executors.callable(runnable, result);
    this.state = NEW;       // ensure visibility of callable
}

參數為Runnable和帶泛型的result對象,由於Runnable本身是沒有返回值的,故線程的執行結果通過result返回
可以看到通過runnable和result封裝了個Callable,實際上是new RunnableAdapter<T>(task, result),這個Adapter適配器將Runnable和result轉換成Callable,並返回result

FutureTask.run()的實現

線程運行時真正執行的方法,Callable.call()會在其中執行,並包含設置返回值或異常的邏輯

public void run() {
    if (state != NEW ||
        !UNSAFE.compareAndSwapObject(this, runnerOffset,
                                     null, Thread.currentThread()))
        return;
    
    try {
        Callable<V> c = callable;
        if (c != null && state == NEW) {
            V result;
            boolean ran;
            try {
                result = c.call();
                ran = true;
            } catch (Throwable ex) {
                result = null;
                ran = false;
                setException(ex);
            }
            if (ran)
                set(result);
        }
    } finally {
        // runner must be non-null until state is settled to
        // prevent concurrent calls to run()
        runner = null;
        // state must be re-read after nulling runner to prevent
        // leaked interrupts
        int s = state;
        if (s >= INTERRUPTING)
            handlePossibleCancellationInterrupt(s);
    }
}

1、任務執行狀態不是NEW,直接返回;將runner屬性從null->當前線程不成功,直接返回
2、調用call()方法,調用成功,使用set()設置返回值
3、調用過程發生異常,使用setException()保存異常


set() 和 setException()

protected void set(V v) {
    if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
        outcome = v;
        UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state
        finishCompletion();
    }
}

protected void setException(Throwable t) {
    if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
        outcome = t;
        UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // final state
        finishCompletion();
    }
}

set()setException()的實現基本一樣,都是先將任務運行狀態從NEW->COMPLETING,分別設置返回值或異常給outcome,再將狀態分別置為NORMAL和EXCEPTIONAL,最后調用finishCompletion()依次喚醒等待獲取結果的阻塞線程


finishCompletion()實現

/**
 * Removes and signals all waiting threads, invokes done(), and nulls out callable.
 */
private void finishCompletion() {
    // assert state > COMPLETING;
    for (WaitNode q; (q = waiters) != null;) {
    	//將成員變量waiters置為null
        if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {
        	//循環喚醒WaitNode中的等待線程
            for (;;) {
                Thread t = q.thread;
                if (t != null) {
                    q.thread = null;
                    LockSupport.unpark(t);
                }
                WaitNode next = q.next;
                if (next == null)
                    break;
                q.next = null; // unlink to help gc
                q = next;
            }
            break;
        }
    }

    //由子類實現的方法
    done();

    callable = null;        // to reduce footprint
}

1、執行FutureTask類的get方法時,會把主線程封裝成WaitNode節點並保存在waiters鏈表中
2、FutureTask任務執行完成后,通過UNSAFE設置waiters的值為null,並通過LockSupport.unpark方法依次喚醒等待獲取結果的線程

FutureTask.get()的實現

get()方法有兩個實現,一個是一直等待獲取結果,直到任務執行完;一個是等待指定時間,超時后任務還未完成會上拋TimeoutException

public V get() throws InterruptedException, ExecutionException {
    int s = state;
    
    if (s <= COMPLETING)
        s = awaitDone(false, 0L);
    
    return report(s);
}

public V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
    if (unit == null)
        throw new NullPointerException();
        
    int s = state;
    if (s <= COMPLETING &&
        (s = awaitDone(true, unit.toNanos(timeout))) <= COMPLETING)
        throw new TimeoutException();
        
    return report(s);
}

內部通過awaitDone()對主線程進行阻塞,具體實現如下:

/**
 * Awaits completion or aborts on interrupt or timeout.
 *
 * @param timed true if use timed waits
 * @param nanos time to wait, if timed
 * @return state upon completion
 */
private int awaitDone(boolean timed, long nanos)
    throws InterruptedException {
    final long deadline = timed ? System.nanoTime() + nanos : 0L; //截止時間
    WaitNode q = null;
    boolean queued = false;
    for (;;) {
    	//如果主線程已經被中斷,removeWaiter(),並上拋InterruptedException
    	//注意:Thread.interrupted()后會導致線程的中斷狀態為false
        if (Thread.interrupted()) {
            removeWaiter(q); //線程被中斷的情況下,從waiters鏈表中刪除q
            throw new InterruptedException();
        }

        int s = state;
        //如果任務已經完成(可能是正常完成、異常、中斷),直接返回,即還沒有開始等待,任務已經完成了
        if (s > COMPLETING) {
            if (q != null)
                q.thread = null;
            return s;
        }
        //如果任務正在完成,讓出CPU資源,等待state變成NORMAL或EXCEPTIONAL
        else if (s == COMPLETING) // cannot time out yet
            Thread.yield();
        //s<COMPLETING 且 還沒有創建WaitNode
        else if (q == null)
            q = new WaitNode();
        //s<COMPLETING 且 已經創建WaitNode,但還沒有入隊
        else if (!queued)
        	/**
        	 * 1、將當前waiters賦值給q.next,即“q-->當前waiters”
        	 * 2、CAS,將waiters屬性,從“當前waiters-->q”
        	 * 所以后等待的會排在鏈表的前面,而任務完成時會從鏈表前面開始依次喚醒等待線程
        	 */
            queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
                                                 q.next = waiters, q);
        //所有准備工作完成,判斷等待是否需要計時
        else if (timed) {
            nanos = deadline - System.nanoTime();
            //如果已經等待超時,remove當前WaiterNode
            if (nanos <= 0L) {
                removeWaiter(q); //等待超時的情況下,從waiters鏈表中刪除q
                return state;
            }
            LockSupport.parkNanos(this, nanos); //掛起一段時間
        }
        else
            LockSupport.park(this); //一直掛起,等待喚醒
    }
}

1、判斷主線程是否被中斷,如果被中斷,將當前WaitNode節點從waiters鏈表中刪除,並上拋InterruptedException
2、如果任務已經完成(可能是正常完成、異常、中斷),直接返回(即還沒有開始等待,任務已經完成了,就返回了)
3、如果任務正在完成,讓出CPU資源,等待state變成NORMAL或EXCEPTIONAL
4、如果任務沒有被中斷,也沒有完成,new WaitNode()
5、如果任務沒有被中斷,也沒有完成,也創建了WaitNode,使用UNSAFE.CAS()操作將WaitNode加入waiters鏈表
6、所有准備工作完畢,通過LockSupport的park或parkNanos掛起線程


WaitNode就是一個簡單的鏈表節點,記錄這等待的線程和下一個WaitNode

/**
 * Simple linked list nodes to record waiting threads in a Treiber
 * stack.  See other classes such as Phaser and SynchronousQueue
 * for more detailed explanation.
 */
static final class WaitNode {
    volatile Thread thread; //等待的線程
    volatile WaitNode next; //下一個WaitNode
    WaitNode() { thread = Thread.currentThread(); }
}

##**FutureTask.cancel()的實現** ```java public boolean cancel(boolean mayInterruptIfRunning) { if (state != NEW) return false;
if (mayInterruptIfRunning) {
    if (!UNSAFE.compareAndSwapInt(this, stateOffset, NEW, INTERRUPTING))
        return false;
    
    Thread t = runner;
    if (t != null)
        t.interrupt(); //中斷線程
    
    UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED); // final state
}
else if (!UNSAFE.compareAndSwapInt(this, stateOffset, NEW, CANCELLED))
    return false;

finishCompletion();

return true;

}

1、如果任務不是運行狀態,直接返回false失敗
2、如果mayInterruptIfRunning==true,中斷運行中的任務,使用CAS操作將狀態**NEW-->INTERRUPTING**,再調用runner.interrupt(),最后將狀態置為INTERRUPTED
3、如果mayInterruptIfRunning==false,將任務置為CANCELLED取消狀態
4、調用`finishCompletion()`依次喚醒等待獲取結果的線程,返回true取消成功
<br/>
#四、使用示例
```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class TestFuture {
    public static void main(String[] args) {
        ExecutorService executor = Executors.newFixedThreadPool(1);
        Task task = new Task(); //callable任務
        Future<Integer> result = executor.submit(task);
        executor.shutdown();
         
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e1) {
            e1.printStackTrace();
        }
         
        System.out.println("主線程在執行任務");
         
        try {
            System.out.println("task運行結果:"+result.get());
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        }
         
        System.out.println("所有任務執行完畢");
    }
    
    static class Task implements Callable<Integer>{
        @Override
        public Integer call() throws Exception {
            System.out.println("子線程在進行計算");
            Thread.sleep(3000);
            int sum = 0;
            for(int i=0;i<100;i++)
                sum += i;
            return sum;
        }
    }
}

運行結果:

子線程在進行計算
主線程在執行任務
task運行結果:4950
所有任務執行完畢

如果只是想控制在某些情況下可以將任務取消,可以使用Future<?> future = executor.submit(runnable),這樣返回結果肯定為null,但可以使用future.cancel()取消任務執行

五、總結

1、有了Runnable,為什么還需要Callable,它們的區別是什么?

Runnable和Callable都表示執行的任務,但不同的是Runnable.run()方法沒有返回值,Callable.call()有返回值
但其實線程在執行任務時還是執行的Runnable.run()方法,所以在使用ThreadPoolExecutor.submit()時會將Callable封裝為FutureTask,而FutureTask是Runnable和Future的實現類
所以在執行Callable的任務時,線程其實是執行FutureTask這個Runnable的run()方法,其中封裝了調用Callable.call()並返回結果的邏輯

執行Runnable任務如果發生異常,主線程無法知曉;而執行Callable任務如果發生異常,在Future.get()時會拋出java.util.concurrent.ExecutionException,其中封裝了真實異常

2、Future.get()是如何獲取線程返回值的?

首先得益於Callable.call()方法定義了返回值,提交Callable任務后,Callable會被封裝成FutureTask,其既可以作為Runnable被執行,也可以作為Future獲取返回值,FutureTask.run()方法會調用Callable.call()中的任務代碼
在任務執行完成前,如果主線程使用Future.get(),其實是調用FutureTask.get(),其中會判斷任務狀態尚未結束,將主線程加入waiters等待鏈表,並掛起主線程
待任務執行結束后,FutureTask會喚醒所有等待獲取返回值的線程,此時主線程的FutureTask.get()就會返回了

所以,主線程和運行線程是通過FutureTask作為橋梁獲取線程返回值的

3、Future.cancel()真的能取消任務的執行嗎?

首先答案是“不一定”,根據JDK中的方法注釋“Attempts to cancel execution of this task”,即嘗試去取消執行的任務
如果任務正在執行,且調用cancel()時參數mayInterruptIfRunning傳的是true,那么會對執行線程調用interrupt()方法
那么問題就變成了interrupt()方法能中斷線程執行嗎?
interrupt()方法不會中斷正在運行的線程。這一方法實際上完成的是在線程受到阻塞時拋出一個中斷信號,這樣線程就得以退出阻塞的狀態。更確切的說,如果線程被Object.wait()、Thread.join()、Thread.sleep()等阻塞,那么它將接收到一個中斷異常(InterruptedException),從而提早地終結被阻塞狀態。
如果線程沒有被阻塞,調用interrupt()將不起作用
那么即使線程正在阻塞狀態,並拋出了InterruptedException,線程能否真的取消執行還要看代碼中是否捕獲了InterruptedException和有沒有做相應的對中斷標示的判斷邏輯


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM