什么時候需要用線程池?
答:線程的創建比較昂貴(創建線程需要系統資源,頻繁創建和銷毀消耗大量時間,導致性能問題);短平快的任務(接收大量小任務)進行分發處理使用線程池而不是一個任務對應一個新線程。
線程池的使用需要注意哪些?
1. 需要手動聲明線程池
《阿里巴巴Java開發手冊》中講,禁止使用快捷的工具方法創建線程池,而應手動new ThreadPoolExecutor來創建線程池。
注意:newFixedThreadPool和newCachedThreadPool,可能因為資源耗盡導致OOM問題。
2.注意線程池幾個關鍵參數,創建合適的線程池
1. corePoolSize:線程池核心線程大小 2. maximumPoolSize: 線程池最大線程數量 3. keepAliveTime: 空閑線程存活時間 4. unit:空閑線程存活時間單位 5. workQueue: 工作隊列 (1.ArrayBlocking 2.LinkedBlockingQueue 3.SynchronizedQueue 4.PriorityBlockingQueue) 6. ThreadFactory:線程工廠 7. Handler 拒絕策略
3. 熟悉線程池默認工作行為,為線程池設置合適的初始化參數
線程池默認工作行為:
1. 線程池不會初始化corePoolSize個線程,有任務來了才創建工作線程; 2. 當核心線程滿了之后不會立即擴容線程池,而是把任務堆積到工作隊列中; 3. 當工作隊列滿了后擴容線程池,一直到線程個數達到maximumPoolSize為止; 4. 如果隊列已滿且達到了最大線程后還有任務進來,按照拒絕策略處理; 5. 當線程數大於核心線程數時,線程等待keepAliveTime后還是沒有任務需要處理的話,收縮線程到核心線程數。
4. 根據實際容量規划需求,通過一些手段改變默認工作行為
1. 聲明線程池后立即調用prestartAllCoreThreads方法,來啟動所有核心線程池; 2. 傳入true給allCoreThreadTimeOut方法,來讓線程池在空閑的時候同樣回收核心線程。 3. 方法: - Java線程池先用工作隊列存放來不及處理的任務,滿了后再擴容。 - 當工作隊列設置很大時,最大線程數這個參數顯得無意義,可以優先開啟更多線程,而把隊列當成一個后備方案。 4. 思路: - 重寫隊列offer方法,造成這個隊列已滿的假象; - 這樣達到最大線程后會觸發拒絕策略,實現一個自定義的拒絕策略處理程序; - 再把任務真正插入隊列。Tomcat線程池也實現了類似的效果(點擊查看)。
5. 注意線程池的混用
要根據任務的‘輕重緩急’來指定線程池的核心參數,包括線程數、回收策略和任務隊列 - 對於執行比較慢、數量不大的IO任務,或許要考慮更多的線程數,而不需要太大的隊列 - 對於吞吐量較大的計算型任務,線程數量不宜過多,可以是CPU核數*2(CPU 密集型任務,過多的線程只會增加線程切換的開銷),但可能需要較長的隊列來做緩沖。 - Java 8 的 parallel stream 功能,可以讓我們很方便地並行處理集合中的元素,其背后是共享同一個 ForkJoinPool,默認並行度是 CPU 核數 -1。 - 對於 CPU 綁定的任務來說,使用這樣的配置比較合適,但如果集合操作涉及同步 IO 操作的話(比如數據庫操作、外部服務調用等),建議自定義一個 ForkJoinPool(或普通線程池)。
6. 確保線程池是在復用的
- 使用線程池目的是復用,每次new一個線程池會很糟糕; - 如果從類庫來獲取一個線程池(標准/自定義類庫),請務必查看源碼,以確認線程實例化方式和配置符合預期。 - 復用線程池,要根據任務的性質來選用不同的線程池;
- IO綁定任務與CPU綁定任務,如果希望減少任務間干擾,需要隔離線程池。
示例:
//案例:線上遇到的一個事故:當天直播,我們調用一個外部服務去發送短信,發送短信接口正常時可以在100ms內響應,TPS100的發送量,CachedThreadPool能穩定在占用10個左右線程的情況下滿足需求。 //在某個時間段,外部短信服務不可用了(貌似是短信平台調整敏感詞),我們調用這個服務的超時又特別長,比如1分鍾,直播前5分鍾要發送短信,而1分鍾產生 6000 個發送短信的任務,需要6000個線程, //沒多久因為無法創建線程導致了OOM,整個應用程序崩潰。 //我們知道線程是需要分配一定的內存空間作為線程棧的,比如1MB,因此無限制創建線程必然會導致OOM
//從源碼中可以看到,newCachedThreadPool這種線程池的最大線程數是Integer.MAX_VALUE,可以認為是無上限的,而其工作隊列SynchronousQueue是一個沒有存儲空間的阻塞隊列 //這就以為着,只要有請求到來,就必須找到一條工作線程來處理,如果當前沒有空閑的線程就再創建一條新的。 public static ExecutorService newCachedThreadPool() { return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue()); } //這里的核心線程數是0,keepAliveTime是60秒,在60秒之后所有的線程都是可以回收的。 //所以這就會出現如果業務操作並發量較大,可能一下子開啟幾千個線程而又不會撐爆內存。下邊案例就是。
//案例:某項目生產環境時不時有報警提示線程數過多,超過2000個,收到報警后查看監控發現,瞬時線程數比較多但過一會兒又降下來,線程數抖動很力換,而應用的訪問量變化不大。
//wrong:每次都創建一個線程池 class ThreadPoolHelper { public static ThreadPoolExecutor getThreadPool() { //線程池沒有復用 return (ThreadPoolExecutor) Executors.newCachedThreadPool(); } } //right:使用靜態字段來存放線程池的引用 class ThreadPoolHelper { private static ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(10, 50, 2, TimeUnit.SECONDS, new ArrayBlockingQueue<>(1000), new ThreadFactoryBuilder().setNameFormat("demo-threadpool-%d").get()); public static ThreadPoolExecutor getRightThreadPool() { return threadPoolExecutor; } }
監控
//簡單的打印線程池基本內部信息,包括線程數、活躍線程數、完成了多少任務,以及隊列中還有多少積壓任務等信息: private void printStats(ThreadPoolExecutor threadPool) { Executors.newSingleThreadScheduledExecutor().scheduleAtFixedRate(() -> { log.info("========================="); log.info("Pool Size: {}", threadPool.getPoolSize()); log.info("Active Threads: {}", threadPool.getActiveCount()); log.info("Number of Tasks Completed: {}", threadPool.getCompletedTaskCount()); log.info("Number of Tasks in Queue: {}", threadPool.getQueue().size()); log.info("========================="); }, 0, 1, TimeUnit.SECONDS); }