Callable是Java里面與Runnable經常放在一起說的接口。
Callable是類似於Runnable的接口,實現Callable接口的類和實現Runnable的類都是可被其他線程執行的任務。
Callable的接口定義如下;
public interface Callable<V> {
V call() throws Exception;
}
Callable和Runnable的區別如下:
I Callable定義的方法是call,而Runnable定義的方法是run。
II Callable的call方法可以有返回值,而Runnable的run方法不能有返回值。
III Callable的call方法可拋出異常,而Runnable的run方法不能拋出異常。
Future 介紹
Future表示異步計算的結果,它提供了檢查計算是否完成的方法,以等待計算的完成,並檢索計算的結果。Future的cancel方法可以取消任務的執行,它有一布爾參數,參數為 true 表示立即中斷任務的執行,參數為 false 表示允許正在運行的任務運行完成。Future的 get 方法等待計算完成,獲取計算結果。
下面四個頭文件一般是一起用的:
import java.util.concurrent.Callable; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future;
* Callable是類似於Runnable的接口,實現Callable接口的類和實現Runnable的類都是可被其它線程執行的任務。
* Callable和Runnable有幾點不同:
* (1)Callable規定的方法是call(),而Runnable規定的方法是run().
* (2)Callable的任務執行后可返回值,而Runnable的任務是不能返回值的。
* (3)call()方法可拋出異常,而run()方法是不能拋出異常的。
* (4)運行Callable任務可拿到一個Future對象,
* Future 表示異步計算的結果。它提供了檢查計算是否完成的方法,以等待計算的完成,並檢索計算的結果。
* 通過Future對象可了解任務執行情況,可取消任務的執行,還可獲取任務執行的結果。
示例代碼如下:
package com.company; import java.util.concurrent.*; public class Main { public static class MyCallable implements Callable { private int flag = 0; public MyCallable(int flag) { this.flag = flag; } @Override public Object call() throws Exception { if (this.flag == 0) { return "flag == 0"; } else if (this.flag == 1) { try { while (true) { System.out.println("looping"); Thread.sleep(2000); } } catch (InterruptedException e) { System.out.println("Interrupted"); } return false; } else { throw new Exception("Bad flag value!"); } } } public static void main(String[] args) { MyCallable task0 = new MyCallable(0); MyCallable task1 = new MyCallable(1); MyCallable task2 = new MyCallable(2); ExecutorService es = Executors.newFixedThreadPool(3); try { Future future0 = es.submit(task0); System.out.println("task0: " + future0.get()); Future future1 = es.submit(task1); Thread.sleep(5000); System.out.println("task1 cancelled: " + future1.cancel(true)); // 注意這時候Task在Sleep,是被Interrupted了 Future future2 = es.submit(task2); System.out.println("task2: " + future2.get()); } catch (ExecutionException e) { e.printStackTrace(); } catch (InterruptedException e) { e.printStackTrace(); } es.shutdown(); System.out.println(); } }
運行的結果是:
task0: flag == 0 looping looping looping Interrupted java.util.concurrent.ExecutionException: java.lang.Exception: Bad flag value! task1 cancelled: true at java.util.concurrent.FutureTask.report(FutureTask.java:122) at java.util.concurrent.FutureTask.get(FutureTask.java:192) at com.company.Main.main(Main.java:89) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at com.intellij.rt.execution.application.AppMain.main(AppMain.java:147) Caused by: java.lang.Exception: Bad flag value! at com.company.Main$MyCallable.call(Main.java:67) at java.util.concurrent.FutureTask.run(FutureTask.java:266) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at java.lang.Thread.run(Thread.java:745)
可以看出上面的三個部分,分別對應了3個flag的線程執行。
同時也復習了 ExecutorService 的內容。
用CompletionService的例子可以看這里:
package com.company; import java.util.HashMap; import java.util.Map; import java.util.concurrent.*; public class Main { public static void main(String[] args) { ExecutorService es = Executors.newCachedThreadPool(); CompletionService<Integer> cs = new ExecutorCompletionService<Integer>(es); for (int i=0; i<5; i++) { final int taskId = i; cs.submit(new Callable<Integer>() { @Override public Integer call() throws Exception { return taskId; } }); } try { Thread.sleep(3000); } catch (InterruptedException e) { e.printStackTrace(); } for (int i=0; i<5; i++) { // 注意,用了CompletionService之后,異常獲取是放在這里 try { //System.out.println("i" + i); System.out.println(cs.take().get()); } catch (InterruptedException e) { e.printStackTrace(); } catch (ExecutionException e) { e.printStackTrace(); } } es.shutdown(); System.out.println(); } }
運行結果:
1 0 2 3 4
注意,這個順序是按照完成順序排列的。
其實,這個也是先線程運行,然后sleep,最后get結果的,可以見文章再后面部分的詳解。
Runnable
它只有一個run()函數,用於將耗時操作寫在其中,該函數沒有返回值。作為參數傳給Thread構造函數,Thread類在調用start()函數后就是執行的是Runnable的run()函數(異步,立即返回)。Runnable的聲明如下:
public interface Runnable { /** * When an object implementing interface <code>Runnable</code> is used * to create a thread, starting the thread causes the object's * <code>run</code> method to be called in that separately executing * thread. * <p> * * @see java.lang.Thread#run() */ public abstract void run(); }
Callable
Callable與Runnable的功能大致相似,Callable中有一個call()函數,但是call()函數有返回值,而Runnable的run()函數不能將結果返回給客戶程序。Callable的聲明如下 :
可以看到,這是一個泛型接口,call()函數返回的類型就是客戶程序傳遞進來的V類型。
public interface Callable<V> { /** * Computes a result, or throws an exception if unable to do so. * * @return computed result * @throws Exception if unable to compute a result */ V call() throws Exception; }
注意上面的 throws Exception,這是重要的區別。你覆蓋其父類方法時不能拋出父類沒有聲明的可捕獲異常。所以你繼承的Runnable的run方法中不能拋出任何的exception,而繼承的Callable的call方法里面可以拋出異常。
還有注意,拋出拋出unchecked exception(Error或RuntimeException或子類),和拋出異常是兩碼事情。因為unchecked exception即RuntimeException(運行時異常)是不需要try...catch...或throws 機制去處理的異常,就好像空指針異常一樣,你可以在任何的代碼中進行拋出或捕獲。但是捕獲方式跟try...catch...或throws 機制不一樣。
對於如何捕獲unchecked exception,可以實現Thread.UncaughtExceptionHandler中的uncaughtException來捕獲:
代碼如下:
package com.company; import java.util.concurrent.*; class MyUncaughtExceptionHandler implements Thread.UncaughtExceptionHandler { @Override public void uncaughtException(Thread t, Throwable e) { System.out.println("Caught " + e); } } class HandleThread implements ThreadFactory { @Override public Thread newThread(Runnable r) { Thread t = new Thread(r); t.setUncaughtExceptionHandler(new MyUncaughtExceptionHandler()); return t; } } class MyRunnable implements Runnable { @Override public void run() { System.out.println("About to throw RuntimeException"); throw new RuntimeException(); } } public class Main { public static void main(String[] args) { ExecutorService es = Executors.newCachedThreadPool(new HandleThread()); es.execute(new Thread(new MyRunnable())); es.shutdown(); System.out.println(); } }
輸出結果:
About to throw RuntimeException Caught java.lang.RuntimeException
如果把上面的RuntimeException換成Error,如下:
throw new Error(); 輸出結果: About to throw RuntimeException Caught java.lang.Error 不太明白,為什么RuntimeException多了一個空行
如果換成除0的運算式子,如下:
int a = 12 / 0; 輸出如下: About to throw RuntimeException Caught java.lang.ArithmeticException: / by zero
如果 Executors.newCachedThreadPool 的參數里面不加上 ThreadFactory,那么這兩種運行時錯誤都是抓不住的。
當然了,也可以對Thread直接設置,如下:
class ErrHandler implements UncaughtExceptionHandler { ... } ThreadA a = new ThreadA(); handle = new ErrHandler(); a.setUncaughtExceptionHandler(handle);// 加入定義的ErrHandler a.start();
Future
Executor就是Runnable和Callable的調度容器,Future就是對於具體的Runnable或者Callable任務的執行結果進行
取消、查詢是否完成、獲取結果、設置結果操作。get方法會阻塞,直到任務返回結果(Future簡介)。
看下面函數的說明,應該能看出大概:
/** * @see FutureTask * @see Executor * @since 1.5 * @author Doug Lea * @param <V> The result type returned by this Future's <tt>get</tt> method */ public interface Future<V> { /** * Attempts to cancel execution of this task. This attempt will * fail if the task has already completed, has already been cancelled, * or could not be cancelled for some other reason. If successful, * and this task has not started when <tt>cancel</tt> is called, * this task should never run. If the task has already started, * then the <tt>mayInterruptIfRunning</tt> parameter determines * whether the thread executing this task should be interrupted in * an attempt to stop the task. * */ boolean cancel(boolean mayInterruptIfRunning); /** * Returns <tt>true</tt> if this task was cancelled before it completed * normally. */ boolean isCancelled(); /** * Returns <tt>true</tt> if this task completed. * */ boolean isDone(); /** * Waits if necessary for the computation to complete, and then * retrieves its result. * * @return the computed result */ V get() throws InterruptedException, ExecutionException; /** * Waits if necessary for at most the given time for the computation * to complete, and then retrieves its result, if available. * * @param timeout the maximum time to wait * @param unit the time unit of the timeout argument * @return the computed result */ V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException; }
注意上面的TimeUnit,是這樣子的,能夠控制前面的timeout數字代表的是天、時還是秒等。通過如下定義:
TimeUnit timeUnit =TimeUnit.DAYS;
就代表了,前面timeout是1就1天,前面timeout是2就2天。
FutureTask
FutureTask則是一個RunnableFuture<V>,而RunnableFuture實現了Runnbale又實現了Futrue<V>這兩個接口,
public class FutureTask<V> implements RunnableFuture<V>
RunnableFuture
public interface RunnableFuture<V> extends Runnable, Future<V> { /** * Sets this Future to the result of its computation * unless it has been cancelled. */ void run(); }
另外它還可以包裝Runnable和Callable<V>, 由構造函數注入依賴。
public FutureTask(Callable<V> callable) { if (callable == null) throw new NullPointerException(); this.callable = callable; this.state = NEW; // ensure visibility of callable } public FutureTask(Runnable runnable, V result) { this.callable = Executors.callable(runnable, result); this.state = NEW; // ensure visibility of callable }
可以看到,Runnable注入會被Executors.callable()函數轉換為Callable類型,即FutureTask最終都是執行Callable類型的任務。該適配函數的實現如下 :
public static <T> Callable<T> callable(Runnable task, T result) { if (task == null) throw new NullPointerException(); return new RunnableAdapter<T>(task, result); }
RunnableAdapter適配器
/** * A callable that runs given task and returns given result */ static final class RunnableAdapter<T> implements Callable<T> { final Runnable task; final T result; RunnableAdapter(Runnable task, T result) { this.task = task; this.result = result; } public T call() { task.run(); return result; } }
由於FutureTask實現了Runnable,因此它既可以通過Thread包裝來直接執行,也可以提交給ExecuteService來執行。
並且還可以直接通過get()函數獲取執行結果,該函數會阻塞,直到結果返回。因此FutureTask既是Future、
Runnable,內部又包裝了Callable( 如果是Runnable最終也會被轉換為Callable ), 它是這兩者的合體。
來一段代碼示例吧:
public class Main { public static void main(String[] args) { ExecutorService es = Executors.newCachedThreadPool(); FutureTask<Integer> futureTask = new FutureTask<Integer>(new Callable<Integer>() { @Override public Integer call() throws Exception { System.out.println("hi hi"); return null; } }); es.submit(futureTask); es.shutdown(); System.out.println(); } }
運行結果是能夠打印。注意上面的submit接口接受的參數是Runnable和Callable。這里FutureTask是作為Runnable來調用的。
而execute接口只接受Runnable。
關於 submit 和 execute的區別如下:
1、接收的參數不一樣,剛剛說了。
2、submit有返回值,而execute沒有
Method submit extends base method Executor.execute by creating and returning a Future that can be used to cancel execution and/or wait for completion. 用到返回值的例子,比如說我有很多個做validation的task,我希望所有的task執行完,然后每個task告訴我它的執行結果,是成功還是失敗,如果是失敗,原因是什么。然后我就可以把所有失敗的原因綜合起來發給調用者。
從之前的例子應該可以看出,返回了Future實例,而通過Future可以get和cancel操作。
Future future2 = es.submit(task2);
3、submit方便Exception處理
There is a difference when looking at exception handling. If your tasks throws an exception and if it was submitted with execute this exception will go to the uncaught exception handler (when you don't have provided one explicitly, the default one will just print the stack trace to System.err). If you submitted the task with submit any thrown exception, checked or not, is then part of the task's return status. For a task that was submitted with submit and that terminates with an exception, the Future.get will rethrow this exception, wrapped in an ExecutionException. 意思就是如果你在你的task里會拋出checked或者unchecked exception,而你又希望外面的調用者能夠感知這些exception並做出及時的處理,
那么就需要用到submit,通過捕獲Future.get拋出的異常。
上面這個尤其要關注,是調用get的時候,才會拋出異常。如果不調用get,那其實線程還是執行的,但是不會拋出異常。
package com.company; import java.util.concurrent.*; public class Main { public static class MyCallable implements Callable { private int flag = 0; public MyCallable(int flag) { this.flag = flag; } @Override public Object call() throws Exception { if (this.flag == 0) { return "flag == 0"; } else if (this.flag == 1) { try { while (true) { System.out.println("looping"); Thread.sleep(4000); } } catch (InterruptedException e) { System.out.println("Interrupted"); } return false; } else { System.out.println("About to throw Exception"); throw new Exception("Bad flag value!"); } } } public static void main(String[] args) { MyCallable task0 = new MyCallable(0); MyCallable task1 = new MyCallable(1); MyCallable task2 = new MyCallable(2); ExecutorService es = Executors.newFixedThreadPool(3); try { Future future0 = es.submit(task0); System.out.println("task0: " + future0.get()); Future future1 = es.submit(task1); Thread.sleep(5000); System.out.println("task1 cancelled: " + future1.cancel(false)); Future future2 = es.submit(task2); //System.out.println("task2: " + future2.get()); } catch (ExecutionException e) { e.printStackTrace(); } catch (InterruptedException e) { e.printStackTrace(); } es.shutdown(); System.out.println(); } }
注意,其中兩個改動點,已飄紅:
1. 第二個Callable返回的Future的cancel參數用false;
2. 第三個拋出異常的Callable返回的Future不再調用get函數,看是否有異常拋出。在拋出異常前,也加了一個打印。
運行結果如下:
task0: flag == 0 looping looping task1 cancelled: true About to throw Exception looping looping
可以看到2點:
1. 循環一直在繼續,也就是cancel(false)壓根沒能夠停止住線程。意思是等線程運行結束了,狀態置為cancelled?
2. 沒有get就沒有實際拋出異常。
對於之前那個CompletionService的例子,增加了一些改動如下:
package com.company; import java.util.concurrent.*; public class Main { public static void main(String[] args) { ExecutorService es = Executors.newCachedThreadPool(); CompletionService<Integer> cs = new ExecutorCompletionService<Integer>(es); for (int i=0; i<5; i++) { final int taskId = i; cs.submit(new Callable<Integer>() { @Override public Integer call() throws Exception { System.out.println("Run thread " + taskId); return taskId; } }); } try { System.out.println("Sleeping"); Thread.sleep(3000); } catch (InterruptedException e) { e.printStackTrace(); } for (int i=0; i<5; i++) { // 注意,用了CompletionService之后,異常獲取是放在這里 try { //System.out.println("i" + i); System.out.println(cs.take().get()); } catch (InterruptedException e) { e.printStackTrace(); } catch (ExecutionException e) { e.printStackTrace(); } } es.shutdown(); System.out.println(); } }
運行了之后,得到結果:
Run thread 0 Run thread 1 Run thread 2 Run thread 3 Sleeping Run thread 4 (此處等待數秒) 0 1 2 3 4
證明了,CompletionService也是先運行,然后get的時候才獲得結果的。
也就是說,submit和execute在啟動線程方面,沒有區別,都是即時執行的。
再復習一下異常的情況,如果一道面試題是這樣的:
一個線程運行時發生異常會怎樣?
那么,回答,要區分普通異常,還是RuntimeException(包括其子類和Error),
如果是普通異常,Runnable無法拋出,但是Callable可以拋出,應該通過在Future實例的get方法調用的地方接住;
如果是RuntimeException,如下:
簡單的說,如果異常沒有被捕獲該線程將會停止執行。Thread.UncaughtExceptionHandler是用於處理未捕獲異常造成線程突然中斷情況的一個內嵌接口。
當一個未捕獲異常將造成線程中斷的時候JVM會使用Thread.getUncaughtExceptionHandler()來查詢線程的UncaughtExceptionHandler,
並將線程和異常作為參數傳遞給handler的uncaughtException()方法進行處理。
需要特別注意的是,一個線程的異常只會終止當前線程,而對其他線程和主線程是沒有任何影響的。比如如下例子:
package com.company; import java.util.concurrent.*; class MyUncaughtExceptionHandler implements Thread.UncaughtExceptionHandler { @Override public void uncaughtException(Thread t, Throwable e) { System.out.println("Caught " + e); } } class HandleThread implements ThreadFactory { @Override public Thread newThread(Runnable r) { Thread t = new Thread(r); t.setUncaughtExceptionHandler(new MyUncaughtExceptionHandler()); return t; } } class MyRunnable implements Runnable { @Override public void run() { System.out.println("About to throw RuntimeException"); throw new RuntimeException(); } } public class Main { public static void main(String[] args) { ExecutorService es = Executors.newCachedThreadPool();//new HandleThread()); es.execute(new Thread(new MyRunnable())); es.shutdown(); try { Thread.sleep(5000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("Done"); } }
調用結果,當子線程出現RuntimeException並拋出未接住的時候,停止的只有子線程,而主線程是不受影響的。
About to throw RuntimeException Exception in thread "pool-1-thread-1" java.lang.RuntimeException at com.company.MyRunnable.run(Main.java:28) at java.lang.Thread.run(Thread.java:745) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at java.lang.Thread.run(Thread.java:745) (此處有數秒主線程Sleep停頓) Done
注意領悟下面這段話:
在java多線程程序中,所有線程都不允許拋出未捕獲的checked exception(比如sleep時的InterruptedException),(注:Callable可以,另談)
也就是說各個線程需要自己把自己的checked exception處理掉。
這一點是通過java.lang.Runnable.run()方法聲明(因為此方法聲明上沒有throw exception部分)進行了約束。
但是線程依然有可能拋出unchecked exception(如運行時異常),當此類異常跑拋出時,線程就會終結,而對於主線程和其他線程完全不受影響,
且完全感知不到某個線程拋出的異常(也是說完全無法catch到這個異常)。(注:可以實現UncaughtExceptionHandler並且對本線程加上這個handler來處理)
JVM的這種設計源自於這樣一種理念:“線程是獨立執行的代碼片斷,線程的問題應該由線程自己來解決,而不要委托到外部。”
基於這樣的設計理念,在Java中,線程方法的異常(無論是checked還是unchecked exception),
都應該在線程代碼邊界之內(run方法內)進行try catch並處理掉.換句話說,我們不能捕獲從線程中逃逸的異常。