AbstractQueuedSynchronizer(以下簡稱AQS)的內容確實有點多,博主考慮再三,還是決定把它拆成三期。原因有三,一是放入同一篇博客勢必影響閱讀體驗,而是為了表達對這個偉大基礎並發組件的崇敬之情。第三點其實是為了偷懶。
又扯這么多沒用的,還是直接步入正題吧~
AQS介紹
AQS是一個抽象類,它是實現多種並發同步工具的核心組件。比如大名鼎鼎的可重入鎖(ReentrantLock),它的底層實現就是借助內部類Sync,而Sync類就是繼承了AQS並實現了AQS定義的若干鈎子方法。這些並發同步工具包括:
ReentrantLock(ReentrantLock可重入鎖—源碼詳解)ReentrantReadWriteLock(全網最詳細的ReentrantReadWriteLock源碼剖析(萬字長文))Semaphore(Semaphore信號量源碼解析)CountDownLatch(CountDownLatch源碼閱讀)
從設計模式上來看,AQS主要使用的是模板方法模式(Template Method Pattern)。它提供了若干鈎子方法供子類實現(如tryAcquire、tryRelease等),AQS的模板方法(如acquire、release等)會調用這些鈎子方法。子類使用AQS的方式就是直接調用AQS的模板方法,並重寫這些模板方法涉及到的特定鈎子方法即可。不需要調用的鈎子方法可以不用重寫,AQS為它們均提供了默認實現:拋出UnsupportedOperationException異常
此外,AQS也提供了其他一些方法供子類調用,如getState、hasQueuedPredecessors等方法,方便子類獲取、判斷同步器的狀態
什么是鈎子方法?
鈎子方法的概念源於模板方法模式,這種模式是在一個方法中定義了算法的骨架,某些關鍵步驟會交給子類去實現。模板方法在不改變算法本身結構的情況下,允許子類自定義其中一些關鍵步驟
這些關鍵步驟可以由父類定義成方法,這些方法可以是抽象方法,或鈎子方法
- 抽象方法:父類定義但不實現,由
abstract關鍵字標識- 鈎子方法:父類定義且實現,但這種實現一般都是空實現,並沒有任何意義,這么做只是為了方便子類根據需要重寫特定的鈎子方法,而不用實現所有的鈎子方法
AQS的核心思想:
- 使用一個
volatile int變量state(也被稱為資源),進行同步控制,但是state在不同的同步工具實現中具有不同的語義。另外配合Unsafe類提供的CAS操作,原子性地修改state值,保證其線程安全性 - AQS內部維護了一個同步隊列,用來管理排隊的線程。另外需要借助
LockSupport類提供的線程阻塞、喚醒方法
版權:本文版權歸作者和博客園共有
轉載:歡迎轉載,但未經作者同意,必須保留此段聲明;必須在文章中給出原文連接;否則 必究法律責任
AQS的基本結構
狀態state
AQS使用volatile int變量state來作為核心狀態,所有的同步控制都是圍繞這個state來進行的,volatile保證其內存可見性,並使用CAS確保state的修改是原子的。volatile和CAS同時存在,就保證了state的線程安全性
對於不同的同步工具實現來說,語義是不同的,如下:
ReentratntLock:表示當前線程獲取鎖的重入次數,0表示鎖空閑ReentrantReadWriteLock:state的高16位表示讀鎖數量,低16位表示寫鎖數量CountDownLatch:表示當前的計數值Semaphore:表示當前可用信號量的個數
針對state這個核心狀態,AQS提供了getState、setState等多個獲取、修改方法,源碼如下:
private volatile int state;
protected final int getState() {
return state;
}
protected final void setState(int newState) {
state = newState;
}
protected final boolean compareAndSetState(int expect, int update) {
// See below for intrinsics setup to support this
return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}
版權:本文版權歸作者和博客園共有
轉載:歡迎轉載,但未經作者同意,必須保留此段聲明;必須在文章中給出原文連接;否則 必究法律責任
同步隊列
Node類
AQS內部維護了一個同步隊列(網上有些文章會叫它為CLH隊列,至於為啥叫這個我也不知道-_-||,但不重要~)。隊列中的每個節點都是Node類型其源碼如下:
static final class Node {
static final Node SHARED = new Node();
static final Node EXCLUSIVE = null;
static final int CANCELLED = 1;
static final int SIGNAL = -1;
static final int CONDITION = -2;
static final int PROPAGATE = -3;
volatile int waitStatus;
volatile Node prev;
volatile Node next;
volatile Thread thread;
Node nextWaiter;
final boolean isShared() {
return nextWaiter == SHARED;
}
final Node predecessor() throws NullPointerException {
Node p = prev;
if (p == null)
throw new NullPointerException();
else
return p;
}
Node() { // Used to establish initial head or SHARED marker
}
Node(Thread thread, Node mode) { // Used by addWaiter
this.nextWaiter = mode;
this.thread = thread;
}
Node(Thread thread, int waitStatus) { // Used by Condition
this.waitStatus = waitStatus;
this.thread = thread;
}
}
prev、next用於保存該節點的前驅、后繼節點,表明這個同步隊列是一個雙向隊列
Node的thread域保存了對應的線程,只有在創建時賦值,使用完要null掉,以方便GC
Node使用SHARED、EXCLUSIVE兩個常量來標記該線程是由於獲取共享資源、互斥資源失敗,而被阻塞並放入到同步隊列中進行等待
Node使用waitStatus來記錄當前線程的等待狀態,通過CAS進行修改。它的取值可以是:
CANCELLED:表示該節點由於超時或中斷而被取消。該狀態不會再轉變為其他狀態,而且該節點的線程再也不會被阻塞SIGNAL:表示其后繼節點(后面相鄰的那個節點)需要被喚醒,即該線程被釋放或被取消時,必須喚醒其后繼節點CONDITION:表示該節點的線程在條件隊列中等待,而非在同步隊列中。如果該條件變量signal該節點后,該節點會被轉移到同步隊列中參與資源競爭PROPAGATE:只有在共享模式下才會被用到,表示無條件傳播狀態。引入這個狀態是為了解決共享模式下並發釋放而引起的線程掛起的bug,這里不多解釋,網上有文章給出了更加詳細的解釋,見下方
AQS:為什么需要PROPAGATE?
AQS源碼深入分析之共享模式-你知道為什么AQS中要有PROPAGATE這個狀態嗎?
同步隊列的結構
AQS中維護了一個同步隊列,它通過兩個指針標記隊頭、隊尾,分別是head和tail,源碼如下:
private transient volatile Node head;
private transient volatile Node tail;
該隊列的出入規則遵循FIFO(First In, First Out)
注意:如果該同步隊列非空,那么head其實並不是指向第一個線程對應的Node,而是指向一個空的Node
接下來讓我們剖析一下AQS針對這個同步隊列設計的入隊、出隊算法
入隊算法
入隊事件主要在線程嘗試獲取資源失敗時觸發。當線程嘗試獲取資源失敗之后,會將該線程加入到同步隊列的隊尾
入隊算法的源碼見AQS的addWaiter方法,如下:
// mode可以是Node.EXCLUSIVE或Node.SHARED
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
enq(node);
return node;
}
首先為該線程創建一個Node節點,mode可以是Node.EXCLUSIVE或Node.SHARED,表示兩種不同的模式。
之后直接CAS試圖將其入隊。這里注意,如果隊列本身為空,或CAS競爭失敗,才會進入enq方法。這里addWaiter方法出於性能考慮,先嘗試快捷的入隊方式,不成功才執行enq方法
enq方法是完整的入隊邏輯,源碼如下:
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) { // 如果隊列為空,則將head和tail初始化為同一個空Node
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
if (compareAndSetTail(t, node)) { // 不斷CAS直到成功為止
t.next = node;
return t;
}
}
}
}
enq中的代碼都包含在for循環中,如果CAS失敗,就會不斷循環CAS直到成功為止
注意,這段代碼也體現出同步隊列的三個特點:
- 入隊都是從隊尾
- 進入隊列的操作都是CAS操作,保證了線程安全性
- 如果隊列為空,則
head和tail都為null;如果不為空,head指向的節點並不是第一個線程對應的節點,而是一個啞節點
出隊算法
出隊事件主要發生在:位於同步隊列中的線程再次獲取資源,並成功獲得時
出隊算法在AQS中並沒有直接對應的方法,而是零散分布在某些方法中。因為獲取資源失敗而被阻塞的線程被喚醒后,會重新嘗試獲取資源。如果獲取成功,則會執行出隊邏輯
例如,在acquireQueued方法中,就包含了出隊事件:
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
private void setHead(Node node) {
head = node;
node.thread = null;
node.prev = null;
}
出隊的邏輯體現在第6-9行,此時p指向head指向的空節點,而node是隊首元素(不是第一個空節點)
首先調用setHead方法,將head指向node、將node的thread域、prev域置空,然后將head的next域置空,以方便該節點的GC
節點的取消
線程會因為超時或中斷而被取消,之后不會再參與鎖的競爭,會等待GC
取消的過程見cancelAcquire方法,該方法的調用時機都是在獲取資源失敗之后,而失敗就是由於超時或中斷。其源碼如下:
private void cancelAcquire(Node node) {
if (node == null)
return;
node.thread = null; // 將thread域置空以方便GC
// 向前遍歷並跳過被取消的Node
Node pred = node.prev;
while (pred.waitStatus > 0)
node.prev = pred = pred.prev;
Node predNext = pred.next;
node.waitStatus = Node.CANCELLED;
// 如果是tail,那么將tail修改為pred
if (node == tail && compareAndSetTail(node, pred)) {
compareAndSetNext(pred, predNext, null);
} else {
int ws;
if (pred != head &&
((ws = pred.waitStatus) == Node.SIGNAL || (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
pred.thread != null) {
// 如果node的next需要signal,那么就將pred的next設為node的next
Node next = node.next;
if (next != null && next.waitStatus <= 0)
compareAndSetNext(pred, predNext, next);
} else {
unparkSuccessor(node);
}
node.next = node; // help GC
}
}
總之,cancelAcquire方法就是將目標節點node的thread域置空,並將waitStatus置為CANCELLED
這里有一個問題:
node的后繼節點next的prev指針仍然指向node,沒有更新為pred,這不僅語義上是錯誤的,而且會阻礙node被GC。那么何時進行更新?
答:任何其他線程嘗試獲取鎖失敗之后,都會被放入同步隊列,然后調用shouldParkAfterFailedAcquire方法判斷是否應該被阻塞。如果發現當前節點的前驅節點被置為CANCELLED,就會執行:
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
此外,cancelAcquire方法也會做類似的操作,如下:
Node pred = node.prev;
while (pred.waitStatus > 0)
node.prev = pred = pred.prev;
這兩處都會更新被取消節點的后繼節點的
prev指針,所以前面說到的的問題根本不存在
注意:cancelAcquire的調用時機一般都是在獲取鎖邏輯后面的finally塊中,如果獲取失敗就會調用cancelAcquire方法。獲取失敗的原因主要有兩個,中斷或超時
總結:
- 節點被取消的原因:獲取鎖超時或在獲取的過程中被中斷
- 取消節點的主要邏輯:將其
waitStatus修改為CANCELLED。再將節點thread域置空,將指向它的next指針指向其后繼節點,以方便GC
好了,到這里為止,我們就完成了對AQS基本結構的分析。這里如果有不懂的地方,可以暫時跳過,等看完后續博客再回頭看這篇,應該就能明白了
下一篇我們會逐步剖析AQS如何實現對資源的獲取和釋放,go go go!
全網最詳細的AbstractQueuedSynchronizer(AQS)源碼剖析(一)AQS基礎
全網最詳細的AbstractQueuedSynchronizer(AQS)源碼剖析(二)資源的獲取和釋放
全網最詳細的AbstractQueuedSynchronizer(AQS)源碼剖析(三)條件變量
