1 基本概念
1.1 Callable與Future
Runnable封裝一個異步運行的任務,可以把它想象成為一個沒有參數和返回值的異步方法。Callable與Runnable類似,但是有返回值。Callable接口是一個參數化的類型,只有一個方法call。
public interface Callable<V> {
V call() throws Exception;
}
類型參數是返回值的類型。例如,
Callable<Integer>表示一個最終返回Integer對象的異步計算。
Future保存異步計算的結果。可以啟動一個計算,將Future對象交給某個線程,然后忘掉它。Future對象的所有者在結果計算好之后就可以獲得它。
Future接口具有下面的方法:
public interface Future<V> {
boolean cancel(boolean mayInterruptIfRunning);
boolean isCancelled();
boolean isDone();
V get() throws InterruptedException, ExecutionException;
V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException;
}
第一個get方法的調用被阻塞,知道計算完成。如果在計算完成之前,第二個get方法的調用超時,拋出一個TimeoutException異常。如果運行該計算的線程被中斷,兩個方法都將拋出InterruptedException。如果計算已經完成,那么get方法立即返回。
如果計算還在進行,isDone方法返回false;如果完成了,則返回true。
可以用cancel方法取消該計算。如果計算還沒有開始,它被取消且不再開始。如果計算處於運行之中,那么如果mayInterrupt參數為true,它就被中斷。
1.2 FutureTask
FutureTask包裝器是一種非常便利的機制,同時實現了Future和Runnable接口。
類圖如下:
FutureTask的狀態轉換過程:
* NEW -> COMPLETING -> NORMAL
* NEW -> COMPLETING -> EXCEPTIONAL
* NEW -> CANCELLED
* NEW -> INTERRUPTING -> INTERRUPTED
1.3 FutureTask的執行過程
創建一個futureTask對象task
提交task到調度器executor等待調度或者在另外一個線程中執行task
等待調度中...
如果此時currentThread調取執行結果task.get(),會有幾種情況
if task 還沒有被executor調度或正在執行中
阻塞當前線程,並加入到一個阻塞鏈表中waitNode
else if task被其它Thread取消,並取消成功 或task處於中斷狀態
throw exception
else if task執行完畢,返回執行結果,或執行存在異常,返回異常信息
如果此時有另外一個線程調用task.get()
執行過程同上
2 應用場景
1. Future用於異步獲取執行結果或者取消任務。
2. 在高並發場景下確保任務只執行一次。
3 基本例子
Callable<Integer> myComputation = ...;
FutureTask<Integer> task = new FutureTask<Integer>(myComputation);
Thread t = new Thread(task);
t.start();
...
Integer result = task.get(); //獲取結果
4 FutureTask源碼分析
4.1 核心狀態
/**
* The run state of this task, initially NEW. The run state
* transitions to a terminal state only in methods set,
* setException, and cancel. During completion, state may take on
* transient values of COMPLETING (while outcome is being set) or
* INTERRUPTING (only while interrupting the runner to satisfy a
* cancel(true)). Transitions from these intermediate to final
* states use cheaper ordered/lazy writes because values are unique
* and cannot be further modified.
*
* Possible state transitions:
* NEW -> COMPLETING -> NORMAL
* NEW -> COMPLETING -> EXCEPTIONAL
* NEW -> CANCELLED
* NEW -> INTERRUPTING -> INTERRUPTED
*/
private volatile int state;
private static final int NEW = 0;
private static final int COMPLETING = 1;
private static final int NORMAL = 2;
private static final int EXCEPTIONAL = 3;
private static final int CANCELLED = 4;
private static final int INTERRUPTING = 5;
private static final int INTERRUPTED = 6;
4.2 創建FutureTask
public FutureTask(Callable<V> callable) {
if (callable == null)
throw new NullPointerException();
this.callable = callable;
this.state = NEW; // ensure visibility of callable
}
public FutureTask(Runnable runnable, V result) {
this.callable = Executors.callable(runnable, result);
this.state = NEW; // ensure visibility of callable
}
4.3 獲取執行結果
public V get() throws InterruptedException, ExecutionException {
int s = state;
if (s <= COMPLETING)
s = awaitDone(false, 0L);
return report(s);
}
public V get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException {
if (unit == null)
throw new NullPointerException();
int s = state;
if (s <= COMPLETING &&
(s = awaitDone(true, unit.toNanos(timeout))) <= COMPLETING)
throw new TimeoutException();
return report(s);
}
4.4 執行方法
public void run() {
if (state != NEW ||
!UNSAFE.compareAndSwapObject(this, runnerOffset,
null, Thread.currentThread()))
return;
try {
Callable<V> c = callable;
if (c != null && state == NEW) {
V result;
boolean ran;
try {
result = c.call();
ran = true;
} catch (Throwable ex) {
result = null;
ran = false;
setException(ex);
}
if (ran)
set(result);
}
} finally {
// runner must be non-null until state is settled to
// prevent concurrent calls to run()
runner = null;
// state must be re-read after nulling runner to prevent
// leaked interrupts
int s = state;
if (s >= INTERRUPTING)
handlePossibleCancellationInterrupt(s);
}
}
4.5 設置狀態
protected void set(V v) {
if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
outcome = v;
UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state
finishCompletion();
}
}
protected void setException(Throwable t) {
if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
outcome = t;
UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // final state
finishCompletion();
}
}
5 高級示例
public class Memoizer<A, V> implements Computable<A, V> {
private final ConcurrentMap<A, Future<V>> cache = new ConcurrentMap<A, Future>>();
private final Computable<A, V> c;
public Memoizer(Computable<A, V> c) {
this.c = c;
}
public C computer(final A arg) throws InterruptedException {
while(true) {
Future<V> f = cache.get(arg);
if(f == null) {
Callable<V> eval = new Callable<V>() {
public V call() throws InterruptedException {
return c.compute(arg);
}
};
FutureTask<V> ft = new FutureTask<V>(eval);
f = cache.putIfAbsent(arg, ft);
if(f == null) {
f = ft;
ft.run();
}
}
try {
return f.get();
} catch(CancellationException e) {
cache.remove(arg, f);
} catch(ExecutionException e) {
throw launderThrowable(e.getCause());
}
}
}
}