AQS框架的理解


在實習的時候,需要對公司內部的分布式框架(RPC框架)進行拓展。在閱讀該RPC框架源碼的時候,發現該框架中較多地方使用了自增原子類,而原子類又是基於AQS實現,在秋招之前閱讀過AQS框架,但是都是粗粗的閱讀了一些博客,並沒有對源碼進行閱讀。如今,趁着過年有時間對AQS源碼進行梳理。

1. 原理簡介

 

2. 部分Node類分析

根據原理可知道,AQS是一個線程同步工具,其主要作用是內部維持了一個雙向隊列,以及一個狀態,如果沒有獲取到狀態,那么該線程則會被加入等待隊列。而這個隊列中的節點(Node)則是AQS內部實現的類,其主要的屬性如下:

        static final AbstractQueuedSynchronizer.Node SHARED = new AbstractQueuedSynchronizer.Node();
        static final AbstractQueuedSynchronizer.Node EXCLUSIVE = null;
        static final int CANCELLED = 1; 
        static final int SIGNAL = -1;
        static final int CONDITION = -2;
        static final int PROPAGATE = -3;
        volatile int waitStatus; // 等待狀態(是否超時等)
        volatile AbstractQueuedSynchronizer.Node prev; // 指向前一個節點
        volatile AbstractQueuedSynchronizer.Node next; // 指向后一個節點
        volatile Thread thread; // 獲取線程信息
        AbstractQueuedSynchronizer.Node nextWaiter; 
        private static final VarHandle NEXT; // VarHandle為本地同步類
        private static final VarHandle PREV;
        private static final VarHandle THREAD;
        private static final VarHandle WAITSTATUS;

  注:在java9 之后均使用VarHandle來實現CAS操作,代替了原先的Unsafe類,其好處是屏蔽了內存模型,使得可以適應不同的系統。

 

框架意義

只需要根據AQS提供的工具實現排斥鎖和共享鎖即可,而無須關注狀態是否獲取成功,是否需要排隊,喚醒等操作。

框架原理

1. 屬性

首先,AQS框架是為了設計鎖而存在的,而鎖的就是對狀態的獲取。在AQS中通過private volatile int state來表示鎖的狀態,在獨占鎖時只有0、1兩個狀態,在共享鎖時大於等於0表示鎖的狀態。其次,我們說過AQS解決了解決了線程等待、排隊以及喚醒的問題,這一過程是通過雙向鏈表來完成的。所以在AQS中有head以及tail兩個節點。AQS的基礎原理是基於CAS來完成狀態獲取操作(無論是獲取排他鎖還是共享鎖),CAS操作又是基於VarHandle來完成,所以在類屬性中存在

    private static final VarHandle STATE; // 操作狀態
    private static final VarHandle HEAD;  // 操作頭節點
    private static final VarHandle TAIL;  // 操作尾節點

 (對於這三個狀態的操作采用了三個工具,本身也是為了互不干擾,如果是采用一個VarHandle變量,會影響框架的效率。比方說:在A線程需要通過VarHandle設置頭節點,此時B線程需要通過VarHandle設置尾節點,需要等待A線程操作結束之后才行。當然以上是我的猜測。)

具體的屬性列表如下:

    private static final long serialVersionUID = 7373984972572414691L;
    private transient volatile AbstractQueuedSynchronizer.Node head;
    private transient volatile AbstractQueuedSynchronizer.Node tail;
    private volatile int state;
    static final long SPIN_FOR_TIMEOUT_THRESHOLD = 1000L;
    private static final VarHandle STATE;
    private static final VarHandle HEAD;
    private static final VarHandle TAIL;

2. 方法

    protected final boolean compareAndSetState(int expect, int update) {
        return STATE.compareAndSet(this, expect, update);
    }

上面一段代碼沒啥可以說的,就是利用STATE完成CAS操作。

private AbstractQueuedSynchronizer.Node enq(AbstractQueuedSynchronizer.Node node) {
        while(true) {
            AbstractQueuedSynchronizer.Node oldTail = this.tail; // 獲取尾節點
            if (oldTail != null) {
                node.setPrevRelaxed(oldTail); // 將node的前一個節點設置為oldTail節點
                if (this.compareAndSetTail(oldTail, node)) {
                    oldTail.next = node;
                    return oldTail;
                }
            } else {
                this.initializeSyncQueue();
            }
        }
    }
// 下面就是setPrevRelaxed方法的簽名
final void setPrevRelaxed(AbstractQueuedSynchronizer.Node p) {
PREV.set(this, p);
}

enq方法的主要作用就是通過CAS+自旋完成尾節點的插入。

    private AbstractQueuedSynchronizer.Node addWaiter(AbstractQueuedSynchronizer.Node mode) {
        AbstractQueuedSynchronizer.Node node = new AbstractQueuedSynchronizer.Node(mode);

        AbstractQueuedSynchronizer.Node oldTail;
        do {
            while(true) {
                oldTail = this.tail;
                if (oldTail != null) {
                    node.setPrevRelaxed(oldTail); // node節點的前向指針指向當前尾節點
                    break;
                }

                this.initializeSyncQueue();   // 更新
            }
        } while(!this.compareAndSetTail(oldTail, node));

        oldTail.next = node;  // 尾節點的后向指針指向node節點
        return node;
    }

  addWaiter方法的主要作用就是CAS + 自旋完成節點的添加,與enq方法的不同時,這里的插入是形成了雙向鏈表。

    private void unparkSuccessor(AbstractQueuedSynchronizer.Node node) {
        int ws = node.waitStatus;
        if (ws < 0) {
            node.compareAndSetWaitStatus(ws, 0);
        }

        AbstractQueuedSynchronizer.Node s = node.next;
        if (s == null || s.waitStatus > 0) {
            s = null;

            for(AbstractQueuedSynchronizer.Node p = this.tail; p != node && p != null; p = p.prev) {
                if (p.waitStatus <= 0) {
                    s = p;
                }
            }
        }

        if (s != null) {
            LockSupport.unpark(s.thread);
        }

    }

unparkSuccessor方法的主要任務是:喚醒其他等待鎖的節點。

其主要流程是:

1. 如果當前節點的狀態為取消狀態,則對其進行初始化。

2. 拿到node后面的一個節點。

3. 如果后邊的節點為空或者被取消(waitStatus > 0 則表明節點已經被取消)

4.  開始從尾部開始進行迭代。原因是:節點被阻塞的時候,是在 acquireQueued 方法里面被阻塞的,喚醒時也一定會在 acquireQueued 方法里面被喚醒,喚醒之后的條件是,判斷當前節點的前置節點是否是頭節點,這里是判斷當前節點的前置節點,所以這里必須使用從尾到頭的迭代順序才行,目的就是為了過濾掉無效的前置節點,不然節點被喚醒時,發現其前置節點還是無效節點,就又會陷入阻塞。

 

條件隊列

主要是因為並不是所有場景一個同步隊列就可以搞定的,在遇到鎖 + 隊列結合的場景時,就需要 Lock + Condition 配合才行,先使用 Lock 來決定哪些線程可以獲得鎖,哪些線程需要到同步隊列里面排隊阻塞;獲得鎖的多個線程在碰到隊列滿或者空的時候,可以使用 Condition 來管理這些線程,讓這些線程阻塞等待,然后在合適的時機后,被正常喚醒。

說白了就是同步隊列的預備隊列。在同步隊列滿了之后,阻塞的線程無法進入同步隊列,這時候會進入條件隊列(代表獲得了搶占鎖的機會)。同步隊列是負責互斥,也就是如果沒有獲取鎖就等着,等待獲取鎖的那一刻。而條件隊列是告訴你什么時候可以去等待獲取鎖。類似於銀行排隊,一個窗口只能負責一個人,但是一個窗口有很多人在排隊,如果排隊的人過多,讓你先不要排隊,先去等候區坐着,等到了滿足條件的時候,大堂經理會告訴你要准備了(可以進入排隊)。

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM