前兩期我們已經掌握了AQS的基本結構、以及AQS是如何釋放和獲取資源的。其實到這里,我們已經掌握了AQS作為同步器的全部功能
不過,有些情況使用同步功能不夠靈活,所以AQS又引入了操作系統中的另一個高度相關的概念——條件變量。由於條件變量的使用緊密依賴於AQS提供的釋放、獲取資源功能和同步隊列,因此都放在了AQS源碼中
能堅持看到這里的同學已經很不容易了,再接再厲,一起沖掉最后一座堡壘吧🦾🦾🦾
簡介
條件變量是什么
條件對象這一概念源自於操作系統,設計它是為了解決等待同步需求,實現線程間協作通信的一種機制。Java其實也已經內置了條件變量,它和監視器鎖是綁定在一起的,即Object
的wait
和notify
方法,使用這兩個方法就可以實現線程之間的協作
Java中的條件變量直到Java 5才出現,用它來代替傳統的Object
的wait
和notify
方法。相比wait
和notify
,Condition
的await
和signal
方法更加安全和高效,因為Condition
是基於AQS實現的,加鎖、釋放鎖的效率更高
條件變量顧名思義就是表示某種條件的變量。不過需要說明的是,條件變量中的條件並沒有實際含義,僅僅只是一個標記,條件的含義需要代碼來賦予
Condition接口
Condition
是一個接口,條件變量都實現了Condition
接口。該接口的基本方法是await
、signal
、signalAll
這些
不過Condition
依賴於Lock
接口,需要借助Lock.newCondition
方法來創建條件變量。因此Condition
必然是和某個Lock
綁定在一起的,這就和await
和signal
一定會和Object
的監視器鎖綁定在一起。因此,Java中的條件變量必須配合鎖使用
Condition
和Java內置的條件變量方法之間的對應關系如下:
Condition.await
等價於Object.wait
Condition.signal
等價於Object.notify
不多BB了,直接看源碼吧~
AQS之條件對象
AQS為條件變量的實現提供了95%以上的功能,Lock
接口實現類一般只需要實現一下newCondition
方法,以及AQS的isHeldExclusively
方法,就可以直接使用條件變量了,你說方不方便!
AQS實現的方式就是直接提供了Condition
的一個實現類——ConditionObject
,我接下來就將其稱為條件對象(可能不太嚴謹)。ConditionObject
是AQS的內部類,可見性為public
條件對象的結構
每個ConditionObject
都維護了一個條件隊列,首尾節點分別由firstWaiter
、lastWaiter
兩個域來管理:
/** First node of condition queue. */
private transient Node firstWaiter;
/** Last node of condition queue. */
private transient Node lastWaiter;
每個在條件隊列中等待的線程都由Node
類來維護,它們之間通過Node
類的nextWaiter
相連,形成一個單向鏈表。每個Node
的waitStatus
都是Node.CONDITION
,表明該線程正在某個條件變量的條件隊列中等待ing~
條件對象的創建
十分的朴實無華且簡單通透,要是所有代碼都這么簡單,那該有多好啊~
public ConditionObject() { }
正如前面所說,如果基於AQS實現的Lock
接口實現類想要使用AQS提供的條件變量,只需要實現newCondition
方法、isHeldExclusively
方法即可。而實現newCondition
方法一般只需要直接調用ConditionObject
的構造方法即可,多么簡單!
isHeldExclusively
方法會在條件喚醒(signal
、signalAll
)中被調用,因為只有在持有互斥資源的情況下,才允許使用條件變量,在持有共享資源的情況下是不允許使用的!
接下來主要剖析一下ConditionObject
是如何實現Condition
接口的兩大重要功能——條件等待、條件喚醒
版權:本文版權歸作者和博客園共有
轉載:歡迎轉載,但未經作者同意,必須保留此段聲明;必須在文章中給出原文連接;否則 必究法律責任
條件等待
await
await
方法就是最基本的條件等待,它是可以被中斷的。源碼如下:
public final void await() throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
Node node = addConditionWaiter();
int savedState = fullyRelease(node);
int interruptMode = 0;
while (!isOnSyncQueue(node)) {
LockSupport.park(this);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
// 能夠離開while循環,說明被signal或被中斷了,而且在同步隊列中,所以可以調用acquireQueued
// 而且之前釋放了多少資源,現在就要還原回來,因此獲取的參數是之前保存的state
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null) // clean up if cancelled
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
}
addConditionWaiter
方法會添加一個新節點到等待隊列的隊尾,該節點的waitStatus
是CONDITION
。源碼如下:
private Node addConditionWaiter() {
Node t = lastWaiter;
// 如果lastWaiter被取消(CANCELLED),就移除它以及前面所有被取消的節點
if (t != null && t.waitStatus != Node.CONDITION) {
unlinkCancelledWaiters();
t = lastWaiter;
}
Node node = new Node(Thread.currentThread(), Node.CONDITION); // 為當前線程創建一個Node,waitStatus為CONDITION
if (t == null)
firstWaiter = node;
else
t.nextWaiter = node;
lastWaiter = node;
return node;
}
unlinkCancelledWaiters
會將所有被取消的Node
從條件隊列中移除,該方法只會在持有鎖的情況下才會被調用,源碼如下:
private void unlinkCancelledWaiters() {
Node t = firstWaiter;
Node trail = null;
while (t != null) {
Node next = t.nextWaiter;
if (t.waitStatus != Node.CONDITION) {
t.nextWaiter = null;
if (trail == null)
firstWaiter = next;
else
trail.nextWaiter = next;
if (next == nul)
lastWaiter = trail;
}
else
trail = t;
t = next;
}
}
unlinkCancelledWaiters
相當於是進行了一次正向遍歷,將那些waitStatus
不為CONDITION
(即為CANCELLED
)的Node
給移除,挺簡單的~
回到await
方法,接下來調用fullyRelease
方法保存當前的state
,並調用一次release
,最后返回保存的state
。源碼如下:
final int fullyRelease(Node node) {
boolean failed = true;
try {
int savedState = getState();
if (release(savedState)) {
failed = false;
return savedState;
} else {
throw new IllegalMonitorStateException();
}
} finally {
if (failed)
node.waitStatus = Node.CANCELLED; // 如果失敗就會將節點取消
}
}
調用release
就是為了完全釋放當前線程所持有的資源,防止死鎖。如果釋放失敗,說明當前線程壓根沒有獲取到資源就調用了await
方法,這是不允許的,會拋出IllegalMonitorStateException
異常。這也解決了我之前的疑惑——如何保證調用unlinkCancelledWaiters
之前必須持有鎖呢? 這里就給出了答案!只有在當前線程持有資源的前提下,代碼才能正常執行下去
回到await
方法,接下來調用isOnSyncQueue
方法來檢查當前線程對應的Node
在同步隊列上還是條件隊列上,源碼如下:
final boolean isOnSyncQueue(Node node) {
if (node.waitStatus == Node.CONDITION || node.prev == null)
return false;
if (node.next != null) // If has successor, it must be on queue
return true;
return findNodeFromTail(node);
}
該方法會先看目標node
的waitStatus
是否為CONDITION
,或者它的prev
域是否為空,如果是,那么它一定在條件隊列上,返回false;如果目標node
的next
域不為空,那么它一定在同步隊列上,返回true。如果以上兩種快捷判斷方法無法判定,那么需要調用findNodeFromTail
進行暴力判定。總之,如果目標node在條件隊列上等待,則返回true,否則說明在同步隊列上,返回false
await
中的while(!isOnSyncQueue)
語義:如果當前節點所等待的條件沒有滿足,那么說明它還在條件隊列上等待,就會一直被困在while循環中不停地被阻塞,不能離開while循環去競爭獲取資源
findNodeFromTail
方法會從tail
開始向前遍歷,以確定目標node
是否位於同步隊列上,很簡單哦~其源碼如下:
// 此方法只會被isOnSyncQueue方法調用
private boolean findNodeFromTail(Node node) {
Node t = tail;
for (;;) {
if (t == node)
return true;
if (t == null)
return false;
t = t.prev;
}
}
回到await
方法,只有當node
在條件隊列上等待,才會執行while
循環。在循環中,將當前線程(即node
對應的線程)阻塞。如果被喚醒,就調用checkInterruptWhileWaiting
方法檢查是否在阻塞過程中被中斷,如果發生了中斷,該方法會返回非0值。while
循環中如果發現被中斷,則退出循環
checkInterruptWhileWaiting
方法即其調用到的transferAfterCancelledWait
方法,它們的源碼如下:
/*
* 不同返回值的含義:
* 0:沒有發生中斷
* THROW_IE(-1):中斷發生在被signal之前
* REINTERRUPT(1):中斷發生在被signal之后
*/
private int checkInterruptWhileWaiting(Node node) {
return Thread.interrupted() ?
(transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) :
0;
}
/*
* transferAfterCancelledWait有兩個作用:
* 1、將目標node放到同步隊列上
* 2、檢測node是否已經被signal,true表示還未被signal
final boolean transferAfterCancelledWait(Node node) {
// 如果CAS成功,說明該node還未接收到signal信號
if (compareAndSetWaitStatus(node, Node.CONDITION, 0)) {
enq(node);
return true;
}
// 如果上面if中的CAS失敗,說明node已經被signal
// 那么就等待signal中的邏輯將node移動到同步隊列上去,這里什么也不做,自旋等待即可
while (!isOnSyncQueue(node))
Thread.yield();
return false;
}
總之,如果線程在阻塞過程中沒有發生中斷,則checkInterruptWhileWaiting
返回0,否則返回非0值
具體來說,如果發生中斷,就會調用transferAfterCancelledWait
方法將node
轉移到同步隊列,並檢測中斷是發生在被signal
之前還是之后。如果發生在被signal
之前則返回THROW_IE(-1),如果發生在被signal
之后則返回REINTERRUPT(1)
回到await
方法的while
循環中,如果沒有被signal
或被中斷就會一直阻塞。如果被signal
就不滿足while
條件,會退出循環;如果被中斷而不是被signal
,會直接break
離開while
循環
退出while
循環說明此時已經位於同步隊列(要么因為被signal
而transfer到同步隊列,要么因為被中斷而調用transferAfterCancelledWait
方法移動到同步隊列),可以直接調用acquireQueued
方法(調用該方法的前提:位於同步隊列中)排隊獲取鎖。獲取雖然可能導致再次被阻塞,但這里是在同步隊列上的阻塞,而不是在條件隊列上的阻塞
退出處理是通過reportInterruptAfterWait
方法,具體如何處理,取決於interruptMode
的值。源碼如下:
private void reportInterruptAfterWait(int interruptMode) throws InterruptedException {
if (interruptMode == THROW_IE)
throw new InterruptedException();
else if (interruptMode == REINTERRUPT)
selfInterrupt();
}
如果interruptMode
是THROW_IE,那么說明該線程並未真正收到signal
信號,只是因為被中斷而被喚醒,所等待的條件並不一定被滿足,於是拋出中斷異常;如果是REINTERRUPT
,調用selfInterrupt
方法設置線程的中斷狀態
總的來說,await
執行分為6步:
1、如果當前線程中斷,那么拋出中斷異常
2、為當前線程創建Node
,並加入該條件變量的條件隊列隊尾
3、獲取並保存下當前state
,並使用保存的state
來調用release
方法,如果失敗則拋出IllegalMonitorStateException
異常
4、阻塞直到被signal
或被中斷
5、利用保存的state
,調用acquireQueued
方法重新獲取狀態
6、如果第四步期間被中斷,則拋出中斷異常
超時await
await
方法有一個重載版本,可以設置超時參數,如果超時也會導致等待停止。其源碼如下:
public final boolean await(long time, TimeUnit unit)
throws InterruptedException {
long nanosTimeout = unit.toNanos(time);
if (Thread.interrupted())
throw new InterruptedException();
Node node = addConditionWaiter();
int savedState = fullyRelease(node);
final long deadline = System.nanoTime() + nanosTimeout;
boolean timedout = false;
int interruptMode = 0;
while (!isOnSyncQueue(node)) {
if (nanosTimeout <= 0L) {
timedout = transferAfterCancelledWait(node);
break;
}
if (nanosTimeout >= spinForTimeoutThreshold)
LockSupport.parkNanos(this, nanosTimeout);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
nanosTimeout = deadline - System.nanoTime();
}
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null)
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
return !timedout;
}
帶**超時參數**的`await`和無參數`await`基本差不多,執行一共分6步,不過最后一步會**返回是否超時**: 1、如果當前線程中斷,那么拋出中斷異常 2、為當前線程創建`Node`,並加入該條件變量的條件隊列 隊尾 3、獲取並保存下當前`state`,並使用保存的`state`來調用`release`方法,如果失敗則拋出`IllegalMonitorStateException`異常 4、阻塞直到被`signal`或被中斷,或**超時** 5、利用保存的`state`,調用`acquireQueued`方法重新獲取狀態 6、如果第四步期間被中斷,則拋出中斷異常;如果第四步是**因為超時而停止阻塞**,則返回false,否則返回true
awaitUninterruptibly
調用awaitUninterruptibly
方法在等待過程中,不會因為中斷而停止等待。源碼如下:
public final void awaitUninterruptibly() {
Node node = addConditionWaiter();
int savedState = fullyRelease(node);
boolean interrupted = false;
while (!isOnSyncQueue(node)) {
LockSupport.park(this);
if (Thread.interrupted())
interrupted = true;
}
if (acquireQueued(node, savedState) || interrupted)
selfInterrupt();
}
`awaitUninterruptibly`執行分4步: 1、為當前線程創建`Node`,並加入該條件變量的條件隊列 隊尾 2、獲取並保存下當前`state`,並使用保存的`state`來調用`release`方法,如果失敗則拋出`IllegalMonitorStateException`異常 3、阻塞直到被`signal` 4、利用保存的`state`,調用`acquireQueued`方法重新獲取狀態
awaitNanos
awaitNanos
最多等待一定時間,可以被中斷。其源碼如下:
public final long awaitNanos(long nanosTimeout)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
Node node = addConditionWaiter();
int savedState = fullyRelease(node);
final long deadline = System.nanoTime() + nanosTimeout;
int interruptMode = 0;
while (!isOnSyncQueue(node)) {
// 如果超時,則將node從條件隊列轉移到同步隊列上去,並退出while循環
if (nanosTimeout <= 0L) {
transferAfterCancelledWait(node);
break;
}
if (nanosTimeout >= spinForTimeoutThreshold)
LockSupport.parkNanos(this, nanosTimeout);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
nanosTimeout = deadline - System.nanoTime();
}
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null)
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
return deadline - System.nanoTime();
}
`awaitNanos`執行分6步: 1、如果當前線程中斷,那么拋出中斷異常 2、為當前線程創建`Node`,並加入該條件變量的條件隊列 隊尾 3、獲取並保存下當前`state`,並使用保存的`state`來調用`release`方法,如果失敗則拋出`IllegalMonitorStateException`異常 4、阻塞直到被`signal`或被中斷或**超時** 5、利用保存的`state`,調用`acquireQueued`方法重新獲取狀態 6、如果第四步期間被中斷,則拋出中斷異常;最后**返回距離超時還剩多少納秒**
awaitUntil
類似於awaitNanos
,會返回是否是因為超時而終止等待。源碼如下:
public final boolean awaitUntil(Date deadline)
throws InterruptedException {
long abstime = deadline.getTime();
if (Thread.interrupted())
throw new InterruptedException();
Node node = addConditionWaiter();
int savedState = fullyRelease(node);
boolean timedout = false;
int interruptMode = 0;
while (!isOnSyncQueue(node)) {
if (System.currentTimeMillis() > abstime) {
timedout = transferAfterCancelledWait(node);
break;
}
LockSupport.parkUntil(this, abstime);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null)
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
return !timedout;
}
`awaitUntil`和`awaitNanos`基本差不多,執行一共分6步,不過最后一步會返回是否超時: 1、如果當前線程中斷,那么拋出中斷異常 2、為當前線程創建`Node`,並加入該條件變量的條件隊列 隊尾 3、獲取並保存下當前`state`,並使用保存的`state`來調用`release`方法,如果失敗則拋出`IllegalMonitorStateException`異常 4、阻塞直到被`signal`或被中斷或超時 5、利用保存的`state`,調用`acquireQueued`方法重新獲取狀態 6、如果第四步期間被中斷,則拋出中斷異常;如果第四步是 因為超時而停止阻塞,則返回false,否則返回true
版權:本文版權歸作者和博客園共有
轉載:歡迎轉載,但未經作者同意,必須保留此段聲明;必須在文章中給出原文連接;否則 必究法律責任
條件喚醒
signal
signal
用於喚醒某個等待在條件隊列的線程。源碼如下:
public final void signal() {
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
Node first = firstWaiter;
if (first != null)
doSignal(first);
}
首先調用isHeldExclusively
方法來判斷當前線程是否持有互斥資源(isHeldExclusively
方法只會在signal
或signalAll
中才會被調用),如果沒有則拋出IllegalMonitorStateException
異常。接下來調用doSignal
方法將等待最久(隊首)的線程從條件隊列轉移到同步隊列
這里說明一下,只有當線程持有互斥資源時,才支持使用條件變量。關於這點,可以參考全網最詳細的ReentrantReadWriteLock源碼剖析(萬字長文)
其中,寫鎖支持條件變量,而讀鎖不支持。因為寫鎖是互斥資源,而讀鎖是共享資源。原因也可以參見這篇博客里的解析,這里不再贅述
doSignal
方法會從目標node
(一般都是隊首)開始,將遇到的第一個未被取消的線程從條件隊列移除,並調用transferForSignal
方法將其放到同步隊列隊尾去等待獲取資源。源碼如下:
private void doSignal(Node first) {
do {
if ( (firstWaiter = first.nextWaiter) == null)
lastWaiter = null;
first.nextWaiter = null;
} while (!transferForSignal(first) && (first = firstWaiter) != null);
}
transferForSignal方法不僅會判斷目標node是否被取消,也會將目標node從條件隊列移動到同步隊列。源碼如下:
final boolean transferForSignal(Node node) {
if (!compareAndSetWaitStatus(node, Node.CONDITION, 0)) // 如果CAS失敗,說明node已經被取消了,直接返回false
return false;
Node p = enq(node); // 入隊,返回node的前驅節點p
int ws = p.waitStatus;
// 入隊之后希望將前驅節點置為SIGNAL,表明node的線程正在苦等獲取資源
// 如果前驅節點已被取消,或CAS修改前驅的waitStatus失敗,就干脆直接將node的線程喚醒,不多bb
if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
LockSupport.unpark(node.thread);
return true;
}
總之,`signal`方法會將 最早的(非取消)線程移動到同步隊列去排隊獲取資源
signalAll
signalAll
方法會將條件隊列中所有的線程都移動到同步隊列,讓它們去獲取資源。其源碼如下:
public final void signalAll() {
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
Node first = firstWaiter;
if (first != null)
doSignalAll(first);
}
該方法會調用doSignalAll
方法將條件隊列中所有的線程都移除,並放入同步隊列。源碼如下:
private void doSignalAll(Node first) {
lastWaiter = firstWaiter = null; // 將條件隊列清空
do {
Node next = first.nextWaiter;
first.nextWaiter = null;
transferForSignal(first);
first = next;
} while (first != null);
}
總之,`signalAll`方法會將條件隊列中 所有的線程都移動到同步隊列去排隊獲取資源
版權:本文版權歸作者和博客園共有
轉載:歡迎轉載,但未經作者同意,必須保留此段聲明;必須在文章中給出原文連接;否則 必究法律責任
條件變量的應用
實現生產者-消費者模型
使用上面的非可重入鎖NonReentrantLock
的條件變量來實現簡單的生產者-消費者模型(單生產者、單消費者),實現代碼如下:
點擊查看代碼
public class ProducerConsumerModel {
private static final NonReentrantLock lock = new NonReentrantLock();
private static final Condition notFull = lock.newCondition();
private static final Condition notEmpty = lock.newCondition();
private static final Queue<String> queue = new LinkedList<>();
private static final int queueLength = 10; // 隊列長度
public static void main(String[] args) {
Thread producer = new Thread(() -> {
int i = 0;
while (true) {
lock.lock();
try {
while (queue.size() == queueLength) {
System.out.println("Queue is full!");
notFull.await();
}
Thread.sleep(2000);
System.out.println("Produce product: " + "product" + i);
queue.add("product" + i);
++i;
notEmpty.signalAll();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
});
Thread consumer = new Thread(() -> {
while (true) {
lock.lock();
try {
while (queue.isEmpty()) {
System.out.println("Queue is empty!");
notEmpty.await();
}
Thread.sleep(2000);
String product = queue.poll();
System.out.println("Consume product: " + product);
notFull.signalAll();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
});
producer.start();
consumer.start();
}
}
好了,AQS系列共三期,到此為止,Bye~
全網最詳細的AbstractQueuedSynchronizer(AQS)源碼剖析(一)AQS基礎
全網最詳細的AbstractQueuedSynchronizer(AQS)源碼剖析(二)資源的獲取和釋放
全網最詳細的AbstractQueuedSynchronizer(AQS)源碼剖析(三)條件變量