上期的《全網最詳細的AbstractQueuedSynchronizer(AQS)源碼剖析(一)AQS基礎》中介紹了什么是AQS,以及AQS的基本結構。有了這些概念做鋪墊之后,我們就可以正式地看看AQS是如何通過
state
(以下也稱資源)和同步隊列,實現線程之間的同步功能了
那么線程之間是如何同步呢?其實就是通過資源的獲取和釋放來進行同步。如果獲取到就繼續運行,獲取不到就放入同步隊列阻塞等待,釋放就是交出獲得的資源,並釋放同步隊列中需要被喚醒的線程。對,就是這么簡單!
本篇我們繼續深入AQS內部,一起來看看線程是怎么利用AQS來獲取、釋放資源的~
獲取資源
AQS獲取資源是通過各種acquire
方法。不同acquire
方法之間存在區別,如下:
acquire
:以互斥模式獲取資源,忽略中斷acquireInterruptibly
:以互斥模式獲取資源,響應中斷acquireShared
:以共享模式獲取資源,忽略中斷acquireSharedInterruptibly
:以共享模式獲取資源,響應中斷
獲取互斥資源
忽略中斷的acquire方法
acquire
方法是獲取互斥資源,忽略中斷。如果獲取成功,直接返回,否則該線程會進入同步隊列阻塞等待。源碼如下:
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
acquire
是一個模板方法,定義為final方法防止子類重寫。其中的鈎子方法tryAcquire
需要子類去實現。
如果tryAcquire
返回true,說明嘗試獲取成功,直接返回即可。如果tryAcquire
返回false,說明嘗試獲取失敗,會調用addWaiter
方法進入等待隊列。該方法的解析見上一篇博客全網最詳細的AbstractQueuedSynchronizer(AQS)源碼剖析(一)AQS基礎
執行完addWaiter
方法后,該線程就處於同步隊列中了(queued),接下來就會調用acquireQueued
方法
acquireQueued
方法為一個已經位於同步隊列的線程,以互斥模式獲取資源,不響應中斷但是會記錄中斷狀態。源碼如下:
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor(); // 獲取node的前一個節點
if (p == head && tryAcquire(arg)) { // 如果p是head,說明node是隊列頭,可以競爭資源
setHead(node); // 將node出隊
p.next = null; // help GC
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
在acquireQueued
方法代碼主要都包含在一個for
循環中。如果發現node
是隊首節點,就會再次嘗試獲取資源。如果此時獲取成功,就直接出隊並返回,不用阻塞等待,這里體現了同步隊列先進先出的特點
如果不是隊首節點,或者是再次嘗試獲取資源又雙叒叕失敗了,則調用shouldParkAfterFailedAcquire
方法判斷當前線程是否應該被阻塞
shouldParkAfterFailedAcquire
方法會檢查當前線程是否應該被阻塞,如果是就返回true,否則返回false。其源碼如下:
// 調用此方法必須保證pred是node的直接前驅,即node.prev == pred
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)
/*
* This node has already set status asking a release
* to signal it, so it can safely park.
*/
return true;
if (ws > 0) {
// 如果前面的Node都被cancel了,那么就跳過這些Node
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
/*
* waitStatus must be 0 or PROPAGATE. Indicate that we
* need a signal, but don't park yet. Caller will need to
* retry to make sure it cannot acquire before parking.
*/
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
只有當node
的直接前驅節點等待狀態waitStatus
是SIGNAL
時,才會認為該線程應該被阻塞。否則還需要回到acquireQueued
的for
循環中重新檢查,不會立即阻塞
我畫了一張shouldParkAfterFailedAcquire
的執行流程圖,如下:

那么會不會有一種可能:
shouldParkAfterFailedAcquire
方法一直返回false,始終認為該線程不應該阻塞,那么該線程就會一直占用CPU資源,“忙等”
其實一般來說是不會的,原因見上面示意圖中的紫色文字部分
再回到acquireQueued
方法中,如果shouldParkAfterFailedAcquire
判斷該線程,並返回了true,就需要執行parkAndCheckInterrupt
將該線程阻塞,源碼如下:
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
return Thread.interrupted();
}
在parkAndCheckInterrupt
中借助了工具類LockSuppport
將線程阻塞。阻塞過程中如果該線程被設置了中斷狀態,雖然中斷不會導致阻塞立即被喚醒,但是線程的中斷狀態會被記錄下來,並作為該方法的返回值
總體來說,acquireQueued
方法的執行流程如下圖所示:

再回到acquire
方法中。如果acquire
失敗而阻塞等待的過程中被中斷,那么等它被喚醒並成功獲得資源之后,會立即調用setInterrupt
方法設置線程的中斷狀態。setInterrupt
的源碼如下:
static void selfInterrupt() {
Thread.currentThread().interrupt();
}
最后補充一點,acquire
方法除了會在線程獲取互斥資源時被調用,也會被條件等待方法await
方法調用,具體分析見本系列最后一期博客全網最詳細的AbstractQueuedSynchronizer(AQS)源碼剖析(三)條件變量
響應中斷的acquireInterruptibly方法
acquireInterruptibly
用於獲取互斥資源。顧名思義,這個方法響應中斷,即如果在調用過程中發生了中斷,會拋出中斷異常,中止資源的獲取。其源碼如下:
public final void acquireInterruptibly(int arg) throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
if (!tryAcquire(arg))
doAcquireInterruptibly(arg);
}
acquireInterruptibly
方法首先會檢查中斷狀態,如果沒有發生中斷,才會繼續向下執行,否則拋出中斷異常
接下來執行鈎子方法tryAcquire
,如果獲取成功則直接返回,否則獲取失敗,執行doAcquireInterruptibly
方法:
private void doAcquireInterruptibly(int arg) throws InterruptedException {
final Node node = addWaiter(Node.EXCLUSIVE);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
doAcquireInterruptibly
會先調用addWaiter
方法,將當前線程加入隊尾。之后的邏輯和acquireQueued
類似,就是在for
循環中,先判斷當前節點是否是頭節點,如果是則再次嘗試獲取資源。如果不是隊首或者獲取失敗,則調用shouldParkAfterFailedAcquire
方法判斷該線程是否應該被阻塞。如果不是就進入下一輪循環。如果需要被阻塞,則調用parkAndCheckInterrupt
方法將其阻塞。如果阻塞過程中發生中斷,則當該線程被喚醒后回到doAcquireInterruptibly
中,會拋出中斷異常,並調用cancelAcquire
執行取消節點的邏輯
doAcquireInterruptibly
和acquireQueued
的區別有兩點:
acquireQueued
調用之前,當前線程就已經被放入同步隊列;而doAcquireInterruptibly
沒有,需要自己調用addWaiter
方法acquireQueued
中不會因發生中斷而拋出中斷異常、取消節點,只會記錄是否發生中斷並返回;而doAcquireInterruptibly
會響應中斷,拋出中斷異常,並取消該線程對應的節點
版權:本文版權歸作者和博客園共有
轉載:歡迎轉載,但未經作者同意,必須保留此段聲明;必須在文章中給出原文連接;否則 必究法律責任
獲取共享資源
忽略中斷的acquireShared方法
acquireShared
是以共享模式獲取資源,並且忽略中斷。源碼如下:
public final void acquireShared(int arg) {
if (tryAcquireShared(arg) < 0)
doAcquireShared(arg);
}
該方法首先會調用鈎子方法tryAcquireShared
嘗試獲取共享資源,如果獲取成功則直接返回,否則獲取失敗,調用doAcquireShared
方法:
private void doAcquireShared(int arg) {
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head) {
int r = tryAcquireShared(arg);
if (r >= 0) { // 表示tryAcquireShared獲取成功
// 設置head,並判斷是否需要喚醒后繼線程。如果需要則喚醒,並保證傳播
setHeadAndPropagate(node, r);
p.next = null; // help GC
if (interrupted)
selfInterrupt();
failed = false;
return;
}
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
這里也會調用addWaiter
將當前線程加入同步隊列,不過這里的Node
是共享模式(Node.SHARED
)
在接下來的for
循環中,如果當前線程位於隊首,則再次嘗試獲取資源。如果獲取成功,則調用setHeadAndPropagate
方法,處理中斷之后返回
其中setHeadAndPropagate
方法的作用是彈出隊頭,並檢測其后繼節點是否需要被喚醒,如果需要的話就喚醒,並確保傳播。源碼如下;
private void setHeadAndPropagate(Node node, int propagate) {
Node h = head; // Record old head for check below
setHead(node);
// 這個復雜的if條件判斷就是用於判斷:后繼節點的線程是否要被喚醒
// propagate > 0 表示允許后續節點繼續獲取共享資源
if (propagate > 0 || h == null || h.waitStatus < 0 ||
(h = head) == null || h.waitStatus < 0) {
Node s = node.next;
if (s == null || s.isShared())
doReleaseShared(); // 喚醒后繼的共享模式的線程,並確保狀態傳播下去
}
}
在共享模式下,一個線程獲取資源成功后,可能會引起后繼等待獲取共享資源的線程。注意,這里是后繼而非同步隊列中所有后面的。在這一點上,不同於互斥資源的獲取,共享資源的獲取更像是一人得道,雞犬升天
如果在setHeadAndPropagate
中發現存在后繼線程需要被釋放,則調用doReleaseShared
方法將它釋放,並確保傳播,它也是releaseShared
方法的核心,該方法會在后面講解釋放共享資源時給出解析,這里暫時不分析
確保傳播的含義:
保證被喚醒的線程可以繼續喚醒它的后繼線程。如果每個線程都能確保傳播,那么所有應該被釋放的后繼線程都能得到釋放(類似於遞歸釋放)
總的來說,acquireShared
的流程與acquire
基本一致,最大的區別在於:獲取共享資源成功后,可能需要喚醒后繼的多個線程。而獲取互斥資源成功后,不需要喚醒其他任何線程
響應中斷的acquireSharedInterruptibly方法
acquireSharedInterruptibly
方法用於獲取共享資源,但是該方法會響應中斷,即在獲取過程中接收到中斷信號,會拋出中斷異常。其源碼如下:
public final void acquireSharedInterruptibly(int arg) throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
if (tryAcquireShared(arg) < 0)
doAcquireSharedInterruptibly(arg);
}
和acquireInterruptibly
一樣,acquireSharedInterruptibly
也會先檢查線程的中斷狀態是否已經被設置。如果設置則直接拋出中斷異常
接下來會調用鈎子方法tryAcquireShared
嘗試獲取共享資源,獲取成功則直接返回,獲取失敗就會調用doAcquireSharedInterruptibly
方法:
private void doAcquireSharedInterruptibly(int arg) throws InterruptedException {
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head) {
int r = tryAcquireShared(arg);
if (r >= 0) {
setHeadAndPropagate(node, r);
p.next = null; // help GC
failed = false;
return;
}
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
不多解釋,直接上圖吧!下面是doAcquireSharedInterruptibly
方法的執行流程圖:

doAcquireSharedInterruptibly
方法和doAcquireShared
方法大體上差不多,區別僅在於前者響應中斷並會拋出中斷異常,而后者忽略中斷,只記錄中斷狀態並返回
版權:本文版權歸作者和博客園共有
轉載:歡迎轉載,但未經作者同意,必須保留此段聲明;必須在文章中給出原文連接;否則 必究法律責任
釋放資源
AQS釋放資源是通過各種release
方法。不同release
之間存在區別,如下:
release
:以獨占模式釋放對象releaseShared
:以共享模式釋放對象
這些釋放資源的方法都不存在響應中斷的區別,都是忽略中斷的,因為線程在釋放資源的時候被中斷可能引起意外的錯誤
釋放互斥資源
AQS使用release
方法釋放互斥資源,源碼如下:
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
該方法會先調用鈎子方法tryRelease
,如果釋放失敗則直接返回false,如果釋放成功,則調用unparkSuccessor
方法喚醒隊首線程,並返回true
unparkSuccessor
方法是喚醒線程的主要邏輯。源碼如下:
private void unparkSuccessor(Node node) {
// 如果status < 0(表明可能需要signal),先清除狀態(設為0)
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
// 一般來說后繼需要unpark的節點就是next節點
// 但是如果next被cancel或為null,則需要從后向前遍歷,直到找到有效的后繼節點
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
LockSupport.unpark(s.thread);
}
該方法的作用是喚醒node
的有效后繼節點。有效指的是跳過那些被cancel的節點。 由於同步隊列是FIFO的,所以node
一定是head
版權:本文版權歸作者和博客園共有
轉載:歡迎轉載,但未經作者同意,必須保留此段聲明;必須在文章中給出原文連接;否則 必究法律責任
釋放共享資源
releaseShared
用於釋放共享資源,源碼如下:
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
該方法首先調用鈎子方法tryReleaseShared
嘗試釋放資源,如果失敗則直接返回false,如果成功則執行doReleaseShared
方法喚醒后繼的其他共享模式線程同時確保傳播,最后返回true
doReleaseShared
方法在前面的acquireShared
-> setHeadAndPropagate
中出現過,該方法的作用是在共享模式下喚醒后繼線程,並確保傳播。其源碼如下:
private void doReleaseShared() {
for (;;) {
Node h = head;
if (h != null && h != tail) {
int ws = h.waitStatus;
if (ws == Node.SIGNAL) {
// 如果需要,則喚醒后繼線程,同時設置waitStatus為0
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue; // loop to recheck cases
unparkSuccessor(h); // 喚醒后繼線程
}
else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) // 設置PROPAGATE狀態,保證喚醒可以傳播下去
continue; // loop on failed CAS
}
// 如果上述的執行過程沒有被別的線程打擾,那就退出,否則重新loop
if (h == head) // loop if head changed
break;
}
}
版權:本文版權歸作者和博客園共有
轉載:歡迎轉載,但未經作者同意,必須保留此段聲明;必須在文章中給出原文連接;否則 必究法律責任
AQS的應用
AQS的應用就不用我多吹了吧,那些個JUC里面的大名鼎鼎的可重入鎖、讀寫鎖,底層實現都是基於AQS
如果想要自己使用AQS實現某個並發工具,也很簡單,只需要繼承AQS,並實現一些特定方法即可~
繼承AQS的注意點
- 如果要使用AQS中的互斥資源同步方法,需要手動實現
tryAcquire
和tryRelease
方法 - 如果要使用AQS中的共享資源同步方法,需要手動實現
tryAcquireShared
和tryReleaseShared
方法 - 如果要使用AQS中的條件變量,需要實現
isHeldExclusively
方法
應用實踐:實現非可重入鎖
非可重入鎖NonReentrantLock
定義了一個內部工具類Sync
實現關於鎖的操作,而Sync
則繼承了AQS。實現的代碼如下:
點擊查看代碼
public class NonReentrantLock implements Lock {
private final Sync sync = new Sync();
private static class Sync extends AbstractQueuedSynchronizer {
@Override
protected boolean tryAcquire(int arg) {
if (arg != 1) {
throw new IllegalArgumentException();
}
if (compareAndSetState(0, 1)) {
return true;
}
return false;
}
@Override
protected boolean tryRelease(int arg) {
if (arg != 1) {
throw new IllegalArgumentException();
}
if (getState() == 0) {
throw new IllegalMonitorStateException();
}
setState(0);
return true;
}
@Override
protected boolean isHeldExclusively() {
return getState() == 1;
}
Condition newCondition() {
return new ConditionObject();
}
}
@Override
public void lock() {
sync.acquire(1);
}
@Override
public void lockInterruptibly() throws InterruptedException {
sync.acquireInterruptibly(1);
}
@Override
public boolean tryLock() {
return sync.tryAcquire(1);
}
@Override
public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
return sync.tryAcquireNanos(1, unit.toNanos(time));
}
@Override
public void unlock() {
sync.release(1);
}
@Override
public Condition newCondition() {
return sync.newCondition();
}
}
最后來做個總結:
AQS針對互斥資源、共享資源的獲取和釋放,提供了不同的方法。而獲取資源的方法也可以分為響應中斷和忽略中斷,釋放資源都是忽略中斷的
AQS正是通過資源 (state
)的釋放和獲取,配合同步隊列讓線程排隊等待,以FIFO的方式讓競爭資源失敗的線程阻塞、喚醒
這些釋放、獲取方法都是AQS提供給子類去調用的模板方法,其中的一些關鍵步驟均設計為了鈎子方法,讓子類可以個性化定制
正是有了AQS這個強大的后盾,才能誕生出那么多實用的並發同步工具類。不得不說,AQS是真的牛啊
好了,能看到這里的讀者,相信已經掌握了AQS的基本結構,以及AQS是獲取、釋放資源的原理
我這里其實並沒有剖析所有AQS提供的資源獲取方法,還有兩個可超時方法tryAcquireNanos
、tryAcquireSharedNanos
沒有分析,但是基本上和其他獲取資源方法是類似的,只是多了一個超時而取消的邏輯,感興趣的讀者可以打開AQS源碼自己分析
接下來的就是AQS的最后一篇了,我們來看看AQS里面的條件隊列是怎么實現的
全網最詳細的AbstractQueuedSynchronizer(AQS)源碼剖析(一)AQS基礎
全網最詳細的AbstractQueuedSynchronizer(AQS)源碼剖析(二)資源的獲取和釋放
全網最詳細的AbstractQueuedSynchronizer(AQS)源碼剖析(三)條件變量