在研究AbstractQueuedSynchronizer的時候我是以ReentrantLock入手的。所以理所當然會設計到一些ReentrantLock的方法。因為網上已經有很多關於AQS的文章了,所以這篇文章不會特別詳細的去記錄類的實現,主要是記錄幾個我覺得需要主要的點。
1、阻塞隊列實現
AbstractQueuedSynchronizer用一個Node隊列來實現線程阻塞。處理當前正在執行的線程,后續的所有的線程都會進入到這個虛擬的CLH隊列。下面該圖是我根據源碼畫的一個鏈表隊列。head是一個空對象,也就是這個對象是沒有thread的,后續的thread都會添加到tail。進入到這個隊列的thread都會被操作系統掛起,等正在執行的thread被釋放后操作系統會喚醒被阻塞的head的next節點的線程,具體的喚醒方法在unparkSuccess函數,下面會有所分析。
代碼的實現思路圖解
為了方便理解,我粗略的用word畫了一個代碼流程圖,包含lock和unlock方法。
鎖的實現分析
當我們跟蹤lock代碼的時候,在隊列第一次創建的時候會執行這個enq函數:
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) { // Must initialize
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
仔細看下這個函數會發現,第一個節點是默認給的,在代碼第5行,也就是一個空節點new Node()。然后接下來就是把新建的節點插入到tail。
在進入隊列后,還沒被阻塞之前,會進行一次判斷,判斷當前node的prev節點是不是head。為什么不直接判斷當前節點是不是head,以上的圖片已經說明清楚了,head節點的thread是null的,也就是head是默認生成的。為什么要這樣做?這樣做的好處是什么我還暫時還沒想到。如果有網友知道麻煩留言告訴我下哈。
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();//1
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
但是在上面注釋1中獲取當前node的前置節點。如果p是head的話,那么node的線程會嘗試獲取一次鎖tryAcquire。
final Node predecessor() throws NullPointerException {
Node p = prev;
if (p == null)
throw new NullPointerException();
else
return p;
}
如果還是獲取不到鎖,那么久當前線程阻塞。實現方法是調用 LockSupport.park,改方法會直接調用操作系統的內置方法來實現線程阻塞:
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
return Thread.interrupted();
}
2、線程喚醒
當依噶線程unlock后就會釋放鎖,那么之前爭奪鎖的線程都被阻塞掛起了,現在要做的一件事情就是喚醒掛起的線程。當然不是把所有的線程都喚醒,那么它的喚醒規則是怎么樣的呢?顯然因為阻塞鎖是一個FIFO的隊列,所以肯定是從head開始。
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
開始的時候我們已經說過head是空的,所以喚醒要從第二個節點開始,看下面Node s = node.next;就知。
private void unparkSuccessor(Node node) {
/*
* If status is negative (i.e., possibly needing signal) try
* to clear in anticipation of signalling. It is OK if this
* fails or if status is changed by waiting thread.
*/
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
/*
* Thread to unpark is held in successor, which is normally
* just the next node. But if cancelled or apparently null,
* traverse backwards from tail to find the actual
* non-cancelled successor.
*/
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
LockSupport.unpark(s.thread);
}
3、Node幾點狀態碼
把線程要包裝為Node對象的主要原因,除了用Node構造供虛擬隊列外,還用Node包裝了各種線程狀態,這些狀態被精心設計為一些數字值:
SIGNAL(-1) :線程的后繼線程正/已被阻塞,當該線程release或cancel時要重新這個后繼線程(unpark)
CANCELLED(1):因為超時或中斷,該線程已經被取消
CONDITION(-2):表明該線程被處於條件隊列,就是因為調用了Condition.await而被阻塞
PROPAGATE(-3):傳播共享鎖
0:0代表無狀態
參考:
http://www.open-open.com/lib/view/open1352431606912.html