前言
如果你想深入研究Java並發的話,那么AQS一定是繞不開的一塊知識點,Java並發包很多的同步工具類底層都是基於AQS來實現的,比如我們工作中經常用的Lock工具ReentrantLock、柵欄CountDownLatch、信號量Semaphore等,而且關於AQS的知識點也是面試中經常考察的內容,所以,無論是為了更好的使用還是為了應付面試,深入學習AQS都很有必要。
CAS
學習AQS之前,我們有必要了解一個知識點,就是AQS底層中大量使用的CAS,關於CAS,大家應該都不陌生,如果還有哪位同學不清楚的話,可以看看我之前的文章《面試必備知識點:悲觀鎖和樂觀鎖的那些事兒》 ,這里不多復述,哈哈,給自己舊文章加了閱讀量
此時,好幾塊搬磚朝我飛了過來。。。。。
好吧,開個玩笑,還是大概講解一下吧,了解的同學可以跳過這一段。
CAS是樂觀鎖的一種思想,它假設線程對資源的訪問是沒有沖突的,同時所有的線程執行都不需要等待,可以持續執行。 如果有沖突的話,就用比較+交換的方式來檢測沖突,有沖突就不斷重試。
CAS的全稱是Compare-and-Swap,也就是比較並交換,它包含了三個參數:V,A,B,V表示要讀寫的內存位置,A表示舊的預期值,B表示新值,當執行CAS時,只有當V的值等於預期值A時,才會把V的值改為B,這樣的方式可以讓多個線程同時去修改,但也會因為線程操作失敗而不斷重試,對CPU有一定程序上的開銷。
AQS簡介
本文主角正式登場。
AQS,全名AbstractQueuedSynchronizer,是一個抽象類的隊列式同步器,它的內部通過維護一個狀態volatile int state(共享資源),一個FIFO線程等待隊列來實現同步功能。
state用關鍵字volatile修飾,代表着該共享資源的狀態一更改就能被所有線程可見,而AQS的加鎖方式本質上就是多個線程在競爭state,當state為0時代表線程可以競爭鎖,不為0時代表當前對象鎖已經被占有,其他線程來加鎖時則會失敗,加鎖失敗的線程會被放入一個FIFO的等待隊列中,這些線程會被UNSAFE.park()
操作掛起,等待其他獲取鎖的線程釋放鎖才能夠被喚醒。
而這個等待隊列其實就相當於一個CLH隊列,用一張原理圖來表示大致如下:
基礎定義
AQS支持兩種資源分享的方式:Exclusive(獨占,只有一個線程能執行,如ReentrantLock)和Share(共享,多個線程可同時執行,如Semaphore/CountDownLatch)。
自定義的同步器繼承AQS后,只需要實現共享資源state的獲取和釋放方式即可,其他如線程隊列的維護(如獲取資源失敗入隊/喚醒出隊等)等操作,AQS在頂層已經實現了,
AQS代碼內部提供了一系列操作鎖和線程隊列的方法,主要操作鎖的方法包含以下幾個:
- compareAndSetState():利用CAS的操作來設置state的值
- tryAcquire(int):獨占方式獲取鎖。成功則返回true,失敗則返回false。
- tryRelease(int):獨占方式釋放鎖。成功則返回true,失敗則返回false。
- tryAcquireShared(int):共享方式釋放鎖。負數表示失敗;0表示成功,但沒有剩余可用資源;正數表示成功,且有剩余資源。
- tryReleaseShared(int):共享方式釋放鎖。如果釋放后允許喚醒后續等待結點返回true,否則返回false。
像ReentrantLock就是實現了自定義的tryAcquire-tryRelease,從而操作state的值來實現同步效果。
除此之外,AQS內部還定義了一個靜態類Node,表示CLH隊列的每一個結點,該結點的作用是對每一個等待獲取資源做了封裝,包含了需要同步的線程本身、線程等待狀態.....
我們可以看下該類的一些重點變量:
static final class Node {
/** 表示共享模式下等待的Node */
static final Node SHARED = new Node();
/** 表示獨占模式下等待的mode */
static final Node EXCLUSIVE = null;
/** 下面幾個為waitStatus的具體值 */
static final int CANCELLED = 1;
static final int SIGNAL = -1;
static final int CONDITION = -2;
static final int PROPAGATE = -3;
volatile int waitStatus;
/** 表示前面的結點 */
volatile Node prev;
/** 表示后面的結點 */
volatile Node next;
/**當前結點裝載的線程,初始化時被創建,使用后會置空*/
volatile Thread thread;
/**鏈接到下一個節點的等待條件,用到Condition的時候會使用到*/
Node nextWaiter;
}
代碼里面定義了一個表示當前Node結點等待狀態的字段waitStatus
,該字段的取值包含了CANCELLED(1)、SIGNAL(-1)、CONDITION(-2)、PROPAGATE(-3)、0,這五個值代表了不同的特定場景:
- CANCELLED:表示當前結點已取消調度。當timeout或被中斷(響應中斷的情況下),會觸發變更為此狀態,進入該狀態后的結點將不會再變化。
- SIGNAL:表示后繼結點在等待當前結點喚醒。后繼結點入隊時,會將前繼結點的狀態更新為SIGNAL(記住這個-1的值,因為后面我們講的時候經常會提到)
- CONDITION:表示結點等待在Condition上,當其他線程調用了Condition的signal()方法后,CONDITION狀態的結點將從等待隊列轉移到同步隊列中,等待獲取同步鎖。(注:Condition是AQS的一個組件,后面會細說)
- PROPAGATE:共享模式下,前繼結點不僅會喚醒其后繼結點,同時也可能會喚醒后繼的后繼結點。
- 0:新結點入隊時的默認狀態。
也就是說,當waitStatus為負值表示結點處於有效等待狀態,為正值的時候表示結點已被取消。
在AQS內部中還維護了兩個Node對象head
和tail
,一開始默認都為null
private transient volatile Node head;
private transient volatile Node tail;
講完了AQS的一些基礎定義,我們就可以開始學習同步的具體運行機制了,為了更好的演示,我們用ReentrantLock作為使用入口,一步步跟進源碼探究AQS底層是如何運作的,這里說明一下,因為ReentrantLock底層調用的AQS是獨占模式,所以下文講解的AQS源碼也是針對獨占模式的操作
好了,熱身正式結束,來吧。
獨占模式
加鎖過程
我們都知道,ReentrantLock的加鎖和解鎖方法分別為lock()和unLock(),我們先來看獲取鎖的方法,
final void lock() {
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);
}
邏輯很簡單,線程進來后直接利用CAS
嘗試搶占鎖,如果搶占成功state
值回被改為1,且設置對象獨占鎖線程為當前線程,否則就調用acquire(1)
再次嘗試獲取鎖。
我們假定有兩個線程A和B同時競爭鎖,A進來先搶占到鎖,此時的AQS模型圖就類似這樣:
繼續走下面的方法,
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
acquire
包含了幾個函數的調用,
tryAcquire:嘗試直接獲取鎖,如果成功就直接返回;
addWaiter:將該線程加入等待隊列FIFO的尾部,並標記為獨占模式;
acquireQueued:線程阻塞在等待隊列中獲取鎖,一直獲取到資源后才返回。如果在整個等待過程中被中斷過,則返回true,否則返回false。
selfInterrupt:自我中斷,就是既拿不到鎖,又在等待時被中斷了,線程就會進行自我中斷selfInterrupt(),將中斷補上。
我們一個個來看源碼,並結合上面的兩個線程來做場景分析。
tryAcquire
不用多說,就是為了再次嘗試獲取鎖
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
當線程B進來后,nonfairTryAcquire方法首先會獲取state的值,如果為0,則正常獲取該鎖,不為0的話判斷是否是當前線程占用了,是的話就累加state的值,這里的累加也是為了配合釋放鎖時候的次數,從而實現可重入鎖的效果。
當然,因為之前鎖已經被線程A占領了,所以這時候tryAcquire
會返回false,繼續下面的流程。
addWaiter
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
enq(node);
return node;
}
這段代碼首先會創建一個和當前線程綁定的Node
節點,Node
為雙向鏈表。此時等待隊列中的tail
指針為空,直接調用enq(node)
方法將當前線程加入等待隊列尾部,然后返回當前結點的前驅結點,
private Node enq(final Node node) {
// CAS"自旋",直到成功加入隊尾
for (;;) {
Node t = tail;
if (t == null) {
// 隊列為空,初始化一個Node結點作為Head結點,並將tail結點也指向它
if (compareAndSetHead(new Node()))
tail = head;
} else {
// 把當前結點插入隊列尾部
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
第一遍循環時,tail指針為空,初始化一個Node結點,並把head和tail結點都指向它,然后第二次循環進來之后,tail結點不為空了,就將當前的結點加入到tail結點后面,也就是這樣:
todo 如果此時有另一個線程C進來的話,發現鎖已經被A拿走了,然后隊列里已經有了線程B,那么線程C就只能乖乖排到線程B的后面去,
acquireQueued
接着解讀方法,通過tryAcquire()和addWaiter(),我們的線程還是沒有拿到資源,並且還被排到了隊列的尾部,如果讓你來設計的話,這個時候你會怎么處理線程呢?其實答案也很簡單,能做的事無非兩個:
1、循環讓線程再搶資源。但仔細一推敲就知道不合理,因為如果有多個線程都參與的話,你搶我也搶只會降低系統性能
2、進入等待狀態休息,直到其他線程徹底釋放資源后喚醒自己,自己再拿到資源
毫無疑問,選擇2更加靠譜,acquireQueued方法做的也是這樣的處理:
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
// 標記是否會被中斷
boolean interrupted = false;
// CAS自旋
for (;;) {
// 獲取當前結點的前結點
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
// 獲取鎖失敗,則將此線程對應的node的waitStatus改為CANCEL
cancelAcquire(node);
}
}
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)
// 前驅結點等待狀態為"SIGNAL",那么自己就可以安心等待被喚醒了
return true;
if (ws > 0) {
/*
* 前驅結點被取消了,通過循環一直往前找,直到找到等待狀態有效的結點(等待狀態值小於等於0) ,
* 然后排在他們的后邊,至於那些被當前Node強制"靠后"的結點,因為已經被取消了,也沒有引用鏈,
* 就等着被GC了
*/
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
// 如果前驅正常,那就把前驅的狀態設置成SIGNAL
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
return Thread.interrupted();
}
acquireQueued
方法的流程是這樣的:
1、CAS自旋,先判斷當前傳入的Node的前結點是否為head結點,是的話就嘗試獲取鎖,獲取鎖成功的話就把當前結點置為head,之前的head置為null(方便GC),然后返回
2、如果前驅結點不是head或者加鎖失敗的話,就調用shouldParkAfterFailedAcquire
,將前驅節點的waitStatus變為了SIGNAL=-1,最后執行parkAndChecknIterrupt
方法,調用LockSupport.park()
掛起當前線程,parkAndCheckInterrupt
在掛起線程后會判斷線程是否被中斷,如果被中斷的話,就會重新跑acquireQueued
方法的CAS自旋操作,直到獲取資源。
ps:LockSupport.park方法會讓當前線程進入waitting狀態,在這種狀態下,線程被喚醒的情況有兩種,一是被unpark(),二是被interrupt(),所以,如果是第二種情況的話,需要返回被中斷的標志,然后在acquire
頂層方法的窗口那里自我中斷補上
此時,因為線程A還未釋放鎖,所以線程B狀態都是被掛起的,
到這里,加鎖的流程就分析完了,其實整體來說也並不復雜,而且當你理解了獨占模式加鎖的過程,后面釋放鎖和共享模式的運行機制也沒什么難懂的了,所以整個加鎖的過程還是有必要多消化下的,也是AQS的重中之重。
為了方便你們更加清晰理解,我加多一張流程圖吧(這個作者也太暖了吧,哈哈)
釋放鎖
說完了加鎖,我們來看看釋放鎖是怎么做的,AQS中釋放鎖的方法是release()
,當調用該方法時會釋放指定量的資源 (也就是鎖) ,如果徹底釋放了(即state=0),它會喚醒等待隊列里的其他線程來獲取資源。
還是一步步看源碼吧,
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
tryRelease
代碼上可以看出,核心的邏輯都在tryRelease
方法中,該方法的作用是釋放資源,AQS里該方法沒有具體的實現,需要由自定義的同步器去實現,我們看下ReentrantLock代碼中對應方法的源碼:
protected final boolean tryRelease(int releases) {
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}
tryRelease
方法會減去state對應的值,如果state為0,也就是已經徹底釋放資源,就返回true,並且把獨占的線程置為null,否則返回false。
此時AQS中的數據就會變成這樣:
完全釋放資源后,當前線程要做的就是喚醒CLH隊列中第一個在等待資源的線程,也就是head結點后面的線程,此時調用的方法是unparkSuccessor()
,
private void unparkSuccessor(Node node) {
int ws = node.waitStatus;
if (ws < 0)
//將head結點的狀態置為0
compareAndSetWaitStatus(node, ws, 0);
//找到下一個需要喚醒的結點s
Node s = node.next;
//如果為空或已取消
if (s == null || s.waitStatus > 0) {
s = null;
// 從后向前,直到找到等待狀態小於0的結點,前面說了,結點waitStatus小於0時才有效
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
// 找到有效的結點,直接喚醒
if (s != null)
LockSupport.unpark(s.thread);//喚醒
}
方法的邏輯很簡單,就是先將head的結點狀態置為0,避免下面找結點的時候再找到head,然后找到隊列中最前面的有效結點,然后喚醒,我們假設這個時候線程A已經釋放鎖,那么此時隊列中排最前邊競爭鎖的線程B就會被喚醒,
然后被喚醒的線程B就會嘗試用CAS獲取鎖,回到acquireQueued
方法的邏輯,
for (;;) {
// 獲取當前結點的前結點
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
當線程B獲取鎖之后,會把當前結點賦值給head,然后原先的前驅結點 (也就是原來的head結點) 去掉引用鏈,方便回收,這樣一來,線程B獲取鎖的整個過程就完成了,此時AQS的數據就會變成這樣:
到這里,我們已經分析完了AQS獨占模式下加鎖和釋放鎖的過程,也就是tryAccquire->tryRelease這一鏈條的邏輯,除此之外,AQS中還支持共享模式的同步,這種模式下關於鎖的操作核心其實就是tryAcquireShared->tryReleaseShared這兩個方法,我們可以簡單看下
共享模式
獲取鎖
AQS中,共享模式獲取鎖的頂層入口方法是acquireShared
,該方法會獲取指定數量的資源,成功的話就直接返回,失敗的話就進入等待隊列,直到獲取資源,
public final void acquireShared(int arg) {
if (tryAcquireShared(arg) < 0)
doAcquireShared(arg);
}
該方法里包含了兩個方法的調用,
tryAcquireShared:嘗試獲取一定資源的鎖,返回的值代表獲取鎖的狀態。
doAcquireShared:進入等待隊列,並循環嘗試獲取鎖,直到成功。
tryAcquireShared
tryAcquireShared
在AQS里沒有實現,同樣由自定義的同步器去完成具體的邏輯,像一些較為常見的並發工具Semaphore、CountDownLatch里就有對該方法的自定義實現,雖然實現的邏輯不同,但方法的作用是一樣的,就是獲取一定資源的資源,然后根據返回值判斷是否還有剩余資源,從而決定下一步的操作。
返回值有三種定義:
- 負值代表獲取失敗;
- 0代表獲取成功,但沒有剩余的資源,也就是state已經為0;
- 正值代表獲取成功,而且state還有剩余,其他線程可以繼續領取
當返回值小於0時,證明此次獲取一定數量的鎖失敗了,然后就會走doAcquireShared
方法
doAcquireShared
此方法的作用是將當前線程加入等待隊列尾部休息,直到其他線程釋放資源喚醒自己,自己成功拿到相應量的資源后才返回,這是它的源碼:
private void doAcquireShared(int arg) {
// 加入隊列尾部
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
boolean interrupted = false;
// CAS自旋
for (;;) {
final Node p = node.predecessor();
// 判斷前驅結點是否是head
if (p == head) {
// 嘗試獲取一定數量的鎖
int r = tryAcquireShared(arg);
if (r >= 0) {
// 獲取鎖成功,而且還有剩余資源,就設置當前結點為head,並繼續喚醒下一個線程
setHeadAndPropagate(node, r);
// 讓前驅結點去掉引用鏈,方便被GC
p.next = null; // help GC
if (interrupted)
selfInterrupt();
failed = false;
return;
}
}
// 跟獨占模式一樣,改前驅結點waitStatus為-1,並且當前線程掛起,等待被喚醒
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
private void setHeadAndPropagate(Node node, int propagate) {
Node h = head;
// head指向自己
setHead(node);
// 如果還有剩余量,繼續喚醒下一個鄰居線程
if (propagate > 0 || h == null || h.waitStatus < 0) {
Node s = node.next;
if (s == null || s.isShared())
doReleaseShared();
}
}
看到這里,你會不會一點熟悉的感覺,這個方法的邏輯怎么跟上面那個acquireQueued()
那么類似啊?對的,其實兩個流程並沒有太大的差別。只是doAcquireShared()
比起獨占模式下的獲取鎖上多了一步喚醒后繼線程的操作,當獲取完一定的資源后,發現還有剩余的資源,就繼續喚醒下一個鄰居線程,這才符合"共享"的思想嘛。
這里我們可以提出一個疑問,共享模式下,當前線程釋放了一定數量的資源,但這部分資源滿足不了下一個等待結點的需要的話,那么會怎么樣?
按照正常的思維,共享模式是可以多個線程同時執行的才對,所以,多個線程的情況下,如果老大釋放完資源,但這部分資源滿足不了老二,但能滿足老三,那么老三就可以拿到資源。可事實是,從源碼設計中可以看出,如果真的發生了這種情況,老三是拿不到資源的,因為等待隊列是按順序排列的,老二的資源需求量大,會把后面量小的老三以及老四、老五等都給卡住。從這一個角度來看,雖然AQS嚴格保證了順序,但也降低了並發能力
接着往下說吧,喚醒下一個鄰居線程的邏輯在doReleaseShared()
中,我們放到下面的釋放鎖來解析。
釋放鎖
共享模式釋放鎖的頂層方法是releaseShared
,它會釋放指定量的資源,如果成功釋放且允許喚醒等待線程,它會喚醒等待隊列里的其他線程來獲取資源。下面是releaseShared()的源碼:
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
該方法同樣包含兩部分的邏輯:
tryReleaseShared:釋放資源。
doAcquireShared:喚醒后繼結點。
跟tryAcquireShared
方法一樣,tryReleaseShared
在AQS中沒有具體的實現,由子同步器自己去定義,但功能都一樣,就是釋放一定數量的資源。
釋放完資源后,線程不會馬上就收工,而是喚醒等待隊列里最前排的等待結點。
doAcquireShared
喚醒后繼結點的工作在doReleaseShared()
方法中完成,我們可以看下它的源碼:
private void doReleaseShared() {
for (;;) {
// 獲取等待隊列中的head結點
Node h = head;
if (h != null && h != tail) {
int ws = h.waitStatus;
// head結點waitStatus = -1,喚醒下一個結點對應的線程
if (ws == Node.SIGNAL) {
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue; // loop to recheck cases
// 喚醒后繼結點
unparkSuccessor(h);
}
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; // loop on failed CAS
}
if (h == head) // loop if head changed
break;
}
}
代碼沒什么特別的,就是如果等待隊列head結點的waitStatus為-1的話,就直接喚醒后繼結點,喚醒的方法unparkSuccessor()
在上面已經講過了,這里也沒必要再復述。
總的來看,AQS共享模式的運作流程和獨占模式很相似,只要掌握了獨占模式的流程運轉,共享模式什么的不就那樣嗎,沒難度。這也是我為什么共享模式講解中不畫流程圖的原因,沒必要嘛。
Condition
介紹完了AQS的核心功能,我們再擴展一個知識點,在AQS中,除了提供獨占/共享模式的加鎖/解鎖功能,它還對外提供了關於Condition的一些操作方法。
Condition是個接口,在jdk1.5版本后設計的,基本的方法就是await()
和signal()
方法,功能大概就對應Object的wait()
和notify()
,Condition必須要配合鎖一起使用,因為對共享狀態變量的訪問發生在多線程環境下。一個Condition的實例必須與一個Lock綁定,因此Condition一般都是作為Lock的內部實現 ,AQS中就定義了一個類ConditionObject來實現了這個接口,
那么它應該怎么用呢?我們可以簡單寫個demo來看下效果
public class ConditionDemo {
public static void main(String[] args) {
ReentrantLock lock = new ReentrantLock();
Condition condition = lock.newCondition();
Thread tA = new Thread(() -> {
lock.lock();
try {
System.out.println("線程A加鎖成功");
System.out.println("線程A執行await被掛起");
condition.await();
System.out.println("線程A被喚醒成功");
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
System.out.println("線程A釋放鎖成功");
}
});
Thread tB = new Thread(() -> {
lock.lock();
try {
System.out.println("線程B加鎖成功");
condition.signal();
System.out.println("線程B喚醒線程A");
} finally {
lock.unlock();
System.out.println("線程B釋放鎖成功");
}
});
tA.start();
tB.start();
}
}
執行main函數后結果輸出為:
線程A加鎖成功
線程A執行await被掛起
線程B加鎖成功
線程B喚醒線程A
線程B釋放鎖成功
線程A被喚醒成功
線程A釋放鎖成功
代碼執行的結果很容易理解,線程A先獲取鎖,然后調用await()
方法掛起當前線程並釋放鎖,線程B這時候拿到鎖,然后調用signal
喚醒線程A。
毫無疑問,這兩個方法讓線程的狀態發生了變化,我們仔細來研究一下,
翻看AQS的源碼,我們會發現Condition中定義了兩個屬性firstWaiter
和lastWaiter
,前面說了,AQS中包含了一個FIFO的CLH等待隊列,每個Conditon對象就包含這樣一個等待隊列,而這兩個屬性分別表示的是等待隊列中的首尾結點,
/** First node of condition queue. */
private transient Node firstWaiter;
/** Last node of condition queue. */
private transient Node lastWaiter;
注意:Condition當中的等待隊列和AQS主體的同步等待隊列是分開的,兩個隊列雖然結構體相同,但是作用域是分開的
await
先看await()
的源碼:
public final void await() throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
// 將當前線程加入到等待隊列中
Node node = addConditionWaiter();
// 完全釋放占有的資源,並返回資源數
int savedState = fullyRelease(node);
int interruptMode = 0;
// 循環判斷當前結點是不是在Condition的隊列中,是的話掛起
while (!isOnSyncQueue(node)) {
LockSupport.park(this);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null) // clean up if cancelled
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
}
當一個線程調用Condition.await()方法,將會以當前線程構造結點,這個結點的waitStatus
賦值為Node.CONDITION,也就是-2,並將結點從尾部加入等待隊列,然后尾部結點就會指向這個新增的結點,
private Node addConditionWaiter() {
Node t = lastWaiter;
// If lastWaiter is cancelled, clean out.
if (t != null && t.waitStatus != Node.CONDITION) {
unlinkCancelledWaiters();
t = lastWaiter;
}
Node node = new Node(Thread.currentThread(), Node.CONDITION);
if (t == null)
firstWaiter = node;
else
t.nextWaiter = node;
lastWaiter = node;
return node;
}
我們依然用上面的demo來演示,此時,線程A獲取鎖並調用Condition.await()方法后,AQS內部的數據結構會變成這樣:
在Condition隊列中插入對應的結點后,線程A會釋放所持有的資源,走到while循環那層邏輯,
while (!isOnSyncQueue(node)) {
LockSupport.park(this);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
isOnSyncQueue
方法的會判斷當前的線程節點是不是在同步隊列中,這個時候此結點還在Condition隊列中,所以該方法返回false,這樣的話循環會一直持續下去,線程被掛起,等待被喚醒,此時,線程A的流程暫時停止了。
當線程A調用await()
方法掛起的時候,線程B獲取到了線程A釋放的資源,然后執行signal()
方法:
signal
public final void signal() {
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
Node first = firstWaiter;
if (first != null)
doSignal(first);
}
先判斷當前線程是否為獲取鎖的線程,如果不是則直接拋出異常。 接着調用doSignal()
方法來喚醒線程。
private void doSignal(Node first) {
// 循環,從隊列一直往后找不為空的首結點
do {
if ( (firstWaiter = first.nextWaiter) == null)
lastWaiter = null;
first.nextWaiter = null;
} while (!transferForSignal(first) &&
(first = firstWaiter) != null);
}
final boolean transferForSignal(Node node) {
// CAS循環,將結點的waitStatus改為0
if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
return false;
// 上面已經分析過,此方法會把當前結點加入到等待隊列中,並返回前驅結點
Node p = enq(node);
int ws = p.waitStatus;
if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
LockSupport.unpark(node.thread);
return true;
}
從doSignal
的代碼中可以看出,這時候程序尋找的是Condition等待隊列中首結點firstWaiter的結點,此時該結點指向的是線程A的結點,所以之后的流程作用的都是線程A的結點。
這里分析下transferForSignal
方法,先通過CAS自旋將結點waitStatus改為0,然后就把結點放入到同步隊列 (此隊列不是Condition的等待隊列) 中,然后再用CAS將同步隊列中該結點的前驅結點waitStatus改為Node.SIGNAL,也就是-1,此時AQS的數據結構大概如下 (額.....少畫了個箭頭,大家就當head結點是線程A結點的前驅結點就好):
回到await()
方法,當線程A的結點被加入同步隊列中時,isOnSyncQueue()
會返回true,跳出循環,
while (!isOnSyncQueue(node)) {
LockSupport.park(this);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null) // clean up if cancelled
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
接着執行acquireQueued()
方法,這里就不用多說了吧,嘗試重新獲取鎖,如果獲取鎖失敗繼續會被掛起,直到另外線程釋放鎖才被喚醒。
所以,當線程B釋放完鎖后,線程A被喚醒,繼續嘗試獲取鎖,至此流程結束。
對於這整個通信過程,我們可以畫一張流程圖展示下:
總結
說完了Condition的使用和底層運行機制,我們再來總結下它跟普通 wait/notify 的比較,一般這也是問的比較多的,Condition大概有以下兩點優勢:
- Condition 需要結合 Lock 進行控制,使用的時候要注意一定要對應的unlock(),可以對多個不同條件進行控制,只要new 多個 Condition對象就可以為多個線程控制通信,wait/notify 只能和 synchronized 關鍵字一起使用,並且只能喚醒一個或者全部的等待隊列;
- Condition 有類似於 await 的機制,因此不會產生加鎖方式而產生的死鎖出現,同時底層實現的是 park/unpark 的機制,因此也不會產生先喚醒再掛起的死鎖,一句話就是不會產生死鎖,但是 wait/notify 會產生先喚醒再掛起的死鎖。
最后
對AQS的源碼分析到這里就全部結束了,雖然還有很多知識點沒講解,比如公平鎖/非公平鎖下AQS是怎么作用的,篇幅所限,部分知識點沒有擴展還請見諒,盡管如此,如果您能看完文章的話,相信對AQS也算是有足夠的了解了。
回顧本篇文章,我們不難發現,無論是獨占還是共享模式,或者結合是Condition工具使用,AQS本質上的同步功能都是通過對鎖和隊列中結點的操作來實現的,從設計上講,AQS的組成結構並不算復雜,底層的運轉機制也不會很繞,所以,大家如果看源碼的時候覺得有些困難的話也不用灰心,多看幾遍,順便畫個圖之類的,理清下流程還是沒什么問題的。
當然,自己看得懂是一回事,寫出來讓別人看懂又是另一回事了,就像這篇文章,我花了好長的時間來准備,又是畫圖又是理流程的,期間還參考了不少網上大神的博文,肝了幾天才算是成文了。雖然我知道本文不算什么高質文,但我也算是費盡心力了,寫技術文真是挺累的,大家看的覺得不錯的話還請幫忙轉發下或點個贊吧!這也是對我最好的鼓勵了
作者:鄙人薛某,一個不拘於技術的互聯網人,技術三流,吹水一流,想看更多精彩文章可以關注我的公眾號哦~~~