一、ThreadPoolExecutor 參數說明
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler)
-
corePoolSize:核心線程池的大小。當提交一個任務到線程池時,核心線程池會創建一個核心線程來執行任務,即使其他核心線程能夠執行新任務也會創建線程,等到需要執行的任務數大於核心線程池基本大小時就不再創建。如果調用了線程池的 prestartAllCoreThreads() 方法,核心線程池會提前創建並啟動所有核心線程。
-
workQueue:任務隊列。當核心線程池中沒有線程時,所提交的任務會被暫存在隊列中。Java 提供了多種阻塞隊列。
-
maximumPoolSize:線程池允許創建的最大線程數。如果隊列也滿了,並且已創建的線程數小於最大線程數,則線程池會再創建新的空閑線程執行任務。值得注意的是,如果使用了無界的任務隊列則這個參數不起作用。
-
keepAliveTime:當線程池中的線程數大於 corePoolSize 時,keepAliveTime 為多余的空閑線程等待新任務的最長時間,超過這個時間后多余的線程將被終止。所以,如果任務很多,並且每個任務執行的時間比較短,可以調大時間,提高線程的利用率。值得注意的是,如果使用了無界的任務隊列則這個參數不起作用。
-
TimeUnit:線程活動保持時間的單位。
-
threadFactory:創建線程的工廠。可以通過線程工廠給每個創建出來的線程設置符合業務的名字。
// 依賴 guava new ThreadFactoryBuilder().setNameFormat("xx-task-%d").build(); -
handler:飽和策略。當隊列和線程池都滿了,說明線程池處於飽和狀態,那么必須采取一種策略處理提交的新任務。Java 提供了以下4種策略:
- AbortPolicy:默認。直接拋出異常。
- CallerRunsPolicy:只用調用者所在線程來運行任務。
- DiscardOldestPolicy:丟棄隊列里最近的一個任務,並執行當前任務。
- DiscardPolicy:不處理,丟棄掉。
tips: 一般我們稱核心線程池中的線程為核心線程,這部分線程不會被回收;超過任務隊列后,創建的線程為空閑線程,這部分線程會被回收(回收時間即 keepAliveTime)
二、常見的 ThreadPoolExecutor 介紹
Executors 是創建 ThreadPoolExecutor 和 ScheduledThreadPoolExecutor 的工廠類。
Java 提供了多種類型的 ThreadPoolExecutor,比較常見的有 FixedThreadPool、SingleThreadExecutor、CachedThreadPool等。
FixedThreadPool
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
FixedThreadPool 被稱為可重用固定線程數的線程池。可以看到 corePoolSize 和 maximumPoolSize 都被設置成了 nThreads;keepAliveTime設置為0L,意味着多余的空閑線程會被立即終止;使用了阻塞隊列 LinkedBlockingQueue 作為線程的工作隊列(隊列的容量為 Integer.MAX_VALUE)。
FixedThreadPool 所存在的問題是,由於隊列的容量為 Integer.MAX_VALUE,基本可以認為是無界的,所以 maximumPoolSize 和 keepAliveTime 參數都不會生效,飽和拒絕策略也不會執行,會造成任務大量堆積在阻塞隊列中。
FixedThreadPool 適用於為了滿足資源管理的需求,而需要限制線程數量的應用場景。
SingleThreadExecutor
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
SingleThreadExecutor 是使用單個線程的線程池。可以看到 corePoolSize 和 maximumPoolSize 被設置為1,其他參數與 FixedThreadPool 相同,所以所帶來的風險也和 FixedThreadPool 一致,就不贅述了。
SingleThreadExecutor 適用於需要保證順序的執行各個任務。
CachedThreadPool
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
CachedThreadPool 是一個會根據需要創建新線程的線程池。可以看到 corePoolSize 被設置為 0,所以創建的線程都為空閑線程;maximumPoolSize 被設置為 Integer.MAX_VALUE(基本可認為無界),意味着可以創建無限數量的空閑線程;keepAliveTime 設置為60L,意味着空閑線程等待新任務的最長時間為60秒;使用沒有容量的 SynchronousQueue 作為線程池的工作隊列。
CachedThreadPool 所存在的問題是, 如果主線程提交任務的速度高於maximumPool 中線程處理任務的速度時,CachedThreadPool 會不斷創建新線程。極端情況下,CachedThreadPool會因為創建過多線程而耗盡CPU和內存資源。
CachedThreadPool 適用於執行很多的短期異步任務的小程序,或者是負載較輕的服務器。
三、自建 ThreadPoolExecutor 線程池
鑒於上面提到的風險,我們更提倡使用 ThreadPoolExecutor 去創建線程池,而不用 Executors 工廠去創建。
以下是一個 ThreadPoolExecutor 創建線程池的 Demo 實例:
public class Pool {
static ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("pool-task-%d").build();
static ExecutorService executor = new ThreadPoolExecutor(Runtime.getRuntime().availableProcessors() * 2,
200, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(1024),
threadFactory, new ThreadPoolExecutor.AbortPolicy());
public static void main(String[] args) throws ExecutionException, InterruptedException {
// 1. 無返回值的任務執行 -> Runnable
executor.execute(() -> System.out.println("Hello World"));
// 2. 有返回值的任務執行 -> Callable
Future<String> future = executor.submit(() -> "Hello World");
// get 方法會阻塞線程執行等待返回結果
String result = future.get();
System.out.println(result);
// 3. 監控線程池
monitor();
// 4. 關閉線程池
shutdownAndAwaitTermination();
monitor();
}
private static void monitor() {
ThreadPoolExecutor threadPoolExecutor = (ThreadPoolExecutor) executor;
System.out.println("【線程池任務】線程池中曾經創建過的最大線程數:" + threadPoolExecutor.getLargestPoolSize());
System.out.println("【線程池任務】線程池中線程數:" + threadPoolExecutor.getPoolSize());
System.out.println("【線程池任務】線程池中活動的線程數:" + threadPoolExecutor.getActiveCount());
System.out.println("【線程池任務】隊列中等待執行的任務數:" + threadPoolExecutor.getQueue().size());
System.out.println("【線程池任務】線程池已執行完任務數:" + threadPoolExecutor.getCompletedTaskCount());
}
/**
* 關閉線程池
* 1. shutdown、shutdownNow 的原理都是遍歷線程池中的工作線程,然后逐個調用線程的 interrupt 方法來中斷線程。
* 2. shutdownNow:將線程池的狀態設置成 STOP,然后嘗試停止所有的正在執行或暫停任務的線程,並返回等待執行任務的列表。
* 3. shutdown:將線程池的狀態設置成 SHUTDOWN 狀態,然后中斷所有沒有正在執行任務的線程。
*/
private static void shutdownAndAwaitTermination() {
// 禁止提交新任務
executor.shutdown();
try {
// 等待現有任務終止
if (!executor.awaitTermination(60, TimeUnit.SECONDS)) {
// 取消當前正在執行的任務
executor.shutdownNow();
// 等待一段時間讓任務響應被取消
if (!executor.awaitTermination(60, TimeUnit.SECONDS)) {
System.err.println("Pool did not terminate");
}
}
} catch (InterruptedException ie) {
// 如果當前線程也中斷,則取消
executor.shutdownNow();
// 保留中斷狀態
Thread.currentThread().interrupt();
}
}
}
創建線程池需要注意以下幾點:
- CPU 密集型任務應配置盡可能小的線程,如配置 Ncpu+1 個線程。
- IO 密集型任務(數據庫讀寫等)應配置盡可能多的線程,如配置 Ncpu*2 個線程。
- 優先級不同的任務可以使用優先級隊列 PriorityBlockingQueue 來處理。
- 建議使用有界隊列。可以避免創建數量非常多的線程,甚至拖垮系統。有界隊列能增加系統的穩定性和預警能力,可以根據需要設大一點兒,比如幾千。
四、ScheduledThreadPoolExecutor
ScheduledThreadPoolExecutor 繼承自 ThreadPoolExecutor。它主要用來在給定的延遲之后運行任務,或者定期執行任務。
public ScheduledThreadPoolExecutor(int corePoolSize, ThreadFactory threadFactory) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS, new DelayedWorkQueue(), threadFactory);
}
ScheduledThreadPoolExecutor 的功能與 Timer 類似,但功能更強大、更靈活。Timer 對應的是單個后台線程,而ScheduledThreadPoolExecutor 可以在構造函數中指定多個對應的后台線程數。
Java 提供了多種類型的 ScheduledThreadPoolExecutor ,可以通過 Executors 創建,比較常見的有 ScheduledThreadPool、SingleThreadScheduledExecutor 等。適用於需要多個后台線程執行周期任務,同時為了滿足資源管理的需求而需要限制后台線程數量的應用場景。
public class ScheduleTaskTest {
static ThreadFactory threadFactory = new BasicThreadFactory.Builder().namingPattern("schedule-pool-%d").build();
static ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(5, threadFactory);
public static void main(String[] args) throws ExecutionException, InterruptedException {
// 1. 延遲 3 秒后執行 Runnable 方法
scheduledExecutorService.schedule(() -> System.out.println("Hello World"), 3000, TimeUnit.MILLISECONDS);
// 2. 延遲 3 秒后執行 Callable 方法
ScheduledFuture<String> scheduledFuture = scheduledExecutorService.schedule(() -> "Hello ScheduledFuture", 3000, TimeUnit.MILLISECONDS);
System.out.println(scheduledFuture.get());
// 3. 延遲 1 秒后開始每隔 3 秒周期執行。
// 如果中間任務遇到異常,則禁止后續執行。
// 固定的頻率來執行某項任務,它不受任務執行時間的影響。到時間,就執行。
scheduledExecutorService.scheduleAtFixedRate(() -> System.out.println("Hello ScheduleAtFixedRate"), 1, 3000, TimeUnit.MILLISECONDS);
// 4. 延遲 1 秒后,每個任務結束延遲 3 秒后再執行下個任務。
// 如果中間任務遇到異常,則禁止后續執行。
// 受任務執行時間的影響,等待任務執行結束后才開始計算延遲。
scheduledExecutorService.scheduleWithFixedDelay(() -> System.out.println("Hello ScheduleWithFixedDelay"), 1, 3000, TimeUnit.MILLISECONDS);
}
}
ScheduledThreadPoolExecutor 的執行步驟大抵如下:
- 當調用 ScheduledThreadPoolExecutor 的 scheduleAtFixedRate() 方法或者 scheduleWithFixedDelay()方 法時,會向 DelayedWorkQueue 隊列添加 ScheduledFutureTask 任務。
- 線程池中的線程從 DelayedWorkQueue隊列中獲取執行時間已到達的 ScheduledFutureTask,然后執行任務。
- 線程修改 ScheduledFutureTask 任務的執行時間為下次將要被執行的時間。
- 線程把修改后的 ScheduledFutureTask 重新放回隊列。
