AQS(AbstractQueuedSynchronizer), 可以說的誇張點,並發包中的幾乎所有類都是基於AQS的。
一起揭開AQS的面紗
1. 介紹
為依賴 FIFO阻塞隊列 的阻塞鎖和相關同步器(semaphores, events等)的實現提供一個框架。 為那些依賴於原子state的同步器提供基礎(CyclicBarrier、CountDownLatch等). 支持獨占模式和共享模式, 不同的模式需要實現不同的方法.
引用這位大佬的圖 http://www.cnblogs.com/waterystone/p/4920797.html

這個圖是AQS整體結構,從圖中可以看到,AQS維護着一個阻塞隊列(當線程獲取資源失敗時,就會進入該隊列,等待,直到被喚醒), state是一個共享的資源。
2. 源碼剖析
我們先看看AQS的類圖,

內部類: Node,阻塞隊列維護的元素;ConditionObject, 支持獨占模式下的子類用作Condition實現, 后面會講到。先看看Node的結構。
static final class Node {
/** 表明阻塞一個共享模式的結點 */
static final Node SHARED = new Node();
/** 表明阻塞一個獨占模式的結點 */
static final Node EXCLUSIVE = null;
/** waitStatus value to indicate thread has cancelled */
// 發生的情況: 中斷或者超時
static final int CANCELLED = 1;
/** 表明后繼節點需要被喚醒 */
// 發生的情況: 后繼結點被park()阻塞,當目前的結點釋放或取消,必須要unpark它的后繼結點
static final int SIGNAL = -1;
/** waitStatus value to indicate thread is waiting on condition */
// 發生的情況: 當前結點在條件隊列中(后面會講解)
static final int CONDITION = -2;
/**
* waitStatus value to indicate the next acquireShared should
* unconditionally propagate
*/
// 發生的情況: 執行releaseShared后,需要傳播釋放其他結點。
// 在doReleaseShared中進行設置。(后面講解這個狀態的必要性)
// 在共享模式下,才會用到
static final int PROPAGATE = -3;
/** 結點的狀態, 初始值為0*/
volatile int waitStatus;
volatile Node prev;
volatile Node next;
volatile Thread thread;
....
}
tip: waitStatus > 0, 即 CANCELLED, 此時的結點不正常。
AQS使用了模板模式, 自主選擇重新定義以下方法
- tryAcquire - 獨占模式
- tryRelease - 獨占模式
- tryAcquireShared - 共享模式
- tryReleaseShared - 共享模式
- isHeldExclusively
調用這些方法,都會引發UnsupportedOperationException,后面的文章將通過其子類,來講解它們的實現。
有了這些知識后,我們從下面這幾個關鍵的共有方法入手去講解AQS
- acquire(int arg) - 獨占模式
- release(int arg) - 獨占模式
- acquireShared(int arg) - 共享模式
- releaseShared(int arg) - 共享模式
2.1 acquire
獨占模式下的獲取資源,忽略中斷。調用tryAcquire至少一次,若成功就返回。否則,將線程入隊,並可能反復阻塞和接觸阻塞,並調用tryAcquire直至成功。此方法可以用於實現 Lock.lock
public final void acquire(int arg) {
// (2.1.1)
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
tryAcquire在后面文章結合子類進行分析。
代碼(2.1.1)中,此時調用了tryAcquire,獲取資源失敗,返回false,繼續執行后續方法。
addWaiter -- Queuing utilities
使用當前線程和給定mode, 新建一個Node,並且將新建Node入隊
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
if (pred != null) {
// (2.1.2) 記住這個地方, 后面有個知識點會用到
node.prev = pred;
// (2.1.3)
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
// 插入node入隊,初始化如果有必要
// 執行到這的情況:
// 1. 阻塞隊列為空時,即tail == null
// 2. 代碼(2.1.3), CAS失敗
enq(node);
return node;
}
enq -- Queuing utilities
一直循環直到node入隊成功
private Node enq(final Node node) {
for (;;) {
Node t = tail;
// 初始時,head與tail都為空,需要新建一個哨兵結點
if (t == null) { // Must initialize
if (compareAndSetHead(new Node()))
tail = head;
} else {
// 插入結點
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
經過了上面的操作,目前的線程已經加入了隊尾,此時做的事情就是阻塞自己,等待資源釋放並且獲取,然后執行自己的操作
acquireQueued
以獨占且不可中斷的模式,獲取已經在阻塞隊列中的線程。若在等待時被中斷,返回true
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
// 不斷自旋直到獲取到資源或被park
for (;;) {
final Node p = node.predecessor();
// 若node的前繼結點是head,執行tryAcquire,嘗試獲取資源(可能剛好釋放了資源,就可以不用阻塞)
// (2.1.4)
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
// 檢測是否需要被park,若需要就進行park,並返回在等待時是否被中斷
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
// 失敗,發生的情況 1. 被中斷 2. 超時.(其他方法調用該方法時,會發生)
if (failed)
cancelAcquire(node);
}
}
shouldParkAfterFailedAcquire -- Utilities for various versions of acquire
對獲取資源失敗的node,檢測並獲取結點。返回true,如果線程需要阻塞
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
// 前驅結點的狀態為SIGNAL,此時就可以安心的park,等待前驅節點釋放資源,然后喚醒自己
if (ws == Node.SIGNAL)
return true;
// 此時ws為CANCELLED
if (ws > 0) {
/*
* 前驅結點狀態為CANCELLED,跳過前驅結點並重試, 直到前驅節點不為CANCELLED
*/
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
/*
* waitStatus此時的值要么是0要么是PROPAGATE.
* 需要把前驅結點狀態設置為SIGNAL
*/
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
cancelAcquire -- Utilities for various versions of acquire
取消正在進行的獲取資源的嘗試
private void cancelAcquire(Node node) {
// Ignore if node doesn't exist
if (node == null)
return;
node.thread = null;
// Skip cancelled predecessors
Node pred = node.prev;
while (pred.waitStatus > 0)
node.prev = pred = pred.prev;
// 獲取過濾后的前驅的后驅節點
Node predNext = pred.next;
node.waitStatus = Node.CANCELLED;
// node為tail,就直接從隊列中刪除
if (node == tail && compareAndSetTail(node, pred)) {
compareAndSetNext(pred, predNext, null);
} else {
int ws;
// 1. 前驅結點不是頭結點,該結點不是老二
// 2. 滿足以下任意一個條件:
// 2.1 此時的前驅結點狀態為SIGNAL
// 2.2 此時的前驅結點狀態為PROPAGATE或CONDITION,成功將狀態設置為SIGNAL
// 3. 前驅節點的任務不為空
// 滿足上面的條件,就將節點的前驅結點的next 指向 節點的后驅結點
if (pred != head &&
((ws = pred.waitStatus) == Node.SIGNAL || (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
pred.thread != null) {
Node next = node.next;
if (next != null && next.waitStatus <= 0)
compareAndSetNext(pred, predNext, next);
} else {
// 此時結點剛好是老二;
// 代碼(2.1.4) 可以看出,頭結點要么是哨兵結點,要么是已經獲取到資源的結點。
// 此時喚醒node的后驅結點,是為了防止后驅結點一直阻塞
unparkSuccessor(node);
}
node.next = node; // help GC
}
}
unparkSuccessor
后驅結點存在一個在正在等待的結點,則喚醒它
private void unparkSuccessor(Node node) {
int ws = node.waitStatus;
// 將當前的結點的waitStatus設置為0,失去SIGNAL、PROPAGATE的含義
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
// (2.1.5)
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
// [問題一] 為什么node的后驅結點為空,重新尋找是從后往前找
// 只要waitStatus <= 0, 都有機會被喚醒
for (Node t = tail; t != null && t != node; t = t.prev)
// (2.1.6)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
LockSupport.unpark(s.thread);
}
2.1.1 acquire流程圖
根據上面的分析,整一個流程圖

2.1.2 小結
根據acquire流程圖,一句話小結其流程,嘗試獲取資源,失敗則將新建node(當前線程及獨占模式)入隊,檢測自己是否是老二,是老二就再一次嘗試獲取資源,成功就返回中斷標志,不是老二就設置為SIGNAL,park自己,然后安心等待被喚醒。
2.2 release
獨占模式下的釋放資源。解除阻塞一個或多個線程,當tryRelease返回true時。此方法可以用於實現 Lock.unlock
public final boolean release(int arg) {
// tryRelease返回true,繼續下面操作。
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
2.2.1 release流程圖
因為release(int arg)的主要流程是在tryRelease和unparkSuccessor中,但是tryRelease又是在子類中實現,所以該流程圖也可以看作unparkSuccessor的流程圖

2.2.2 小結
根據release流程圖, 一句話小結其流程, 釋放資源,喚醒后驅沒有被取消的結點。
下面講講AQS的另一種模式,共享模式
2.3 acquireShared
共享模式下獲取資源,忽略中斷。至少調用tryAcquireShared一次,成功就返回。否則,線程將入隊,可能會重復的阻塞和解除阻塞,直到調用tryAcquireShared成功。成功獲取到資源,將會喚醒后驅結點,若資源滿足。
public final void acquireShared(int arg) {
// 在子類探究tryAcquireShared
if (tryAcquireShared(arg) < 0)
doAcquireShared(arg);
}
對tryAcquireShared返回的參數,進行簡單的介紹
- 返回負數表示失敗;
- 返回零,隨后的線程都不能獲取到資源
- 返回正數,隨后的線程可以獲取到資源
此時tryAcquireShared的返回值是小於零,表示獲取資源失敗,進行下一步處理
doAcquireShared
獲取資源在不可中斷的模式下
private void doAcquireShared(int arg) {
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head) {
int r = tryAcquireShared(arg);
// 若前驅結點是頭結點且剛好釋放了,嘗試獲取資源
// (2.3.1)
if (r >= 0) {
// 將當前node設置為head,並且嘗試喚醒node的后驅結點
setHeadAndPropagate(node, r);
p.next = null; // help GC
if (interrupted)
selfInterrupt();
failed = false;
return;
}
}
// 跟獨占模式的處理是一樣的
// (2.3.2)
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
setHeadAndPropagate
設置隊列的頭結點,達到條件就喚醒后面的結點.
private void setHeadAndPropagate(Node node, int propagate) {
Node h = head; // Record old head for check below
setHead(node);
/*
* Try to signal next queued node if:
* Propagation was indicated by caller,
* or was recorded (as h.waitStatus either before
* or after setHead) by a previous operation
* (note: this uses sign-check of waitStatus because
* PROPAGATE status may transition to SIGNAL.)
* and
* The next node is waiting in shared mode,
* or we don't know, because it appears null
*
* The conservatism in both of these checks may cause
* unnecessary wake-ups, but only when there are multiple
* racing acquires/releases, so most need signals now or soon
* anyway.
*/
// tips: h == null、(h = head) == null、s == null,是為了防止空指針的發生
// 執行到下面的情況:
// 1. propagate是tryAcquireShared返回的值。propagate(資源) > 0, 表示還有資源可以喚醒后面的結點。
// 否則,此時propagate = 0, 結合代碼(2.3.1)
// 2. 舊的head的waitStatus < 0
// 舊的頭結點釋放了資源,執行了代碼(2.3.4), 此時的waitStatus 為PROPAGATE(初始化為0)
// 3. 此時的head已經是當前結點了,后面若有結點(此時后面的結點在park),
// 那么新head的waitStatus肯定為SIGNAL, 結合代碼(2.3.2)
// 情況3,有可能會發生沒必要的喚醒,因為此時去喚醒新head的后驅結點,但是此時還沒
// 有釋放資源,它后驅結點喚醒后,去獲取資源,獲取失敗,又被park.
// 源碼注釋說到,雖然有爭議,但是大多數情況下,需要去喚醒
// (2.3.3)
if (propagate > 0 || h == null || h.waitStatus < 0 ||
(h = head) == null || h.waitStatus < 0) {
Node s = node.next;
if (s == null || s.isShared())
doReleaseShared();
}
}
代碼(2.3.3)中, 為什么不只用propagate來判斷是否喚醒后驅結點 [問題二]
doReleaseShared
共享模式下主要的釋放資源的邏輯,喚醒后驅結點,確保線程不被掛起
private void doReleaseShared() {
/*
* Ensure that a release propagates, even if there are other
* in-progress acquires/releases. This proceeds in the usual
* way of trying to unparkSuccessor of head if it needs
* signal. But if it does not, status is set to PROPAGATE to
* ensure that upon release, propagation continues.
* Additionally, we must loop in case a new node is added
* while we are doing this. Also, unlike other uses of
* unparkSuccessor, we need to know if CAS to reset status
* fails, if so rechecking.
*/
for (;;) {
Node h = head;
if (h != null && h != tail) {
int ws = h.waitStatus;
if (ws == Node.SIGNAL) {
// (2.3.4)
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue; // loop to recheck cases
unparkSuccessor(h);
}
// (2.3.5)
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; // loop on failed CAS
}
if (h == head) // loop if head changed
break;
}
}
2.3.1 acquireShared的流程圖

2.3.2 小結
一句話小結acquireShared的流程,嘗試獲取資源,若獲取到資源,資源還有剩余就去繼續喚醒后驅結點,若嘗試獲取資源失敗,就park自己,等待被喚醒。 跟acquire相比,最大的區別就是,獲取到資源acquireShared,還會去嘗試喚醒其后驅結點
2.4 releaseShared
Releases in shared mode. Implemented by unblocking one or more threads if tryReleaseShared returns true.
// 代碼比較簡單,就不分析了~
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
2.5 問題解答
[問題一] 為什么在喚醒后驅結點時,node的后驅結點為空,需要重新從后往前找
// (2.1.2) 記住這個地方, 后面有個知識點會用到
node.prev = pred; // 1
// (2.1.3)
if (compareAndSetTail(pred, node)) { // 2
pred.next = node; // 3
return node;
}
仔細觀察代碼(2.1.2) 和(2.1.3), 此時添加結點相當於有三步,都不是原子性的,當執行到第二步時,就要喚醒后驅結點了,此時新增的結點只設置了前驅結點,隊列設置了尾結點,但是沒有設置后驅結點,如果從前往后查找的話,可能會丟失符合要求的結點。
[問題二],代碼(2.3.3)中, 為什么不只用propagate來判斷是否喚醒后驅結點。
請看這位大佬的博客 講的非常詳細
大致意思就是,
我們假設有A、B、C、D四個線程,前兩個釋放資源的線程,后兩個是爭搶資源的線程,此時只有A或B釋放了資源,C、D才可以被喚醒,假設我們不看PROPAGTE
時刻一: A線程釋放資源,執行代碼 (2.3.4),head的waitStatus從SIGNAL(-1)變為了0
時刻二: C線程獲取到資源,執行到代碼(2.3.1), tryAcquireShared返回0
時刻三: B線程線程釋放資源,執行代碼 (2.3.4),因為此時未改變頭結點,head的waitStatus為0,不能unparkSuccessor
時刻四: 此時C執行到代碼(2.3.3),propagate(tryAcquireShared返回值)為0,C也不會去喚醒后驅結點,D線程就永遠GG了
引用doReleaseShared注釋中的一句話
status is set to PROPAGATE to ensure that upon release, propagation continues.
2.6 Condition
使用synchronized時,線程間通信使用wait, notify and notifyAll;而使用AQS實現的lock,線程間的通信就使用Condition中的await、signal...。Condition與Lock結合使用,同一個lock對應多個Condition。

public class ConditionObject implements Condition...
在AQS中,已經對Condition的方法進行了實現,子類想使用的話,只需要調用ConditionObject就行了
final ConditionObject newCondition() {
return new ConditionObject();
}
本來想跟着源碼走,簡簡單單介紹一下Condition,但是源碼有幾處細節,讓我頭禿,在網上搜索別人的博客,這篇博客解開了我的疑惑,對Condition介紹的非常詳細,寫的非常的完美~
根據大佬的博客,那我們下面簡單講解Condition的兩個常用方法
- await
- signal
2.6.1 await & signal
導致目前線程阻塞直到被喚醒或中斷;調用await后,會將當前的線程封裝成node,加入到條件隊列中
public final void await() throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
// 添加結點到條件隊列中
Node node = addConditionWaiter();
// 釋放當前線程持有的鎖
int savedState = fullyRelease(node);
int interruptMode = 0;
// 不在同步隊列中,就park
// (2.6.1)
while (!isOnSyncQueue(node)) {
// 執行到這,當前線程會立即掛起
LockSupport.park(this);
// 運行到這的話,情況: 1. signal 2. 中斷
// 檢驗中斷
// (2.6.2)
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
// 第二部分
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null) // clean up if cancelled
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
}
為了講清楚代碼(2.6.1)之后的邏輯,我們先看看signal的源碼
將condition queue中等待最長的結點轉移到sync queue中去,去爭搶資源
public final void signal() {
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
Node first = firstWaiter;
if (first != null)
doSignal(first);
}
此時,執行signal的主要邏輯
private void doSignal(Node first) {
do {
if ( (firstWaiter = first.nextWaiter) == null)
lastWaiter = null;
// 將后驅結點置空
// (2.6.3)
first.nextWaiter = null;
} while (!transferForSignal(first) &&
(first = firstWaiter) != null);
}
將condition queue中的一個結點轉移到sync queue中去
final boolean transferForSignal(Node node) {
// 這里表示當前已經被取消了。
// (2.6.4)
if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
return false;
// 將當前結點放入sync queue的末尾, 此時返回的是當前結點的前驅結點(一定要注意)
Node p = enq(node);
int ws = p.waitStatus;
// 前驅結點被取消,或者設置SIGNAL狀態失敗,就直接喚醒當前線程, 喚醒 = 有資格去競爭鎖了
// enq返回的是前驅結點,我人傻了,看成是返回當前線程,就一直覺得邏輯不對。
if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
LockSupport.unpark(node.thread);
return true;
}
這里signal的邏輯就講完了,總結一下:
- 在condition queue中找出等待時間最長且未被取消的結點, 轉移到sync queue的隊尾去,同時要在condition queue中刪除該結點
- 若在sync queue中的該結點的前驅結點被取消了或設置SIGNAL狀態失敗,要直接喚醒它,叫它去競爭鎖。
signalAll的主要邏輯和signal是一樣的,差別就是signalAll會把所有在condition queue中的結點轉移到sync queue中去,並清空所有在condition queue中的結點,下面只貼一下signalAll的主要代碼,
private void doSignalAll(Node first) {
lastWaiter = firstWaiter = null;
do {
Node next = first.nextWaiter;
first.nextWaiter = null;
transferForSignal(first);
first = next;
} while (first != null);
}
我們再次回到await中去
public final void await() throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
// 添加結點到條件隊列中
Node node = addConditionWaiter();
// 釋放當前線程持有的鎖
int savedState = fullyRelease(node);
int interruptMode = 0;
// 不在同步隊列中,就park
// (2.6.1)
while (!isOnSyncQueue(node)) {
// 執行到這,當前線程會立即掛起
LockSupport.park(this);
// 運行到這的話,情況: 1. signal 2. 中斷
// 檢驗中斷
// (2.6.2)
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
// 第二部分
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null) // clean up if cancelled
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
}
從代碼(2.6.1),繼續講解
isOnSyncQueue
一開始在條件隊列中,現在在sync queue中等待重新獲取資源,如果有這種的node就返回true
final boolean isOnSyncQueue(Node node) {
if (node.waitStatus == Node.CONDITION || node.prev == null)
return false;
if (node.next != null) // If has successor, it must be on queue
return true;
/*
* node.prev can be non-null, but not yet on queue because
* the CAS to place it on queue can fail. So we have to
* traverse from tail to make sure it actually made it. It
* will always be near the tail in calls to this method, and
* unless the CAS failed (which is unlikely), it will be
* there, so we hardly ever traverse much.
*/
return findNodeFromTail(node);
}
findNodeFromTail
從尾部找尋結點
private boolean findNodeFromTail(Node node) {
Node t = tail;
for (;;) {
// 結合下面的enq代碼以及圖思考一下,會明白此方法的意義的
if (t == node)
return true;
if (t == null)
return false;
t = t.prev;
}
}
node.waitStatus == Node.CONDITION, 表示當前結點肯定在condition queue中。
為何是上面的那些條件?
我們上面看了轉移到sync queue是用的enq方法
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) { // Must initialize
if (compareAndSetHead(new Node()))
tail = head;
} else {
// (2.6.5)
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
結合代碼(2.6.5),思考一下就知道isOnSyncQueue中條件設置的道理了,但是為何需要findNodeFromTail啦? 這是需要補充一個知識點了,在多個線程執行enq時,只有一個線程會設置為tail成功,其余的都只是設置prev,就可能會出現下面圖片中的情況,'多個尾巴'。一直不斷自旋,最后會變成一個正常的鏈表。

此時線程的狀態是,調用await后,將結點添加到條件隊列中,且釋放了自己持有的所有資源,並將自己park,此時等待被signal或者被中斷。
public final void await() throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
// 添加結點到條件隊列中
Node node = addConditionWaiter();
// 釋放當前線程持有的鎖
int savedState = fullyRelease(node);
int interruptMode = 0;
// 不在同步隊列中,就park
// (2.6.1)
while (!isOnSyncQueue(node)) {
// 執行到這,當前線程會立即掛起
LockSupport.park(this); /// 我們在這被掛起了
// 運行到這的話,情況: 1. signal 2. 中斷
// 檢驗中斷
// (2.6.2)
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
// 第二部分
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null) // clean up if cancelled
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
}
執行到代碼(2.6.2), 我們直到可能是被signal或被中斷了。現在要解決的是,
- 是否被中斷?
- 何時被中斷?
- 中斷如何處理?
我們帶着這三個問題,繼續出發~
補充一個小知識點,AQS定義了三種情況中斷的值
- THROW_IE, signal前被中斷,要拋出InterruptedException
- REINTERRUPT, signal后被中斷
- 0, 未被中斷
關於REINTERRUPT這個中斷,可以理解成,吃飯,吃完了但是還有一個菜沒有上,問服務員,“如果沒有炒,就不要了”,但是服務員告訴,菜已經下鍋,所以這時候的中斷就是REINTERRUPT,中斷的太晚了。 -- 例子來自上面的那篇博客
我們繼續看向代碼(2.6.2)
checkInterruptWhileWaiting
private int checkInterruptWhileWaiting(Node node) {
return Thread.interrupted() ?
(transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) :
0;
}
若被中斷Thread.interrupted肯定為true
transferAfterCancelledWait
final boolean transferAfterCancelledWait(Node node) {
// 中斷情況一: 此時結點還在condition queue中,肯定是signal前就被中斷了
if (compareAndSetWaitStatus(node, Node.CONDITION, 0)) {
// 這里還要加入到sync queue中,獲取到鎖才能拋出錯,繼續往后看
enq(node);
return true;
}
/*
* If we lost out to a signal(), then we can't proceed
* until it finishes its enq(). Cancelling during an
* incomplete transfer is both rare and transient, so just
* spin.
*/
// 中斷情況二: 這里的情況就是signal后,node還沒有執行enq,畢竟執行signal到執行enq還有幾個步驟
// 此時就自旋,等待node轉移到sync node中就行了
while (!isOnSyncQueue(node))
Thread.yield();
return false;
}
上面的代碼已經注明了,各種情況的發生時機,此時我們來到了await的第二部分~
public final void await() throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
// 添加結點到條件隊列中
Node node = addConditionWaiter();
// 釋放當前線程持有的鎖
int savedState = fullyRelease(node);
int interruptMode = 0;
// 不在同步隊列中,就park
// (2.6.1)
while (!isOnSyncQueue(node)) {
// 執行到這,當前線程會立即掛起
LockSupport.park(this); /// 我們在這被掛起了
// 運行到這的話,情況: 1. signal 2. 中斷
// 檢驗中斷
// (2.6.2)
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
// 第二部分
// acquireQueued獲取到鎖,並返回是否被中斷。
// 有一種情況需要提一下,acquireQueued返回true,上面的interruptMode = 0,
// 表示signal后,在獲取鎖的時候被中斷了
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
// node還在condition上,說明是被取消了的node,清除它
if (node.nextWaiter != null) // clean up if cancelled
unlinkCancelledWaiters();
if (interruptMode != 0)
// 對上面得到的interruptMode做出回應
reportInterruptAfterWait(interruptMode);
}
reportInterruptAfterWait
private void reportInterruptAfterWait(int interruptMode)
throws InterruptedException {
if (interruptMode == THROW_IE)
throw new InterruptedException();
else if (interruptMode == REINTERRUPT)
selfInterrupt();
}
2.6.2 await與signal的流程圖


2.6.3 小結
先講講await的流程,看起流程圖有點嚇人,其實很多步驟是對不同時機的中斷操作的記錄
當await被執行,下面簡單總結下await的流程
- 將當前線程與CONDITION狀態封裝成node,加入到condition queue的末尾
- 釋放線程之前獲取的所有資源
- 若不在sync queue中,阻塞自己,等待被signal或被中斷
- 獲取中斷操作的時機,並記錄表示何時中斷的值(interruptMode)
- 不管是怎么被喚醒的,都要去競爭資源,直到獲得資源為止
- 最后對不同的中斷值,作出不同的操作
signal的流程就相對於簡單一點
- 獲取condition queue的頭結點
- 檢驗是否被取消,若被取消,就獲取頭結點的后驅結點,以此類推;
- 將結點從condition queue中轉移到sync queue中,而且會從condition queue中刪除該節點
- 若結點插入sync queue,得到的前驅結點,被取消了,或者CAS前驅結點狀態為SIGNAL
失敗,將直接unpark當前線程
3. 總結
Doug Lea,太秀了。AQS中有很多細枝末節的東西,只有自己去讀了源碼,理解為何這樣做,你才會明白才會真正讀懂AQS。
關於學習和寫AQS文章時,看了一些博客,為我解答了自己的疑惑,慢慢加油,我也要向這些大佬看齊~
4. 參考
- 《Java並發編程之美》 - 這本書可以作為學習並發的入門書
- Java並發之AQS詳解 - 引用了他的圖片
- AbstractQueuedSynchronizer源碼解讀 - 為我解開了一些獲取和釋放資源的疑惑
- 逐行分析AQS源碼(4)——Condition接口實現 - 為我解開了一些Condition的疑惑
- AQS論文 Doug Lea
~~# 5. 面試中問題
這是我的一個想法,若我博客中寫過的知識,在面試中有問到過,我會記錄下來,沒有就是目前還沒遇到過
