在實習的時候,需要對公司內部的分布式框架(RPC框架)進行拓展。在閱讀該RPC框架源碼的時候,發現該框架中較多地方使用了自增原子類,而原子類又是基於AQS實現,在秋招之前閱讀過AQS框架,但是都是粗粗的閱讀了一些博客,並沒有對源碼進行閱讀。如今,趁着過年有時間對AQS源碼進行梳理。
1. 原理簡介
2. 部分Node類分析
根據原理可知道,AQS是一個線程同步工具,其主要作用是內部維持了一個雙向隊列,以及一個狀態,如果沒有獲取到狀態,那么該線程則會被加入等待隊列。而這個隊列中的節點(Node)則是AQS內部實現的類,其主要的屬性如下:
static final AbstractQueuedSynchronizer.Node SHARED = new AbstractQueuedSynchronizer.Node();
static final AbstractQueuedSynchronizer.Node EXCLUSIVE = null;
static final int CANCELLED = 1;
static final int SIGNAL = -1;
static final int CONDITION = -2;
static final int PROPAGATE = -3;
volatile int waitStatus; // 等待狀態(是否超時等)
volatile AbstractQueuedSynchronizer.Node prev; // 指向前一個節點
volatile AbstractQueuedSynchronizer.Node next; // 指向后一個節點
volatile Thread thread; // 獲取線程信息
AbstractQueuedSynchronizer.Node nextWaiter;
private static final VarHandle NEXT; // VarHandle為本地同步類
private static final VarHandle PREV;
private static final VarHandle THREAD;
private static final VarHandle WAITSTATUS;
注:在java9 之后均使用VarHandle來實現CAS操作,代替了原先的Unsafe類,其好處是屏蔽了內存模型,使得可以適應不同的系統。
框架意義
只需要根據AQS提供的工具實現排斥鎖和共享鎖即可,而無須關注狀態是否獲取成功,是否需要排隊,喚醒等操作。
框架原理
1. 屬性
首先,AQS框架是為了設計鎖而存在的,而鎖的就是對狀態的獲取。在AQS中通過private volatile int state來表示鎖的狀態,在獨占鎖時只有0、1兩個狀態,在共享鎖時大於等於0表示鎖的狀態。其次,我們說過AQS解決了解決了線程等待、排隊以及喚醒的問題,這一過程是通過雙向鏈表來完成的。所以在AQS中有head以及tail兩個節點。AQS的基礎原理是基於CAS來完成狀態獲取操作(無論是獲取排他鎖還是共享鎖),CAS操作又是基於VarHandle來完成,所以在類屬性中存在
private static final VarHandle STATE; // 操作狀態
private static final VarHandle HEAD; // 操作頭節點
private static final VarHandle TAIL; // 操作尾節點
(對於這三個狀態的操作采用了三個工具,本身也是為了互不干擾,如果是采用一個VarHandle變量,會影響框架的效率。比方說:在A線程需要通過VarHandle設置頭節點,此時B線程需要通過VarHandle設置尾節點,需要等待A線程操作結束之后才行。當然以上是我的猜測。)
具體的屬性列表如下:
private static final long serialVersionUID = 7373984972572414691L;
private transient volatile AbstractQueuedSynchronizer.Node head;
private transient volatile AbstractQueuedSynchronizer.Node tail;
private volatile int state;
static final long SPIN_FOR_TIMEOUT_THRESHOLD = 1000L;
private static final VarHandle STATE;
private static final VarHandle HEAD;
private static final VarHandle TAIL;
2. 方法
protected final boolean compareAndSetState(int expect, int update) {
return STATE.compareAndSet(this, expect, update);
}
上面一段代碼沒啥可以說的,就是利用STATE完成CAS操作。
private AbstractQueuedSynchronizer.Node enq(AbstractQueuedSynchronizer.Node node) {
while(true) {
AbstractQueuedSynchronizer.Node oldTail = this.tail; // 獲取尾節點
if (oldTail != null) {
node.setPrevRelaxed(oldTail); // 將node的前一個節點設置為oldTail節點
if (this.compareAndSetTail(oldTail, node)) {
oldTail.next = node;
return oldTail;
}
} else {
this.initializeSyncQueue();
}
}
}
// 下面就是setPrevRelaxed方法的簽名
final void setPrevRelaxed(AbstractQueuedSynchronizer.Node p) {
PREV.set(this, p);
}
enq方法的主要作用就是通過CAS+自旋完成尾節點的插入。
private AbstractQueuedSynchronizer.Node addWaiter(AbstractQueuedSynchronizer.Node mode) {
AbstractQueuedSynchronizer.Node node = new AbstractQueuedSynchronizer.Node(mode);
AbstractQueuedSynchronizer.Node oldTail;
do {
while(true) {
oldTail = this.tail;
if (oldTail != null) {
node.setPrevRelaxed(oldTail); // node節點的前向指針指向當前尾節點
break;
}
this.initializeSyncQueue(); // 更新
}
} while(!this.compareAndSetTail(oldTail, node));
oldTail.next = node; // 尾節點的后向指針指向node節點
return node;
}
addWaiter方法的主要作用就是CAS + 自旋完成節點的添加,與enq方法的不同時,這里的插入是形成了雙向鏈表。
private void unparkSuccessor(AbstractQueuedSynchronizer.Node node) {
int ws = node.waitStatus;
if (ws < 0) {
node.compareAndSetWaitStatus(ws, 0);
}
AbstractQueuedSynchronizer.Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for(AbstractQueuedSynchronizer.Node p = this.tail; p != node && p != null; p = p.prev) {
if (p.waitStatus <= 0) {
s = p;
}
}
}
if (s != null) {
LockSupport.unpark(s.thread);
}
}
unparkSuccessor方法的主要任務是:喚醒其他等待鎖的節點。
其主要流程是:
1. 如果當前節點的狀態為取消狀態,則對其進行初始化。
2. 拿到node后面的一個節點。
3. 如果后邊的節點為空或者被取消(waitStatus > 0 則表明節點已經被取消)
4. 開始從尾部開始進行迭代。原因是:節點被阻塞的時候,是在 acquireQueued 方法里面被阻塞的,喚醒時也一定會在 acquireQueued 方法里面被喚醒,喚醒之后的條件是,判斷當前節點的前置節點是否是頭節點,這里是判斷當前節點的前置節點,所以這里必須使用從尾到頭的迭代順序才行,目的就是為了過濾掉無效的前置節點,不然節點被喚醒時,發現其前置節點還是無效節點,就又會陷入阻塞。
條件隊列
主要是因為並不是所有場景一個同步隊列就可以搞定的,在遇到鎖 + 隊列結合的場景時,就需要 Lock + Condition 配合才行,先使用 Lock 來決定哪些線程可以獲得鎖,哪些線程需要到同步隊列里面排隊阻塞;獲得鎖的多個線程在碰到隊列滿或者空的時候,可以使用 Condition 來管理這些線程,讓這些線程阻塞等待,然后在合適的時機后,被正常喚醒。
說白了就是同步隊列的預備隊列。在同步隊列滿了之后,阻塞的線程無法進入同步隊列,這時候會進入條件隊列(代表獲得了搶占鎖的機會)。同步隊列是負責互斥,也就是如果沒有獲取鎖就等着,等待獲取鎖的那一刻。而條件隊列是告訴你什么時候可以去等待獲取鎖。類似於銀行排隊,一個窗口只能負責一個人,但是一個窗口有很多人在排隊,如果排隊的人過多,讓你先不要排隊,先去等候區坐着,等到了滿足條件的時候,大堂經理會告訴你要准備了(可以進入排隊)。
