線程池異常處理之重啟線程處理任務
本文記錄一下在使用線程池過程中,如何處理 while(true)
循環長期運行的任務,在業務處理邏輯中,如果拋出了運行時異常時怎樣重新提交任務。
這種情形在Kafka消費者中遇到,當為每個Consumer開啟一個線程時, 在線程的run方法中會有while(true)
循環中消費Topic數據。
本文會借助Google Guava包中的com.google.common.util.concurrent.ThreadFactoryBuilder
類創建線程工廠,因為它能不僅很方便地為線程池設置一個易讀的名稱,而且很方便地設置線程執行過程中出現異常時 用來處理異常的 異常處理器,示例如下:
MyExceptionHandler exceptionHandler = new MyExceptionHandler();
//設置線程名稱
ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("test-%d")
//設置異常處理器
.setUncaughtExceptionHandler(exceptionHandler).build();
當線程執行過程中出現了異常,MyExceptionHandler#uncaughtException(...)
方法就會由JVM調用。在java.lang.ThreadGroup#uncaughtException
方法注釋提到:由於每個線程都隸屬於某個線程組,如果該線程所屬的線程組有父線程組,則調用父線程組中指定的異常處理器;若沒有父線程組,則判斷 有沒有 為線程自定義 異常處理器,而在本文中,定義了自己的異常處理器:MyExceptionHandler
,因此線程執行異常時就會調用MyExceptionHandler#uncaughtException(...)
創建好了線程工廠,接下來就是創建線程池了。
CustomThreadPoolExecutor threadPoolExecutor = new CustomThreadPoolExecutor(1, 2, 1, TimeUnit.HOURS, taskQueue,threadFactory);
CustomThreadPoolExecutor 繼承ThreadPoolExecutor
擴展線程池的功能:若線程執行某任務失敗時 需要重新提交該任務,可以重寫CustomThreadPoolExecutor#afterExecute
方法,在該方法中實現提交任務。
public class CustomThreadPoolExecutor extends ThreadPoolExecutor {
public CustomThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory);
}
@Override
protected void afterExecute(Runnable r, Throwable t) {
super.afterExecute(r, t);
//若線程執行某任務失敗了,重新提交該任務
if (t != null) {
Runnable task = r;
System.out.println("restart task...");
execute(task);
}
}
}
如果在new ThreadPoolExecutor時未傳入 ThreadFactory參數,如下:
BlockingQueue<Runnable> taskQueue = new LinkedBlockingQueue<>(16);
ThreadPoolExecutor executor = new ThreadPoolExecutor(1, 2, 1, TimeUnit.HOURS, taskQueue);
其實是調用Executors.defaultThreadFactory()
創建默認的ThreadFactory:
DefaultThreadFactory() {
SecurityManager s = System.getSecurityManager();
group = (s != null) ? s.getThreadGroup() :
Thread.currentThread().getThreadGroup();
namePrefix = "pool-" +
poolNumber.getAndIncrement() +
"-thread-";
}
它為每個創建的線程設置了名字:"pool-xxx-thread-xxx"。而采用默認的ThreadFactory時相應的默認的異常處理器執行邏輯是由java.lang.ThreadGroup#uncaughtException
方法來處理的,其中處理異常的相關源碼如下:
else if (!(e instanceof ThreadDeath)) {
System.err.print("Exception in thread \""
+ t.getName() + "\" ");
e.printStackTrace(System.err);
}
如果線程執行過程中拋出的錯誤 不是 ThreadDeath對象,那么只是簡單地:打印線程名稱,並將堆棧信息記錄到控制台中,任務結束。如果是一個ThreadDeath對象,看ThreadDeath類的源碼注釋可知:異常處理器不會被調用,程序不會輸出任何日志信息。(有木有碰到這種情況,線程池中的線程不知不覺地消失了……)
The ThreadGroup#uncaughtException top-level error handler does not print out a message if ThreadDeath is never caught.
在本文的示例程序CustomThreadPoolExecutorTest.java中,為了模擬在while(true)
循環中拋出異常,定義一個 Boolean 變量 stop 使得線程執行一段時間拋出一個異常:也即先讓test線程運行一段時間,然后主線程設置 stop 變量的值,使得test線程拋出運行時異常。(完整代碼可參考文末)
if (stop) {
throw new RuntimeException("running encounter exception");
}
線程池提交 while(true)
循環任務:
threadPoolExecutor.execute(()->{
//提交的是一個while(true)任務,正常運行時這類任務不會結束
while (true) {
System.out.println("start processing");
try {
//模擬任務每次執行耗時1000ms
Thread.sleep(1000);
} catch (InterruptedException e) {
//ignore
}
System.out.println("finish processing");
if (stop) {
throw new RuntimeException("running encounter exception");
}
}
});
threadPoolExecutor.execute
提交了一個任務,這會耗費一個線程來執行該任務,由於任務是個while(true)
循環,正常情況下該任務不會終止。換句話說,這個任務會"永久"占用線程池中的一個線程。因此,對於while(true)
循環的任務需要注意:
創建線程池new ThreadPoolExecutor(...)
時,指定的 corePoolSize 不能小於 需要提交的任務個數,否則有些任務不能立即啟動,線程池需要增加線程(最大增加到maximumPoolSize 個線程)來處理。如果 maximumPoolSize 小於需要提交的任務個數,由於每個任務永久地占用一個線程執行,那么有些任務就只能一直堆積在taskQueue 任務隊列中了
而在本示例中,main 線程通過設置 stop 變量讓 test 線程拋出異常,自定義的異常處理器MyExceptionHandler就會處理該異常,並且在該任務執行“完成”后,JVM會調用線程池的afterExecute(...)方法,又重新提交該任務。
總結
這篇文章總結了本人在使用JAVA線程池中的一些理解,寫代碼以線程池方式提交任務,程序跑一段時間,沒有數據輸出了,好像暫停了,看堆棧信息線程莫名其妙地消失了,或者阻塞在任務隊列上拿不到Task了……因此需要明白線程池底層執行的機制。
- 在實現Kafka消費者過程中,每個消費者一個線程,使用線程池來管理線程、提交任務。但總過一段時間后Kafka Broker Rebalance,看后台日志是Kafka Consumer在解析一些消息時拋出了運行時異常。這樣線程池就結束了這個任務,由於沒有重寫
afterExecute()
方法 當任務出現異常時重新提交任務。因此,這意味着永久丟失了一個消費者線程。而少了一個消費者,Kafka就發生了Rebalance。 - 盡量使用線程池來管理線程,而不是自己 new Thread(),一方面是采用線程池可方便地為每個線程設置合理的名稱,這樣便於debug。另一方面,通過
implements Thread.UncaughtExceptionHandler
自定義線程運行時異常處理器,可方便地打印出線程異常日志。 - 可繼承
ThreadPoolExecutor
擴展線程池功能,比如在任務執行完成后,執行一些額外的操作。關於如何擴展線程池,ElasticSearch源碼中線程池模塊很值得借鑒。 - 上文中提到的異常處理器 和 向線程池提交任務的拒絕策略
RejectedExecutionHandler
是兩回事。另外,為了圖方便,直接在main方法中創建線程池了,實際應用中肯定不能這樣。這里給出的代碼只是Examples。
最后給出一個思考問題:針對需要長期運行的任務,比如每隔一段時間從Redis中讀取若干條數據。是提交一個Runnable任務,這個Runnable任務里是個while(true)
循環讀取數據:
executor.execute(()->{
while (true) {
//讀若干條數據
read();
sleep(1000);
}
});
還是:在一個外部while循環中,不斷地向 taskQueue 任務隊列中提交任務呢?
while (true) {
executor.execute(()->{
read();
});
sleep(1000);
}
CustomThreadPoolExecutorTest.java 完整代碼:
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
public class CustomThreadPoolExecutorTest {
private static volatile boolean stop = false;
public static void main(String[] args)throws InterruptedException {
BlockingQueue<Runnable> taskQueue = new LinkedBlockingQueue<>(16);
//定義 線程執行過程中出現異常時的 異常處理器
MyExceptionHandler exceptionHandler = new MyExceptionHandler();
ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("test-%d")
.setUncaughtExceptionHandler(exceptionHandler).build();
CustomThreadPoolExecutor threadPoolExecutor = new CustomThreadPoolExecutor(1, 2, 1, TimeUnit.HOURS, taskQueue,threadFactory);
threadPoolExecutor.execute(()->{
//提交的是一個while(true)任務,正常運行時這類任務不會結束
while (true) {
System.out.println("start processing");
try {
//模擬任務每次執行耗時1000ms
Thread.sleep(1000);
} catch (InterruptedException e) {
//ignore
}
System.out.println("finish processing");
if (stop) {
throw new RuntimeException("running encounter exception");
}
}
});
Thread.sleep(2000);
//模擬 test- 線程 在執行任務過程中拋出異常
stop = true;
Thread.sleep(1000);
stop = false;
}
private static class MyExceptionHandler implements Thread.UncaughtExceptionHandler {
@Override
public void uncaughtException(Thread t, Throwable e) {
System.out.println(String.format("thread name %s, msg %s", t.getName(), e.getMessage()));
}
}
}
ThreadPoolExecutorTest.java 測試線程在執行過程中拋出ThreadDeath對象:
import java.util.concurrent.*;
public class ThreadPoolExecutorTest {
private static volatile boolean stop = false;
public static void main(String[] args) throws InterruptedException{
BlockingQueue<Runnable> taskQueue = new LinkedBlockingQueue<>(16);
ThreadPoolExecutor executor = new ThreadPoolExecutor(1, 1, 1, TimeUnit.HOURS, taskQueue);
executor.execute(()->{
while (true) {
System.out.println("start processing");
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
//ignore
}
System.out.println("finish processing");
if (stop) {
throw new ThreadDeath();
// throw new RuntimeException("runtime exception");
}
}
});
Thread.sleep(3000);
stop = true;
Thread.sleep(2000);
executor.execute(()->{
//能夠繼續提交任務執行
System.out.println("continue submit runnable task,is All thread in thread pool dead?");
});
}
}