本章重點講解內容如下:
1、什么是CLH同步隊列
2、為什么需要CLH同步隊列
3、CLH同步隊列原理(即隊列如何入隊、出隊)
一 什么是CLH隊列
AbstractQueuedSynchronizer類文件開頭,作者Doug Lea一大篇幅來介紹CLH隊列,大意如下:
CLH隊列是一個FIFO的雙向鏈表:由head、tail、node中間節點組成,每個Node節點包含:thread、waitStatus、next、pre屬性
當線程獲取同步狀態失敗后,會將當前線程構造成一個Node節點插入鏈表(如果第一次插入會初始化head節點為虛擬節點),插入鏈表都是尾部插入並且setTail為當前節點,同時會阻塞當前線程(調用LockSupport.park方法)。
當線程釋放同步狀態后,會喚醒當前節點的next節點,next節點會搶占同步資源,搶占失敗后重新阻塞,成功后next節點會重新setHead為當前線程的節點,將之前的head廢棄。
二 為什么需要CLH隊列
是為了減少多線程搶占資源造成不必要的cpu上下文切換開銷。通過看AQS源碼我們知道搶占同步器狀態是調用UnSafe.compareAndSwapInt方法,其實底層就是調用的jvm的cas函數。當多個線程同時在cas的時候,最多只能有一個搶占成功,其余的都在自旋,這樣就造成了不必要的cpu開銷。
若引入CLH隊列隊列,至於pre執行完畢,才喚醒next節點,這樣最多只有next節點和新進入的線程搶占cpu資源,其余的線程都是阻塞狀態,極大的減少了不必要的cpu開銷。
三 CLH隊列原理(如何入隊、出隊)
1)入隊
入隊代碼如下:

1 //獲取鎖 2 public final void acquire(int arg) { 3 //tryAcquire嘗試獲取鎖,Semaphore、coutDownLatch等各個工具類實現不一致 4 if (!tryAcquire(arg) && 5 //acquireQueued:tryAcquire成功就setHead為當前節點,失敗則阻塞當前線程 6 //addWaiter加入同步等待隊列 7 acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) 8 selfInterrupt(); 9 } 10 //加入等待隊列 11 private Node addWaiter(Node mode) { 12 Node node = new Node(Thread.currentThread(), mode); 13 // Try the fast path of enq; backup to full enq on failure 14 // 非首次插入,可直接setTail 15 // 設置老的tail為當天tail的pre節點 16 Node pred = tail; 17 if (pred != null) { 18 node.prev = pred; 19 if (compareAndSetTail(pred, node)) { 20 pred.next = node; 21 return node; 22 } 23 } 24 //首次插入,需要創建虛擬的head節點 25 enq(node); 26 return node; 27 } 28 private Node enq(final Node node) { 29 for (;;) { 30 Node t = tail; 31 // 如果 tail 是 null,就創建一個虛擬節點,同時指向 head 和 tail,稱為 初始化。 32 if (t == null) { // Must initialize 33 if (compareAndSetHead(new Node())) 34 tail = head; 35 } else {// 如果不是 null 36 // 和 上個方法邏輯一樣,將新節點追加到 tail 節點后面,並更新隊列的 tail 為新節點。 37 // 只不過這里是死循環的,失敗了還可以再來 。 38 node.prev = t; 39 if (compareAndSetTail(t, node)) { 40 t.next = node; 41 return t; 42 } 43 } 44 } 45 }
阻塞當前線程代碼如下:

1 // 這里返回的節點是新創建的節點,arg 是請求的數量 2 final boolean acquireQueued(final Node node, int arg) { 3 boolean failed = true; 4 try { 5 boolean interrupted = false; 6 for (;;) { 7 // 找上一個節點 8 final Node p = node.predecessor(); 9 // 如果上一個節點是 head ,就嘗試獲取鎖 10 // 如果 獲取成功,就將當前節點設置為 head,注意 head 節點是永遠不會喚醒的。 11 if (p == head && tryAcquire(arg)) { 12 setHead(node); 13 p.next = null; // help GC 14 failed = false; 15 return interrupted; 16 } 17 // 在獲取鎖失敗后,就需要阻塞了。 18 // shouldParkAfterFailedAcquire ---> 檢查上一個節點的狀態,如果是 SIGNAL 就阻塞,否則就改成 SIGNAL。 19 if (shouldParkAfterFailedAcquire(p, node) && 20 //parkAndCheckInterrupt就是調用park方法阻塞當前線程,等待被喚起后,重新進入當前自旋操作,可重新獲取鎖 21 parkAndCheckInterrupt()) 22 interrupted = true; 23 } 24 } finally { 25 if (failed) 26 cancelAcquire(node); 27 } 28 }
2)出隊

1 //釋放當前線程 2 public final boolean release(int arg) { 3 //實際操作就是cas把AQS的state狀態-1 4 if (tryRelease(arg)) { 5 Node h = head; 6 if (h != null && h.waitStatus != 0) 7 //核心方法,見后面詳解 8 unparkSuccessor(h); 9 return true; 10 } 11 return false; 12 } 13 14 //釋放鎖核心方法 15 private void unparkSuccessor(Node node) { 16 int ws = node.waitStatus; 17 if (ws < 0) 18 // 將 head 節點的 ws 改成 0,清除信號。表示,他已經釋放過了。不能重復釋放。 19 compareAndSetWaitStatus(node, ws, 0); 20 21 Node s = node.next; 22 // 如果 next 是 null,或者 next 被取消了。就從 tail 開始向上找節點。 23 if (s == null || s.waitStatus > 0) { 24 s = null; 25 // 從尾部開始,向前尋找未被取消的節點,直到這個節點是 null,或者是 head。 26 // 也就是說,如果 head 的 next 是 null,那么就從尾部開始尋找,直到不是 null 為止,找到這個 head 就不管了。 27 // 如果是 head 的 next 不是 null,但是被取消了,那這個節點也會被略過。 28 for (Node t = tail; t != null && t != node; t = t.prev) 29 if (t.waitStatus <= 0) 30 s = t; 31 } 32 // 喚醒 head.next 這個節點。 33 // 通常這個節點是 head 的 next。 34 // 但如果 head.next 被取消了,就會從尾部開始找。 35 if (s != null) 36 LockSupport.unpark(s.thread); 37 }
3)如何聯動起來(即如何按FIFO順序喚醒隊列)
調用Lock.release()方法會觸發unparkSuccessor()--> LockSupport.unpark(node.next)
喚醒next節點,next節點會繼續在acquireQueued()方法里自旋,若tryAcquire()成功,執行setHead(node)方法,把CLH隊列head設置為當前節點,然后等待當前節點執行邏輯完畢再次調用release方法,重復執行上述邏輯。
若獲取鎖失敗,繼續進入阻塞狀態,等待下一次被喚醒。