線程池中run
方法解析
概覽
上篇我們說到線程池中從隊列中去任務的地方時在Worker
類中的方法,這篇我們就來分析一下,這個方法。
public void run() {
runWorker(this);
}
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
while (task != null || (task = getTask()) != null) {
w.lock();
//如果線程池是stop狀態,確保線程被中斷,如果線程池不是,確保線程池沒有被中斷。
//當我們清空中斷標志時,第二種情況需要需要有一個recheck來應對shutdownNow方法。
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
beforeExecute(wt, task);
Throwable thrown = null;
try {
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
}
這部分代碼實際是十分容易理解,但是,其中關於中斷的處理是我們需要注意的,也就是有注釋的那一段,我們需要詳細理解一下。
理解這部分,首先我們要對線程的中斷有詳細的了解。
線程的中斷
首先我們要了解stop
方法已經是一個過時的方法了,我們不應該再使用這種方式來中斷線程。
然后,我們需要了解三個方法,這三個方法看起來像是英語詞匯辨析,(⊙o⊙)…
isInterrupted
方法interrupted
方法
這里我們先來分析前兩個方法,這兩個方法都是用來驗證線程是否被中斷的,那么,這兩個方法有什么區別呢?
首先isInterrupted
方法是Thread
類的普通方法,會返回調用方法的類的狀態,而interrupted
方法是Thread
的靜態方法,返回的是調用方法的線程的狀態。
另外,還有一個區別就是interrupted
方法會清除線程的中斷狀態,也就是說,如果線程已經是中斷狀態,那么第一次調用,返回值為真,第二次調用,返回值就會為假。
還用一個方法interrupt
是用來中斷線程的,也就是將中斷標志設置為true
的方法。
了解了這三個方法,我們就會注意到,所謂的中斷只是將中斷標志設置一下,並沒有真正的中斷線程的運行,所以,一般來說,我們需要自己來檢查線程的中斷狀態,並設計如何應對中斷,也就是如何真正的結束線程。
需要注意的是,方法sleep
、wait
以及join
會對中斷標志有所處理,當線程中斷標志為true
時,將會拋出異常。這也並不難理解,當我們的線程進入這三種狀態的時候,除了等待我們並沒有任何方式讓線程跳出,那么中斷線程就是唯一的后悔葯。
好了,已經了解了以上知識,我們可以回歸正題了。
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
由於,這里實際是處理shutdownNow
方法,所以,我們先插入一段關於線程池關閉方法的分析。
我們知道,關閉線程池的方法有兩個,一個是shutdown
,一個是shutdownNow
。
shutdown
方法會告訴線程池拒絕接受新的任務,但是,已經開始執行的以及進入隊列中的任務將會完成執行。
而shutdownNow
方法也同樣會告訴線程池拒絕接受新的任務,但是不同的是,他會試圖將已經開始的任務以及隊列中的任務取消。這種取消是通過中斷線程來實現的,也就是說,如果我們的任務中沒有針對線程中斷作處理,那么,在實際的使用體驗上,shutdownNow
與shutdown
是相同的。
當我們調用shutdownNow
方法時,線程池將會變為stop狀態。那么運行過程中,將會執行到上面那段代碼。
我們需要確保當線程池狀態是stop時,線程應該是中斷狀態的,同樣的,如果線程池狀態不是,那么線程的狀態也不應該是中斷的。
對於第二種情況,我們先清空中斷狀態,然后recheck,以防中間調用了shutdownNow
方法,這樣來確保滿足上面的情況。
取任務部分代碼
private Runnable getTask() {
boolean timedOut = false; // Did the last poll() time out?
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if necessary.
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();
return null;
}
int wc = workerCountOf(c);
// Are workers subject to culling?
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}
try {
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}
首先我們要明確,當我們需要到線程池中取任務的時候,當前Worker
中的任務肯定是null
。
那么,我們接着來看這段代碼,首先還是老樣子,需要檢查線程池的狀態。如果狀態大於shutdown,那么我就需要區別對待,如果是shutdown狀態,根據前面我們談到的線程池的關閉,如果隊列為空了,那么當前線程實際就可以減掉了,如果狀態已經大於stop,那么肯定會減掉。所以這里我們看到有一個decrementWorkerCount
方法,實際上是CAS方法。
這里解釋一下allowCoreThreadTimeOut
變量的使用,這個變量默認值是false,也就是說無論你是否設置超時時間,核心線程是不會過期的;如果這個變量設置為true,那么核心線程也會因為空閑超時。這里的是否超時,實際是受keepAliveTime
控制。
這樣,我們就不難理解timed
變量了,這個變量實際上是用來判定當前線程是否會超時的一個變量。
如果獲取任務成功,那么就可以直接返回,否則,我們將會再次進入循環。