在實際開發中,我們常常會用到線程池,但任務一旦提交到線程池之后,如果發生異常之后,怎么處理? 怎么獲取到異常信息?而不是任務提交之后,消失的無影無蹤。
要知道以上答案,先看下 線程池里面的線程發生異常之后會發生什么。
我們先通過工具類Executors創建一個簡單的線程池,里面核心線程數為1
ExecutorService executorService=Executors.newFixedThreadPool(1); executorService.submit(()->{ int i=1/0; }); executorService.submit(()->{ System.out.println("當線程池拋出異常后繼續新的任務"); });
上面我們創建了一個核心線程數和最大線程數都為1的線程池。
然后往里面提交了兩個任務。
其中一個任務 int i=1/0; 0不能作為除數,因此會拋出異常,java線程中 如果拋出未被捕獲的異常,會導致線程終止。
該線程池中只有一個線程,如果終止之后,提交第二個任務會發生什么?
運行結果

答案是第一個線程內部發生異常之后,沒有任何異常信息出現,第二個任務正常執行。
線程池有兩種提交方式 excute和sumbit ,換成excute 看下
ExecutorService executorService=Executors.newFixedThreadPool(1); executorService.execute(()->{ int i=1/0; }); executorService.execute(()->{ System.out.println("當線程池拋出異常后繼續新的任務"); });

第一個任務出現了異常棧信息,第二個任務正常執行。
線程池的兩種不同的提交方式,會有不同的異常情形,但是不管怎樣,線程內的任務拋出異常之后,線程池照樣能正常運行。
問題1:execute和submit有什么區別?為什么一個會拋出異常 一個不會?
這個問題先放這,我們等下回頭再看,這個問題非常重要。
那么當前最要緊的問題來了,我們要如何才能獲取到線程池里面的任務拋出的異常?
解決方案一,將整個任務try-catch起來,捕獲里面的異常,這種方式是最簡單有效的方式。
executorService.execute(()->{ try{ int i=1/0; }catch (Exception ex){ System.out.println(ex.getMessage()); } });

換成submit提交
executorService.submit(()->{ try{ int i=1/0; }catch (Exception ex){ System.out.println("sumbit提交"+ex.getMessage()); } }); executorService.submit(()->{ System.out.println("當線程池拋出異常后繼續新的任務"); });

可以看到 清晰易懂的捕獲到了異常,可以知道我們的任務出現了問題,而不是消失的無影無蹤。
解析方案2: 每一個任務都加一個try-catch 實在是太麻煩了,而且代碼也不好看,那么這樣想的話,可以用UncaughtExceptionHandler 這個類。

UncaughtExceptionHandler 是Thread類一個內部類,也是一個函數式接口。
內部的uncaughtException是一個處理線程內發生的異常的方法,參數為線程對象t和異常對象e。
使用方式如下
//創建線程對象 內部會拋出異常 Thread thread=new Thread(()->{ int i=1/0; }); //設置該對象的默認異常處理器 thread.setDefaultUncaughtExceptionHandler((Thread t, Throwable e)->{ System.out.println("exceptionHandler"+e.getMessage()); }); //啟動線程 thread.start();
運行結果

相反,如果我們不設置UncaughtExceptionHandler ,那么就是
Thread thread=new Thread(()->{ int i=1/0; }); thread.start();

直接拋出異常。
因此 Thread的UncaughtExceptionHandler類能幫我們捕獲異常並處理, 那么在線程池里面生效嗎?
試試excute提交
ExecutorService executorService=Executors.newFixedThreadPool(1); Thread thread=new Thread(()->{ int i=1/0; }); thread.setDefaultUncaughtExceptionHandler((Thread t, Throwable e)->{ System.out.println("exceptionHandler"+e.getMessage()); }); executorService.execute(thread);

在excute提交方式里面是生效的。
那么因此,如果我們不想在每個線程的任務里面都加try-catch的話,可以自己實現的一個線程池,重寫它的線程工廠方法,在創建線程的時候,都賦予UncaughtExceptionHandler處理器對象。
具體代碼如下
//1.實現一個自己的線程池工廠 ThreadFactory factory = (Runnable r) -> { //創建一個線程 Thread t = new Thread(r); //給創建的線程設置UncaughtExceptionHandler對象 里面實現異常的默認邏輯 t.setDefaultUncaughtExceptionHandler((Thread thread1, Throwable e) -> { System.out.println("線程工廠設置的exceptionHandler" + e.getMessage()); }); return t; }; //2.創建一個自己定義的線程池,使用自己定義的線程工廠 ExecutorService service = new ThreadPoolExecutor(1, 1, 0, TimeUnit.MILLISECONDS,new LinkedBlockingQueue(10),factory); //3.提交任務 service.execute(()->{ int i=1/0; });

測試發現,即使我們不用在thread里面try-catch 也能拿到異常信息了。
我們來看下setDefaultUncaughtExceptionHandler的原理是什么
是在什么時候調用的。
Thread類里面dispatchUncaughtException(Throwable e) 這個方法,調用了 getUncaughtExceptionHandler().uncaughtException(this, e); 獲取到了我們設置的UncaughtExceptionHandler,並把線程對象和異常對象都作為參數傳進去了。
這個方法,只能被JVM調用,將未捕獲異常分派給該方法所在的處理程序
調用的節點,就是當Thread對象拋出了未被捕獲的異常的時候。
了解到這里,是不是我們只需要在線程池的線程工廠里面給所有生產的線程都設置上這個處理器就好了嗎? 答案當然不是!
剛剛我們一直用的是excute, 這次我們用sumbit看下,
//1.實現一個自己的線程池工廠 ThreadFactory factory = (Runnable r) -> { //創建一個線程 Thread t = new Thread(r); //給創建的線程設置UncaughtExceptionHandler對象 里面實現異常的默認邏輯 t.setDefaultUncaughtExceptionHandler((Thread thread1, Throwable e) -> { System.out.println("線程工廠設置的exceptionHandler" + e.getMessage()); }); return t; }; //2.創建一個自己定義的線程池,使用自己定義的線程工廠 ExecutorService service = new ThreadPoolExecutor(1, 1, 0, TimeUnit.MILLISECONDS,new LinkedBlockingQueue(10),factory); //3. submit 提交任務 service.submit(()->{ int i=1/0; });

結果是什么也沒有輸出,異常信息消失了。 說明UncaughtExceptionHandler並沒有被調用。
這樣就回到了我們之前的
問題1:execute和submit有什么區別?
在日常使用中,我們知道,execute和submit最大的區別就是execute沒有返回值,submit有返回值。
我們submit返回的是一個future ,可以通過這個future取到線程執行的結果或者異常信息。
ExecutorService executorService = Executors.newFixedThreadPool(1); //創建Callable對象 Callable callable=()->{return 1;}; //提交Callable進線程池,返回future Future future = executorService.submit(callable); try { //獲取線程池里面的結果 Integer a= (Integer) future.get(); System.out.println("future中獲取結果"+a); } catch (Exception e) { //獲取線程池里面的異常 System.out.println("future中獲取異常"+e.getMessage()); }

注意,如果要獲取返回的結果的話,這里的線程池提交的參數是Callable類型,而不是Thread或者Runable 。
那么獲取異常
ExecutorService executorService = Executors.newFixedThreadPool(1); //創建Callable對象 //會拋出異常 Callable callable=()->{return 1/0;}; //提交Callable進線程池,返回future Future future = executorService.submit(callable); try { //獲取線程池里面的結果 Integer a= (Integer) future.get(); System.out.println("future中獲取結果"+a); } catch (Exception e) { //獲取線程池里面的異常 System.out.println("future中獲取異常"+e.getMessage()); }

有沒有返回值是submit和excute最大的區別。 那么為什么 如果有返回值的submit ,里面的線程內的任務拋出未捕獲的異常的時候,不會顯示異常呢?
猜測是submit方法內部已經捕獲了異常, 只是沒有打印出來,也因為異常已經被捕獲,因此jvm也就不會去調用Thread的UncaughtExceptionHandler去處理異常。
接下來驗證猜測:
先看excute, 其是Executor的接口
再看submit
其是ExecutorService的接口,且ExecutorService繼承自Executor


然后!!重點!!
AbstractExecutorService 實現了ExecutorService接口,
抽象的AbstractExecutorService 類幾乎實現了ExecutorService接口的所有方法
包括submit(Runable task)
這里可以看到,submit內部 也是調用了execute 。
調用之前創建了一個runableFuture對象,而且將這future對象作為參數,調用execute(runable r ), 而且調用完execute之后 返回了這個future 作為返回值。
通過下面可以看到,runableFuture同時繼承了runable和future
因此runableFuture即是runable 也是future ,因此可以作為execute(runable r)的參數。
這里補充一點:
java中類是不允許多繼承的,但是接口可以,
因為類的多繼承會有問題,比如說類3繼承了類1和類2, 類1和類2都有方法名為A的方法,但是其內部的實現邏輯不同,那么類3到底是繼承的是誰的方法邏輯呢?
但是接口不一樣,因為接口的方法都是聲明沒有方法體, 接口3,繼承接口1和接口2的方法A, 方法A也只是一個方法聲明,沒有具體實現,不存在上述的歧義問題。
問題到這里面就明顯了,為什么submit有返回值了 ,大體邏輯如下
//創建一個即是Runnable又是Future的對象 RunnableFuture<Void> ftask = newTaskFor(task, null); //execute內部執行這個對象內部的邏輯,然后將結果或者異常 set到這個ftask里面 execute(ftask); //返回這個ftask return ftask;
ThreadPoolExecutor繼承了AbstractExecutorService ,實現了里面的execute方法。
同時作為Executors這個jdk自帶的線程池工具類里面創建線程必不可少的一個組件, 同時也是我們自己定義自己的線程池必不可少的一個基礎類。
我們之前的猜測是submit方法內部已經捕獲了異常, 只是沒有打印出來,也因為異常已經被捕獲,因此jvm也就不會去調用Thread的UncaughtExceptionHandler去處理異常。
而submit里面也只是調用了execute,因此問題就出在execute內部了。
接下來分析execute,這涉及到了線程池內部的原理了, 相當深入。
我們看下 execute的實現
public void execute(Runnable command) { if (command == null) throw new NullPointerException(); //獲取當前線程數 int c = ctl.get(); //如果當前線數小於核心線程數 if (workerCountOf(c) < corePoolSize) { **//新增一個worker** **if (addWorker(command, true))** return; c = ctl.get(); } if (isRunning(c) && workQueue.offer(command)) { int recheck = ctl.get(); if (! isRunning(recheck) && remove(command)) reject(command); else if (workerCountOf(recheck) == 0) addWorker(null, false); } else if (!addWorker(command, false)) reject(command); }
忽略其他的,重點關注,在線程池的excute里面,我們的任務被提交到了addWorker(command, true) 。
看下addWorker的實現, 在ThreadPoolExecutor的內部,有一個內部類叫Worker
addWorker為其的一個方法,作用是將runable封裝成Worker
private final class Worker extends AbstractQueuedSynchronizer implements Runnable { //....省略其他 private boolean addWorker(Runnable firstTask, boolean core) { //...省略其他 boolean workerStarted = false; boolean workerAdded = false; Worker w = null; try { //將firstTask 參數封裝成Worker **w = new Worker(firstTask);** final Thread t = w.thread; if (t != null) { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { int rs = runStateOf(ctl.get()); if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) { if (t.isAlive()) // precheck that t is startable throw new IllegalThreadStateException(); //將worker加到線程池的隊列中 **workers.add(w);** int s = workers.size(); if (s > largestPoolSize) largestPoolSize = s; workerAdded = true; } } finally { mainLock.unlock(); } if (workerAdded) { //啟動線程池中的一個線程 **t.start();** workerStarted = true; } } } }
Worker的構造函數
Worker(Runnable firstTask) { setState(-1); // inhibit interrupts until runWorker this.firstTask = firstTask; //從線程池的線程工廠里面中創建出來一個線程 this.thread = getThreadFactory().newThread(this); }
因此,任務被封裝了一個worker,而worker實現了runable接口,因此執行的邏輯就在worker的run方法里面,里面調用了runWorker
final void runWorker(Worker w) { //當前線程 Thread wt = Thread.currentThread(); //我們的提交的任務 Runnable task = w.firstTask; w.firstTask = null; w.unlock(); // allow interrupts boolean completedAbruptly = true; try { while (task != null || (task = getTask()) != null) { w.lock(); if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted()) wt.interrupt(); try { beforeExecute(wt, task); Throwable thrown = null; try { //直接就調用了task的run方法 task.run(); } catch (RuntimeException x) { thrown = x; throw x; } catch (Error x) { thrown = x; throw x; } catch (Throwable x) { thrown = x; throw new Error(x); } finally { //調用線程池的afterExecute方法 傳入了task和異常 afterExecute(task, thrown); } } finally { task = null; w.completedTasks++; w.unlock(); } } completedAbruptly = false; } finally { processWorkerExit(w, completedAbruptly); } }
核心就在 task.run(); 這個方法里面了, 期間如果發生異常會被拋出。
因此,如果用execute提交的任務,會被封裝成了一個runable任務,然后進去 再被封裝成一個worker,最后在worker的run方法里面跑runWoker方法, 里面再又調了我們最初的參數 runable任務的任務,並且用try-catch捕獲了異常,會被直接拋出去,因此我們在execute中看到了我們的任務的異常信息。
那么為什么submit沒有異常信息呢? 因為submit是將任務封裝成了一個futureTask ,
然后這個futureTask被封裝成worker,在woker的run方法里面,最終調用的是futureTask的run方法, 猜測里面是直接吞掉了異常,並沒有拋出異常,因此在worker的runWorker方法里面無法捕獲到異常。
excute最終是在ThreadPoolExecutor才會真正的實現, 但是submit在abstractExecutorService就實現了,
其內容如下
newTaskFor方法將Runnable封裝成了一個future
然后再看futureTask的run方法,果不其然,生吞了異常,將異常放到了 setException(ex);里面
public void run() { if (state != NEW || !UNSAFE.compareAndSwapObject(this, runnerOffset, null, Thread.currentThread())) return; try { Callable<V> c = callable; if (c != null && state == NEW) { V result; boolean ran; try { result = c.call(); ran = true; } catch (Throwable ex) { result = null; ran = false; setException(ex); } if (ran) set(result); } } finally { // runner must be non-null until state is settled to // prevent concurrent calls to run() runner = null; // state must be re-read after nulling runner to prevent // leaked interrupts int s = state; if (s >= INTERRUPTING) handlePossibleCancellationInterrupt(s); } }
setException(ex);
將異常對象賦予outcome
protected void setException(Throwable t) { if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) { outcome = t; UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // final state finishCompletion(); } }
outcome是futureTask的返回結果
調用futuretask的get的時候,返回report()
reoport里面實際上返回的是outcome ,剛好之前的異常就set到了這個outcome里面
因此,在用submit提交的時候,runable對象被封裝成了future ,里面的 run try-catch了所有的異常,並設置到了outcome里面, 可以通過future.get獲取到outcome。
所以在submit提交的時候,里面發生了異常, 是不會有任何拋出信息的。
那么在submit里面,除了從返回結果里面取到異常之外, 沒有其他方法了。
因此,在不需要返回結果的情況下,最好用execute ,這樣如果疏漏了異常捕獲,也不至於丟掉異常信息。
在excute的方法里面,可以通過重寫afterExecute進行異常處理,但是注意! 這個也只適用於excute提交,因為submit的task.run里面把異常吞了,根本不會跑出來異常,因此也不會有異常進入到afterExecute里面,里面的thrown參數為null。
在runWorker里面,調用task.run之后,會調用線程池的 afterExecute(task, thrown) 方法
final void runWorker(Worker w) { //當前線程 Thread wt = Thread.currentThread(); //我們的提交的任務 Runnable task = w.firstTask; w.firstTask = null; w.unlock(); // allow interrupts boolean completedAbruptly = true; try { while (task != null || (task = getTask()) != null) { w.lock(); if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted()) wt.interrupt(); try { beforeExecute(wt, task); Throwable thrown = null; try { //直接就調用了task的run方法 task.run(); //如果是futuretask的run,里面是吞掉了異常,不會有異常拋出, // 因此Throwable thrown = null; 也不會進入到catch里面 } catch (RuntimeException x) { thrown = x; throw x; } catch (Error x) { thrown = x; throw x; } catch (Throwable x) { thrown = x; throw new Error(x); } finally { //調用線程池的afterExecute方法 傳入了task和異常 afterExecute(task, thrown); } } finally { task = null; w.completedTasks++; w.unlock(); } } completedAbruptly = false; } finally { processWorkerExit(w, completedAbruptly); } }
因此重寫 afterExecute(task, thrown); 這個方法,在里面也可以處理異常
在ThreadPoolExecutor里面 afterExecute 方法內沒有任何邏輯
代碼例子:
//1.創建一個自己定義的線程池,重寫afterExecute方法 ExecutorService service = new ThreadPoolExecutor(1, 1, 0, TimeUnit.MILLISECONDS,new LinkedBlockingQueue(10)){ @Override protected void afterExecute(Runnable r, Throwable t) { super.afterExecute(r, t); System.out.println("afterExecute里面獲取到異常信息"+t.getMessage()); } }; //2.提交任務 service.execute(()->{ int i=1/0; });

如果要用這個afterExecute處理submit提交的異常, 要額外處理,因為用submit提交的時候,里面的Throwable對象為null,是、 如果要取異常信息,需要在Runnable r里面取,此時這個r實際的類型是futureTask
//定義線程池 ExecutorService service = new ThreadPoolExecutor(1, 1, 0, TimeUnit.MILLISECONDS, new LinkedBlockingQueue(10)) { //重寫afterExecute方法 @Override protected void afterExecute(Runnable r, Throwable t) { super.afterExecute(r, t); if (t != null) { //這個是excute提交的時候 System.out.println("afterExecute里面獲取到異常信息" + t.getMessage()); } //如果r的實際類型是FutureTask 那么是submit提交的,所以可以在里面get到異常 if (r instanceof FutureTask) { try { Future<?> future = (Future<?>) r; future.get(); } catch (Exception e) { log.error("future里面取執行異常", e); } } } }; //2.提交任務 service.submit(() -> { int i = 1 / 0; });

