AQS工作原理分析
一、大致介紹
1、前面章節講解了一下CAS,簡單講就是cmpxchg+lock的原子操作;
2、而在談到並發操作里面,我們不得不談到AQS,JDK的源碼里面好多並發的類都是通過Sync的內部類繼承AQS而實現出五花八門的功能;
3、本章節就和大家分享分析一下AQS的工作原理;
二、簡單認識AQS
2.1 何為AQS?
1、AQS是一個抽象類,類名為AbstractQueuedSynchronizer,抽象的都是一些公用的方法屬性,其自身是沒有實現任何同步接口的;
2、AQS定義了同步器中獲取鎖和釋放鎖,目的來讓自定義同步器組件來使用或重寫;
3、縱觀AQS的子類,絕大多數都是一個叫Sync的靜態內部類來繼承AQS類,通過重寫AQS中的一些方法來實現自定義同步器;
4、AQS定義了兩種資源共享方式:EXCLUSIVE( 獨占式:每次僅有一個Thread能執行 )、SHARED( 共享式:多個線程可同時執行 );
5、AQS維護了一個FIFO的CLH鏈表隊列,且該隊列不支持基於優先級的同步策略;
2.2 AQS的state關鍵詞
1、private volatile int state:維護了一個volatile的int類型的state字段,該字段是實現AQS的核心關鍵詞;
2、通過getState、setState、compareAndSetState方法類獲取、設置更新state值;
3、該字段在不同的並發類中起着不同的紐帶作用,下面會接着講到state字段的一些應用場景;
2.3 Node的waitStatus關鍵詞
1、正常默認的狀態值為0;
2、對於釋放操作的時候,前一個結點有喚醒后一個結點的任務;
3、當前結點的前置結點waitStatus > 0,則結點處於CANCELLED狀態,應該需要踢出隊列;
4、當前結點的前置結點waitStatus = 0,則需要將前置結點改為SIGNAL狀態;
2.4 CLH隊列
1、隊列模型:
+------+ prev +------+ prev +------+
| | <---- | | <---- | |
head | Node | next | Node | next | Node | tail
| | ----> | | ----> | |
+------+ +------+ +------+
2、鏈表結構,在頭尾結點中,需要特別指出的是頭結點是一個空對象結點,無任何意義,即傀儡結點;
3、每一個Node結點都維護了一個指向前驅的指針和指向后驅的指針,結點與結點之間相互關聯構成鏈表;
4、入隊在尾,出隊在頭,出隊后需要激活該出隊結點的后繼結點,若后繼結點為空或后繼結點waitStatus>0,則從隊尾向前遍歷取waitStatus<0的觸發阻塞喚醒;
2.5 state在AQS簡單應用舉例
1、CountDownLatch,簡單大致意思為:A組線程等待另外B組線程,B組線程執行完了,A組線程才可以執行;
state初始化假設為N,后續每countDown()一次,state會CAS減1。
等到所有子線程都執行完后(即state=0),會unpark()主調用線程,然后主調用線程就會從await()函數返回,繼續后余動作。
2、ReentrantLock,簡單大致意思為:獨占式鎖的類;
state初始化為0,表示未鎖定狀態,然后每lock()時調用tryAcquire()使state加1,
其他線程再tryAcquire()時就會失敗,直到A線程unlock()到state=0(即釋放鎖)為止,其它線程才有機會獲取該鎖;
3、Semaphore,簡單大致意思為:A、B、C、D線程同時爭搶資源,目前卡槽大小為2,若A、B正在執行且未執行完,那么C、D線程在門外等着,一旦A、B有1個執行完了,那么C、D就會競爭看誰先執行;
state初始值假設為N,后續每tryAcquire()一次,state會CAS減1,當state為0時其它線程處於等待狀態,
直到state>0且<N后,進程又可以獲取到鎖進行各自操作了;
2.6 常用重要的方法
1、protected boolean isHeldExclusively()
// 需要被子類實現的方法,調用該方法的線程是否持有獨占鎖,一般用到了condition的時候才需要實現此方法
2、protected boolean tryAcquire(int arg)
// 需要被子類實現的方法,獨占方式嘗試獲取鎖,獲取鎖成功后返回true,獲取鎖失敗后返回false
3、protected boolean tryRelease(int arg)
// 需要被子類實現的方法,獨占方式嘗試釋放鎖,釋放鎖成功后返回true,釋放鎖失敗后返回false
4、protected int tryAcquireShared(int arg)
// 需要被子類實現的方法,共享方式嘗試獲取鎖,獲取鎖成功后返回正數1,獲取鎖失敗后返回負數-1
5、protected boolean tryReleaseShared(int arg)
// 需要被子類實現的方法,共享方式嘗試釋放鎖,釋放鎖成功后返回正數1,釋放鎖失敗后返回負數-1
6、final boolean acquireQueued(final Node node, int arg)
// 對於進入隊尾的結點,檢測自己可以休息了,如果可以修改則進入SIGNAL狀態且進入park()阻塞狀態
7、private Node addWaiter(Node mode)
// 添加結點到鏈表隊尾
8、private Node enq(final Node node)
// 如果addWaiter嘗試添加隊尾失敗,則再次調用enq此方法自旋將結點加入隊尾
9、private static boolean shouldParkAfterFailedAcquire(Node pred, Node node)
// 檢測結點狀態,如果可以休息的話則設置waitStatus=SIGNAL並調用LockSupport.park休息;
10、private void unparkSuccessor(Node node)
// 釋放鎖時,該方法需要負責喚醒后繼節點
2.7 設計與實現偽代碼
1、獲取獨占鎖:
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
acquire{
如果嘗試獲取獨占鎖失敗的話( 嘗試獲取獨占鎖的各種方式由AQS的子類實現 ),
那么就新增獨占鎖結點通過自旋操作加入到隊列中,並且根據結點中的waitStatus來決定是否調用LockSupport.park進行休息
}
2、釋放獨占鎖:
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
release{
如果嘗試釋放獨占鎖成功的話( 嘗試釋放獨占鎖的各種方式由AQS的子類實現 ),
那么取出頭結點並根據結點waitStatus來決定是否有義務喚醒其后繼結點
}
3、獲取共享鎖:
public final void acquireShared(int arg) {
if (tryAcquireShared(arg) < 0)
doAcquireShared(arg);
}
acquireShared{
如果嘗試獲取共享鎖失敗的話( 嘗試獲取共享鎖的各種方式由AQS的子類實現 ),
那么新增共享鎖結點通過自旋操作加入到隊尾中,並且根據結點中的waitStatus來決定是否調用LockSupport.park進行休息
}
4、釋放共享鎖:
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
releaseShared{
如果嘗試釋放共享鎖失敗的話( 嘗試釋放共享鎖的各種方式由AQS的子類實現 ),
那么通過自旋操作喚完成阻塞線程的喚起操作
}
三、舉例ReentrantLock
3.1、ReentrantLock
1、在分析AQS源碼前,我們需要依賴一個載體來說,畢竟AQS的一些方法都是空方法且拋異常的,所以單講AQS不太生動形象;
2、因此我們決定采用ReentrantLock來講解,其他都大致差不多,因為了解了一個,其他都可以依葫蘆畫瓢秒懂;
3.2、ReentrantLock生活細節化理解
比如我們天天在外面吃快餐,我就以吃快餐為例生活化闡述該ReentrantLock原理:
1、場景:餐廳只有一個排隊的走廊,只有一個打飯菜的師傅;
2、開飯時間點,大家都爭先恐后的去吃飯,因此排上了隊,挨個挨個排隊打飯菜,任何一個人只要排到了打飯師傅的前面,都可以打到飯菜;
3、但是有時候隊很長,有些人之間的關系是家屬關系,如果后來的人看到自己家屬正在打飯菜,這個時候可以不用排隊直接跑到前面打飯菜;
4、總之大家都挨個挨個排隊打飯,有家屬關系的直接跑到前面打飯菜;
5、到此打止,1、2、3、4可以認為是一種公平方式的獨占鎖,3可以理解為重入鎖;
5、但是呢,還有那么些緊急趕時間的人,而且又跟排隊的人沒半點瓜葛,來餐廳時剛好看到師傅剛剛打完一個人的飯菜,於是插入去打飯菜敢時間;
6、如果敢時間人的來的時候發現師傅還在打飯菜,那么就只得乖乖的排隊等候打飯菜咯;
7、到此打止,1、2、5、6可以認為是一種非公平方式的獨占鎖;
四、源碼分析ReentrantLock
4.1、ReentrantLock構造器
1、構造器源碼:
/**
* Creates an instance of {@code ReentrantLock}.
* This is equivalent to using {@code ReentrantLock(false)}.
*/
public ReentrantLock() {
sync = new NonfairSync();
}
/**
* Creates an instance of {@code ReentrantLock} with the
* given fairness policy.
*
* @param fair {@code true} if this lock should use a fair ordering policy
*/
public ReentrantLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
}
2、默認構造方法為非公平鎖,帶參構造方法還可通過傳入變量還決定調用方是使用公平鎖還是非公平鎖;
4.2、Sync同步器
1、AQS --> Sync ---> FairSync // 公平鎖
|
|> NonfairSync // 非公平鎖
2、ReentrantLock內的同步器都是通過Sync抽象接口來操作調用關系的,細看會發現基本上都是通過sync.xxx之類的這種調用方式的;
4.3、lock()
1、源碼:
public void lock() {
sync.lock();
}
// FairSync 公平鎖調用方式
final void lock() {
acquire(1); // 嘗試獲取獨占鎖
}
// NonfairSync 非公平鎖調用方式
final void lock() {
if (compareAndSetState(0, 1)) // 首先判斷state資源是否為0,如果恰巧為0則表明目前沒有線程占用鎖,則利用CAS占有鎖
setExclusiveOwnerThread(Thread.currentThread()); // 當獨占鎖之后則將設置exclusiveOwnerThread為當前線程
else
acquire(1); // 若CAS占用鎖失敗的話,則再嘗試獲取獨占鎖
}
2、這里的區別就是非公平鎖在調用lock時首先檢測了是否通過CAS獲取鎖,發現鎖一旦空着的話,則搶先一步占為己有,
不管有沒有阻塞隊列,只要當前線程來的時候發現state資源沒被占用那么當前線程就搶先一步試一下CAS,CAS失敗了它才去排隊;
4.4、acquire(int)
1、源碼:
public final void acquire(int arg) {
if (!tryAcquire(arg) && // 嘗試獲取鎖資源,若獲取到資源的話則線程直接返回,此方法由AQS的具體子類實現
acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) // 否則獲取資源失敗的話,那么就進入等待隊列
selfInterrupt();
}
2、該方法是獨占模式下線程獲取state共享資源的入口,如果獲取到資源的話就返回,否則創建獨占模式結點加入阻塞隊列,直到獲取到共享資源;
3、而且這里需要加上自我中斷判斷,主要是因為線程在等待過程中被中斷的話,它是不響應的,那么就只有等到線程獲取到資源后通過自我判斷將這個判斷后續補上;
4、獨占模式的該方法,正常情況下只要沒有獲取到鎖,該方法一直處於阻塞狀態,獲取到了則跳出該方法區;
4.5、tryAcquire(int)
1、公平鎖tryAcquire源碼:
// FairSync 公平鎖的 tryAcquire 方法
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState(); // 獲取鎖資源的最新內存值
if (c == 0) { // 當state=0,說明鎖資源目前還沒有被任何線程被占用
if (!hasQueuedPredecessors() && // 檢查線程是否有阻塞隊列
compareAndSetState(0, acquires)) { // 如果沒有阻塞隊列,則通過CAS操作獲取鎖資源
setExclusiveOwnerThread(current); // 沒有阻塞隊列,且CAS又成功獲取鎖資源,則設置獨占線程對象為當前線程
return true; // 返回標志,告訴上層該線程已經獲取到了鎖資源
}
}
// 執行到此,鎖資源值不為0,說明已經有線程正在占用這鎖資源
else if (current == getExclusiveOwnerThread()) { // 既然鎖已經被占用,則看看占用鎖的線程是不是當前線程
int nextc = c + acquires; // 如果占用的鎖的線程是當前線程的話,則為重入鎖概念,狀態值做加1操作
// int類型值小於0,是因為該int類型的state狀態值溢出了,溢出了的話那得說明這個鎖有多難獲取啊,可能出問題了
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true; // 返回成功標志,告訴上層該線程已經獲取到了鎖資源
}
return false; // 返回失敗標志,告訴上層該線程沒有獲取到鎖資源
}
2、非公平鎖tryAcquire源碼:
// NonfairSync 非公平鎖的 tryAcquire 方法
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires); // 調用父類的非公平獲取鎖資源方法
}
// NonfairSync 非公平鎖父類 Sync 類的 nonfairTryAcquire 方法
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState(); // 獲取鎖資源的最新內存值
if (c == 0) { // 當state=0,說明鎖資源目前還沒有被任何線程被占用
if (compareAndSetState(0, acquires)) { // 先不管三七二十一,先嘗試通過CAS操作獲取鎖資源
setExclusiveOwnerThread(current); // CAS一旦成功獲取鎖資源,則設置獨占線程對象為當前線程
return true;// 返回成功標志,告訴上層該線程已經獲取到了鎖資源
}
}
// 執行到此,鎖資源值不為0,說明已經有線程正在占用這鎖資源
else if (current == getExclusiveOwnerThread()) { // 既然鎖已經被占用,則看看占用鎖的線程是不是當前線程
int nextc = c + acquires; // 如果占用的鎖的線程是當前線程的話,則為重入鎖概念,狀態值做加1操作
// int類型值小於0,是因為該int類型的state狀態值溢出了,溢出了的話那得說明這個鎖有多難獲取啊,可能出問題了
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc); //
return true; // 返回成功標志,告訴上層該線程已經獲取到了鎖資源
}
return false; // 返回失敗標志,告訴上層該線程沒有獲取到鎖資源
}
3、tryAcquire方法是AQS的子類實現的,也就是ReentrantLock的兩個靜態內部類實現的,目的就是通過CAS嘗試獲取鎖資源,
獲取鎖資源成功則返回true,獲取鎖資源失敗則返回false;
4.6、addWaiter(Node)
1、源碼:
/**
* Creates and enqueues node for current thread and given mode.
*
* @param mode Node.EXCLUSIVE for exclusive, Node.SHARED for shared
* @return the new node
*/
private Node addWaiter(Node mode) {
// 按照給定的mode模式創建新的結點,模式有兩種:Node.EXCLUSIVE獨占模式、Node.SHARED共享模式;
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail; // 將先隊尾結點賦值給臨時變量
if (pred != null) { // 如果pred不為空,說明該隊列已經有結點了
node.prev = pred;
if (compareAndSetTail(pred, node)) { // 通過CAS嘗試將node結點設置為隊尾結點
pred.next = node;
return node;
}
}
// 執行到此,說明隊尾沒有元素,則進入自旋首先設置頭結點,然后將此新建結點添加到隊尾
enq(node); // 進入自旋添加node結點
return node;
}
2、 addWaiter通過傳入不同的模式來創建新的結點嘗試加入到隊列尾部,如果由於並發導致添加結點到隊尾失敗的話那么就進入自旋將結點加入隊尾;
```
4.7、enq(Node)
1、源碼:
/**
* Inserts node into queue, initializing if necessary. See picture above.
* @param node the node to insert
* @return node's predecessor
*/
private Node enq(final Node node) {
for (;;) { // 自旋的死循環操作方式
Node t = tail;
// 因為是自旋方式,首次鏈表隊列tail肯定為空,但是后續鏈表有數據后就不會為空了
if (t == null) { // Must initialize
if (compareAndSetHead(new Node())) // 隊列為空時,則創建一個空對象結點作為頭結點,無意思,可認為傀儡結點
tail = head; // 空隊列的話,頭尾都指向同一個對象
} else {
// 進入 else 方法里面,說明鏈表隊列已經有結點了
node.prev = t;
// 因為存在並發操作,通過CAS嘗試將新加入的node結點設置為隊尾結點
if (compareAndSetTail(t, node)) {
// 如果node設置隊尾結點成功,則將之前的舊的對象尾結點t的后繼結點指向node,node的前驅結點也設置為t
t.next = node;
return t;
}
}
// 如果執行到這里,說明上述兩個CAS操作任何一個失敗的話,該方法是不會放棄的,因為是自旋操作,再次循環繼續入隊
}
}
2、enq通過自旋這種死循環的操作方式,來確保結點正確的添加到隊列尾部,通過CAS操作如果頭部為空則添加傀儡空結點,然后在循環添加隊尾結點;
4.8、compareAndSetHead/compareAndSetTail
1、源碼:
/**
* CAS head field. Used only by enq.
*/
private final boolean compareAndSetHead(Node update) {
return unsafe.compareAndSwapObject(this, headOffset, null, update);
}
/**
* CAS tail field. Used only by enq.
*/
private final boolean compareAndSetTail(Node expect, Node update) {
return unsafe.compareAndSwapObject(this, tailOffset, expect, update);
}
2、CAS操作,設置頭結點、尾結點;
4.9、acquireQueued(Node, int)
1、源碼:
/**
* Acquires in exclusive uninterruptible mode for thread already in
* queue. Used by condition wait methods as well as acquire.
*
* @param node the node
* @param arg the acquire argument
* @return {@code true} if interrupted while waiting
*/
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) { // 自旋的死循環操作方式
final Node p = node.predecessor(); // 如果新建結點的前驅結點是頭結點
// 如果前驅結點為頭結點,那么該結點則是老二,僅次於老大,也希望嘗試去獲取一下鎖,萬一頭結點恰巧剛剛釋放呢?希望還是要有的,萬一實現了呢。。。
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
// 拿到鎖資源后,則該node結點升級做頭結點,且設置后繼結點指針為空,便於GC回收
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) && // 根據前驅結點看看是否需要休息一會兒
parkAndCheckInterrupt()) // 阻塞操作,正常情況下,獲取不到鎖,代碼就在該方法停止了,直到被喚醒
interrupted = true;
// 如果執行到這里,說明嘗試休息失敗了,因為是自旋操作,所以還會再次循環繼續操作判斷
}
} finally {
if (failed)
cancelAcquire(node);
}
}
2、acquireQueued也是采用一個自旋的死循環操作方式,只有頭結點才能嘗試獲取鎖資源,其余的結點挨個挨個在那里等待修改,等待被喚醒,等待機會成為頭結點;
而新添加的node結點也自然逃不過如此命運,先看看是否頭結點,然后再看看是否能休息;
4.10、shouldParkAfterFailedAcquire(Node, Node)
1、源碼:
/**
* Checks and updates status for a node that failed to acquire.
* Returns true if thread should block. This is the main signal
* control in all acquire loops. Requires that pred == node.prev.
*
* @param pred node's predecessor holding status
* @param node the node
* @return {@code true} if thread should block
*/
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus; // 獲取前驅結點的狀態值
if (ws == Node.SIGNAL) // 若前驅結點的狀態為SIGNAL狀態的話,那么該結點就不要想事了,直接返回true准備休息
/*
* This node has already set status asking a release
* to signal it, so it can safely park.
*/
return true;
if (ws > 0) {
/*
* Predecessor was cancelled. Skip over predecessors and
* indicate retry.
*/
// 若前驅結點的狀態為CANCELLED狀態的話,那么就一直向前遍歷,直到找到一個不為CANCELLED狀態的結點
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
/*
* waitStatus must be 0 or PROPAGATE. Indicate that we
* need a signal, but don't park yet. Caller will need to
* retry to make sure it cannot acquire before parking.
*/
// 剩下的結點狀態,則設置其為SIGNAL狀態,然后返回false標志等外層循環再次判斷
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
2、shouldParkAfterFailedAcquire主要是檢測前驅結點狀態,前驅結點為SIGNAL的話,則新結點可以安安心心休息了;
如果前驅結點大於零,說明前驅結點處於CANCELLED狀態,那么則以入參pred前驅為起點,一直往前找,直到找到最近一個正常等待狀態的結點;
如果前驅結點小於零,那么就將前驅結點設置為SIGNAL狀態,然后返回false依賴acquireQueued的自旋再次判斷是否需要進行休息;
```
4.11、parkAndCheckInterrupt()
1、源碼:
/**
* Convenience method to park and then check if interrupted
*
* @return {@code true} if interrupted
*/
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this); // 阻塞等待
return Thread.interrupted(); // 被喚醒后查看是否有被中斷過否?
}
2、parkAndCheckInterrupt首先調用park讓線程進入等待狀態,然后當park阻塞被喚醒后,再次檢測是否曾經被中斷過;
而被喚醒有兩種情況,一個是利用unpark喚醒,一個是利用interrupt喚醒;
4.12、unlock()
```
1、源碼:
public void unlock() {
sync.release(1); //
}
2、unlock釋放鎖資源,一般都是在finally中被調用,防止當臨界區因為任何異常時怕鎖不被釋放;
而釋放鎖不像獲取鎖lock的實現多色多樣,沒有所謂公平或不公平,就是規規矩矩的釋放資源而已;
4.13、release(int)
1、源碼:
/**
* Releases in exclusive mode. Implemented by unblocking one or
* more threads if {@link #tryRelease} returns true.
* This method can be used to implement method {@link Lock#unlock}.
*
* @param arg the release argument. This value is conveyed to
* {@link #tryRelease} but is otherwise uninterpreted and
* can represent anything you like.
* @return the value returned from {@link #tryRelease}
*/
public final boolean release(int arg) {
if (tryRelease(arg)) { // 嘗試釋放鎖資源,此方法由AQS的具體子類實現
Node h = head;
if (h != null && h.waitStatus != 0) // 從頭結點開始,喚醒后繼結點
unparkSuccessor(h); // 踢出CANCELLED狀態結點,然后喚醒后繼結點
return true;
}
return false;
}
2、release嘗試釋放鎖,並且有義務移除CANCELLED狀態的結點,還有義務喚醒后繼結點繼續運行獲取鎖資源;
4.14、tryRelease(int)
1、源碼:
// NonfairSync 和 FairSync 的父類 Sync 類的 tryRelease 方法
protected final boolean tryRelease(int releases) {
int c = getState() - releases; // 獲取鎖資源值並做減1操作
if (Thread.currentThread() != getExclusiveOwnerThread()) // 查看當前線程是否和持有鎖的線程是不是同一個線程
// 正常情況下,需要釋放的線程肯定是持有鎖的線程,否則不就亂套了,肯定哪里出問題了,所以拋出異常
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) { // 若此時鎖資源值做減法操作后正好是0,則所有鎖資源已經釋放干凈,因此持有鎖的變量也置為空
free = true;
setExclusiveOwnerThread(null);
}
setState(c); // 若此時做減法操作還沒有歸零,那么這種情況就是那種重入鎖,需要重重釋放后才行
return free;
}
2、tryRelease主要通過CAS操作對state鎖資源進行減1操作;
4.15、unparkSuccessor(Node)
1、源碼:
/**
* Wakes up node's successor, if one exists.
*
* @param node the node
*/
private void unparkSuccessor(Node node) {
/*
* If status is negative (i.e., possibly needing signal) try
* to clear in anticipation of signalling. It is OK if this
* fails or if status is changed by waiting thread.
*/
// 該node一般都是傳入head進來,也就是說,需要釋放頭結點,也就是當前結點需要釋放鎖操作,順便喚醒后繼結點
int ws = node.waitStatus;
if (ws < 0) // 若結點狀態值小於0,則歸零處理,通過CAS歸零,允許失敗,但是不管怎么着,仍然要往下走去喚醒后繼結點
compareAndSetWaitStatus(node, ws, 0);
/*
* Thread to unpark is held in successor, which is normally
* just the next node. But if cancelled or apparently null,
* traverse backwards from tail to find the actual
* non-cancelled successor.
*/
Node s = node.next; // 取出后繼結點,這個時候一般都是Head后面的一個結點,所以一般都是老二
if (s == null || s.waitStatus > 0) { // 若后繼結點為空或者后繼結點已經處於CANCELLED狀態的話
s = null;
// 那么從隊尾向前遍歷,直到找到一個小於等於0的結點
// 這里為什么要從隊尾向前尋找?
// * 因為在這個隊列中,任何一個結點都有可能被中斷,只是有可能,並不代表絕對的,但有一點是確定的,
// * 被中斷的結點會將結點的狀態設置為CANCELLED狀態,標識這個結點在將來的某個時刻會被踢出;
// * 踢出隊列的規則很簡單,就是該結點的前驅結點不會指向它,而是會指向它的后面的一個非CANCELLED狀態的結點;
// * 而這個將被踢出的結點,它的next指針將會指向它自己;
// * 所以設想一下,如果我們從head往后找,一旦發現這么一個處於CANCELLED狀態的結點,那么for循環豈不是就是死循環了;
// * 但是所有的這些結點當中,它們的prev前驅結點還是沒有被誰動過,所以從tail結點向前遍歷最穩妥
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
LockSupport.unpark(s.thread); // 喚醒線程
}
2、unparkSuccessor主要是踢出CANCELLED狀態結點,然后喚醒后繼結點;
但是這個喚醒的后繼結點為空的話,那么則從隊尾一直向前循環查找小於等於零狀態的結點並調用unpark喚醒;
五、總結
1、分析了這么多,感覺是不是有一種豁然開朗的感覺,原來大家傳的神乎其神的AQS是不是沒有想象中那么難以理解;
2、在這里我簡要總結一下AQS的流程的一些特性:
• 關鍵獲取鎖、釋放鎖操作由AQS子類實現:acquire-release、acquireShared-releaseShared;
• 維護了一個FIFO鏈表結構的隊列,通過自旋方式將新結點添加到隊尾;
• 添加結點時會從前驅結點向前遍歷,跳過那些處於CANCELLED狀態的結點;
• 釋放結點時會從隊尾向前遍歷,踢出CANCELLED狀態的結點,然后喚醒后繼結點;
3、其實當了解了AQS后,這里以ReentrantLock為載體分析了一下,那么再去分析CountDownLatch、Semaphore、ReentrantReadWriteLock等那些集成AQS而實現不同功能的模塊就會順利很多;
