FutureTask詳解


1 基本概念

1.1 Callable與Future

Runnable封裝一個異步運行的任務,可以把它想象成為一個沒有參數和返回值的異步方法。Callable與Runnable類似,但是有返回值。Callable接口是一個參數化的類型,只有一個方法call。

public interface Callable<V> {
	V call() throws Exception;
}

類型參數是返回值的類型。例如,

Callable<Integer>表示一個最終返回Integer對象的異步計算。

Future保存異步計算的結果。可以啟動一個計算,將Future對象交給某個線程,然后忘掉它。Future對象的所有者在結果計算好之后就可以獲得它。
Future接口具有下面的方法:

public interface Future<V> {
	boolean cancel(boolean mayInterruptIfRunning);
	boolean isCancelled();
	boolean isDone();
	V get() throws InterruptedException, ExecutionException;
	V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException;
}

第一個get方法的調用被阻塞,知道計算完成。如果在計算完成之前,第二個get方法的調用超時,拋出一個TimeoutException異常。如果運行該計算的線程被中斷,兩個方法都將拋出InterruptedException。如果計算已經完成,那么get方法立即返回。

如果計算還在進行,isDone方法返回false;如果完成了,則返回true。

可以用cancel方法取消該計算。如果計算還沒有開始,它被取消且不再開始。如果計算處於運行之中,那么如果mayInterrupt參數為true,它就被中斷。

1.2 FutureTask

FutureTask包裝器是一種非常便利的機制,同時實現了Future和Runnable接口。

類圖如下:

FutureTask的狀態轉換過程:

 * NEW -> COMPLETING -> NORMAL
 * NEW -> COMPLETING -> EXCEPTIONAL
 * NEW -> CANCELLED
 * NEW -> INTERRUPTING -> INTERRUPTED

1.3 FutureTask的執行過程

創建一個futureTask對象task
提交task到調度器executor等待調度或者在另外一個線程中執行task

等待調度中...

如果此時currentThread調取執行結果task.get(),會有幾種情況
if task 還沒有被executor調度或正在執行中
    阻塞當前線程,並加入到一個阻塞鏈表中waitNode
else if task被其它Thread取消,並取消成功 或task處於中斷狀態
    throw exception
else if task執行完畢,返回執行結果,或執行存在異常,返回異常信息
    
        
如果此時有另外一個線程調用task.get()
    
執行過程同上

2 應用場景

1. Future用於異步獲取執行結果或者取消任務。
2. 在高並發場景下確保任務只執行一次。

3 基本例子

Callable<Integer> myComputation = ...;
FutureTask<Integer> task = new FutureTask<Integer>(myComputation);
Thread t = new Thread(task);
t.start();
...
Integer result = task.get(); //獲取結果

4 FutureTask源碼分析

4.1 核心狀態

 /**
 * The run state of this task, initially NEW.  The run state
 * transitions to a terminal state only in methods set,
 * setException, and cancel.  During completion, state may take on
 * transient values of COMPLETING (while outcome is being set) or
 * INTERRUPTING (only while interrupting the runner to satisfy a
 * cancel(true)). Transitions from these intermediate to final
 * states use cheaper ordered/lazy writes because values are unique
 * and cannot be further modified.
 * 
 * Possible state transitions:
 * NEW -> COMPLETING -> NORMAL
 * NEW -> COMPLETING -> EXCEPTIONAL
 * NEW -> CANCELLED
 * NEW -> INTERRUPTING -> INTERRUPTED
 */
private volatile int state;
private static final int NEW          = 0;
private static final int COMPLETING   = 1;
private static final int NORMAL       = 2;
private static final int EXCEPTIONAL  = 3;
private static final int CANCELLED    = 4;
private static final int INTERRUPTING = 5;
private static final int INTERRUPTED  = 6;

4.2 創建FutureTask

public FutureTask(Callable<V> callable) {
    if (callable == null)
        throw new NullPointerException();
    this.callable = callable;
    this.state = NEW;       // ensure visibility of callable
}
    
public FutureTask(Runnable runnable, V result) {
    this.callable = Executors.callable(runnable, result);
    this.state = NEW;       // ensure visibility of callable
}

4.3 獲取執行結果

public V get() throws InterruptedException, ExecutionException {
    int s = state;
    if (s <= COMPLETING)
        s = awaitDone(false, 0L);
    return report(s);
}

public V get(long timeout, TimeUnit unit)
    throws InterruptedException, ExecutionException, TimeoutException {
    if (unit == null)
        throw new NullPointerException();
    int s = state;
    if (s <= COMPLETING &&
        (s = awaitDone(true, unit.toNanos(timeout))) <= COMPLETING)
        throw new TimeoutException();
    return report(s);
}

4.4 執行方法

public void run() {
    if (state != NEW ||
        !UNSAFE.compareAndSwapObject(this, runnerOffset,
                                     null, Thread.currentThread()))
        return;
    try {
        Callable<V> c = callable;
        if (c != null && state == NEW) {
            V result;
            boolean ran;
            try {
                result = c.call();
                ran = true;
            } catch (Throwable ex) {
                result = null;
                ran = false;
                setException(ex);
            }
            if (ran)
                set(result);
        }
    } finally {
        // runner must be non-null until state is settled to
        // prevent concurrent calls to run()
        runner = null;
        // state must be re-read after nulling runner to prevent
        // leaked interrupts
        int s = state;
        if (s >= INTERRUPTING)
            handlePossibleCancellationInterrupt(s);
    }
}

4.5 設置狀態

protected void set(V v) {
    if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
        outcome = v;
        UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state
        finishCompletion();
    }
}

protected void setException(Throwable t) {
    if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
        outcome = t;
        UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // final state
        finishCompletion();
    }
}

5 高級示例

public class Memoizer<A, V> implements Computable<A, V> {
	private final ConcurrentMap<A, Future<V>> cache = new ConcurrentMap<A, Future>>();
	private final Computable<A, V> c;
	
	public Memoizer(Computable<A, V> c) {
		this.c = c;
	}
	
	public C computer(final A arg) throws InterruptedException {
		while(true) {
			Future<V> f = cache.get(arg);
			if(f == null) {
				Callable<V> eval = new Callable<V>() {
					public V call() throws InterruptedException {
						return c.compute(arg);
					}
				};
				FutureTask<V> ft = new FutureTask<V>(eval);
				f = cache.putIfAbsent(arg, ft);
				if(f == null) {
					f = ft;
					ft.run();
				}
			}
			
			try {
				return f.get();
			} catch(CancellationException e) {
				cache.remove(arg, f);
			} catch(ExecutionException e) {
				throw launderThrowable(e.getCause());
			}
		}
	}
}


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM