1. 背景
AQS(java.util.concurrent.locks.AbstractQueuedSynchronizer)是Doug Lea大師創作的用來構建鎖或者其他同步組件(信號量、事件等)的基礎框架類。JDK中許多並發工具類的內部實現都依賴於AQS,如ReentrantLock, Semaphore, CountDownLatch等等。學習AQS的使用與源碼實現對深入理解concurrent包中的類有很大的幫助。
本文重點介紹AQS中的基本實現思路,包括獨占鎖、共享鎖的獲取和釋放實現原理和一些代碼細節。
對於AQS中ConditionObject的相關實現,可以參考我的另一篇博文AbstractQueuedSynchronizer源碼解讀--續篇之Condition。
2. 簡介
AQS的主要使用方式是繼承它作為一個內部輔助類實現同步原語,它可以簡化你的並發工具的內部實現,屏蔽同步狀態管理、線程的排隊、等待與喚醒等底層操作。
AQS設計基於模板方法模式,開發者需要繼承同步器並且重寫指定的方法,將其組合在並發組件的實現中,調用同步器的模板方法,模板方法會調用使用者重寫的方法。
3. 實現思路
下面介紹下AQS具體實現的大致思路。
AQS內部維護一個CLH隊列來管理鎖。
線程會首先嘗試獲取鎖,如果失敗,則將當前線程以及等待狀態等信息包成一個Node節點加到同步隊列里。
接着會不斷循環嘗試獲取鎖(條件是當前節點為head的直接后繼才會嘗試),如果失敗則會阻塞自己,直至被喚醒;
而當持有鎖的線程釋放鎖時,會喚醒隊列中的后繼線程。
下面列舉JDK中幾種常見使用了AQS的同步組件:
- ReentrantLock: 使用了AQS的獨占獲取和釋放,用state變量記錄某個線程獲取獨占鎖的次數,獲取鎖時+1,釋放鎖時-1,在獲取時會校驗線程是否可以獲取鎖。
- Semaphore: 使用了AQS的共享獲取和釋放,用state變量作為計數器,只有在大於0時允許線程進入。獲取鎖時-1,釋放鎖時+1。
- CountDownLatch: 使用了AQS的共享獲取和釋放,用state變量作為計數器,在初始化時指定。只要state還大於0,獲取共享鎖會因為失敗而阻塞,直到計數器的值為0時,共享鎖才允許獲取,所有等待線程會被逐一喚醒。
3.1 如何獲取鎖
獲取鎖的思路很直接:
while (不滿足獲取鎖的條件) {
把當前線程包裝成節點插入同步隊列
if (需要阻塞當前線程)
阻塞當前線程直至被喚醒
}
將當前線程從同步隊列中移除
以上是一個很簡單的獲取鎖的偽代碼流程,AQS的具體實現比這個復雜一些,也稍有不同,但思想上是與上述偽代碼契合的。
通過循環檢測是否能夠獲取到鎖,如果不滿足,則可能會被阻塞,直至被喚醒。
3.2 如何釋放鎖
釋放鎖的過程設計修改同步狀態,以及喚醒后繼等待線程:
修改同步狀態
if (修改后的狀態允許其他線程獲取到鎖)
喚醒后繼線程
這只是很簡略的釋放鎖的偽代碼示意,AQS具體實現中能看到這個簡單的流程模型。
3.3 API簡介
通過上面的AQS大體思路分析,我們可以看到,AQS主要做了三件事情
- 同步狀態的管理
- 線程的阻塞和喚醒
- 同步隊列的維護
下面三個protected final方法是AQS中用來訪問/修改同步狀態的方法:
-
int getState(): 獲取同步狀態
-
void setState(): 設置同步狀態
-
boolean compareAndSetState(int expect, int update):基於CAS,原子設置當前狀態
在自定義基於AQS的同步工具時,我們可以選擇覆蓋實現以下幾個方法來實現同步狀態的管理:
方法 | 描述 |
---|---|
boolean tryAcquire(int arg) | 試獲取獨占鎖 |
boolean tryRelease(int arg) | 試釋放獨占鎖 |
int tryAcquireShared(int arg) | 試獲取共享鎖 |
boolean tryReleaseShared(int arg) | 試釋放共享鎖 |
boolean isHeldExclusively() | 當前線程是否獲得了獨占鎖 |
以上的幾個試獲取/釋放鎖的方法的具體實現應當是無阻塞的。
AQS本身將同步狀態的管理用模板方法模式都封裝好了,以下列舉了AQS中的一些模板方法:
方法 | 描述 |
---|---|
void acquire(int arg) | 獲取獨占鎖。會調用tryAcquire 方法,如果未獲取成功,則會進入同步隊列等待 |
void acquireInterruptibly(int arg) | 響應中斷版本的acquire |
boolean tryAcquireNanos(int arg,long nanos) | 響應中斷+帶超時版本的acquire |
void acquireShared(int arg) | 獲取共享鎖。會調用tryAcquireShared 方法 |
void acquireSharedInterruptibly(int arg) | 響應中斷版本的acquireShared |
boolean tryAcquireSharedNanos(int arg,long nanos) | 響應中斷+帶超時版本的acquireShared |
boolean release(int arg) | 釋放獨占鎖 |
boolean releaseShared(int arg) | 釋放共享鎖 |
Collection getQueuedThreads() | 獲取同步隊列上的線程集合 |
上面看上去很多方法,其實從語義上來區分就是獲取和釋放,從模式上區分就是獨占式和共享式,從中斷相應上來看就是支持和不支持。
4. 代碼解讀
4.1 數據結構定義
首先看一下AQS中的嵌套類Node的定義。
static final class Node {
/**
* 用於標記一個節點在共享模式下等待
*/
static final Node SHARED = new Node();
/**
* 用於標記一個節點在獨占模式下等待
*/
static final Node EXCLUSIVE = null;
/**
* 等待狀態:取消
*/
static final int CANCELLED = 1;
/**
* 等待狀態:通知
*/
static final int SIGNAL = -1;
/**
* 等待狀態:條件等待
*/
static final int CONDITION = -2;
/**
* 等待狀態:傳播
*/
static final int PROPAGATE = -3;
/**
* 等待狀態
*/
volatile int waitStatus;
/**
* 前驅節點
*/
volatile Node prev;
/**
* 后繼節點
*/
volatile Node next;
/**
* 節點對應的線程
*/
volatile Thread thread;
/**
* 等待隊列中的后繼節點
*/
Node nextWaiter;
/**
* 當前節點是否處於共享模式等待
*/
final boolean isShared() {
return nextWaiter == SHARED;
}
/**
* 獲取前驅節點,如果為空的話拋出空指針異常
*/
final Node predecessor() throws NullPointerException {
Node p = prev;
if (p == null) {
throw new NullPointerException();
} else {
return p;
}
}
Node() {
}
/**
* addWaiter會調用此構造函數
*/
Node(Thread thread, Node mode) {
this.nextWaiter = mode;
this.thread = thread;
}
/**
* Condition會用到此構造函數
*/
Node(Thread thread, int waitStatus) {
this.waitStatus = waitStatus;
this.thread = thread;
}
}
這里有必要專門梳理一下節點等待狀態的定義,因為AQS源碼中有大量的狀態判斷與躍遷。
值 | 描述 |
---|---|
CANCELLED (1) | 當前線程因為超時或者中斷被取消。這是一個終結態,也就是狀態到此為止。 |
SIGNAL (-1) | 當前線程的后繼線程被阻塞或者即將被阻塞,當前線程釋放鎖或者取消后需要喚醒后繼線程。這個狀態一般都是后繼線程來設置前驅節點的。 |
CONDITION (-2) | 當前線程在condition隊列中。 |
PROPAGATE (-3) | 用於將喚醒后繼線程傳遞下去,這個狀態的引入是為了完善和增強共享鎖的喚醒機制。在一個節點成為頭節點之前,是不會躍遷為此狀態的 |
0 | 表示無狀態。 |
對於分析AQS中不涉及ConditionObject
部分的代碼,可以認為隊列中的節點狀態只會是CANCELLED, SIGNAL, PROPAGATE, 0這幾種情況。
上圖為自制的AQS狀態的流轉圖,AQS中0狀態和CONDITION狀態為始態,CANCELLED狀態為終態。0狀態同時也可以是節點生命周期的終態。
注意,上圖僅表示狀態之間流轉的可達性,並不代表一定能夠從一個狀態沿着線隨意躍遷。
在AQS中包含了head和tail兩個Node引用,其中head在邏輯上的含義是當前持有鎖的線程,head節點實際上是一個虛節點,本身並不會存儲線程信息。
當一個線程無法獲取鎖而被加入到同步隊列時,會用CAS來設置尾節點tail為當前線程對應的Node節點。
head和tail在AQS中是延遲初始化的,也就是在需要的時候才會被初始化,也就意味着在所有線程都能獲取到鎖的情況下,隊列中的head和tail都會是null。
4.2 獲取獨占鎖的實現
下面來具體看看acquire(int arg)的實現:
/**
* 獲取獨占鎖,對中斷不敏感。
* 首先嘗試獲取一次鎖,如果成功,則返回;
* 否則會把當前線程包裝成Node插入到隊列中,在隊列中會檢測是否為head的直接后繼,並嘗試獲取鎖,
* 如果獲取失敗,則會通過LockSupport阻塞當前線程,直至被釋放鎖的線程喚醒或者被中斷,隨后再次嘗試獲取鎖,如此反復。
*/
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
/**
* 在隊列中新增一個節點。
*/
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
Node pred = tail;
// 快速嘗試
if (pred != null) {
node.prev = pred;
// 通過CAS在隊尾插入當前節點
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
// 初始情況或者在快速嘗試失敗后插入節點
enq(node);
return node;
}
/**
* 通過循環+CAS在隊列中成功插入一個節點后返回。
*/
private Node enq(final Node node) {
for (;;) {
Node t = tail;
// 初始化head和tail
if (t == null) {
if (compareAndSetHead(new Node()))
tail = head;
} else {
/*
* AQS的精妙就是體現在很多細節的代碼,比如需要用CAS往隊尾里增加一個元素
* 此處的else分支是先在CAS的if前設置node.prev = t,而不是在CAS成功之后再設置。
* 一方面是基於CAS的雙向鏈表插入目前沒有完美的解決方案,另一方面這樣子做的好處是:
* 保證每時每刻tail.prev都不會是一個null值,否則如果node.prev = t
* 放在下面if的里面,會導致一個瞬間tail.prev = null,這樣會使得隊列不完整。
*/
node.prev = t;
// CAS設置tail為node,成功后把老的tail也就是t連接到node。
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
/**
* 在隊列中的節點通過此方法獲取鎖,對中斷不敏感。
*/
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
/*
* 檢測當前節點前驅是否head,這是試獲取鎖的資格。
* 如果是的話,則調用tryAcquire嘗試獲取鎖,
* 成功,則將head置為當前節點。
*/
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
/*
* 如果未成功獲取鎖則根據前驅節點判斷是否要阻塞。
* 如果阻塞過程中被中斷,則置interrupted標志位為true。
* shouldParkAfterFailedAcquire方法在前驅狀態不為SIGNAL的情況下都會循環重試獲取鎖。
*/
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
/**
* 根據前驅節點中的waitStatus來判斷是否需要阻塞當前線程。
*/
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)
/*
* 前驅節點設置為SIGNAL狀態,在釋放鎖的時候會喚醒后繼節點,
* 所以后繼節點(也就是當前節點)現在可以阻塞自己。
*/
return true;
if (ws > 0) {
/*
* 前驅節點狀態為取消,向前遍歷,更新當前節點的前驅為往前第一個非取消節點。
* 當前線程會之后會再次回到循環並嘗試獲取鎖。
*/
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
/**
* 等待狀態為0或者PROPAGATE(-3),設置前驅的等待狀態為SIGNAL,
* 並且之后會回到循環再次重試獲取鎖。
*/
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
/**
* 該方法實現某個node取消獲取鎖。
*/
private void cancelAcquire(Node node) {
if (node == null)
return;
node.thread = null;
// 遍歷並更新節點前驅,把node的prev指向前部第一個非取消節點。
Node pred = node.prev;
while (pred.waitStatus > 0)
node.prev = pred = pred.prev;
// 記錄pred節點的后繼為predNext,后續CAS會用到。
Node predNext = pred.next;
// 直接把當前節點的等待狀態置為取消,后繼節點即便也在cancel可以跨越node節點。
node.waitStatus = Node.CANCELLED;
/*
* 如果CAS將tail從node置為pred節點了
* 則剩下要做的事情就是嘗試用CAS將pred節點的next更新為null以徹底切斷pred和node的聯系。
* 這樣一來就斷開了pred與pred的所有后繼節點,這些節點由於變得不可達,最終會被回收掉。
* 由於node沒有后繼節點,所以這種情況到這里整個cancel就算是處理完畢了。
*
* 這里的CAS更新pred的next即使失敗了也沒關系,說明有其它新入隊線程或者其它取消線程更新掉了。
*/
if (node == tail && compareAndSetTail(node, pred)) {
compareAndSetNext(pred, predNext, null);
} else {
// 如果node還有后繼節點,這種情況要做的事情是把pred和后繼非取消節點拼起來。
int ws;
if (pred != head &&
((ws = pred.waitStatus) == Node.SIGNAL ||
(ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
pred.thread != null) {
Node next = node.next;
/*
* 如果node的后繼節點next非取消狀態的話,則用CAS嘗試把pred的后繼置為node的后繼節點
* 這里if條件為false或者CAS失敗都沒關系,這說明可能有多個線程在取消,總歸會有一個能成功的。
*/
if (next != null && next.waitStatus <= 0)
compareAndSetNext(pred, predNext, next);
} else {
/*
* 這時說明pred == head或者pred狀態取消或者pred.thread == null
* 在這些情況下為了保證隊列的活躍性,需要去喚醒一次后繼線程。
* 舉例來說pred == head完全有可能實際上目前已經沒有線程持有鎖了,
* 自然就不會有釋放鎖喚醒后繼的動作。如果不喚醒后繼,隊列就掛掉了。
*
* 這種情況下看似由於沒有更新pred的next的操作,隊列中可能會留有一大把的取消節點。
* 實際上不要緊,因為后繼線程喚醒之后會走一次試獲取鎖的過程,
* 失敗的話會走到shouldParkAfterFailedAcquire的邏輯。
* 那里面的if中有處理前驅節點如果為取消則維護pred/next,踢掉這些取消節點的邏輯。
*/
unparkSuccessor(node);
}
/*
* 取消節點的next之所以設置為自己本身而不是null,
* 是為了方便AQS中Condition部分的isOnSyncQueue方法,
* 判斷一個原先屬於條件隊列的節點是否轉移到了同步隊列。
*
* 因為同步隊列中會用到節點的next域,取消節點的next也有值的話,
* 可以斷言next域有值的節點一定在同步隊列上。
*
* 在GC層面,和設置為null具有相同的效果。
*/
node.next = node;
}
}
/**
* 喚醒后繼線程。
*/
private void unparkSuccessor(Node node) {
int ws = node.waitStatus;
// 嘗試將node的等待狀態置為0,這樣的話,后繼爭用線程可以有機會再嘗試獲取一次鎖。
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
Node s = node.next;
/*
* 這里的邏輯就是如果node.next存在並且狀態不為取消,則直接喚醒s即可
* 否則需要從tail開始向前找到node之后最近的非取消節點。
*
* 這里為什么要從tail開始向前查找也是值得琢磨的:
* 如果讀到s == null,不代表node就為tail,參考addWaiter以及enq函數中的我的注釋。
* 不妨考慮到如下場景:
* 1. node某時刻為tail
* 2. 有新線程通過addWaiter中的if分支或者enq方法添加自己
* 3. compareAndSetTail成功
* 4. 此時這里的Node s = node.next讀出來s == null,但事實上node已經不是tail,它有后繼了!
*/
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
LockSupport.unpark(s.thread);
}
AQS獨占鎖的獲取的流程示意如下:
4.3 釋放獨占鎖的實現
上面已經分析了acquire的實現,下面來看看release的實現:
對於釋放一個獨占鎖,首先會調用tryRelease,在完全釋放掉獨占鎖后,這時后繼線程是可以獲取到獨占鎖的,
因此釋放者線程需要做的事情是喚醒一個隊列中的后繼者線程,讓它去嘗試獲取獨占鎖。
上述所謂完全釋放掉鎖的含義,簡單來說就是當前鎖處於無主狀態,等待線程有可能可以獲取。
舉例:對於可重入鎖ReentrantLock, 每次tryAcquire后,state會+1,每次tryRelease后,state會-1,如果state變為0了,則此時稱獨占鎖被完全釋放了。
下面,我們來看一下release的具體代碼實現:
public final boolean release(int arg) {
if (tryRelease(arg)) {
/*
* 此時的head節點可能有3種情況:
* 1. null (AQS的head延遲初始化+無競爭的情況)
* 2. 當前線程在獲取鎖時new出來的節點通過setHead設置的
* 3. 由於通過tryRelease已經完全釋放掉了獨占鎖,有新的節點在acquireQueued中獲取到了獨占鎖,並設置了head
* 第三種情況可以再分為兩種情況:
* (一)時刻1:線程A通過acquireQueued,持鎖成功,set了head
* 時刻2:線程B通過tryAcquire試圖獲取獨占鎖失敗失敗,進入acquiredQueued
* 時刻3:線程A通過tryRelease釋放了獨占鎖
* 時刻4:線程B通過acquireQueued中的tryAcquire獲取到了獨占鎖並調用setHead
* 時刻5:線程A讀到了此時的head實際上是線程B對應的node
* (二)時刻1:線程A通過tryAcquire直接持鎖成功,head為null
* 時刻2:線程B通過tryAcquire試圖獲取獨占鎖失敗失敗,入隊過程中初始化了head,進入acquiredQueued
* 時刻3:線程A通過tryRelease釋放了獨占鎖,此時線程B還未開始tryAcquire
* 時刻4:線程A讀到了此時的head實際上是線程B初始化出來的傀儡head
*/
Node h = head;
// head節點狀態不會是CANCELLED,所以這里h.waitStatus != 0相當於h.waitStatus < 0
if (h != null && h.waitStatus != 0)
// 喚醒后繼線程,此函數在acquire中已經分析過,不再列舉說明
unparkSuccessor(h);
return true;
}
return false;
}
整個release做的事情就是
- 調用tryRelease
- 如果tryRelease返回true也就是獨占鎖被完全釋放,喚醒后繼線程。
這里的喚醒是根據head幾點來判斷的,上面代碼的注釋中也分析了head節點的情況,只有在head存在並且等待狀態小於零的情況下喚醒。
4.4 獲取共享鎖的實現
與獲取獨占鎖的實現不同的關鍵在於,共享鎖允許多個線程持有。
如果需要使用AQS中共享鎖,在實現tryAcquireShared方法時需要注意,返回負數表示獲取失敗;返回0表示成功,但是后繼爭用線程不會成功;返回正數表示
獲取成功,並且后繼爭用線程也可能成功。
下面來看一下具體的代碼實現:
public final void acquireShared(int arg) {
if (tryAcquireShared(arg) < 0)
doAcquireShared(arg);
}
private void doAcquireShared(int arg) {
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head) {
int r = tryAcquireShared(arg);
// 一旦共享獲取成功,設置新的頭結點,並且喚醒后繼線程
if (r >= 0) {
setHeadAndPropagate(node, r);
p.next = null; // help GC
if (interrupted)
selfInterrupt();
failed = false;
return;
}
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
/**
* 這個函數做的事情有兩件:
* 1. 在獲取共享鎖成功后,設置head節點
* 2. 根據調用tryAcquireShared返回的狀態以及節點本身的等待狀態來判斷是否要需要喚醒后繼線程。
*/
private void setHeadAndPropagate(Node node, int propagate) {
// 把當前的head封閉在方法棧上,用以下面的條件檢查。
Node h = head;
setHead(node);
/*
* propagate是tryAcquireShared的返回值,這是決定是否傳播喚醒的依據之一。
* h.waitStatus為SIGNAL或者PROPAGATE時也根據node的下一個節點共享來決定是否傳播喚醒,
* 這里為什么不能只用propagate > 0來決定是否可以傳播在本文下面的思考問題中有相關講述。
*/
if (propagate > 0 || h == null || h.waitStatus < 0 ||
(h = head) == null || h.waitStatus < 0) {
Node s = node.next;
if (s == null || s.isShared())
doReleaseShared();
}
}
/**
* 這是共享鎖中的核心喚醒函數,主要做的事情就是喚醒下一個線程或者設置傳播狀態。
* 后繼線程被喚醒后,會嘗試獲取共享鎖,如果成功之后,則又會調用setHeadAndPropagate,將喚醒傳播下去。
* 這個函數的作用是保障在acquire和release存在競爭的情況下,保證隊列中處於等待狀態的節點能夠有辦法被喚醒。
*/
private void doReleaseShared() {
/*
* 以下的循環做的事情就是,在隊列存在后繼線程的情況下,喚醒后繼線程;
* 或者由於多線程同時釋放共享鎖由於處在中間過程,讀到head節點等待狀態為0的情況下,
* 雖然不能unparkSuccessor,但為了保證喚醒能夠正確穩固傳遞下去,設置節點狀態為PROPAGATE。
* 這樣的話獲取鎖的線程在執行setHeadAndPropagate時可以讀到PROPAGATE,從而由獲取鎖的線程去釋放后繼等待線程。
*/
for (;;) {
Node h = head;
// 如果隊列中存在后繼線程。
if (h != null && h != tail) {
int ws = h.waitStatus;
if (ws == Node.SIGNAL) {
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue;
unparkSuccessor(h);
}
// 如果h節點的狀態為0,需要設置為PROPAGATE用以保證喚醒的傳播。
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue;
}
// 檢查h是否仍然是head,如果不是的話需要再進行循環。
if (h == head)
break;
}
}
4.5 釋放共享鎖的實現
釋放共享鎖與獲取共享鎖的代碼共享了doReleaseShared,用於實現喚醒的傳播。
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
// doReleaseShared的實現上面獲取共享鎖已經介紹
doReleaseShared();
return true;
}
return false;
}
從中,我們可以看出,共享鎖的獲取和釋放都會涉及到doReleaseShared,也就是后繼線程的喚醒。關於PROPAGATE狀態的必要性,后文會作進一步介紹。
5. 一些思考
AQS的代碼實在是很精妙,要看懂大致套路並不困難,但是要完全領悟其中的一些細節是一件需要花功夫來仔細琢磨品味的事情。
下面列出一些看源碼時的問題與思考:
5.1 插入節點時的代碼順序
addWaiter和enq方法中新增一個節點時為什么要先將新節點的prev置為tail再嘗試CAS,而不是CAS成功后來構造節點之間的雙向鏈接?
這是因為,雙向鏈表目前沒有基於CAS原子插入的手段,如果我們將node.prev = t
和t.next = node
(t為方法執行時讀到的tail,引用封閉在棧上)
放到compareAndSetTail(t, node)
成功后執行,如下所示:
if (compareAndSetTail(t, node)) {
node.prev = t;
t.next = node;
return t;
}
會導致這一瞬間的tail也就是t的prev為null,這就使得這一瞬間隊列處於一種不一致的中間狀態。
5.2 喚醒節點時為什么從tail向前遍歷
unparkSuccessor方法中為什么喚醒后繼節點時要從tail向前查找最接近node的非取消節點,而不是直接從node向后找到第一個后break掉?
在上面的代碼注釋中已經提及到這一點:
如果讀到s == null,不代表node就為tail。
考慮如下場景:
- node某時刻為tail
- 有新線程通過addWaiter中的if分支或者enq方法添加自己
- compareAndSetTail成功
- 此時這里的Node s = node.next讀出來s == null,但事實上node已經不是tail,它有后繼了!
5.3 unparkSuccessor有新線程爭鎖是否存在漏洞
unparkSuccessor方法在被release調用時是否存在這樣的一個漏洞?
時刻1: node -> tail && tail.waitStatus == Node.CANCELLED (node的下一個節點為tail,並且tail處於取消狀態)
時刻2: unparkSuccessor讀到s.waitStatus > 0
時刻3: unparkSuccessor從tail開始遍歷
時刻4: tail節點對應線程執行cancelAcquire方法中的if (node == tail && compareAndSetTail(node, pred)) 返回true,
此時tail變為pred(也就是node)
時刻5: 有新線程進隊列tail變為新節點
時刻6: unparkSuccessor沒有發現需要喚醒的節點
最終新節點阻塞並且前驅節點結束調用,新節點再也無法被unpark
這種情況不會發生,確實可能出現從tail向前掃描,沒有讀到新入隊的節點,但別忘了acquireQueued的思想就是不斷循環檢測是否能夠獨占獲取鎖,
否則再進行判斷是否要阻塞自己,而release的第一步就是tryRelease,它的語義為true表示完全釋放獨占鎖,完全釋放之后才會執行后面的邏輯,也就是unpark后繼線程。在這種情況下,新入隊的線程應當能獲取到鎖。
如果沒有獲取鎖,則必然是在覆蓋tryAcquire/tryRelease的實現有問題,導致前驅節點成功釋放了獨占鎖,后繼節點獲取獨占鎖仍然失敗。也就是說AQS框架的可靠性還在
某些程度上依賴於具體子類的實現,子類實現如果有bug,那AQS再精巧也扛不住。
5.4 AQS如何保證隊列活躍
AQS如何保證在節點釋放的同時又有新節點入隊的情況下,不出現原持鎖線程釋放鎖,后繼線程被自己阻塞死的情況,保持同步隊列的活躍?
回答這個問題,需要理解shouldParkAfterFailedAcquire和unparkSuccessor這兩個方法。
以獨占鎖為例,后繼爭用線程阻塞自己的情況是讀到前驅節點的等待狀態為SIGNAL,只要不是這種情況都會再試着去爭取鎖。
假設后繼線程讀到了前驅狀態為SIGNAL,說明之前在tryAcquire的時候,前驅持鎖線程還沒有tryRelease完全釋放掉獨占鎖。
此時如果前驅線程完全釋放掉了獨占鎖,則在unparkSuccessor中還沒執行完置waitStatus為0的操作,也就是還沒執行到下面喚醒后繼線程的代碼,否則后繼線程會再去爭取鎖。
那么就算后繼爭用線程此時把自己阻塞了,也一定會馬上被前驅線程喚醒。
那么是否可能持鎖線程執行喚醒后繼線程的邏輯時,后繼線程讀到前驅等待狀態為SIGNAL把自己給阻塞,再也無法蘇醒呢?
這個問題在上面的問題3中已經有答案了,確實可能在掃描后繼需要喚醒線程時讀不到新來的線程,但只要tryRelease語義實現正確,在true時表示完全釋放獨占鎖,
則后繼線程理應能夠tryAcquire成功,shouldParkAfterFailedAcquire在讀到前驅狀態不為SIGNAL
會給當前線程再一次獲取鎖的機會的。
別看AQS代碼寫的有些復雜,狀態有些多,還真的就是沒毛病,各種情況都能覆蓋。
5.5 PROPAGATE狀態存在的意義
在setHeadAndPropagate中我們可以看到如下的一段代碼:
if (propagate > 0 || h == null || h.waitStatus < 0 ||
(h = head) == null || h.waitStatus < 0) {
Node s = node.next;
if (s == null || s.isShared())
doReleaseShared();
}
為什么不只是用propagate > 0來判斷呢?我們知道目前AQS代碼中的Node.PROPAGATE狀態就是為了此處可以讀取到h.waitStatus < 0(PROPAGATE值為-3);如果這里可以只用propagate > 0來判斷,是否PROPAGATE狀態都沒有存在的必要了?
我接觸JAVA比較晚,接觸的時候就已經是JDK8的年代了。這個問題我思考了很久,沒有想到很合理的解釋來說明PROPAGATE狀態存在的必要性。
在網上也鮮少有相關方面的資料、博客提及到這些。后來通過瀏覽Doug Lea的個人網站,發現在很久以前AQS的代碼確實是沒有PROPAGATE的,PROPAGATE的引入是為了解決共享鎖並發釋放導致的線程hang住問題。
在Doug Lea的JSR 166 repository上,我找到了PROPAGATE最早被引入的那一版。可以看到
Revision1.73中,PROPAGATE狀態被引入用以修復bug 6801020,讓我們來看看這個bug:
import java.util.concurrent.Semaphore;
public class TestSemaphore {
private static Semaphore sem = new Semaphore(0);
private static class Thread1 extends Thread {
@Override
public void run() {
sem.acquireUninterruptibly();
}
}
private static class Thread2 extends Thread {
@Override
public void run() {
sem.release();
}
}
public static void main(String[] args) throws InterruptedException {
for (int i = 0; i < 10000000; i++) {
Thread t1 = new Thread1();
Thread t2 = new Thread1();
Thread t3 = new Thread2();
Thread t4 = new Thread2();
t1.start();
t2.start();
t3.start();
t4.start();
t1.join();
t2.join();
t3.join();
t4.join();
System.out.println(i);
}
}
}
很顯然,這段程序一定能執行結束的,但是會偶現線程hang住的問題。
當時的AQS中setHeadAndPropagate是這樣的:
以上是bug 6801020修復點的對比,左邊為修復之前的版本,右邊為引入PROPAGATE修復之后的版本。
從左邊可以看到原先的setHeadAndPropagate相比目前版本要簡單很多,而releaseShared的實現也與release基本雷同,這也正是本問題的核心:為什么僅僅用調用的tryAcquireShared
得到的返回值來判斷是否需要喚醒不行呢?
在PROPAGATE狀態出現之前的源碼可以
在這里查看。
5.5.1 分析
讓我們來分析一下上面的程序:
上面的程序循環中做的事情就是放出4個線程,其中2個線程用於獲取信號量,另外2個用於釋放信號量。每次循環主線程會等待所有子線程執行完畢。
出現bug也就是線程hang住的問題就在於兩個獲取信號量的線程有一個會沒辦法被喚醒,隊列就死掉了。
在AQS的共享鎖中,一個被park的線程,不考慮線程中斷和前驅節點取消的情況,有兩種情況可以被unpark:一種是其他線程釋放信號量,調用unparkSuccessor;
另一種是其他線程獲取共享鎖時通過傳播機制來喚醒后繼節點。
我們假設某次循環中隊列里排隊的節點為情況為:
head -> t1的node -> t2的node(也就是tail)
信號量釋放的順序為t3先釋放,t4后釋放:
時刻1: t3調用releaseShared,調用了unparkSuccessor(h),head的等待狀態從-1變為0
時刻2: t1由於t3釋放了信號量,被t3喚醒,調用Semaphore.NonfairSync的tryAcquireShared,返回值為0
時刻3: t4調用releaseShared,讀到此時h.waitStatus為0(此時讀到的head和時刻1中為同一個head),不滿足條件,因此不會調用unparkSuccessor(h)
時刻4: t1獲取信號量成功,調用setHeadAndPropagate時,因為不滿足propagate > 0(時刻2的返回值也就是propagate==0),從而不會喚醒后繼節點
這就好比是一個精巧的多米諾骨牌最終由於設計的失誤導致動力無法傳遞下去,至此AQS中的同步隊列宣告死亡。
那么引入PROPAGATE是怎么解決問題的呢?
引入之后,調用releaseShared方法不再簡單粗暴地直接unparkSuccessor,而是將傳播行為抽了一個doReleaseShared方法出來。
再看上面的那種情況:
時刻1:t3調用releaseShared -> doReleaseShared -> unparkSuccessor,完了之后head的等待狀態為0
時刻2:t1由於t3釋放了信號量,被t3喚醒,調用Semaphore.NonfairSync的tryAcquireShared,返回值為0
時刻3:t4調用releaseShared,讀到此時h.waitStatus為0(此時讀到的head和時刻1中為同一個head),將等待狀態置為PROPAGATE
時刻4:t1獲取信號量成功,調用setHeadAndPropagate時,可以讀到h.waitStatus < 0
,從而可以接下來調用doReleaseShared喚醒t2
也就是說,上面會產生線程hang住bug的case在引入PROPAGATE后可以被規避掉。在PROPAGATE引入之前,之所以可能會出現線程hang住的情況,就是在於
releaseShared有競爭的情況下,可能會有隊列中處於等待狀態的節點因為第一個線程完成釋放喚醒,第二個線程獲取到鎖,但還沒設置好head,又有新線程釋放鎖,但是讀到老的head狀態為0導致釋放但不喚醒,最終后一個等待線程既沒有被釋放線程喚醒,也沒有被持鎖線程喚醒。
所以,僅僅靠tryAcquireShared的返回值來決定是否要將喚醒傳遞下去是不充分的。
5.6 AQS如何防止內存泄露
AQS維護了一個FIFO隊列,它是如何保證在運行期間不發生內存泄露的?
AQS在無競爭條件下,甚至都不會new出head和tail節點。
線程成功獲取鎖時設置head節點的方法為setHead,由於頭節點的thread並不重要,此時會置node的thread和prev為null,
完了之后還會置原先head也就是線程對應node的前驅的next為null,從而實現隊首元素的安全移出。
而在取消節點時,也會令node.thread = null,在node不為tail的情況下,會使node.next = node(之所以這樣也是為了isOnSyncQueue實現更加簡潔)
6. 總結
AQS毫無疑問是Doug Lea大師令人嘆為觀止的作品,它實現精巧、魯棒、優雅,很好地封裝了同步狀態的管理、線程的等待與喚醒,足以滿足大多數同步工具的需求。
閱讀AQS的源碼不是一蹴而就就能完全讀懂的,閱讀源碼大致分為三步:
- 讀懂大概思路以及一些重要方法之間的調用關系
- 逐行看代碼的具體實現,知道每一段代碼是干什么的
- 琢磨參悟某一段代碼為什么是這么寫的,能否換一種寫法,能否前后幾行代碼調換順序,作者是怎么想的
從Doug Lea大師的論文中,我們也能夠看出他設計並實現了AQS本身一方面是本人功力深厚,另一方面也閱讀了大量的文獻與資料,也做了很多方面的測試。
讀AQS最難的地方不在於明白套路和思路,而在於代碼中點點滴滴的細節。從一行行的代碼角度來說,比如改一個值,是否需要CAS,是否一定要CAS成功;讀一個值,在多線程環境下含義是什么,有哪些種情況。從一個個方法角度來說,這些方法的調用關系是如何保證框架的正確性、魯棒性、伸縮性等。
如果能把這些細節都想清楚,明白作者的思路與考慮,才可以源碼理解入木三分了。
對於PROPAGATE狀態,網上大多AQS的介紹也都只是淺顯地提及是用來設置傳播的,缺少對於這個狀態存在必要性的思考。一開始我也想了很久不明白為什么一定需要一個PROPAGATE狀態而不是直接根據tryAcquireShared的返回值來判斷是否需要傳播。后來也是去了Doug Lea的個人網站翻出當時最早引入PROPAGATE狀態的提交,看到了原來的代碼,以及http://bugs.java.com/上的bug才更厘清PROPAGATE狀態引入的前因后果。
盡管看懂源碼,也可能遠遠達不到能再造一個能與之媲美的輪子的程度,但是能對同步框架、鎖、線程等有更深入的理解,也是很豐碩的收獲了。
當然,AQS也有其局限性,由於維護的是FIFO隊列。如果想要實現一個具有優先級的鎖,AQS就派不上什么用處了。
7. 參考
Doug Lea的AQS論文
The Art of Multiprocessor Programming(多處理器編程的藝術)