本博客系列是學習並發編程過程中的記錄總結。由於文章比較多,寫的時間也比較散,所以我整理了個目錄貼(傳送門),方便查閱。
本文參考了[Java多線程進階(六)—— J.U.C之locks框架:AQS綜述(1)]和Java技術之AQS詳解兩篇文章。
AQS 簡介
AbstractQueuedSynchronizer
(簡稱AQS)類是整個 JUC包的核心類。JUC 中的ReentrantLock
、ReentrantReadWriteLock
、 CountDownLatch
、Semaphore
和LimitLatch
等同步工具都是基於AQS實現的。
AQS 分離出了構建同步器時的通用關注點,這些關注點主要包括如下:
- 資源是可以被同時訪問?還是在同一時間只能被一個線程訪問?(共享/獨占功能)
- 訪問資源的線程如何進行並發管理?(等待隊列)
- 如果線程等不及資源了,如何從等待隊列退出?(超時/中斷)
這些關注點都是圍繞着資源——同步狀態(synchronization state)來展開的,AQS將這些通用的關注點封裝成了一個個模板方法,讓子類可以直接使用。
AQS 留給用戶的只有兩個問題:
- 什么是資源
- 什么情況下資源是可以被訪問的
這樣一來,定義同步器的難度就大大降低了。用戶只要解決好上面兩個問題,就能構建出一個性能優秀的同步器。
下面是幾個常見的同步器對資源的定義:
同步器 | 資源的定義 |
---|---|
ReentrantLock | 資源表示獨占鎖。State為0表示鎖可用;為1表示被占用;為N表示重入的次數 |
ReentrantReadWriteLock | 資源表示共享的讀鎖和獨占的寫鎖。state邏輯上被分成兩個16位的unsigned short,分別記錄讀鎖被多少線程使用和寫鎖被重入的次數。 |
CountDownLatch | 資源表示倒數計數器。State為0表示計數器歸零,所有線程都可以訪問資源;為N表示計數器未歸零,所有線程都需要阻塞。 |
Semaphore | 資源表示信號量或者令牌。State≤0表示沒有令牌可用,所有線程都需要阻塞;大於0表示由令牌可用,線程每獲取一個令牌,State減1,線程沒釋放一個令牌,State加1。 |
AQS 原理
上面一節中介紹到 AQS 抽象出了三個關注點,下面就具體看下 AQS 是如果解決這三個問題的。
同步狀態的管理
同步狀態,其實就是資源。AQS使用單個int(32位)來保存同步狀態,並暴露出getState、setState以及compareAndSetState操作來讀取和更新這個狀態。
private volatile int state;
protected final int getState() {
return state;
}
protected final void setState(int newState) {
state = newState;
}
protected final boolean compareAndSetState(int expect, int update) {
// See below for intrinsics setup to support this
return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}
線程的阻塞和喚醒
在JDK1.5之前,除了內置的監視器機制外,沒有其它方法可以安全且便捷得阻塞和喚醒當前線程。
JDK1.5以后,java.util.concurrent.locks包提供了LockSupport類來作為線程阻塞和喚醒的工具。
等待隊列
等待隊列,是AQS框架的核心,整個框架的關鍵其實就是如何在並發狀態下管理被阻塞的線程。
等待隊列是嚴格的FIFO隊列,是Craig,Landin和Hagersten鎖(CLH鎖)的一種變種,采用雙向循環鏈表實現,因此也叫CLH隊列。
1. 節點定義
CLH隊列中的結點是對線程的包裝,結點一共有兩種類型:獨占(EXCLUSIVE)和共享(SHARED)。
每種類型的結點都有一些狀態,其中獨占結點使用其中的CANCELLED(1)、SIGNAL(-1)、CONDITION(-2),共享結點使用其中的CANCELLED(1)、SIGNAL(-1)、PROPAGATE(-3)。
結點狀態 | 值 | 描述 |
---|---|---|
CANCELLED | 1 | 取消。表示后驅結點被中斷或超時,需要移出隊列 |
SIGNAL | -1 | 發信號。表示后驅結點被阻塞了(當前結點在入隊后、阻塞前,應確保將其prev結點類型改為SIGNAL,以便prev結點取消或釋放時將當前結點喚醒。) |
CONDITION | -2 | Condition專用。表示當前結點在Condition隊列中,因為等待某個條件而被阻塞了 |
PROPAGATE | -3 | 傳播。適用於共享模式(比如連續的讀操作結點可以依次進入臨界區,設為PROPAGATE有助於實現這種迭代操作。) |
INITIAL | 0 | 默認。新結點會處於這種狀態 |
AQS使用CLH隊列實現線程的結構管理,而CLH結構正是用前一結點某一屬性表示當前結點的狀態,之所以這種做是因為在雙向鏈表的結構下,這樣更容易實現取消和超時功能。
next指針:用於維護隊列順序,當臨界區的資源被釋放時,頭結點通過next指針找到隊首結點。
prev指針:用於在結點(線程)被取消時,讓當前結點的前驅直接指向當前結點的后驅完成出隊動作。
static final class Node {
// 共享模式結點
static final Node SHARED = new Node();
// 獨占模式結點
static final Node EXCLUSIVE = null;
static final int CANCELLED = 1;
static final int SIGNAL = -1;
static final int CONDITION = -2;
static final int PROPAGATE = -3;
/**
* INITAL: 0 - 默認,新結點會處於這種狀態。
* CANCELLED: 1 - 取消,表示后續結點被中斷或超時,需要移出隊列;
* SIGNAL: -1- 發信號,表示后續結點被阻塞了;(當前結點在入隊后、阻塞前,應確保將其prev結點類型改為SIGNAL,以便prev結點取消或釋放時將當前結點喚醒。)
* CONDITION: -2- Condition專用,表示當前結點在Condition隊列中,因為等待某個條件而被阻塞了;
* PROPAGATE: -3- 傳播,適用於共享模式。(比如連續的讀操作結點可以依次進入臨界區,設為PROPAGATE有助於實現這種迭代操作。)
*
* waitStatus表示的是后續結點狀態,這是因為AQS中使用CLH隊列實現線程的結構管理,而CLH結構正是用前一結點某一屬性表示當前結點的狀態,這樣更容易實現取消和超時功能。
*/
volatile int waitStatus;
// 前驅指針
volatile Node prev;
// 后驅指針
volatile Node next;
// 結點所包裝的線程
volatile Thread thread;
// Condition隊列使用,存儲condition隊列中的后繼節點
Node nextWaiter;
Node() {
}
Node(Thread thread, Node mode) {
this.nextWaiter = mode;
this.thread = thread;
}
}
2. 隊列定義
對於CLH隊列,當線程請求資源時,如果請求不到,會將線程包裝成結點,將其掛載在隊列尾部。
下面結合代碼一起看下節點進入隊列的過程。
private Node enq(final Node node) {
for (;;) {
Node t = tail; // 1
if (t == null) { // Must initialize
if (compareAndSetHead(new Node())) // 2
tail = head;
} else {
node.prev = t; // 3
if (compareAndSetTail(t, node)) { // 4
t.next = node;
return t;
}
}
}
}
如上代碼在第一次循環中,當要在AQS隊列尾部插入元素時,AQS隊列狀態如下圖中(default)所示。也就是隊列頭、尾節點都指向null;當執行代碼(1)后節點t指向了尾部節點,這時候隊列狀態如圖中(I)所示。
這時候t為null,故執行代碼(2),使用CAS算法設置一個哨兵節點為頭節點,如果CAS設置成功,則讓尾部節點也指向哨兵節點,這時候隊列狀態如圖中(II)所示。
到現在為止只插入了一個哨兵節點,還需要插入node節點,所以在第二次循環后執行到代碼(1),這時候隊列狀態如圖中(III)所示;然后執行代碼(3)設置node的前驅節點為尾部節點,這時候隊列狀態如圖中(IV)所示;
然后通過CAS算法設置node節點為尾部節點,CAS成功后隊列狀態如圖中(V)所示;
CAS成功后再設置原來的尾部節點的后驅節點為node,這時候就完成了雙向鏈表的插入,此時隊列狀態如圖中(VI)所示。
AQS 的方法介紹
用戶需要自己重寫的方法
上面介紹到 AQS 已經幫用戶解決了同步器定義過程中的大部分問題,只將下面兩個問題丟給用戶解決:
- 什么是資源
- 什么情況下資源是可以被訪問的
具體的,AQS 是通過暴露以下 API 來讓用戶解決上面的問題的。
鈎子方法 | 描述 |
---|---|
tryAcquire | 獨占方式。嘗試獲取資源,成功則返回true,失敗則返回false。 |
tryRelease | 獨占方式。嘗試釋放資源,成功則返回true,失敗則返回false。 |
tryAcquireShared | 共享方式。嘗試獲取資源。負數表示失敗;0表示成功,但沒有剩余可用資源;正數表示成功,且有剩余資源。 |
tryReleaseShared | 共享方式。嘗試釋放資源,如果釋放后允許喚醒后續等待結點返回true,否則返回false。 |
isHeldExclusively | 該線程是否正在獨占資源。只有用到condition才需要去實現它。 |
如果你需要實現一個自己的同步器,一般情況下只要繼承 AQS ,並重寫 AQS 中的這個幾個方法就行了。至於具體線程等待隊列的維護(如獲取資源失敗入隊/喚醒出隊等),AQS已經在頂層實現好了。要不怎么說Doug Lea貼心呢。
需要注意的是:如果你沒在子類中重寫這幾個方法就直接調用了,會直接拋出異常。所以,在你調用這些方法之前必須重寫他們。不使用的話可以不重寫。
AQS 提供的一系列模板方法
查看 AQS 的源碼我們就可以發現這個類提供了很多方法,看起來讓人“眼花繚亂”的。但是最主要的兩類方法就是獲取資源的方法和釋放資源的方法。因此我們抓住主要矛盾就行了:
- public final void acquire(int arg) // 獨占模式的獲取資源
- public final boolean release(int arg) // 獨占模式的釋放資源
- public final void acquireShared(int arg) // 共享模式的獲取資源
- public final boolean releaseShared(int arg) // 共享模式的釋放資源
acquire(int)方法
該方法以獨占方式獲取資源,如果獲取到資源,線程繼續往下執行,否則進入等待隊列,直到獲取到資源為止,且整個過程忽略中斷的影響。該方法是獨占模式下線程獲取共享資源的頂層入口。
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
下面分析下這個acquire
方法的具體執行流程:
step1:首先這個方法調用了用戶自己實現的方法
tryAcquire
方法嘗試獲取資源,如果這個方法返回true,也就是表示獲取資源成功,那么整個acquire
方法就執行結束了,線程繼續往下執行;step2:如果
tryAcquir
方法返回false,也就表示嘗試獲取資源失敗。這時acquire
方法會先調用addWaiter
方法將當前線程封裝成Node類並加入一個FIFO的雙向隊列的尾部。step3:再看
acquireQueued
這個關鍵方法。首先要注意的是這個方法中哪個無條件的for循環,這個for循環說明acquireQueued
方法一直在自旋嘗試獲取資源。進入for循環后,首先判斷了當前節點的前繼節點是不是頭節點,如果是的話就再次嘗試獲取資源,獲取資源成功的話就直接返回false(表示未被中斷過)假如還是沒有獲取資源成功,判斷是否需要讓當前節點進入
waiting
狀態,經過shouldParkAfterFailedAcquire
這個方法判斷,如果需要讓線程進入waiting
狀態的話,就調用LockSupport的park方法讓線程進入waiting
狀態。進入waiting
狀態后,這線程等待被interupt
或者unpark
(在release操作中會進行這樣的操作,可以參見后面的代碼)。這個線程被喚醒后繼續執行for循環來嘗試獲取資源。
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
//首先判斷了當前節點的前繼節點是不是頭節點,如果是的話就再次嘗試獲取資源,
//獲取資源成功的話就直接返回false(表示未被中斷過)
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
//判斷是否需要讓當前節點進入waiting狀態
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
// 如果在整個等待過程中被中斷過,則返回true,否則返回false。
// 如果線程在等待過程中被中斷過,它是不響應的。只是獲取資源后才再進行自我中斷selfInterrupt(),將中斷補上。
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
以上就是acquire
方法的簡單分析。
單獨看這個方法的話可能會不太清晰,結合ReentrantLock
、ReentrantReadWriteLock
、 CountDownLatch
、Semaphore
和LimitLatch
等同步工具看這個代碼的話就會好理解很多。
release(int)方法
release(int)
方法是獨占模式下線程釋放共享資源的頂層入口。它會釋放指定量的資源,如果徹底釋放了(即state=0),它會喚醒等待隊列里的其他線程來獲取資源。
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
//上面已經講過了,需要用戶自定義實現
protected boolean tryRelease(int arg) {
throw new UnsupportedOperationException();
}
private void unparkSuccessor(Node node) {
/*
* If status is negative (i.e., possibly needing signal) try
* to clear in anticipation of signalling. It is OK if this
* fails or if status is changed by waiting thread.
*/
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
/*
* Thread to unpark is held in successor, which is normally
* just the next node. But if cancelled or apparently null,
* traverse backwards from tail to find the actual
* non-cancelled successor.
*/
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
LockSupport.unpark(s.thread);
}
與acquire()方法中的tryAcquire()類似,tryRelease()方法也是需要獨占模式的自定義同步器去實現的。正常來說,tryRelease()都會成功的,因為這是獨占模式,該線程來釋放資源,那么它肯定已經拿到獨占資源了,直接減掉相應量的資源即可(state-=arg),也不需要考慮線程安全的問題。
但要注意它的返回值,上面已經提到了,release()是根據tryRelease()的返回值來判斷該線程是否已經完成釋放掉資源了!所以自義定同步器在實現時,如果已經徹底釋放資源(state=0),要返回true,否則返回false。
unparkSuccessor(Node)
方法用於喚醒等待隊列中下一個線程。這里要注意的是,下一個線程並不一定是當前節點的next節點,而是下一個可以用來喚醒的線程,如果這個節點存在,調用unpark()
方法喚醒。
總之,release()是獨占模式下線程釋放共享資源的頂層入口。它會釋放指定量的資源,如果徹底釋放了(即state=0),它會喚醒等待隊列里的其他線程來獲取資源。(需要注意的是隊列中被喚醒的線程不一定能立馬獲取資源,因為資源在釋放后可能立馬被其他線程(不是在隊列中等待的線程)搶掉了)
acquireShared(int)方法
acquireShared(int)
方法是共享模式下線程獲取共享資源的頂層入口。它會獲取指定量的資源,獲取成功則直接返回,獲取失敗則進入等待隊列,直到獲取到資源為止,整個過程忽略中斷。
public final void acquireShared(int arg) {
//tryAcquireShared需要用戶自定義實現
if (tryAcquireShared(arg) < 0)
doAcquireShared(arg);
}
可以發現,這個方法的關鍵實現其實是獲取資源失敗后,怎么管理線程。也就是doAcquireShared
的邏輯。
//不響應中斷
private void doAcquireShared(int arg) {
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head) {
int r = tryAcquireShared(arg);
if (r >= 0) {
setHeadAndPropagate(node, r);
p.next = null; // help GC
if (interrupted)
selfInterrupt();
failed = false;
return;
}
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
可以看出,doAcquireShared
的邏輯和acquireQueued
的邏輯差不多。將當前線程加入等待隊列尾部休息,直到其他線程釋放資源喚醒自己,自己成功拿到相應量的資源后才返回。
簡單總結下acquireShared
的流程:
step1:tryAcquireShared()嘗試獲取資源,成功則直接返回;
step2:失敗則通過doAcquireShared()進入等待隊列park(),直到被unpark()/interrupt()並成功獲取到資源才返回。整個等待過程也是忽略中斷的。
releaseShared(int)方法
releaseShared(int)
方法是共享模式下線程釋放共享資源的頂層入口。它會釋放指定量的資源,如果成功釋放且允許喚醒等待線程,它會喚醒等待隊列里的其他線程來獲取資源。
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
釋放掉資源后,喚醒后繼。跟獨占模式下的release()相似,但有一點稍微需要注意:獨占模式下的tryRelease()在完全釋放掉資源(state=0)后,才會返回true去喚醒其他線程,這主要是基於獨占下可重入的考量;而共享模式下的releaseShared()則沒有這種要求,共享模式實質就是控制一定量的線程並發執行,那么擁有資源的線程在釋放掉部分資源時就可以喚醒后繼等待結點。