線程池常用問題
了解JDK Executors線程池嗎?
知道JDK提供了哪些默認的實現嗎?
看過阿里巴巴java開發手冊嗎?知道為啥不允許使用默認的實現嗎?
你們沒有用默認的吧?那來介紹一下你們自定義線程池的幾個常用參數唄?
你這個幾個參數的值是怎么得來的呀?算出來的?怎么算出來的?
線程池里面的任務是IO密集型的還是計算密集型的呢?
好,現在我們有一個自定義線程池了,來說一下你這個線程池的工作流程唄?
那你這個線程池滿了怎么辦呀?拒絕?咋拒絕?有哪些拒絕策略呢?
別緊張,隨便說兩個就行。
......
回到開始說的阿里巴巴java開發手冊不允許使用默認實現,你回答說可能會引起OOM,那我們聊聊JVM吧
不允許使用的原因
線程池不允許使用Executors去創建,而是通過ThreadPoolExecutor的方式,這樣的處理方式讓寫的同學更加明確線程池的運行規則,規避資源耗盡的風險。 說明:Executors各個方法的弊端:
1)newFixedThreadPool和newSingleThreadExecutor:
主要問題是堆積的請求處理隊列可能會耗費非常大的內存,甚至OOM。
2)newCachedThreadPool和newScheduledThreadPool:
主要問題是線程數最大數是Integer.MAX_VALUE,可能會創建數量非常多的線程,甚至OOM。
測試流程
測試用例
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; import java.util.concurrent.ThreadPoolExecutor; public class ExecutorsTest { public static void main(String[] args) throws Exception { ThreadPoolTaskExecutor executor = init(); executor.execute(() -> sayHi("execute")); Thread.sleep(1000); executor.submit(() -> sayHi("submit")); } public static void sayHi(String name) { String printStr = "thread-name:" + Thread.currentThread().getName() + ",執行方式:" + name; System.out.println(printStr); throw new RuntimeException(printStr + " error!!!"); } private static ThreadPoolTaskExecutor init() { ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); executor.setThreadNamePrefix("thread_"); executor.setCorePoolSize(5); executor.setMaxPoolSize(10); executor.setQueueCapacity(1000); executor.setKeepAliveSeconds(30); executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy()); executor.initialize();; return executor; } }
拋出堆棧異常為啥對了一半?
從執行結果我們看出
當執行方式是execute時,可以看到堆棧異常的輸出。
當執行方式是submit時,堆棧異常沒有輸出。
怎么拿到submit的異常堆棧?
所以,現在知道為什么回答:拋出堆棧異常只對了一半吧。
execute方法執行時,會拋出(打印)堆棧異常。
submit方法執行時,返回結果封裝在future中,如果調用future.get()方法則必須進行異常捕獲,從而可以拋出(打印)堆棧異常。
你以為這一部分寫到這里就完事了?那不行啊,你心里沒有一個疑問嗎?為啥execute直接拋出異常,submit沒有直接拋出異常呢?
源碼查看
執行executes方法時
在java.util.concurrent.ThreadPoolExecutor#runWorker中拋出了異常:
在_java.lang.ThreadGroup#uncaughtException_進行了異常處理:
這個uncaughtException是何許人也,看java doc上咋說的:
這個方法是JVM調用的,我們只需要指定我們想要的處理方式即可。
那我們怎么指定呢:
//直接new Thread()的時候 Thread t=newThread(); t.setUncaughtExceptionHandler(newThread.UncaughtExceptionHandler() { public void uncaughtException(Thread t, Throwable e){ //根據業務場景,做你想做的 } }); //線程池的時候 ExecutorService threadPool = Executors.newFixedThreadPool(1, thread -> { Thread t =newThread(thread); t.setUncaughtExceptionHandler((t1, e) -> System.out.println("根據業務場景,做你想做的:"+ e.getMessage()));return;} );
執行submit方法時
其本質也是調用了execute方法,所以它還是回到_java.util.concurrent.ThreadPoolExecutor#runWorker_方法:
向前,繼續跟進去看看:
_java.util.concurrent.FutureTask#setException_干啥了啊,瞅一眼:
我們馬上走向最終的真相:
好了,第一個議題【拋出堆棧異常為啥對了一半?】討論完畢。在源碼里面走了一趟,現在我們可以給出這一部分的滿分答案了。
不影響其他線程任務
這一部分我們直接上代碼,運行起來看結果吧:
代碼和運行結果是不會騙人的:
線程池中一個線程異常了后,不影響其他線程任務
大家注意線程名稱這個細節:1,2,3,4,6。魔鬼都在細節里啊,這個點我下面會講,先在這里把問題拋出來:我就納悶了,怎么沒有5啊?!
這個線程會被放回線程池為啥錯了
5號線程去哪里了?
new Worker()方法會告訴你:5去哪里了。
再配上這張由我這個靈魂畫師親自操刀畫的圖,一起食用,味道更佳:
現在知道為啥:我回答這個線程會被放回線程池為啥全錯了吧。還附帶送你一個線程名稱變化的細節。
結論
當一個線程池里面的線程異常后:
1、當執行方式是execute時,可以看到堆棧異常的輸出
原因:ThreadPoolExecutor.runWorker()方法中,task.run(),即執行我們的方法,如果異常的話會throw x;所以可以看到異常。
2、當執行方式是submit時,堆棧異常沒有輸出。但是調用Future.get()方法時,可以捕獲到異常
原因:ThreadPoolExecutor.runWorker()方法中,task.run(),其實還會繼續執行FutureTask.run()方法,再在此方法中c.call()調用我們的方法,
如果報錯是setException(),並沒有拋出異常。當我們去get()時,會將異常拋出。
3、不會影響線程池里面其他線程的正常執行
4、線程池會把這個線程移除掉,並創建一個新的線程放到線程池中
當線程異常,會調用ThreadPoolExecutor.runWorker()方法最后面的finally中的processWorkerExit(),會將此線程remove,並重新addworker()一個線程。
源碼執行流程
execute源碼執行流程
1、開始執行任務,新增或者獲取一個線程去執行任務(比如剛開始是新增coreThread去執行任務)。執行到task.run()時會去執行提交的任務。
如果任務執行失敗,或throw x拋出異常。
2、之后會到finally中的afterExecute()擴展方法,我們可以擴展該方法對異常做些什么。
3、之后因為線程執行異常會跳出runWorker的外層循環,進入到processWorkerExit()方法,此方法會將執行任務失敗的線程刪除,並新增一個線程。
4、之后會到ThreadGroup#uncaughtException方法,進行異常處理。
如果沒有通過setUncaughtExceptionHandler()方法設置默認的UncaughtExceptionHandler,就會在uncaughtException()方法中打印出異常信息。
submit源碼執行流程
1、將傳進來的任務封裝成FutureTask,同樣走execute的方法調用,然后直接返回FutureTask。
2、開始執行任務,新增或者獲取一個線程去執行任務(比如剛開始是新增coreThread去執行任務)。
3、執行到task.run()時,因為是FutureTask,所以會去調用FutureTask.run()。
4、在FutureTask.run()中,c.call()執行提交的任務。如果拋出異常,並不會throw x,而是setException()保存異常。
5、當我們阻塞獲取submit()方法結果時get(),才會將異常信息拋出。當然因為runWorker()沒有拋出異常,所以並不會刪除線程。