java8 線程池
java 線程的創建、銷毀和線程減切換是一件比較耗費計算機資源的事。如果我們需要用多線程處理任務,並頻繁的創建、銷毀線程會造成計算機資源的無端浪費,因此出現了線程池技術。在《java 並發編程的藝術》一書中,作者總結了三條使用線程池的好處:
降低資源消耗。通過重復利用已創建的線程降低線程創建和銷毀造成的消耗。提高響應速度。當任務到達時,任務可以不需要的等到線程創建就能立即執行。提高線程的可管理性。線程是稀缺資源,如果無限制的創建,不僅會消耗系統資源,還會降低系統的穩定性,使用線程池可以進行統一的分配,調優和監控。
線程池的使用
線程池的創建
線程池的創建可以通過創建 ThreadPoolExecutor
對象或者調用 Executors
的工廠方法來創建線程池。但是在阿里巴巴的 java 開發手冊中提到:
【強制】線程池不允許使用 Executors 去創建,而是通過 ThreadPoolExecutor 的方式,這樣的處理方式讓寫的同學更加明確線程池的運行規則,規避資源耗盡的風險。
說明: Executors 返回的線程池對象的弊端如下:
1) FixedThreadPool 和 SingleThreadPool:
允許的請求隊列長度為 Integer.MAX_VALUE,可能會堆積大量的請求,從而導致 OOM。
2) CachedThreadPool 和 ScheduledThreadPool:
允許的創建線程數量為 Integer.MAX_VALUE, 可能會創建大量的線程,從而導致 OOM。
ThreadPoolExecutor
因此先看一下怎么通過創建 ThreadPoolExecutor
對象來創建一個線程池。
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler)
這是 ThreadPoolExecutor
的構造方法,其中的參數含義如下:
corePoolSize
:核心線程池大小, 當新的任務到線程池后,線程池會創建新的線程(即使有空閑線程),直到核心線程池已滿。maximumPoolSize
:最大線程池大小,顧名思義,線程池能創建的線程的最大數目keepAliveTime
:程池的工作線程空閑后,保持存活的時間TimeUnit
: 時間單位BlockingQueue
:用來儲存等待執行任務的隊列threadFactory
:線程工廠RejectedExecutionHandler
: 當隊列和線程池都滿了時拒絕任務的策略
重要參數的說明:
corePoolSize 和 maximumPoolSize
默認情況下線程中的線程初始時為 0, 當有新的任務到來時才會創建新線程,當線程數目到達corePoolSize
的數量時,新的任務會被緩存到workQueue
隊列中。如果不斷有新的任務到來,隊列也滿了的話,線程池會再新建線程直到總的線程數目達到maximumPoolSize
。如果還有新的任務到來,則要根據handler
對新的任務進行相應拒絕處理。BlockingQueue
一個阻塞隊列,用來存儲等待執行的任務,常用的有如下幾種:- ArrayBlockingQueue:是一個基於數組結構的有界阻塞隊列,此隊列按 FIFO(先進先出)原則對元素進行排序。
- LinkedBlockingQueue:一個基於鏈表結構的阻塞隊列,此隊列按 FIFO (先進先出) 排序元素,吞吐量通常要高於 ArrayBlockingQueue。靜態工廠方法 Executors.newFixedThreadPool() 使用了這個隊列。
- SynchronousQueue:一個不存儲元素的阻塞隊列。每個插入操作必須等到另一個線程調用移除操作,否則插入操作一直處於阻塞狀態,吞吐量通常要高於 LinkedBlockingQueue,靜態工廠方法 Executors.newCachedThreadPool 使用了這個隊列。
- PriorityBlockingQueue:一個具有優先級得無限阻塞隊列。
RejectedExecutionHandler
當隊列和線程池都滿了,說明線程池處於飽和狀態,那么必須采取一種策略處理提交的新任務。有下面四種 JDK 提供的策略:AbortPolicy
,表示無法處理新任務時拋出異常, 默認策略CallerRunsPolicy
:用調用者所在線程來運行任務。DiscardOldestPolicy
: 該策略將丟棄最老的一個請求,也就是即將被執行的任務,並嘗試再次提交當前任務。DiscardPolicy
:不處理,丟棄掉
除了這些 JDK 提供的策略外,還可以自己實現RejectedExecutionHandler
接口定義策略。
一個創建線程池的小例子:
public class CreateThreadPool {
public static void main(String args[]) {
//不建議的做法
ExecutorService executorService = Executors.newFixedThreadPool(2);
//使用 guava 開源框架的 ThreadFactoryBuilder 給線程池的線程設置名字
ThreadFactory namedThreadFactory = new ThreadFactoryBuilder().setNameFormat("demo-thread-%d").build();
ExecutorService pool = new ThreadPoolExecutor(4, 10, 0L,
TimeUnit.MILLISECONDS,
new LinkedBlockingDeque<Runnable>(256),
namedThreadFactory,
new ThreadPoolExecutor.AbortPolicy());
pool.execute(() -> System.out.println(Thread.currentThread().getName()));
pool.execute(() -> System.out.println(Thread.currentThread().getName()));
pool.execute(() -> System.out.println(Thread.currentThread().getName()));
pool.shutdown();
}
}
//輸出:
demo-thread-0
demo-thread-1
demo-thread-2
使用 Executors 的工廠方法創建線程
1. SingleThreadExecutor
創建一個單線程的線程池。這個線程池只有一個線程在工作,也就是相當於單線程串行執行所有任務。如果這個唯一的線程因為異常結束,那么會有一個新的線程來替代它。
此線程池保證所有任務的執行順序按照任務的提交順序執行。
public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory) {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(),
threadFactory));
}
2. FixedThreadPool
創建固定大小的線程池。每次提交一個任務就創建一個線程,直到線程達到線程池的最大大小。
線程池的大小一旦達到最大值就會保持不變,如果某個線程因為執行異常而結束,那么線程池會補充一個新線程。
public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(),
threadFactory);
3. CachedThreadPool
創建一個可緩存的線程池。如果線程池的大小超過了處理任務所需要的線程,
那么就會回收部分空閑(60 秒不執行任務)的線程,當任務數增加時,此線程池又可以智能的添加新線程來處理任務。
此線程池不會對線程池大小做限制,線程池大小完全依賴於操作系統(或者說 JVM)能夠創建的最大線程大小, 極端情況下會因為創建過多線程而耗盡系統資源
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
這里雖然指定 maximumPool 為 Integer.MAX_VALUE
,但沒什么意義,如果不能滿足任務執行需求,CachedThreadPool
還會繼續創建新的線程。
4. ScheduledThreadPool
主要用來在給定的延遲之后運行任務,或者定期執行任務。
public static ScheduledExecutorService newScheduledThreadPool(
int corePoolSize, ThreadFactory threadFactory) {
return new ScheduledThreadPoolExecutor(corePoolSize, threadFactory);
}
5. newWorkStealingPool
newWorkStealingPool
是 jdk1.8 才有的,會根據所需的並行層次來動態創建和關閉線程,通過使用多個隊列減少競爭,底層用的 ForkJoinPool
來實現的。ForkJoinPool
的優勢在於,可以充分利用多 cpu,多核 cpu 的優勢,把一個任務拆分成多個 “小任務”,把多個“小任務” 放到多個處理器核心上並行執行;當多個 “小任務” 執行完成之后,再將這些執行結果合並起來即可。
五種線程池的使用場景
- newSingleThreadExecutor:一個單線程的線程池,可以用於需要保證順序執行的場景,並且只有一個線程在執行。
- newFixedThreadPool:一個固定大小的線程池,可以用於已知並發壓力的情況下,對線程數做限制。
- newCachedThreadPool:一個可以無限擴大的線程池,比較適合處理執行時間比較小的任務。
- newScheduledThreadPool:可以延時啟動,定時啟動的線程池,適用於需要多個后台線程執行周期任務的場景。
- newWorkStealingPool:一個擁有多個任務隊列的線程池,可以減少連接數,創建當前可用 cpu 數量的線程來並行執行。
線程池的關閉
shutdown
shutdown
的原理是只是將線程池的狀態設置成 SHUTDOWN
狀態,然后中斷所有沒有正在執行任務的線程。
shutdownNow
shutdownNow
的原理是遍歷線程池中的工作線程,然后逐個調用線程的 interrupt
方法來中斷線程,所以無法響應中斷的任務可能永遠無法終止。shutdownNow
會首先將線程池的狀態設置成 STOP
,然后嘗試停止所有的正在執行或暫停任務的線程,並返回等待執行任務的列表。
通常調用 shutdown
來關閉線程池,如果任務不一定要執行完,則可以調用 shutdownNow
。
線程池的原理
一個線程池處理的基本流程如下 (jdk7 之前):
- 首先線程池判斷基本線程池是否已滿?沒滿,創建一個工作線程來執行任務。滿了,則進入下個流程。
- 其次線程池判斷工作隊列是否已滿?沒滿,則將新提交的任務存儲在工作隊列里。滿了,則進入下個流程。
- 最后線程池判斷整個線程池是否已滿?沒滿,則創建一個新的工作線程來執行任務,滿了,則交給飽和策略來處理這個任務。
jdk7 之后的線程池處理流程更復雜、判斷條件也更多:
execute
結合 ThreadPoolExecutor
類的源碼和圖看一下具體執行流程。
在 ThreadPoolExecutor
類中,最核心的任務提交方法是 execute()
方法:
public void execute(Runnable command) {
//判斷提交的任務是否為 null, 是則拋出異常
if (command == null)
throw new NullPointerException();
/*
* 獲取線程池控制狀態
* ctl 是一個 AtomicInteger 變量 (騷操作)
* jdk 8 中通過一個 int 值的前 28 位表示工作線程數量 workerCount, 剩余高位來表示 線程池狀態
* 計算 workerCount 和 runState 時通過掩碼計算。 CAPACITY = (1 << 29) - 1
* private static int runStateOf(int c) { return c & ~CAPACITY; }
* private static int workerCountOf(int c) { return c & CAPACITY; }
* */
int c = ctl.get();
//1. 當線程數小於 核心線程池容量時 將添加工作線程去執行任務
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get(); // 不成功則再次獲取線程池控制狀態
}
//2. (worker線程數量大於核心線程池容量時)如果線程池處於 RUNNING 狀態,將命令加入 workQueue 隊列
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get(); //再次檢查防止狀態突變
if (! isRunning(recheck) && remove(command))
//2.1 如果狀態改變,線程池沒有 RUNNING 則將命令移出隊列,並拒絕執行
reject(command);
else if (workerCountOf(recheck) == 0)
//2.2 狀態沒有改變,線程池 RUNNING,但 worker線程數量為 0, 則添加非core的worker線程
addWorker(null, false);
}
//3. 如果線程池沒有 RUNNING 並嘗試添加非core的 worker 線程失敗,那就拒絕執行
else if (!addWorker(command, false))
reject(command);
}
- 如果運行的線程小於
corePoolSize
,則嘗試使用用戶定義的Runnalbe
對象創建一個新的線程。調用addWorker()
函數會原子性的檢查runState
和workCount
,通過返回false
來防止在不應該添加線程時添加了線程 - 如果一個任務能夠成功入隊列,在添加一個線程時仍需要進行雙重檢查(可能因為在前一次檢查后該線程死亡了),或者當進入到此方法時,線程池已經 shutdown 了,所以需要再次檢查狀態。若線程此時的狀態不是 RUNNING,則需要回滾入隊列操作;或者當線程池沒有工作線程時,需要創建一個新的工作線程。
- 如果無法入隊列,那么需要增加一個新工作線程,如果此操作失敗,那么就意味着線程池已經 SHUTDOWN 或者已經飽和了,所以拒絕任務
上面提到了線程池的狀態,那就來看一下:
- RUNNING = -1 << COUNT_BITS; // 運行狀態, 也是線程池的初始狀態
- SHUTDOWN = 0 << COUNT_BITS; // 不再接收新的任務,但是會處理隊列中的任務
- STOP = 1 << COUNT_BITS; // 不再接收新的任務,也不會處理隊列中的任務並且會中斷正在執行的任務
- TIDYING = 2 << COUNT_BITS; // 所有線程已經終止,並且工作線程數
workerCount
等於 0。在進入此狀態會會調用terminated()
方法 - TERMINATED = 3 << COUNT_BITS; // 終止狀態, 在
terminated()
方法返回后,由 TIDYING 狀態進入此狀態。
狀態轉換圖:
addWorker()
此方法用來創建新的線程添加到線程池
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
/*
* 檢查線程池狀況, 確保此時可以添加新的線程,
* 如果是runing,那么跳過if。
* 如果rs>=SHUTDOWN,同時不等於SHUTDOWN,即為SHUTDOWN以上的狀態,那么不接受新線程。
* 如果rs>=SHUTDOWN,同時等於SHUTDOWN,同時first != null,那么拒絕新線程,
* 如果為Empty,那么隊列已空,不需要增加消耗線程,返回 false。
* */
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
for (;;) {
int wc = workerCountOf(c);
/*
* 判斷線程池是否已滿,如果線程數大於等於最大容量 CAPACITY 直接返回false
* core 是一個boolean 參數,表明調用者想把此線程添加到哪個線程池
* 根據 core 的值判斷要添加的線程池是否已滿
**/
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
//CAS 操作增加工作線程數
if (compareAndIncrementWorkerCount(c))
break retry;
c = ctl.get(); // Re-read ctl
//CAS 操作失敗, 再次檢查狀態重來一次
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
//創建新的線程
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
//獲取到鎖
mainLock.lock();
try {
// 再次檢查狀態,因為狀態可能在獲取鎖之前改變
int rs = runStateOf(ctl.get());
//確保當前線程池還接收新的線程
//結合上面的線程狀態知道:當狀態值大於等於 SHUTDOWN 時 線程池就不再接收新的線程了
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
workers.add(w);
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
//線程添加成功后就可以啟動線程准備執行了
t.start();
workerStarted = true;
}
}
} finally {
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}
從上面兩段源碼可以看到, 在添加新的線程進入線程池時,各種操作非常的嚴謹細致,往往需要多次檢查狀態,確保線程池的正確運行。
worker 工作線程
線程池創建線程時,會將線程封裝成工作線程 Worker,Worker 在執行完任務后,還會無限循環獲取工作隊列里的任務來執行。我們可以從 Worker 的 runWorker 方法里看到:
final void runWorker(Worker w) {
// 獲取當前線程
Thread wt = Thread.currentThread();
// 獲取w的firstTask
Runnable task = w.firstTask;
// 設置w的firstTask為null
w.firstTask = null;
// 釋放鎖(設置state為0,允許中斷)
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
while (task != null || (task = getTask()) != null) { // 任務不為null或者阻塞隊列還存在任務
// 獲取鎖
w.lock();
/*
* 這里的檢查主要是確保線程池此時還能接收新的任務去執行, 如果不在接收新的任務
* 則應該中斷當前線程
**/
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
// 在執行之前調用鈎子函數
beforeExecute(wt, task);
Throwable thrown = null;
try {
// 運行給定的任務
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
// 執行完后調用鈎子函數
afterExecute(task, thrown);
}
} finally {
task = null;
// 增加給worker完成的任務數量
w.completedTasks++;
// 釋放鎖
w.unlock();
}
}
completedAbruptly = false;
} finally {
// 處理完成后,調用鈎子函數
processWorkerExit(w, completedAbruptly);
}
}
參考
https://www.cnblogs.com/javanoob/p/threadpool.html
http://ifeve.com/java-threadpool/
https://mp.weixin.qq.com/s/5dexEENTqJWXN_17c6Lz6A