Java提供了幾種便捷的方法創建線程池,通過這些內置的api就能夠很輕松的創建線程池。在java.util.concurrent
包中的Executors
類,其中的靜態方法就是用來創建線程池的:
- newFixedThreadPool():創建一個固定線程數量的線程池,而且線程池中的任務全部執行完成后,空閑的線程也不會被關閉。
- newSingleThreadExecutor():創建一個只有一個線程的線程池,空閑時也不會被關閉。
- newCachedThreadPool():創建一個可緩存的線程池,線程的數量為
Integer.MAX_VALUE
,空閑線程會臨時緩存下來,線程會等待60s
還是沒有任務加入的話就會被關閉。
Executors
類中還有一些創建線程池的方法(jdk8新加的),但是現在這個觸極到我的知識盲區了~~
上面那幾個方法,其實都是創建了一個ThreadPoolExecutor
對象作為返回值,要搞清楚線程池的原理主要還是要分析ThreadPoolExecutor
這個類。
ThreadPoolExecutor
的構造方法:
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
...
}
ThreadPoolExecutor
的構造方法包含以下幾個參數:
- corePoolSize: 核心線程數量,常駐線程池中的線程,即時線程池中沒有任務可執行,也不會被關閉。
- maximumPoolSize:最大線程數量
- keepAliveTime:空閑線程存活時間
- unit: 空閑線程存活時間的單位
- workQueue:工作隊列,線程池一下忙不過來,那新來的任務就需要排隊,排除中的任務就會放在workQueue中
- threadFactory:線程工廠,創建線程用的
- handler:
RejectedExecutionHandler
實例用於在線程池中沒有空閑線程能夠執行任務,並且workQueue
中也容不下任務時拒絕任務時的策略。
ThreadPoolExecutor
中的線程統稱為工作線程,但有一個小概念是核心線程
,核心線程由參數corePoolSize
指定,如corePoolSize
設置5,那線程池中就會有5條線程常駐線程池中,不會被回收掉,但是也會有例外,如果allowCoreThreadTimeOut
為true
空閑一段時間后,也會被關閉。
線程的狀態和工作線程數量
線程中的狀態和工作線程和數量都是由ctl
表示,是一個AtomicInteger
類型的屬性:
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
ctl的高四位為線程的狀態,其他位數為工作線程的數量,所以線程中最大的工作線程數量為(2^29)-1
。
線程池中的狀態有五種:
- RUNNING:接收新的任務和處理隊列中的任務
- SHUTDOWN:不能新增任務,但是會繼續處理已經添加的任務
- STOP:不能新增任務,不會繼續處理已經添加任務
- TIDYING:所有的任務已經被終止,工作線程為0
- TERMINATED:terminated()方法執行完成
狀態碼的定義如下:
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static final int COUNT_BITS = Integer.SIZE - 3;
private static final int CAPACITY = (1 << COUNT_BITS) - 1;
// runState is stored in the high-order bits
private static final int RUNNING = -1 << COUNT_BITS;
private static final int SHUTDOWN = 0 << COUNT_BITS;
private static final int STOP = 1 << COUNT_BITS;
private static final int TIDYING = 2 << COUNT_BITS;
private static final int TERMINATED = 3 << COUNT_BITS;
創建線程池
如果有面試官問:如何正確的創建線程池?千萬不要說使用Executors
創建線程,雖然Executors
能很方便的創建線程池,但是他提供的靜態創建方法會有一些坑。
主要的原因是:maximumPoolSize
和workQueue
這兩個參數
Executors
靜態方法在創建線程池時,如果maximumPoolSize
設置為Integer.MAX_VALUE
,這樣會導致線程池可以一直要以接收運行任務,可能導致cpu負載過高。
workQueue
是一個阻塞隊列的實例,用於放置正在等待執行的任務。如果在創建線程種時workQueue
實例沒有指定任務的容量,那么等待隊列中可以一直添加任務,極有可能導致oom
。
所以創建線程,最好是根據線程池的用途,然后自己創建線程。
添加任務
調用線程池的execute
並不是立即執行任務,線程池內部用經過一頓操作,如:判斷核心線程數、是否需要添加到等待隊列中。
下來的代碼是execute
的源碼,代碼很簡潔只有2個if
語句:
public void execute(Runnable command) {
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
else if (!addWorker(command, false))
reject(command);
}
- 第一個if,如果當前線程池中的工作線程數量小於
corePoolSize
,直接創建一個工作線程執行任務 - 第二個if,當線程池處於運行狀態,調用
workQueue.offer(command)
方法將任務添加到workQueue
,否則調用addWorker(command, false)
嘗試去添加一個工作線程。
整理了一張圖,把線程池分為三部分Core Worker
、Worker
、workQueue
:
換一種說法,在調用execute
方法時,任務首先會放在Core Worker
內,然后才是workQueue
,最后才會考慮Worker
。
這樣做的原因可以保證Core Worker
中的任務執行完成后,能立即從workQueue
獲取下一個任務,而不需要啟動別的工作線程,用最少的工作線程辦更多的事。
創建工作線程
在execute
方法中,有三個地方調用了addWorker
。addWorker
方法可以分為二部分:
- 增加工作線程數量
- 啟動工作線程
addWorker
的方法簽名如下:
private boolean addWorker(Runnable firstTask, boolean core)
- firstTask:第一個運行的任務,可以為空。如果為空任務會從
workQueue
中獲取。 - core: 是否是核心工作線程
增加工作線程數量
retry:
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
....
for (;;) {
int wc = workerCountOf(c);
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
if (compareAndIncrementWorkerCount(c))
break retry;
c = ctl.get(); // Re-read ctl
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
上面代碼省略了一部分代碼,主要代碼都在for
循環中,利用CAS
鎖,安全的完成線程池狀態的檢查與增加工作線程的數量。其中的compareAndIncrementWorkerCount(c)
調用就是將工作線程數量+1。
啟動工作線程
增加工作線程的數量后,緊接着就會啟動工作線程:
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
int rs = runStateOf(ctl.get());
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
workers.add(w);
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
t.start();
workerStarted = true;
}
}
} finally {
if (! workerStarted)
addWorkerFailed(w);
}
啟動工作線程的流程:
- 創建一個
Worker
實例,Worker
構造方法會使用ThreadFactory
創建一個線程
w = new Worker(firstTask);
final Thread t = w.thread;
就不說Worker
類的實現了,直接給出構造方法來細品:
Worker(Runnable firstTask) {
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);
}
- 如果線程池狀態是在運行中,或者已經關閉,但工作線程要從
workQueue
中獲取任務,才能添加工作線程
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
workers.add(w);
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
注意::當線程池處於SHUTDOWN
狀態時,它不能接收新的任務,但是可以繼續執行未完成的任務。任務是否從workQueue
中獲取,是根據firstTask
判斷,每個Worker
實例都有一個firstTask
屬性,如果這個值為null
,工作線程啟動的時候就會從workQueue
中獲取任務,否則會執行firstTask
。
- 啟動線程
調用線程的start
方法,啟動線程。
if (workerAdded) {
t.start();
workerStarted = true;
}
執行任務
回過頭來看一個Worker
類的定義:
private final class Worker extends AbstractQueuedSynchronizer
implements Runnable{
Worker(Runnable firstTask) {
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);
}
...
}
Worker
類實現了Runnable
接口,同時在構造方法中會將this
傳遞給線程,到這里你就知道了Worker
實例中有run
方法,它會在線程啟動后執行:
public void run() {
runWorker(this);
}
run
方法內部接着調用runWorker
方法運行任務,在這里才是真正的開始運行任務了:
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
while (task != null || (task = getTask()) != null) {
w.lock();
// If pool is stopping, ensure thread is interrupted;
// if not, ensure thread is not interrupted. This
// requires a recheck in second case to deal with
// shutdownNow race while clearing interrupt
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
beforeExecute(wt, task);
Throwable thrown = null;
try {
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
}
- 獲取任務
首先將firstTask
傳遞給task
臨時變量:
Runnable task = w.firstTask;
然后循環檢查task
或者從workQueue
中獲取任務:
while (task != null || (task = getTask()) != null) {
...
}
getTask()
稍后再做分析。
- 運行任務
去掉一些狀態檢查、異常捕獲、和勾子方法調用后,保留最重要的調用task.run()
:
while (task != null || (task = getTask()) != null) {
...
task.run();
...
}
task
其實就是通過調用execute
方法傳遞進來的Runnable
實例,也就是你的任務。只不過它可能保存在Worker.firstTask
中,或者在workQueue
中,保存在哪里在前面的任務添加順序
中已經說明。
從workQueue中獲取任務
試想一下如果每個任務執行完成,就關閉掉一個線程那有多浪費資源,這樣使用線程池也沒有多大的意義。所以線程的主要的功能就是線程復用,一旦任務執行完成直接去獲取下一個任務,或者掛起線程等待下一個提交的任務,然后等待一段時間后還是沒有任務提交,然后才考慮是否關閉部分空閑的線程。
runWorker
中會循環的獲取任務:
while (task != null || (task = getTask()) != null) {
...
task.run();
...
}
上面的代碼getTask()
就是從workQueue
中獲取任務:
private Runnable getTask() {
boolean timedOut = false; // Did the last poll() time out?
for (;;) {
...
int wc = workerCountOf(c);
// Are workers subject to culling?
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
...
try {
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}
獲取任務的時候會有兩種方式:
- 超時等待獲取任務
- 一直等待任務,直到有新任務
如果allowCoreThreadTimeOut
為true
,corePoolSize
指定的核心線程數量會被忽略,直接使用 workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS)
獲取任務,否則的話會根據當前工作線程的數量,如果wc > corePoolSize
為false
則當前會被認為是核心線程,調用workQueue.take()
一直等待任務。
工作線程的關閉
還是在runWorker
方法中:
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
while (task != null || (task = getTask()) != null) {
task.run();
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
}
- completedAbruptly變量:標記當前工作線程是正常執行完成,還是異常完成的。completedAbruptly為
false
可以確定線程池中沒有可執行的任務了。
上面代碼是簡潔后的代碼,一個while
循環保證不間斷的獲取任務,沒有任務可以執行(task為null)退出循環,最后再才會調用processWorkerExit
方法:
private void processWorkerExit(Worker w, boolean completedAbruptly) {
if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
decrementWorkerCount();
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
completedTaskCount += w.completedTasks;
workers.remove(w);
} finally {
mainLock.unlock();
}
tryTerminate();
int c = ctl.get();
if (runStateLessThan(c, STOP)) {
if (!completedAbruptly) {
int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
if (min == 0 && ! workQueue.isEmpty())
min = 1;
if (workerCountOf(c) >= min)
return; // replacement not needed
}
addWorker(null, false);
}
}
processWorkerExit
接收一個Worker
實例與completedAbruptly
變量。processWorkerExit的大致工作流程:
- 判斷當前工作線程是否異常完成,如果是直接減少工作線程的數量,簡單的說就是校正一下工作線程的數量。
- 增加完成的任務數量,將
Worker
從workers
中移除 - tryTerminate() 檢查線程池狀態,因為線程池可以延遲關閉,如果你調用
shutdown
方法后不會立即關閉,要等待所有的任務執行完成,所以這里調用tryTerminate()方法,嘗試去調用terminated
方法。
工作線程完成策略
如果某個工作線程完成,線程池內部會判斷是否需要重新啟動一個:
//判斷線程池狀態
if (runStateLessThan(c, STOP)) {
if (!completedAbruptly) {
//獲取最小工作線程數量
int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
//如果最小工作線程數量為0,但是workQueue中還有任務,那重置最小工作線程數量1
if (min == 0 && ! workQueue.isEmpty())
min = 1;
//如果當前工作線程數數量大於或等於最小工作線程數量,則不需要啟動新的工作線程
if (workerCountOf(c) >= min)
return; // replacement not needed
}
//啟動一個新的工作線程
addWorker(null, false);
}
工作線程完成后有兩種處理策略:
- 對於異常完成的工作線程,直接啟動一個新的替換
- 對於正常完成的工作線程,判斷當前工作線程是否足夠,如果足夠則不需要新啟動工作線程
注意:這里的完成,表示工作線程的任務執行完成,workQueue
中也沒有任務可以獲取了。
線程池的關閉
關閉線程池有可以通過shutdown
方法:
public void shutdown() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
checkShutdownAccess();
advanceRunState(SHUTDOWN);
interruptIdleWorkers();
onShutdown(); // hook for ScheduledThreadPoolExecutor
} finally {
mainLock.unlock();
}
tryTerminate();
}
shutdown
方法,第一步就是先改變線程池的狀態,調用advanceRunState(SHUTDOWN)
方法,將線程池當前狀態更改為SHUTDOWN
,advanceRunState代碼如下:
private void advanceRunState(int targetState) {
for (;;) {
int c = ctl.get();
if (runStateAtLeast(c, targetState) ||
ctl.compareAndSet(c, ctlOf(targetState, workerCountOf(c))))
break;
}
}
然后立即調用interruptIdleWorkers()
方法,interruptIdleWorkers()
內部會調用它的重載方法interruptIdleWorkers(boolean onlyOne)
同時onlyOne參數傳遞的false
來關閉空閑的線程:
private void interruptIdleWorkers() {
interruptIdleWorkers(false);
}
private void interruptIdleWorkers(boolean onlyOne) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
for (Worker w : workers) {
Thread t = w.thread;
if (!t.isInterrupted() && w.tryLock()) {
try {
t.interrupt();
} catch (SecurityException ignore) {
} finally {
w.unlock();
}
}
if (onlyOne)
break;
}
} finally {
mainLock.unlock();
}
}
以上代碼會遍歷workers
中的Worker
實例,然后調用線程的interrupt()
方法。
什么樣的線程才是空閑工作線程?
前面提到過在getTask()
中,線程從workQueue
中獲取任務時會阻塞,被阻塞的線程就是空閑的。
再次回到getTask()
的代碼中:
private Runnable getTask() {
boolean timedOut = false; // Did the last poll() time out?
for (;;) {
// Check if queue empty only if necessary.
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();
return null;
}
...
int wc = workerCountOf(c);
// Are workers subject to culling?
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
...
try {
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}
再次分析getTask()
中的代碼中有一段捕獲InterruptedException
的代碼塊,interruptIdleWorkers方法中斷線程后,getTask()
會捕獲中斷異常,因為外面是一個for
循環,隨后代碼走到判斷線程池狀態的地方:
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();
return null;
}
上面的代碼的會判斷當前線程池狀態,如果狀態大於STOP
或者狀態等於SHUTDOWN
並且workQueue
為空時則返回null
,getTask()
返回空那么在runWorker
中循環就會退出,當前工作線程的任務就完成了,可以退出了:
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
while (task != null || (task = getTask()) != null) {
task.run();
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
}
shutdownNow
除了shutdown方法能關閉線程池,還有shutdownNow
也可以關閉線程池。它兩的區別在於:
shutdownNow
會清空workQueue
中的任務shutdownNow
還會中止當前正在運行的任務shutdownNow
會使線程進入STOP
狀態,而shutdown()
是SHUTDOWN
狀態
public List<Runnable> shutdownNow() {
List<Runnable> tasks;
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
checkShutdownAccess();
advanceRunState(STOP);
interruptWorkers();
tasks = drainQueue();
} finally {
mainLock.unlock();
}
tryTerminate();
return tasks;
}
上面代碼基本流程:
- advanceRunState(STOP): 使線程池進行
STOP
狀態,與shutdown()
中的一致 ,只是使用的狀態碼是STOP
- interruptWorkers(): 與
shutdown()
中的一致 - drainQueue(): 清空隊列
任務是中止執行還是繼續執行?
調用shutdownNow()后線程池處於STOP
狀態,緊接着所有的工作線程都會被調用interrupt
方法,如果此時runWorker
還在運行會發生什么?
在runWorker
有一段代碼,就是工作線程中止的重要代碼:
final void runWorker(Worker w) {
...
while (task != null || (task = getTask()) != null) {
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
task.run();
}
...
}
重點關注:
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
這個if看起來有點難理解,理解下來大致意思是:如果線程池狀態大於等於STOP
,立即中斷線程,否則清除線程的中斷標記,也就是說當線程池狀態為RUNNING
和SHUTDOWN
時,線程的中斷標記會被清除(線程的中斷代碼在interruptWorkers
方法中),可以繼續執行任務。
以上代碼執行完成后,緊接着就會調用task.run()
方法,這里面我們自己就可以根據線程的中斷標記來判斷任務是否被中斷。
總結
個人水平有限,文中如有錯誤,謝謝大家指正。
本文從線程池的源碼入手,分析線程池的創建、添加任務、運行任務等流程,整個分析下來基本上大多數公司關於線程池面試的問題都可以回答得上來,當然還有一些小細節如:Worker
類是繼承AQS
的,為什么這么做其實源碼中都有一些苗頭,Worker
在運行時會鎖住運行的代碼塊,而shutdown
在關閉空閑的Worker
時,首先就要去獲取Worker
的同步鎖才能繼續操作,這樣才能安全的關閉工作線程。
歡迎關注我的公眾號:架構文摘,獲得獨家整理120G的免費學習資源助力你的架構師學習之路!
公眾號后台回復
arch028
獲取資料: