AQS詳解
AQS:提供原子式管理同步狀態,阻塞和喚醒線程功能以及隊列模型。
ReentrantLock
特性
- 為可重入鎖,一個線程能夠對一個臨界資源重復加鎖。
- 通過AQS實現鎖機制。
- 支持響應中斷,超時和嘗試獲取鎖。
- 必須使用
unlock()
釋放鎖。 - 有公平鎖和非公平鎖。
- 可以關聯多個條件隊列。
加鎖
非公平鎖:
- 若通過AQS設置變量
state
(同步狀態)成功,即獲取鎖成功,則將當前下線程設置為獨占線程。 - 若獲取失敗,則進入
acquire()
方法進行后續處理。
公平鎖:
- 進入
acquire()
方法進行后續處理。
AQS
核心思想:
- 若請求的共享資源空閑,則將當前請求的線程設置為有效的工作線程,並將共享資源設置為鎖定狀態。
- 若共享資源被占用,則需要阻塞等待喚醒機制保證鎖的分配。
實現:
- 通過CLH隊列的變體:FIFO雙向隊列實現的。
- 每個請求資源的線程被包裝成一個節點來實現鎖的分配。
- 通過
volatile
的int
類型的成員變量state
表示同步狀態。 - 通過FIFO隊列完成資源獲取的排隊工作。
- 通過CAS完成對
state
的修改。
節點Node
方法和屬性
方法或屬性 | 含義 |
---|---|
waitStatus |
當前節點在隊列中的狀態 |
thread |
節點對應的線程 |
prev |
前驅指針 |
next |
后繼指針 |
predecessor |
返回前驅節點 |
nextWaiter |
指向下一個CONDITION狀態節點 |
waitStatus
狀態:
- 0:Node初始化后的默認值。
- CANCELLED:為1,線程獲取鎖的請求已被取消。
- CONDITION:為-2,節點在等待隊列中,等待喚醒。
- PROPAGATE:為-3,線程處於SHARED狀態下使用。
- SIGNAL:為-1,線程已准備,等待資源釋放。
線程的鎖模式:
- SHARED:共享模式等待鎖。
- EXCLUSIVE:獨占模式等待鎖。
state與鎖模式
獨占模式:
- 初始化
state=0
。 - 試圖獲取同步狀態:若
state
為0,則設置為1,獲取鎖,進行后續操作。 - 若
state
非0,則當前線程阻塞。
共享模式:
- 初始化
state=n
(表示最多n個線程並發) - 試圖獲取同步狀態:若
state
大於0,則使用CAS對state
進行自減操作,進行后續操作。 - 若不大於0,則當前線程阻塞。
重寫AQS實現的方法
方法 | 描述 |
---|---|
boolean isHeldExclusively() |
該線程是否正在獨占資源 |
boolean tryAcquire(int arg) |
獨占試圖獲取鎖,arg為獲取鎖的次數,獲取成功則返回true |
boolean tryRelease(int arg) |
獨占方式試圖釋放鎖,arg為釋放的鎖的次數,成功則返回true |
int tryAcquireShared(int arg) |
共享方式獲取鎖,負數則失敗,0表示成功,但沒有剩余可用資源,正數表示成功,有剩余資源 |
boolean tryReleaseShared(int arg) |
共享方式釋放鎖,允許喚醒后續等待節點並返回true |
注:
一般為獨占或共享方式,也可同時實現獨占和共享(ReentrantReadWriteLock)
ReentrantLock非公平鎖lock()
方法執行流程:
加鎖流程:
- 通過
ReentrantLock
的加鎖方法lock()
進行加鎖。 - 調用內部類
Sync
的lock()
方法,由於是抽象方法,則由ReentrantLock
初始化選擇公平鎖和非公平鎖,執行相關內部類的lock()
方法,從而執行AQS的acquire()
方法。 - AQS的
acquire()
方法會執行tryAcquire()
方法,由於tryAcquire()
需要自定義,則會執行ReentrantLock
中的tryAcquire()
方法,根據是公平鎖還是非公平鎖,執行不同的tryAcquire()
。 tryAcquire()
為獲取鎖,若獲取失敗,則執行AQS后續策略。
解鎖流程:
- 通過
ReentrantLock
的解鎖方法unlock()
進行解鎖。 unlock()
調用內部類Sync
的release()
方法。release()
會調用tryRelease()
方法,其由ReentrantLock
中的Sync
實現。- 釋放成功,其余由AQS進行處理。
從ReentrantLock到AQS
ReentrantLock中的lock()
// ReentrantLock
final void lock() {
if (compareAndSetState(0, 1)) // CAS設置state
setExclusiveOwnerThread(Thread.currentThread()); // 設置成功則當前線程獨占資源
else
acquire(1); // 設置失敗則后續處理
}
在ReentrantLock
中,lock()
的實現邏輯為:
- 試圖CAS設置狀態為1。
- 若設置成功,則設置當前線程獨占資源。
- 若設置失敗,則通過
acquire()
方法進一步處理。
acquire()
方法實現:
- 試圖獲取訪問
tryAcquire()
,若成功,則獲取鎖成功。 - 若失敗,則加入等待隊列。
// AbstractQueuedSynchronizer
public final void acquire(int arg) {
// 先嘗試獲取,獲取成功則獨占資源,否則進入等待隊列
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
線程加入等待隊列
通過addWaiter(Node.EXCLUSIVE)
將當前線程加入等待隊列。
加入流程:
- 由當前線程構造一個節點。
- 若等待隊列不為空時,則設置當前節點為隊列尾節點。
- 若隊列為空或者失敗時,則重復嘗試將該節點加入到隊列成為尾節點。
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode); // 當前線程構造一個節點
if (pred != null) { // 設置當前節點尾隊列的尾節點
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
enq(node); // 節點加入隊列失敗,則循環嘗試加入隊列,直到成功
return node;
}
// 隊列為空或加入隊列失敗,則循環嘗試加入隊列,直到成功
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) { // 若隊列為空,則當前節點設為頭節點
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t; // 若隊列非空,則當前節點加入隊列為尾節點
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
注:
- 若隊列為空,則構造一個空節點作為頭節點,然后將當前線程構造的節點加入作為尾節點。
- 判斷等待隊列是否有有效節點:
- 若頭節點等於尾節點,則返回false(沒有有效節點,當前節點可以爭奪共享資源).
- 若不等於,則判斷頭節點的下一個節點是否不為null並且是否等於當前節點,若兩個條件均滿足,則返回false.
- 否則返回true(隊列中有有效節點,當前線程進入隊列等待)
線程出隊列
- 獲取當前的前一個節點.
- 若前一個節點為頭節點head並且當前節點獲取鎖成功,則將當前節點設為head結點,當前線程執行后續操作.
- 若前一個節點非頭節點head或者當前結點獲取鎖失敗,則阻塞當前線程,等待被喚醒.
- 被喚醒后,循環重復上面步驟,直到成功獲取鎖.
- 若線程被中斷,則跳出循環,檢查是否獲取成功.成功則執行后續代碼,否則取消當前線程的獲取請求.
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor(); // 獲取前一個節點
if (p == head && tryAcquire(arg)) { // 若前一個節點為head,則試圖獲取,若獲取成功,則當前線程設為head
setHead(node);
p.next = null;
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt()) // 若前一個節點非head或獲取失敗,則阻塞當前線程,直到其被喚醒
interrupted = true;
}
} finally {
if (failed) // 若獲取失敗,取消當前線程的請求
cancelAcquire(node);
}
}
獲取失敗后進行阻塞檢查:
- 若前一個節點處於喚醒狀態,則當前線程被阻塞.
- 若前一個節點不處於阻塞狀態,則向前查找節點.
- 若找到一個處於喚醒狀態的節點,則當前線程阻塞.
- 若沒有喚醒狀態的結點,則設置當前結點喚醒,當前線程將循環試圖獲取鎖.
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
if (ws == Node.SIGNAL) // 當前線程的前一個線程處於喚醒狀態,當前線程阻塞
return true;
if (ws > 0) { // 前一個線程的請求被取消,則刪除向前查找,將所有取消的線程從隊列刪除
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else { // 當前線程設置為喚醒狀態
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
// 阻塞當前線程,返回當前線程的中斷狀態
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
return Thread.interrupted();
}
線程移出隊列的流程:
取消線程請求
流程:
- 獲取當前節點的前驅節點.
- 若前驅節點的狀態為CANCELLED,則一直向前遍歷,找到第一個非CANCELED節點,設置當前節點為CANCELLED.
- 若當前節點為尾節點:則設置前驅結點指向的下一個節點指針為null.
- 若當前節點為head節點的后繼結點,則設置當前節點的下一個節點指針為null.
- 若當前節點非尾節點並且非head節點的下一個節點,則設置當前結點的下一個結點指向當前結點的下一個節點(從而從前向后遍歷時跳過被CANCELLED的結點)
為什么只對next指針操作,而不對prev指針操作?
修改prev指針,可能導致prev指向一個已經被移出隊列的節點,存在安全問題.
在shouldParkAfterFailedAcquire()
方法內,會處理prev指針,使得CANCELLED的節點從隊列中刪除.
ReentrantLock的unlock()
解鎖流程:
protected final boolean tryRelease(int releases) {
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread()) // 若解鎖的線程非占有資源的線程,則拋出異常
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) { // 若持有的線程全部釋放,則占有資源的線程設為null,更新狀態state
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}
// 當前線程釋放鎖
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0) // 若隊列非空或者當前線程不處於喚醒狀態,喚醒當前線程后面的一個線程
unparkSuccessor(h);
return true;
}
return false;
}
private void unparkSuccessor(Node node) {
// node為當前線程的后一個等待線程
int ws = node.waitStatus;
if (ws < 0) // 若后一個等待線程未被取消,則設置其狀態為可獲取鎖
compareAndSetWaitStatus(node, ws, 0);
Node s = node.next;
if (s == null || s.waitStatus > 0) { // 若當前線程下一個線程為null或者被取消
s = null;
// 從后向前查找第一個沒有被取消的等待線程,將其喚醒
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
LockSupport.unpark(s.thread);
}
為什么從后往前查找可喚醒的線程?
新節點入隊列時,是先將其prev指針指向隊尾節點,在將尾節點的next指向新節點,若從前向后查找可喚醒的線程,在這兩個步驟之間發生的查找線程操作會忽略新節點.
同時,在產生CANCELLED節點時,也是先斷開next指針,而prev指針.只有從后向前遍歷才能遍歷完全部的節點.
獲取鎖之后還要進行中斷響應:
- 線程在等待資源后被喚醒,喚醒后不斷嘗試獲得鎖,直到搶到鎖為止.整個流程不會響應中斷,直到搶到鎖后檢查是否被中斷,若被中斷,則補充一次中斷.
小結
- 線程獲取鎖失敗后怎么樣?
在等待隊列中等待,並繼續嘗試獲得鎖.- 排隊隊列的數據類型?
CLH變體的FIFO雙向隊列.- 排隊的線程什么時候有機會獲得鎖?
當前面的線程釋放鎖時,會喚醒后面等待的線程.- 若等待的線程一直無法獲得鎖,需要一直等待嗎?
線程對應的節點被設為CANCELLED狀態,並被清除出隊列.- lock()方法通過acquire()方法進行加鎖,如何進行加鎖?
acquire()調用tryAcquire()進行加鎖,具體由自定義同步器實現.
AQS應用
核心:
- state初始化為0,表示沒有任何線程持有鎖.
- 當有線程持有鎖時,state在原來值上加1,同一個線程多個獲得鎖,則多次加1.
- 如線程釋放鎖,則state減1,直到為0,表示線程釋放鎖.
同步工具 | 特點 |
---|---|
ReentantLock | 使用state保存鎖重復持有的次數,多次獲得鎖時其值遞增 |
Semaphore | 使用AQS同步狀態保存信號量的當前計數,tryRelease 增加計數,acquireShared 減少計數 |
CountDownLatch | 通過AQS同步狀態計數,計數為0,所有的acquire操作才可以通過 |
ReentrantReadWriteLock | AQS同步狀態的16位保存寫鎖持有次數,剩下16位用於保存讀鎖持有次數 |
ThreadPoolExecutor | 利用AQS同步狀態實現獨占線程變量設置 |
參考: