碎碎念
關於JDK源碼相關的文章這已經是第四篇了,原創不易,粉絲從幾十人到昨天的666
人,真的很感謝之前幫我轉發文章的一些朋友們。
從16年開始寫技術文章,到現在博客園已經發表了222
篇文章,大多數都是原創,共有800多粉絲,基本上每個月都會有文章的產出。
回顧這幾年以來寫作的心路歷程,一直都是偷偷的寫,偷偷的發,害怕被人知道,怕被人罵文章寫的太水(之前心理太脆弱了,哈哈)。后面和cxuan聊過后,他建議我給他投稿試試,於是就有了那一篇的萬字的AQS
文章。
最近也有好多讀者加到我的微信,問一些文章中的問題,我也都會認真解答,看到有人閱讀我的文章並有所收獲,我真的挺欣慰,這就是寫作的動力吧。
幫助別人的同時也是在幫助自己,自己學的技術和理解的內容都是有局限性的。通過寫文章結識到了很多朋友,聽聽別人的分析和見解,我也能學到很多。
每次看到博客中有人留言都很激動,也會第一時間去回復。感謝下面的公眾號大佬們之前無私的幫助,大家也可以關注一下他們,都是很nice
的大佬:
Java建設者、Java團長、程序猿石頭、碼象全棧、Java3y、JAVA小咖秀、Bella的技術輪子、石杉的架構筆記、武培軒、程序通事
前言
Java中的線程池已經不是什么神秘的技術了,相信在看的讀者在項目中也都有使用過。關於線程池的文章也是數不勝數,我們站在巨人的肩膀上來再次梳理一下。
本文還是保持原有的風格,圖文解析,盡量做到多畫圖!全文共20000+字,建議收藏后細細品讀,閱讀期間搭配源碼食用效果更佳!
讀完此文你將學到:
ThreadPoolExecutor
中常用參數有哪些?ThreadPoolExecutor
中線程池狀態和線程數量如何存儲的?ThreadPoolExecutor
有哪些狀態,狀態之間流轉是什么樣子的?ThreadPoolExecutor
任務處理策略?ThreadPoolExecutor
常用的拒絕策略有哪些?Executors
工具類提供的線程池有哪些?有哪些缺陷?ThreadPoolExecutor
核心線程池中線程預熱功能?ThreadPoolExecutor
中創建的線程如何被復用的?ThreadPoolExecutor
中關閉線程池的方法shutdown
與shutdownNow
的區別?ThreadPoolExecutor
中存在的一些擴展點?ThreadPoolExecutor
支持動態調整核心線程數、最大線程數、隊列長度等一些列參數嗎?怎么操作?
本文源碼基於JDK1.8
線程池基本概念
線程池是一種池化思想的產物,如同我們數據庫有連接池、Java中的常量池。線程池可以幫助我們管理線程、復用線程,減少線程頻繁新建、銷毀等帶來的開銷。
在Java中是通過ThreadPoolExecutor
類來創建一個線程池的,一般我們建議項目中自己去定義線程池,不推薦使用JDK
提供的工具類Executors
去構建線程池。
查看阿里巴巴開發手冊中也有對線程池的一些建議:
【強制】
創建線程或線程池時請指定有意義的線程名稱,方便出錯時回溯。
正例:
自定義線程工廠,並且根據外部特征進行分組,比如,來自同一機房的調用,把機房編號賦值給whatFeaturOfGroup
public class UserThreadFactory implements ThreadFactory {
private final String namePrefix;
private final AtomicInteger nextId = new AtomicInteger(1);
UserThreadFactory(String whatFeaturOfGroup) {
namePrefix = "From UserThreadFactory's " + whatFeaturOfGroup + "-Worker-";
}
@Override
public Thread newThread(Runnable task) {
String name = namePrefix + nextId.getAndIncrement();
Thread thread = new Thread(null, task, name, 0, false);
System.out.println(thread.getName());
return thread;
}
}
【強制】
線程資源必須通過線程池提供,不允許在應用中自行顯式創建線程。
說明:線程池的好處是減少在創建和銷毀線程上所消耗的時間以及系統資源的開銷,解決資源不足的問題。
如果不使用線程池,有可能造成系統創建大量同類線程而導致消耗完內存或者“過度切換”的問題。
【強制】
線程池不允許使用 Executors 去創建,而是通過 ThreadPoolExecutor 的方式,這
樣的處理方式讓寫的同學更加明確線程池的運行規則,規避資源耗盡的風險。
說明:Executors 返回的線程池對象的弊端如下:
1) FixedThreadPool 和 SingleThreadPool:
允許的請求隊列長度為 Integer.MAX_VALUE,可能會堆積大量的請求,從而導致 OOM。
2) CachedThreadPool:
允許的創建線程數量為 Integer.MAX_VALUE,可能會創建大量的線程,從而導致 OOM。
線程池使用示例
下面是一個自定義的線程池,這是之前公司在用的一個線程池,修改其中部分屬性和備注做脫敏處理:
public class MyThreadPool {
static final Logger LOGGER = LoggerFactory.getLogger(MyThreadPool.class);
private static final int DEFAULT_MAX_CONCURRENT = Runtime.getRuntime().availableProcessors() * 2;
private static final String THREAD_POOL_NAME = "MyThreadPool-%d";
private static final ThreadFactory FACTORY = new BasicThreadFactory.Builder().namingPattern(THREAD_POOL_NAME)
.daemon(true).build();
private static final int DEFAULT_SIZE = 500;
private static final long DEFAULT_KEEP_ALIVE = 60L;
private static ExecutorService executor;
private static BlockingQueue<Runnable> executeQueue = new ArrayBlockingQueue<>(DEFAULT_SIZE);
static {
try {
executor = new ThreadPoolExecutor(DEFAULT_MAX_CONCURRENT, DEFAULT_MAX_CONCURRENT + 2, DEFAULT_KEEP_ALIVE,
TimeUnit.SECONDS, executeQueue, FACTORY);
Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
@Override
public void run() {
LOGGER.info("MyThreadPool shutting down.");
executor.shutdown();
try {
if (!executor.awaitTermination(1, TimeUnit.SECONDS)) {
LOGGER.error("MyThreadPool shutdown immediately due to wait timeout.");
executor.shutdownNow();
}
} catch (InterruptedException e) {
LOGGER.error("MyThreadPool shutdown interrupted.");
executor.shutdownNow();
}
LOGGER.info("MyThreadPool shutdown complete.");
}
}));
} catch (Exception e) {
LOGGER.error("MyThreadPool init error.", e);
throw new ExceptionInInitializerError(e);
}
}
private MyThreadPool() {
}
public static boolean execute(Runnable task) {
try {
executor.execute(task);
} catch (RejectedExecutionException e) {
LOGGER.error("Task executing was rejected.", e);
return false;
}
return true;
}
public static <T> Future<T> submitTask(Callable<T> task) {
try {
return executor.submit(task);
} catch (RejectedExecutionException e) {
LOGGER.error("Task executing was rejected.", e);
throw new UnsupportedOperationException("Unable to submit the task, rejected.", e);
}
}
}
這里主要就是使用調用ThreadPoolExecutor
構造函數來構造一個線程池,指定自定義的ThreadFactory
,里面包含我們自己線程池的poolName
等信息。重寫里面的execute()
和submitTask()
方法。 添加了系統關閉時的鈎子函數shutDownHook()
,在里面調用線程池的shutdown()
方法,使得系統在退出(使用ctrl c或者kill -15 pid)時能夠優雅的關閉線程池。
如果有看不懂的小伙伴也沒有關系,后面會詳細分析ThreadPoolExecutor
中的源碼,相信看完后面的代碼再回頭來看這個用例 就完全是小菜一碟了。
線程池實現原理
通過上面的示例代碼,我們需要知道創建線程池時幾個重要的屬性:
ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler);
corePoolSize
: 線程池核心線程數量
maximumPoolSize
: 線程池最大線程數量
workQueue
: 線程池中阻塞隊列,一般指定隊列大小
線程池中數據模型可以簡化成下圖所示,其中Thread
應該是添加的一個個Worker
,這里標注的Thread
是為了方便理解:
線程池中提交一個任務具體執行流程如下圖:
提交任務時,比較當前線程池中線程數量和核心線程數的大小,根據比較結果走不同的任務處理策略,這個下面會有詳細說明。
線程池中核心方法調用鏈路:
TheadPoolExecutor源碼初探
TheadPoolExecutor
中常用屬性和方法較多,我們可以先分析下這些,然后一步步往下深入,常用屬性和方法如下:
具體代碼如下:
public class ThreadPoolExecutor extends AbstractExecutorService {
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static final int COUNT_BITS = Integer.SIZE - 3;
private static final int CAPACITY = (1 << COUNT_BITS) - 1;
private static final int RUNNING = -1 << COUNT_BITS;
private static final int SHUTDOWN = 0 << COUNT_BITS;
private static final int STOP = 1 << COUNT_BITS;
private static final int TIDYING = 2 << COUNT_BITS;
private static final int TERMINATED = 3 << COUNT_BITS;
private static int runStateOf(int c) { return c & ~CAPACITY; }
private static int workerCountOf(int c) { return c & CAPACITY; }
private static int ctlOf(int rs, int wc) { return rs | wc; }
private static boolean runStateLessThan(int c, int s) {
return c < s;
}
private static boolean runStateAtLeast(int c, int s) {
return c >= s;
}
private static boolean isRunning(int c) {
return c < SHUTDOWN;
}
private boolean compareAndIncrementWorkerCount(int expect) {
return ctl.compareAndSet(expect, expect + 1);
}
private boolean compareAndDecrementWorkerCount(int expect) {
return ctl.compareAndSet(expect, expect - 1);
}
private void decrementWorkerCount() {
do {} while (! compareAndDecrementWorkerCount(ctl.get()));
}
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
Executors.defaultThreadFactory(), defaultHandler);
}
}
- ctl
ctl
代表當前線程池狀態和線程池線程數量的結合體,高3位標識當前線程池運行狀態,后29位標識線程數量。ctlOf方法就是rs(線程池運行狀態)和wc(線程數量)按位或操作
- COUNT_BITS
COUNT_BITS = Integer.SIZE - 3 = 29,在ctl中,低29位用於存放當前線程池中線程的數量
- CAPACITY
CAPACITY = (1 << COUNT_BITS) - 1
我們來計算一下:
1 << 29 = 0010 0000 0000 0000 0000 0000 0000 0000
(1 << 29) - 1 = 0001 1111 1111 1111 1111 1111 1111 1111
這個屬性是用來線程池能裝載線程的最大數量,也可以用來做一些位運算操作。
- 線程池幾種狀態
RUNNING:
(1) 狀態說明:線程池處在RUNNING狀態時,能夠接收新任務,以及對已添加的任務進行處理。
(2) 狀態切換:線程池的初始化狀態是RUNNING。換句話說,線程池被一旦被創建,就處於RUNNING狀態,並且線程池中的任務數為0
SHUTDOWN:
(1) 狀態說明:線程池處在SHUTDOWN狀態時,不接收新任務,但能處理已添加的任務。
(2) 狀態切換:調用線程池的shutdown()接口時,線程池由RUNNING -> SHUTDOWN
STOP:
(1) 狀態說明:線程池處在STOP狀態時,不接收新任務,不處理已添加的任務,並且會中斷正在處理的任務。
(2) 狀態切換:調用線程池的shutdownNow()接口時,線程池由(RUNNING or SHUTDOWN ) -> STOP
TIDYING:
(1) 狀態說明:當所有的任務已終止,ctl記錄的"任務數量"為0,線程池會變為TIDYING狀態。當線程池變為TIDYING狀態時,會執行鈎子函數terminated()。terminated()在ThreadPoolExecutor類中是空的,若用戶想在線程池變為TIDYING時,進行相應的處理;可以通過重載terminated()函數來實現。
(2) 狀態切換:當線程池在SHUTDOWN狀態下,阻塞隊列為空並且線程池中執行的任務也為空時,就會由 SHUTDOWN -> TIDYING。
當線程池在STOP狀態下,線程池中執行的任務為空時,就會由STOP -> TIDYING
TERMINATED:
(1) 狀態說明:線程池徹底終止,就變成TERMINATED狀態。
(2) 狀態切換:線程池處在TIDYING狀態時,執行完terminated()之后,就會由 TIDYING -> TERMINATED
狀態的變化流轉:
- runStateOf()
計算線程池運行狀態的,就是計算ctl前三位的數值。`unStateOf() = c & ~CAPACITY,CAPACITY = 0001 1111 1111 1111 1111 1111 1111 1111,那么~CAPACITY = 1110 0000 0000 0000 0000 0000 0000 0000,它與任何數按位與的話都是只看這個數前三位
- workerCountOf()
計算線程池的線程數量,就是看ctl的后29位,workerCountOf() = c & CAPACITY, CAPACITY = 0001 1111 1111 1111 1111 1111 1111 1111與任何數按位與,就是看這個數的后29位
- ctlOf(int rs, int wt)
在獲取當前線程池ctl的時候會用到,在后面源碼中會有很多地方調用, 傳遞的參數rs代表線程池狀態,wt代表當前線程次線程(worker)的數量
- runStateLessThan(int c, int s)
return c < s,c一般傳遞的是當前線程池的ctl值。比較當前線程池ctl所表示的狀態是否小於某個狀態s
- runStateAtLeast(int c, int s)
return c >= s,c一般傳遞的是當前線程池的ctl值。比較當前線程池ctl所表示的狀態,是否大於等於某個狀態s
- isRunning(int c)
c < SHUTDOWN, 判斷當前線程池是否是RUNNING狀態,因為只有RUNNING的值小於SHUTDOWN
- compareAndIncrementWorkerCount()/compareAndDecrementWorkerCount()
使用CAS方式 讓ctl值分別加一減一 ,成功返回true, 失敗返回false
- decrementWorkerCount()
將ctl值減一,這個方法用了do...while循環,直到成功為止
- completedTaskCount
記錄線程池所完成任務總數 ,當worker退出時會將 worker完成的任務累積到completedTaskCount
- Worker
線程池內部類,繼承自AQS且實現Runnable接口。Worker內部有一個Thread thread是worker內部封裝的工作線程。Runnable firstTask用來接收用戶提交的任務數據。在初始化Worker時候會設置state為-1(初始化狀態),通過threadFactory創建一個線程。
- ThreadPoolExecutor初始化參數
corePoolSize: 核心線程數限制
maximumPoolSize: 最大線程限制
keepAliveTime: 非核心的空閑線程等待新任務的時間 unit: 時間單位。配合allowCoreThreadTimeOut也會清理核心線程池中的線程。
workQueue: 任務隊列,最好選用有界隊列,指定隊列長度
threadFactory: 線程工廠,最好自定義線程工廠,可以自定義每個線程的名稱
handler: 拒絕策略,默認是AbortPolicy
execute()源碼分析
當有任務提交到線程池時,就會直接調用ThreadPoolExecutor.execute()
方法,執行流程如下:
從流程圖可看,添加任務會有三個分支判斷,源碼如下:
java.util.concurrent.ThreadPoolExecutor.execute()
:
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
else if (!addWorker(command, false))
reject(command);
}
c
在這里代表線程池ctl
的值,包含工作任務數量以及線程池的狀態,上面有解釋過。
接着看下面幾個分支代碼:
分支一: if (workerCountOf(c) < corePoolSize)
,條件成立表示當前線程數量小於核心線程數,此次提交任務,直接創建一個新的worker
。
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
如果線程數小於核心線程數,執行addWorker
操作,這個后面會講這個方法的細節,如果添加成功則直接返回,失敗后會重新計算ctl
的值,然后執行分支二。
針對addWorker()
執行失敗的情況,有以下幾種可能:
- 存在並發情況,
execute()
方法是可能有多個線程同時調用的,當多個線程同時workerCountOf(c) < corePoolSize成立后,就會向線程池中創建worker
,這個時候線程池的核心線程數可能已經達到,在addWorker
中還會再次判斷,所以會有任務添加失敗。
- 當前線程池狀態發生改變,例如線程A執行
addWorker()
方法時,線程B修改線程池狀態,導致線程池不是RUNNING
狀態,此時線程A執行addWorker()
就有可能失敗。
分支二: if (isRunning(c) && workQueue.offer(command)) {}
通過分支一流程的分析,我們可以知道執行到這個分支說明**當前線程數量已經達到corePoolSize
或者addWorker()
執行失敗,我們先看看分支二執行流程:
首先判斷當前線程池是否處於RUNNING
狀態,如果是則嘗試將task
放入到workQueue
中,workQueue
是我們在初始化ThreadPoolExecutor
時傳入進來的阻塞隊列。
如果當前任務成功添加到阻塞隊列中,再次獲取ctl
賦值給recheck
變量,然后執行:
if (!isRunning(recheck) && remove(command))
reject(command);
再次判斷當前線程池是否為RUNNINT
狀態,如果不是則說明提交任務到隊列之后,線程池狀態被其他線程給修改了,比如調用shutdown()/shutdownNow()
等。這種情況就需要把剛剛提交到隊列中的的任務刪除掉。
再看下remove()方法:
public boolean remove(Runnable task) {
boolean removed = workQueue.remove(task);
tryTerminate();
return removed;
}
如果任務提交到隊列之后,線程池中的線程還未將這個任務消費,那么就可以remove
成功,調用reject()
方法來執行拒絕策略。
如果在改變線程池狀態之前,隊列中的數據已經被消費了,此時remove()
就會失敗。
接着走else if
中的邏輯:
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
走這個else if
邏輯有兩種可能,線程池是RUNNING
狀態或者線程池狀態被改變且workQueue
中添加的任務已經被消費導致remove()
失敗。
如果是RUNNING
狀態,線程池中的線程數量是0,此時workQueue
中還有待執行的任務,就需要新增一個worker
(addWorker
里面會有創建線程的操作),繼續消費workqueue
中的任務。
這里要注意一下addWorker(null, false)
,也就是創建一個線程,但並沒有傳入任務,因為任務已經被添加到workQueue
中了,所以worker
在執行的時候,會直接從workQueue
中獲取任務。在workerCountOf(recheck) == 0
時執行addWorker(null, false)
也是為了保證線程池在RUNNING
狀態下必須要有一個線程來執行任務,可以理解為一種擔保兜底機制。
至於線程池中線程為何可以為0?這個如果我們設置了allowCoreThreadTimeOut=true
,那么核心線程也是允許被回收的,后面getTask()
中代碼有提及。
分支三: else if (!addWorker(command, false)) {}
通過分支一和分之二的分析,進入這個分支的前置條件:線程數超過核心線程數且workQueue
中數據已滿。
else if (!addWorker(command, false))
,執行添加worker
操作,如果執行失敗就直接走reject()
拒絕策略。這里添加失敗可能是線程數已經超過了maximumPoolSize
。
addWorker()源碼分析
上面分析提交任務的方法execute()
時多次用到addWorker
方法,接收任務后將任務添加到Worker
中。
Worker
是ThreadPoolExecutor
中的內部類,繼承自AQS
且實現了Runnable
接口。 類中包含Thread thread
,它是worker
內部封裝的工作線程,還有firstTask
屬性,它是一個可執行的Runnable
對象。在Worker
的構造函數中,使用線程工廠創建了一個線程,當thread
啟動的時候,會以worker.run()
為入口啟動線程,這里會直接調用到runWorker()
中。
private final class Worker extends AbstractQueuedSynchronizer implements Runnable{
private static final long serialVersionUID = 6138294804551838833L;
final Thread thread;
Runnable firstTask;
volatile long completedTasks;
Worker(Runnable firstTask) {
setState(-1);
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);
}
public void run() {
runWorker(this);
}
}
流程如下圖:
這里再回頭看下addWorker(Runnable firstTask, boolean core) 方法,這個方法主要是添加一個Worker
到線程池中並執行,firstTask
參數用於指定新增的線程執行的第一個任務,core
參數為true表示在新增線程時會判斷當前活動線程數是否少於corePoolSize
,false
表示在新增線程時會判斷當前活動線程數是否少於maximumPoolSize
addWorker方法整體執行流程圖如下:
接着看下源碼:
java.util.concurrent.ThreadPoolExecutor.addWorker()
:
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
if (rs >= SHUTDOWN && !(rs == SHUTDOWN && firstTask == null && !workQueue.isEmpty()))
return false;
for (;;) {
int wc = workerCountOf(c);
if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize))
return false;
if (compareAndIncrementWorkerCount(c))
break retry;
c = ctl.get();
if (runStateOf(c) != rs)
continue retry;
}
}
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
int rs = runStateOf(ctl.get());
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive())
throw new IllegalThreadStateException();
workers.add(w);
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
t.start();
workerStarted = true;
}
}
} finally {
if (!workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}
這里是有兩層for
循環,外層循環主要是判斷線程池的狀態,如果狀態不合法就直接返回false
.
只有兩種情況屬於合法狀態:
RUNNING
狀態SHUTDOWN
狀態時,隊列中還有未處理的任務,且提交的任務為空。SHUTDOWN
含義就是不再接收新任務,可以繼續處理阻塞隊列的任務。
第二層循環是通過CAS
操作更新workCount
數量,如果更新成功則往線程池中中添加線程,這個所謂的線程池就是一個HashSet
數組。添加失敗時判斷失敗原因,CAS
失敗有兩種原因:線程池狀態被改變或者並發情況修改線程池中workCount
數量,這兩種情況都會導致ctl
值被修改。如果是第二種原因導致的失敗,繼續自旋更新workCount
數量。
接着繼續分析循環內部的實現,先看看第一層循環:c
代表線程池ctl
值,rs
代表線程池運行狀態。
if (rs >= SHUTDOWN && !(rs == SHUTDOWN && firstTask == null && !workQueue.isEmpty()))
return false;
條件一:rs >= SHUTDOWN
成立, 說明當前線程池狀態不是RUNNING
狀態
條件二: !(rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty())
我們之前提到過,創建任務有兩種情況:
1)RUNNING
狀態可以提交任務,
2)SHUTDOWN
狀態下如果傳遞的任務是空且阻塞隊列中還有任務未處理的情況才是允許創建任務繼續處理的,因為阻塞隊列中的任務仍然需要繼續處理。
上面的條件一和條件二就是處理SHUTDOWN
狀態下任務創建操作的判斷。
接着分析第二層循環,先是判斷線程池workCount
數量是否大於可創建的最大值,或者是否超過了核心線程數/最大線程數,如果是則直接返回,addWorker()
操作失敗。
接着使用compareAndIncrementWorkerCount(c)
將線程池中workCount+1
,這里使用的是CAS
操作,如果成功則直接跳出最外層循環。
for (;;) {
int wc = workerCountOf(c);
if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize))
return false;
if (compareAndIncrementWorkerCount(c))
break retry;
c = ctl.get();
if (runStateOf(c) != rs)
continue retry;
}
如果CAS
失敗,說明此時有競爭,會重新獲取ctl
的值,判斷競爭失敗的原因是添加workCount
數量還是修改線程池狀態導致的,如果線程池狀態未發生改變,就繼續循環嘗試CAS
增加workCount
數量,接着看循環結束后邏輯:
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
int rs = runStateOf(ctl.get());
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive())
throw new IllegalThreadStateException();
workers.add(w);
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
t.start();
workerStarted = true;
}
}
} finally {
if (!workerStarted)
addWorkerFailed(w);
}
這里workerStarted
代表worker
是否已經啟動,workerAdded
代表創建的worker
是否添加到池子中,這里所謂的池子就是全局定義的一個HashSet
結構的workers
變量。
接着根據傳遞的firstTask
來構建一個Worker
,在Worker
的構造方法中也會通過ThreadFactory
創建一個線程,這里判斷t != null
是因為用戶可以自定義ThreadFactory
,如果這里用戶不是創建線程而是直接返回null
則會出現一些問題,所以需要判斷一下。
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
}
在往池子中添加Worker
的時候,是需要先加鎖的,因為針對全局的workers
操作並不是線程安全的。
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
繼續看下面代碼,rs
代表當前線程池的狀態,這里還是判斷線程池的狀態,如果rs < SHUTDOWN
代表線程池狀態是RUNNING
狀態,此時可以直接操作。
如果是SHUTDOWN
狀態,需要滿足firstTask == null
才可以繼續操作。因為在SHUTDOWN
狀態時不會再添加新的任務,但還是可以繼續處理workQueue
中的任務。
t.isAlive()
當線程start
后,線程isAlive
會返回true
,這里還是防止自定義的ThreadFactory
創建線程返回給外部之前,將線程start
了,由此可見Doug lea
考慮問題真的很全面。
int rs = runStateOf(ctl.get());
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive())
throw new IllegalThreadStateException();
workers.add(w);
}
接着將創建的Worker
添加到workers
集合中,設置largestPoolSize
,這個屬性是線程池生命周期內線程數最大值,一般是做統計數據用的。 最后修改workerAdded = true
,代表當前提交的任務所創建的Worker
已經添加到池子中了。
添加worker
成功后,調用線程的start()
方法啟動線程,因為Worker
中重寫了run()
方法,最后會執行Worker.run()
。最后設置workerStarted = true
后釋放全局鎖。
int rs = runStateOf(ctl.get());
if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) {
workers.add(w);
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
orkerAdded = true;
}
這里再回頭看看workerAdded = false
的情形,如果線程池在lock
之前,狀態發生了變化,導致添加失敗。此時workerAdded
也會為false
,最后執行addWorkerFailed(work)
操作,這個方法是將Work
從workers
中移除掉,然后將workCount
數量減一,最后執行tryTerminate(
)來嘗試關閉線程池,這個方法后面會細說。
runWorker()源碼分析
在Worker
類中的run
方法調用了runWorker
來執行任務。上面addWorker()
方法正常的執行邏輯會創建一個Worker
,然后啟動Worker
中的線程,這里其實就會執行到runWorker
方法。
runWorker
的執行邏輯很簡單,啟動一個線程,執行當前傳遞的task
任務,執行完后又不斷的從workQueue
中獲取任務繼續執行,如果當前workCount
數量小於核心線程數且隊列中沒有了任務,當前線程會被阻塞,這個就是getTask()
的邏輯,一會會講到。
如果當前線程數大於核心線程數且隊列中沒有任務,就會返回null
,在runWorker
這邊退出循環,回收多余的worker
數據。
源碼如下:
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock();
boolean completedAbruptly = true;
try {
while (task != null || (task = getTask()) != null) {
w.lock();
if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
beforeExecute(wt, task);
Throwable thrown = null;
try {
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
}
這里w.unlock()
是為了初始化當前Work
中state==0
,然后設置獨占線程為null
,因為在shutDown()
方法中會嘗試獲取Worker
中的鎖,如果獲取成功代表當前線程沒有被加鎖處於空閑狀態,給當前線程一個中斷信號。所以這里在執行線程任務的時候需要加鎖,防止調用shutDown()
的時候給當前worker
線程一個中斷信號。
判斷task
是否為空,如果是一個空任務,那么就去workQueue
中獲取任務,如果兩者都為空就會退出循環。
while (task != null || (task = getTask()) != null) {}
最核心的就是調用task.run()
啟動當前任務,這里面還有兩個可擴展的方法,分別是beforeExecute()/afterExecute(),我們可以在任務執行前和執行后分別自定義一些操作,其中afterExecute()
可以接收到任務拋出的異常信息,方便我們做后續處理。
while (task != null || (task = getTask()) != null) {
try {
beforeExecute(wt, task);
Throwable thrown = null;
try {
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
如果退出循環,說明getTask()
方法返回null
。會執行到finally
中的processWorkerExit(w, completedAbruptly)
方法,此方法是用來清理線程池中添加的work
數據,completedAbruptly=true
代表是異常情況下退出。
try {
while (task != null || (task = getTask()) != null) {
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
runWorker()
中只是啟動了當前線程工作,還需要源源不斷通過getTask()
方法從workQueue
來獲取任務執行。在workQueue
沒有任務的時候,根據線程池workCount
和核心線程數的對比結果來使用processWorkerExit()
執行清理工作。
getTask()源碼分析
getTask
方法用於從阻塞隊列中獲取任務,如果當前線程小於核心線程,那么當阻塞隊列中沒有任務時就會阻塞,反之會等待keepAliveTime
后返回。
這個就是keepAliveTime
的使用含義:非核心的空閑線程等待新任務的時間,當然如果這里設置了allowCoreThreadTimeOut=true
也會回收核心線程。
具體代碼如下:
private Runnable getTask() {
boolean timedOut = false;
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();
return null;
}
int wc = workerCountOf(c);
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}
try {
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}
這里核心代碼就是從workQueue
中取任務,采用poll
還是take
取決於allowCoreThreadTimeOut
和線程數量,allowCoreThreadTimeOut
在構造ThreadLocalExecutor
后設置的,默認為false。如果設置為true
則代表核心線程數下的線程也是可以被回收的。如果使用take
則表明workQueue
中沒有任務當前線程就會被阻塞掛起,直到有了新的任務才會被喚醒。
在這里擴展下阻塞隊列的部分方法的含義,這里主要是看poll()
和take()
的使用區別:
阻塞隊列插入方法:
boolean add(E e):隊列沒有滿,則插入數據並返回true;隊列滿時,拋出異常 java.lang.IllegalStateException: Queue full。
boolean offer(E e):隊列沒有滿,則插入數據並返回true;隊列滿時,返回false。
void put(E e):隊列沒有滿,則插入數據;隊列滿時,阻塞調用此方法線程,直到隊列有空閑空間時此線程進入就緒狀態。
boolean offer(E e, long timeout, TimeUnit unit):隊列沒有滿,插入數據並返回true;隊列滿時,阻塞調用此方法線程,若指定等待的時間內還不能往隊列中插入數據,返回false。
阻塞隊列移除(獲取)方法:
E remove():隊列非空,則以FIFO原則移除數據,並返回該數據的值;隊列為空,拋出異常 java.util.NoSuchElementException。
E poll(): 隊列非空,移除數據,並返回該數據的值;隊列為空,返回null。
E take(): 隊列非空,移除數據,並返回該數據的值;隊列為空,阻塞調用此方法線程,直到隊列為非空時此線程進入就緒狀態。
E poll(long timeout, TimeUnit unit):隊列非空,移除數據,並返回該數據的值;隊列為空,阻塞調用此方法線程,若指定等待的時間內隊列都沒有數據可取,返回null。
阻塞隊列檢查方法:
E element(): 隊列非空,則返回隊首元素;隊列為空,拋出異常 java.util.NoSuchElementException。
E peek(): 隊列非空,則返回隊首元素;隊列為空,返回null。
processWorkerExit()源碼分析
此方法的含義是清理當前線程,從線程池中移除掉剛剛添加的worker
對象。
執行processWorkerExit()
代表在runWorker()
線程跳出了當前循環,一般有兩種情況:
task.run()
內部拋出異常,直接結束循環,然后執行processWorkerExit()
getTask()
返回為空,代表線程數量大於核心數量且workQueue
中沒有任務,此時需要執行processWorkerExit()
來清理多余的Worker
對象
private void processWorkerExit(Worker w, boolean completedAbruptly) {
if (completedAbruptly)、
decrementWorkerCount();
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
completedTaskCount += w.completedTasks;
workers.remove(w);
} finally {
mainLock.unlock();
}
tryTerminate();
int c = ctl.get();
if (runStateLessThan(c, STOP)) {
if (!completedAbruptly) {
int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
if (min == 0 && ! workQueue.isEmpty())
min = 1;
if (workerCountOf(c) >= min)
return;
}
addWorker(null, false);
}
}
針對於線程池workers
的操作都會進行加鎖處理,然后將當前Worker
從池子中移除,累加當前線程池完成的任務總數completedTaskCount
。
接着調用tryTerminate()
嘗試關閉線程池,這個方法后面有詳細說明。
接着判斷if (runStateLessThan(c, STOP)) {}
,含義是當前線程池狀態小於STOP
,即當前線程池狀態當前線程池狀態為 RUNNING
或 SHUTDOWN
,判斷當前線程是否是正常退出。如果當前線程是正常退出,那么completedAbruptly=false
,接着判斷線程池中是否還擁有足夠多的的線程,因為異常退出可能導致線程池中線程數量不足,此時就要執行addWorker()
為線程池添加新的worker
數據,看下面的詳細分析:
執行最后的addWorke()有三種可能:
1)當前線程在執行task
時 發生異常,這里一定要創建一個新worker
頂上去。
2)如果!workQueue.isEmpty()
說明任務隊列中還有任務,這種情況下最起碼要留一個線程,因為當前狀態為 RUNNING || SHUTDOWN這是前提條件。
3)當前線程數量 < corePoolSize值,此時會創建線程,維護線程池數量在corePoolSize
個水平。
tryTerminate()源碼分析
上面移除Worker
的方法中有一個tryTerminate()
方法的調用,這個方法是根據線程池狀態嘗試關閉線程池。
執行流程如下:
實現源碼如下:
final void tryTerminate() {
for (;;) {
int c = ctl.get();
if (isRunning(c) || runStateAtLeast(c, TIDYING) || (runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
return;
if (workerCountOf(c) != 0) {
interruptIdleWorkers(ONLY_ONE);
return;
}
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
try {
terminated();
} finally {
ctl.set(ctlOf(TERMINATED, 0));
termination.signalAll();
}
return;
}
} finally {
mainLock.unlock();
}
}
}
首先是判斷線程池狀態:
條件一:isRunning(c) 成立,直接返回就行,線程池很正常!
條件二:runStateAtLeast(c, TIDYING) 說明 已經有其它線程 在執行 TIDYING -> TERMINATED狀態了,當前線程直接回去。
條件三:(runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty())
SHUTDOWN特殊情況,如果是這種情況,直接回去。得等隊列中的任務處理完畢后,再轉化狀態。
接着執行:
if (workerCountOf(c) != 0) {
interruptIdleWorkers(ONLY_ONE);
return;
}
走到這個邏輯,說明線程池狀態 >= STOP或者線程池狀態為SHUTDOWN
且隊列已經空了
當前線程池中的線程數量 > 0,調用interruptIdleWorkers()
中斷一個空閑線程,然后返回。我們來分析下,在getTask()
返回為空時會執行退出邏輯processWorkerExit()
,這里就會調用tryTerminate()
方法嘗試關閉線程池。
如果此時線程池狀態滿足線程池狀態 >= STOP或者線程池狀態為SHUTDOWN
且隊列已經空了,如果此時線程池中線程數不為0,就會中斷一個空閑線程。
為什么這里只中斷一個線程呢?這里的設計思想是,如果線程數量特別多的話,只有一個線程去做喚醒空閑worker
的任務可能會比較吃力,所以,就給了每個 被喚醒的worker
線程 ,在真正退出之前協助 喚醒一個空閑線程的任務,提供吞吐量的一種常用手段。
我們順便看下interruptIdleWorkers()
源碼:
private void interruptIdleWorkers(boolean onlyOne) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
for (Worker w : workers) {
Thread t = w.thread;
if (!t.isInterrupted() && w.tryLock()) {
try {
t.interrupt();
} catch (SecurityException ignore) {
} finally {
w.unlock();
}
}
if (onlyOne)
break;
}
} finally {
mainLock.unlock();
}
}
遍歷workers
,如果線程是空閑狀態(空閑狀態:queue.take()和queue.poll()返回空),則給其一個中斷信號,如果是處於workQueue
阻塞的線程,會被喚醒,喚醒后,進入下一次自旋時,可能會return null
執行退出相關的邏輯,接着又會調用processWorkerExit()->tryTerminate()
,回到上面場景,當前線程退出的時候還是會繼續喚醒下一個空現線程。
接着往下看tryTerminate
的剩余邏輯:
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
try {
terminated();
} finally {
ctl.set(ctlOf(TERMINATED, 0));
termination.signalAll();
}
return;
}
} finally {
mainLock.unlock();
}
執行到這里的線程是誰?
workerCountOf(c) == 0
時,會來到這里。
最后一個退出的線程。 在 (線程池狀態 >= STOP || 線程池狀態為 SHUTDOWN 且 隊列已經空了)
線程喚醒后,都會執行退出邏輯,退出過程中 會 先將 workerCount計數 -1 => ctl -1。
調用tryTerminate
方法之前,已經減過了,所以0時,表示這是最后一個退出的線程了。
獲取全局鎖,進行加鎖操作,通過CAS
設置線程池狀態為TIDYING
狀態,設置成功則執行terminated()
方法,這也是一個自定義擴展的方法,當線程池中止的時候會調用此方法。
最后設置線程池狀態為TERMINATED
狀態,喚醒調用awaitTermination()
方法的線程。
awaitTermination()源碼分析
該方法是判斷線程池狀態是否達到TERMINATED
,如果達到了則直接返回true
,沒有達到則會await
掛起當前線程指定的時間。
public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
long nanos = unit.toNanos(timeout);
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
for (;;) {
if (runStateAtLeast(ctl.get(), TERMINATED))
return true;
if (nanos <= 0)
return false;
nanos = termination.awaitNanos(nanos);
}
} finally {
mainLock.unlock();
}
}
在每次執行tryTerminate()
后會喚醒所有被await
的線程,繼續判斷線程池狀態。
shutDown()/shutDownNow()源碼分析
shutDown
和shutDown()
方法都是直接改變線程池狀態的方法,一般我們在系統關閉之前會調用此方法優雅的關閉線程池。
public void shutdown() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
checkShutdownAccess();
advanceRunState(SHUTDOWN);
interruptIdleWorkers();
onShutdown();
} finally {
mainLock.unlock();
}
tryTerminate();
}
public List<Runnable> shutdownNow() {
List<Runnable> tasks;
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
checkShutdownAccess();
advanceRunState(STOP);
interruptWorkers();
tasks = drainQueue();
} finally {
mainLock.unlock();
}
tryTerminate();
return tasks;
}
shutdown
和shutdownNow
方法調用差不多,只是shutdown
是將線程池狀態設置為SHUTDOWN
,shutdownNow
是將線程池狀態設置為STOP
。
shutdownNow
會返回所有未處理的task
集合。
來看看它們共同調用的一些方法:
private void advanceRunState(int targetState) {
for (;;) {
int c = ctl.get();
if (runStateAtLeast(c, targetState) || ctl.compareAndSet(c, ctlOf(targetState, workerCountOf(c))))
break;
}
}
這個方法是設置線程池狀態為指定狀態,runStateAtLeast(c, targetState)
,判斷當前線程池ctl
值,如果小於targetState
則會往后執行。
ctl.compareAndSet(c, ctlOf(targetState, workerCountOf(c))),通過CAS
指令,修改ctl
中線程池狀態為傳入的targetState
。
private void interruptIdleWorkers() {
interruptIdleWorkers(false);
}
private void interruptIdleWorkers(boolean onlyOne) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
for (Worker w : workers) {
Thread t = w.thread;
if (!t.isInterrupted() && w.tryLock()) {
try {
t.interrupt();
} catch (SecurityException ignore) {
} finally {
w.unlock();
}
}
if (onlyOne)
break;
}
} finally {
mainLock.unlock();
}
}
interruptIdleWorkers
含義是為空閑的線程設置中斷標識,這里要清楚worker
什么時候空閑?我們在上面講解runWorker()
方法時,執行task.run()
之前,要針對Worker
對象加鎖,設置Worker
中的state
值為1,防止運行的worker
被添加中斷標識。接着執行getTask()
方法,獲取阻塞隊列中的任務,如果是queue.take()
則會阻塞掛起當前線程,釋放鎖,此時線程處於空閑狀態。如果是queue.pool()
返回為空,runWorker()
會釋放鎖,此時線程也是空閑狀態。
執行interrupt()
后處於queue
阻塞的線程,會被喚醒,喚醒后,進入下一次自旋判斷線程池狀態是否改變,如果改變可能直接返回空,這里具體參看runWorker()
和getTask()
方法。
onShutdown()
也是一個擴展方法,需要子類去重寫,這里代表當線程池關閉后需要做的事情。drainQueue()
方法是獲取workQueue
中現有的的任務列表。
問題回顧
-
ThreadPoolExecutor
中常用參數有哪些?
上面介紹過了,參見的參數是指ThreadPoolExecutor的構造參數,一般面試的時候都會先問這個,要解釋每個參數的含義及作用。 -
ThreadPoolExecutor
中線程池狀態和線程數量如何存儲的?
通過AtomicInteger類型的變量ctl來存儲,前3位代表線程池狀態,后29位代表線程池中線程數量。 -
ThreadPoolExecutor
有哪些狀態,狀態之間流轉是什么樣子的?
RUNNING、SHUTDOWN、STOP、TIDYING、TERMINATED
-
ThreadPoolExecutor
任務處理策略?
這個問題就是考察execute()
的執行過程,只要看過源碼就不會有問題。
-
ThreadPoolExecutor
常用的拒絕策略有哪些?
策略處理該任務,線程池提供了4種策略:
1)AbortPolicy:直接拋出異常,默認策略
2)CallerRunsPolicy:用調用者所在的線程來執行任務
3)DiscardOldestPolicy:丟棄阻塞隊列中靠最前的任務,並執行當前任務
4)DiscardPolicy:直接丟棄任務
當然線程池是支持自定義拒絕策略的,需要實現RejectedExecutionHandler接口中rejectedExecution()方法即可。 -
Executors
工具類提供的線程池有哪些?有哪些缺陷?
1) FixedThreadPool 和 SingleThreadPool:允許的請求隊列長度為 Integer.MAX_VALUE,可能會堆積大量的請求,從而導致 OOM。
2) CachedThreadPool:允許的創建線程數量為 Integer.MAX_VALUE,可能會創建大量的線程,從而導致 OOM。
所以阿里巴巴也建議我們要自定義線程池核心線程數以及阻塞隊列的長度。 -
ThreadPoolExecutor
核心線程池中線程預熱功能?
在創建線程池后,可以使用prestartAllCoreThreads()來預熱核心線程池。public int prestartAllCoreThreads() { int n = 0; while (addWorker(null, true)) ++n; return n; }
-
ThreadPoolExecutor
中創建的線程如何被復用的?
這個主要是看runWorker()和getTask()兩個方法的執行流程,當執行任務時調用runWorker()方法,執行完成后會繼續從workQueue中獲取任務繼續執行,已達到線程復用的效果,當然這里還有一些細節,可以回頭看看上面的源碼解析。 -
ThreadPoolExecutor
中關閉線程池的方法shutdown
與shutdownNow
的區別?
最大的區別就是shutdown()會將線程池狀態變為SHUTDOWN,此時新任務不能被提交,workQueue中還存有的任務可以繼續執行,同時會像線程池中空閑的狀態發出中斷信號。
shutdownNow()方法是將線程池的狀態設置為STOP,此時新任務不能被提交,線程池中所有線程都會收到中斷的信號。如果線程處於wait狀態,那么中斷狀態會被清除,同時拋出InterruptedException。 -
ThreadPoolExecutor
中存在的一些擴展點?
鈎子方法:
1)beforeExecute()/afterExecute():runWorker()中線程執行前和執行后會調用的鈎子方法
2)terminated:線程池的狀態從TIDYING狀態流轉為TERMINATED狀態時terminated方法會被調用的鈎子方法。
3)onShutdown:當我們執行shutdown()方法時預留的鈎子方法。 -
ThreadPoolExecutor
支持動態調整核心線程數、最大線程數、隊列長度等一些列參數嗎?怎么操作?
運行期間可動態調整參數的方法:
1)setCorePoolSize():動態調整線程池核心線程數
2)setMaximumPoolSize():動態調整線程池最大線程數
3)setKeepAliveTime(): 空閑線程存活時間,如果設置了allowsCoreThreadTimeOut=true,核心線程也會被回收,默認只回收非核心線程
4)allowsCoreThreadTimeOut():是否允許回收核心線程,如果是true,在getTask()方法中,獲取workQueue就采用workQueue.poll(keepAliveTime),如果超過等待時間就會被回收。
總結
這篇線程池源碼覆蓋到了ThreadPoolExecutor
中大部分代碼,我相信認真閱讀完后肯定會對線程池有更深刻的理解。如有疑問或者建議可關注公眾號給我私信,我都會一一為大家解答。
另外推薦一個我的up主朋友,他自己錄制了好多學習視頻並分享在B站上了,大家有時間可以看一下(PS:非恰飯非利益相關,良心推薦):小劉講源碼-B站UP主