在【Java並發編程實戰】—–“J.U.C”:CLH隊列鎖提過,AQS里面的CLH隊列是CLH同步鎖的一種變形。其主要從兩方面進行了改造:節點的結構與節點等待機制。在結構上引入了頭結點和尾節點,他們分別指向隊列的頭和尾,嘗試獲取鎖、入隊列、釋放鎖等實現都與頭尾節點相關,並且每個節點都引入前驅節點和后后續節點的引用;在等待機制上由原來的自旋改成阻塞喚醒。其結構如下:
知道其結構了,我們再看看他的實現。在線程獲取鎖時會調用AQS的acquire()方法,該方法第一次嘗試獲取鎖如果失敗,會將該線程加入到CLH隊列中:
public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); }
addWaiter:
private Node addWaiter(Node mode) { Node node = new Node(Thread.currentThread(), mode); Node pred = tail; if (pred != null) { node.prev = pred; if (compareAndSetTail(pred, node)) { pred.next = node; return node; } } enq(node); return node; }
這是addWaiter()的實現,在厘清這段代碼之前我們要先看一個更重要的東東,Node,CLH隊列的節點。其源碼如下:
static final class Node { /** 線程已被取消 */ static final int CANCELLED = 1; /** 當前線程的后繼線程需要被unpark(喚醒) */ static final int SIGNAL = -1; /** 線程(處在Condition休眠狀態)在等待Condition喚醒 */ static final int CONDITION = -2; /** 共享鎖 */ static final Node SHARED = new Node(); /** 獨占鎖 */ static final Node EXCLUSIVE = null; volatile int waitStatus; /** 前繼節點 */ volatile Node prev; /** 后繼節點 */ volatile Node next; volatile Thread thread; Node nextWaiter; final boolean isShared() { return nextWaiter == SHARED; } /** 獲取前繼節點 */ final Node predecessor() throws NullPointerException { Node p = prev; if (p == null) throw new NullPointerException(); else return p; } /** * 三個構造函數 */ Node() { } Node(Thread thread, Node mode) { this.nextWaiter = mode; this.thread = thread; } Node(Thread thread, int waitStatus) { this.waitStatus = waitStatus; this.thread = thread; } }
在這個源代碼中有三個值(CANCELLED、SIGNAL、CONDITION)要特別注意,前面提到過CLH隊列的節點都有一個狀態位,該狀態位與線程狀態密切相關:
CANCELLED = 1:因為超時或者中斷,節點會被設置為取消狀態,被取消的節點時不會參與到競爭中的,他會一直保持取消狀態不會轉變為其他狀態;
SIGNAL = -1:其后繼節點已經被阻塞了,到時需要進行喚醒操作;
CONDITION = -2:表示這個結點在條件隊列中,因為等待某個條件而被阻塞;
0:新建節點一般都為0。
入列
在線程嘗試獲取鎖的時候,如果失敗了需要將該線程加入到CLH隊列,入列中的主要流程是:tail執行新建node,然后將node的后繼節點指向舊tail值。注意在這個過程中有一個CAS操作,采用自旋方式直到成功為止。其代碼如下:
for(;;){ Node t = tail; node.prev = t; if (compareAndSetTail(t, node)) { t.next = node; return t; } }
出列
當線程是否鎖時,需要進行“出列”,出列的主要工作則是喚醒其后繼節點(一般來說就是head節點),讓所有線程有序地進行下去:
Node h = head; if (h != null && h.waitStatus != 0) unparkSuccessor(h); return true;
取消
線程因為超時或者中斷涉及到取消的操作,如果某個節點被取消了,那個該節點就不會參與到鎖競爭當中,它會等待GC回收。取消的主要過程是將取消狀態的節點移除掉,移除的過程還是比較簡單的。先將其狀態設置為CANCELLED,然后將其前驅節點的pred執行其后繼節點,當然這個過程仍然會是一個CAS操作:
node.waitStatus = Node.CANCELLED; Node pred = node.prev; Node predNext = pred.next; Node next = node.next;
掛起
我們了解了AQS的CLH隊列相比原始的CLH隊列鎖,它采用了一種變形操作,將自旋機制改為阻塞機制。當前線程將首先檢測是否為頭結點且嘗試獲取鎖,如果當前節點為頭結點並成功獲取鎖則直接返回,當前線程不進入阻塞,否則將當前線程阻塞:
for (;;) { if (node.prev == head) if(嘗試獲取鎖成功){ head=node; node.next=null; return; } 阻塞線程 }
參考

