Java並發包源碼學習系列:AQS共享模式獲取與釋放資源
往期回顧:
上一篇文章介紹了AQS內置隊列節點的出隊入隊操作,以及獨占式獲取共享資源與釋放資源的詳細流程,為了結構完整,本篇繼續以AQS的角度介紹另外一種:共享模式獲取與釋放資源的細節,本篇暫不分析具體子類如ReentrantLock、ReentrantReadWriteLock的實現,之后會陸續補充。
獨占式獲取資源
友情提示:本篇文章着重介紹共享模式獲取和釋放資源的特點,許多代碼實現上面和共享式和獨占式其實邏輯差不多,為了清晰對比,這邊會將獨占式的部分核心代碼粘貼過來,注意理解共享式和獨占式存在差異的地方。詳細解析可戳:Java並發包源碼學習系列:CLH同步隊列及同步資源獲取與釋放
void acquire(int arg)
public final void acquire(int arg) {
if (!tryAcquire(arg) && // tryAcquire由子類實現,表示獲取鎖,如果成功,這個方法直接返回了
acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) // 如果獲取失敗,執行
selfInterrupt();
}
boolean acquireQueued(Node, int)
// 這個方法如果返回true,代碼將進入selfInterrupt()
final boolean acquireQueued(final Node node, int arg) {
// 注意默認為true
boolean failed = true;
try {
// 是否中斷
boolean interrupted = false;
// 自旋,即死循環
for (;;) {
// 得到node的前驅節點
final Node p = node.predecessor();
// 我們知道head是虛擬的頭節點,p==head表示如果node為阻塞隊列的第一個真實節點
// 就執行tryAcquire邏輯,這里tryAcquire也需要由子類實現
if (p == head && tryAcquire(arg)) {
// tryAcquire獲取成功走到這,執行setHead出隊操作
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
// 走到這有兩種情況 1.node不是第一個節點 2.tryAcquire爭奪鎖失敗了
// 這里就判斷 如果當前線程爭鎖失敗,是否需要掛起當前這個線程
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
// 死循環退出,只有tryAcquire獲取鎖失敗的時候failed才為true
if (failed)
cancelAcquire(node);
}
}
獨占式釋放資源
boolean release(int arg)
public final boolean release(int arg) {
if (tryRelease(arg)) { // 子類實現tryRelease方法
// 獲得當前head
Node h = head;
// head不為null並且head的等待狀態不為0
if (h != null && h.waitStatus != 0)
// 喚醒下一個可以被喚醒的線程,不一定是next哦
unparkSuccessor(h);
return true;
}
return false;
}
void unparkSuccessor(Node node)
private void unparkSuccessor(Node node) {
/*
* If status is negative (i.e., possibly needing signal) try
* to clear in anticipation of signalling. It is OK if this
* fails or if status is changed by waiting thread.
*/
int ws = node.waitStatus;
// 如果node的waitStatus<0為signal,CAS修改為0
// 將 head 節點的 ws 改成 0,清除信號。表示,他已經釋放過了。不能重復釋放。
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
/*
* Thread to unpark is held in successor, which is normally
* just the next node. But if cancelled or apparently null,
* traverse backwards from tail to find the actual
* non-cancelled successor.
*/
// 喚醒后繼節點,但是有可能后繼節點取消了等待 即 waitStatus == 1
Node s = node.next;
// 如果后繼節點為空或者它已經放棄鎖了
if (s == null || s.waitStatus > 0) {
s = null;
// 從隊尾往前找,找到沒有沒取消的所有節點排在最前面的【直到t為null或t==node才退出循環嘛】
for (Node t = tail; t != null && t != node; t = t.prev)
// 如果>0表示節點被取消了,就一直向前找唄,找到之后不會return,還會一直向前
if (t.waitStatus <= 0)
s = t;
}
// 如果后繼節點存在且沒有被取消,會走到這,直接喚醒后繼節點即可
if (s != null)
LockSupport.unpark(s.thread);
}
共享式獲取資源
void acquireShared(int arg)
public final void acquireShared(int arg) {
if (tryAcquireShared(arg) < 0) //子類實現
doAcquireShared(arg);
}
tryAcquireShared(int)
是AQS提供給子類實現的鈎子方法,子類可以自定義實現共享式獲取資源的方式,獲取狀態失敗返回小於0,返回零值表示被獨占方式獲取,返回正值表示共享方式獲取。- 如果獲取失敗,則進入
doAcquireShared(arg);
的邏輯。
void doAcquireShared(int arg)
注意這里和獨占式獲取資源acquireQueued
的區別。
private void doAcquireShared(int arg) {
// 包裝成共享模式的節點,入隊
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
boolean interrupted = false;
// 自旋
for (;;) {
final Node p = node.predecessor();
if (p == head) {
// 嘗試獲取同步狀態,子類實現
int r = tryAcquireShared(arg);
if (r >= 0) {
// 設置新的首節點,並根據條件,喚醒下一個節點
setHeadAndPropagate(node, r);
p.next = null; // help GC
if (interrupted)
selfInterrupt();
failed = false;
return;
}
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
我們可以看到有幾個存在差異的地方:
- 在共享式獲取資源失敗的時候,會包裝成SHARED模式的節點入隊。
- 如果前驅節點為head,則使用tryAcquireShared方法嘗試獲取同步狀態,這個方法由子類實現。
- 如果獲取成功r>=0,這時調用
setHeadAndPropagate(node, r)
,該方法首先會設置新的首節點,將第一個節點出隊,接着會不斷喚醒下一個共享模式節點,實現同步狀態被多個線程共享獲取。
接下來我們着重看下setHeadAndPropagate方法。
void setHeadAndPropagate(Node node, int propagate)
private void setHeadAndPropagate(Node node, int propagate) {
Node h = head; // Record old head for check below
// 節點出隊,設置node為新的head
setHead(node);
/*
* Try to signal next queued node if:
* Propagation was indicated by caller,
* or was recorded (as h.waitStatus either before
* or after setHead) by a previous operation
* (note: this uses sign-check of waitStatus because
* PROPAGATE status may transition to SIGNAL.)
* and
* The next node is waiting in shared mode,
* or we don't know, because it appears null
*
* The conservatism in both of these checks may cause
* unnecessary wake-ups, but only when there are multiple
* racing acquires/releases, so most need signals now or soon
* anyway.
*/
// 這個方法進來的時候propagate>=0
// propagate>0表示同步狀態還可以被后面的節點獲取
// h指向原先的head節點,之后h = head,h表示新的head節點
// h.waitStatus<0表示該節點后面還有節點需要被喚醒
if (propagate > 0 || h == null || h.waitStatus < 0 ||
(h = head) == null || h.waitStatus < 0) {
// 獲取下一個節點
Node s = node.next;
// 沒有下一個節點或下一個節點為共享式獲取狀態
if (s == null || s.isShared())
// 喚醒后續的共享式獲取同步狀態的節點
doReleaseShared();
}
}
- 先記錄一下原來的頭節點,然后設置node為新的頭節點。
- 原先的頭節點或新的頭節點等待狀態是propagate或signal,可以繼續向下喚醒。
- 如果判斷下個節點為shared節點,調用共享式釋放資源方法喚醒后續節點。
共享式釋放資源
boolean releaseShared(int arg)
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) { // 子類實現
doReleaseShared();
return true;
}
return false;
}
doReleaseShared()
可以發現共享模式下,無論是獲取資源還是釋放資源都調用了doReleaseShared方法,可見該方法是共享模式釋放資源喚醒節點的核心方法,主要功能是喚醒下一個線程或者設置傳播狀態。
后繼線程被喚醒后,會嘗試獲取共享鎖,如果成功之后,則又會調用setHeadAndPropagate,將喚醒傳播下去。這個方法的作用是保障在acquire和release存在競爭的情況下,保證隊列中處於等待狀態的節點能夠有辦法被喚醒。
private void doReleaseShared() {
/*
* Ensure that a release propagates, even if there are other
* in-progress acquires/releases. This proceeds in the usual
* way of trying to unparkSuccessor of head if it needs
* signal. But if it does not, status is set to PROPAGATE to
* ensure that upon release, propagation continues.
* Additionally, we must loop in case a new node is added
* while we are doing this. Also, unlike other uses of
* unparkSuccessor, we need to know if CAS to reset status
* fails, if so rechecking.
*/
// 自旋
for (;;) {
Node h = head;
// 隊列已經初始化且至少有一個節點
if (h != null && h != tail) {
int ws = h.waitStatus;
// 無論是獨占還是共享,只有節點的ws為signal的時候,才會在釋放的時候,喚醒后面的節點
if (ws == Node.SIGNAL) {
// cas將ws設置為0,設置失敗,將會繼續從循環開始
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue; // loop to recheck cases
// 喚醒后繼節點,unparkSuccessor這個方法上面已經解析過
unparkSuccessor(h);
}
// 如果ws為0,則更新狀態為propagate,
// 之后setHeadAndPropagate讀到ws<0的時候,會繼續喚醒后面節點
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; // loop on failed CAS
}
// 如果head在這個過程中被更改了,會繼續自旋
if (h == head) // loop if head changed
break;
}
}
該方法在 head 節點存在后繼節點的情況下,做了兩件事情:
-
如果 head 節點等待狀態為 SIGNAL,則將 head 節點狀態設為 0,並喚醒后繼未取消節點。
-
如果 head 節點等待狀態為 0,則將 head 節點狀態設為 PROPAGATE,保證喚醒能夠正常傳播下去。
設置PROPAGATE的作用:PROPAGATE狀態用在[setHeadAndPropagate](#void setHeadAndPropagate(Node node, int propagate)) ,當頭節點狀態被設為 PROPAGATE 后,后繼節點成為新的頭結點后。若 propagate > 0
條件不成立,則根據條件h.waitStatus < 0
成立與否,來決定是否喚醒后繼節點,即向后傳播喚醒動作。
引入PROPAGATE是為了解決什么問題?
AbstractQueuedSynchronizer源碼解讀,強烈建議閱讀這篇博客。
獨占式和共享式的區別總結
共享式獲取與獨占式獲取最大的區別就是同一時刻能否有多個線程同時獲取到同步狀態。
- 共享式訪問資源時,同一時刻其他共享式的訪問會被允許。
- 獨占式訪問資源時,同一時刻其他訪問均被阻塞。
AQS都提供了子類實現的鈎子方法,獨占式的代表方法有:tryAcquire和tryRelease以及isHeldExclusively方法,共享式的代表方法有:tryAcquireShared和tryReleaseShared方法。
AQS中獲取操作和釋放操作的標准形式:
boolean acquire() throws InterruptedException{
while( 當前狀態不允許獲取操作 ){
if( 需要阻塞獲取請求){
如果當前線程不在隊列中,則將其插入隊列
阻塞當前線程
}else{
返回失敗
}
}
可能更新同步器的狀態
如果線程位於隊列中,則將其移除隊列
返回成功
}
void release(){
更新同步器的狀態
if( 新的狀態允許某個被阻塞的線程獲取成功 ){
解除隊列中一個或多個線程的阻塞狀態
}
}
圖源:《並發編程的藝術》下圖是獨占式同步狀態獲取的流程
當某個線程爭奪同步資源失敗之后,他們都會將線程包裝為節點,並加入CLH同步隊列的隊尾,並保持自旋,一個是addWaiter(Node.EXCLUSIVE)
,一個是addWaiter(Node.EXCLUSIVE)
。
同步隊列中的線程在自旋時會判斷其前驅節點是否為首節點,如果是首節點node.predecessor() == head
,他們都會嘗試獲取同步狀態,只不過:
- 獨占式獲取狀態成功后,只會出隊一個節點。
- 共享式獲取狀態成功后,除了出隊一個節點,還會喚醒后面的節點。
線程執行完邏輯之后,他們都會釋放同步狀態,釋放之后將會unparkSuccessor(h)
喚醒其后可被喚醒的某個后繼節點。
參考閱讀
- 【死磕Java並發】—–J.U.C之AQS:同步狀態的獲取與釋放
- Java並發之AQS詳解
- AbstractQueuedSynchronizer源碼解讀
- Java技術之AQS詳解
- 《並發編程的藝術》方騰飛
- 《Java並發編程實戰》Doug Lea