線程池拒絕提交任務的2種情況
- 調用 shutdown 等方法關閉線程池后,即便此時可能線程池內部依然有沒執行完的任務正在執行,但是由於線程池已經關閉,此時如果再向線程池內提交任務,就會遭到拒絕
- 線程池沒有能力繼續處理新提交的任務,也就是工作已經非常飽和的時候
java 在 ThreadPoolExecutor 類中為我們提供了 4 種默認的拒絕策略來應對不同的場景,都實現了 RejectedExecutionHandler 接口,如圖所示:
4種拒絕策略
- 第一種拒絕策略是 AbortPolicy,這種拒絕策略在拒絕任務時,會直接拋出一個類型為 RejectedExecutionException 的 RuntimeException,讓你感知到任務被拒絕了,於是你便可以根據業務邏輯選擇重試或者放棄提交等策略。
- 第二種拒絕策略是 DiscardPolicy,這種拒絕策略正如它的名字所描述的一樣,當新任務被提交后直接被丟棄掉,也不會給你任何的通知,相對而言存在一定的風險,因為我們提交的時候根本不知道這個任務會被丟棄,可能造成數據丟失。
- 第三種拒絕策略是 DiscardOldestPolicy,如果線程池沒被關閉且沒有能力執行,則會丟棄任務隊列中的頭結點,通常是存活時間最長的任務,這種策略與第二種不同之處在於它丟棄的不是最新提交的,而是隊列中存活時間最長的,這樣就可以騰出空間給新提交的任務,但同理它也存在一定的數據丟失風險。
- 第四種拒絕策略是 CallerRunsPolicy,相對而言它就比較完善了,當有新任務提交后,如果線程池沒被關閉且沒有能力執行,則把這個任務交於提交任務的線程執行,也就是誰提交任務,誰就負責執行任務。這樣做主要有兩點好處。
- 第一點新提交的任務不會被丟棄,這樣也就不會造成業務損失。
- 第二點好處是,由於誰提交任務誰就要負責執行任務,這樣提交任務的線程就得負責執行任務,而執行任務又是比較耗時的,在這段期間,提交任務的線程被占用,也就不會再提交新的任務,減緩了任務提交的速度,相當於是一個負反饋。在此期間,線程池中的線程也可以充分利用這段時間來執行掉一部分任務,騰出一定的空間,相當於是給了線程池一定的緩沖期。
常用線程池及使用隊列
線程池 | 隊列 | 創建方式 | 適用場景 |
---|---|---|---|
FixedThreadPool | LinkedBlockingQueue | ThreadPoolExecutor | |
SingleThreadExecutor | LinkedBlockingQueue | ThreadPoolExecutor | |
CachedThreadPool | SynchronousQueue | ThreadPoolExecutor | |
ScheduledThreadPoolExecutor | DelayedWorkQueue | ScheduledThreadPoolExecutor | |
SingleThreadScheduledExecutor | DelayedWorkQueue | ScheduledThreadPoolExecutor | |
ForkJoinPool |
FixedThreadPool
它的核心線程數和最大線程數是一樣的,所以可以把它看作是固定線程數的線程池,它的特點是線程池中的線程數除了初始階段需要從 0 開始增加外,之后的線程數量就是固定的,就算任務數超過線程數,線程池也不會再創建更多的線程來處理任務,而是會把超出線程處理能力的任務放到任務隊列中進行等待。而且就算任務隊列滿了,到了本該繼續增加線程數的時候,由於它的最大線程數和核心線程數是一樣的,所以也無法再增加新的線程了。
CachedThreadPool
可以稱作可緩存線程池,它的特點在於線程數是幾乎可以無限增加的(實際最大可以達到 Integer.MAX_VALUE,為 2^31-1,這個數非常大,所以基本不可能達到),而當線程閑置時還可以對線程進行回收。也就是說該線程池的線程數量不是固定不變的,當然它也有一個用於存儲提交任務的隊列,但這個隊列是 SynchronousQueue,隊列的容量為0,實際不存儲任何任務,它只負責對任務進行中轉和傳遞,所以效率比較高。
ScheduledThreadPool
支持定時或周期性執行任務。比如每隔 10 秒鍾執行一次任務,而實現這種功能的方法主要有 3 種,如代碼所示:
ScheduledExecutorService service = Executors.newScheduledThreadPool(10);
service.schedule(new Task(), 10, TimeUnit.SECONDS);
service.scheduleAtFixedRate(new Task(), 10, 10, TimeUnit.SECONDS);
service.scheduleWithFixedDelay(new Task(), 10, 10, TimeUnit.SECONDS);
3種方法的區別
- 第一種方法 schedule 比較簡單,表示延遲指定時間后執行一次任務,如果代碼中設置參數為 10 秒,也就是 10 秒后執行一次任務后就結束。
- 第二種方法 scheduleAtFixedRate 表示以固定的頻率執行任務,它的第二個參數 initialDelay 表示第一次延時時間,第三個參數 period 表示周期,也就是第一次延時后每次延時多長時間執行一次任務。
- 第三種方法 scheduleWithFixedDelay 與第二種方法類似,也是周期執行任務,區別在於對周期的定義,之前的 scheduleAtFixedRate 是以任務開始的時間為時間起點開始計時,時間到就開始執行第二次任務,而不管任務需要花多久執行;而 scheduleWithFixedDelay 方法以任務結束的時間為下一次循環的時間起點開始計時。
SingleThreadExecutor
使用唯一的線程去執行任務,原理和 FixedThreadPool 是一樣的,只不過這里線程只有一個,如果線程在執行任務的過程中發生異常,線程池也會重新創建一個線程來執行后續的任務。這種線程池由於只有一個線程,所以非常適合用於所有任務都需要按被提交的順序依次執行的場景,而前幾種線程池不一定能夠保障任務的執行順序等於被提交的順序,因為它們是多線程並行執行的。
SingleThreadScheduledExecutor
它實際和第三種 ScheduledThreadPool 線程池非常相似,它只是 ScheduledThreadPool 的一個特例,內部只有一個線程
總結上述的五種線程池,我們以核心線程數、最大線程數,以及線程存活時間三個維度進行對比,如表格所示。
第一個線程池 FixedThreadPool,它的核心線程數和最大線程數都是由構造函數直接傳參的,而且它們的值是相等的,所以最大線程數不會超過核心線程數,也就不需要考慮線程回收的問題,如果沒有任務可執行,線程仍會在線程池中存活並等待任務。
第二個線程池 CachedThreadPool 的核心線程數是 0,而它的最大線程數是 Integer 的最大值,線程數一般是達不到這么多的,所以如果任務特別多且耗時的話,CachedThreadPool 就會創建非常多的線程來應對。
ForkJoinPool
ForkJoinPool在 JDK 7 加入。與前面線程池區別是適合執行可以產生子任務的任務
如圖所示,我們有一個 Task,這個 Task 可以產生三個子任務,三個子任務並行執行完畢后將結果匯總給 Result,比如說主任務需要執行非常繁重的計算任務,我們就可以把計算拆分成三個部分,這三個部分是互不影響相互獨立的,這樣就可以利用 CPU 的多核優勢,並行計算,然后將結果進行匯總。這里面主要涉及兩個步驟,第一步是拆分也就是 Fork,第二步是匯總也就是 Join,到這里你應該已經了解到 ForkJoinPool 線程池名字的由來了
典型的斐波拉契數列
數列的特點就是后一項的結果等於前兩項的和,第 0 項是 0,第 1 項是 1,那么第 2 項就是 0+1=1,以此類推。我們通常通過遞歸方式實現
if (n <= 1) {
return n;
} else {
Fib f1 = new Fib(n - 1);
Fib f2 = new Fib(n - 2);
f1.solve();
f2.solve();
number = f1.number + f2.number;
return number;
}
計算流程如下
任務拆分結果
使用ForkJoinTask子類RecursiveTask實現
public class Fibonacci extends RecursiveTask<Integer> {
int n;
public Fibonacci(int n) {
this.n = n;
}
@Override
public Integer compute() {
if (n <= 1) {
return n;
}
Fibonacci f1 = new Fibonacci(n - 1);
f1.fork();
Fibonacci f2 = new Fibonacci(n - 2);
f2.fork();
return f1.join() + f2.join();
}
}
public class ForkJoinPool01 {
public static void main(String[] args) throws Exception {
ForkJoinPool forkJoinPool = new ForkJoinPool();
for (int i = 0; i < 10; i++) {
ForkJoinTask<Integer> task = forkJoinPool.submit(new Fibonacci(i));
System.out.println(task.get());
}
}
}
結果
0
1
1
2
3
5
8
13
21
34
CountedCompleter
ForkJoinPool與其他線程池的不同
- ForkJoinPool采用分治策略
fork join
拆分任務然后合並結果 - 內部結構不同,其他線程池共用一個隊列,ForkJoinPool線程池每個線程有自己獨立的任務隊列
ForkJoinPool 線程池內部除了有一個共用的任務隊列之外,每個線程還有一個對應的雙端隊列 deque,這時一旦線程中的任務被 Fork 分裂了,分裂出來的子任務放入線程自己的 deque 里,而不是放入公共的任務隊列中。如果此時有三個子任務放入線程 t1 的 deque 隊列中,對於線程 t1 而言獲取任務的成本就降低了,可以直接在自己的任務隊列中獲取而不必去公共隊列中爭搶也不會發生阻塞(除了后面會講到的 steal 情況外),減少了線程間的競爭和切換,是非常高效的
我們再考慮一種情況,此時線程有多個,而線程 t1 的任務特別繁重,分裂了數十個子任務,但是 t0 此時卻無事可做,它自己的 deque 隊列為空,這時為了提高效率,t0 就會想辦法幫助 t1 執行任務,這就是“work-stealing”的含義。
雙端隊列 deque 中,線程 t1 獲取任務的邏輯是后進先出,也就是LIFO(Last In Frist Out),而線程 t0 在“steal”偷線程 t1 的 deque 中的任務的邏輯是先進先出,也就是FIFO(Fast In Frist Out),如圖所示,圖中很好的描述了兩個線程使用雙端隊列分別獲取任務的情景。你可以看到,使用 “work-stealing” 算法和雙端隊列很好地平衡了各線程的負載。
我們用一張全景圖來描述 ForkJoinPool 線程池的內部結構,你可以看到 ForkJoinPool 線程池和其他線程池很多地方都是一樣的,但重點區別在於它每個線程都有一個自己的雙端隊列來存儲分裂出來的子任務。ForkJoinPool 非常適合用於遞歸的場景,例如樹的遍歷、最優路徑搜索等場景
線程池的阻塞隊列
線程池內部結構由4部分組成
- 線程池管理器 : 負責管理線程池的創建、銷毀、添加任務等管理操作,它是整個線程池的管家。
- 工作線程 : 圖中的線程 t0~t9,這些線程勤勤懇懇地從任務隊列中獲取任務並執行。
- 任務隊列 : 作為一種緩沖機制,線程池會把當下沒有處理的任務放入任務隊列中,由於多線程同時從任務隊列中獲取任務是並發場景,此時就需要任務隊列滿足線程安全的要求,所以線程池中任務隊列采用 BlockingQueue 來保障線程安全
- 任務 : 任務要求實現統一的接口,以便工作線程可以處理和執行。
阻塞隊列
線程池 | 隊列 |
---|---|
FixedThreadPool | LinkedBlockingQueue |
SingleThreadExecutor | LinkedBlockingQueue |
CachedThreadPool | SynchrouousQueue |
ScheduleThreadPool | DelayedWorkQueue |
SingleThreadScheduledExecutor | DelayedWorkQueue |
LinkedBlockingQueue
對於 FixedThreadPool 和 SingleThreadExector 而言,它們使用的阻塞隊列是容量為 Integer.MAX_VALUE 的 LinkedBlockingQueue,可以認為是無界隊列。由於 FixedThreadPool 線程池的線程數是固定的,所以沒有辦法增加特別多的線程來處理任務,這時就需要 LinkedBlockingQueue 這樣一個沒有容量限制的阻塞隊列來存放任務。這里需要注意,由於線程池的任務隊列永遠不會放滿,所以線程池只會創建核心線程數量的線程,所以此時的最大線程數對線程池來說沒有意義,因為並不會觸發生成多於核心線程數的線程。
SynchrouousQueue
第二種阻塞隊列是 SynchronousQueue,對應的線程池是 CachedThreadPool。線程池 CachedThreadPool 的最大線程數是 Integer 的最大值,可以理解為線程數是可以無限擴展的。CachedThreadPool 和上一種線程池 FixedThreadPool 的情況恰恰相反,FixedThreadPool 的情況是阻塞隊列的容量是無限的,而這里 CachedThreadPool 是線程數可以無限擴展,所以 CachedThreadPool 線程池並不需要一個任務隊列來存儲任務,因為一旦有任務被提交就直接轉發給線程或者創建新線程來執行,而不需要另外保存它們。
DelayedWorkQueue
第三種阻塞隊列是DelayedWorkQueue,它對應的線程池分別是 ScheduledThreadPool 和 SingleThreadScheduledExecutor,這兩種線程池的最大特點就是可以延遲執行任務,比如說一定時間后執行任務或是每隔一定的時間執行一次任務。DelayedWorkQueue 的特點是內部元素並不是按照放入的時間排序,而是會按照延遲的時間長短對任務進行排序,內部采用的是“堆”的數據結構。之所以線程池 ScheduledThreadPool 和 SingleThreadScheduledExecutor 選擇 DelayedWorkQueue,是因為它們本身正是基於時間執行任務的,而延遲隊列正好可以把任務按時間進行排序,方便任務的執行。