介紹
Condition
是j.u.c
包下提供的一個接口。
可以翻譯成 條件對象,其作用是線程先等待,當外部滿足某一條件時,在通過條件對象喚醒等待的線程。ArrayBlockingQueue
就是通過Condition
實現的。
先看一下Condition
接口提供了哪些方法:
/**
* 條件對象
*/
public interface Condition {
/**
* 讓線程進入等待,如果其他線程調用同一Condition對象的notify/notifyAll,那么等待的線程可能被喚醒
*/
void await() throws InterruptedException;
/**
* 不拋出中斷異常的await方法
*/
void awaitUninterruptibly();
/**
* 帶超時的await
*/
long awaitNanos(long nanosTimeout) throws InterruptedException;
/**
* 帶超時的await(可指定時間單位)
*/
boolean await(long time, TimeUnit unit) throws InterruptedException;
/**
* 帶超時的await(指定截止時間)
*/
boolean awaitUntil(Date deadline) throws InterruptedException;
/**
* 喚醒等待的線程
*/
void signal();
/**
* 喚醒所有線程
*/
void signalAll();
}
Condition
接口主要提供了兩類方法——讓線程等待的方法(await()等)和喚醒線程的方法(signal())。
AQS
內部提供了Condition
接口的實現——ConditionalObject
。它內部的字段如下:
private static final long serialVersionUID = 1173984872572414699L;
//該ConditionObject維護的等待隊列的頭節點
private transient Node firstWaiter;
//該ConditionObject維護的等待隊列的尾節點
private transient Node lastWaiter;
非常簡單,從上面的字段我們大概可以猜到Condition
內部也維護了一個隊列。
上篇文章中,我們已經分析鎖實現的遠離就是通過節點構成隊列:讓隊列中除頭節點外的其他線程都被Park,當頭節點釋放鎖時,頭節點喚醒下一個節點(Unpark線程),同時更新頭節點。
舉一反三,我們推測Condition
喚醒功能的原理也是通過維護隊列的節點。
接下來就通過分析源碼,(主要是await()
和signal()
方法),驗證我們的猜測。
Condtion對象的獲取
Condition
對象的獲取主要是通過Lock.newCondition()
方法。
一個Lock
對象可以返回多個Condition
對象。
在對Condition
進行等待或者喚醒前,都需要先持有Condition
關聯Lock
對象,否則會拋出IllegalMonitorStateException
異常。
Condition.await()過程
public final void await() throws InterruptedException {
//如果線程已經被標記為中斷,則拋出異常
if (Thread.interrupted())
throw new InterruptedException();
//將線程添加進等待隊列
//注意等待隊列和AQS維護的阻塞隊列是兩個不同的隊列
//正常流程當線程能調用await(),說明線程此時擁有鎖,此時AQS的阻塞隊列中,線程應該在head節點
Node node = addConditionWaiter();
//釋放掉鎖(如果釋放失敗,NODE的waitStatus被更新為CANCELLED)
//同時因為釋放掉了鎖,該線程在阻塞隊列中的節點也已經被移除
int savedState = fullyRelease(node);
//這里會將線程掛起,除非線程節點被移到AQS的阻塞隊列或是線程被外部中斷
int interruptMode = 0;
while (!isOnSyncQueue(node)) {
LockSupport.park(this);
//檢查是否是由於被中斷而喚醒,如果是,則跳出循環
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
//在阻塞隊列中嘗試獲取鎖
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
//節點已經在阻塞隊列中,與Condition的等待隊列聯系斷開
//對於SIGNAL喚醒的線程而言,SIGNAL時除了將節點移到阻塞隊列,同時也清空了node.nextWaiter
//而對於中斷喚醒的線程而言,只是將節點移到阻塞隊列,並沒有清空node.nextWaiter(因為此時線程不持有,操作等待線程並非線程安全)
if (node.nextWaiter != null) // clean up if cancelled
unlinkCancelledWaiters();
//根據interruptMode 決定是否需要拋出異常
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
}
await()
方法主要可以分為以下幾個過程:
1)線程將自己包裝成節點,並添加到Condition
的阻塞隊列中
2)線程主動釋放掉鎖
3)線程進入自循環等待(主動通過LockSupport.park()
),醒來時,檢查自己是否已經被移動至Lock的阻塞隊列
4)線程在阻塞隊列中等待,直到獲取鎖(線程在等待時可能又會被LockSupport.park()
掛起)
5)線程獲取鎖,檢查自己在等待過程中(await()
過程)是否有被中斷
6)如果有需要,則清理節點與等待隊列之間的聯系
7)根據中斷狀態確定是否需要拋出異常,以便讓await()
的調用者可以響應線程的中斷狀態
從上面的流程,我們可以清楚的了解到以上步驟1和2,以及步驟5,6,7是持有鎖的,步驟3和4並沒有持有鎖。了解這一點很重要,因為涉及某些方法是否需要以CAS來保證線程安全。
了解了大體流程,接下來就逐步分析各個步驟。
步驟1.線程包裝成節點,添加進Condition
的等待隊列
這一步驟主要是addConditionWaiter()
過程
private Node addConditionWaiter() {
Node t = lastWaiter;
// 找出該ConditionObject的等待隊列中 真正未被取消的最后一個節點,並更新為lastWaiter
if (t != null && t.waitStatus != Node.CONDITION) {
unlinkCancelledWaiters();
t = lastWaiter;
}
//將該線程包裝成Node
Node node = new Node(Thread.currentThread(), Node.CONDITION);
//如果此時ConditionObject隊列為空,初始化鏈表且頭節點為node
if (t == null)
firstWaiter = node;
else //否則將node添加隊尾
t.nextWaiter = node;
//更新鏈表尾節點為node
lastWaiter = node;
return node;
}
主要找出等待隊列的最后一個節點,將線程包裝成Node,添加到隊列的隊尾。
這里要注意的一點是此時Node的waitStatus
為CONDITION
。節點的waitStatus
對判斷等待是否被取消很重要,在等待隊列中等待的節點狀態應該為CONDITION
,如果狀態不為CONDITION
,說明線程已經取消了等待(如果waitStatus為0說明被喚醒或中斷)。
步驟2.線程釋放鎖
釋放鎖的步驟比較簡單。主要通過fullRelease()
更新AQS
的state
為0並且將AQS
的擁有者置為null,同時喚醒阻塞隊列中的后繼節點。
final int fullyRelease(Node node) {
boolean failed = true;
try {
//獲取鎖被持有的此時
int savedState = getState();
//讓鎖直接釋放被持有的次數
if (release(savedState)) {
failed = false;
return savedState;
} else {
throw new IllegalMonitorStateException();
}
} finally {
//如果釋放失敗了,則將節點標記為CANCELLED
if (failed)
node.waitStatus = Node.CANCELLED;
}
}
步驟3.線程掛起,進入循環等待
這一步比較關鍵,線程等待的動作都發生在這一步。
int interruptMode = 0;
while (!isOnSyncQueue(node)) {
LockSupport.park(this);
//檢查是否是由於被中斷而喚醒,如果是,則跳出循環
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
線程執行到這,會被LockSupport.park()
掛起。
如果此時線程被喚醒,線程會檢查是否是因為中斷,如果發生中斷,還需要確定中斷是否發生在SIGNAL前(如果發生在SIGNAL前,之后線程需要拋出異常,讓外部響應)。
如果線程不是因為中斷而喚醒,線程需要確認節點是否已經被移動至AQS
的等待隊列。如果沒有被移動,則繼續被掛起(防止假喚醒)。
checkInterruptWhileWaiting()
就是用來檢測線程在等待的時候是否被中斷。
/**
* 檢查線程在WAITING狀態期間,是否有被中斷
* 如果沒有返回0;如果是在SIGNAL之前被中斷,返回 THROW_IE;如果在SIGNAL之后被中斷,則返回REINTERRUPT
*/
private int checkInterruptWhileWaiting(Node node) {
return Thread.interrupted() ?
(transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) :
0;
}
如果線程被中斷,還需要通過transferAferCancelledWait
判斷中斷是否發生在SIGNAL
之前。
final boolean transferAfterCancelledWait(Node node) {
//CAS操作,期待值是CONDITION,說明此時喚醒是被取消(中斷),因為如果是SIGNAL,那么waitingStatus不會CONDITION,而是0(可以見SIGNAL流程)
if (compareAndSetWaitStatus(node, Node.CONDITION, 0)) {
//即使是取消的,也需要移到AQS的阻塞隊列
enq(node);
return true;
}
//說明線程先收到了SIGNAL信號
//此時要等SIGNAL信號處理完成
while (!isOnSyncQueue(node))
Thread.yield();
return false;
}
前文說了Node節點的waitStatus
是一個很重要的狀態,從它可以推斷出線程節點發生了什么操作。
上述方法先用CAS
嘗試將更新節點的waitStatus
為0,期待值為CONDITION
。
如果嘗試成功,說明此時節點未被操作過(SIGNAL信號),線程是中斷喚醒的,此時需要通過enq()
將節點添加AQS
阻塞隊列,因為此時沒有鎖,所以enq()
方法以CAS重試的方式保證線程安全。
如果嘗試失敗,說明線程收到了SIGNAL
信號,節點將由負責SIGNAL
的線程移動至阻塞隊列。這里為了避免線程返回過早,在判斷出線程還未移動至阻塞隊列的情況下,會通過Thread.yeild()
讓出CPU時間。
步驟4.線程在阻塞隊列中重新等待鎖
這一步主要是通過acquireQueue()
方法。該方法已經在上一篇文章中介紹過了,這里不過多介紹。
需要注意的一點是,即使線程在等待時被中斷,仍然需要在AQS
的阻塞隊列中等待獲取鎖。因為外部沒有辦法在線程獲取鎖之前發現中斷狀態,而且即使線程拋出了中斷異常,此時線程也是持有鎖的,外部需要顯式的釋放。
步驟5.檢查並設置中斷狀態
這一步很簡單,主要就是通過步驟3中的checkInterruptWhilewaiting()
方法返回值:0表示未中斷,-1表示中斷發生在SIGNAl之前,1表示中斷發生在SIGNAL之后。
步驟6.清理節點與等待隊列的聯系
這里有兩種情況,如果線程是因為SIGNAL
喚醒的,在喚醒時調用signal()
的線程已經清理了被喚醒節點與等待隊列的關系。 因為那時喚醒線程持有鎖,操作是安全的。
但是如果對於中斷被喚醒的線程,喚醒時是不持有鎖的,不能保證線程安全的清理喚醒節點與等待隊列的關系。因此就將等待清理工作放在了獲取鎖之后。
//此方法並不保證線程安全,因此調用此方法時,必須要在獲取鎖的情況下調用
//此方法的目的是為了整理Condition的等待隊列,將非CONDITION狀態的節點從等待隊列中移除
private void unlinkCancelledWaiters() {
Node t = firstWaiter;
Node trail = null;
while (t != null) {
Node next = t.nextWaiter;
if (t.waitStatus != Node.CONDITION) {
t.nextWaiter = null;
if (trail == null)
firstWaiter = next;
else
trail.nextWaiter = next;
if (next == null)
lastWaiter = trail;
}
else
trail = t;
t = next;
}
}
步驟7.確定中斷狀態,並決定是否拋出異常
這一步也很簡單,就是根據interruptMode
來確定是否拋出異常,如果interruptMode
值為THROW_IE,說明線程被喚醒前先被中斷,此時需要拋出InterruptedException
。
Condition.signal()過程
signal()
過程比起await()
要簡單很多。既然await()
過程是將節點添加到等待隊列,那么signal()
作為await()
的逆過程,就是將節點從等待隊列重新移動到AQS
阻塞隊列。
/**
* 喚醒等待節點
* 主要的流程就是將節點從Condition的等待隊列移到AQS的阻塞隊列中,讓其重新等待鎖的獲取
*/
public final void signal() {
//先驗證喚醒者是否是鎖的持有者
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
//喚醒等待隊列的第一個節點
Node first = firstWaiter;
if (first != null)
doSignal(first);
}
為保證安全,先確定喚醒線程是否為鎖的持有者。
之后找出等待隊列中的頭節點,將其喚醒。
/**
* 喚醒操作(此時占有鎖,為線程安全),
* 找出等待隊列中第一個真正要被喚醒的節點,移動到阻塞隊列,
*/
private void doSignal(Node first) {
do {
//找出第一個需要喚醒的節點
if ( (firstWaiter = first.nextWaiter) == null)
lastWaiter = null;
//斷開和之后等待隊列的聯系,並更新等待隊列的頭節點
first.nextWaiter = null;
} while (!transferForSignal(first) &&
(first = firstWaiter) != null); //等待隊列的頭節點移動失敗 且 等待隊列中還有其他節點,則繼續嘗試其他節點
}
因為頭節點可能已經被取消,所以這里會一直在等待隊列中從前往后找一個節點,開始喚醒。直到一個節點喚醒成功或者等待隊列中沒有節點需要喚醒。
上文也說了喚醒過程其實就是節點的移動過程。
/**
* 將節點從Condition的等待隊列移動到AQS的阻塞隊列(該方法只會在signal相關的方法中被調用)
*/
final boolean transferForSignal(Node node) {
//CAS操作更新節點的waitStatus,期待值為CONDITION
//操作成功,說明沒有其他線程在操作這個節點
if (!compareAndSetWaitStatus(node, Node.CONDITION, 0)) //CAS操作失敗,說明節點已經被中斷操作過,waitStatus已經變成了0
return false;
//這里說明CAS操作成功
//應該是線程安全的
//將節點添加到阻塞隊列的隊尾
Node p = enq(node);
//判斷此時阻塞隊列是否有前繼節點等待,有就Park線程,等待前繼節點喚醒
int ws = p.waitStatus;
if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
LockSupport.unpark(node.thread);
return true;
}
transferForSignal()
操作其實就是先通過CAS
操作確定等待隊列中的節點還未被取消。如果CAS
操作成功,則將其添加到阻塞隊列,如果此時阻塞隊列還需要等待鎖,則讓await()
的線程繼續被掛起。將給線程喚醒的任務(此時已經是獲取鎖的任務,不再是之前的await()任務)交給阻塞隊列中的前繼節點。
await()以及signal()過程的流程圖
跟着流程圖在走一遍,可以幫助鞏固上述的知識點。
總結
再借上篇文章重口味的比方梳理下這個流程。
當你排隊進了WC的包廂,想要方便時,你覺得太臟了,於是你在包廂內留了張紙條,希望有人能在廁所包廂干凈了在叫你過來上廁所(調用Condition的await ()),隨后主動讓出了包廂的使用權(釋放鎖)。后面在排隊等着方便的人便進去了。
你離開后,到另一個地方邊玩手機邊等(移動到Condition的阻塞隊列,並被Park),期間,也有一些人同樣覺得廁所太臟,跑了出來,在外面等,並排在了你的后面。
過了一段時間,有人把打電話給你說廁所已經變干凈了,可以去用了,你就重新回到廁所那排起了隊伍,等待輪到你用廁所(acquireQueue)。因為那個人僅通知了你,沒有通知其他因為嫌棄廁所臟,而跑出來的人,所以那些只能繼續在那等別人去叫他們。
如果你在等待的時候突然有人用其他方式把你喊了回去(外部未給signal卻被中斷),你又主動從在外面等待走到了廁所前去排隊等待繼續去用廁所,但是等你重新排到包廂時,發現測試還是很臟(Condition的 條件未滿足),你就會拋出異常。