ThreadPoolExecutor是線程池的框架。雖然好多大佬都分析過了,為了加深理解,今天我也來分析一下ThreadPoolExecutor的源碼
ThreadPoolExecutor這個類上面的英文注釋已經很詳細了,一看就能明白。這部分就直接把對應的英文翻譯成中文。
下面這一段中文就全部是類上面的英文的翻譯
一個 ExecutorService 使用可能的幾個池線程之一執行每個提交的任務,通常使用 Executors 工廠方法配置。
線程池解決兩個不同的問題:由於減少了每個任務的調用開銷,它們通常在執行大量異步任務時提供改進的性能,並且它們提供了一種限制和管理資源的方法,包括在執行集合時消耗的線程任務。 每個ThreadPoolExecutor還維護一些基本的統計信息,例如已完成的任務數。
為了在廣泛的上下文中有用,此類提供了許多可調整的參數和可擴展性hooks。 但是,強烈建議程序員使用更方便的 Executors工廠方法 Executors.newCachedThreadPool(無界線程池,具有自動線程回收)、Executors.newFixedThreadPool(固定大小線程池)和 Executors.newSingleThreadExecutor(單個后台線程),它們為最常見的使用場景進行了預先配置。
否則,在手動配置和調整此類時使用以下指南:
-
線程池中核心線程和最大線程大小
ThreadPoolExecutor 將根據 corePoolSize和 maximumPoolSize設置的線程池大小。
在方法 execute(Runnable) 中提交新任務時,
- 如果正在運行的線程少於 corePoolSize,即使其他工作線程處於空閑狀態,也會創建一個新線程來處理請求,
- 如果正在運行的線程數大於corePoolSize同時少於 maximumPoolSize,則只有在隊列已滿時才會創建一個新線程來處理請求。
通過將 corePoolSize 和 maximumPoolSize 設置為相同,就可以創建一個固定大小的線程池。
通過將maximumPoolSize 設置為本質上無界的值,例如Integer.MAX_VALUE,可以允許池容納任意數量的並發任務。
最常見的是,核心和最大池大小僅在構造時設置,但它們也可以使用 setCorePoolSize 和 setMaximumPoolSize 動態更改。
-
按需構建
默認情況下,核心線程只有在新任務到達時才最初創建和啟動,但這可以使用方法 prestartCoreThread 或 prestartAllCoreThreads 動態覆蓋。如果使用非空隊列構造池,可能想要預啟動線程。
-
創建新線程
使用 ThreadFactory方法創建新線程。如果沒有另外指定,則使用 Executors.defaultThreadFactory,它創建的線程都在同一個 ThreadGroup 中,並且具有相同的 NORM_PRIORITY 優先級和非守護進程狀態。通過提供不同的 ThreadFactory,可以更改線程的名稱、線程組、優先級、守護程序狀態等。執行任何任務。線程應該擁有“modifyThread”運行時權限。
如果工作線程或其他使用池的線程不具備此權限,則服務可能會降級:配置更改可能無法及時生效,關閉池可能會一直處於可以 終止但未完成的狀態。
-
保活時間
如果線程池中當前有超過 corePoolSize 的線程,則如果空閑時間超過 keepAliveTime,多余的線程將被終止。這提供了一種在不繁忙使用線程池時減少資源消耗的方法。如果線程池稍后變得更加繁忙,則將構建新線程。也可以使 setKeepAliveTime(long, TimeUnit) 方法動態更改此參數。使用 Long.MAX_VALUE TimeUnit.NANOOSECONDS 值可以有效地禁止空閑線程在關閉之前終止。
默認情況下,保持活動策略僅在有超過 corePoolSize 個線程時適用,但方法 allowCoreThreadTimeOut(boolean) 也可用於將此超時策略應用於核心線程,只要 keepAliveTime 值不為零.
-
隊列
任何 BlockingQueue 都可用於傳輸和保存提交的任務隊列。此隊列的使用與池大小交互:
- 如果線程池中正在運行的線程少於 corePoolSize,則 Executor將創建新線程執行任務而不是將任務添加到任務隊列。
- 如果線程池中大於corePoolSize 的線程正在運行,Executor 總是將任務添加到隊列中而不是創建新線程。
- 如果線程池中線程數大於corePoolSize、少於 maximumPoolSize、任務隊列已滿,則會創建一個新線程,
- 如果線程池中線程數大於maximumPoolSize、任務隊列已滿,在這種情況下,任務將被拒絕。
隊列的一般形式:
- 直接交接。
工作隊列的一個很好的默認選擇是 SynchronousQueue,它將任務移交給線程而不用其他方式保留它們。 在這里,如果沒有線程可立即運行,則將任務排隊的嘗試將失敗,因此將構建一個新線程。 在處理可能具有內部依賴性的請求集時,此策略可避免鎖定。 直接切換通常需要無限的maximumPoolSizes 以避免拒絕新提交的任務。 這反過來又承認了當命令平均持續到達速度快於它們可以處理的速度時無限線程增長的可能性。
- 無界隊列。
使用無界隊列(例如,沒有預定義容量的 LinkedBlockingQueue)將導致新任務在所有 corePoolSize 線程都忙時在隊列中等待。因此,不會創建超過 corePoolSize 的線程。 (因此maximumPoolSize的值沒有任何影響。)當每個任務完全獨立於其他任務時,這可能是合適的,因此任務不會影響彼此的執行;例如,在網頁服務器中。雖然這種排隊方式在平滑請求的瞬時爆發方面很有用,但它承認當命令的平均到達速度超過它們的處理速度時,工作隊列可能會無限增長。
- 有界隊列。
有界隊列(例如,ArrayBlockingQueue)在與有限的 maximumPoolSizes 一起使用時有助於防止資源耗盡,但可能更難以調整和控制。隊列大小和最大池大小可以相互權衡:使用大隊列和小池可以最大限度地減少 CPU 使用率、操作系統資源和上下文切換開銷,但會導致人為地降低吞吐量。如果任務頻繁阻塞(例如,如果它們受 I/O 限制),則系統可能能夠為比您允許的更多線程安排時間。使用小隊列通常需要更大的池大小,這會使 CPU 更忙,但可能會遇到不可接受的調度開銷,這也會降低吞吐量。
-
拒絕任務
當 Executor 已經關閉,並且當 Executor 對最大線程和工作隊列容量使用有限邊界並且飽和時,在方法 execute(Runnable) 中提交的新任務將被拒絕。 無論哪種情況,execute 方法都會調用其 RejectedExecutionHandler 的RejectedExecutionHandler.rejectedExecution(Runnable, ThreadPoolExecutor) 方法。提供了四個預定義的處理程序策略:
- 在默認的 ThreadPoolExecutor.AbortPolicy 中,處理程序在拒絕時拋出運行時 RejectedExecutionException。
- 在 ThreadPoolExecutor.CallerRunsPolicy 中,調用執行自身的線程運行任務。這提供了一個簡單的反饋控制機制,可以減慢提交新任務的速度。
- 在 ThreadPoolExecutor.DiscardPolicy 中,無法執行的任務被簡單地丟棄。此策略僅適用於從不依賴任務完成的極少數情況。
- 在 ThreadPoolExecutor.DiscardOldestPolicy 中,如果執行器沒有關閉,工作隊列頭部的任務會被丟棄,然后重試執行(可能會再次失敗,導致重復)。這種策略很少被接受。在幾乎所有情況下,您還應該取消任務以在任何等待其完成的組件中導致異常,和/或記錄失敗,如 ThreadPoolExecutor.DiscardOldestPolicy 文檔中所示。
可以定義和使用其他類型的 RejectedExecutionHandler 類。這樣做需要小心,特別是當策略設計為僅在特定容量或排隊策略下工作時。
-
Hook方法
此類提供受保護的可覆蓋 beforeExecute(Thread, Runnable) 和 afterExecute(Runnable, Throwable) 方法,這些方法在每個任務執行之前和之后調用。這些可用於操作執行環境;例如,重新初始化 ThreadLocals、收集統計信息或添加日志條目。此外,可以覆蓋已終止的方法以執行在 Executor 完全終止后需要完成的任何特殊處理。
如果鈎子、回調或 BlockingQueue 方法拋出異常,內部工作線程可能會依次失敗、突然終止並可能被替換。
-
隊列維護
方法 getQueue() 允許訪問工作隊列以進行監視和調試。強烈建議不要將此方法用於任何其他目的。提供的兩種方法 remove(Runnable) 和 purge 可用於在大量排隊任務被取消時協助存儲回收。
-
Reclamation
程序中不再引用並且沒有剩余線程的池可以在不顯式關閉的情況下被回收(垃圾收集)。您可以通過設置適當的保持活動時間、使用零核心線程的下限和/或設置 allowCoreThreadTimeOut(boolean) 來配置池以允許所有未使用的線程最終死亡。
package java.util.concurrent;
import java.util.ArrayList;
import java.util.ConcurrentModificationException;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.AbstractQueuedSynchronizer;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
*/
public class ThreadPoolExecutor extends AbstractExecutorService {
/**
* ctl 用來表示線程池到狀態,包含兩部分
* workerCount, 代表有效的線程數量
* runState, 代表當前線程池的狀態running, shutting down等等
*
*
* runState 提供了線程池生命周期的狀態。主要有以下取值:
*
* RUNNING: 接受新任務並處理任務隊列中的任務
* SHUTDOWN: 不接受新任務,但是處理任務隊列中的任務
* STOP: 不接受新任務,也不處理任務隊列中的任務並且終端當前正在處理的任務
* TIDYING: 所有任務已經結束, workerCount==0,線程轉化到TIDYING狀態后將執行terminated() hook方法
* TERMINATED: terminated()方法已經執行完成
*
* 這些取值之間的數字順序很重要,就可以直接用數字進行比較。 runState 單調遞增,但並不是會經過每個狀態。
* 這些狀態之間的過渡是:
*
* RUNNING -> SHUTDOWN # 當前調用了shutdown()
*
* (RUNNING or SHUTDOWN) -> STOP # 當調用了shutdownNow()
*
* SHUTDOWN -> TIDYING # 當任務隊列和線程池中線程都為空時
*
* STOP -> TIDYING # 當線程池為空時
*
* TIDYING -> TERMINATED # 當terminated() hook 方法執行完成時
*
* 當調用awaitTermination()后,會一直等待,直到線程池狀態到達TERMINATED時返回
*
* 檢測從 SHUTDOWN 到 TIDYING 的轉換並不像您想要的那么簡單,因為在 SHUTDOWN 狀態期間隊列可能會在非空之后變為空,
* 反之亦然,但是我們只能在看到它為空之后才能終止,我們看到 workerCount 是 0(有時需要重新檢查——見下文)。
*/
//ctl用來表示狀態和做線程數統計;int用2進制表示32位,前面3位表示狀態,后面29位表示線程數
//這里在初始化對象時就已經設置了RUNNING,所以runState一開始就是RUNNING
//runState 表示線程池狀態
//workerCount 表示線程池中線程數
//runState,workerCount這兩個變量是不存在的,runState和workerCount兩個變量一起組成了ctl
//前面3位表示runState,后面29位表示workerCount
//所以我們也可以認為runState,workerCount是存在的
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
//COUNT_BITS表示位移量
private static final int COUNT_BITS = Integer.SIZE - 3;
//COUNT_MASK表示11111111111111111111111111111(29個1)
//ctl&COUNT_MASK就是只保留ctl低29位,結果也就是當前線程池中線程的數量
private static final int COUNT_MASK = (1 << COUNT_BITS) - 1;
// 分別左移29位,也就是int后面29位都是0,用前面3位來表示runState
//它們的2進制和10進制分別時下面的值,可以看到它們的int值時從小到大排列的
// RUNNING = 11100000000000000000000000000000 --> -536870912
// SHUTDOWN = 0 --> 0
// STOP = 100000000000000000000000000000 --> 536870912
// TIDYING = 1000000000000000000000000000000 --> 1073741824
// TERMINATED = 1100000000000000000000000000000 --> 1610612736
private static final int RUNNING = -1 << COUNT_BITS;
private static final int SHUTDOWN = 0 << COUNT_BITS;
private static final int STOP = 1 << COUNT_BITS;
private static final int TIDYING = 2 << COUNT_BITS;
private static final int TERMINATED = 3 << COUNT_BITS;
// ~COUNT_MASK表示將int前3位置1,后面29位置0;runStateOf根據傳入的參數計算當前線程池的狀態
private static int runStateOf(int c) { return c & ~COUNT_MASK; }
//workerCountOf根據傳入的參數計算線程池中的線程數
private static int workerCountOf(int c) { return c & COUNT_MASK; }
private static int ctlOf(int rs, int wc) { return rs | wc; }
//由於runState是int,通過比較int大小來確定它們的狀態
private static boolean runStateLessThan(int c, int s) {
return c < s;
}
//由於runState是int,通過比較int大小來確定它們的狀態
private static boolean runStateAtLeast(int c, int s) {
return c >= s;
}
//通過上面runState幾種取值的定義,也能看到只有RUNNING這一種狀態是小於SHUTDOWN的
private static boolean isRunning(int c) {
return c < SHUTDOWN;
}
//通過CAS的方式將ctl+1。用在創建一個線程后,增加線程的數量
private boolean compareAndIncrementWorkerCount(int expect) {
return ctl.compareAndSet(expect, expect + 1);
}
//通過CAS的方式將ctl-1。用在一個線程結束后,減少線程的數量
private boolean compareAndDecrementWorkerCount(int expect) {
return ctl.compareAndSet(expect, expect - 1);
}
//和上面的用法是一樣的,ctl-1
private void decrementWorkerCount() {
ctl.addAndGet(-1);
}
/**
* The queue used for holding tasks and handing off to worker
* threads. We do not require that workQueue.poll() returning
* null necessarily means that workQueue.isEmpty(), so rely
* solely on isEmpty to see if the queue is empty (which we must
* do for example when deciding whether to transition from
* SHUTDOWN to TIDYING). This accommodates special-purpose
* queues such as DelayQueues for which poll() is allowed to
* return null even if it may later return non-null when delays
* expire.
*/
//用來保存任務的隊列
private final BlockingQueue<Runnable> workQueue;
private final ReentrantLock mainLock = new ReentrantLock();
//當前線程池中所有線程集合
private final HashSet<Worker> workers = new HashSet<>();
//用來喚醒調用awaitTermination的掛起
private final Condition termination = mainLock.newCondition();
//線程池的線程從啟動到現在曾經的最大線程數量。比如之前線程數是10,現在降到8,這個值還會10
private int largestPoolSize;
//完成任務的數量
private long completedTaskCount;
//線程工廠,用來產生新的線程
private volatile ThreadFactory threadFactory;
//當前的拒絕任務執行器
private volatile RejectedExecutionHandler handler;
//保活時間,如果當前線程數<corePoolSize,<=maximumPoolSize,會將空閑時間超過keepAliveTime的線程結束掉
//如果allowCoreThreadTimeOut==false,當線程數<corePoolSize,<=maximumPoolSize時,會將空閑時間超過keepAliveTime的線程結束掉
//如果allowCoreThreadTimeOut==true,將空閑時間超過keepAliveTime的線程結束掉
private volatile long keepAliveTime;
//是否允許當前線程數<corePoolSize,繼續結束空閑時間超過keepAliveTime的線程
private volatile boolean allowCoreThreadTimeOut;
//核心線程數
private volatile int corePoolSize;
//最大線程數
private volatile int maximumPoolSize;
//默認的拒絕任務執行器
private static final RejectedExecutionHandler defaultHandler =
new AbortPolicy();
private static final RuntimePermission shutdownPerm =
new RuntimePermission("modifyThread");
//用來執行任務的實體。實現了Runnable接口,代表一個可執行的線程。同時內部持有了一個Thread對象(用來真正執行任務的線程)
private final class Worker
extends AbstractQueuedSynchronizer
implements Runnable
{
private static final long serialVersionUID = 6138294804551838833L;
//真正執行任務的線程
@SuppressWarnings("serial") // Unlikely to be serializable
final Thread thread;
//當前worker執行的第一個任務,有可能是null
@SuppressWarnings("serial") // Not statically typed as Serializable
Runnable firstTask;
//完成的任務數
volatile long completedTasks;
// TODO: switch to AbstractQueuedLongSynchronizer and move
// completedTasks into the lock word.
//根據傳入的任務,創建對象並創建內部真正執行任務的線程,參數可能是null
Worker(Runnable firstTask) {
//這里將state=-1,這時是不能加鎖的。因為加鎖是判斷state是不是等於0,等於0的場景下才能加鎖成功。
//要執行lock方法,就需要先將state改成0。所以在runWorker方法中開始部分就會調用unlock將state設置成0.
//通過state==-1,不能執行加鎖操作來禁用中斷
//0代表當前未加鎖,1代表當前加鎖中
setState(-1);
//賦值第一個任務
this.firstTask = firstTask;
//通過線程工廠創建線程
this.thread = getThreadFactory().newThread(this);
}
public void run() {
runWorker(this);
}
//內部方法(主要是提供出來供子類復寫),是否已經被加鎖
protected boolean isHeldExclusively() {
return getState() != 0;
}
//內部方法(主要是提供出來供子類復寫),嘗試加鎖,成功返回true,失敗返回false
//注意:這里的鎖是不可重入的。這里的入參也是沒有使用到的。里面只是cas的方式將state從0設置成1
protected boolean tryAcquire(int unused) {
if (compareAndSetState(0, 1)) {
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}
//內部方法(主要是提供出來供子類復寫),解鎖
protected boolean tryRelease(int unused) {
setExclusiveOwnerThread(null);
setState(0);
return true;
}
//加鎖,如果不成功,會阻塞。直到成功或拋出異常
public void lock() { acquire(1); }
//嘗試加鎖,不阻塞。成功返回tue,失敗返回false
public boolean tryLock() { return tryAcquire(1); }
//解鎖
public void unlock() { release(1); }
//判斷是否已經加鎖
public boolean isLocked() { return isHeldExclusively(); }
//如果當前線程已經啟動且當前線程沒有被中斷,就中斷當前線程
void interruptIfStarted() {
Thread t;
//通過state>=0,表示已經執行了runWorker方法
//初始化的state==-1,在runWorker方法開始就會將state從-1設置成0,此后state不會小於0
if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
try {
t.interrupt();
} catch (SecurityException ignore) {
}
}
}
}
//入參targetState取值是SHUTDOWN 或 STOP
//如果runState小於targetState,就將runState設置成targetState
private void advanceRunState(int targetState) {
// assert targetState == SHUTDOWN || targetState == STOP;
for (;;) {
int c = ctl.get();
if (runStateAtLeast(c, targetState) ||
ctl.compareAndSet(c, ctlOf(targetState, workerCountOf(c))))
break;
}
}
//如果(關閉和池和隊列為空)或(停止和池為空),則轉換到 TERMINATED 狀態。
//如果有資格終止但 workerCount 非零,則中斷空閑的工作程序以確保關閉信號傳播。
//必須在任何可能使終止成為可能的操作之后調用此方法 - 在關閉期間減少工作線程數或從隊列中刪除任務。
//該方法是非私有的,允許從 ScheduledThreadPoolExecutor 訪問。
final void tryTerminate() {
for (;;) {
int c = ctl.get();
//1.如果當前runState==RUNNING說明當前線程池正在運行,就直接返回
//2.runStateAtLeast(c, TIDYING) ,當前runState==TIDYING或者TERMINATED,不需要處理
//3.runStateLessThan(c, STOP) 這個條件成立的runState取值只有RUNNING和SHUTDOWN,RUNNING在前面已經處理
//所以這里的runState就只有SHUTDOWN,所以也就是如果當前狀態是SHUTDOWN且隊列中還有任務,也不處理
if (isRunning(c) ||
runStateAtLeast(c, TIDYING) ||
(runStateLessThan(c, STOP) && ! workQueue.isEmpty()))
return;
//能走到下面,說明當前runState==SHUTDOWN或者runState==STOP
//如果當前還有線程存活,就中斷一個線程
if (workerCountOf(c) != 0) { // Eligible to terminate
interruptIdleWorkers(ONLY_ONE);
return;
}
//能走到下面,說明當前runState==SHUTDOWN或者runState==STOP,且當前線程存活數==0
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
//將runState設置成TIDYING
if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
try {
//這個方法是留給子類去實現的,子類可以在這個方法中執行線程池停止后的清理工作
terminated();
} finally {
//將runState設置成TERMINATED
ctl.set(ctlOf(TERMINATED, 0));
//喚醒在awaitTermination方法上的掛起
termination.signalAll();
}
return;
}
} finally {
mainLock.unlock();
}
// else retry on failed CAS
}
}
private void checkShutdownAccess() {
// assert mainLock.isHeldByCurrentThread();
@SuppressWarnings("removal")
SecurityManager security = System.getSecurityManager();
if (security != null) {
security.checkPermission(shutdownPerm);
for (Worker w : workers)
security.checkAccess(w.thread);
}
}
//中斷所有的線程
private void interruptWorkers() {
// assert mainLock.isHeldByCurrentThread();
for (Worker w : workers)
w.interruptIfStarted();
}
//中斷空閑的線程
//參數onlyOne==true表示中斷一個線程
//參數onlyOne==false表示中斷所有線程
private void interruptIdleWorkers(boolean onlyOne) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
for (Worker w : workers) {
Thread t = w.thread;
//首先判斷當前線程是否已經被中斷,如果已經中斷就再執行中斷
//w.tryLock()是判斷當前線程是否在執行任務(每次在獲取任務后會執行lock,任務執行完后會執行unlock),
//tryLock方法執行成功說明線程當前空閑中(在等待獲取任務),這時就執行中斷
//注意:onlyOne在這個if外面,就說明有可能一個中斷都沒執行,onlyOne==false時,也可能一個中斷都沒執行
if (!t.isInterrupted() && w.tryLock()) {
try {
t.interrupt();
} catch (SecurityException ignore) {
} finally {
w.unlock();
}
}
if (onlyOne)
break;
}
} finally {
mainLock.unlock();
}
}
//中斷一個空閑的線程
private void interruptIdleWorkers() {
interruptIdleWorkers(false);
}
//是否中斷一個線程
private static final boolean ONLY_ONE = true;
//將任務交給拒絕任務執行器去處理
final void reject(Runnable command) {
handler.rejectedExecution(command, this);
}
//在調用關閉時運行狀態轉換后執行任何進一步的清理。 此處無操作,但被 ScheduledThreadPoolExecutor 用於取消延遲任務。
void onShutdown() {
}
//將任務隊列中的任務轉移出去
private List<Runnable> drainQueue() {
BlockingQueue<Runnable> q = workQueue;
ArrayList<Runnable> taskList = new ArrayList<>();
q.drainTo(taskList);
if (!q.isEmpty()) {
for (Runnable r : q.toArray(new Runnable[0])) {
if (q.remove(r))
taskList.add(r);
}
}
return taskList;
}
//添加一個線程到線程池,firstTask是新創建線程的第一個任務,core表示當前添加的是否是核心線程
//返回的結果表示新增線程是否成功
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (int c = ctl.get();;) {
//這個條件寫的比較繞:主要也就是兩種場景
//1.runState==SHUTDOWN條件下,firstTask!=null或者任務隊列是空的,就會直接返回false
//2.runState>=STOP條件下,這種場景下就直接返回false
if (runStateAtLeast(c, SHUTDOWN)
&& (runStateAtLeast(c, STOP)
|| firstTask != null
|| workQueue.isEmpty()))
return false;
for (;;) {
//這個if主要是根據我們傳入的core來判斷當前線程池的線程數
if (workerCountOf(c)
>= ((core ? corePoolSize : maximumPoolSize) & COUNT_MASK))
return false;
//將線程數計數+1,在多線程場景下有可能失敗,所以需要重新獲取c,重新執行
if (compareAndIncrementWorkerCount(c))
//走到這里,就會直接跳出雙層循環
break retry;
c = ctl.get(); // Re-read ctl
//如果runState不是RUNNING,就又回回到上面去重新開始,根據實際情況從上面兩個return false的地方返回
if (runStateAtLeast(c, SHUTDOWN))
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
//走到下面就是需要去新增線程了
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
//創建線程池中線程的載體
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
//這里會再次獲取runState
int c = ctl.get();
//判斷線程池的狀態,
//1.runState==RUNNING
//2.runState==SHUTDOWN且firstTask==null就會去執行后續操作
if (isRunning(c) ||
(runStateLessThan(c, STOP) && firstTask == null)) {
//這里會對線程的狀態進行判斷,線程在執行start方法之前的狀態== Thread.State.NEW
if (t.getState() != Thread.State.NEW)
throw new IllegalThreadStateException();
//將創建的線程的載體加到線程池中
workers.add(w);
//標記線程增加成功
workerAdded = true;
int s = workers.size();
//更新線程數量
if (s > largestPoolSize)
largestPoolSize = s;
}
} finally {
mainLock.unlock();
}
//如果線程添加成功,就啟動線程,標記線程啟動成功
if (workerAdded) {
t.start();
workerStarted = true;
}
}
} finally {
//如果線程啟動沒有標記為true,說明線程增加沒有成功,后續就需要執行對應的清理工作
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}
//這里就會對增加線程失敗進行處理
private void addWorkerFailed(Worker w) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
//從線程隊列里面刪除線程,這個remove 不存在也不會報錯
if (w != null)
workers.remove(w);
//線程計數-1
decrementWorkerCount();
//因為有可能是runState變化,導致添加線程失敗,所以這里需要runState去判斷是否能結束線程池
tryTerminate();
} finally {
mainLock.unlock();
}
}
//這個方法只會在runWorker中調用
//completedAbruptly表示是不是執行過程中出現異常導致線程結束。true表示出現異常,false表示正常原因結束(getTask()返回值是null)
private void processWorkerExit(Worker w, boolean completedAbruptly) {
//如果是異常原因導致線程退出,之前線程計數中沒有進行-1操作,這里就需要補操作
//對於正常原因導致線程退出的,這個-1操作已經在getTask()中執行了
if (completedAbruptly)
decrementWorkerCount();
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
//將退出線程完成的任務數加到總完成任務數里面
completedTaskCount += w.completedTasks;
//從線程集合中移除線程
workers.remove(w);
} finally {
mainLock.unlock();
}
//嘗試去結束線程池
tryTerminate();
int c = ctl.get();
//如果runState==RUNNING或SHUTDOWN,就會去根據實際需要增加線程
if (runStateLessThan(c, STOP)) {
//如果線程是正常結束,判斷線程池允許的最小線程數
//如果允許核心線程超時,那最小線程數就是0;否則就是corePoolSize
if (!completedAbruptly) {
int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
//如果任務隊列中還有任務,那最小線程數就需要大於0
if (min == 0 && ! workQueue.isEmpty())
min = 1;
//如果當前線程大於最小需求量,那就不需要增加線程數,直接返回
if (workerCountOf(c) >= min)
return; // replacement not needed
}
//作為非核心線程加到線程池中
addWorker(null, false);
}
}
//在這里就會從任務隊列中獲取任務,如果返回null,那就是對應線程需要正常退出了
private Runnable getTask() {
//表示從任務隊列中獲取數據的時候是否已經超時
boolean timedOut = false; // Did the last poll() time out?
for (;;) {
int c = ctl.get();
// Check if queue empty only if necessary.
//這里有兩個條件
//1.當前狀態是SHUTDOWN,任務隊列為空,工作線程數-1,這里直接返回null
//2.當前狀態>=STOP,這時不會去判斷任務隊列,工作線程數-1,直接返回null
//這里也重點體現了SHUTDOWN和STOP的區別,
//對於SHUTDOWN,還會繼續從任務隊列中獲取任務;STOP就不會從任務隊列中獲取任務,直接從這里返回null
if (runStateAtLeast(c, SHUTDOWN)
&& (runStateAtLeast(c, STOP) || workQueue.isEmpty())) {
decrementWorkerCount();
return null;
}
//獲取到當前線程數
int wc = workerCountOf(c);
// Are workers subject to culling?
//這里會判斷當前線程是否允許超時,這里也有兩種場景
//1.如果設置了允許核心線程超時,那就表示所有的線程都會超時,當前線程就允許超時
//2.如果當前線程池中線程數>核心線程數,當前線程也允許超時
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
//1.如果當前線程數大於了最大線程數,這時就要減少線程數,直到降低到等於maximumPoolSize大小
//2.如果當前線程允許超時,且已經超時
//在上面兩種情況下,還需要判斷線程數>0或者任務隊列為空,這時才會去減少線程。
//主要是為了避免,任務隊列中有任務,但是當前線程池已經沒有線程了這種情況
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}
try {
//這里就是從任務隊列中獲取任務了,如果當前線程允許超時,那就通過poll的方式來獲取,要不返回獲取的任務,要不返回null
//如果當前線程不允許超時,那就通過take使用阻塞的方式來獲取任務
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
//如果獲取到了任務,就會從這里返回任務,后續去對獲取到的任務進行處理
if (r != null)
return r;
//走到這里說明沒有獲取到任務,是由於poll時間到達,返回了null
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}
//這個是執行任務的主方法
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock();
boolean completedAbruptly = true;
try {
//這里會循環通過getTask來獲取任務后調用任務的run方法來執行
//如果getTask返回是null,那說明當前線程需要結束,后面就會結束當前線程
while (task != null || (task = getTask()) != null) {
w.lock();
// If pool is stopping, ensure thread is interrupted;
// if not, ensure thread is not interrupted. This
// requires a recheck in second case to deal with
// shutdownNow race while clearing interrupt
//1.判斷當前runState>=STOP,當前線程沒有被中斷,則需要中斷當前線程
//2.如果當前線程已經被中斷且當前runState>=STOP,Thread.interrupted()這個會清除當前的中斷標記
//上面兩種條件都成立的情況下,這時就要去中斷當前線程
//注意:我們從這里也能看到如果當前runState<STOP,這時由於調用了Thread.interrupted(),就只會清除當前的中斷標記
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
//這個是留給子類去擴展的
beforeExecute(wt, task);
try {
//調用任務的run方法去執行任務
task.run();
//留給子類去擴展
afterExecute(task, null);
} catch (Throwable ex) {
afterExecute(task, ex);
throw ex;
}
} finally {
task = null;
//當前線程執行的任務書+1
w.completedTasks++;
w.unlock();
}
}
//這個沒有在finally類面,沒有拋出異常才會走到這里
//通過這個參數判斷當前線程是由於沒有了任務正常結束,還是由於拋出異常走到這里
//對於異常原因,processWorkerExit方法中會讓線程數-1,同時會重新創建一個新的線程
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
}
// Public constructors and methods
/**
* Creates a new {@code ThreadPoolExecutor} with the given initial
* parameters, the
* {@linkplain Executors#defaultThreadFactory default thread factory}
* and the {@linkplain ThreadPoolExecutor.AbortPolicy
* default rejected execution handler}.
*
* <p>It may be more convenient to use one of the {@link Executors}
* factory methods instead of this general purpose constructor.
*
* @param corePoolSize the number of threads to keep in the pool, even
* if they are idle, unless {@code allowCoreThreadTimeOut} is set
* @param maximumPoolSize the maximum number of threads to allow in the
* pool
* @param keepAliveTime when the number of threads is greater than
* the core, this is the maximum time that excess idle threads
* will wait for new tasks before terminating.
* @param unit the time unit for the {@code keepAliveTime} argument
* @param workQueue the queue to use for holding tasks before they are
* executed. This queue will hold only the {@code Runnable}
* tasks submitted by the {@code execute} method.
* @throws IllegalArgumentException if one of the following holds:<br>
* {@code corePoolSize < 0}<br>
* {@code keepAliveTime < 0}<br>
* {@code maximumPoolSize <= 0}<br>
* {@code maximumPoolSize < corePoolSize}
* @throws NullPointerException if {@code workQueue} is null
*/
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
Executors.defaultThreadFactory(), defaultHandler);
}
/**
* Creates a new {@code ThreadPoolExecutor} with the given initial
* parameters and the {@linkplain ThreadPoolExecutor.AbortPolicy
* default rejected execution handler}.
*
* @param corePoolSize the number of threads to keep in the pool, even
* if they are idle, unless {@code allowCoreThreadTimeOut} is set
* @param maximumPoolSize the maximum number of threads to allow in the
* pool
* @param keepAliveTime when the number of threads is greater than
* the core, this is the maximum time that excess idle threads
* will wait for new tasks before terminating.
* @param unit the time unit for the {@code keepAliveTime} argument
* @param workQueue the queue to use for holding tasks before they are
* executed. This queue will hold only the {@code Runnable}
* tasks submitted by the {@code execute} method.
* @param threadFactory the factory to use when the executor
* creates a new thread
* @throws IllegalArgumentException if one of the following holds:<br>
* {@code corePoolSize < 0}<br>
* {@code keepAliveTime < 0}<br>
* {@code maximumPoolSize <= 0}<br>
* {@code maximumPoolSize < corePoolSize}
* @throws NullPointerException if {@code workQueue}
* or {@code threadFactory} is null
*/
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
threadFactory, defaultHandler);
}
/**
* Creates a new {@code ThreadPoolExecutor} with the given initial
* parameters and the
* {@linkplain Executors#defaultThreadFactory default thread factory}.
*
* @param corePoolSize the number of threads to keep in the pool, even
* if they are idle, unless {@code allowCoreThreadTimeOut} is set
* @param maximumPoolSize the maximum number of threads to allow in the
* pool
* @param keepAliveTime when the number of threads is greater than
* the core, this is the maximum time that excess idle threads
* will wait for new tasks before terminating.
* @param unit the time unit for the {@code keepAliveTime} argument
* @param workQueue the queue to use for holding tasks before they are
* executed. This queue will hold only the {@code Runnable}
* tasks submitted by the {@code execute} method.
* @param handler the handler to use when execution is blocked
* because the thread bounds and queue capacities are reached
* @throws IllegalArgumentException if one of the following holds:<br>
* {@code corePoolSize < 0}<br>
* {@code keepAliveTime < 0}<br>
* {@code maximumPoolSize <= 0}<br>
* {@code maximumPoolSize < corePoolSize}
* @throws NullPointerException if {@code workQueue}
* or {@code handler} is null
*/
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
RejectedExecutionHandler handler) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
Executors.defaultThreadFactory(), handler);
}
/**
* Creates a new {@code ThreadPoolExecutor} with the given initial
* parameters.
*
* @param corePoolSize the number of threads to keep in the pool, even
* if they are idle, unless {@code allowCoreThreadTimeOut} is set
* @param maximumPoolSize the maximum number of threads to allow in the
* pool
* @param keepAliveTime when the number of threads is greater than
* the core, this is the maximum time that excess idle threads
* will wait for new tasks before terminating.
* @param unit the time unit for the {@code keepAliveTime} argument
* @param workQueue the queue to use for holding tasks before they are
* executed. This queue will hold only the {@code Runnable}
* tasks submitted by the {@code execute} method.
* @param threadFactory the factory to use when the executor
* creates a new thread
* @param handler the handler to use when execution is blocked
* because the thread bounds and queue capacities are reached
* @throws IllegalArgumentException if one of the following holds:<br>
* {@code corePoolSize < 0}<br>
* {@code keepAliveTime < 0}<br>
* {@code maximumPoolSize <= 0}<br>
* {@code maximumPoolSize < corePoolSize}
* @throws NullPointerException if {@code workQueue}
* or {@code threadFactory} or {@code handler} is null
*/
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
/**
* Executes the given task sometime in the future. The task
* may execute in a new thread or in an existing pooled thread.
*
* If the task cannot be submitted for execution, either because this
* executor has been shutdown or because its capacity has been reached,
* the task is handled by the current {@link RejectedExecutionHandler}.
*
* @param command the task to execute
* @throws RejectedExecutionException at discretion of
* {@code RejectedExecutionHandler}, if the task
* cannot be accepted for execution
* @throws NullPointerException if {@code command} is null
*/
//這個是我們外部代碼調用的入口
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
/*
* Proceed in 3 steps:
*
* 1. If fewer than corePoolSize threads are running, try to
* start a new thread with the given command as its first
* task. The call to addWorker atomically checks runState and
* workerCount, and so prevents false alarms that would add
* threads when it shouldn't, by returning false.
*
* 2. If a task can be successfully queued, then we still need
* to double-check whether we should have added a thread
* (because existing ones died since last checking) or that
* the pool shut down since entry into this method. So we
* recheck state and if necessary roll back the enqueuing if
* stopped, or start a new thread if there are none.
*
* 3. If we cannot queue task, then we try to add a new
* thread. If it fails, we know we are shut down or saturated
* and so reject the task.
*/
//1.如果少於 corePoolSize 線程正在運行,嘗試以給定的任務作為第一個啟動一個新線程任務。
//對 addWorker 的調用以原子方式檢查 runState 和workerCount,因此可以防止會增加的誤報不應該線程時,返回 false。
//2.如果一個任務可以添加到任務隊列,那么還需要檢查是否應該添加一個線程(因為現有的自上次檢查后死亡)或線程池在進入此方法后
//關閉。 所以需要重新檢查狀態,並在必要時回滾入隊,如果停止,如果沒有,則啟動一個新線程。
//3.如果任務不能添加到任務隊列中,那么嘗試添加一個新的線。 如果它失敗了,那么說明已經關閉或飽和了所以拒絕任務。
int c = ctl.get();
//這里就是上面說的第1種情況,
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
//這里說的是第2中情況
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
//這里對線程池的狀態進行了重新判斷,確保將任務添加到任務隊列中整個過程線程池的狀態都是RUNNING
//如果將任務添加到任務隊列后,發現線程池狀態已經不是RUNNING了,這時就需要將任務從任務隊列中移除掉
if (! isRunning(recheck) && remove(command))
//交給RejectedExecutionHandler去處理
reject(command);
//任務添加到任務隊列了,但是當前線程池中沒有線程了,這時就要新創建一個線程
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
//第3中情況,核心線程數滿了,任務隊列也滿了,就會走到這里,去創建個新線程去執行任務,
//如果當前線程池中的線程數已經>=maximumPoolSize,addWorker也會返回false
else if (!addWorker(command, false))
reject(command);
}
/**
* Initiates an orderly shutdown in which previously submitted
* tasks are executed, but no new tasks will be accepted.
* Invocation has no additional effect if already shut down.
*
* <p>This method does not wait for previously submitted tasks to
* complete execution. Use {@link #awaitTermination awaitTermination}
* to do that.
*
* @throws SecurityException {@inheritDoc}
*/
//shutdown和shutdownNow兩個方法代碼基本是一樣的,只不過一個是將runState設置成SHUTDOWN,一個是設置成STOP
//SHUTDOWN和STOP都不會接收提交給線程池的任務,區別是STOP不會運行繼續執行任務隊列的任務,而SHUTDOWN將繼續執行任務隊列中的任務
//SHUTDOWN只會中斷當前沒有執行任務的線程,而STOP會中斷所有的線程
public void shutdown() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
checkShutdownAccess();
//設置當前線程池狀態為SHUTDOWN
advanceRunState(SHUTDOWN);
//嘗試去中斷所有空閑的線程,
//interruptIdleWorkers方法中會首先嘗試調用worker.tryLock()方法,成功后就會去中斷對應線程
//worker類在runWorker方法執行任務先后會分別調用worker.lock,worker.unlock。
//所以這里只會中斷沒有執行任務的線程
interruptIdleWorkers();
//留給子類去實現
onShutdown(); // hook for ScheduledThreadPoolExecutor
} finally {
mainLock.unlock();
}
//嘗試去結束線程池
tryTerminate();
}
/**
* Attempts to stop all actively executing tasks, halts the
* processing of waiting tasks, and returns a list of the tasks
* that were awaiting execution. These tasks are drained (removed)
* from the task queue upon return from this method.
*
* <p>This method does not wait for actively executing tasks to
* terminate. Use {@link #awaitTermination awaitTermination} to
* do that.
*
* <p>There are no guarantees beyond best-effort attempts to stop
* processing actively executing tasks. This implementation
* interrupts tasks via {@link Thread#interrupt}; any task that
* fails to respond to interrupts may never terminate.
*
* @throws SecurityException {@inheritDoc}
*/
public List<Runnable> shutdownNow() {
List<Runnable> tasks;
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
checkShutdownAccess();
advanceRunState(STOP);
//這里和上面的shutdown方法也有區別,這里會中斷所有的線程
interruptWorkers();
//STOP狀態不會執行任務隊列中剩余任務,這里會將剩余任務返回
tasks = drainQueue();
} finally {
mainLock.unlock();
}
tryTerminate();
return tasks;
}
public boolean isShutdown() {
return runStateAtLeast(ctl.get(), SHUTDOWN);
}
/** Used by ScheduledThreadPoolExecutor. */
boolean isStopped() {
return runStateAtLeast(ctl.get(), STOP);
}
/**
* Returns true if this executor is in the process of terminating
* after {@link #shutdown} or {@link #shutdownNow} but has not
* completely terminated. This method may be useful for
* debugging. A return of {@code true} reported a sufficient
* period after shutdown may indicate that submitted tasks have
* ignored or suppressed interruption, causing this executor not
* to properly terminate.
*
* @return {@code true} if terminating but not yet terminated
*/
//只有RUNNING是運行的狀態,其他狀態除了TERMINATED都可以認為是中間的過度狀態
//這里就通過runState來判斷線程池是否在停止中,但是還沒有完全停止
public boolean isTerminating() {
int c = ctl.get();
return runStateAtLeast(c, SHUTDOWN) && runStateLessThan(c, TERMINATED);
}
public boolean isTerminated() {
return runStateAtLeast(ctl.get(), TERMINATED);
}
//這個是等待當前線程池結束
public boolean awaitTermination(long timeout, TimeUnit unit)
throws InterruptedException {
long nanos = unit.toNanos(timeout);
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
while (runStateLessThan(ctl.get(), TERMINATED)) {
if (nanos <= 0L)
return false;
//這里通過Condition來實現的等待,如果線程池結束,tryTerminate方法最后就會調用signalAll喚醒這里
nanos = termination.awaitNanos(nanos);
}
return true;
} finally {
mainLock.unlock();
}
}
// Override without "throws Throwable" for compatibility with subclasses
// whose finalize method invokes super.finalize() (as is recommended).
// Before JDK 11, finalize() had a non-empty method body.
/**
* @implNote Previous versions of this class had a finalize method
* that shut down this executor, but in this version, finalize
* does nothing.
*/
@Deprecated(since="9")
protected void finalize() {}
/**
* Sets the thread factory used to create new threads.
*
* @param threadFactory the new thread factory
* @throws NullPointerException if threadFactory is null
* @see #getThreadFactory
*/
public void setThreadFactory(ThreadFactory threadFactory) {
if (threadFactory == null)
throw new NullPointerException();
this.threadFactory = threadFactory;
}
/**
* Returns the thread factory used to create new threads.
*
* @return the current thread factory
* @see #setThreadFactory(ThreadFactory)
*/
public ThreadFactory getThreadFactory() {
return threadFactory;
}
/**
* Sets a new handler for unexecutable tasks.
*
* @param handler the new handler
* @throws NullPointerException if handler is null
* @see #getRejectedExecutionHandler
*/
public void setRejectedExecutionHandler(RejectedExecutionHandler handler) {
if (handler == null)
throw new NullPointerException();
this.handler = handler;
}
/**
* Returns the current handler for unexecutable tasks.
*
* @return the current handler
* @see #setRejectedExecutionHandler(RejectedExecutionHandler)
*/
public RejectedExecutionHandler getRejectedExecutionHandler() {
return handler;
}
/**
* Sets the core number of threads. This overrides any value set
* in the constructor. If the new value is smaller than the
* current value, excess existing threads will be terminated when
* they next become idle. If larger, new threads will, if needed,
* be started to execute any queued tasks.
*
* @param corePoolSize the new core size
* @throws IllegalArgumentException if {@code corePoolSize < 0}
* or {@code corePoolSize} is greater than the {@linkplain
* #getMaximumPoolSize() maximum pool size}
* @see #getCorePoolSize
*/
public void setCorePoolSize(int corePoolSize) {
if (corePoolSize < 0 || maximumPoolSize < corePoolSize)
throw new IllegalArgumentException();
int delta = corePoolSize - this.corePoolSize;
this.corePoolSize = corePoolSize;
if (workerCountOf(ctl.get()) > corePoolSize)
interruptIdleWorkers();
else if (delta > 0) {
// We don't really know how many new threads are "needed".
// As a heuristic, prestart enough new workers (up to new
// core size) to handle the current number of tasks in
// queue, but stop if queue becomes empty while doing so.
int k = Math.min(delta, workQueue.size());
while (k-- > 0 && addWorker(null, true)) {
if (workQueue.isEmpty())
break;
}
}
}
/**
* Returns the core number of threads.
*
* @return the core number of threads
* @see #setCorePoolSize
*/
public int getCorePoolSize() {
return corePoolSize;
}
/**
* Starts a core thread, causing it to idly wait for work. This
* overrides the default policy of starting core threads only when
* new tasks are executed. This method will return {@code false}
* if all core threads have already been started.
*
* @return {@code true} if a thread was started
*/
public boolean prestartCoreThread() {
return workerCountOf(ctl.get()) < corePoolSize &&
addWorker(null, true);
}
/**
* Same as prestartCoreThread except arranges that at least one
* thread is started even if corePoolSize is 0.
*/
void ensurePrestart() {
int wc = workerCountOf(ctl.get());
if (wc < corePoolSize)
addWorker(null, true);
else if (wc == 0)
addWorker(null, false);
}
/**
* Starts all core threads, causing them to idly wait for work. This
* overrides the default policy of starting core threads only when
* new tasks are executed.
*
* @return the number of threads started
*/
public int prestartAllCoreThreads() {
int n = 0;
while (addWorker(null, true))
++n;
return n;
}
/**
* Returns true if this pool allows core threads to time out and
* terminate if no tasks arrive within the keepAlive time, being
* replaced if needed when new tasks arrive. When true, the same
* keep-alive policy applying to non-core threads applies also to
* core threads. When false (the default), core threads are never
* terminated due to lack of incoming tasks.
*
* @return {@code true} if core threads are allowed to time out,
* else {@code false}
*
* @since 1.6
*/
public boolean allowsCoreThreadTimeOut() {
return allowCoreThreadTimeOut;
}
/**
* Sets the policy governing whether core threads may time out and
* terminate if no tasks arrive within the keep-alive time, being
* replaced if needed when new tasks arrive. When false, core
* threads are never terminated due to lack of incoming
* tasks. When true, the same keep-alive policy applying to
* non-core threads applies also to core threads. To avoid
* continual thread replacement, the keep-alive time must be
* greater than zero when setting {@code true}. This method
* should in general be called before the pool is actively used.
*
* @param value {@code true} if should time out, else {@code false}
* @throws IllegalArgumentException if value is {@code true}
* and the current keep-alive time is not greater than zero
*
* @since 1.6
*/
public void allowCoreThreadTimeOut(boolean value) {
if (value && keepAliveTime <= 0)
throw new IllegalArgumentException("Core threads must have nonzero keep alive times");
if (value != allowCoreThreadTimeOut) {
allowCoreThreadTimeOut = value;
if (value)
interruptIdleWorkers();
}
}
/**
* Sets the maximum allowed number of threads. This overrides any
* value set in the constructor. If the new value is smaller than
* the current value, excess existing threads will be
* terminated when they next become idle.
*
* @param maximumPoolSize the new maximum
* @throws IllegalArgumentException if the new maximum is
* less than or equal to zero, or
* less than the {@linkplain #getCorePoolSize core pool size}
* @see #getMaximumPoolSize
*/
public void setMaximumPoolSize(int maximumPoolSize) {
if (maximumPoolSize <= 0 || maximumPoolSize < corePoolSize)
throw new IllegalArgumentException();
this.maximumPoolSize = maximumPoolSize;
if (workerCountOf(ctl.get()) > maximumPoolSize)
interruptIdleWorkers();
}
/**
* Returns the maximum allowed number of threads.
*
* @return the maximum allowed number of threads
* @see #setMaximumPoolSize
*/
public int getMaximumPoolSize() {
return maximumPoolSize;
}
/**
* Sets the thread keep-alive time, which is the amount of time
* that threads may remain idle before being terminated.
* Threads that wait this amount of time without processing a
* task will be terminated if there are more than the core
* number of threads currently in the pool, or if this pool
* {@linkplain #allowsCoreThreadTimeOut() allows core thread timeout}.
* This overrides any value set in the constructor.
*
* @param time the time to wait. A time value of zero will cause
* excess threads to terminate immediately after executing tasks.
* @param unit the time unit of the {@code time} argument
* @throws IllegalArgumentException if {@code time} less than zero or
* if {@code time} is zero and {@code allowsCoreThreadTimeOut}
* @see #getKeepAliveTime(TimeUnit)
*/
public void setKeepAliveTime(long time, TimeUnit unit) {
if (time < 0)
throw new IllegalArgumentException();
if (time == 0 && allowsCoreThreadTimeOut())
throw new IllegalArgumentException("Core threads must have nonzero keep alive times");
long keepAliveTime = unit.toNanos(time);
long delta = keepAliveTime - this.keepAliveTime;
this.keepAliveTime = keepAliveTime;
if (delta < 0)
interruptIdleWorkers();
}
/**
* Returns the thread keep-alive time, which is the amount of time
* that threads may remain idle before being terminated.
* Threads that wait this amount of time without processing a
* task will be terminated if there are more than the core
* number of threads currently in the pool, or if this pool
* {@linkplain #allowsCoreThreadTimeOut() allows core thread timeout}.
*
* @param unit the desired time unit of the result
* @return the time limit
* @see #setKeepAliveTime(long, TimeUnit)
*/
public long getKeepAliveTime(TimeUnit unit) {
return unit.convert(keepAliveTime, TimeUnit.NANOSECONDS);
}
/* User-level queue utilities */
/**
* Returns the task queue used by this executor. Access to the
* task queue is intended primarily for debugging and monitoring.
* This queue may be in active use. Retrieving the task queue
* does not prevent queued tasks from executing.
*
* @return the task queue
*/
public BlockingQueue<Runnable> getQueue() {
return workQueue;
}
/**
* Removes this task from the executor's internal queue if it is
* present, thus causing it not to be run if it has not already
* started.
*
* <p>This method may be useful as one part of a cancellation
* scheme. It may fail to remove tasks that have been converted
* into other forms before being placed on the internal queue.
* For example, a task entered using {@code submit} might be
* converted into a form that maintains {@code Future} status.
* However, in such cases, method {@link #purge} may be used to
* remove those Futures that have been cancelled.
*
* @param task the task to remove
* @return {@code true} if the task was removed
*/
public boolean remove(Runnable task) {
boolean removed = workQueue.remove(task);
tryTerminate(); // In case SHUTDOWN and now empty
return removed;
}
/**
* Tries to remove from the work queue all {@link Future}
* tasks that have been cancelled. This method can be useful as a
* storage reclamation operation, that has no other impact on
* functionality. Cancelled tasks are never executed, but may
* accumulate in work queues until worker threads can actively
* remove them. Invoking this method instead tries to remove them now.
* However, this method may fail to remove tasks in
* the presence of interference by other threads.
*/
public void purge() {
final BlockingQueue<Runnable> q = workQueue;
try {
Iterator<Runnable> it = q.iterator();
while (it.hasNext()) {
Runnable r = it.next();
if (r instanceof Future<?> && ((Future<?>)r).isCancelled())
it.remove();
}
} catch (ConcurrentModificationException fallThrough) {
// Take slow path if we encounter interference during traversal.
// Make copy for traversal and call remove for cancelled entries.
// The slow path is more likely to be O(N*N).
for (Object r : q.toArray())
if (r instanceof Future<?> && ((Future<?>)r).isCancelled())
q.remove(r);
}
tryTerminate(); // In case SHUTDOWN and now empty
}
/* Statistics */
/**
* Returns the current number of threads in the pool.
*
* @return the number of threads
*/
public int getPoolSize() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// Remove rare and surprising possibility of
// isTerminated() && getPoolSize() > 0
return runStateAtLeast(ctl.get(), TIDYING) ? 0
: workers.size();
} finally {
mainLock.unlock();
}
}
/**
* Returns the approximate number of threads that are actively
* executing tasks.
*
* @return the number of threads
*/
public int getActiveCount() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
int n = 0;
for (Worker w : workers)
if (w.isLocked())
++n;
return n;
} finally {
mainLock.unlock();
}
}
/**
* Returns the largest number of threads that have ever
* simultaneously been in the pool.
*
* @return the number of threads
*/
public int getLargestPoolSize() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
return largestPoolSize;
} finally {
mainLock.unlock();
}
}
/**
* Returns the approximate total number of tasks that have ever been
* scheduled for execution. Because the states of tasks and
* threads may change dynamically during computation, the returned
* value is only an approximation.
*
* @return the number of tasks
*/
public long getTaskCount() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
long n = completedTaskCount;
for (Worker w : workers) {
n += w.completedTasks;
if (w.isLocked())
++n;
}
return n + workQueue.size();
} finally {
mainLock.unlock();
}
}
/**
* Returns the approximate total number of tasks that have
* completed execution. Because the states of tasks and threads
* may change dynamically during computation, the returned value
* is only an approximation, but one that does not ever decrease
* across successive calls.
*
* @return the number of tasks
*/
public long getCompletedTaskCount() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
long n = completedTaskCount;
for (Worker w : workers)
n += w.completedTasks;
return n;
} finally {
mainLock.unlock();
}
}
/**
* Returns a string identifying this pool, as well as its state,
* including indications of run state and estimated worker and
* task counts.
*
* @return a string identifying this pool, as well as its state
*/
public String toString() {
long ncompleted;
int nworkers, nactive;
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
ncompleted = completedTaskCount;
nactive = 0;
nworkers = workers.size();
for (Worker w : workers) {
ncompleted += w.completedTasks;
if (w.isLocked())
++nactive;
}
} finally {
mainLock.unlock();
}
int c = ctl.get();
String runState =
isRunning(c) ? "Running" :
runStateAtLeast(c, TERMINATED) ? "Terminated" :
"Shutting down";
return super.toString() +
"[" + runState +
", pool size = " + nworkers +
", active threads = " + nactive +
", queued tasks = " + workQueue.size() +
", completed tasks = " + ncompleted +
"]";
}
/* Extension hooks */
/**
* Method invoked prior to executing the given Runnable in the
* given thread. This method is invoked by thread {@code t} that
* will execute task {@code r}, and may be used to re-initialize
* ThreadLocals, or to perform logging.
*
* <p>This implementation does nothing, but may be customized in
* subclasses. Note: To properly nest multiple overridings, subclasses
* should generally invoke {@code super.beforeExecute} at the end of
* this method.
*
* @param t the thread that will run task {@code r}
* @param r the task that will be executed
*/
protected void beforeExecute(Thread t, Runnable r) { }
/**
* Method invoked upon completion of execution of the given Runnable.
* This method is invoked by the thread that executed the task. If
* non-null, the Throwable is the uncaught {@code RuntimeException}
* or {@code Error} that caused execution to terminate abruptly.
*
* <p>This implementation does nothing, but may be customized in
* subclasses. Note: To properly nest multiple overridings, subclasses
* should generally invoke {@code super.afterExecute} at the
* beginning of this method.
*
* <p><b>Note:</b> When actions are enclosed in tasks (such as
* {@link FutureTask}) either explicitly or via methods such as
* {@code submit}, these task objects catch and maintain
* computational exceptions, and so they do not cause abrupt
* termination, and the internal exceptions are <em>not</em>
* passed to this method. If you would like to trap both kinds of
* failures in this method, you can further probe for such cases,
* as in this sample subclass that prints either the direct cause
* or the underlying exception if a task has been aborted:
*
* <pre> {@code
* class ExtendedExecutor extends ThreadPoolExecutor {
* // ...
* protected void afterExecute(Runnable r, Throwable t) {
* super.afterExecute(r, t);
* if (t == null
* && r instanceof Future<?>
* && ((Future<?>)r).isDone()) {
* try {
* Object result = ((Future<?>) r).get();
* } catch (CancellationException ce) {
* t = ce;
* } catch (ExecutionException ee) {
* t = ee.getCause();
* } catch (InterruptedException ie) {
* // ignore/reset
* Thread.currentThread().interrupt();
* }
* }
* if (t != null)
* System.out.println(t);
* }
* }}</pre>
*
* @param r the runnable that has completed
* @param t the exception that caused termination, or null if
* execution completed normally
*/
protected void afterExecute(Runnable r, Throwable t) { }
/**
* Method invoked when the Executor has terminated. Default
* implementation does nothing. Note: To properly nest multiple
* overridings, subclasses should generally invoke
* {@code super.terminated} within this method.
*/
protected void terminated() { }
/* Predefined RejectedExecutionHandlers */
/**
* A handler for rejected tasks that runs the rejected task
* directly in the calling thread of the {@code execute} method,
* unless the executor has been shut down, in which case the task
* is discarded.
*/
public static class CallerRunsPolicy implements RejectedExecutionHandler {
/**
* Creates a {@code CallerRunsPolicy}.
*/
public CallerRunsPolicy() { }
/**
* Executes task r in the caller's thread, unless the executor
* has been shut down, in which case the task is discarded.
*
* @param r the runnable task requested to be executed
* @param e the executor attempting to execute this task
*/
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
if (!e.isShutdown()) {
r.run();
}
}
}
/**
* A handler for rejected tasks that throws a
* {@link RejectedExecutionException}.
*
* This is the default handler for {@link ThreadPoolExecutor} and
* {@link ScheduledThreadPoolExecutor}.
*/
public static class AbortPolicy implements RejectedExecutionHandler {
/**
* Creates an {@code AbortPolicy}.
*/
public AbortPolicy() { }
/**
* Always throws RejectedExecutionException.
*
* @param r the runnable task requested to be executed
* @param e the executor attempting to execute this task
* @throws RejectedExecutionException always
*/
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
throw new RejectedExecutionException("Task " + r.toString() +
" rejected from " +
e.toString());
}
}
/**
* A handler for rejected tasks that silently discards the
* rejected task.
*/
public static class DiscardPolicy implements RejectedExecutionHandler {
/**
* Creates a {@code DiscardPolicy}.
*/
public DiscardPolicy() { }
/**
* Does nothing, which has the effect of discarding task r.
*
* @param r the runnable task requested to be executed
* @param e the executor attempting to execute this task
*/
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
}
}
/**
* A handler for rejected tasks that discards the oldest unhandled
* request and then retries {@code execute}, unless the executor
* is shut down, in which case the task is discarded. This policy is
* rarely useful in cases where other threads may be waiting for
* tasks to terminate, or failures must be recorded. Instead consider
* using a handler of the form:
* <pre> {@code
* new RejectedExecutionHandler() {
* public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
* Runnable dropped = e.getQueue().poll();
* if (dropped instanceof Future<?>) {
* ((Future<?>)dropped).cancel(false);
* // also consider logging the failure
* }
* e.execute(r); // retry
* }}}</pre>
*/
public static class DiscardOldestPolicy implements RejectedExecutionHandler {
/**
* Creates a {@code DiscardOldestPolicy} for the given executor.
*/
public DiscardOldestPolicy() { }
/**
* Obtains and ignores the next task that the executor
* would otherwise execute, if one is immediately available,
* and then retries execution of task r, unless the executor
* is shut down, in which case task r is instead discarded.
*
* @param r the runnable task requested to be executed
* @param e the executor attempting to execute this task
*/
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
if (!e.isShutdown()) {
e.getQueue().poll();
e.execute(r);
}
}
}
}
-
總結
-
線程池的狀態
-
線程池的狀態runState這個不存在的變量來表示的。線程池創建之初,runState就是RUNNING,
這是通過
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
這句來設置的,初始化狀態是RUNNING,線程數是0狀態都是int類型,故它們的大小關系是RUNNING < SHUTDOWN < STOP < TIDYING < TERMINATED
-
這幾種狀態之間的轉換關系是這樣的:
-
初始狀態是RUNNING,最終狀態是TERMINATED
-
在調用了
shutdown
方法后狀態會變成SHUTDOWN,調用了shutdownNow
方法后狀態變成shutdownNow,在這兩個方法中,都會 調用到tryTerminate
,當線程數==0是,狀態變成TIDYING;調用完terminated
后,變成最終狀態TERMINATED
+ SHUTDOWN和STOP的區別
這兩個狀態比較相近,但是它們還是很大區別的
1. SHUTDOWN會繼續執行任務隊列中的任務,STOP不會執行任務隊列中的任務。這個主要提交在`getTask`方法中
2. 在當前線程結束時,SHUTDOWN會根據情況去判斷是否新創建一個線程。STOP不會進行此操作。這個主要體現在`processWorkerExit`方法中
2. SHUTDOWN只會中斷當前沒有執行任務的線程(體現在`shutdown`方法中調用的是`interruptIdleWorkers`),而STOP會中斷所有的線程(體現在`shutdownNow`中調用的是`interruptWorkers`)
- 關於中斷
我們能看到線程池實現中大量使用了線程中斷。線程中斷的知識不多,但是很重要。具體可以看看我之前關於中斷的博文關於Thread的interrupt
線程池中中斷主要時配合runState狀態的變化,來設置對應線程的中斷,使對應線程能夠感知到對應中斷,做出對應的調整。
-
對於提交給線程池的任務
對於提交給線程池的任務,線程池是不會幫我們去停止的,線程池唯一能做的就是設置我們執行任務的線程中斷。我們我們的任務中可以通過會拋出中斷異常的方法或者手動判斷當前線程是否已經被中斷來響應線程池狀態的變化。
需要注意的第一是如果我們提交到線程池的任務一直結束不了,這會阻塞線程池關閉的
像下面的例子,由於我們提交的任務一直無法結束,且我們的任務沒有對中斷進行合適的響應,就會由於我們的任務一直在運行,阻止線程池結束。
import java.time.LocalDateTime; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; public class InterrruptDemo { public static void main(String[] args) throws InterruptedException { ExecutorService executorService = Executors.newFixedThreadPool(1); executorService.execute(() -> { while (true) { System.out.println(Thread.currentThread().getName() + " "+LocalDateTime.now()); try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } } }); Thread.sleep(1000); executorService.shutdownNow(); while (!executorService.isTerminated()) { Thread.sleep(1000); System.out.println(Thread.currentThread().getName() + " 線程池當前沒有完全關閉"); } System.out.println(Thread.currentThread().getName() + " 線程池已經完全關閉"); } }
像下面這樣,就可以正確響應線程池狀態的變化,在線程池關閉的時候,結束我們的任務
import java.time.LocalDateTime; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; public class InterrruptDemo { public static void main(String[] args) throws InterruptedException { ExecutorService executorService = Executors.newFixedThreadPool(1); executorService.execute(() -> { try { while (true) { System.out.println(Thread.currentThread().getName() + " " + LocalDateTime.now()); Thread.sleep(1000); } } catch (InterruptedException e) { e.printStackTrace(); } }); Thread.sleep(1000); executorService.shutdownNow(); while (!executorService.isTerminated()) { Thread.sleep(1000); System.out.println(Thread.currentThread().getName() + " 線程池當前沒有完全關閉"); } System.out.println(Thread.currentThread().getName() + " 線程池已經完全關閉"); } }