以前線程Thread既表示執行的任務,又表示執行的機制。在JDK1.5中,java並發框架提供了一種“執行服務”的相關API,它將"任務的執行"和"任務的提交“相分離,”執行服務“封裝了任務執行的細節,對於任務提交者來說,它可進一步聚焦於任務本身,如任務提交、獲取任務執行后的結果、取消任務而不需要關注任務執行的細節,如線程的創建、任務的調試、線程的復用或關閉等。
1.基本接口
任務執行服務主要涉及4個接口
-
Runnable和Callable: 表示要執行的異步任務
-
Executor和ExecutorService:表示執行任務的服務
-
Future : 表示任務執行后的結果
Runnable是我們最常用的異步任務接口,這個接口的方法沒有返回值,不能拋出異常。而Callable接口就是為了解決Runnable的不足而在JDK1.5引入的接口,此接口的方法有返回值,且可拋出異常。
@FunctionalInterface public interface Callable<V> { V call() throws Exception; }
1)Executor接口
Executor接口也只有一個方法,這個方法接受一個Runnable類型的參數,這是個抽象方法,它無法指定任務該如何執行。它可能是新建一個線程執行任務,也可能是利用線程池中的一個線程,還可能是在調用者線程中執行。
public interface Executor { void execute(Runnable command); }
ExecutorService擴展了Executor接口,它添加了一些新功能,如支持有返回結果的任務、支持超時任務、支持取消任務、支持批量提交任務。這里的submit方法,返回類型是Future,返回后,只表示任務已提交,不代表已經執行,具體執行與否要看”執行服務“如何調度,通過Future可以查詢異步任務的狀態、獲取最終的結果、取消任務等。
public interface ExecutorService extends Executor { void shutdown(); List<Runnable> shutdownNow(); boolean isShutdown(); boolean isTerminated(); boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException; <T> Future<T> submit(Callable<T> task); <T> Future<T> submit(Runnable task, T result); Future<?> submit(Runnable task); <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException; <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,long timeout, TimeUnit unit) throws InterruptedException; <T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException; <T> T invokeAny(Collection<? extends Callable<T>> tasks,long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException; }
2)Future接口
Future接口是對任務的結果做了進一步封裝,字面上”future"是未來的意思,這里確實是表示“未來(或最終)的結果”,“結果”需要等待。
public interface Future<V> { boolean cancel(boolean mayInterruptIfRunning); boolean isCancelled(); boolean isDone(); V get() throws InterruptedException, ExecutionException; V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException; }
cancel()方法用於取消異步任務,若任務已完成、已取消或因其他原因不能取消等原因而導致任務取消失敗就會返回false,反之返回true. 如果任務還未開始,則不再運行,但若任務已經在運行,則不一定能取消這個任務。參數mayInterruptIfRunning表示,如果任務正在執行,是否調用Thread.interrupt()方法中斷線程,而interrupt()方法只是設置線程中斷標志,它不一定能真的中斷線程。
isCancelled() 和isDone()分別返回任務是否被取消、任務是否已完成的布爾值。只要cancel方法返回true,那么即使執行任務的線程還未結束,isCancelled方法也一定會返回true。不管什么原因,無論是任務正常結束、任務拋出異常或任務被取消,只要任務結束了,isDone都會返回true.
get()方法用於返回異步任務的結果,若任務未完成,當前線程則會阻塞待。get(long,TimeUnit)方法需要設定等待時長,若在給定的時間內還未完成任務,則會拋出TimeoutException異常。
get方法的最終結果大致有3種: ①任務正常完成,get方法返回任務的執行結果,若任務是Runnable且入參未提供結果,最終返回null ②任務被取消了,get方法會拋出CancellationException. ③任務執行過程中拋出了異常,get方法會將異常包裝為ExecutionException重新拋出,此異常的getCause方法可獲得原始異常。
Future是實現”任務的提交“與”任務的執行“相分離的關鍵,它是兩者的橋梁,它使任務的提交者和任務的執行器的關注點相隔離,同時又讓兩者彼此聯系。
2.用法示例
下面的例子中,使用異步任務,計算0-300的累加結果,在計算出結果前Future的get()方法將阻塞等待。
這里使用了Executors工具類的newSingleThreadeExecutor方法創建一了個執行服務。Executors有很多靜態方法,分別創建各種線程池執行服務。
class SimpleTaskTest { static class Accumlation implements Callable<Integer> { @Override public Integer call() throws Exception { int sum =0; for (int i = 0; i < 300; i++) { sum += i; Thread.sleep(10); } return sum; } } public static void main(String[] args) throws ExecutionException, InterruptedException { ExecutorService executor = Executors.newSingleThreadExecutor(); Future<Integer> future = executor.submit(new Accumlation()); long start = System.currentTimeMillis(); int result = future.get(); System.out.println("Future.get方法等待了" + (System.currentTimeMillis() - start) + "毫秒才返回,計算結果:" + result); } }
打印的結果
Future.get方法等待了3147毫秒才返回,計算結果:44850
3.基本原理
1)ExecutorService實現
ExecutorService是Executor的子接口,它添加了一些新功能,如支持有返回結果的任務、支持超時任務、支持取消任務、支持批量提交任務。
public interface ExecutorService extends Executor { void shutdown(); List<Runnable> shutdownNow(); boolean isShutdown(); boolean isTerminated(); boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException; <T> Future<T> submit(Callable<T> task); <T> Future<T> submit(Runnable task, T result); Future<?> submit(Runnable task); <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException; <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,long timeout, TimeUnit unit) throws InterruptedException; <T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException; <T> T invokeAny(Collection<? extends Callable<T>> tasks,long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException; }
①ExecutorService有兩個關閉執行服務的方法,分別是shutdown()和shutdownNow. 兩者關閉執行服務的方式有所差別,shutdown()方法調用后不會接受新的任務,但已提交的任務將繼續執行(即使任務還未真正開始執行);而shutdownNow()方法不僅不會接受新任務,而且還會終止已經提交但未執行的任務,對於正在執行的任務,一般調用Thread.interrupt()方法設置中斷標志,不過線程可能不響應中斷,shutdownNow會返回已提交但未執行的任務列表。shutdown和shutdownNow不會阻塞等待,它們返回后不代表所有任務都結束。
②isShutdown返回執行服務是否被關閉的布爾值(不會等待),只要shutdown或shutdownNow任意一方法被調用后,isShutdwon都將返回true.
③awaitTermination方法用於等待執行服務中的所有任務完成,此方法需要設置超時時間,如果在限時間內所有任務都結束了(允許非正常結束),
④isTerminated 返回在執行服務關閉后所有任務是否已完成的布爾值,如果在此之前shutdownNow或shutdown沒有被調用,這里永不可能返回true.
⑤三個submit方法都用於提交單任務,submit(Callable<T> )方法中入參Callable本身有返回結果 ;submit(Runnable,T)方法在設定任務的同時可以提供一個結果,在任務結束時將返回這個結果;submit(Runnable )方法入參沒有提供結果,最終返回的結果是null 。
⑥ExecutorService有兩類批量提交任務的方法,invokeAll和invokeAny,它們都有兩個版本,一個不限時版本、一個超時版本。
invokeAll等待(給定的)所有任務完成,返回的Future集合中,每個Future的isDone方法都返回true,但isDone是true並不代表任務完成了,也有可能是因任務被取消而導致任務非正常結束。invokeAll的超時版本方法,需要指定等待時間的時間,若超時后還有任務還未完成,這些任務就會被取消。而invokeAny,只要有一個任務正常完成(沒拋出異常)后,它就返回此任務的結果;在正常返回或異常拋出返回后,其他任務則會被取消。對於invokeAny的超時版本,如果在限時內有一任務正常(沒拋出異常)完成,就返回此任務的結果 ,其他將任務會被取消;如果沒有任務能在限時內成功完成返回,就拋出TimeoutException; 沒有任務正常成功返回(可能是因發生某種異常而返回),將拋出ExecutionException.
在了解ExecutorService接口的相關抽象方法定義后,我們來進一步分析它的實現類和實現原理。
ExecutorService的主要實現類是ThreadPoolExecutor,它是基於線程池實現的,ExecutorService有一個很重要的抽象類AbstractExecutorService, 而且ThreadPoolExecutor就是直接繼承於AbstractExecutorService。
我們可以基於此抽象類實現一個簡易的ExecutorService。AbstractExecutorService提供了submit 、invokeAll和invokeAny的默認實現,子類只需要實現其他方法就行了。shutdown與isShutdown 等方法與生命周期管理有關,我暫時可以不用去管它,其實它的子類最關鍵在於實現execute方法,因為submit、invokeAll、invokeAny等方法底層主要還是調用execute方法。

import java.util.List; import java.util.concurrent.*; public class CustomizeExecutorService extends AbstractExecutorService { @Override public void shutdown() { System.out.println("=====shutdown====="); } @Override public List<Runnable> shutdownNow() { System.out.println("=====shutdownNow====="); return null; } @Override public boolean isShutdown() { return false; } @Override public boolean isTerminated() { return false; } @Override public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException { return false; } @Override public void execute(Runnable command) { new Thread(command).start(); } } class Accumlation implements Callable<Integer> { @Override public Integer call() throws Exception { int sum = 0; for (int i = 0; i < 300; i++) { sum += i; Thread.sleep(10); } return sum; } public static void main(String[] args) throws ExecutionException, InterruptedException { ExecutorService executor = new CustomizeExecutorService(); Future<Integer> future = executor.submit(new Accumlation()); long start = System.currentTimeMillis(); int result = future.get(); System.out.println("Future.get方法等待了" + (System.currentTimeMillis() - start) + "毫秒,計算結果:" + result); } }
打印結果
Future.get方法等待了3142毫秒,計算結果:44850
AbstractExecutorService的基本方法是submit,其他方法的實現可以此為參照。可以看下其submit系列方法的實現是怎樣的。
public Future<?> submit(Runnable task) { if (task == null) throw new NullPointerException(); RunnableFuture<Void> ftask = newTaskFor(task, null); execute(ftask); return ftask; } public <T> Future<T> submit(Runnable task, T result) { if (task == null) throw new NullPointerException(); RunnableFuture<T> ftask = newTaskFor(task, result); execute(ftask); return ftask; } public <T> Future<T> submit(Callable<T> task) { if (task == null) throw new NullPointerException(); RunnableFuture<T> ftask = newTaskFor(task); execute(ftask); return ftask; }
不論入參是Runnable還是Callable,最終都會將其封裝成RunnableFuture,而RunnableFuture又是Runnable的子接口,而后去調用execute(Runnable),最后返回RunnableFuture 。這里有點奇怪的地方在於對入參的封裝,其實質相當於將Runnable或Callable任務封裝成Runnable結果。這里也再次體現出了RunnableFuture的作用,它是連接“任務的提交”和“任務的執行”的橋梁。
上面的submit方法體中調用newTaskFor,它實際上返回一個FutureTask類型的實例對象。
protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) { return new FutureTask<T>(runnable, value); } protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) { return new FutureTask<T>(callable); }
2)Future實現
上面的提到的RunnableFuture同時繼承了Runnable和Future接口,RunnableFuture本身沒有增加任何抽象方法.
public interface RunnableFuture<V> extends Runnable, Future<V> { void run();//此方法是父接口Runnable的抽象方法 }
而FutureTask又是RunnableFutrue接口的主要實現類,我們可以來看看其的實現細節
它有一個成員變量state表示狀態
private volatile int state;
它有這些可能取值
private static final int NEW = 0;//剛開始的狀態或任務在運行中 private static final int COMPLETING = 1;//臨時狀態,任務即將結束,正在設置結果 private static final int NORMAL = 2;//任務正常完成 private static final int EXCEPTIONAL = 3;//因拋出異常而結束任務 private static final int CANCELLED = 4;//任務被取消 private static final int INTERRUPTING = 5;//任務正在被中斷 private static final int INTERRUPTED = 6;//任務被中斷(中斷的最終狀態)
其他成員變量
private Callable<V> callable; private Object outcome; // non-volatile, protected by state reads/writes private volatile Thread runner; private volatile WaitNode waiters;
成員變量callable表示要執行的任務。
成員變量outcome表示任務的結果或任務非正常結束的異常
成員變量runner表示執行此任務的線程
成員變量waiter表示等待任務執行結果的等待棧表(數據結構是單向鏈表,先進后出)。WaitNode是一個簡單的靜態內部,一個成員變量thread表示等待結果的線程,另一個成員變量next表示下一個等待節點(線程)。
static final class WaitNode { volatile Thread thread; volatile WaitNode next; WaitNode() { thread = Thread.currentThread(); } }
FutureTask的構造方法會初始化callable和state。FutureTask的構造方法可以接受Runnable類型參數,它會調用Executors.callable將Runnable轉換為Callable類型實例,以便於統一處理。
public FutureTask(Runnable runnable, V result) { this.callable = Executors.callable(runnable, result); this.state = NEW; // ensure visibility of callable }
任務執行服務會使用一個線程執行FutureTask的run方法,run方法的實現
public void run() { if (state != NEW || !UNSAFE.compareAndSwapObject(this, runnerOffset, null, Thread.currentThread())) //將當前線程設置為執行任務的線程 return; try { Callable<V> c = callable; if (c != null && state == NEW) { V result; boolean ran; try { result = c.call();//執行任務 ran = true; } catch (Throwable ex) { //運行時有異常,設置異常 result = null; ran = false; setException(ex); } if (ran) set(result);//設置結果 } } finally { // runner must be non-null until state is settled to // prevent concurrent calls to run() runner = null; //state已是最終狀態,不再變化,將runer設為null,防止run方法被並發調用 // state must be re-read after nulling runner to prevent // leaked interrupts int s = state; //清空運行線程runner后再重新獲取state,防止遺漏掉對中斷的處理 if (s >= INTERRUPTING) handlePossibleCancellationInterrupt(s); } }
其主要邏輯是:
①檢查狀態,設置運行任務的線程
②調用callable的call方法去執行任務,並捕獲運行中可能出現的異常
③如果任務正常完成,調用set設置任務的結果,將state設為NORMAL, 將結果保存到outcome ,喚醒所有等待結果的線程
⑤最后將運行線程runner清空,若狀態可能是任務被取消的中斷還要處理此中斷。
對於任務提交者,可通過get方法獲取任務的最終結果,它的超時版本是
public V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { if (unit == null) throw new NullPointerException(); int s = state; if (s <= COMPLETING && //還未完成 (s = awaitDone(true, unit.toNanos(timeout))) <= COMPLETING) //等待完成 //到了限定時間,任務仍未完成,拋出超時異常TimeoutException throw new TimeoutException(); return report(s);//報告結果 }
其主要邏輯是:若任務未完成就等待任務完成,最后調用report報告結果,report會根據狀態返回結果或拋出異常。
而report方法的基本邏輯也很簡單:若是任務正常結束就返回這個任務的結果,若是任務被取消,就拋出任務取消異常CancellationException,若是在執行任務過程中發生了異常就統一將其封裝成ExecutionException並拋出。
private V report(int s) throws ExecutionException { Object x = outcome; if (s == NORMAL) return (V)x; if (s >= CANCELLED) throw new CancellationException(); throw new ExecutionException((Throwable)x); }
說到任務被取消,我們可以看看cancel(boolean)方法如何實現的
public boolean cancel(boolean mayInterruptIfRunning) { if (!(state == NEW && UNSAFE.compareAndSwapInt(this, stateOffset, NEW, mayInterruptIfRunning ? INTERRUPTING : CANCELLED))) //①不是NEW狀態,表示任務至少是COMPLETING(即將結束)狀態,返回false //②CAS更新state為INTERRUPTING或CANCELLED失敗,返回false //只有state狀態更新成功,才能取消任務(防止被並發調用) return false; try { // in case call to interrupt throws exception if (mayInterruptIfRunning) {//允許中斷就設置中斷標志 try { Thread t = runner; if (t != null) t.interrupt();//設置中斷標志 } finally { // final state 設置中斷的最終狀態 //INTERRUPTING -> INTERRUPTED ,將state由“正在中斷”更新為”已經中斷“ UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED); } } } finally { //從等待棧中喚醒並移除所有的線程(節點) finishCompletion(); } return true; }
其基本邏輯:
①任務已結束或被取消,返回false
②若mayInterruptIfRunning為true,調用interrupt設置中斷標志,將state設置為INTERRUPTED,若mayInterruptIfRunning為false,將state設為CANCELLED.
③調用finishCompletion喚醒並移除等待棧中的所有線程