在實習的時候,需要對公司內部的分布式框架(RPC框架)進行拓展。在閱讀該RPC框架源碼的時候,發現該框架中較多地方使用了自增原子類,而原子類又是基於AQS實現,在秋招之前閱讀過AQS框架,但是都是粗粗的閱讀了一些博客,並沒有對源碼進行閱讀。如今,趁着過年有時間對AQS源碼進行梳理。
1. 原理簡介
2. 部分Node類分析
根據原理可知道,AQS是一個線程同步工具,其主要作用是內部維持了一個雙向隊列,以及一個狀態,如果沒有獲取到狀態,那么該線程則會被加入等待隊列。而這個隊列中的節點(Node)則是AQS內部實現的類,其主要的屬性如下:
static final AbstractQueuedSynchronizer.Node SHARED = new AbstractQueuedSynchronizer.Node(); static final AbstractQueuedSynchronizer.Node EXCLUSIVE = null; static final int CANCELLED = 1; static final int SIGNAL = -1; static final int CONDITION = -2; static final int PROPAGATE = -3; volatile int waitStatus; // 等待狀態(是否超時等) volatile AbstractQueuedSynchronizer.Node prev; // 指向前一個節點 volatile AbstractQueuedSynchronizer.Node next; // 指向后一個節點 volatile Thread thread; // 獲取線程信息 AbstractQueuedSynchronizer.Node nextWaiter; private static final VarHandle NEXT; // VarHandle為本地同步類 private static final VarHandle PREV; private static final VarHandle THREAD; private static final VarHandle WAITSTATUS;
注:在java9 之后均使用VarHandle來實現CAS操作,代替了原先的Unsafe類,其好處是屏蔽了內存模型,使得可以適應不同的系統。
框架意義
只需要根據AQS提供的工具實現排斥鎖和共享鎖即可,而無須關注狀態是否獲取成功,是否需要排隊,喚醒等操作。
框架原理
1. 屬性
首先,AQS框架是為了設計鎖而存在的,而鎖的就是對狀態的獲取。在AQS中通過private volatile int state來表示鎖的狀態,在獨占鎖時只有0、1兩個狀態,在共享鎖時大於等於0表示鎖的狀態。其次,我們說過AQS解決了解決了線程等待、排隊以及喚醒的問題,這一過程是通過雙向鏈表來完成的。所以在AQS中有head以及tail兩個節點。AQS的基礎原理是基於CAS來完成狀態獲取操作(無論是獲取排他鎖還是共享鎖),CAS操作又是基於VarHandle來完成,所以在類屬性中存在
private static final VarHandle STATE; // 操作狀態 private static final VarHandle HEAD; // 操作頭節點 private static final VarHandle TAIL; // 操作尾節點
(對於這三個狀態的操作采用了三個工具,本身也是為了互不干擾,如果是采用一個VarHandle變量,會影響框架的效率。比方說:在A線程需要通過VarHandle設置頭節點,此時B線程需要通過VarHandle設置尾節點,需要等待A線程操作結束之后才行。當然以上是我的猜測。)
具體的屬性列表如下:
private static final long serialVersionUID = 7373984972572414691L; private transient volatile AbstractQueuedSynchronizer.Node head; private transient volatile AbstractQueuedSynchronizer.Node tail; private volatile int state; static final long SPIN_FOR_TIMEOUT_THRESHOLD = 1000L; private static final VarHandle STATE; private static final VarHandle HEAD; private static final VarHandle TAIL;
2. 方法
protected final boolean compareAndSetState(int expect, int update) { return STATE.compareAndSet(this, expect, update); }
上面一段代碼沒啥可以說的,就是利用STATE完成CAS操作。
private AbstractQueuedSynchronizer.Node enq(AbstractQueuedSynchronizer.Node node) { while(true) { AbstractQueuedSynchronizer.Node oldTail = this.tail; // 獲取尾節點 if (oldTail != null) { node.setPrevRelaxed(oldTail); // 將node的前一個節點設置為oldTail節點 if (this.compareAndSetTail(oldTail, node)) { oldTail.next = node; return oldTail; } } else { this.initializeSyncQueue(); } } }
// 下面就是setPrevRelaxed方法的簽名
final void setPrevRelaxed(AbstractQueuedSynchronizer.Node p) {
PREV.set(this, p);
}
enq方法的主要作用就是通過CAS+自旋完成尾節點的插入。
private AbstractQueuedSynchronizer.Node addWaiter(AbstractQueuedSynchronizer.Node mode) { AbstractQueuedSynchronizer.Node node = new AbstractQueuedSynchronizer.Node(mode); AbstractQueuedSynchronizer.Node oldTail; do { while(true) { oldTail = this.tail; if (oldTail != null) { node.setPrevRelaxed(oldTail); // node節點的前向指針指向當前尾節點 break; } this.initializeSyncQueue(); // 更新 } } while(!this.compareAndSetTail(oldTail, node)); oldTail.next = node; // 尾節點的后向指針指向node節點 return node; }
addWaiter方法的主要作用就是CAS + 自旋完成節點的添加,與enq方法的不同時,這里的插入是形成了雙向鏈表。
private void unparkSuccessor(AbstractQueuedSynchronizer.Node node) { int ws = node.waitStatus; if (ws < 0) { node.compareAndSetWaitStatus(ws, 0); } AbstractQueuedSynchronizer.Node s = node.next; if (s == null || s.waitStatus > 0) { s = null; for(AbstractQueuedSynchronizer.Node p = this.tail; p != node && p != null; p = p.prev) { if (p.waitStatus <= 0) { s = p; } } } if (s != null) { LockSupport.unpark(s.thread); } }
unparkSuccessor方法的主要任務是:喚醒其他等待鎖的節點。
其主要流程是:
1. 如果當前節點的狀態為取消狀態,則對其進行初始化。
2. 拿到node后面的一個節點。
3. 如果后邊的節點為空或者被取消(waitStatus > 0 則表明節點已經被取消)
4. 開始從尾部開始進行迭代。原因是:節點被阻塞的時候,是在 acquireQueued 方法里面被阻塞的,喚醒時也一定會在 acquireQueued 方法里面被喚醒,喚醒之后的條件是,判斷當前節點的前置節點是否是頭節點,這里是判斷當前節點的前置節點,所以這里必須使用從尾到頭的迭代順序才行,目的就是為了過濾掉無效的前置節點,不然節點被喚醒時,發現其前置節點還是無效節點,就又會陷入阻塞。
條件隊列
主要是因為並不是所有場景一個同步隊列就可以搞定的,在遇到鎖 + 隊列結合的場景時,就需要 Lock + Condition 配合才行,先使用 Lock 來決定哪些線程可以獲得鎖,哪些線程需要到同步隊列里面排隊阻塞;獲得鎖的多個線程在碰到隊列滿或者空的時候,可以使用 Condition 來管理這些線程,讓這些線程阻塞等待,然后在合適的時機后,被正常喚醒。
說白了就是同步隊列的預備隊列。在同步隊列滿了之后,阻塞的線程無法進入同步隊列,這時候會進入條件隊列(代表獲得了搶占鎖的機會)。同步隊列是負責互斥,也就是如果沒有獲取鎖就等着,等待獲取鎖的那一刻。而條件隊列是告訴你什么時候可以去等待獲取鎖。類似於銀行排隊,一個窗口只能負責一個人,但是一個窗口有很多人在排隊,如果排隊的人過多,讓你先不要排隊,先去等候區坐着,等到了滿足條件的時候,大堂經理會告訴你要准備了(可以進入排隊)。