第一部分:What
在Java中一般通過繼承Thread類或者實現Runnable接口這兩種方式來創建多線程,但是這兩種方式都有個缺陷,就是不能在執行完成后獲取執行的結果,因此Java 1.5之后提供了Callable和Future接口,通過它們就可以在任務執行完畢之后得到任務的執行結果。本文會簡要的介紹使用方法,然后會從源代碼角度分析下具體的實現原理。
本文以Java 1.7的代碼進行分析。
第二部分:How
Callable接口
對於需要執行的任務需要實現Callable接口,Callable接口定義如下:
1 public interface Callable<V> { 2 /** 3 * Computes a result, or throws an exception if unable to do so. 4 * 5 * @return computed result 6 * @throws Exception if unable to compute a result 7 */ 8 V call() throws Exception; 9 }
可以看到Callable是個泛型接口,泛型V就是要call()方法返回的類型。Callable接口和Runnable接口很像,都可以被另外一個線程執行,但是正如前面所說的,Runnable不會返回數據也不能拋出異常。
Future接口
Future接口代表異步計算的結果,通過Future接口提供的方法可以查看異步計算是否執行完成,或者等待執行結果並獲取執行結果,同時還可以取消執行。Future接口的定義如下:
1 public interface Future<V> { 2 boolean cancel(boolean mayInterruptIfRunning); 3 boolean isCancelled(); 4 boolean isDone(); 5 V get() throws InterruptedException, ExecutionException; 6 V get(long timeout, TimeUnit unit) 7 throws InterruptedException, ExecutionException, TimeoutException; 8 }
- cancel():cancel()方法用來取消異步任務的執行。如果異步任務已經完成或者已經被取消,或者由於某些原因不能取消,則會返回false。如果任務還沒有被執行,則會返回true並且異步任務不會被執行。如果任務已經開始執行了但是還沒有執行完成,若mayInterruptIfRunning為true,則會立即中斷執行任務的線程並返回true,若mayInterruptIfRunning為false,則會返回true且不會中斷任務執行線程。
- isCanceled():判斷任務是否被取消,如果任務在結束(正常執行結束或者執行異常結束)前被取消則返回true,否則返回false。
- isDone():判斷任務是否已經完成,如果完成則返回true,否則返回false。需要注意的是:任務執行過程中發生異常、任務被取消也屬於任務已完成,也會返回true。
- get():獲取任務執行結果,如果任務還沒完成則會阻塞等待直到任務執行完成。如果任務被取消則會拋出CancellationException異常,如果任務執行過程發生異常則會拋出ExecutionException異常,如果阻塞等待過程中被中斷則會拋出InterruptedException異常。
- get(long timeout,Timeunit unit):帶超時時間的get()版本,如果阻塞等待過程中超時則會拋出TimeoutException異常。
FutureTask
Future只是一個接口,不能直接用來創建對象,FutureTask是Future的實現類,
FutureTask的繼承圖如下:
可以看到,FutureTask實現了RunnableFuture接口,則RunnableFuture接口繼承了Runnable接口和Future接口,所以FutureTask既能當做一個Runnable直接被Thread執行,也能作為Future用來得到Callable的計算結果。
使用
FutureTask一般配合ExecutorService來使用,也可以直接通過Thread來使用。
1 package com.beautyboss.slogen.callback; 2 3 import java.util.concurrent.*; 4 5 /** 6 * Author : Slogen 7 * AddTime : 17/6/4 8 * Email : huangjian13@meituan.com 9 */ 10 public class CallDemo { 11 12 public static void main(String[] args) throws ExecutionException, InterruptedException { 13 14 /** 15 * 第一種方式:Future + ExecutorService 16 * Task task = new Task(); 17 * ExecutorService service = Executors.newCachedThreadPool(); 18 * Future<Integer> future = service.submit(task1); 19 * service.shutdown(); 20 */ 21 22 23 /** 24 * 第二種方式: FutureTask + ExecutorService 25 * ExecutorService executor = Executors.newCachedThreadPool(); 26 * Task task = new Task(); 27 * FutureTask<Integer> futureTask = new FutureTask<Integer>(task); 28 * executor.submit(futureTask); 29 * executor.shutdown(); 30 */ 31 32 /** 33 * 第三種方式:FutureTask + Thread 34 */ 35 36 // 2. 新建FutureTask,需要一個實現了Callable接口的類的實例作為構造函數參數 37 FutureTask<Integer> futureTask = new FutureTask<Integer>(new Task()); 38 // 3. 新建Thread對象並啟動 39 Thread thread = new Thread(futureTask); 40 thread.setName("Task thread"); 41 thread.start(); 42 43 try { 44 Thread.sleep(1000); 45 } catch (InterruptedException e) { 46 e.printStackTrace(); 47 } 48 49 System.out.println("Thread [" + Thread.currentThread().getName() + "] is running"); 50 51 // 4. 調用isDone()判斷任務是否結束 52 if(!futureTask.isDone()) { 53 System.out.println("Task is not done"); 54 try { 55 Thread.sleep(2000); 56 } catch (InterruptedException e) { 57 e.printStackTrace(); 58 } 59 } 60 int result = 0; 61 try { 62 // 5. 調用get()方法獲取任務結果,如果任務沒有執行完成則阻塞等待 63 result = futureTask.get(); 64 } catch (Exception e) { 65 e.printStackTrace(); 66 } 67 68 System.out.println("result is " + result); 69 70 } 71 72 // 1. 繼承Callable接口,實現call()方法,泛型參數為要返回的類型 73 static class Task implements Callable<Integer> { 74 75 @Override 76 public Integer call() throws Exception { 77 System.out.println("Thread [" + Thread.currentThread().getName() + "] is running"); 78 int result = 0; 79 for(int i = 0; i < 100;++i) { 80 result += i; 81 } 82 83 Thread.sleep(3000); 84 return result; 85 } 86 } 87 }
第三部分:Why
構造函數
先從FutureTask的構造函數看起,FutureTask有兩個構造函數,其中一個如下:
1 public FutureTask(Callable<V> callable) { 2 if (callable == null) 3 throw new NullPointerException(); 4 this.callable = callable; 5 this.state = NEW; // ensure visibility of callable 6 }
這個構造函數會把傳入的Callable變量保存在this.callable字段中,該字段定義為private Callable<V> callable;用來保存底層的調用,在被執行完成以后會指向null,接着會初始化state字段為NEW。state字段用來保存FutureTask內部的任務執行狀態,一共有7中狀態,每種狀態及其對應的值如下:
1 private volatile int state; 2 private static final int NEW = 0; 3 private static final int COMPLETING = 1; 4 private static final int NORMAL = 2; 5 private static final int EXCEPTIONAL = 3; 6 private static final int CANCELLED = 4; 7 private static final int INTERRUPTING = 5; 8 private static final int INTERRUPTED = 6;
其中需要注意的是state是volatile類型的,也就是說只要有任何一個線程修改了這個變量,那么其他所有的線程都會知道最新的值。
為了后面更好的分析FutureTask的實現,這里有必要解釋下各個狀態。
- NEW:表示是個新的任務或者還沒被執行完的任務。這是初始狀態。
- COMPLETING:任務已經執行完成或者執行任務的時候發生異常,但是任務執行結果或者異常原因還沒有保存到outcome字段(outcome字段用來保存任務執行結果,如果發生異常,則用來保存異常原因)的時候,狀態會從NEW變更到COMPLETING。但是這個狀態會時間會比較短,屬於中間狀態。
- NORMAL:任務已經執行完成並且任務執行結果已經保存到outcome字段,狀態會從COMPLETING轉換到NORMAL。這是一個最終態。
- EXCEPTIONAL:任務執行發生異常並且異常原因已經保存到outcome字段中后,狀態會從COMPLETING轉換到EXCEPTIONAL。這是一個最終態。
- CANCELLED:任務還沒開始執行或者已經開始執行但是還沒有執行完成的時候,用戶調用了cancel(false)方法取消任務且不中斷任務執行線程,這個時候狀態會從NEW轉化為CANCELLED狀態。這是一個最終態。
- INTERRUPTING: 任務還沒開始執行或者已經執行但是還沒有執行完成的時候,用戶調用了cancel(true)方法取消任務並且要中斷任務執行線程但是還沒有中斷任務執行線程之前,狀態會從NEW轉化為INTERRUPTING。這是一個中間狀態。
- INTERRUPTED:調用interrupt()中斷任務執行線程之后狀態會從INTERRUPTING轉換到INTERRUPTED。這是一個最終態。
有一點需要注意的是,所有值大於COMPLETING的狀態都表示任務已經執行完成(任務正常執行完成,任務執行異常或者任務被取消)。
各個狀態之間的可能轉換關系如下圖所示:
另外一個構造函數如下,
1 public FutureTask(Runnable runnable, V result) { 2 this.callable = Executors.callable(runnable, result); 3 this.state = NEW; // ensure visibility of callable 4 }
這個構造函數會把傳入的Runnable封裝成一個Callable對象保存在callable字段中,同時如果任務執行成功的話就會返回傳入的result。這種情況下如果不需要返回值的話可以傳入一個null。
順帶看下Executors.callable()這個方法,這個方法的功能是把Runnable轉換成Callable,代碼如下:
1 public static <T> Callable<T> callable(Runnable task, T result) { 2 if (task == null) 3 throw new NullPointerException(); 4 return new RunnableAdapter<T>(task, result); 5 }
可以看到這里采用的是適配器模式,調用RunnableAdapter<T>(task, result)方法來適配,實現如下:
1 static final class RunnableAdapter<T> implements Callable<T> { 2 final Runnable task; 3 final T result; 4 RunnableAdapter(Runnable task, T result) { 5 this.task = task; 6 this.result = result; 7 } 8 public T call() { 9 task.run(); 10 return result; 11 } 12 }
這個適配器很簡單,就是簡單的實現了Callable接口,在call()實現中調用Runnable.run()方法,然后把傳入的result作為任務的結果返回。
在new了一個FutureTask對象之后,接下來就是在另一個線程中執行這個Task,無論是通過直接new一個Thread還是通過線程池,執行的都是run()方法,接下來就看看run()方法的實現。
run()
run()方法實現如下:
1 public void run() { 2 // 1. 狀態如果不是NEW,說明任務或者已經執行過,或者已經被取消,直接返回 3 // 2. 狀態如果是NEW,則嘗試把當前執行線程保存在runner字段中 4 // 如果賦值失敗則直接返回 5 if (state != NEW || 6 !UNSAFE.compareAndSwapObject(this, runnerOffset, 7 null, Thread.currentThread())) 8 return; 9 try { 10 Callable<V> c = callable; 11 if (c != null && state == NEW) { 12 V result; 13 boolean ran; 14 try { 15 // 3. 執行任務 16 result = c.call(); 17 ran = true; 18 } catch (Throwable ex) { 19 result = null; 20 ran = false; 21 // 4. 任務異常 22 setException(ex); 23 } 24 if (ran) 25 // 4. 任務正常執行完畢 26 set(result); 27 } 28 } finally { 29 // runner must be non-null until state is settled to 30 // prevent concurrent calls to run() 31 runner = null; 32 // state must be re-read after nulling runner to prevent 33 // leaked interrupts 34 int s = state; 35 // 5. 如果任務被中斷,執行中斷處理 36 if (s >= INTERRUPTING) 37 handlePossibleCancellationInterrupt(s); 38 } 39 }
run()方法首先會
- 判斷當前任務的state是否等於NEW,如果不為NEW則說明任務或者已經執行過,或者已經被取消,直接返回。
- 如果狀態為NEW則接着會通過unsafe類把任務執行線程引用CAS的保存在runner字段中,如果保存失敗,則直接返回。
- 執行任務。
- 如果任務執行發生異常,則調用setException()方法保存異常信息。setException()方法如下:
1 protected void setException(Throwable t) { 2 if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) { 3 outcome = t; 4 UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // final state 5 finishCompletion(); 6 } 7 }
在setException()方法中
- 首先會CAS的把當前的狀態從NEW變更為COMPLETING狀態。
- 把異常原因保存在outcome字段中,outcome字段用來保存任務執行結果或者異常原因。
- CAS的把當前任務狀態從COMPLETING變更為EXCEPTIONAL。這個狀態轉換對應着上圖中的二。
- 調用finishCompletion()。關於這個方法后面在分析。
5. 如果任務成功執行則調用set()方法設置執行結果,該方法實現如下:
1 protected void set(V v) { 2 if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) { 3 outcome = v; 4 UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state 5 finishCompletion(); 6 } 7 }
這個方法跟上面分析的setException()差不多,
- 首先會CAS的把當前的狀態從NEW變更為COMPLETING狀態。
- 把任務執行結果保存在outcome字段中。
- CAS的把當前任務狀態從COMPLETING變更為NORMAL。這個狀態轉換對應着上圖中的一。
- 調用finishCompletion()。
發起任務線程跟執行任務線程通常情況下都不會是同一個線程,在任務執行線程執行任務的時候,任務發起線程可以查看任務執行狀態、獲取任務執行結果、取消任務等等操作,接下來分析下這些操作。
get()
任務發起線程可以調用get()方法來獲取任務執行結果,如果此時任務已經執行完畢則會直接返回任務結果,如果任務還沒執行完畢,則調用方會阻塞直到任務執行結束返回結果為止。get()方法實現如下:
1 public V get() throws InterruptedException, ExecutionException { 2 int s = state; 3 if (s <= COMPLETING) 4 s = awaitDone(false, 0L); 5 return report(s); 6 }
get()方法實現比較簡單,會
- 判斷任務當前的state <= COMPLETING是否成立。前面分析過,COMPLETING狀態是任務是否執行完成的臨界狀態。
- 如果成立,表明任務還沒有結束(這里的結束包括任務正常執行完畢,任務執行異常,任務被取消),則會調用awaitDone()進行阻塞等待。
- 如果不成立表明任務已經結束,調用report()返回結果。
awaitDone()
當調用get()獲取任務結果但是任務還沒執行完成的時候,調用線程會調用awaitDone()方法進行阻塞等待,該方法定義如下:
1 private int awaitDone(boolean timed, long nanos) 2 throws InterruptedException { 3 // 計算等待截止時間 4 final long deadline = timed ? System.nanoTime() + nanos : 0L; 5 WaitNode q = null; 6 boolean queued = false; 7 for (;;) { 8 // 1. 判斷阻塞線程是否被中斷,如果被中斷則在等待隊 9 // 列中刪除該節點並拋出InterruptedException異常 10 if (Thread.interrupted()) { 11 removeWaiter(q); 12 throw new InterruptedException(); 13 } 14 15 // 2. 獲取當前狀態,如果狀態大於COMPLETING 16 // 說明任務已經結束(要么正常結束,要么異常結束,要么被取消) 17 // 則把thread顯示置空,並返回結果 18 int s = state; 19 if (s > COMPLETING) { 20 if (q != null) 21 q.thread = null; 22 return s; 23 } 24 // 3. 如果狀態處於中間狀態COMPLETING 25 // 表示任務已經結束但是任務執行線程還沒來得及給outcome賦值 26 // 這個時候讓出執行權讓其他線程優先執行 27 else if (s == COMPLETING) // cannot time out yet 28 Thread.yield(); 29 // 4. 如果等待節點為空,則構造一個等待節點 30 else if (q == null) 31 q = new WaitNode(); 32 // 5. 如果還沒有入隊列,則把當前節點加入waiters首節點並替換原來waiters 33 else if (!queued) 34 queued = UNSAFE.compareAndSwapObject(this, waitersOffset, 35 q.next = waiters, q); 36 else if (timed) { 37 // 如果需要等待特定時間,則先計算要等待的時間 38 // 如果已經超時,則刪除對應節點並返回對應的狀態 39 nanos = deadline - System.nanoTime(); 40 if (nanos <= 0L) { 41 removeWaiter(q); 42 return state; 43 } 44 // 6. 阻塞等待特定時間 45 LockSupport.parkNanos(this, nanos); 46 } 47 else 48 // 6. 阻塞等待直到被其他線程喚醒 49 LockSupport.park(this); 50 } 51 }
awaitDone()中有個死循環,每一次循環都會
- 判斷調用get()的線程是否被其他線程中斷,如果是的話則在等待隊列中刪除對應節點然后拋出InterruptedException異常。
- 獲取任務當前狀態,如果當前任務狀態大於COMPLETING則表示任務執行完成,則把thread字段置null並返回結果。
- 如果任務處於COMPLETING狀態,則表示任務已經處理完成(正常執行完成或者執行出現異常),但是執行結果或者異常原因還沒有保存到outcome字段中。這個時候調用線程讓出執行權讓其他線程優先執行。
- 如果等待節點為空,則構造一個等待節點WaitNode。
- 如果第四步中新建的節點還沒如隊列,則CAS的把該節點加入waiters隊列的首節點。
- 阻塞等待。
假設當前state=NEW且waiters為NULL,也就是說還沒有任何一個線程調用get()獲取執行結果,這個時候有兩個線程threadA和threadB先后調用get()來獲取執行結果。再假設這兩個線程在加入阻塞隊列進行阻塞等待之前任務都沒有執行完成且threadA和threadB都沒有被中斷的情況下(因為如果threadA和threadB在進行阻塞等待結果之前任務就執行完成或線程本身被中斷的話,awaitDone()就執行結束返回了),執行過程是這樣的,以threadA為例:
- 第一輪for循環,執行的邏輯是q == null,所以這時候會新建一個節點q。第一輪循環結束。
- 第二輪for循環,執行的邏輯是!queue,這個時候會把第一輪循環中生成的節點的netx指針指向waiters,然后CAS的把節點q替換waiters。也就是把新生成的節點添加到waiters鏈表的首節點。如果替換成功,queued=true。第二輪循環結束。
- 第三輪for循環,進行阻塞等待。要么阻塞特定時間,要么一直阻塞知道被其他線程喚醒。
在threadA和threadB都阻塞等待之后的waiters結果如圖
cancel(boolean)
用戶可以調用cancel(boolean)方法取消任務的執行,cancel()實現如下:
1 public boolean cancel(boolean mayInterruptIfRunning) { 2 // 1. 如果任務已經結束,則直接返回false 3 if (state != NEW) 4 return false; 5 // 2. 如果需要中斷任務執行線程 6 if (mayInterruptIfRunning) { 7 // 2.1. 把任務狀態從NEW轉化到INTERRUPTING 8 if (!UNSAFE.compareAndSwapInt(this, stateOffset, NEW, INTERRUPTING)) 9 return false; 10 Thread t = runner; 11 // 2.2. 中斷任務執行線程 12 if (t != null) 13 t.interrupt(); 14 // 2.3. 修改狀態為INTERRUPTED 15 UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED); // final state 16 } 17 // 3. 如果不需要中斷任務執行線程,則直接把狀態從NEW轉化為CANCELLED 18 else if (!UNSAFE.compareAndSwapInt(this, stateOffset, NEW, CANCELLED)) 19 return false; 20 // 4. 21 finishCompletion(); 22 return true; 23 }
cancel()方法會做下面幾件事:
1 .判斷任務當前執行狀態,如果任務狀態不為NEW,則說明任務或者已經執行完成,或者執行異常,不能被取消,直接返回false表示執行失敗。
2. 判斷需要中斷任務執行線程,則
- 把任務狀態從NEW轉化到INTERRUPTING。這是個中間狀態。
- 中斷任務執行線程。
- 修改任務狀態為INTERRUPTED。這個轉換過程對應上圖中的四。
3. 如果不需要中斷任務執行線程,直接把任務狀態從NEW轉化為CANCELLED。如果轉化失敗則返回false表示取消失敗。這個轉換過程對應上圖中的四。
4. 調用finishCompletion()。
finishCompletion()
根據前面的分析,不管是任務執行異常還是任務正常執行完畢,或者取消任務,最后都會調用finishCompletion()方法,該方法實現如下:
1 private void finishCompletion() { 2 // assert state > COMPLETING; 3 for (WaitNode q; (q = waiters) != null;) { 4 if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) { 5 for (;;) { 6 Thread t = q.thread; 7 if (t != null) { 8 q.thread = null; 9 LockSupport.unpark(t); 10 } 11 WaitNode next = q.next; 12 if (next == null) 13 break; 14 q.next = null; // unlink to help gc 15 q = next; 16 } 17 break; 18 } 19 } 20 21 done(); 22 23 callable = null; // to reduce footprint 24 }
這個方法的實現比較簡單,依次遍歷waiters鏈表,喚醒節點中的線程,然后把callable置空。
被喚醒的線程會各自從awaitDone()方法中的LockSupport.park*()阻塞中返回,然后會進行新一輪的循環。在新一輪的循環中會返回執行結果(或者更確切的說是返回任務的狀態)。
report()
report()方法用在get()方法中,作用是把不同的任務狀態映射成任務執行結果。實現如下:
1 private V report(int s) throws ExecutionException { 2 Object x = outcome; 3 // 1. 任務正常執行完成,返回任務執行結果 4 if (s == NORMAL) 5 return (V)x; 6 // 2. 任務被取消,拋出CancellationException異常 7 if (s >= CANCELLED) 8 throw new CancellationException(); 9 // 3. 其他狀態,拋出執行異常ExecutionException 10 throw new ExecutionException((Throwable)x); 11 }
映射關系如下圖所示:
如果任務處於NEW、COMPLETING和INTERRUPTING這三種狀態的時候是執行不到report()方法的,所以沒必要對這三種狀態進行轉換。
get(long,TimeUnit)
帶超時等待的獲取任務結果,實現如下:
1 public V get(long timeout, TimeUnit unit) 2 throws InterruptedException, ExecutionException, TimeoutException { 3 if (unit == null) 4 throw new NullPointerException(); 5 int s = state; 6 if (s <= COMPLETING && 7 // 如果awaitDone()超時返回之后任務還沒結束,則拋出異常 8 (s = awaitDone(true, unit.toNanos(timeout))) <= COMPLETING) 9 throw new TimeoutException(); 10 return report(s); 11 }
跟get()不同點在於get(long,TimeUnit)會在awaitDone()超時返回之后拋出TimeoutException異常。
isCancelled()和isDone()
這兩個方法分別用來判斷任務是否被取消和任務是否執行完成,實現都比較簡單,代碼如下:
1 public boolean isCancelled() { 2 return state >= CANCELLED; 3 }
1 public boolean isDone() { 2 return state != NEW; 3 }
根據前面的分析,這兩個方法就很容易理解不用多做解釋了,O(∩_∩)O。
總結下,其實FutureTask的實現還是比較簡單的,當用戶實現Callable()接口定義好任務之后,把任務交給其他線程進行執行。FutureTask內部維護一個任務狀態,任何操作都是圍繞着這個狀態進行,並隨時更新任務狀態。任務發起者調用get*()獲取執行結果的時候,如果任務還沒有執行完畢,則會把自己放入阻塞隊列中然后進行阻塞等待。當任務執行完成之后,任務執行線程會依次喚醒阻塞等待的線程。調用cancel()取消任務的時候也只是簡單的修改任務狀態,如果需要中斷任務執行線程的話則調用Thread.interrupt()中斷任務執行線程。
第四部分:Other
有個值得關注的問題就是當任務還在執行的時候用戶調用cancel(true)方法能否真正讓任務停止執行呢?
在前面的分析中我們直到,當調用cancel(true)方法的時候,實際執行還是Thread.interrupt()方法,而interrupt()方法只是設置中斷標志位,如果被中斷的線程處於sleep()、wait()或者join()邏輯中則會拋出InterruptedException異常。
因此結論是:cancel(true)並不一定能夠停止正在執行的異步任務。