大家好,我是小黑,一個在互聯網苟且偷生的農民工。
在Java並發編程中,經常會用到鎖,除了Synchronized這個JDK關鍵字以外,還有Lock接口下面的各種鎖實現,如重入鎖ReentrantLock,還有讀寫鎖ReadWriteLock等,他們在實現鎖的過程中都是依賴與AQS來完成核心的加解鎖邏輯的。那么AQS具體是什么呢?
提供一個框架,用於實現依賴先進先出(FIFO)等待隊列的阻塞鎖和相關同步器(信號量,事件等)。 該類被設計為大多數類型的同步器的有用依據,這些同步器依賴於單個原子int值來表示狀態。 子類必須定義改變此狀態的受保護方法,以及根據該對象被獲取或釋放來定義該狀態的含義。 給定這些,這個類中的其他方法執行所有排隊和阻塞機制。 子類可以保持其他狀態字段,但只以原子方式更新int使用方法操縱值getState() , setState(int)和compareAndSetState(int, int)被跟蹤相對於同步。
上述內容來自JDK官方文檔。
簡單來說,AQS是一個先進先出(FIFO)的等待隊列,主要用在一些線程同步場景,需要通過一個int類型的值來表示同步狀態。提供了排隊和阻塞機制。
類圖結構
從類圖可以看出,在ReentrantLock中定義了AQS的子類Sync,可以通過Sync實現對於可重入鎖的加鎖,解鎖。
AQS通過int類型的狀態state來表示同步狀態。
AQS中主要提供的方法:
acquire(int) 獨占方式獲取鎖
acquireShared(int) 共享方式獲取鎖
release(int) 獨占方式釋放鎖
releaseShared(int) 共享方式釋放鎖
獨占鎖和共享鎖
關於獨占鎖和共享鎖先給大家普及一下這個概念。
獨占鎖指該鎖只能同時被一個線程持有;
共享鎖指該鎖可以被多個線程同時持有。
舉個生活中的例子,比如我們使用打車軟件打車,獨占鎖就好比我們打快車或者專車,一輛車只能讓一個客戶打到,不能兩個客戶同時打到一輛車;共享鎖就好比打拼車,可以有多個客戶一起打到同一輛車。
AQS內部結構
我們簡單通過一張圖先來了解下AQS的內部結構。其實就是有一個隊列,這個隊列的頭結點head代表當前正在持有鎖的線程,后續的其他節點代表當前正在等待的線程。
接下來我們通過源碼來看看AQS的加鎖和解鎖過程。先來看看獨占鎖是如何進行加解鎖的。
獨占鎖加鎖過程
ReentrantLock lock = new ReentrantLock();
lock.lock();
public void lock() {
// 調用sync的lock方法
sync.lock();
}
可以看到在ReentrantLock的lock方法中,直接調用了sync這個AQS子類的lock方法。
final void lock() {
// 獲取鎖
acquire(1);
}
public final void acquire(int arg) {
// 1.先嘗試獲取,如果獲取成功,則直接返回,代表加鎖成功
if (!tryAcquire(arg) &&
// 2.如果獲取失敗,則調用addWaiter在等待隊列中增加一個節點
// 3. 調用acquireQueued告訴前一個節點,在解鎖之后喚醒自己,然后線程進入等待狀態
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
// 如果在等待過程中被中斷,則當前線程中斷
selfInterrupt();
}
在獲取鎖時,基本可以分為3步:
- 嘗試獲取,如果成功則返回,如果失敗,執行下一步;
- 將當前線程放入等待隊列尾部;
- 標記前面等待的線程執行完之后喚醒當前線程。
/**
* 嘗試獲取鎖(公平鎖實現)
*/
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
// 獲取state,初始值為0,每次加鎖成功會+1,解鎖成功-1
int c = getState();
// 當前沒有線程占用
if (c == 0) {
// 判斷是否有其他線程排隊在本線程之前
if (!hasQueuedPredecessors() &&
// 如果沒有,通過CAS進行加鎖
compareAndSetState(0, acquires)) {
// 將當前線程設置為AQS的獨占線程
setExclusiveOwnerThread(current);
return true;
}
}
// 如果當前線程是正在獨占的線程(已持有鎖,重入)
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
// state+1
setState(nextc);
return true;
}
return false;
}
private Node addWaiter(Node mode) {
// 創建一個當前線程的Node節點
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
// 如果等待隊列的尾節點!=null
if (pred != null) {
// 將本線程對應節點的前置節點設置為原來的尾節點
node.prev = pred;
// 通過CAS將本線程節點設置為尾節點
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
//尾節點為空,或者在CAS時失敗,則通過enq方法重新加入到尾部。(本方法內部采用自旋)
enq(node);
return node;
}
private Node enq(final Node node) {
for (;;) {
Node t = tail;
// 尾節點為空,代表等待隊列還沒有被初始化過
if (t == null) {
// 創建一個空的Node對象,通過CAS賦值給Head節點,如果失敗,則重新自旋一次,如果成功,將Head節點賦值給尾節點
if (compareAndSetHead(new Node()))
tail = head;
} else {
// 尾節點不為空的情況,說明等待隊列已經被初始化過,將當前節點的前置節點指向尾節點
node.prev = t;
// 將當前節點CAS賦值給尾節點
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
final boolean acquireQueued(final Node node, int arg) {
// 標識是否加鎖失敗
boolean failed = true;
try {
// 是否被中斷
boolean interrupted = false;
for (;;) {
// 取出來當前節點的前一個節點
final Node p = node.predecessor();
// 如果前一個節點是head節點,那么自己就是老二,這個時候再嘗試獲取一次鎖
if (p == head && tryAcquire(arg)) {
// 如果獲取成功,把當前節點設置為head節點
setHead(node);
p.next = null; // help GC
failed = false; // 標識加鎖成功
return interrupted;
}
// shouldParkAfterFailedAcquire 檢查並更新前置節點p的狀態,如果node節點應該阻塞就返回true
// 如果返回false,則自旋一次。
if (shouldParkAfterFailedAcquire(p, node) &&
// 當前線程阻塞,在阻塞被喚醒時,判斷是否被中斷
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed) // 如果加鎖成功,則取消獲取鎖
cancelAcquire(node);
}
}
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
if (ws == Node.SIGNAL) // ws == -1
/*
* 這個節點已經設置了請求釋放的狀態,所以它可以在這里安全park.
*/
return true;
if (ws > 0) {
/*
* 前置節點被取消了,跳過前置節點重試
*/
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
/*
* 將前置節點的狀態設置為請求釋放
*/
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
在整個加鎖過程可以通過下圖更清晰的理解。
獨占鎖解鎖過程
public void unlock() {
sync.release(1);
}
同樣解鎖時也是直接調用AQS子類sync的release方法。
public final boolean release(int arg) {
// 嘗試解鎖
if (tryRelease(arg)) {
Node h = head;
// 解鎖成功,如果head!=null並且head.ws不等0,代表有其他線程排隊
if (h != null && h.waitStatus != 0)
// 喚醒后續等待的節點
unparkSuccessor(h);
return true;
}
return false;
}
解鎖過程如下:
- 先嘗試解鎖,解鎖失敗則直接返回false。(理論上不會解鎖失敗,因為正在執行解鎖的線程一定是持有鎖的線程)
- 解鎖成功之后,如果有head節點並且狀態不是0,代表有線程被阻塞等待,則喚醒下一個等待的線程。
protected final boolean tryRelease(int releases) {
// state - 1
int c = getState() - releases;
// 如果當前線程不是獨占AQS的線程,但是這時候又來解鎖,這種情況肯定是非法的。
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) { // 如果狀態歸零,代表鎖釋放了,將獨占線程設置為null
free = true;
setExclusiveOwnerThread(null);
}
// 將減1之后的狀態設置為state
setState(c);
return free;
}
private void unparkSuccessor(Node node) {
/*
* 如果節點的ws小於0,將ws設置為0
*/
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
/*
* 從等待隊列的尾部往前找,直到第二個節點,ws<=0的節點。
*/
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
// 如果存在符合條件的節點,unpark喚醒這個節點的線程。
if (s != null)
LockSupport.unpark(s.thread);
}
共享鎖加鎖過程
為了實現共享鎖,AQS中專門有一套和排他鎖不同的實現,我們來看一下源碼具體是怎么做的。
public void lock() {
sync.acquireShared(1);
}
public final void acquireShared(int arg) {
// tryAcquireShared 嘗試獲取共享鎖許可,如果返回負數標識獲取失敗
// 返回0表示成功,但是已經沒有多余的許可可用,后續不能再成功,返回正數表示后續請求也可以成功
if (tryAcquireShared(arg) < 0)
// 申請失敗,則加入到共享等待隊列
doAcquireShared(arg);
}
tryAcquireShared嘗試獲取共享許可,本方法需要在子類中進行實現。不同的實現類實現方式不一樣。
下面的代碼是ReentrentReadWriteLock中的實現。
protected final int tryAcquireShared(int unused) {
Thread current = Thread.currentThread();
int c = getState();
// 當前有獨占線程正在持有許可,並且獨占線程不是當前線程,則返回失敗(-1)
if (exclusiveCount(c) != 0 &&
getExclusiveOwnerThread() != current)
return -1;
// 沒有獨占線程,或者獨占線程是當前線程。
// 獲取已使用讀鎖的個數
int r = sharedCount(c);
// 判斷當前讀鎖是否應該阻塞
if (!readerShouldBlock() &&
// 已使用讀鎖小於最大數量
r < MAX_COUNT &&
// CAS設置state,每次加SHARED_UNIT標識共享鎖+1
compareAndSetState(c, c + SHARED_UNIT)) {
if (r == 0) { // 標識第一次加讀鎖
firstReader = current;
firstReaderHoldCount = 1;
} else if (firstReader == current) {
// 重入加讀鎖
firstReaderHoldCount++;
} else {
// 並發加讀鎖,記錄當前線程的讀的次數,HoldCounter中是一個ThreadLocal。
HoldCounter rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
cachedHoldCounter = rh = readHolds.get();
else if (rh.count == 0)
readHolds.set(rh);
rh.count++;
}
return 1;
}
// 否則自旋嘗試獲取共享鎖
return fullTryAcquireShared(current);
}
本方法可以總結為三步:
- 如果有寫線程獨占,則失敗,返回-1
- 沒有寫線程或者當前線程就是寫線程重入,則判斷是否讀線程阻塞,如果不用阻塞則CAS將已使用讀鎖個數+1
- 如果第2步失敗,失敗原因可能是讀線程應該阻塞,或者讀鎖達到上限,或者CAS失敗,則調用fullTryAcquireShared方法。
private void doAcquireShared(int arg) {
// 加入同步等待隊列,指定是SHARED類型
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
// 取到當前節點的前一個節點
final Node p = node.predecessor();
// 如果前一個節點是頭節點,則當前節點是第二個節點。
if (p == head) {
// 因為是FIFO隊列,所以當前節點這時可以再嘗試獲取一次。
int r = tryAcquireShared(arg);
if (r >= 0) {
// 獲取成功,把當前節點設置為頭節點。並且判斷是否需要喚醒后面的等待節點。
// 如果條件允許,就會喚醒后面的節點
setHeadAndPropagate(node, r);
p.next = null; // help GC
if (interrupted)
selfInterrupt();
failed = false;
return;
}
}
// 如果前置節點不是頭結點,說明當前節點線程需要阻塞等待,並告知前一個節點喚醒
// 檢查並更新前置節點p的狀態,如果node節點應該阻塞就返回true
// 當前線程被喚醒之后,會從parkAndCheckInterrupt()執行
if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
共享鎖釋放過程
public void unlock() {
sync.releaseShared(1);
}
public final boolean releaseShared(int arg) {
//tryReleaseShared()嘗試釋放許可,這個方法在AQS中默認拋出一個異常,需要在子類中實現
if (tryReleaseShared(arg)) {
// 喚醒線程,設置傳播狀態 WS
doReleaseShared();
return true;
}
return false;
}
AQS是很多並發場景下同步控制的基石,其中的實現相對要復雜很多,還需要多看多琢磨才能完全理解。本文也是和大家做一個初探,給大家展示了核心的代碼邏輯,希望能有所幫助。
好的,本期內容就到這里,我們下期見;關注公眾號【小黑說Java】更多干貨。