概括
AQS框架數據結構是一個先進先出的雙向隊列,當多個線程進行競爭資源時,那些競爭失敗的線程會加入到隊列中。他向上層提供了很多接口,其中一個是acquireShared獲取共享模式的接口。本文將會根據這個接口一步步分析,獲取資源失敗的線程是怎么進入到隊列中的,進入到隊列中又是怎么出隊列再次競爭資源的,下面是acquireShared執行的一個大致流程:
-
多個線程通過調用tryAcquireShared方法獲取共享資源,返回值大於等於0則獲取資源成功,返回值小於0則獲取失敗。
-
當前線程獲取共享資源失敗后,通過調用addWaiter方法把該線程封裝為Node節點,並設置該節點為共享模式。然后把該節點添加到隊列的尾部。
-
添加到尾部后,判斷該節點的上一個節點是不是隊列的頭節點,如果是頭節點,那么該節點的上一個節點出隊列並獲取共享資源,同時調用setHeadAndPropagate方法把該節點設置為新的頭節點,同時喚醒隊列中所有共享類型的節點,去獲取共享資源。如果獲取失敗,則再次加入到隊列中。
-
如果該節點的前驅節點不是頭節點,那么通過for循環進行自旋轉等待,直到當前節點的前驅節點是頭節點,結束自旋。
這就是AQS共享模式競爭資源失敗的大致流程,這里先讓大家有一個大致的印象,下面通過源碼具體分析是怎么進行操作的。
AQS共享鎖模式
AQS獲取共享鎖是通過調用acquireShared() 這個頂層方法,我們看一下這個方法的源代碼:
public final void acquireShared(int arg) {
if (tryAcquireShared(arg) < 0)
doAcquireShared(arg);
}
這個方法中有一個if判斷,當tryAcquireShared()這個返回值是小於0的時候獲取鎖失敗,進入doAcquireShared()方法。tryAcquireShared方法是用來獲取共享模式下的鎖,對於tryAcquireShared()這個方法我們重點看一下他的返回值。jdk1.8中是這樣寫的
* @return a negative value on failure; zero if acquisition in shared
* mode succeeded but no subsequent shared-mode acquire can
* succeed; and a positive value if acquisition in shared
* mode succeeded and subsequent shared-mode acquires might
* also succeed, in which case a subsequent waiting thread
* must check availability. (Support for three different
* return values enables this method to be used in contexts
* where acquires only sometimes act exclusively.) Upon
* success, this object has been acquired.
當失敗的時候返回的是負值,如果返回的是0表示獲取共享模式成功但是它下一個節點的共享模式無法獲取成功。如果返回的是正數也就是大於0,表示當前線程獲取共享模式成功,並且它后面的線程也可以獲取共享模式。
當共享模式獲取失敗的時候,我們看一下doAcquireShared源代碼做了哪些操作
private void doAcquireShared(int arg) {
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head) {
int r = tryAcquireShared(arg);
if (r >= 0) {
setHeadAndPropagate(node, r);
p.next = null; // help GC
if (interrupted)
selfInterrupt();
failed = false;
return;
}
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
首先調用addWaiter()方法,它主要是封裝為Node節點,並且把該節點添加到隊列的尾部。此處傳入共享模式的參數,節點就變成了共享模式。
當前線程添加到隊列后,然后通過自旋(for(;;))獲取前驅節點,如果前驅節點是頭節點,那么調用tryAcquireShared()方法獲取當前節點的狀態,注意此方法的返回值在上面已經介紹過,等於0表示不用喚醒后繼節點,只有大於0才會喚醒后面的所有節點。
如果獲取共享資源成功,調用setHeadAndPropagate方法設置當前節點為頭節點,並讓原來的頭節點出隊列。如果在獲取鎖自旋的過程中中斷過,那么將當前線程中斷。
如果當前節點的前驅節點不是頭節點,通過shouldParkAfterFailedAcquire判斷當前線程的狀態,如果線程阻塞返回true,否則返回false. parkAndCheckInterrupt方法是指當前線程在獲取鎖的過程中是否被中斷喚醒,如果當前線程狀態阻塞並且被中斷過那么就把標志為interrupted更新為true。
如果發生異常調用cancelAcquire方法,此方法是把當前節點先更新為取消狀態,並清除該節點。
setHeadAndPropagate我們看一下這個方法的源代碼
private void setHeadAndPropagate(Node node, int propagate) {
Node h = head; // Record old head for check below
setHead(node);//設置當前節點為頭節點
if (propagate > 0 || h == null || h.waitStatus < 0 ||
(h = head) == null || h.waitStatus < 0) {//符合狀態的將全部喚醒
Node s = node.next;
if (s == null || s.isShared())
doReleaseShared();
}
}
此方法傳遞了2個參數,一個是當前節點,一個是tryAcquireShared方法的返回值。從源代碼中我們看到它首先記錄了當前頭節點,然后它通過setHead()方法把當前獲取到鎖的節點設置為頭節點。通過if語句把符合條件的繼續喚醒后繼節點,如果下一個節點為空那么調用doReleaseShared方法,doReleaseShared方法繼續喚醒后面的節點。此方法會在共享鎖釋放詳細講解。
共享鎖釋放
我們來看一下releaseShared的源代碼,此方法是共享模式釋放資源的頂層方法。
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {//
doReleaseShared();
return true;
}
return false;
}
tryReleaseShared方法獲取共享模式資源釋放,如果釋放成功那么會調用doReleaseShared繼續喚醒下一個節點.
我們繼續看一下具體的喚醒操作doReleaseShared() 這個方法
private void doReleaseShared() {
for (;;) {
Node h = head;
if (h != null && h != tail) {
int ws = h.waitStatus;
if (ws == Node.SIGNAL) {
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue; // loop to recheck cases
unparkSuccessor(h);
}
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; // loop on failed CAS
}
if (h == head) // loop if head changed
break;
}
}
通過源代碼我們發現,當前線程狀態如果是Node.SIGNAL,Node.SIGNAL的值是-1,是一個靜態常量,此值表示當前線程被掛起。如果當前線程被掛起,那么更新當前線程的狀態值為0.如果更新失敗那么就繼續。更新成功后調用unparkSuccessor()此方法是喚醒共享鎖的第一個節點。如果本身頭節點屬於重置狀態waitStatus==0,並且把它設置為傳播狀態那么就向下一個節點傳播。
我們再看一下unparkSuccessor這個方法的源碼
private void unparkSuccessor(Node node) {
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
LockSupport.unpark(s.thread);
}
從這個方法中我們發現如果當先線程的狀態是小於0,那么就把當前線程重置為0.為什么是小於0呢,上篇文章已經講過,waitStatus<0為等待或掛起狀態。也就是如果當前線程是等待掛起狀態,那么把當前線程狀態重置為0。然后找到下一個節點,如果下一個節點是空或下一個線程已經被取消,那么就從頭部找下一個沒有被取消的節點。當下一個節點不為空的時候,調用LockSupport.unpark方法喚醒當前線程。LockSupport.unpark會調用Unsafe這個類調用native方法進行執行。