FutureTask是一個支持取消行為的異步任務執行器。該類實現了Future接口的方法。
如:
- 取消任務執行
- 查詢任務是否執行完成
- 獲取任務執行結果(”get“任務必須得執行完成才能獲取結果,否則會阻塞直至任務完成)。
注意:一旦任務執行完成或取消任務,則不能執行取消任務或者重新啟動任務。(除非一開始就使用runAndReset模式運行任務)
FutureTask實現了Runnable接口和Future接口,因此FutureTask可以傳遞到線程對象Thread或Excutor(線程池)來執行。
如果在當前線程中需要執行比較耗時的操作,但又不想阻塞當前線程時,可以把這些作業交給FutureTask,另開一個線程在后台完成,當當前線程將來需要時,就可以通過FutureTask對象獲得后台作業的計算結果或者執行狀態。
示例
1 public class FutureTaskDemo { 2 public static void main(String[]args)throws InterruptedException { 3 FutureTask < Integer > ft = new FutureTask < > (new Callable < Integer > () { 4 @Override 5 public Integer call()throws Exception { 6 int num = new Random().nextInt(10); 7 TimeUnit.SECONDS.sleep(num); 8 return num; 9 } 10 }); 11 Thread t = new Thread(ft); 12 t.start(); 13 //這里可以做一些其它的事情,跟futureTask任務並行,等需要futureTask的運行結果時,可以調用get方法獲取 14 try { 15 //等待任務執行完成,獲取返回值 16 Integer num = ft.get(); 17 System.out.println(num); 18 } catch (Exception e) { 19 e.printStackTrace(); 20 } 21 } 22 }
FutureTask 源碼分析
JDK1.8自己實現了一個同步等待隊列,在結果返回之前,所有的線程都被阻塞,存放到等待隊列中。
下面我們來分析下JDK1.8的FutureTask 源碼
FutureTask 類結構
1 public class FutureTask<V> implements RunnableFuture<V> { 2 /** * 當前任務的運行狀態。 3 * 4 * 可能存在的狀態轉換 5 * NEW -> COMPLETING -> NORMAL(有正常結果) 6 * NEW -> COMPLETING -> EXCEPTIONAL(結果為異常) 7 * NEW -> CANCELLED(無結果) 8 * NEW -> INTERRUPTING -> INTERRUPTED(無結果) 9 */ 10 private volatile int state; 11 private static final int NEW = 0; //初始狀態 12 private static final int COMPLETING = 1; //結果計算完成或響應中斷到賦值給返回值之間的狀態。 13 private static final int NORMAL = 2; //任務正常完成,結果被set 14 private static final int EXCEPTIONAL = 3; //任務拋出異常 15 private static final int CANCELLED = 4; //任務已被取消 16 private static final int INTERRUPTING = 5; //線程中斷狀態被設置ture,但線程未響應中斷 17 private static final int INTERRUPTED = 6; //線程已被中斷 18 19 //將要執行的任務 20 private Callable<V> callable; //用於get()返回的結果,也可能是用於get()方法拋出的異常 21 private Object outcome; // non-volatile, protected by state reads/writes //執行callable的線程,調用FutureTask.run()方法通過CAS設置 22 private volatile Thread runner; //棧結構的等待隊列,該節點是棧中的最頂層節點。 23 private volatile WaitNode waiters; 24 ....
FutureTask實現的接口信息如下:
RunnableFuture 接口
1 public interface RunnableFuture<V> extends Runnable, Future<V> { 2 void run(); 3 }
RunnableFuture 接口基礎了Runnable和Future接口
Future 接口
1 public interface Future<V> { 2 //取消任務 3 boolean cancel(boolean mayInterruptIfRunning); 4 //判斷任務是否已經取消 5 boolean isCancelled(); 6 //判斷任務是否結束(執行完成或取消) 7 boolean isDone(); 8 //阻塞式獲取任務執行結果 9 V get() throws InterruptedException, ExecutionException; 10 //支持超時獲取任務執行結果 11 V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException; 12 }
run 方法
1 public void run() { 2 //保證callable任務只被運行一次 3 if (state != NEW || !UNSAFE.compareAndSwapObject(this, runnerOffset, null, Thread.currentThread())) 4 return; 5 try { 6 Callable < V > c = callable; 7 if (c != null && state == NEW) { 8 V result; 9 boolean ran; 10 try { 11 //執行任務,上面的例子我們可以看出,call()里面可能是一個耗時的操作,不過這里是同步的 12 result = c.call(); 13 //上面的call()是同步的,只有上面的result有了結果才會繼續執行 14 ran = true; 15 } catch (Throwable ex) { 16 result = null; 17 ran = false; 18 setException(ex); 19 } 20 if (ran) 21 //執行完了,設置result 22 set(result); 23 } 24 } 25 finally { 26 runner = null; 27 int s = state; 28 //判斷該任務是否正在響應中斷,如果中斷沒有完成,則等待中斷操作完成 29 if (s >= INTERRUPTING) 30 handlePossibleCancellationInterrupt(s); 31 } 32 }
1.如果state狀態不為New或者設置運行線程runner失敗則直接返回false,說明線程已經啟動過,保證任務在同一時刻只被一個線程執行。
2.調用callable.call()方法,如果調用成功則執行set(result)方法,將state狀態設置成NORMAL。如果調用失敗拋出異常則執行setException(ex)方法,將state狀態設置成EXCEPTIONAL,喚醒所有在get()方法上等待的線程。
3.如果當前狀態為INTERRUPTING(步驟2已CAS失敗),則一直調用Thread.yield()直至狀態不為INTERRUPTING
set方法
1 protected void set(V v) { 2 if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) { 3 outcome = v; 4 UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state 5 finishCompletion(); 6 } 7 }
- 首先通過CAS把state的NEW狀態修改成COMPLETING狀態。
- 修改成功則把v值賦給outcome變量。然后再把state狀態修改成NORMAL,表示現在可以獲取返回值。
- 最后調用finishCompletion()方法,喚醒等待隊列中的所有節點。
finishCompletion方法
1 private void finishCompletion() { 2 for (WaitNode q; (q = waiters) != null; ) { 3 //通過CAS把棧頂的元素置為null,相當於彈出棧頂元素 4 if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) { 5 for (; ; ) { 6 Thread t = q.thread; 7 if (t != null) { 8 q.thread = null; 9 LockSupport.unpark(t); 10 } 11 WaitNode next = q.next; 12 if (next == null) 13 break; 14 q.next = null; // unlink to help gc 15 q = next; 16 } 17 break; 18 } 19 } 20 done(); 21 callable = null; // to reduce footprint 22 }
把棧中的元素一個一個彈出,並通過 LockSupport.unpark(t)喚醒每一個節點,通知每個線程,該任務執行完成(可能是執行完成,也可能cancel,異常等)
runAndReset 方法
1 protected boolean runAndReset() { 2 if (state != NEW || 3 !UNSAFE.compareAndSwapObject(this, runnerOffset, 4 null, Thread.currentThread())) 5 return false; 6 boolean ran = false; 7 int s = state; 8 try { 9 Callable<V> c = callable; 10 if (c != null && s == NEW) { 11 try { 12 // 執行任務,和run方法不同的是這里不需要設置返回值 13 c.call(); // don't set result 14 ran = true; 15 } catch (Throwable ex) { 16 setException(ex); 17 } 18 } 19 } finally { 20 // runner must be non-null until state is settled to 21 // prevent concurrent calls to run() 22 runner = null; 23 // state must be re-read after nulling runner to prevent 24 // leaked interrupts 25 s = state; 26 if (s >= INTERRUPTING) 27 handlePossibleCancellationInterrupt(s); 28 } 29 //這里並沒有改變state的狀態,還是NEW狀態 30 return ran && s == NEW; 31 }
runAndReset()和run()方法最大的區別是 runAndReset 不需要設置返回值,並且不需要改變任務的狀態,也就是不改變state的狀態,一直是NEW狀態。
get方法
1 public V get()throws InterruptedException, ExecutionException { 2 int s = state; 3 if (s <= COMPLETING) 4 s = awaitDone(false, 0L); 5 return report(s); 6 }
如果state狀態小於等於COMPLETING,說明任務還沒開始執行或還未執行完成,然后調用awaitDone方法阻塞該調用線程。
如果state的狀態大於COMPLETING,則說明任務執行完成,或發生異常、中斷、取消狀態。直接通過report方法返回執行結果。
awaitDone 方法
1 private int awaitDone(boolean timed, long nanos)throws InterruptedException { 2 final long deadline = timed ? System.nanoTime() + nanos : 0L; 3 WaitNode q = null; 4 boolean queued = false; 5 for (; ; ) { 6 //如果該線程執行interrupt()方法,則從隊列中移除該節點,並拋出異常 7 if (Thread.interrupted()) { 8 removeWaiter(q); 9 throw new InterruptedException(); 10 } 11 int s = state; 12 //如果state狀態大於COMPLETING 則說明任務執行完成,或取消 13 if (s > COMPLETING) { 14 if (q != null) 15 q.thread = null; 16 return s; 17 } 18 //如果state=COMPLETING,則使用yield,因為此狀態的時間特別短,通過yield比掛起響應更快。 19 else if (s == COMPLETING) // cannot time out yet 20 Thread.yield(); 21 //構建節點 22 else if (q == null) 23 q = new WaitNode(); 24 //把當前節點入棧 25 else if (!queued) 26 queued = UNSAFE.compareAndSwapObject(this, waitersOffset, q.next = waiters, q); 27 //如果需要阻塞指定時間,則使用LockSupport.parkNanos阻塞指定時間 28 //如果到指定時間還沒執行完,則從隊列中移除該節點,並返回當前狀態 29 else if (timed) { 30 nanos = deadline - System.nanoTime(); 31 if (nanos <= 0L) { 32 removeWaiter(q); 33 return state; 34 } 35 LockSupport.parkNanos(this, nanos); 36 } 37 //阻塞當前線程 38 else 39 LockSupport.park(this); 40 } 41 }
構建棧鏈表的節點元素,並將該節點入棧,同時阻塞當前線程等待運行主任務的線程喚醒該節點。
report方法
1 private V report(int s)throws ExecutionException { 2 Object x = outcome; 3 if (s == NORMAL) 4 return (V)x; 5 if (s >= CANCELLED) 6 throw new CancellationException(); 7 throw new ExecutionException((Throwable)x); 8 }
如果state的狀態為NORMAL,說明任務正確執行完成,直接返回計算后的值。
如果state的狀態大於等於CANCELLED,說明任務被成功取消執行、或響應中斷,直接返回CancellationException異常
否則返回ExecutionException異常。
總結
1.任務開始運行后,不能在次運行,保證只運行一次(runAndReset 方法除外)
2.任務還未開始,或者任務已被運行,但未結束,這兩種情況下都可以取消; 如果任務已經結束,則不可以被取消 。