【Java並發編程實戰】----- AQS(四):CLH同步隊列


【Java並發編程實戰】—–“J.U.C”:CLH隊列鎖提過,AQS里面的CLH隊列是CLH同步鎖的一種變形。其主要從兩方面進行了改造:節點的結構與節點等待機制。在結構上引入了頭結點和尾節點,他們分別指向隊列的頭和尾,嘗試獲取鎖、入隊列、釋放鎖等實現都與頭尾節點相關,並且每個節點都引入前驅節點和后后續節點的引用;在等待機制上由原來的自旋改成阻塞喚醒。其結構如下:

2015121100001

知道其結構了,我們再看看他的實現。在線程獲取鎖時會調用AQS的acquire()方法,該方法第一次嘗試獲取鎖如果失敗,會將該線程加入到CLH隊列中:

public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }

addWaiter:

private Node addWaiter(Node mode) {
        Node node = new Node(Thread.currentThread(), mode);
        Node pred = tail;
        if (pred != null) {
            node.prev = pred;
            if (compareAndSetTail(pred, node)) {
                pred.next = node;
                return node;
            }
        }
        enq(node);
        return node;
    }

這是addWaiter()的實現,在厘清這段代碼之前我們要先看一個更重要的東東,Node,CLH隊列的節點。其源碼如下:

static final class Node {
            /** 線程已被取消 */
            static final int CANCELLED =  1;
            
            /** 當前線程的后繼線程需要被unpark(喚醒) */
            static final int SIGNAL    = -1;
            
            /** 線程(處在Condition休眠狀態)在等待Condition喚醒 */
            static final int CONDITION = -2;
            
            /** 共享鎖 */
            static final Node SHARED = new Node();
            /** 獨占鎖  */
            static final Node EXCLUSIVE = null;

            volatile int waitStatus;

            /** 前繼節點 */
            volatile Node prev;

            /** 后繼節點 */
            volatile Node next;
            
            volatile Thread thread;

            Node nextWaiter;

            final boolean isShared() {
                return nextWaiter == SHARED;
            }

           /** 獲取前繼節點 */
            final Node predecessor() throws NullPointerException {
                Node p = prev;
                if (p == null)
                    throw new NullPointerException();
                else
                    return p;
            }

            /**
             * 三個構造函數
             */
            Node() { 
            }

            Node(Thread thread, Node mode) {   
                this.nextWaiter = mode;
                this.thread = thread;
            }

            Node(Thread thread, int waitStatus) { 
                this.waitStatus = waitStatus;
                this.thread = thread;
            }
        }

在這個源代碼中有三個值(CANCELLED、SIGNAL、CONDITION)要特別注意,前面提到過CLH隊列的節點都有一個狀態位,該狀態位與線程狀態密切相關:

CANCELLED =  1:因為超時或者中斷,節點會被設置為取消狀態,被取消的節點時不會參與到競爭中的,他會一直保持取消狀態不會轉變為其他狀態;

SIGNAL    = -1:其后繼節點已經被阻塞了,到時需要進行喚醒操作;

CONDITION = -2:表示這個結點在條件隊列中,因為等待某個條件而被阻塞;

0:新建節點一般都為0。

入列

在線程嘗試獲取鎖的時候,如果失敗了需要將該線程加入到CLH隊列,入列中的主要流程是:tail執行新建node,然后將node的后繼節點指向舊tail值。注意在這個過程中有一個CAS操作,采用自旋方式直到成功為止。其代碼如下:

for(;;){
            Node t = tail;
            node.prev = t;
            if (compareAndSetTail(t, node)) {
                t.next = node;
                return t;
            }
        }
其實這段代碼在enq()方法中存在。

出列

當線程是否鎖時,需要進行“出列”,出列的主要工作則是喚醒其后繼節點(一般來說就是head節點),讓所有線程有序地進行下去:

Node h = head;
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h);
            return true;

 

取消

線程因為超時或者中斷涉及到取消的操作,如果某個節點被取消了,那個該節點就不會參與到鎖競爭當中,它會等待GC回收。取消的主要過程是將取消狀態的節點移除掉,移除的過程還是比較簡單的。先將其狀態設置為CANCELLED,然后將其前驅節點的pred執行其后繼節點,當然這個過程仍然會是一個CAS操作:

node.waitStatus = Node.CANCELLED;
Node pred = node.prev;
Node predNext = pred.next;
Node next = node.next;

 

掛起

我們了解了AQS的CLH隊列相比原始的CLH隊列鎖,它采用了一種變形操作,將自旋機制改為阻塞機制。當前線程將首先檢測是否為頭結點且嘗試獲取鎖,如果當前節點為頭結點並成功獲取鎖則直接返回,當前線程不進入阻塞,否則將當前線程阻塞:

for (;;) {
    if (node.prev == head)
if(嘗試獲取鎖成功){
         head=node;
         node.next=null;
         return;
     }
   阻塞線程
}

 

參考

1、Java並發框架——AQS阻塞隊列管理(二)

2、Java並發框架——AQS阻塞隊列管理(三)


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2026 CODEPRJ.COM