Future是我們在使用java實現異步時最常用到的一個類,我們可以向線程池提交一個Callable,並通過future對象獲取執行結果。本篇文章主要講述了JUC中FutureTask中的一些實現原理。使用的jdk版本是1.7。
Future
Future是一個接口,它定義了5個方法:
boolean cancel(boolean mayInterruptIfRunning); boolean isCancelled(); boolean isDone(); V get() throws InterruptedException, ExecutionException; V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException;
簡單說明一下接口定義
- boolean cancel(boolean mayInterruptInRunning) 取消一個任務,並返回取消結果。參數表示是否中斷線程。
- boolean isCancelled() 判斷任務是否被取消
- Boolean isDone() 判斷當前任務是否執行完畢,包括正常執行完畢、執行異常或者任務取消。
- V get() 獲取任務執行結果,任務結束之前會阻塞。
- V get(long timeout, TimeUnit unit) 在指定時間內嘗試獲取執行結果。若超時則拋出超時異常
寫個簡單demo:
public class FutureDemo { public static void main(String[] args) { ExecutorService executorService = Executors.newCachedThreadPool(); Future future = executorService.submit(new Callable<Object>() { @Override public Object call() throws Exception { Long start = System.currentTimeMillis(); while (true) { Long current = System.currentTimeMillis(); if ((current - start) > 1000) { return 1; } } } }); try { Integer result = (Integer)future.get(); System.out.println(result); }catch (Exception e){ e.printStackTrace(); } } }
這里模擬了1s鍾的CPU空轉,當執行future.get()的時候,主線程阻塞了大約一秒后獲得結果。
當然我們也可以使用get(long timeout, TimeUnit unit)
try { Integer result = (Integer) future.get(500, TimeUnit.MILLISECONDS);
System.out.println(result); } catch (Exception e) { e.printStackTrace(); }
由於在500ms內沒有結果返回,所以拋出異常,打印異常堆棧如下
當然,如果我們把超時時間設置的長一些,還是可以得到預期的結果的。
FutureTask實現原理
下面我們介紹一下FutureTask內部的一些實現機制。下文從以下幾點敘述:
- 類繼承結構
- 核心成員變量
- 內部狀態轉換
- 核心方法解析
1 類繼承結構
首先我們看一下FutureTask的繼承結構:
FutureTask實現了RunnableFuture接口,而RunnableFuture繼承了Runnable和Future,也就是說FutureTask既是Runnable,也是Future。
2 核心成員變量
FutureTask內部定義了以下變量,以及它們的含義如下
- volatile int state:表示對象狀態,volatile關鍵字保證了內存可見性。futureTask中定義了7種狀態,代表了7種不同的執行狀態
private static final int NEW = 0; //任務新建和執行中 private static final int COMPLETING = 1; //任務將要執行完畢 private static final int NORMAL = 2; //任務正常執行結束 private static final int EXCEPTIONAL = 3; //任務異常 private static final int CANCELLED = 4; //任務取消 private static final int INTERRUPTING = 5; //任務線程即將被中斷 private static final int INTERRUPTED = 6; //任務線程已中斷
- Callable<V> callable:被提交的任務
- Object outcome:任務執行結果或者任務異常
- volatile Thread runner:執行任務的線程
- volatile WaitNode waiters:等待節點,關聯等待線程
- long stateOffset:state字段的內存偏移量
- long runnerOffset:runner字段的內存偏移量
- long waitersOffset:waiters字段的內存偏移量
后三個字段是配合Unsafe類做CAS操作使用的。
3 內部狀態轉換
FutureTask中使用state表示任務狀態,state值變更的由CAS操作保證原子性。
FutureTask對象初始化時,在構造器中把state置為為NEW,之后狀態的變更依據具體執行情況來定。
例如任務執行正常結束前,state會被設置成COMPLETING,代表任務即將完成,接下來很快就會被設置為NARMAL或者EXCEPTIONAL,這取決於調用Runnable中的call()方法是否拋出了異常。有異常則后者,反之前者。
任務提交后、任務結束前取消任務,那么有可能變為CANCELLED或者INTERRUPTED。在調用cancel方法時,如果傳入false表示不中斷線程,state會被置為CANCELLED,反之state先被變為INTERRUPTING,后變為INTERRUPTED。
總結下,FutureTask的狀態流轉過程,可以出現以下四種情況:
1. 任務正常執行並返回。 NEW -> COMPLETING -> NORMAL
2. 執行中出現異常。NEW -> COMPLETING -> EXCEPTIONAL
3. 任務執行過程中被取消,並且不響應中斷。NEW -> CANCELLED
4. 任務執行過程中被取消,並且響應中斷。 NEW -> INTERRUPTING -> INTERRUPTED
4 核心方法解析
接下來我們一起扒一扒FutureTask的源碼。我們先看一下任務線程是怎么執行的。當任務被提交到線程池后,會執行futureTask的run()方法。
1 public void run()
public void run() {
// 校驗任務狀態 if (state != NEW || !UNSAFE.compareAndSwapObject(this, runnerOffset, null, Thread.currentThread())) return; try { Callable<V> c = callable;
// double check if (c != null && state == NEW) { V result; boolean ran; try {
//執行業務代碼 result = c.call(); ran = true; } catch (Throwable ex) { result = null; ran = false; setException(ex); } if (ran) set(result); } } finally {
// 重置runner runner = null; int s = state; if (s >= INTERRUPTING) handlePossibleCancellationInterrupt(s); } }
翻譯一下,這個方法經歷了以下幾步
- 校驗當前任務狀態是否為NEW以及runner是否已賦值。這一步是防止任務被取消。
- double-check任務狀態state
- 執行業務邏輯,也就是c.call()方法被執行
- 如果業務邏輯異常,則調用setException方法將異常對象賦給outcome,並且更新state值
- 如果業務正常,則調用set方法將執行結果賦給outcome,並且更新state值
我們繼續往下看,setException(Throwable t)和set(V v) 具體是怎么做的
protected void set(V v) { // state狀態 NEW->COMPLETING if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) { outcome = v; // COMPLETING -> NORMAL 到達穩定狀態 UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // 一些結束工作 finishCompletion(); } }
protected void setException(Throwable t) { // state狀態 NEW->COMPLETING if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) { outcome = t; // COMPLETING -> EXCEPTIONAL 到達穩定狀態 UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // 一些結束工作 finishCompletion(); } }
code中的注釋已經寫的很清楚,故不翻譯了。狀態變更的原子性由unsafe對象提供的CAS操作保證。FutureTask的outcome變量存儲執行結果或者異常對象,會由主線程返回。
2 get()和get(long timeout, TimeUnit unit)
任務由線程池提供的線程執行,那么這時候主線程則會阻塞,直到任務線程喚醒它們。我們通過get(long timeout, TimeUnit unit)方法看看是怎么做的
public V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { if (unit == null) throw new NullPointerException(); int s = state; if (s <= COMPLETING && (s = awaitDone(true, unit.toNanos(timeout))) <= COMPLETING) throw new TimeoutException(); return report(s); }
get的源碼很簡潔,首先校驗參數,然后根據state狀態判斷是否超時,如果超時則異常,不超時則調用report(s)去獲取最終結果。
當 s<= COMPLETING時,表明任務仍然在執行且沒有被取消。如果它為true,那么走到awaitDone方法。
awaitDone是futureTask實現阻塞的關鍵方法,我們重點關注一下它的實現原理。
/** * 等待任務執行完畢,如果任務取消或者超時則停止 * @param timed 為true表示設置超時時間 * @param nanos 超時時間 * @return 任務完成時的狀態 * @throws InterruptedException */ private int awaitDone(boolean timed, long nanos) throws InterruptedException { // 任務截止時間 final long deadline = timed ? System.nanoTime() + nanos : 0L; WaitNode q = null; boolean queued = false; // 自旋 for (;;) { if (Thread.interrupted()) { //線程中斷則移除等待線程,並拋出異常 removeWaiter(q); throw new InterruptedException(); } int s = state; if (s > COMPLETING) { // 任務可能已經完成或者被取消了 if (q != null) q.thread = null; return s; } else if (s == COMPLETING) // 可能任務線程被阻塞了,主線程讓出CPU Thread.yield(); else if (q == null) // 等待線程節點為空,則初始化新節點並關聯當前線程 q = new WaitNode(); else if (!queued) // 等待線程入隊列,成功則queued=true queued = UNSAFE.compareAndSwapObject(this, waitersOffset, q.next = waiters, q); else if (timed) { nanos = deadline - System.nanoTime(); if (nanos <= 0L) { //已經超時的話,移除等待節點 removeWaiter(q); return state; } // 未超時,將當前線程掛起指定時間 LockSupport.parkNanos(this, nanos); } else // timed=false時會走到這里,掛起當前線程 LockSupport.park(this); } }
注釋里也很清楚的寫明了每一步的作用,我們以設置超時時間為例,總結一下過程
- 計算deadline,也就是到某個時間點后如果還沒有返回結果,那么就超時了。
- 進入自旋,也就是死循環。
- 首先判斷是否響應線程中斷。對於線程中斷的響應往往會放在線程進入阻塞之前,這里也印證了這一點。
- 判斷state值,如果>COMPLETING表明任務已經取消或者已經執行完畢,就可以直接返回了。
- 如果任務還在執行,則為當前線程初始化一個等待節點WaitNode,入等待隊列。這里和AQS的等待隊列類似,只不過Node只關聯線程,而沒有狀態。AQS里面的等待節點是有狀態的。
- 計算nanos,判斷是否已經超時。如果已經超時,則移除所有等待節點,直接返回state。超時的話,state的值仍然還是COMPLETING。
- 如果還未超時,就通過LockSupprot類提供的方法在指定時間內掛起當前線程,等待任務線程喚醒或者超時喚醒。
當線程被掛起之后,如果任務線程執行完畢,就會喚醒等待線程哦。這一步就是在finishCompletion里面做的,前面已經提到這個方法。我們再看看這個方法具體做了哪些事吧~
/** * 移除並喚醒所有等待線程,執行done,置空callable * nulls out callable. */ private void finishCompletion() { //遍歷等待節點 for (WaitNode q; (q = waiters) != null;) { if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) { for (;;) { Thread t = q.thread; if (t != null) { q.thread = null; //喚醒等待線程 LockSupport.unpark(t); } WaitNode next = q.next; if (next == null) break; // unlink to help gc q.next = null; q = next; } break; } } //模板方法,可以被覆蓋 done(); //清空callable callable = null; }
由代碼和注釋可以看出來,這個方法的作用主要在於喚醒等待線程。由前文可知,當任務正常結束或者異常時,都會調用finishCompletion去喚醒等待線程。這個時候,等待線程就可以醒來,開開心心的獲得結果啦。
最后我們看一下任務取消
3 public boolean cancel(boolean mayInterruptIfRunning)
注意,取消操作不一定會起作用,這里我們先貼個demo
1 public class FutureDemo { 2 public static void main(String[] args) { 3 ThreadPoolExecutor executorService = (ThreadPoolExecutor) Executors.newFixedThreadPool(1); 4 // 預創建線程 5 executorService.prestartCoreThread(); 6 7 Future future = executorService.submit(new Callable<Object>() { 8 @Override 9 public Object call() { 10 System.out.println("start to run callable"); 11 Long start = System.currentTimeMillis(); 12 while (true) { 13 Long current = System.currentTimeMillis(); 14 if ((current - start) > 1000) { 15 System.out.println("當前任務執行已經超過1s"); 16 return 1; 17 } 18 } 19 } 20 }); 21 22 System.out.println(future.cancel(false)); 23 24 try { 25 Thread.currentThread().sleep(3000); 26 executorService.shutdown(); 27 } catch (Exception e) { 28 //NO OP 29 } 30 } 31 }
我們多次測試后發現,出現了2種打印結果,如圖
結果1
結果2
第一種是任務壓根沒取消,第二種則是任務壓根沒提交成功。
方法簽名注釋告訴我們,取消操作是可能會失敗的,如果當前任務已經結束或者已經取消,則當前取消操作會失敗。如果任務尚未開始,那么任務不會被執行。這就解釋了出現上圖結果2的情況。我們還是從源碼去分析cancel()究竟做了哪些事。
public boolean cancel(boolean mayInterruptIfRunning) { if (state != NEW) return false; if (mayInterruptIfRunning) { if (!UNSAFE.compareAndSwapInt(this, stateOffset, NEW, INTERRUPTING)) return false; Thread t = runner; if (t != null) t.interrupt(); UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED); // final state } else if (!UNSAFE.compareAndSwapInt(this, stateOffset, NEW, CANCELLED)) return false; finishCompletion(); return true; }
執行邏輯如下
- state不為NEW時,任務即將進入終態,直接返回false表明取消操作失敗。
- state狀態為NEW,任務可能已經開始執行,也可能還未開始。
- mayInterruptIfRunning表明是否中斷線程。若是,則嘗試將state設置為INTERRUPTING,並且中斷線程,之后將state設置為終態INTERRUPTED。
- 如果mayInterruptIfRunning=false,則不中斷線程,把state設置為CANCELLED
- 移除等待線程並喚醒。
- 返回true
可見,cancel()方法改變了futureTask的狀態位,如果傳入的是false並且業務邏輯已經開始執行,當前任務是不會被終止的,而是會繼續執行,直到異常或者執行完畢。如果傳入的是true,會調用當前線程的interrupt()方法,把中斷標志位設為true。
事實上,除非線程自己停止自己的任務,或者退出JVM,是沒有其他方法完全終止一個線程的任務的。mayInterruptIfRunning=true,通過希望當前線程可以響應中斷的方式來結束任務。當任務被取消后,會被封裝為CancellationException拋出。
總結
總結一下,futureTask中的任務狀態由變量state表示,任務狀態都基於state判斷。而futureTask的阻塞則是通過自旋+掛起線程實現。理解FutureTask的內部實現機制,我們使用Future時才能更加得心應手。文中摻雜着筆者的個人理解,如果有不正之處,還望讀者多多指正
作者:mayday芋頭