本系列文章經補充和完善,已修訂整理成書《Java編程的邏輯》,由機械工業出版社華章分社出版,於2018年1月上市熱銷,讀者好評如潮!各大網店和書店有售,歡迎購買,京東自營鏈接:http://item.jd.com/12299018.html
上節我們介紹了顯式鎖,本節介紹關聯的顯式條件,介紹其用法和原理。顯式條件也可以被稱做條件變量、條件隊列、或條件,后文我們可能會交替使用。
用法
基本概念和方法
鎖用於解決競態條件問題,條件是線程間的協作機制。顯式鎖與synchronzied相對應,而顯式條件與wait/notify相對應。wait/notify與synchronized配合使用,顯式條件與顯式鎖配合使用。
條件與鎖相關聯,創建條件變量需要通過顯式鎖,Lock接口定義了創建方法:
Condition newCondition();
Condition表示條件變量,是一個接口,它的定義為:
public interface Condition { void await() throws InterruptedException; void awaitUninterruptibly(); long awaitNanos(long nanosTimeout) throws InterruptedException; boolean await(long time, TimeUnit unit) throws InterruptedException; boolean awaitUntil(Date deadline) throws InterruptedException; void signal(); void signalAll(); }
await()對應於Object的wait(),signal()對應於notify,signalAll()對應於notifyAll(),語義也是一樣的。
與Object的wait方法類似,await也有幾個限定等待時間的方法,但功能更多一些:
//等待時間是相對時間,如果由於等待超時返回,返回值為false,否則為true boolean await(long time, TimeUnit unit) throws InterruptedException; //等待時間也是相對時間,但參數單位是納秒,返回值是nanosTimeout減去實際等待的時間 long awaitNanos(long nanosTimeout) throws InterruptedException; //等待時間是絕對時間,如果由於等待超時返回,返回值為false,否則為true boolean awaitUntil(Date deadline) throws InterruptedException;
這些await方法都是響應中斷的,如果發生了中斷,會拋出InterruptedException,但中斷標志位會被清空。Condition還定義了一個不響應中斷的等待方法:
void awaitUninterruptibly();
該方法不會由於中斷結束,但當它返回時,如果等待過程中發生了中斷,中斷標志位會被設置。
一般而言,與Object的wait方法一樣,調用await方法前需要先獲取鎖,如果沒有鎖,會拋出異常IllegalMonitorStateException。await在進入等待隊列后,會釋放鎖,釋放CPU,當其他線程將它喚醒后,或等待超時后,或發生中斷異常后,它都需要重新獲取鎖,獲取鎖后,才會從await方法中退出。
另外,與Object的wait方法一樣,await返回后,不代表其等待的條件就一定滿足了,通常要將await的調用放到一個循環內,只有條件滿足后才退出。
一般而言,signal/signalAll與notify/notifyAll一樣,調用它們需要先獲取鎖,如果沒有鎖,會拋出異常IllegalMonitorStateException。signal與notify一樣,挑選一個線程進行喚醒,signalAll與notifyAll一樣,喚醒所有等待的線程,但這些線程被喚醒后都需要重新競爭鎖,獲取鎖后才會從await調用中返回。
用法示例
ReentrantLock實現了newCondition方法,通過它,我們來看下條件的基本用法。我們實現與67節類似的例子WaitThread,一個線程啟動后,在執行一項操作前,等待主線程給它指令,收到指令后才執行,示例代碼為:
public class WaitThread extends Thread { private volatile boolean fire = false; private Lock lock = new ReentrantLock(); private Condition condition = lock.newCondition(); @Override public void run() { try { lock.lock(); try { while (!fire) { condition.await(); } } finally { lock.unlock(); } System.out.println("fired"); } catch (InterruptedException e) { Thread.interrupted(); } } public void fire() { lock.lock(); try { this.fire = true; condition.signal(); } finally { lock.unlock(); } } public static void main(String[] args) throws InterruptedException { WaitThread waitThread = new WaitThread(); waitThread.start(); Thread.sleep(1000); System.out.println("fire"); waitThread.fire(); } }
需要特別注意的是,不要將signal/signalAll與notify/notifyAll混淆,notify/notifyAll是Object中定義的方法,Condition對象也有,稍不注意就會誤用,比如,對上面例子中的fire方法,可能會寫為:
public void fire() { lock.lock(); try { this.fire = true; condition.notify(); } finally { lock.unlock(); } }
寫成這樣,編譯器不會報錯,但運行時會拋出IllegalMonitorStateException,因為notify的調用不在synchronized語句內。
同樣,避免將鎖與synchronzied混用,那樣非常令人混淆,比如:
public void fire() { synchronized(lock){ this.fire = true; condition.signal(); } }
記住,顯式條件與顯式鎖配合,wait/notify與synchronized配合。
生產者/消費者模式
在67節,我們用wait/notify實現了生產者/消費者模式,我們提到了wait/notify的一個局限,它只能有一個條件等待隊列,分析等待條件也很復雜。在生產者/消費者模式中,其實有兩個條件,一個與隊列滿有關,一個與隊列空有關。使用顯式鎖,可以創建多個條件等待隊列。下面,我們用顯式鎖/條件重新實現下其中的阻塞隊列,代碼為:
static class MyBlockingQueue<E> { private Queue<E> queue = null; private int limit; private Lock lock = new ReentrantLock(); private Condition notFull = lock.newCondition(); private Condition notEmpty = lock.newCondition(); public MyBlockingQueue(int limit) { this.limit = limit; queue = new ArrayDeque<>(limit); } public void put(E e) throws InterruptedException { lock.lockInterruptibly(); try{ while (queue.size() == limit) { notFull.await(); } queue.add(e); notEmpty.signal(); }finally{ lock.unlock(); } } public E take() throws InterruptedException { lock.lockInterruptibly(); try{ while (queue.isEmpty()) { notEmpty.await(); } E e = queue.poll(); notFull.signal(); return e; }finally{ lock.unlock(); } } }
定義了兩個等待條件:不滿(notFull)、不空(notEmpty),在put方法中,如果隊列滿,則在noFull上等待,在take方法中,如果隊列空,則在notEmpty上等待,put操作后通知notEmpty,take操作后通知notFull。
這樣,代碼更為清晰易讀,同時避免了不必要的喚醒和檢查,提高了效率。Java並發包中的類ArrayBlockingQueue就采用了類似的方式實現。
實現原理
ConditionObject
理解了顯式條件的概念和用法,我們來看下ReentrantLock是如何實現它的,其newCondition()的代碼為:
public Condition newCondition() { return sync.newCondition(); }
sync是ReentrantLock的內部類對象,其newCondition()代碼為:
final ConditionObject newCondition() { return new ConditionObject(); }
ConditionObject是AQS中定義的一個內部類,不了解AQS請參看上節。ConditionObject的實現也比較復雜,我們通過一些主要代碼來簡要探討其實現原理。ConditionObject內部也有一個隊列,表示條件等待隊列,其成員聲明為:
//條件隊列的頭節點 private transient Node firstWaiter; //條件隊列的尾節點 private transient Node lastWaiter;
ConditionObject是AQS的成員內部類,它可以直接訪問AQS中的數據,比如AQS中定義的鎖等待隊列。
我們看下幾個方法的實現,先看await方法。
await實現分析
下面是await方法的代碼,我們通過添加注釋解釋其基本思路。
public final void await() throws InterruptedException { // 如果等待前中斷標志位已被設置,直接拋異常 if (Thread.interrupted()) throw new InterruptedException(); // 1.為當前線程創建節點,加入條件等待隊列 Node node = addConditionWaiter(); // 2.釋放持有的鎖 int savedState = fullyRelease(node); int interruptMode = 0; // 3.放棄CPU,進行等待,直到被中斷或isOnSyncQueue變為true // isOnSyncQueue為true表示節點被其他線程從條件等待隊列 // 移到了外部的鎖等待隊列,等待的條件已滿足 while (!isOnSyncQueue(node)) { LockSupport.park(this); if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) break; } // 4.重新獲取鎖 if (acquireQueued(node, savedState) && interruptMode != THROW_IE) interruptMode = REINTERRUPT; if (node.nextWaiter != null) // clean up if cancelled unlinkCancelledWaiters(); // 5.處理中斷,拋出異常或設置中斷標志位 if (interruptMode != 0) reportInterruptAfterWait(interruptMode); }
awaitNanos實現分析
awaitNanos與await的實現是基本類似的,區別主要是會限定等待的時間,如下所示:
public final long awaitNanos(long nanosTimeout) throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); Node node = addConditionWaiter(); int savedState = fullyRelease(node); long lastTime = System.nanoTime(); int interruptMode = 0; while (!isOnSyncQueue(node)) { if (nanosTimeout <= 0L) { //等待超時,將節點從條件等待隊列移到外部的鎖等待隊列 transferAfterCancelledWait(node); break; } //限定等待的最長時間 LockSupport.parkNanos(this, nanosTimeout); if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) break; long now = System.nanoTime(); //計算下次等待的最長時間 nanosTimeout -= now - lastTime; lastTime = now; } if (acquireQueued(node, savedState) && interruptMode != THROW_IE) interruptMode = REINTERRUPT; if (node.nextWaiter != null) unlinkCancelledWaiters(); if (interruptMode != 0) reportInterruptAfterWait(interruptMode); return nanosTimeout - (System.nanoTime() - lastTime); }
signal實現分析
signal方法代碼為:
public final void signal() { //驗證當前線程持有鎖 if (!isHeldExclusively()) throw new IllegalMonitorStateException(); //調用doSignal喚醒等待隊列中第一個線程 Node first = firstWaiter; if (first != null) doSignal(first); }
doSignal的代碼就不列舉了,其基本邏輯是:
- 將節點從條件等待隊列移到鎖等待隊列
- 調用LockSupport.unpark將線程喚醒
小結
本節介紹了顯式條件的用法和實現原理。它與顯式鎖配合使用,與wait/notify相比,可以支持多個條件隊列,代碼更為易讀,效率更高,使用時注意不要將signal/signalAll誤寫為notify/notifyAll。
從70節到本節,我們介紹了Java並發包的基礎 - 原子變量和CAS、顯式鎖和條件,基於這些,Java並發包還提供了很多更為易用的高層數據結構、工具和服務,從下一節開始,我們先探討一些並發數據結構。
(與其他章節一樣,本節所有代碼位於 https://github.com/swiftma/program-logic)
----------------
未完待續,查看最新文章,敬請關注微信公眾號“老馬說編程”(掃描下方二維碼),從入門到高級,深入淺出,老馬和你一起探索Java編程及計算機技術的本質。用心原創,保留所有版權。