系列傳送門:
- Java並發包源碼學習系列:AbstractQueuedSynchronizer
- Java並發包源碼學習系列:CLH同步隊列及同步資源獲取與釋放
- Java並發包源碼學習系列:AQS共享式與獨占式獲取與釋放資源的區別
- Java並發包源碼學習系列:ReentrantLock可重入獨占鎖詳解
- Java並發包源碼學習系列:ReentrantReadWriteLock讀寫鎖解析
Condition接口
Contition是一種廣義上的條件隊列,它利用await()和signal()為線程提供了一種更為靈活的等待/通知模式。
圖源:《Java並發編程的藝術》

Condition必須要配合Lock一起使用,因為對共享狀態變量的訪問發生在多線程環境下。
一個Condition的實例必須與一個Lock綁定,因此await和signal的調用必須在lock和unlock之間,有鎖之后,才能使用condition嘛。以ReentrantLock為例,簡單使用如下:
public class ConditionTest {
public static void main(String[] args) {
final ReentrantLock lock = new ReentrantLock();
final Condition condition = lock.newCondition();
Thread thread1 = new Thread(() -> {
String name = Thread.currentThread().getName();
lock.lock();
System.out.println(name + " <==成功獲取到鎖" + lock);
try {
System.out.println(name + " <==進入條件隊列等待");
condition.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(name + " <==醒了");
lock.unlock();
System.out.println(name + " <==釋放鎖");
}, "等待線程");
thread1.start();
Thread thread2 = new Thread(() -> {
String name = Thread.currentThread().getName();
lock.lock();
System.out.println(name + " ==>成功獲取到鎖" + lock);
try {
System.out.println("========== 這里演示await中的線程沒有被signal的時候會一直等着 ===========");
Thread.sleep(10000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(name + " ==>通知等待隊列的線程");
condition.signal();
lock.unlock();
System.out.println(name + " ==>釋放鎖");
}, "通知線程");
thread2.start();
}
}
等待線程 <==成功獲取到鎖java.util.concurrent.locks.ReentrantLock@3642cea8[Locked by thread 等待線程]
等待線程 <==進入條件隊列等待
通知線程 ==>成功獲取到鎖java.util.concurrent.locks.ReentrantLock@3642cea8[Locked by thread 通知線程]
========== 這里演示await中的線程沒有被signal的時候會一直等着 ===========
通知線程 ==>通知等待隊列的線程
通知線程 ==>釋放鎖
等待線程 <==醒了
等待線程 <==釋放鎖
接下來我們將從源碼的角度分析上面這個流程,理解所謂條件隊列的內涵。
AQS條件變量的支持之ConditionObject內部類
AQS,Lock,Condition,ConditionObject之間的關系:
ConditionObject是AQS的內部類,實現了Condition接口,Lock中提供newCondition()方法,委托給內部AQS的實現Sync來創建ConditionObject對象,享受AQS對Condition的支持。
// ReentrantLock#newCondition
public Condition newCondition() {
return sync.newCondition();
}
// Sync#newCondition
final ConditionObject newCondition() {
// 返回Contition的實現,定義在AQS中
return new ConditionObject();
}
ConditionObject用來結合鎖實現線程同步,ConditionObject可以直接訪問AQS對象內部的變量,比如state狀態值和AQS隊列。

ConditionObject是條件變量,每個條件變量對應一個條件隊列(單向鏈表隊列),其用來存放調用條件變量的await方法后被阻塞的線程,ConditionObject維護了首尾節點,沒錯這里的Node就是我們之前在學習AQS的時候見到的那個Node,我們會在下面回顧:
public class ConditionObject implements Condition, java.io.Serializable {
private static final long serialVersionUID = 1173984872572414699L;
/** 條件隊列的第一個節點. */
private transient Node firstWaiter;
/** 條件隊列的最后一個節點. */
private transient Node lastWaiter;
}
看到這里我們需要明確這里的條件隊列和我們之前說的AQS同步隊列是不一樣的:
- AQS維護的是當前在等待資源的隊列,Condition維護的是在等待signal信號的隊列。
- 每個線程會存在上述兩個隊列中的一個,lock與unlock對應在AQS隊列,signal與await對應條件隊列,線程節點在他們之間反復橫跳。
這里着重說明一下,接下來的源碼學習部分,我們會將兩個隊列進行區分,涉及到同步隊列和阻塞隊列的描述,意味着是AQS的同步隊列,而條件隊列指的是Condition隊列,望讀者知曉。
這里我們針對上面的demo來分析一下會更好理解一些:
為了簡化,接下來我將用D表示等待線程,用T表示通知線程。
- 【D】先調用
lock.lock()方法,此時無競爭,【D】被加入到AQS同步隊列中。 - 【D】調用
condition.await()方法,此時【D】被構建為等待節點並加入到condition對應的條件等待隊列中,並從AQS同步隊列中移除。 - 【D】陷入等待之后,【T】啟動,由於AQS隊列中的【D】已經被移除,此時【T】也很快獲取到鎖,相應的,【T】也被加入到AQS同步隊列中。
- 【T】接着調用
condition.signal()方法,這時condition對應的條件隊列中只有一個節點【D】,於是【D】被取出,並被再次加入AQS的等待隊列中。此時【D】並沒有被喚醒,只是單純換了個位置。 - 接着【T】執行
lock.unlock(),釋放鎖鎖之后,會喚醒AQS隊列中的【D】,此時【D】真正被喚醒且執行。
OK,lock -> await -> signal -> unlock這一套流程相信已經大概能夠理解,接下來我們試着看看源碼吧。
回顧AQS中的Node
我們這里再簡單回顧一下AQS中Node類與Condition相關的字段:
// 記錄當前線程的等待狀態,
volatile int waitStatus;
// 前驅節點
volatile Node prev;
// 后繼節點
volatile Node next;
// node存儲的線程
volatile Thread thread;
// 當前節點在Condition中等待隊列上的下一個節點
Node nextWaiter;
waitStatus可以取五種狀態:
- 初始化為0,啥也不表示,之后會被置signal。
- 1表示cancelled,取消當前線程對鎖的爭奪。
- -1表示signal,表示當前節點釋放鎖后需要喚醒后面可被喚醒的節點。
- -2表示condition,我們這篇的重點,表示當前節點在條件隊列中。
- -3表示propagate,表示釋放共享資源的時候會向后傳播釋放其他共享節點。
當然,除了-2這個condition狀態,其他的等待狀態我們之前都或多或少分析過,今天着重學習condition這個狀態的意義。
我們還可以看到一個Node類型的nextWaiter,它表示條件隊列中當前節點的下一個節點,可以看出用以實現條件隊列的單向鏈表。
void await()
調用Condition的await()方法,會使當前線程進入等待隊列並釋放鎖,同時線程狀態變為等待狀態。
其實就是從AQS同步隊列的首節點,注意不是head,而是獲取了鎖的節點,移動到Condition的等待隊列中。

了解這些之后,我們直接來看看具體方法的源碼:
public final void await() throws InterruptedException {
// 這個方法是響應中斷的
if (Thread.interrupted())
throw new InterruptedException();
// 添加到條件隊列中
Node node = addConditionWaiter();
// 釋放同步資源,也就是釋放鎖
int savedState = fullyRelease(node);
int interruptMode = 0;
// 如果這個節點的線程不在同步隊列中,說明該線程還不具備競爭鎖的資格
while (!isOnSyncQueue(node)) {
// 掛起線程
LockSupport.park(this);
// 如果線程中斷,退出
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
// 上面的循環退出有兩種情況:
// 1. isOnSyncQueue(node) 為true,即當前的node已經轉移到阻塞隊列了
// 2. checkInterruptWhileWaiting != 0, 表示線程中斷
// 退出循環,被喚醒之后,進入阻塞隊列,等待獲取鎖 acquireQueued
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null) // clean up if cancelled
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
}
添加到條件隊列
Node addConditionWaiter()
addConditionWaiter() 是將當前節點加入到條件隊列中:
private Node addConditionWaiter() {
Node t = lastWaiter;
// 如果lastWaiter被取消了,將其清除
if (t != null && t.waitStatus != Node.CONDITION) {
// 遍歷整個條件隊列,將已取消的所有節點清除出列
unlinkCancelledWaiters();
// t重新賦值一下,因為last可能改變了
t = lastWaiter;
}
//注意這里,node在初始化的時候,會指定ws為CONDITION
Node node = new Node(Thread.currentThread(), Node.CONDITION);
// t == null 表示隊列此時為空,初始化firstWaiter
if (t == null)
firstWaiter = node;
else
t.nextWaiter = node;// 入隊尾
lastWaiter = node;// 將尾指針指向新建的node
return node;
}
void unlinkCancelledWaiters()
unlinkCancelledWaiters用於清除隊列中已經取消等待的節點。
private void unlinkCancelledWaiters() {
Node t = firstWaiter;
// trail這里表示取消節點的前驅節點
Node trail = null;
// t會從頭到尾遍歷這個單鏈表
while (t != null) {
// next用於保存下一個
Node next = t.nextWaiter;
// 如果發現當前這個節點 不是 condition了, 那么考慮移除它
// 下面是單鏈表的移除節點操作 簡單來說就是 trail.next = t.next
if (t.waitStatus != Node.CONDITION) {
t.nextWaiter = null;
// 說明first就是不是condition了
if (trail == null)
firstWaiter = next;
else
//trail.next = t.next
trail.nextWaiter = next;
// trail后面沒東西,自然trail就是lastWaiter了
if (next == null)
lastWaiter = trail;
}
// 當前節點是一直跟到不是condition節點的上一個
else
trail = t;
// 向后遍歷 t = t.next
t = next;
}
}
總結一下addConditionWaiter的過程:
- 首先判斷條件隊列的尾節點是否被取消了,這里用last.ws != CONDITION來判斷,如果是的話,就需要從頭到尾遍歷,消除被不是condition的節點。
- 接着將當前線程包裝為Node,指定ws為CONDITION。
完全釋放獨占鎖
將節點加入等待隊列中后,就需要完全釋放線程擁有的獨占鎖了,完全釋放針對重入鎖的情況。我們可以拉到await()方法中看看,將會調用:int savedState = fullyRelease(node);,這句話有什么內涵呢?
我們看到這個方法返回了一個savedState變量,簡單的理解就是保存狀態。我們知道重入鎖的state由重入的次數,如果一個state為N,我們可以認為它持有N把鎖。
await()方法必須將state置0,也就是完全釋放鎖,后面的線程才能獲取到這把鎖,置0之后,我們需要用個變量標記一下,也就是這里的savedState。
這樣它被重新喚醒的時候,我們就知道,他需要獲取savedState把鎖。
int fullyRelease(Node node)
final int fullyRelease(Node node) {
boolean failed = true;
try {
// 獲取當前的state值,重入次數
int savedState = getState();
// 釋放N = savedState資源
if (release(savedState)) {
failed = false;
return savedState;
} else {
throw new IllegalMonitorStateException();
}
} finally {
// 如果獲取失敗,將會將節點設置為取消狀態,並拋出異常
if (failed)
node.waitStatus = Node.CANCELLED;
}
}
這里其實我們就會明白開頭說的:如果某個線程沒有獲取lock,就直接調用condition的await()方法,結果是什么呢,在release的時候拋出異常,然后節點被取消,之后節點進來的時候,將它清理掉。
等待進入阻塞隊列
ok,完全釋放鎖之后,將會來到這幾步,如果這個節點的線程不在同步隊列中,說明該線程還不具備競爭鎖的資格,將被一直掛起,這里的同步隊列指的是AQS的阻塞隊列。
int interruptMode = 0;
// 如果這個節點的線程不在同步隊列中,說明該線程還不具備競爭鎖的資格,會一直掛起
while (!isOnSyncQueue(node)) {
// 掛起線程
LockSupport.park(this);
// 如果線程中斷,退出
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
boolean isOnSyncQueue(Node node)
下面這個方法會判斷節點是不是已經到阻塞隊列中了,如果是的話,就直接返回true,這個方法的必要性是什么呢?
其實啊,這里需要提前說一下signal()方法,signal的作用和await()方法,將在等待隊列中阻塞的節點移動到AQS同步隊列中,這個方法就是說判斷一下這個節點是不是移過去了。
final boolean isOnSyncQueue(Node node) {
// 1. 節點的等待狀態還是condition表示還在等待隊列中
// 2. node.prev == null 表示還沒移到阻塞隊列中[prev和next都是阻塞隊列中用的]
if (node.waitStatus == Node.CONDITION || node.prev == null)
return false;
// 如果node已經有了后繼節點,表示已經在阻塞隊列中了
if (node.next != null) // If has successor, it must be on queue
return true;
/*
* node.prev can be non-null, but not yet on queue because
* the CAS to place it on queue can fail. So we have to
* traverse from tail to make sure it actually made it. It
* will always be near the tail in calls to this method, and
* unless the CAS failed (which is unlikely), it will be
* there, so we hardly ever traverse much.
*/
// 來到這里的情況:ws != condition && node.prev != null && node.next == null
// 想想:為什么node.prev != null不能作為判斷不在阻塞隊列的依據呢?
// CAS首先設置node.prev 指向tail,這個時候node.prev 是不為null的,但CAS可能會失敗
return findNodeFromTail(node);
}
為什么node.prev != null不能作為判斷不在阻塞隊列的依據呢?
官方給出了解答: 因為CAS的入隊操作中,首先設置node.prev 指向tail,這個時候node.prev 是不為null的。你能夠說他入隊成功一定成功嗎?不一定,因為CAS可能會失敗,所以要findNodeFromTail(node)。
boolean findNodeFromTail(Node node)
從阻塞隊列的尾部向前遍歷,如果找到這個node,表示它已經在了,那就返回true。
private boolean findNodeFromTail(Node node) {
Node t = tail;
for (;;) {
// 已經有了
if (t == node)
return true;
// 尾都沒有,找啥呢,返回false
if (t == null)
return false;
// 一直往前找
t = t.prev;
}
}
void signal()
由於之前節點被加入等待隊列將會一直阻塞,為了連貫性,我們來看看喚醒它的signal方法吧:
之前說到,如果這個線程會在等待隊列中等待,那么喚醒它的signal方法的流程是怎么樣的呢,前面其實已經說了一丟丟了,我們猜測,signal會將isOnSyncQueue方法的循環打破,接下來看看是不是這樣子的。
public final void signal() {
// 一樣的,必須占有當前這個鎖才能用signal方法
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
Node first = firstWaiter;
if (first != null)
doSignal(first);
}

喚醒節點
該方法會從頭到尾遍歷條件隊列,找到需要移到同步隊列的節點。
void doSignal(Node first)
private void doSignal(Node first) {
do {
// firstWaiter 指向first的下一個
if ( (firstWaiter = first.nextWaiter) == null)
// 如果first是最后一個且要被移除了,就將last置null
lastWaiter = null;
// first斷絕與條件隊列的連接
first.nextWaiter = null;
// fisrt轉移失敗,就看看后面是不是需要的
} while (!transferForSignal(first) &&
(first = firstWaiter) != null);
}
這里的while循環表示,如果first沒有轉移成功,就接着判斷first后面的節點是不是需要轉移。
boolean transferForSignal(Node node)
該方法將節點從條件隊列轉移到阻塞隊列。
final boolean transferForSignal(Node node) {
/*
* CAS操作嘗試將Condition的節點的ws改為0
* 如果失敗,意味着:節點的ws已經不是CONDITION,說明節點已經被取消了
* 如果成功,則該節點的狀態ws被改為0了
*/
if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
return false;
/*
* 通過enq方法將node自旋的方式加入同步隊列隊尾
* 這里放回的p是node在同步隊列的前驅節點
*/
Node p = enq(node);
int ws = p.waitStatus;
// ws大於0 的情況只有 cancenlled,表示node的前驅節點取消了爭取鎖,那直接喚醒node線程
// ws <= 0 會使用cas操作將前驅節點的ws置為signal,如果cas失敗也會喚醒node
if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
LockSupport.unpark(node.thread);
return true;
}
// 自旋的方式入隊
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) { // Must initialize
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
// 返回的是node的前驅節點
return t;
}
}
}
}
檢查中斷狀態
ok,一旦signal之后,節點被成功轉移到同步隊列后,這時下面這個循環就會退出了,繼續回到這里:
int interruptMode = 0;
// 如果這個節點的線程不在同步隊列中,說明該線程還不具備競爭鎖的資格,會一直掛起
while (!isOnSyncQueue(node)) {
// 掛起線程
LockSupport.park(this);
// 如果線程中斷,退出
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
interruptMode可以有以下幾種取值:
/** await 返回的時候,需要重新設置中斷狀態 */
private static final int REINTERRUPT = 1;
/** await 返回的時候,需要拋出 InterruptedException 異常 */
private static final int THROW_IE = -1;
/** interruptMode取0的時候表示在await()期間,沒有發生中斷 */
說到這里我們需要明白,LockSupport.park(this)掛起的線程是什么時候喚醒的:
- signal方法將節點轉移到同步隊列中,且獲取到了鎖或者對前驅節點的cas操作失敗,調用了
LockSupport.unpark(node.thread);方法。 - 在park的時候,另外一個線程對掛起的線程進行了中斷。
喚醒之后,我們可以看到調用checkInterruptWhileWaiting方法檢查等待期間是否發生了中斷,如果不為0表示確實在等待期間發生了中斷。
但其實這個方法的返回結果用interruptMode變量接收,擁有更加豐富的內涵,它還能夠判斷中斷的時機是否在signal之前。
int checkInterruptWhileWaiting(Node node)
該方法用於判斷該線程是否在掛起期間發生了中斷。
private int checkInterruptWhileWaiting(Node node) {
return Thread.interrupted() ?// 如果處於中斷狀態,返回true,且將重置中斷狀態
(transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) :// 如果中斷了,判斷何時中斷
0; // 沒有中斷, 返回0
}
boolean transferAfterCancelledWait(Node node)
該方法判斷何時中斷,是否在signal之前。
final boolean transferAfterCancelledWait(Node node) {
// 嘗試使用CAS操作將node 的ws設置為0
// 如果成功,說明在signal方法之前中斷就已經發生:
// 原因在於:signal如果在此之前發生,必然已經cas操作將ws設置為0了,這里不可能設置成功
if (compareAndSetWaitStatus(node, Node.CONDITION, 0)) {
// 就算中斷了,也將節點入隊
enq(node);
return true;
}
/*
* If we lost out to a signal(), then we can't proceed
* until it finishes its enq(). Cancelling during an
* incomplete transfer is both rare and transient, so just
* spin.
* 這里就是signal之后發生的中斷
* 但是signal可能還在進行轉移中,這邊自旋等一下它完成
*/
while (!isOnSyncQueue(node))
Thread.yield();
return false;
}
這里的話,我們還是稍微總結一下:
- await()中的節點中斷之后,被喚醒有多種情況:
- 無中斷的情況:signal方法成功將節點移入同步隊列且節點成功獲取資源,喚醒該線程,此時退出的時候interruptMode為0。
- 有中斷的情況:
- signal之前中斷,interruptMode設置為THROW_IE。
- signal之后中斷,interruptMode設置為REINTERRUPT。
- 中斷時,無論signal之前或之后,節點無論如何都會進入阻塞隊列。
處理中斷狀態
接下來三個部分我將一一說明:
// 第一部分
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
// 第二部分
if (node.nextWaiter != null) // clean up if cancelled
unlinkCancelledWaiters(); // 清除取消的節點
// 第三部分
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
第一部分
signal喚醒的線程並不會立即獲取到資源,從while循環退出后,會通過acquireQueued方法加入獲取同步狀態的競爭中。
// 第一部分
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
acquireQueued(node, savedState)中node此時已經被加入同步隊列了,savedState是之前存儲的state。
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted; //
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
acquireQueued方法返回時,表示已經獲取到了鎖,且返回的是interrupted值,如果返回true,表示已經被中斷。
接着判斷interruptMode != THROW_IE表示是在signal之后發生的中斷,需要重新中斷當前線程,將interruptMode設置為REINTERRUPT。
第二部分
// 第二部分
if (node.nextWaiter != null) // clean up if cancelled
unlinkCancelledWaiters(); // 清除取消的節點
前面說了,signal會將節點移到同步隊列中,最后一步需要和條件隊列斷開關系,也就是:node.nextWaiter = null,但這是想象中比較正常的情況,如果在signal之前被中斷,節點也會被加入同步隊列中,這時其實是沒有調用這個斷開關系的。
因此這邊做一點處理, unlinkCancelledWaiters()邏輯上面也說過了,可以回過頭去看看,主要是清除隊列中已經取消等待的節點。
第三部分
最后一個部分,就是對兩種interruptMode的情況進行處理,看看代碼就知道了:
void reportInterruptAfterWait(interruptMode)
private void reportInterruptAfterWait(int interruptMode)
throws InterruptedException {
// signal 之前的中斷, 需要拋出異常
if (interruptMode == THROW_IE)
throw new InterruptedException();
// signal 之后發生的中斷, 需要重新中斷
else if (interruptMode == REINTERRUPT)
selfInterrupt();
}
帶超機制的void await()
帶超時機制的await()方法有以下幾個,簡單看下即可:
- long awaitNanos(long nanosTimeout)
- boolean awaitUntil(Date deadline)
- boolean await(long time, TimeUnit unit)
我們選最后一個來看看,主要看看和之前await()方法不一樣的地方:
public final boolean await(long time, TimeUnit unit)
throws InterruptedException {
// 計算等待的時間
long nanosTimeout = unit.toNanos(time);
if (Thread.interrupted())
throw new InterruptedException();
Node node = addConditionWaiter();
int savedState = fullyRelease(node);
// 截止時間
final long deadline = System.nanoTime() + nanosTimeout;
// 表示是否超時
boolean timedout = false;
int interruptMode = 0;
while (!isOnSyncQueue(node)) {
// 等待時間到了
if (nanosTimeout <= 0L) {
// 這個方法返回true表示在這個方法內,已經將節點轉移到阻塞隊列中
// 返回false,表示signal已經發生,表示沒有超時
timedout = transferAfterCancelledWait(node);
break;
}
//spinForTimeoutThreshold 是AQS中的一個字段,如果超過1ms,使用parkNonos
if (nanosTimeout >= spinForTimeoutThreshold)
LockSupport.parkNanos(this, nanosTimeout);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
// 更新一下還需要等待多久
nanosTimeout = deadline - System.nanoTime();
}
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null)
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
return !timedout;
}
不拋出InterruptedException的await
public final void awaitUninterruptibly() {
Node node = addConditionWaiter();
int savedState = fullyRelease(node);
boolean interrupted = false;
while (!isOnSyncQueue(node)) {
LockSupport.park(this);
if (Thread.interrupted())
interrupted = true;
}
// 相比await() 針對中斷少了拋出異常的操作,而是直接進行中斷
if (acquireQueued(node, savedState) || interrupted)
selfInterrupt();
}
Condition的使用
最后以一個Java doc給的例子結尾吧:
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
class BoundedBuffer {
final Lock lock = new ReentrantLock();
// condition 依賴於 lock 來產生
final Condition notFull = lock.newCondition();
final Condition notEmpty = lock.newCondition();
final Object[] items = new Object[100];
int putptr, takeptr, count;
// 生產
public void put(Object x) throws InterruptedException {
lock.lock();
try {
while (count == items.length)
notFull.await(); // 隊列已滿,等待,直到 not full 才能繼續生產
items[putptr] = x;
if (++putptr == items.length) putptr = 0;
++count;
notEmpty.signal(); // 生產成功,隊列已經 not empty 了,發個通知出去
} finally {
lock.unlock();
}
}
// 消費
public Object take() throws InterruptedException {
lock.lock();
try {
while (count == 0)
notEmpty.await(); // 隊列為空,等待,直到隊列 not empty,才能繼續消費
Object x = items[takeptr];
if (++takeptr == items.length) takeptr = 0;
--count;
notFull.signal(); // 被我消費掉一個,隊列 not full 了,發個通知出去
return x;
} finally {
lock.unlock();
}
}
}
其實這個之前也說過,ArrayBlockingQueue就是采用了這種方式實現的生產者-消費者模式,如果你感興趣,可以看看具體的實現細節哦。
總結
- Condition的await()和signal()基於Lock,相比於基於Object的wait()和notify()方法,它提供更加靈活的等待通知的機制。
- 支持豐富的功能如:帶超時機制的await(),不響應中斷的await(),以及多個等待的條件隊列。
- Condition的await()方法會將線程包裝為等待節點,加入等待隊列中,並將AQS同步隊列中的節點移除,接着不斷檢查
isOnSyncQueue(Node node),如果在等待隊列中,就一直等着,如果signal將它移到AQS隊列中,則退出循環。 - Condition的signal()方法則是先檢查當前線程是否獲取了鎖,接着將等待隊列中的節點通過Node的操作直接加入AQS隊列。線程並不會立即獲取到資源,從while循環退出后,會通過acquireQueued方法加入獲取同步狀態的競爭中。
參考閱讀
- java Condition源碼分析
- 一行一行源碼分析清楚 AbstractQueuedSynchronizer (二)
- 【死磕 Java 並發】—– J.U.C 之 Condition
- 方騰飛:《Java並發編程的藝術》
- DougLea : 《Java並發編程實戰》
