作為一個牛逼的程序員,相信大家肯定是接觸過多線程的概念的。並且可能會在實際的工作中因為一些業務場景需要使用自定義線程池來執行批量的任務或對線程進行管理。同樣,我們項目中也存在一個兩個場景需要使用線程池。而這兩個場景分別為:
1、持續監聽某個外部接口的不間斷的返回信息,其實就是長鏈接的阻塞接口,總共12種資源需要監聽,所以就意味需要12個不間斷的線程執行阻塞任務。
2、RabbitMQ的消費者,因為需要應用啟動的時候就執行消息的消費,所以也通過線程池中獲取線程執行消費任務。
一、先看線程池的定義
public class ThreadPoolUtil {
private static Logger logger = LoggerFactory.getLogger(ThreadPoolUtil.class);
private static volatile ThreadPoolExecutor threadPoolExecutor = null;
/**
* 創建執行
* @return
*/
private static AtomicInteger nextId = new AtomicInteger(0);
public static ThreadPoolExecutor createExecutor(){
if (threadPoolExecutor != null){
return threadPoolExecutor;
}
synchronized (ThreadPoolUtil.class){
if (threadPoolExecutor != null){
return threadPoolExecutor;
}
int corePoolSize = 16;
int maxPoolSize = 16;
int keepAliveSeconds = 60;
BlockingQueue<Runnable> queue = new ArrayBlockingQueue(500);
RejectedExecutionHandler rejectedExecutionHandler = (r, executor) -> logger.error("隊列已經滿了,總任務{},直接拒絕吧", executor.getTaskCount());
threadPoolExecutor = new ThreadPoolExecutor(corePoolSize, maxPoolSize,
keepAliveSeconds, TimeUnit.SECONDS, queue, r -> {
String fileName = Thread.currentThread().getStackTrace()[5].getFileName();
String threadName = fileName.substring(0,fileName.indexOf("."))+"-";
Thread thread = new Thread(r, threadName+nextId.incrementAndGet());
return thread;
}, rejectedExecutionHandler);
}
return threadPoolExecutor;
}
看看上面的線程池設計,好像是沒有啥問題的。如果是放在普通的可終結的任務使用當前線程池,理論上是沒有太大問題。但是!我們的應用剛好這幾個任務都是阻塞的。阻塞就意味着線程是無法回收的,其他的任務使用這個線程池之后,就只能先放到隊列中,然后一直得不到釋放的線程資源執行。最終隊列積壓,任務被拋棄。
二、線上事故描述
因為在初始化的時候,已經將 12 個監聽都啟動了,並且使用的是當前線程池構造工具。啟動完成之后,12個核心線程就一直被阻塞占用。這12個資源的監聽還是比較正常的,並且能夠對監聽數據進行處理和執行。
因為需要MQ消費端啟動的時候就可以執行消費,所以在啟動的時候,設置了啟動配置類中調用上述工具創建線程池,相當於用新的線程執行消息監聽的動作。然而MQ卻遲遲不見消費的過程,導致消息隊列一直積壓。並且無法完成正確的數據處理。
三、問題猜測及理論支撐
猜測:沒有被消費,應該就是我們的線程池中沒有空閑的線程進行消息監聽處理。初始化的時候的消費監聽的任務被直接丟棄到了線程池的任務隊列中,而這個線程池的任務隊列中數數據只有在兩種情況下才可能被執行。
第一種:線程池中有空閑的線程,可以進行執行
第二種:消息隊列滿了,開啟了加大了線程池的線程數以便執行堆積的任務
而我們的這個一步開啟MQ消費監聽的任務被發送到線程池的時候,因為核心線程數就是 12 ,而我們前面的資源監聽接口已經開啟了12個阻塞任務,所以就沒有了可用線程。所以被存放到了線程池待執行任務隊列中。可怕的是,我們這個線程池的隊列大小為500 ,很顯然 1 < 500 ,所以就無法觸發線程加大的動作,導致這個隊列的任務“被遺忘”。
理論支撐:
線程池的核心參數包括: coreSize , maxSize, quauaSize,RejectedExecutionHandler
分別為:核心線程數,最大線程數,可積壓的任務數,拒絕策略
當創建線程的時候,首先會先創建核心線程數等量的線程,比如上面就是 12個核心線程, 而當我們的核心線程都在執行階段的時候,再次加入的任務就會被存放到任務隊列中。當任務不斷的增加並且幅度遠遠大於核心線程的處理速度,導致任務隊列存放到最大值,比如上面的500,那么就需要增加線程數,此時就是需要增加線程數到最大值,比如上面的16,然而,增大了之后,發現已然不能處理消化任務的投放數量,這個時候就用不同的處理策略,比如上面的 rejectedExecutionHandler 就是直接丟棄。
猜測和理論匹配一下的話就是:核心線程是12 ,這12個線程被資源監聽的阻塞任務占用無法釋放,而開啟消費監聽的任務被丟到了待執行的任務隊列中,此時,任務隊列又不滿足益處的條件,所以就沒有增加新的線程來處理,以至於,這個創建消費監聽的任務就“被遺忘”了。
如何進行論證呢?使用如下測試代碼
public static void main(String[] args) {
ThreadPoolExecutor executor = createExecutor();
// 臨界值 分別設置12 16 512 518
for (int i =0; i < $臨界值;i++){
int finalI = i;
executor.execute(new Runnable() {
@Override
public void run() {
try {
System.out.println("當前任務序號為:"finalI +" ,活躍線程數"+ executor.getActiveCount());
Thread.sleep(10000*1000); // 這就當作是持久的任務在執行
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
}
}
測試結果:
臨界值為 12, 核心線程剛好夠用
臨界值為 16 , 雖然任務數大於了核心線程,但是並沒有新建線程數。所以驗證任務被放到了隊列中,先使用隊列存放,隊列滿了再開新線程

臨界值為 512 任務數量大於 核心線程數 所以新任務放到了隊列中,且剛好不會有超出,不觸發新的線程創建

臨界值為 516 任務數量大於 ( 核心線程數 + 隊列大小 ) 所有活躍線程被加到最大

臨界值為 518, 任務數量大於 ( 隊列大小 + 最大線程數) 所有產生丟棄

四、如何解決當前事故
出現這個問題之后,我們直接就增加了核心線程的數量,以保證整體大於在阻塞任務的數量。比如我們這個就是重新設置為核心線程數量 16 > 12,
同時,我們將阻塞任務同非阻塞任務所創建的線程池進行隔離,以減少共用線程池造成的 正常任務被遺忘的可能性。
五、如何設置你的線程池大小
那么在開發中,如何設置i線程池的大小?其實這沒有特定的規范,需要結合自己任務的執行時間而考慮,
但是最好提前考慮好,任務是否為阻塞性任務,如果是的話,建議做好線程隔離。
在我們一般將核心線程設置為 n + 1 (n 為內核數量)
最大線程數量設置 2n + 1 (n 為內核數量)
