每一個Java工程師應該都或多或少了解過AQS,我自己也是前前后后,反反復復研究了很久,看了忘,忘了再看,每次都有不一樣的體會。這次趁着寫博客,打算重新拿出來系統的研究下它的源碼,總結成文章,便於以后復習。
原文地址:http://www.jianshu.com/p/71449a7d01af
AbstractQueuedSynchronizer(以下簡稱AQS)作為java.util.concurrent包的基礎,它提供了一套完整的同步編程框架,開發人員只需要實現其中幾個簡單的方法就能自由的使用諸如獨占,共享,條件隊列等多種同步模式。我們常用的比如ReentrantLock,CountDownLatch等等基礎類庫都是基於AQS實現的,足以說明這套框架的強大之處。鑒於此,我們開發人員更應該了解它的實現原理,這樣才能在使用過程中得心應手。
總體來說個人感覺AQS的代碼非常難懂,本文就其中的獨占鎖實現原理進行分析。
一、執行過程概述
首先先從整體流程入手,了解下AQS獨占鎖的執行邏輯,然后再一步一步深入分析源碼。
獲取鎖的過程:
- 當線程調用acquire()申請獲取鎖資源,如果成功,則進入臨界區。
- 當獲取鎖失敗時,則進入一個FIFO等待隊列,然后被掛起等待喚醒。
- 當隊列中的等待線程被喚醒以后就重新嘗試獲取鎖資源,如果成功則進入臨界區,否則繼續掛起等待。
釋放鎖過程:
- 當線程調用release()進行鎖資源釋放時,如果沒有其他線程在等待鎖資源,則釋放完成。
- 如果隊列中有其他等待鎖資源的線程需要喚醒,則喚醒隊列中的第一個等待節點(先入先出)。
二、源碼深入分析
基於上面所講的獨占鎖獲取釋放的大致過程,我們再來看下源碼實現邏輯:
首先來看下獲取鎖的方法acquire()
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
代碼雖然短,但包含的邏輯卻很多,一步一步看下:
- 首先是調用開發人員自己實現的tryAcquire() 方法嘗試獲取鎖資源,如果成功則整個acquire()方法執行完畢,即當前線程獲得鎖資源,可以進入臨界區。
- 如果獲取鎖失敗,則開始進入后面的邏輯,首先是addWaiter(Node.EXCLUSIVE)方法。來看下這個方法的源碼實現:
//注意:該入隊方法的返回值就是新創建的節點
private Node addWaiter(Node mode) {
//基於當前線程,節點類型(Node.EXCLUSIVE)創建新的節點
//由於這里是獨占模式,因此節點類型就是Node.EXCLUSIVE
Node node = new Node(Thread.currentThread(), mode);
Node pred = tail;
//這里為了提搞性能,首先執行一次快速入隊操作,即直接嘗試將新節點加入隊尾
if (pred != null) {
node.prev = pred;
//這里根據CAS的邏輯,即使並發操作也只能有一個線程成功並返回,其余的都要執行后面的入隊操作。即enq()方法
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
enq(node);
return node;
}
//完整的入隊操作
private Node enq(final Node node) {
for (;;) {
Node t = tail;
//如果隊列還沒有初始化,則進行初始化,即創建一個空的頭節點
if (t == null) {
//同樣是CAS,只有一個線程可以初始化頭結點成功,其余的都要重復執行循環體
if (compareAndSetHead(new Node()))
tail = head;
} else {
//新創建的節點指向隊列尾節點,毫無疑問並發情況下這里會有多個新創建的節點指向隊列尾節點
node.prev = t;
//基於這一步的CAS,不管前一步有多少新節點都指向了尾節點,這一步只有一個能真正入隊成功,其他的都必須重新執行循環體
if (compareAndSetTail(t, node)) {
t.next = node;
//該循環體唯一退出的操作,就是入隊成功(否則就要無限重試)
return t;
}
}
}
}
上面的入隊操作有兩點需要說明:
一、初始化隊列的觸發條件就是當前已經有線程占有了鎖資源,因此上面創建的空的頭節點可以認為就是當前占有鎖資源的節點(雖然它並沒有設置任何屬性)。
二、注意整個代碼是處在一個死循環中,知道入隊成功。如果失敗了就會不斷進行重試。
- 經過上面的操作,我們申請獲取鎖的線程已經成功加入了等待隊列,通過文章最一開始說的獨占鎖獲取流程,那么節點現在要做的就是掛起當前線程,等待被喚醒,這個邏輯是怎么實現的呢?來看下源碼:
通過上面的分析,該方法入參node就是剛入隊的包含當前線程信息的節點
final boolean acquireQueued(final Node node, int arg) {
//鎖資源獲取失敗標記位
boolean failed = true;
try {
//等待線程被中斷標記位
boolean interrupted = false;
//這個循環體執行的時機包括新節點入隊和隊列中等待節點被喚醒兩個地方
for (;;) {
//獲取當前節點的前置節點
final Node p = node.predecessor();
//如果前置節點就是頭結點,則嘗試獲取鎖資源
if (p == head && tryAcquire(arg)) {
//當前節點獲得鎖資源以后設置為頭節點,這里繼續理解我上面說的那句話
//頭結點就表示當前正占有鎖資源的節點
setHead(node);
p.next = null; //幫助GC
//表示鎖資源成功獲取,因此把failed置為false
failed = false;
//返回中斷標記,表示當前節點是被正常喚醒還是被中斷喚醒
return interrupted;
}
如果沒有獲取鎖成功,則進入掛起邏輯
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
//最后會分析獲取鎖失敗處理邏輯
if (failed)
cancelAcquire(node);
}
}
掛起邏輯是很重要的邏輯,這里拿出來單獨分析一下,首先要注意目前為止,我們只是根據當前線程,節點類型創建了一個節點並加入隊列中,其他屬性都是默認值。
//首先說明一下參數,node是當前線程的節點,pred是它的前置節點
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
//獲取前置節點的waitStatus
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)
//如果前置節點的waitStatus是Node.SIGNAL則返回true,然后會執行parkAndCheckInterrupt()方法進行掛起
return true;
if (ws > 0) {
//由waitStatus的幾個取值可以判斷這里表示前置節點被取消
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
//這里我們由當前節點的前置節點開始,一直向前找最近的一個沒有被取消的節點
//注,由於頭結點head是通過new Node()創建,它的waitStatus為0,因此這里不會出現空指針問題,也就是說最多就是找到頭節點上面的循環就退出了
pred.next = node;
} else {
//根據waitStatus的取值限定,這里waitStatus的值只能是0或者PROPAGATE,那么我們把前置節點的waitStatus設為Node.SIGNAL然后重新進入該方法進行判斷
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
上面這個方法邏輯比較復雜,它是用來判斷當前節點是否可以被掛起,也就是喚醒條件是否已經具備,即如果掛起了,那一定是可以由其他線程來喚醒的。該方法如果返回false,即掛起條件沒有完備,那就會重新執行acquireQueued方法的循環體,進行重新判斷,如果返回true,那就表示萬事俱備,可以掛起了,就會進入parkAndCheckInterrupt()方法看下源碼:
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
//被喚醒之后,返回中斷標記,即如果是正常喚醒則返回false,如果是由於中斷醒來,就返回true
return Thread.interrupted();
}
看acquireQueued方法中的源碼,如果是因為中斷醒來,那么就把中斷標記置為true。不管是正常被喚醒還是由與中斷醒來,都會去嘗試獲取鎖資源。如果成功則返回中斷標記,否則繼續掛起等待。
注:Thread.interrupted()方法在返回中斷標記的同時會清除中斷標記,也就是說當由於中斷醒來然后獲取鎖成功,那么整個acquireQueued方法就會返回true表示是因為中斷醒來,但如果中斷醒來以后沒有獲取到鎖,繼續掛起,由於這次的中斷已經被清除了,下次如果是被正常喚醒,那么acquireQueued方法就會返回false,表示沒有中斷。
最后我們回到acquireQueued方法的最后一步,finally模塊。這里是針對鎖資源獲取失敗以后做的一些善后工作,翻看上面的代碼,其實能進入這里的就是tryAcquire()方法拋出異常,也就是說AQS框架針對開發人員自己實現的獲取鎖操作如果拋出異常,也做了妥善的處理,一起來看下源碼:
//傳入的方法參數是當前獲取鎖資源失敗的節點
private void cancelAcquire(Node node) {
// 如果節點不存在則直接忽略
if (node == null)
return;
node.thread = null;
// 跳過所有已經取消的前置節點,跟上面的那段跳轉邏輯類似
Node pred = node.prev;
while (pred.waitStatus > 0)
node.prev = pred = pred.prev;
//這個是前置節點的后繼節點,由於上面可能的跳節點的操作,所以這里可不一定就是當前節點,仔細想一下。^_^
Node predNext = pred.next;
//把當前節點waitStatus置為取消,這樣別的節點在處理時就會跳過該節點
node.waitStatus = Node.CANCELLED;
//如果當前是尾節點,則直接刪除,即出隊
//注:這里不用關心CAS失敗,因為即使並發導致失敗,該節點也已經被成功刪除
if (node == tail && compareAndSetTail(node, pred)) {
compareAndSetNext(pred, predNext, null);
} else {
int ws;
if (pred != head &&
((ws = pred.waitStatus) == Node.SIGNAL ||
(ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
pred.thread != null) {
Node next = node.next;
if (next != null && next.waitStatus <= 0)
//這里的判斷邏輯很繞,具體就是如果當前節點的前置節點不是頭節點且它后面的節點等待它喚醒(waitStatus小於0),
//再加上如果當前節點的后繼節點沒有被取消就把前置節點跟后置節點進行連接,相當於刪除了當前節點
compareAndSetNext(pred, predNext, next);
} else {
//進入這里,要么當前節點的前置節點是頭結點,要么前置節點的waitStatus是PROPAGATE,直接喚醒當前節點的后繼節點
unparkSuccessor(node);
}
node.next = node; // help GC
}
}
上面就是獨占模式獲取鎖的核心源碼,確實非常難懂,很繞,就這幾個方法需要反反復復看很多遍,才能慢慢理解。
接下來看下釋放鎖的過程:
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
tryRelease()方法是用戶自定義的釋放鎖邏輯,如果成功,就判斷等待隊列中有沒有需要被喚醒的節點(waitStatus為0表示沒有需要被喚醒的節點),一起看下喚醒操作:
private void unparkSuccessor(Node node) {
int ws = node.waitStatus;
if (ws < 0)
//把標記為設置為0,表示喚醒操作已經開始進行,提高並發環境下性能
compareAndSetWaitStatus(node, ws, 0);
Node s = node.next;
//如果當前節點的后繼節點為null,或者已經被取消
if (s == null || s.waitStatus > 0) {
s = null;
//注意這個循環沒有break,也就是說它是從后往前找,一直找到離當前節點最近的一個等待喚醒的節點
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
//執行喚醒操作
if (s != null)
LockSupport.unpark(s.thread);
}
相比而言,鎖的釋放操作就簡單很多了,代碼也比較少。
三、總結
以上就是AQS獨占鎖的獲取與釋放過程,大致思想很簡單,就是嘗試去獲取鎖,如果失敗就加入一個隊列中掛起。釋放鎖時,如果隊列中有等待的線程就進行喚醒。但如果一步一步看源碼,會發現細節非常多,很多地方很難搞明白,我自己也是反反復復學習很久才有點心得,但也不敢說已經研究通了AQS,甚至不敢說我上面的研究成果就是對的,只是寫篇文章總結一下,跟同行交流交流心得。
除了獨占鎖,后面還會產出AQS一系列的文章,包括共享鎖,條件隊列的實現原理等。