系列傳送門:
- Java並發包源碼學習系列:AbstractQueuedSynchronizer
- Java並發包源碼學習系列:CLH同步隊列及同步資源獲取與釋放
- Java並發包源碼學習系列:AQS共享式與獨占式獲取與釋放資源的區別
- Java並發包源碼學習系列:ReentrantLock可重入獨占鎖詳解
- Java並發包源碼學習系列:ReentrantReadWriteLock讀寫鎖解析
- Java並發包源碼學習系列:詳解Condition條件隊列、signal和await
- Java並發包源碼學習系列:掛起與喚醒線程LockSupport工具類
- Java並發包源碼學習系列:JDK1.8的ConcurrentHashMap源碼解析
- Java並發包源碼學習系列:阻塞隊列BlockingQueue及實現原理分析
- Java並發包源碼學習系列:阻塞隊列實現之ArrayBlockingQueue源碼解析
- Java並發包源碼學習系列:阻塞隊列實現之LinkedBlockingQueue源碼解析
- Java並發包源碼學習系列:阻塞隊列實現之PriorityBlockingQueue源碼解析
- Java並發包源碼學習系列:阻塞隊列實現之DelayQueue源碼解析
- Java並發包源碼學習系列:阻塞隊列實現之SynchronousQueue源碼解析
- Java並發包源碼學習系列:阻塞隊列實現之LinkedTransferQueue源碼解析
- Java並發包源碼學習系列:阻塞隊列實現之LinkedBlockingDeque源碼解析
- Java並發包源碼學習系列:基於CAS非阻塞並發隊列ConcurrentLinkedQueue源碼解析
ThreadPoolExecutor概述
線程池解決的優點
- 當執行大量異步任務時線程池能夠提供較好的性能,因為線程池中的線程是可復用的,不需要每次執行異步任務時都創建和銷毀線程。
- 提供資源限制和管理的手段,比如可以限制線程的個數,動態新增線程等等。
線程池處理流程
ThreadPoolExecutor執行execute時,流程如下:
- 如果當前運行的線程少於corePoolSize,則創建新線程來執行任務,這里需要加全局鎖。
- 如果運行的線程數>=corePoolSize,則將任務加入BlockingQueue。
- 如果此時BlockingQueue已滿,則創建新的線程來處理任務,這里也需要加全局鎖。
- 如果創建新線程將使當前運行的線程超出maximumPoolSize,則按照拒絕策略拒絕任務。
當然啦,這篇文章意在從源碼角度學習線程池這些核心步驟的具體實現啦,線程池概念性的東西,可以參考一些其他的博客:
創建線程池
創建線程池有幾種方法,一種是使用Executors工具類快速創建內置的幾種線程池,也可以自定義。
一、通過Executor框架的工具類Executors可以創建三種類型的ThreadPoolExecutor。
二、使用ThreadPoolExecutor的各種構造方法。
《阿里巴巴 Java 開發手冊》中:強制線程池不允許使用 Executors 去創建,而是通過 ThreadPoolExecutor 的方式,這樣的處理方式讓寫的同學更加明確線程池的運行規則,規避資源耗盡的風險
Executors 返回線程池對象的弊端如下:
- FixedThreadPool 和 SingleThreadExecutor : 允許請求的隊列長度為 Integer.MAX_VALUE ,可能堆積大量的請求,從而導致 OOM。
- CachedThreadPool 和 ScheduledThreadPool : 允許創建的線程數量為 Integer.MAX_VALUE ,可能會創建大量線程,從而導致 OOM。
本篇的重點就是這個ThreadPoolExecutor。
重要常量及字段
public class ThreadPoolExecutor extends AbstractExecutorService {
// 原子的Integer變量ctl,用於記錄線程池狀態【高3位】和線程池中線程個數【低29位】,這里假設Integer是32位的
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
// 其實並不是每個平台的Integer二進制都是32位的,實際上是,二進制位-3代表線程個數
private static final int COUNT_BITS = Integer.SIZE - 3;
// 線程最大個數【約5億】 低COUNT_BITS都是1 000 11111111111111111111111111111
private static final int CAPACITY = (1 << COUNT_BITS) - 1;
// runState is stored in the high-order bits
// 111 00000000000000000000000000000 高3位是 111
private static final int RUNNING = -1 << COUNT_BITS;
// 000 00000000000000000000000000000 高3位是 000
private static final int SHUTDOWN = 0 << COUNT_BITS;
// 001 00000000000000000000000000000 高3位是 001
private static final int STOP = 1 << COUNT_BITS;
// 010 00000000000000000000000000000 高3位是 110
private static final int TIDYING = 2 << COUNT_BITS;
// 011 00000000000000000000000000000 高3位是 011
private static final int TERMINATED = 3 << COUNT_BITS;
// Packing and unpacking ctl
// 獲取高3位的運行狀態
private static int runStateOf(int c) { return c & ~CAPACITY; }
// 獲取低29位的線程個數
private static int workerCountOf(int c) { return c & CAPACITY; }
// 通過RunState和WorkCount計算ctl的值
private static int ctlOf(int rs, int wc) { return rs | wc; }
// 線程池狀態變換是單調遞增的
private static boolean runStateLessThan(int c, int s) {
return c < s;
}
private static boolean runStateAtLeast(int c, int s) {
return c >= s;
}
// 只有RUNNING 是小於SHUTDOWN的
private static boolean isRunning(int c) {
return c < SHUTDOWN;
}
// ...
// 阻塞隊列
private final BlockingQueue<Runnable> workQueue;
// 獨占鎖 同步保證
private final ReentrantLock mainLock = new ReentrantLock();
// 存放 線程池中的工作線程
private final HashSet<Worker> workers = new HashSet<Worker>();
// 條件隊列,線程調用awaitTermination時存放阻塞的線程
private final Condition termination = mainLock.newCondition();
// ...
// 繼承AQS和Runnable,任務線程
private final class Worker
extends AbstractQueuedSynchronizer
implements Runnable
{ /*.. */}
}
- ThreadPoolExecutor通過AtomicInteger類的變量ctl記錄線程池狀態和線程池中線程個數,這里以Integer為32為例。
- 高3位表示線程池的狀態,低29位表示線程個數,分別通過
runStateOf
和workerCountOf
計算。
線程池的五種狀態及轉換
- 線程池的狀態有五種,他們提供了線程池聲明周期的控制:
- RUNNING:能夠接收新任務,並且處理阻塞隊列里的任務。
- SHUTDOWN:拒絕新任務,但會處理阻塞隊列里的任務。
- STOP:拒絕新任務,並且拋棄阻塞隊列里的任務,同時會中斷正在處理的任務。
- TIDYING:所有任務都執行完后當前線程池workerCount為0,將調用terminated()這個鈎子方法。
- TERMINATED:終止狀態。terminated方法調用完成。
- 線程池的狀態是有規律的,保證單調遞增,但是不一定每個狀態都會經歷,比如有以下幾種轉換:
- RUNNING -> SHUTDOWN:可能是顯式調用了
shutdown()
方法,也可能在finalize()
里隱式調用。 - RUNNING或SHUTDOWN -> STOP:調用了
shutdownNow()
方法。 - SHUTDOWN -> TIDYING:隊列和線程池都為空的時候。
- STOP -> TIDYING:線程池為空的時候。
- TIDYING -> TERMINATED:鈎子方法
terminated()
調用完成的時候。
- RUNNING -> SHUTDOWN:可能是顯式調用了
由於awaitTermination()方法而阻塞在條件隊列中的線程將會在線程池TERMINATED的時候返回。
ThreadPoolExecutor構造參數及參數意義
ThreadPoolExecutor方法的構造參數有很多,我們看看最長的那個就可以了:
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.acc = System.getSecurityManager() == null ?
null :
AccessController.getContext();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
corePoolSize
:核心線程數定義了最小可以同時運行的線程數量。maximumPoolSize
:當隊列中存放的任務達到隊列容量的時候,當前可以同時運行的線程數量變為最大線程數。【如果使用的無界隊列,這個參數就沒啥效果】workQueue
: 阻塞隊列,當新任務來的時候會先判斷當前運行的線程數量是否達到核心線程數,如果達到核心線程數的話,新任務就會被存放在隊列中。keepAliveTime
:當線程池中的線程數量大於corePoolSize
的時候,如果這時沒有新的任務提交,核心線程外的線程不會立即銷毀,而是會等待,直到等待的時間超過了keepAliveTime
才會被回收銷毀。unit
:keepAliveTime
的時間單位。threadFactory
:用於設置創建線程的工廠,可以通過線程工廠給每個創建出來的線程設置更有意義的名字,默認使用Executors的靜態內部類DefaultThreadFactory
。handler
:飽和策略,當前同時運行的線程數量達到最大線程數量【maximumPoolSize
】並且隊列也已經被放滿時,執行飽和策略。
關於各個參數的意義,強烈推薦這篇博客:閃客sun : 圖解 | 原來這就是線程池
Work類
private final class Worker
extends AbstractQueuedSynchronizer
implements Runnable
{
private static final long serialVersionUID = 6138294804551838833L;
/** 具體執行任務的線程 */
final Thread thread;
/** 執行的第一個任務 */
Runnable firstTask;
/** Per-thread task counter */
volatile long completedTasks;
Worker(Runnable firstTask) {
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);
}
/** 線程啟動時,執行runWorker方法 */
public void run() {
runWorker(this);
}
// Lock methods
//
// The value 0 represents the unlocked state.
// The value 1 represents the locked state.
protected boolean isHeldExclusively() {
return getState() != 0;
}
// 不可重入的,state = 1表示已獲取
protected boolean tryAcquire(int unused) {
if (compareAndSetState(0, 1)) {
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}
// state = 0 表示鎖未被獲取
protected boolean tryRelease(int unused) {
setExclusiveOwnerThread(null);
setState(0);
return true;
}
public void lock() { acquire(1); }
public boolean tryLock() { return tryAcquire(1); }
public void unlock() { release(1); }
public boolean isLocked() { return isHeldExclusively(); }
// 如果線程啟動,則中斷線程 state只有初始化的時候才是-1,其他的時間都是滿足>=0的
void interruptIfStarted() {
Thread t;
if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
try {
t.interrupt();
} catch (SecurityException ignore) {
}
}
}
}
Worker繼承了AQS和Runnable接口,是具體承載任務的對象。
基於AQS,Worker實現了不可重入的獨占鎖,state == 0 表示鎖未被獲取,state == 1表示鎖已經被獲取, state == -1為初始狀態。
firstTask記錄該工作線程執行的第一個任務,thread是執行任務的線程。
interruptIfStarted()
方法會在shutdownNow中調用,意在中斷Worker線程,state初始化為-1,是不滿足getState條件的。
void execute(Runnable command)
execute方法就是向線程池提交一個command任務進行執行。
public void execute(Runnable command) {
// 提交任務為null, 拋出空指針異常
if (command == null)
throw new NullPointerException();
// 獲取當前ctl的值 : 線程池狀態 + 線程個數
int c = ctl.get();
// 如果當前線程池中線程個數小於核心線程數corePoolSize
if (workerCountOf(c) < corePoolSize) {
// 通過addWorker新建一個線程,然后,啟動該線程從而執行任務
if (addWorker(command, true))
return;
c = ctl.get();
}
// 如果線程池處於RUNNING狀態,則添加任務到阻塞隊列
if (isRunning(c) && workQueue.offer(command)) {
// double-check
int recheck = ctl.get();
// 如果線程池不是處於RUNNING, 則從隊列中移除任務
if (! isRunning(recheck) && remove(command))
// 並執行拒絕策略
reject(command);
// 如果當前線程個數為0, 則添加一個線程
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
// 如果隊列滿,則新增線程,新增失敗則執行拒絕策略
else if (!addWorker(command, false))
reject(command);
}
- 如果線程池當前線程數小於corePoolSize,則調用addWorker創建新線程執行任務,成功則直接返回。
- 如果線程池處於RUNNING狀態,則添加任務到阻塞隊列,如果添加成功,進行double-check,檢測出當前不是RUNNING,則進行移除操作,並執行拒絕策略。否則添加一個線程,確保有線程可以執行。
- 如果線程池不是處於RUNNING或加入阻塞隊列失敗,並采取拒絕策略。
boolean addWorker(firstTask, core)
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// 檢測隊列是否只在必要時為空
// 等價為:下面幾種情況返回false
/* if (rs >= SHUTDOWN && rs 為STOP TIDYING TERMINATED時返回false
(rs != SHUTDOWN || rs不為SHUTDOWN
firstTask != null || rs為SHUTDOWN 但 已經有了第一個任務
workQueue.isEmpty())) rs為SHUTDOWN 並且任務隊列為空
*/
if (rs >= SHUTDOWN && //
! (rs == SHUTDOWN && //
firstTask == null &&
! workQueue.isEmpty()))
return false;
// 循環, 通過CAS操作來增加線程個數
for (;;) {
int wc = workerCountOf(c);
// 線程個數如果超過限制,返回false
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
// CAS增加線程個數,操作成功跳出循環break
if (compareAndIncrementWorkerCount(c))
break retry;
// CAS失敗,檢測線程狀態是否發生了變化,如果發生變化,則跳到retry外層循環重新嘗試
// 否則在內層循環重新CAS
c = ctl.get(); // Re-read ctl
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
// 走到這代表CAS操作已經成功
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
// 創建worker
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
// 獨占鎖保證同步
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// 重新檢查線程池狀態,以避免在獲取鎖前調用了shutdown接口
int rs = runStateOf(ctl.get());
// 1. 線程池處於RUNNING
// 2. 線程池處於SHUTDOWN 並且firstTask為null
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
// 如果t已經啟動
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
// 添加任務
workers.add(w);
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
// 添加任務成功后, 執行任務
if (workerAdded) {
t.start(); // 執行
workerStarted = true;
}
}
} finally {
// 任務未執行成功
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}
主要分為兩步:
- 雙重循環通過CAS操作增加線程數。
- 使用全局的獨占鎖來控制:將並發安全的任務添加到works里,並啟動。
final void runWorker(Worker w)
用戶線程提交任務到線程池后,由Worker執行,通過while循環不斷地從工作隊列里獲取任務執行。
private final class Worker
extends AbstractQueuedSynchronizer
implements Runnable
{
// Worker啟動執行runWorker
public void run() {
runWorker(this);
}
}
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // state設置為0, 允許中斷
boolean completedAbruptly = true;
try {
// 如果task不為null 或者 task為null 但是 getTask從任務隊列獲取的任務不為null
while (task != null || (task = getTask()) != null) {
w.lock();
// If pool is stopping, ensure thread is interrupted;
// if not, ensure thread is not interrupted. This
// requires a recheck in second case to deal with
// shutdownNow race while clearing interrupt
// 如果線程池當前STOP,則確保線程是中斷狀態
// 如果不是STOP,確保線程沒有被中斷
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
// 任務執行之前的hook方法
beforeExecute(wt, task);
Throwable thrown = null;
try {
task.run(); // 執行任務
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
// 任務執行之后的hook方法
afterExecute(task, thrown);
}
} finally {
task = null;
// 統計當前的Worker完成的任務數量
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
// 清理工作
processWorkerExit(w, completedAbruptly);
}
}
Runnable getTask()
private Runnable getTask() {
boolean timedOut = false; // Did the last poll() time out?
// 循環
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// 1. 線程池狀態 >= SHUTDOWN && 工作隊列為空
// 2. 線程池狀態 >= STOP
// 兩種情況,都直接數量 -1 , 返回null
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();
return null;
}
// 工作線程的數量
int wc = workerCountOf(c);
// 需否需要超時控制
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
//1. 工作線程的數量超過了maximumPoolSize 或者 需要超時控制,且poll出為null,就是沒拿到
//2. 工作線程數量 > 1 或者 工作隊列為空
// 兩者都滿足, 則數量 -1 , 返回null
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}
try {
// 從工作隊列里取出任務
Runnable r = timed ?
// keepAliveTime時間內還沒有獲取到任務, 繼續循環
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}
void processWorkerExit(w, completedAbruptly)
private void processWorkerExit(Worker w, boolean completedAbruptly) {
// completedAbruptly 為true表示用戶線程運行異常,需要wc - 1
// 否則是不需要處理的,在getTask中已經處理過了
if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
decrementWorkerCount();
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
// 統計線程池完成的任務個數, 從workers中移除當前worker
try {
completedTaskCount += w.completedTasks;
workers.remove(w);
} finally {
mainLock.unlock();
}
//如果當前線程池狀態為SHUTDOWN且工作隊列為空,
//或者STOP狀態但線程池里沒有活動線程,則設置線程池狀態為TERMINATED。
tryTerminate();
int c = ctl.get();
// 如果線程池為 RUNNING 或SHUTDOWN 表示,tryTerminate()沒有成功
// 判斷是否需要新增一個線程,如果workerCountOf(c) < min 新增一個線程
if (runStateLessThan(c, STOP)) {
// 表示正常退出
if (!completedAbruptly) {
// min 默認是corePoolSize
int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
if (min == 0 && ! workQueue.isEmpty())
min = 1;
if (workerCountOf(c) >= min)
return; // replacement not needed
}
// 如果workerCountOf(c) < min 新增一個線程
addWorker(null, false);
}
}
void shutdown()
SHUTDOWN : 拒絕新任務但是處理阻塞隊列里的任務。
調用該方法之后,線程池不再接收新任務,但是工作隊列里的任務還需要處理。
public void shutdown() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// 檢查權限,判斷當前調用shutdown的線程是否擁有關閉線程的權限
checkShutdownAccess();
// 設置線程池狀態為SHUTDOWN
advanceRunState(SHUTDOWN);
// 設置中斷標志
interruptIdleWorkers();
// 鈎子方法
onShutdown(); // hook for ScheduledThreadPoolExecutor
} finally {
mainLock.unlock();
}
// 嘗試設置線程池狀態為TERMINATED
tryTerminate();
}
void advanceRunState(int targetState)
// 設置線程池狀態為SHUTDOWN
private void advanceRunState(int targetState) {
for (;;) {
int c = ctl.get();
// 當前的狀態已經是SHUTDOWN了就直接break返回,如果不是就CAS設置一下
if (runStateAtLeast(c, targetState) ||
ctl.compareAndSet(c, ctlOf(targetState, workerCountOf(c))))
break;
}
}
void interruptIdleWorkers()
private void interruptIdleWorkers() {
interruptIdleWorkers(false);
}
// onlyOne如果不傳,默認為false
private void interruptIdleWorkers(boolean onlyOne) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// 遍歷所有的Worker
for (Worker w : workers) {
Thread t = w.thread;
// 如果工作線程沒有被中斷 且 獲取Worker的鎖成功,則設置中斷標志
// 這里:獲取鎖成功代表,設置的是沒有在執行任務的線程,因為
// 正在執行任務的線程是已經獲取了鎖的,你tryLock不會成功的
if (!t.isInterrupted() && w.tryLock()) {
try {
t.interrupt();
} catch (SecurityException ignore) {
} finally {
w.unlock();
}
}
// 只用設置一個
if (onlyOne)
break;
}
} finally {
mainLock.unlock();
}
}
final void tryTerminate()
如果當前線程池狀態為SHUTDOWN且工作隊列為空,或者STOP狀態但線程池里沒有活動線程,則設置線程池狀態為TERMINATED。
final void tryTerminate() {
// 循環
for (;;) {
int c = ctl.get();
// 如果RUNNING TIDYING TERMINATED
// 如果SHUTDOWN 且任務隊列不為空,還需要處理queue里的任務
// 就不需要下面的操作了, 直接返回好了
if (isRunning(c) ||
runStateAtLeast(c, TIDYING) ||
(runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
return;
// STOP 但 線程池里還有活動線程
if (workerCountOf(c) != 0) { // Eligible to terminate
interruptIdleWorkers(ONLY_ONE);
return;
}
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// CAS設置rs為TIDYING,且wc為0
if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
try {
// 鈎子方法
terminated();
} finally {
// terminated() 完成之后, 就設置為TERMINATED
ctl.set(ctlOf(TERMINATED, 0));
// 激活所有因為await等待的線程
termination.signalAll();
}
return;
}
} finally {
mainLock.unlock();
}
// else retry on failed CAS
}
}
List(Runnable) shutdownNow()
STOP:拒絕新任務並且拋棄任務隊列里的任務,同時會中斷正在處理的任務。
調用該方法后,將線程池狀態設置為STOP,拒絕新任務並且拋棄任務隊列里的任務,同時會中斷正在處理的任務,返回隊列里被丟棄的任務列表。
public List<Runnable> shutdownNow() {
List<Runnable> tasks;
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// 檢查權限
checkShutdownAccess();
// 設置為STOP
advanceRunState(STOP);
// 設置中斷標志
interruptWorkers();
// 將隊列任務移到tasks中
tasks = drainQueue();
} finally {
mainLock.unlock();
}
tryTerminate();
return tasks;
}
void interruptWorkers()
private void interruptWorkers() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
for (Worker w : workers)
// 如果線程啟動,則中斷線程【正在執行 + 空閑的所有線程都會被中斷】
w.interruptIfStarted();
} finally {
mainLock.unlock();
}
}
boolean awaitTermination(timeout, unit)
當該方法被調用時,當前線程會被阻塞,直到超時時間到了,返回false。或者線程池狀態為TERMINATED時,返回true。
public boolean awaitTermination(long timeout, TimeUnit unit)
throws InterruptedException {
long nanos = unit.toNanos(timeout);
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
for (;;) {
// 線程池狀態為TERMINATED 返回true
if (runStateAtLeast(ctl.get(), TERMINATED))
return true;
// 超時了, 返回false
if (nanos <= 0)
return false;
nanos = termination.awaitNanos(nanos);
}
} finally {
mainLock.unlock();
}
}
參考閱讀
-
《Java並發編程之美》
-
《Java並發編程的藝術》