關於AQS的源碼解析,本來是沒有打算特意寫一篇文章來介紹的。不過在寫本學期課程作業中,有一門寫了關於AQS的,而且也畫了一些相關的圖,所以直接拿過來分享一下,如有錯誤歡迎指正。
然后基本簡介也都不介紹了,網上一大堆,這里就直接進行源碼的分析了。
AQS基本屬性
AQS屬性簡介:
屬性 | 類型 | 詳解 |
---|---|---|
Head | Node類型 | 持有鎖的線程結點,也是隊列中的頭結點 |
Tail | Node類型 | 阻塞隊列中的尾結點,同時每一個新的結點進來,都插入到阻塞隊列的最后。 |
State | int類型 | 大於等於0。代表當前鎖的狀態。0代表沒有線程占用當前鎖,大於0代表有線程持有鎖。 |
exclusiveOwnerThread(繼承自AOS) | Thread類型 | 代表獨占鎖的線程。 |
AQS的具體結構如下圖所示:
在AQS鏈表中,將每一個線程包裝成Node實例,並通過鏈表的形式鏈接保存,在鏈式結構中,節點通過next和prev分別與前驅節點和后置節點相連接。其中head節點表示為當前持有鎖的線程,不在阻塞隊列中。tail節點為鏈表中最后一個節點,當有新的節點被添加到鏈表中后,AQS會將tail引用指向最后一個被添加進鏈表的節點。
AQS中Node內部類
Node屬性簡介:
字段 | 簡介 | 字段 | 簡介 |
---|---|---|---|
SHARE | 標識節點當前在共享模式下 | EXCLUSIVE | 標識節點當前在獨占模式下 |
CANCELLED | 標識當前節點所表示的線程已取消搶鎖 | SIGNAL | 標識當前節點需要在釋放鎖后喚醒后繼節點 |
CONDITION | 與ConditionObject內部類有關 | waitStatue | 取值為以上幾種狀態 |
prev | 代表當前節點的前驅節點 | next | 代表當前節點的后繼節點 |
thread | 代表當前節點所表示的線程 |
1 加鎖
這里以一個鎖的具體使用方法對AQS類進行詳細的分析:
首先,線程先對鎖對象進行獲取操作,如果當前需要獲取的鎖對象並沒有其他線程所持有,成功獲取到了鎖,將執行相關的業務代碼,執行完畢后,對鎖資源進行釋放,以便其他線程所使用。如果當前線程獲取鎖資源失敗,說明鎖資源有其他線程在使用,當前線程將進行阻塞狀態,等待再次獲取鎖資源。
1.1 AQS中如何獲取鎖
以java.util.concurrent.locks.ReentrantLock.java
文件中的公平鎖為例:
abstract static class Sync extends AbstractQueuedSynchronizer
#java.util.concurrent.locks.ReentrantLock中第220行
static final class FairSync extends Sync {
private static final long serialVersionUID = -3000897897090466540L;
final void lock() {
acquire(1); #調用了AQS中的方法
}
...
}
================AQS====================
#java.util.concurrent.locks.AbstractQueuedSynchronizer中第1197行
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
在lock()
方法中,線程首先嘗試搶鎖tryAcquire(1)
,如果搶鎖成功則直接返回true
,代表當前線程已持有鎖資源,否則返回false
,進行下一次搶鎖動作。
當線程搶鎖失敗后,AQS將將當前線程封裝成Node
節點,並添加到阻塞隊列。之后將從阻塞隊列中依次取出等待鎖的Node
節點,並再次嘗試獲取鎖.如果再次獲取鎖失敗,則使當前線程自己中斷自己。
1.2 嘗試獲取鎖資源
首先獲取鎖的狀態,判斷當前是否有線程持有鎖,這里分為兩種情況:
-
如果當前並沒有線程持有鎖資源,則判斷阻塞隊列中是否有節點排在當前節點的前面等待獲取鎖資源。這里分為兩種情況:
- 如果有其他線程在等待獲取鎖資源,則進行等待
- 如果沒有其他線程在等待獲取鎖資源,表明當前線程是第一個等待獲取鎖的線程,隨后嘗試對鎖資源進行獲取,如果成功獲取到鎖資源則將當前線程設置為獨占鎖的線程,同時返回true.
-
如果當前有線程持有鎖,則進行判斷是否是當前線程所持有鎖資源,這是分為兩種情況:
- 鎖資源被當前線程所持有,則表明是重入鎖,隨后將獲取鎖的次數加一,返回true.
- 持有鎖資源並不是當前線程,返回false.
流程圖如下:
源碼:
#java.util.concurrent.locks.ReentrantLock中第231行
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
1.3 判斷阻塞隊列中是否有其他節點
在線程獲取鎖之前,首先判斷阻塞隊列中是否有其他節點,如果有其他節點則放棄搶鎖。
首先獲取AQS鏈表中的頭節點與尾節點,分別進行判斷:
- 頭節點是否等於尾節點
- 如果頭節點等於尾節點說明阻塞隊列為空,沒有其他節點返回false
- 如果頭節點不等於尾節點,則判斷頭節點的后置節點是否為空
- 如果頭節點的后置節點不為空,則說明阻塞隊列不為空,則判斷阻塞隊列中第一個節點線程是否為當前線程
- 如果是當前線程說明阻塞隊列中沒有其他節點返回false。
- 如果不是當前線程說明阻塞隊列中有其他節點,返回true.
- 如果頭節點的后置節點不為空,則說明阻塞隊列不為空,則判斷阻塞隊列中第一個節點線程是否為當前線程
流程圖如下:
源碼:
#java.util.concurrent.locks.AbstractQueuedSynchronizer中第1512行
public final boolean hasQueuedPredecessors() {
Node t = tail;
Node h = head;
Node s;
return h != t &&
((s = h.next) == null || s.thread != Thread.currentThread());
}
1.4 將當前線程添加到阻塞隊列
如果當前線程搶鎖失敗則通過AQS將當前線程包裝成Node節點添加進阻塞隊列。
-
將當前線程以獨占鎖的模式包裝成Node節點。
-
將當前節點添加進阻塞隊列。分兩種情況:
- 阻塞隊列中尾節點不為空。
- 將尾節點置為當前節點的前驅節點,通過CAS操作將當前節點置為尾節點。
- 如果成功,則將之前尾結點的后置引用指向當前節點,將當前節點返回。
- 如果存在另一節點提前完成上一步操作,則進行入隊操作。
- 將尾節點置為當前節點的前驅節點,通過CAS操作將當前節點置為尾節點。
- 阻塞隊列中尾節點為空,則進行入隊操作。
- 阻塞隊列中尾節點不為空。
-
入隊操作結束將當前節點返回。
流程圖如下:
源碼:
#java.util.concurrent.locks.AbstractQueuedSynchronizer中第605行
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
enq(node);
return node;
}
1.5 入隊操作
這一步將當前節點添加到阻塞隊列中。
首先獲取阻塞隊列中的尾節點,判斷是否為空,有兩種情況:
- 阻塞隊列中尾節點為空,則初始化阻塞隊列,將頭節點設置為尾節點,
再次獲取尾節點,判斷是否為空。 - 阻塞隊列中尾節點不為空,則將尾節點設置為當前節點的前驅節點。
通過CAS將當前節點設置為尾節點。這里有兩種情況:- 如果成功,則將之前尾結點的后置引用指向當前節點,將當前節點的前驅節點返回。
- 存在另一節點提前完成上一步操作,則再次獲取阻塞隊列中的尾節點,判斷是否為空。
流程圖如下:
源碼:
#java.util.concurrent.locks.AbstractQueuedSynchronizer中第583行
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) {
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
1.6 搶鎖或將線程掛起
到達這一步說明節點已進入阻塞隊列,節點嘗試獲取鎖或者進行掛起操作。
- 獲取當前節點的前驅節點
- 判斷前驅節點是否為頭節點,這里有兩種情況:
- 前驅節點為頭節點,說明當前節點前面沒有節點在等待獲取鎖資源,只需要等待前驅節點釋放鎖資源。所以可以嘗試搶鎖,這里有兩種情況:
- 搶鎖成功,則將當前節點設置為頭節點,將當前節點前驅節點的后置引用設置為空,返回false
- 搶鎖失敗,說明頭節點還沒有釋放鎖資源,此時將當前節點掛起。這里有兩種情況:
- 如果掛起成功,則線程等待被喚醒,喚醒之后再次判斷前驅節點是否為頭節點。
- 如果掛起失敗,再次判斷前驅節點是否為頭節點。
- 前驅節點不是頭節點,說明當前節點前面有其他節點在等待獲取鎖資源,此時將當前節點掛起。
- 前驅節點為頭節點,說明當前節點前面沒有節點在等待獲取鎖資源,只需要等待前驅節點釋放鎖資源。所以可以嘗試搶鎖,這里有兩種情況:
- 如果在掛起階段發生異常,則取消搶鎖。
- 這里為無限循環,直到線程獲取到鎖資源或者取消搶鎖才會退出循環。
流程圖如下:
源碼:
#java.util.concurrent.locks.AbstractQueuedSynchronizer中第857行
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
1.7 判斷是否應該掛起當前線程
當線程暫時獲取不到鎖資源時,判斷是否應該掛起當前線程。
首先獲取當前節點的前驅節點的狀態,這里有三種情況:
* 前驅節點的狀態為SIGNAL。其中,SIGNAL表明該節點在釋放鎖資源后應該將后置節點喚醒。返回true。
* 前驅節點的狀態為CANCELLED。CANCELLED表明該節點已取消搶鎖,此時將從當前節點開始向前尋找,直到找到一個節點的狀態不為CANCELLED,然后將他設置為當前節點的前驅節點。之后返回false.
* 如果前驅節點的狀態不是以上兩種情況,則通過CAS將前驅節點的狀態設置為SIGNAL,之后返回false。
流程圖如下:
源碼:
#java.util.concurrent.locks.AbstractQueuedSynchronizer中第795行
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)
return true;
if (ws > 0) {
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
1.8 掛起當前線程
將當前線程掛起,當線程被喚醒后將線程的中斷狀態返回.
源碼:
#java.util.concurrent.locks.AbstractQueuedSynchronizer中第835行
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
return Thread.interrupted();
}
2 解鎖
2.1 解鎖操作
嘗試釋放鎖資源,這里有兩種情況:
- 成功釋放鎖資源,獲取到AQS鏈表中頭節點,判斷頭節點是否為空,這里有兩種情況:
- 如果頭節點為空,說明沒有節點持有鎖資源,返回true.
- 如果頭節點不為空,判斷頭節點狀態是否為0:
- 如果頭節點狀態為0,說明阻塞隊列中沒有線程在等待獲取鎖,返回true.
- 如果頭節點狀態不為0,則將阻塞隊列中第一個等待獲取鎖資源的線程喚醒。隨后返回ture.
流程圖如下:
源碼:
#java.util.concurrent.locks.ReentrantLock中第456行
public void unlock() {
sync.release(1);
}
==============================
#java.util.concurrent.locks.AbstractQueuedSynchronizer中第1260行
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
2.2 喚醒后置節點
當持有鎖的節點執行相關代碼完成后,需要釋放鎖資源並喚醒后置節點。
- 首先獲取頭節點的狀態,如果小於0則通過CAS將狀態設置為0.
- 獲取頭節點的后置節點,這里有兩種情況:
- 如果頭節點的后置節點為空或者頭節點的后置節點的狀態大於0,則將頭節點的后置節點置為空,同時從AQS鏈表的尾節點向前搜索,直到找到最后一個節點狀態小於等於0的節點,將該節點喚醒。
- 如果頭節點的后置節點不為空,則直接將該節點喚醒。
流程圖如下:
源碼:
#java.util.concurrent.locks.AbstractQueuedSynchronizer中第638行
private void unparkSuccessor(Node node) {
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
LockSupport.unpark(s.thread);
}
2.3 取消搶鎖
當線程由於異常或某些特殊情況的發生,需要取消對鎖資源的獲取,將執行取消搶鎖操作。
- 如果需要取消搶鎖的節點為空,則直接返回。
- 否則將節點所包裝的線程置為空。
- 獲取節點的前驅節點,判斷前驅節點的狀態是否大於0,如果大於0則一直向前找,直到找到一個節點的狀態小於等於0,將該節點設置為當前節點的前驅節點。
- 獲取當前節點的后置節點。
- 將當前節點的狀態設置為CANCELLED。
- 判斷當前節點是否為尾節點,這里有兩種情況:
- 如果當前節點是尾節點,則將當前節點的前驅節點設置為尾節點,
同時將后置引用設置為空。 - 如果當前節點不是尾節點,判斷當前節點的前驅節點是否為頭節
點。這里有兩種情況:- 如果當前節點的前驅節點是頭節點,則將當前節點喚醒。
- 如果當前節點的前驅節點不是頭節點,則判斷該節點狀態是否為SIGNAL,如果為SIGNAL,則將該節點的后置引用指向當前節點的后置節點。
- 斷開當前節點與鏈表的連接。
- 如果當前節點是尾節點,則將當前節點的前驅節點設置為尾節點,
流程圖如下:
源碼:
#java.util.concurrent.locks.AbstractQueuedSynchronizer中第742行
private void cancelAcquire(Node node) {
if (node == null)
return;
node.thread = null;
Node pred = node.prev;
while (pred.waitStatus > 0)
node.prev = pred = pred.prev;
Node predNext = pred.next;
node.waitStatus = Node.CANCELLED;
if (node == tail && compareAndSetTail(node, pred)) {
compareAndSetNext(pred, predNext, null);
} else {
int ws;
if (pred != head &&
((ws = pred.waitStatus) == Node.SIGNAL ||
(ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
pred.thread != null) {
Node next = node.next;
if (next != null && next.waitStatus <= 0)
compareAndSetNext(pred, predNext, next);
} else {
unparkSuccessor(node);
}
node.next = node; // help GC
}
}
其實到這里還有一些內容並沒有分析完,以后再補上好了。