本系列文章經補充和完善,已修訂整理成書《Java編程的邏輯》,由機械工業出版社華章分社出版,於2018年1月上市熱銷,讀者好評如潮!各大網店和書店有售,歡迎購買,京東自營鏈接:http://item.jd.com/12299018.html
Java並發包提供了一套框架,大大簡化了執行異步任務所需的開發,本節我們就來初步探討這套框架。
在之前的介紹中,線程Thread既表示要執行的任務,又表示執行的機制,而這套框架引入了一個"執行服務"的概念,它將"任務的提交"和"任務的執行"相分離,"執行服務"封裝了任務執行的細節,對於任務提交者而言,它可以關注於任務本身,如提交任務、獲取結果、取消任務,而不需要關注任務執行的細節,如線程創建、任務調度、線程關閉等。
以上描述可能比較抽象,接下來,我們會一步步具體闡述。
基本接口
首先,我們來看任務執行服務涉及的基本接口:
- Runnable和Callable:表示要執行的異步任務
- Executor和ExecutorService:表示執行服務
- Future:表示異步任務的結果
Runnable和Callable
關於Runnable和Callable,我們在前面幾節都已經了解了,都表示任務,Runnable沒有返回結果,而Callable有,Runnable不會拋出異常,而Callable會。
Executor和ExecutorService
Executor表示最簡單的執行服務,其定義為:
public interface Executor { void execute(Runnable command); }
就是可以執行一個Runnable,沒有返回結果。接口沒有限定任務如何執行,可能是創建一個新線程,可能是復用線程池中的某個線程,也可能是在調用者線程中執行。
ExecutorService擴展了Executor,定義了更多服務,基本方法有:
public interface ExecutorService extends Executor { <T> Future<T> submit(Callable<T> task); <T> Future<T> submit(Runnable task, T result); Future<?> submit(Runnable task); //... 其他方法 }
這三個submit都表示提交一個任務,返回值類型都是Future,返回后,只是表示任務已提交,不代表已執行,通過Future可以查詢異步任務的狀態、獲取最終結果、取消任務等。我們知道,對於Callable,任務最終有個返回值,而對於Runnable是沒有返回值的,第二個提交Runnable的方法可以同時提供一個結果,在異步任務結束時返回,而對於第三個方法,異步任務的最終返回值為null。
Future
我們來看Future接口的定義:
public interface Future<V> { boolean cancel(boolean mayInterruptIfRunning); boolean isCancelled(); boolean isDone(); V get() throws InterruptedException, ExecutionException; V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException; }
get用於返回異步任務最終的結果,如果任務還未執行完成,會阻塞等待,另一個get方法可以限定阻塞等待的時間,如果超時任務還未結束,會拋出TimeoutException。
cancel用於取消異步任務,如果任務已完成、或已經取消、或由於某種原因不能取消,cancel返回false,否則返回true。如果任務還未開始,則不再運行。但如果任務已經在運行,則不一定能取消,參數mayInterruptIfRunning表示,如果任務正在執行,是否調用interrupt方法中斷線程,如果為false就不會,如果為true,就會嘗試中斷線程,但我們從69節知道,中斷不一定能取消線程。
isDone和isCancelled用於查詢任務狀態。isCancelled表示任務是否被取消,只要cancel方法返回了true,隨后的isCancelled方法都會返回true,即使執行任務的線程還未真正結束。isDone表示任務是否結束,不管什么原因都算,可能是任務正常結束、可能是任務拋出了異常、也可能是任務被取消。
我們再來看下get方法,任務最終大概有三個結果:
- 正常完成,get方法會返回其執行結果,如果任務是Runnable且沒有提供結果,返回null
- 任務執行拋出了異常,get方法會將異常包裝為ExecutionException重新拋出,通過異常的getCause方法可以獲取原異常
- 任務被取消了,get方法會拋出異常CancellationException
如果調用get方法的線程被中斷了,get方法會拋出InterruptedException。
Future是一個重要的概念,是實現"任務的提交"與"任務的執行"相分離的關鍵,是其中的"紐帶",任務提交者和任務執行服務通過它隔離各自的關注點,同時進行協作。
基本用法
基本示例
說了這么多接口,具體怎么用呢?我們看個簡單的例子:
public class BasicDemo { static class Task implements Callable<Integer> { @Override public Integer call() throws Exception { int sleepSeconds = new Random().nextInt(1000); Thread.sleep(sleepSeconds); return sleepSeconds; } } public static void main(String[] args) throws InterruptedException { ExecutorService executor = Executors.newSingleThreadExecutor(); Future<Integer> future = executor.submit(new Task()); // 模擬執行其他任務 Thread.sleep(100); try { System.out.println(future.get()); } catch (ExecutionException e) { e.printStackTrace(); } executor.shutdown(); } }
我們使用了工廠類Executors創建了一個任務執行服務,Executors有多個靜態方法,可以用來創建ExecutorService,這里使用的是:
public static ExecutorService newSingleThreadExecutor()
表示使用一個線程執行所有服務,后續我們會詳細介紹Executors,注意與Executor相區別,后者是單數,是接口。
不管ExecutorService是如何創建的,對使用者而言,用法都一樣,例子提交了一個任務,提交后,可以繼續執行其他事情,隨后可以通過Future獲取最終結果或處理任務執行的異常。
最后,我們調用了ExecutorService的shutdown方法,它會關閉任務執行服務。
ExecutorService的更多方法
前面我們只是介紹了ExecutorService的三個submit方法,其實它還有如下方法:
public interface ExecutorService extends Executor { void shutdown(); List<Runnable> shutdownNow(); boolean isShutdown(); boolean isTerminated(); boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException; <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException; <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException; <T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException; <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException; }
有兩個關閉方法,shutdown和shutdownNow,區別是,shutdown表示不再接受新任務,但已提交的任務會繼續執行,即使任務還未開始執行,shutdownNow不僅不接受新任務,已提交但尚未執行的任務會被終止,對於正在執行的任務,一般會調用線程的interrupt方法嘗試中斷,不過,線程可能不響應中斷,shutdownNow會返回已提交但尚未執行的任務列表。
shutdown和shutdownNow不會阻塞等待,它們返回后不代表所有任務都已結束,不過isShutdown方法會返回true。調用者可以通過awaitTermination等待所有任務結束,它可以限定等待的時間,如果超時前所有任務都結束了,即isTerminated方法返回true,則返回true,否則返回false。
ExecutorService有兩組批量提交任務的方法,invokeAll和invokeAny,它們都有兩個版本,其中一個限定等待時間。
invokeAll等待所有任務完成,返回的Future列表中,每個Future的isDone方法都返回true,不過isDone為true不代表任務就執行成功了,可能是被取消了,invokeAll可以指定等待時間,如果超時后有的任務沒完成,就會被取消。
而對於invokeAny,只要有一個任務在限時內成功返回了,它就會返回該任務的結果,其他任務會被取消,如果沒有任務能在限時內成功返回,拋出TimeoutException,如果限時內所有任務都結束了,但都發生了異常,拋出ExecutionException。
ExecutorService的invokeAll示例
我們在64節介紹過使用jsoup下載和分析HTML,我們使用它看一個invokeAll的例子,同時下載並分析兩個URL的標題,輸出標題內容,代碼為:
public class InvokeAllDemo { static class UrlTitleParser implements Callable<String> { private String url; public UrlTitleParser(String url) { this.url = url; } @Override public String call() throws Exception { Document doc = Jsoup.connect(url).get(); Elements elements = doc.select("head title"); if (elements.size() > 0) { return elements.get(0).text(); } return null; } } public static void main(String[] args) { ExecutorService executor = Executors.newFixedThreadPool(10); String url1 = "http://www.cnblogs.com/swiftma/p/5396551.html"; String url2 = "http://www.cnblogs.com/swiftma/p/5399315.html"; Collection<UrlTitleParser> tasks = Arrays.asList(new UrlTitleParser[] { new UrlTitleParser(url1), new UrlTitleParser(url2) }); try { List<Future<String>> results = executor.invokeAll(tasks, 10, TimeUnit.SECONDS); for (Future<String> result : results) { try { System.out.println(result.get()); } catch (ExecutionException e) { e.printStackTrace(); } } } catch (InterruptedException e) { e.printStackTrace(); } executor.shutdown(); } }
這里,使用了Executors的另一個工廠方法newFixedThreadPool創建了一個線程池,這樣使得多個任務可以並發執行,關於線程池,我們下節介紹。
其它代碼比較簡單,我們就不解釋了。使用ExecutorService,編寫並發異步任務的代碼就像寫順序程序一樣,不用關心線程的創建和協調,只需要提交任務、處理結果就可以了,大大簡化了開發工作。
基本實現原理
了解了ExecutorService和Future的基本用法,我們來看下它們的基本實現原理。
ExecutorService的主要實現類是ThreadPoolExecutor,它是基於線程池實現的,關於線程池我們下節再介紹。ExecutorService有一個抽象實現類AbstractExecutorService,本節,我們簡要分析其原理,並基於它實現一個簡單的ExecutorService,Future的主要實現類是FutureTask,我們也會簡要探討其原理。
AbstractExecutorService
AbstractExecutorService提供了submit, invokeAll和invokeAny的默認實現,子類只需要實現如下方法:
public void shutdown() public List<Runnable> shutdownNow() public boolean isShutdown() public boolean isTerminated() public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException public void execute(Runnable command)
除了execute,其他方法都與執行服務的生命周期管理有關,簡化起見,我們忽略其實現,主要考慮execute。
submit/invokeAll/invokeAny最終都會調用execute,execute決定了到底如何執行任務,簡化起見,我們為每個任務創建一個線程,一個完整的最簡單的ExecutorService實現類如下:
public class SimpleExecutorService extends AbstractExecutorService { @Override public void shutdown() { } @Override public List<Runnable> shutdownNow() { return null; } @Override public boolean isShutdown() { return false; } @Override public boolean isTerminated() { return false; } @Override public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException { return false; } @Override public void execute(Runnable command) { new Thread(command).start(); } }
對於前面的例子,創建ExecutorService的代碼可以替換為:
ExecutorService executor = new SimpleExecutorService();
可以實現相同的效果。
ExecutorService最基本的方法是submit,它是如何實現的呢?我們來看AbstractExecutorService的代碼:
public <T> Future<T> submit(Callable<T> task) { if (task == null) throw new NullPointerException(); RunnableFuture<T> ftask = newTaskFor(task); execute(ftask); return ftask; }
它調用newTaskFor生成了一個RunnableFuture,RunnableFuture是一個接口,既擴展了Runnable,又擴展了Future,沒有定義新方法,作為Runnable,它表示要執行的任務,傳遞給execute方法進行執行,作為Future,它又表示任務執行的異步結果。這可能令人混淆,我們來看具體代碼:
protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) { return new FutureTask<T>(callable); }
就是創建了一個FutureTask對象,FutureTask實現了RunnableFuture接口。它是怎么實現的呢?
FutureTask
它有一個成員變量表示待執行的任務,聲明為:
private Callable<V> callable;
有個整數變量state表示狀態,聲明為:
private volatile int state;
取值可能為:
NEW = 0; //剛開始的狀態,或任務在運行 COMPLETING = 1; //臨時狀態,任務即將結束,在設置結果 NORMAL = 2; //任務正常執行完成 EXCEPTIONAL = 3; //任務執行拋出異常結束 CANCELLED = 4; //任務被取消 INTERRUPTING = 5; //任務在被中斷 INTERRUPTED = 6; //任務被中斷
有個變量表示最終的執行結果或異常,聲明為:
private Object outcome;
有個變量表示運行任務的線程:
private volatile Thread runner;
還有個單向鏈表表示等待任務執行結果的線程:
private volatile WaitNode waiters;
FutureTask的構造方法會初始化callable和狀態,如果FutureTask接受的是一個Runnable對象,它會調用Executors.callable轉換為Callable對象,如下所示:
public FutureTask(Runnable runnable, V result) { this.callable = Executors.callable(runnable, result); this.state = NEW; // ensure visibility of callable }
任務執行服務會使用一個線程執行FutureTask的run方法,run()代碼為:
public void run() { if (state != NEW || !UNSAFE.compareAndSwapObject(this, runnerOffset, null, Thread.currentThread())) return; try { Callable<V> c = callable; if (c != null && state == NEW) { V result; boolean ran; try { result = c.call(); ran = true; } catch (Throwable ex) { result = null; ran = false; setException(ex); } if (ran) set(result); } } finally { // runner must be non-null until state is settled to // prevent concurrent calls to run() runner = null; // state must be re-read after nulling runner to prevent // leaked interrupts int s = state; if (s >= INTERRUPTING) handlePossibleCancellationInterrupt(s); } }
其基本邏輯是:
- 調用callable的call方法,捕獲任何異常
- 如果正常執行完成,調用set設置結果,保存到outcome
- 如果執行過程發生異常,調用setException設置異常,異常也是保存到outcome,但狀態不一樣
- set和setException除了設置結果,修改狀態外,還會調用finishCompletion,它會喚醒所有等待結果的線程
對於任務提交者,它通過get方法獲取結果,限時get方法的代碼為:
public V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { if (unit == null) throw new NullPointerException(); int s = state; if (s <= COMPLETING && (s = awaitDone(true, unit.toNanos(timeout))) <= COMPLETING) throw new TimeoutException(); return report(s); }
其基本邏輯是,如果任務還未執行完畢,就等待,最后調用report報告結果, report根據狀態返回結果或拋出異常,代碼為:
private V report(int s) throws ExecutionException { Object x = outcome; if (s == NORMAL) return (V)x; if (s >= CANCELLED) throw new CancellationException(); throw new ExecutionException((Throwable)x); }
cancel方法的代碼為:
public boolean cancel(boolean mayInterruptIfRunning) { if (state != NEW) return false; if (mayInterruptIfRunning) { if (!UNSAFE.compareAndSwapInt(this, stateOffset, NEW, INTERRUPTING)) return false; Thread t = runner; if (t != null) t.interrupt(); UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED); // final state } else if (!UNSAFE.compareAndSwapInt(this, stateOffset, NEW, CANCELLED)) return false; finishCompletion(); return true; }
其基本邏輯為:
- 如果任務已結束或取消,返回false
- 如果mayInterruptIfRunning為true,調用interrupt中斷線程,設置狀態為INTERRUPTED
- 如果mayInterruptIfRunning為false,設置狀態為CANCELLED
- 調用finishCompletion喚醒所有等待結果的線程
invokeAll和invokeAny
理解了FutureTask,我們再來看AbstractExecutorService的其他方法,invokeAll的基本邏輯很簡單,對每個任務,創建一個FutureTask,並調用execute執行,然后等待所有任務結束。
invokeAny的實現稍微復雜些,它利用了ExecutorCompletionService,關於這個類及invokeAny的實現,我們后續章節再介紹。
小結
本節介紹了Java並發包中任務執行服務的基本概念和原理,該服務體現了並發異步開發中"關注點分離"的思想,使用者只需要通過ExecutorService提交任務,通過Future操作任務和結果即可,不需要關注線程創建和協調的細節。
本節主要介紹了AbstractExecutorService和FutureTask的基本原理,實現了一個最簡單的執行服務SimpleExecutorService,對每個任務創建一個單獨的線程。實際中,最經常使用的執行服務是基於線程池實現的ThreadPoolExecutor,線程池是並發程序中一個非常重要的概念和技術,讓我們下一節來探討。
(與其他章節一樣,本節所有代碼位於 https://github.com/swiftma/program-logic)
----------------
未完待續,查看最新文章,敬請關注微信公眾號“老馬說編程”(掃描下方二維碼),從入門到高級,深入淺出,老馬和你一起探索Java編程及計算機技術的本質。用心原創,保留所有版權。