在上一篇博客,簡單的說下了AQS的基本概念,核心源碼解析,但是還有一部分內容沒有涉及到,就是AQS對條件變量的支持,這篇博客將着重介紹這方面的內容。
條件變量
基本應用
我們先通過模擬一個消費者/生產者模型來看下條件變量的基本應用:
- 當有數據的時候,生產者停止生產數據,通知消費者消費數據;
- 當沒有數據的時候,消費者停止消費數據,通知生產者生產數據;
public class CommonResource {
private boolean isHaveData = false;
Lock lock = new ReentrantLock();
Condition producer_con = lock.newCondition();
Condition consumer_con = lock.newCondition();
public void product() {
lock.lock();
try {
while (isHaveData) {
try {
System.out.println("還有數據,等待消費數據");
producer_con.await();
} catch (InterruptedException e) {
}
}
System.out.println("生產者生產數據了");
try {
TimeUnit.SECONDS.sleep(3);
} catch (InterruptedException e) {
e.printStackTrace();
}
isHaveData = true;
consumer_con.signal();
} finally {
lock.unlock();
}
}
public void consume() {
lock.lock();
try {
while (!isHaveData) {
try {
System.out.println("沒有數據了,等待生產者消費數據");
consumer_con.await();
} catch (InterruptedException e) {
}
}
System.out.println("消費者消費數據");
try {
TimeUnit.SECONDS.sleep(3);
} catch (InterruptedException e) {
e.printStackTrace();
}
isHaveData = false;
producer_con.signal();
} finally {
lock.unlock();
}
}
}
public class Main {
public static void main(String[] args) {
CommonResource resource = new CommonResource();
new Thread(() -> {
while (true) {
resource.product();
}
}).start();
new Thread(() -> {
while (true) {
resource.consume();
}
}).start();
}
}
運行結果:
這就是條件變量的應用,第一反應是不是和object中的wait/nofity很像,wait/nofity是配合synchronized工作的,而條件變量的await/signal是配合使用AQS實現的鎖
來完成工作的,當然也要看用AQS實現的鎖是否支持了條件變量。synchronized只能與一個共享變量進行工作,而AQS實現的鎖支持多個條件變量。
我們試着分析下上面的代碼:
首先創建了兩個條件變量,一個條件變量用來阻塞/喚醒消費者線程,一個條件變量用來阻塞/喚醒生產者線程。
生產者,首先獲取了獨占鎖,判斷是否有數據:
- 如果有數據,則調用條件變量producer_con的await方法,阻塞當前線程,當消費者線程再次調用該條件變量producer_con的signal方法,就會喚醒該線程。
- 如果沒有數據,則生產數據,並且調用條件變量consumer_con的signal方法,喚醒因為調用consumer_con的await方法而被阻塞的消費者線程。
最終釋放鎖。
消費者,首先獲取了獨占鎖,判斷是否有數據:
- 如果沒有數據,則調用條件變量consumer_con的await方法,阻塞當前線程,當生產者線程再次調用該條件變量consumer_con的signal方法,就會喚醒該線程。
- 如果有數據,則消費數據,並且調用條件變量producer_con的signal方法,喚醒因為調用producer_con的await方法而被阻塞的生產者線程。
最終釋放鎖。
這里有一點需要特別注意:
- 釋放鎖,一般應該放在finally里面,以防中間出現異常,鎖沒有被釋放。
為了加深對條件變量的理解,我們再來看一個例子,兩個線程交替打印奇偶數:
public class Test {
private int num = 0;
private Lock lock = new ReentrantLock();
Condition condition = lock.newCondition();
public void add() {
while(num<100) {
try {
lock.lock();
System.out.println(Thread.currentThread().getName() + ":" + num++);
condition.signal();
condition.await();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
}
}
public class Main {
public static void main(String[] args) {
Test test=new Test();
new Thread(() -> {
test.add();
}).start();
new Thread(() -> {
test.add();
}).start();
}
}
運行結果:
翻閱網上的大多數案例是分兩個線程方法交替打印,同時開兩個條件變量,其中一個條件變量負責阻塞/喚醒打印奇數的線程,一個變量負責阻塞/喚醒打印偶數的線程,但是個人覺得沒什么必要,兩個線程共用一個線程方法,共用一個條件變量也可以。不知道各位看官是什么想的?
源碼解析
當我們點開lock.newCondition,發現它有好幾個實現類:
我們選擇ReentrantLock的實現類,實際上其他實現類也是相同的,只是為了和上面案例中的對應起來,所以先選擇ReentrantLock的實現類:
public Condition newCondition() {
return sync.newCondition();
}
繼續往下點:
final ConditionObject newCondition() {
return new ConditionObject();
}
可以看到,當我們調用lock.newnewCondition,最終會new出一個ConditionObject對象,而ConditionObject類是AbstractQueuedSynchronizer的內部類,我們先看下ConditionObject的UML圖:
其中firstWaiter保存的是該條件變量下條件隊列的首節點,lastWaiter保存的是該條件變量下條件隊列的尾節點。這里只保存了條件隊列的首節點和尾節點,中間的節點保存在哪里呢? 讓我們點開await方法:
public final void await() throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
Node node = addConditionWaiter();
int savedState = fullyRelease(node);
int interruptMode = 0;
while (!isOnSyncQueue(node)) {
LockSupport.park(this);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null) // clean up if cancelled
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
}
在這里,我們就搞清楚三個問題即可:
- 完整的條件隊列保存在哪里,以什么方式保存?
- await方法,是如何釋放鎖的?
- await方法,是如何阻塞線程的?
第一個問題在addConditionWaiter方法可以得到答案:
private Node addConditionWaiter() {
Node t = lastWaiter;
// If lastWaiter is cancelled, clean out.
if (t != null && t.waitStatus != Node.CONDITION) {
unlinkCancelledWaiters();
t = lastWaiter;
}
Node node = new Node(Thread.currentThread(), Node.CONDITION);
if (t == null)
firstWaiter = node;
else
t.nextWaiter = node;
lastWaiter = node;
return node;
}
首先是判斷條件隊列中的尾節點是否被取消了,如果被取消了,執行unlinkCancelledWaiters方法。我們這里肯定沒有被取消,事實上,如果是第一次調用await方法,lastWaiter是為空的,所以肯定不會進入第一個if。隨后,新建一個Node,這個Node類就是上一篇博客中大量介紹過的,也是AbstractQueuedSynchronizer的內部類,也就是新建了一個Node節點,其中保存了當前線程和Node的類型,這里Node的類型是CONDITION,如果t==null,則說明新建的Node是第一個節點,所以賦值給firstWaiter ,否則將尾節點的nextWaiter設置為新Node,形成一個單向鏈表,這個nextWaiter在哪里呢,它是通過node點出來的,也就是它也屬於node類的一個字段:
這說明了一個比較重要的問題:
AQS的阻塞隊列是以雙向的鏈表的形式保存的,是通過prev和next建立起關系的,但是AQS中的條件隊列是以單向鏈表的形式保存的,是通過nextWaiter建立起關系的,也就是AQS的阻塞隊列和AQS中的條件隊列並非同一個隊列。
第一個問題解決了,我們再來看第二個問題,第二個問題答案在await的第二個方法:
final int fullyRelease(Node node) {
boolean failed = true;
try {
int savedState = getState();
if (release(savedState)) {
failed = false;
return savedState;
} else {
throw new IllegalMonitorStateException();
}
} finally {
if (failed)
node.waitStatus = Node.CANCELLED;
}
}
首先調用getState方法,這個state是什么,不知大家是否還有印象,對於ReentrantLock來說,state就是重入次數,隨后調用release方法,傳入state。也就是不管重入了多少次,這里是一次性把鎖完全釋放掉。
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
可以看到釋放鎖還是調用了tryRelease方法,這個方法正是需要被重寫的。
當完成了前兩個方法的調用后,就會進行一個判斷isOnSyncQueue,一般來說會進入這個if,park這個線程,等待喚醒,這就解決了第三個問題。
下面我們再來看看signal方法,同樣的,我們需要解決幾個問題:
- AQS的條件隊列和阻塞隊列既然不是同一個隊列,那么是不是被await的線程永遠不會進入阻塞隊列?
- signal方法是如何喚醒線程的?
public final void signal() {
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
Node first = firstWaiter;
if (first != null)
doSignal(first);
}
重點在於doSignal中的transferForSignal方法:
final boolean transferForSignal(Node node) {
/*
* If cannot change waitStatus, the node has been cancelled.
*/
if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
return false;
/*
* Splice onto queue and try to set waitStatus of predecessor to
* indicate that thread is (probably) waiting. If cancelled or
* attempt to set waitStatus fails, wake up to resync (in which
* case the waitStatus can be transiently and harmlessly wrong).
*/
Node p = enq(node);
int ws = p.waitStatus;
if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
LockSupport.unpark(node.thread);
return true;
在這個方法中,我們會調用enq方法,把條件隊列的線程放入阻塞隊列中,然后調用unpark方法,喚醒線程。
本篇博客到這里也結束了。
經過上下兩篇博客,相信大家對AQS一定有了一個比較淺顯的理解。聰明的你,可以看出來,其實這兩篇博客有很多內容都沒有講透,甚至有點模棱兩可,只是“蜻蜓點水”,所以這也符合了我的標題:難以理解的AQS。的確,AQS要深入研究的話,不比線程池簡單多少。看,我又再給自己找理由了。希望經過今后的沉淀,我可以把這兩篇博客重寫下,然后換個標題“徹底理解AQS”,嘿嘿。