在看這篇文章之前,請先了解一下線程的初始配置參數。
六大線程池
在我們日常業務開發中,如果遇到使用線程池的場景時,會先去思考一下這種場景需要使用到怎樣的線程池,去避免線程資源濫用。這個時候選擇困難症就來了,不過不用擔心,Java其實早就已經給我們提供了六種快速創建線程池的方法,並且不需要設置繁瑣參數,開箱即用。
- FixedThreadPool(有限線程數的線程池)
- CachedThreadPool (無限線程數的線程池)
- ScheduledThreadPool (定時線程池)
- SingleThreadExecutor (單一線程池)
- SingleThreadScheduledExecutor(單一定時線程池)
- ForkJoinPool (孕婦線程池)
FixedThreadPool(有限線程數的線程池)
FixedThreadPool線程池的特點是他的核心線程數和最大線程數是一樣的,你可以把它看作成是一個固定線程數的線程池,因為他不會去將超出線程數的線程緩存到隊列中,如果超出線程數了,就會按線程拒絕策略來執行。可以看下面的示例代碼
public class ThreadPoolMain {
public static void main(String[] args) {
ExecutorService executor = Executors.newFixedThreadPool(10);
for (int i = 0; i < 20; i++) {
executor.execute(()->{
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName()+"執行了");
});
}
executor.shutdown();
}
}
這里在創建線程池的過程中之設置了一個線程數,但是他具體的設置參數是多少呢?
我們來看一下源碼
ExecutorService newFixedThreadPool(int nThreads)
ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue
workQueue)
RejectedExecutionHandler defaultHandler
仔細看會發現newFixedThreadPool創建也是通過ThreadPoolExecutor對象來創建的,只是將核心線程數和最大線程數設置成一樣,並且空閑線程等待時間為0,並且線程緩存隊列為0.在這里還有一個線程的拒絕策略,默認的拒絕策略為AbortPolicy。
詳細的執行流程看下面這張圖
線程池有 t0~t9,10 個線程,它們會不停地執行任務,如果某個線程任務執行完了,就會從任務隊列中獲取新的任務繼續執行,期間線程數量不會增加也不會減少,始終保持在 10 個。
CachedThreadPool(無限線程數的線程池)
看到Cached這個單詞應該就可以想到,這是一個可以緩存線程任務的線程池,並且直接執行,他的特點就是可以無限的緩存線程任務,最大可以達到Integer.MAX_VALUE,為 2^31-1,反正很大。
接下來我們就來看這個線程池的源碼
ExecutorService newCachedThreadPool()
可以看到,這個線程池的默認核心線程數為0,最大線程數為Integer.MAX_VALUE,同時還設置了空閑線程等待時間60秒,並且用SynchronousQueue隊列來緩存線程任務的數據。這里值得說一下,SynchronousQueue隊列其實並不緩存線程任務的數據,把它說成是線程的中轉站梗符合他一點,因為在線程的設置中,線程的最大數已經設置成Integer.MAX_VALUE,其實在這里,隊列再去緩存線程已經沒有多大意義了,而SynchronousQueue的好處就是他去中轉和傳遞線程任務時,效率比較高。
這個我們還看到設置了60秒的線程空閑時間,其實這個就很大程度的保持了線程池中線程數的活躍性,當沒有一個線程執行的時候,線程將不會有一個線程在維護,如果有線程進來了,那么我就立馬創建一個線程去使用,當使用完成之后,如果60秒后沒有任務在進來,那么這個線程將會被銷毀。如果有,就繼續接着用,這樣就最大限度了保證了線程池的靈活性。
ScheduledThreadPool(定時線程池)
這個線程就是為了定時而發明的,它支持定時或周期性執行任務,比如10秒鍾執行一次任務。
實現的方法具體如下
schedule
public class ThreadPoolMain {
public static void main(String[] args) {
ScheduledExecutorService service = Executors.newScheduledThreadPool(10);
Runnable task01 = new Runnable() {
@Override
public void run() {
String time = LocalDateTime.now().format(DateTimeFormatter.ofPattern("HH:mm:ss"));
System.out.println(time+":schedule 執行了");
}
};
String time = LocalDateTime.now().format(DateTimeFormatter.ofPattern("HH:mm:ss"));
System.out.println(time+":開始執行");
service.schedule(task01, 10, TimeUnit.SECONDS);
}
}
輸出結果
19:57:16:開始執行
19:57:26:schedule 執行了
這個方法我們可以看到,他只執行了一次,並且是在從項目啟動的時候開始計算時間,10秒后才開始執行,並且只執行一次。
scheduleAtFixedRate
public class ThreadPoolMain {
public static void main(String[] args) {
ScheduledExecutorService service = Executors.newScheduledThreadPool(10);
Runnable task02 = new Runnable() {
@Override
public void run() {
String time = LocalDateTime.now().format(DateTimeFormatter.ofPattern("HH:mm:ss"));
System.out.println(time+":scheduleAtFixedRate 開始執行");
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
time = LocalDateTime.now().format(DateTimeFormatter.ofPattern("HH:mm:ss"));
System.out.println(time+":scheduleAtFixedRate 執行完成了");
}
};
String time = LocalDateTime.now().format(DateTimeFormatter.ofPattern("HH:mm:ss"));
System.out.println(time+":開始執行");
service.scheduleAtFixedRate(task02, 10, 10, TimeUnit.SECONDS);
}
}
執行結果
19:59:20:開始執行
19:59:30:scheduleAtFixedRate 開始執行
19:59:32:scheduleAtFixedRate 執行完成了
19:59:40:scheduleAtFixedRate 開始執行
19:59:42:scheduleAtFixedRate 執行完成了
19:59:50:scheduleAtFixedRate 開始執行
19:59:52:scheduleAtFixedRate 執行完成了
這里我們可以看到,scheduleAtFixedRate設置了兩個參數,一個是initialDelay,另外一個是period,initialDelay表示項目啟動時,需要隔多久時間才開始執行第一次,period則表示后面繼續執行的間隔時間。這里你會發現,scheduleAtFixedRate會嚴格執行定時時間來執行,他不會管之前正在執行的定時方法有沒有執行完成,他還是會照樣不變10秒執行一次。
scheduleWithFixedDelay
public class ThreadPoolMain {
public static void main(String[] args) {
ScheduledExecutorService service = Executors.newScheduledThreadPool(10);
Runnable task03 = new Runnable() {
@Override
public void run() {
String time = LocalDateTime.now().format(DateTimeFormatter.ofPattern("HH:mm:ss"));
System.out.println(time+":scheduleWithFixedDelay 開始執行");
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
time = LocalDateTime.now().format(DateTimeFormatter.ofPattern("HH:mm:ss"));
System.out.println(time+":scheduleWithFixedDelay 執行完成了");
}
};
String time = LocalDateTime.now().format(DateTimeFormatter.ofPattern("HH:mm:ss"));
System.out.println(time+":開始執行");
service.scheduleWithFixedDelay(task03, 10, 10, TimeUnit.SECONDS);
}
}
執行結果
20:06:30:開始執行
20:06:40:scheduleWithFixedDelay 開始執行
20:06:42:scheduleWithFixedDelay 執行完成了
20:06:52:scheduleWithFixedDelay 開始執行
20:06:54:scheduleWithFixedDelay 執行完成了
20:07:04:scheduleWithFixedDelay 開始執行
20:07:06:scheduleWithFixedDelay 執行完成了
20:07:16:scheduleWithFixedDelay 開始執行
20:07:18:scheduleWithFixedDelay 執行完成了
scheduleWithFixedDelay方法其實和scheduleAtFixedRate方法很相像,但是他們有個區別,就是等不等的區別,像scheduleAtFixedRate方法,他不會等到之前的線程是否有沒有執行完成,他照樣還是會按照約定好的定時時間去執行任務,而scheduleWithFixedDelay不同,他會等待線程執行完成之后,上一次線程結束時間來開始計算。簡單來說他們的區別就是scheduleAtFixedRate的執行周期是按照線程任務的開始執行時間,scheduleWithFixedDelay的執行周期是按照線程任務的結束時間。
接下來我們來看看ScheduledExecutorService線程池的源碼
ScheduledThreadPoolExecutor(int corePoolSize)
super(...)
這里我們看到,我們給ScheduledExecutorService設置的線程數,其實就是核心線程數,最大線程數則是Integer.MAX_VALUE,並且線程空閑時間為0,並且這里的線程隊列使用的是優先隊列DelayedWorkQueue。其實ScheduledExecutorService強大的定時功能還是以優先隊列的基礎上來實現的,這里的原理我們就不多介紹。
SingleThreadExecutor(單一線程池)
這個線程很適用於需要按照提交順序去執行線程的場景,因為在他的線程池中,只有一個線程可以執行,他的原理其實跟FixedThreadPool有點相像,只不過他的核心線程數和最大線程數都是1,這樣當提交者去提交線程的時候,就必須先讓線程池中的線程執行完成之后才會去執行接下來的線程。這樣就保證了線程的順序性,而這種順序性,前面幾種線程的機制是做不到的。
SingleThreadScheduledExecutor(單一定時線程池)
這個線程池像是SingleThreadExecutor和ScheduledThreadPool生的娃,他有SingleThreadExecutor單一線程池的特性,一次只能執行一個線程,又可以完成ScheduledThreadPool線程池的定時功能。如果你能理解這個線程服務的能力,你就能理解這個線程的能力。
五種線程池的總結
我們對五種線程池的創建方式進行一個匯總,具體看下圖
其實看完上面這個圖就會發現,這五種線程池,其實就是Java提前給我們准備好的默認線程池,他們其實已經可以滿足我們日常業務開發中大部分的場景。
第六種線程池 ForkJoinPool
第六種線程池為什么要單獨拎出來說呢,是因為這個線程池是在JDK7加入的,他的名字其實也大概的描述了他的執行機制。我們先看下面這張圖
task任務為一個線程任務,但是這個任務下會有三個子任務,這三個任務又可以分為三個子線程去執行到這個就可以理解為Fork,但是這個task任務需要拿到結果,就需要他的三個子線程都執行完成才能拿到結果,這里其實就是將他的三個子任務去Join了,等到子任務都執行完了,才會返回task任務的執行結果。
另外,在ForkJoinPool線程池中,他們每個線程都有自己的獨立的任務隊列,例如下面這張圖所示
這里的理解其實就是,ForkJoinPool會有一個自己的公共隊列,當這個公共隊列執行的線程任務所Fork出來的子線程任務將不會放到公共隊列中,而是放在自己單獨的隊列中,這樣就互相不影響。這種設計其實就是為了減少線程間的競爭和切換,是很高效的一種方式。
當發生某一個線程任務的子任務很繁重,單另外一個線程的子任務卻很少時,ForkJoinPool會怎么去處理呢,其實在ForkJoinPool的子任務線程隊列中使用的是一種雙端隊列,在加上運行work-stealing偷線程任務的功能完美的平衡了各個線程的負載,
看到上面這個圖,當Thread1的線程任務滿了,而Thread0的線程隊列空閑的時候,Thread0的線程就會去Thread1那幫忙,這時Thread0線程就會跟Thread1訪問同一個隊列,也就是訪問Thread1的WorkQueue來幫他完成任務。
最后我們在看看一下ForkJoinPool的內部結構
你可以看到 ForkJoinPool 線程池和其他線程池很多地方都是一樣的,但重點區別在於它每個線程都有一個自己的雙端隊列來存儲分裂出來的子任務。ForkJoinPool 非常適合用於遞歸的場景,例如樹的遍歷、最優路徑搜索等場景。
ForkJoinPool 的使用方法
public class ForkJoinPoolDemo extends RecursiveAction {
/**
* 每個"小任務"最多只打印20個數
*/
private static final int MAX = 20;
private int start;
private int end;
public ForkJoinPoolDemo(int start, int end) {
this.start = start;
this.end = end;
}
@Override
protected void compute() {
//當end-start的值小於MAX時,開始打印
if((end-start) < MAX) {
for(int i= start; i<end;i++) {
System.out.println(Thread.currentThread().getName()+"的值"+i);
}
}else {
// 將大任務分解成兩個小任務
int middle = (start + end) / 2;
ForkJoinPoolDemo left = new ForkJoinPoolDemo(start, middle);
ForkJoinPoolDemo right = new ForkJoinPoolDemo(middle, end);
left.fork();
right.fork();
}
}
public static void main(String[] args) throws InterruptedException {
// 創建包含Runtime.getRuntime().availableProcessors()返回值作為個數的並行線程的ForkJoinPool
ForkJoinPool forkJoinPool = new ForkJoinPool();
// 提交可分解的PrintTask任務
forkJoinPool.submit(new ForkJoinPoolDemo(0, 1000));
//阻塞當前線程直到 ForkJoinPool 中所有的任務都執行結束
forkJoinPool.awaitTermination(2, TimeUnit.SECONDS);
// 關閉線程池
forkJoinPool.shutdown();
}
}
這里的執行邏輯其實很簡單,ForkJoinPoolDemo會去判斷end值-start值是否大於MAX,如果小於則打印,不創建子線程,如果大於則會將start加上end值除以2,然后分解成兩個小任務,通過fork方法來執行,然后繼續執行剛才的邏輯。知道所有的子線程都小於MAX值打印出來了這個線程也就執行完成了。這個小demo大家可以仔細去研究一下,結合之前將的ForkJoinPool線程池的原理和概念,會更有助於你理解。