並發編程之CLH同步隊列 出隊入隊詳解


本章重點講解內容如下:

1、什么是CLH同步隊列

2、為什么需要CLH同步隊列

3、CLH同步隊列原理(即隊列如何入隊、出隊)

一 什么是CLH隊列

AbstractQueuedSynchronizer類文件開頭,作者Doug Lea一大篇幅來介紹CLH隊列,大意如下:
  CLH隊列是一個FIFO的雙向鏈表:由head、tail、node中間節點組成,每個Node節點包含:thread、waitStatus、next、pre屬性


  當線程獲取同步狀態失敗后,會將當前線程構造成一個Node節點插入鏈表(如果第一次插入會初始化head節點為虛擬節點),插入鏈表都是尾部插入並且setTail為當前節點,同時會阻塞當前線程(調用LockSupport.park方法)。

 
        

當線程釋放同步狀態后,會喚醒當前節點的next節點,next節點會搶占同步資源,搶占失敗后重新阻塞,成功后next節點會重新setHead為當前線程的節點,將之前的head廢棄。

二 為什么需要CLH隊列

      是為了減少多線程搶占資源造成不必要的cpu上下文切換開銷。通過看AQS源碼我們知道搶占同步器狀態是調用UnSafe.compareAndSwapInt方法,其實底層就是調用的jvm的cas函數。當多個線程同時在cas的時候,最多只能有一個搶占成功,其余的都在自旋,這樣就造成了不必要的cpu開銷。

     若引入CLH隊列隊列,至於pre執行完畢,才喚醒next節點,這樣最多只有next節點和新進入的線程搶占cpu資源,其余的線程都是阻塞狀態,極大的減少了不必要的cpu開銷。

三 CLH隊列原理(如何入隊、出隊)

1)入隊

入隊代碼如下:

 1 //獲取鎖
 2 public final void acquire(int arg) {
 3         //tryAcquire嘗試獲取鎖,Semaphore、coutDownLatch等各個工具類實現不一致
 4         if (!tryAcquire(arg) &&
 5             //acquireQueued:tryAcquire成功就setHead為當前節點,失敗則阻塞當前線程
 6             //addWaiter加入同步等待隊列
 7             acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
 8             selfInterrupt();
 9 }
10 //加入等待隊列
11 private Node addWaiter(Node mode) {
12         Node node = new Node(Thread.currentThread(), mode);
13         // Try the fast path of enq; backup to full enq on failure
14         // 非首次插入,可直接setTail
15         // 設置老的tail為當天tail的pre節點
16         Node pred = tail;
17         if (pred != null) {
18             node.prev = pred;
19             if (compareAndSetTail(pred, node)) {
20                 pred.next = node;
21                 return node;
22             }
23         }
24         //首次插入,需要創建虛擬的head節點
25         enq(node);
26         return node;
27 }
28 private Node enq(final Node node) {
29     for (;;) {
30         Node t = tail;
31         // 如果 tail 是 null,就創建一個虛擬節點,同時指向 head 和 tail,稱為 初始化。
32         if (t == null) { // Must initialize
33             if (compareAndSetHead(new Node()))
34                 tail = head;
35         } else {// 如果不是 null
36             // 和 上個方法邏輯一樣,將新節點追加到 tail 節點后面,並更新隊列的 tail 為新節點。
37             // 只不過這里是死循環的,失敗了還可以再來 。
38             node.prev = t;
39             if (compareAndSetTail(t, node)) {
40                 t.next = node;
41                 return t;
42             }
43         }
44     }
45 }
View Code

阻塞當前線程代碼如下:

 1 // 這里返回的節點是新創建的節點,arg 是請求的數量
 2 final boolean acquireQueued(final Node node, int arg) {
 3     boolean failed = true;
 4     try {
 5         boolean interrupted = false;
 6         for (;;) {
 7             // 找上一個節點
 8             final Node p = node.predecessor();
 9             // 如果上一個節點是 head ,就嘗試獲取鎖
10             // 如果 獲取成功,就將當前節點設置為 head,注意 head 節點是永遠不會喚醒的。
11             if (p == head && tryAcquire(arg)) {
12                 setHead(node);
13                 p.next = null; // help GC
14                 failed = false;
15                 return interrupted;
16             }
17             // 在獲取鎖失敗后,就需要阻塞了。
18             // shouldParkAfterFailedAcquire ---> 檢查上一個節點的狀態,如果是 SIGNAL 就阻塞,否則就改成 SIGNAL。
19             if (shouldParkAfterFailedAcquire(p, node) &&
20                 //parkAndCheckInterrupt就是調用park方法阻塞當前線程,等待被喚起后,重新進入當前自旋操作,可重新獲取鎖
21                 parkAndCheckInterrupt())
22                 interrupted = true;
23         }
24     } finally {
25         if (failed)
26             cancelAcquire(node);
27     }
28 }
View Code

 

2)出隊

 1 //釋放當前線程
 2 public final boolean release(int arg) {
 3         //實際操作就是cas把AQS的state狀態-1
 4         if (tryRelease(arg)) {
 5             Node h = head;
 6             if (h != null && h.waitStatus != 0)
 7                 //核心方法,見后面詳解
 8                 unparkSuccessor(h);
 9             return true;
10         }
11         return false;
12 }
13 
14 //釋放鎖核心方法
15 private void unparkSuccessor(Node node) {
16     int ws = node.waitStatus;
17     if (ws < 0)
18         // 將 head 節點的 ws 改成 0,清除信號。表示,他已經釋放過了。不能重復釋放。
19         compareAndSetWaitStatus(node, ws, 0);
20 
21     Node s = node.next;
22     // 如果 next 是 null,或者 next 被取消了。就從 tail 開始向上找節點。
23     if (s == null || s.waitStatus > 0) {
24         s = null;
25         // 從尾部開始,向前尋找未被取消的節點,直到這個節點是 null,或者是 head。
26         // 也就是說,如果 head 的 next 是 null,那么就從尾部開始尋找,直到不是 null 為止,找到這個 head 就不管了。
27         // 如果是 head 的 next 不是 null,但是被取消了,那這個節點也會被略過。
28         for (Node t = tail; t != null && t != node; t = t.prev)
29             if (t.waitStatus <= 0)
30                 s = t;
31     }
32     // 喚醒 head.next 這個節點。
33     // 通常這個節點是 head 的 next。
34     // 但如果 head.next 被取消了,就會從尾部開始找。
35     if (s != null)
36         LockSupport.unpark(s.thread);
37 }
View Code

 

3)如何聯動起來(即如何按FIFO順序喚醒隊列)

調用Lock.release()方法會觸發unparkSuccessor()--> LockSupport.unpark(node.next)

喚醒next節點,next節點會繼續在acquireQueued()方法里自旋,若tryAcquire()成功,執行setHead(node)方法,把CLH隊列head設置為當前節點,然后等待當前節點執行邏輯完畢再次調用release方法,重復執行上述邏輯。

若獲取鎖失敗,繼續進入阻塞狀態,等待下一次被喚醒。

 

 

 
       


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM