Java編程的邏輯 (72) - 顯式條件


本系列文章經補充和完善,已修訂整理成書《Java編程的邏輯》,由機械工業出版社華章分社出版,於2018年1月上市熱銷,讀者好評如潮!各大網店和書店有售,歡迎購買,京東自營鏈接http://item.jd.com/12299018.html


上節我們介紹了顯式鎖,本節介紹關聯的顯式條件,介紹其用法和原理。顯式條件也可以被稱做條件變量、條件隊列、或條件,后文我們可能會交替使用。

用法

基本概念和方法

鎖用於解決競態條件問題,條件是線程間的協作機制。顯式鎖與synchronzied相對應,而顯式條件與wait/notify相對應。wait/notify與synchronized配合使用,顯式條件與顯式鎖配合使用。

條件與鎖相關聯,創建條件變量需要通過顯式鎖,Lock接口定義了創建方法:

Condition newCondition();

Condition表示條件變量,是一個接口,它的定義為:

public interface Condition {
  void await() throws InterruptedException;
  void awaitUninterruptibly();
  long awaitNanos(long nanosTimeout) throws InterruptedException;
  boolean await(long time, TimeUnit unit) throws InterruptedException;
  boolean awaitUntil(Date deadline) throws InterruptedException;
  void signal();
  void signalAll();
}

await()對應於Object的wait(),signal()對應於notify,signalAll()對應於notifyAll(),語義也是一樣的。

與Object的wait方法類似,await也有幾個限定等待時間的方法,但功能更多一些:

//等待時間是相對時間,如果由於等待超時返回,返回值為false,否則為true
boolean await(long time, TimeUnit unit) throws InterruptedException;
//等待時間也是相對時間,但參數單位是納秒,返回值是nanosTimeout減去實際等待的時間
long awaitNanos(long nanosTimeout) throws InterruptedException;
//等待時間是絕對時間,如果由於等待超時返回,返回值為false,否則為true
boolean awaitUntil(Date deadline) throws InterruptedException;

這些await方法都是響應中斷的,如果發生了中斷,會拋出InterruptedException,但中斷標志位會被清空。Condition還定義了一個不響應中斷的等待方法:

void awaitUninterruptibly();

該方法不會由於中斷結束,但當它返回時,如果等待過程中發生了中斷,中斷標志位會被設置。

一般而言,與Object的wait方法一樣,調用await方法前需要先獲取鎖,如果沒有鎖,會拋出異常IllegalMonitorStateException。await在進入等待隊列后,會釋放鎖,釋放CPU,當其他線程將它喚醒后,或等待超時后,或發生中斷異常后,它都需要重新獲取鎖,獲取鎖后,才會從await方法中退出。

另外,與Object的wait方法一樣,await返回后,不代表其等待的條件就一定滿足了,通常要將await的調用放到一個循環內,只有條件滿足后才退出。

一般而言,signal/signalAll與notify/notifyAll一樣,調用它們需要先獲取鎖,如果沒有鎖,會拋出異常IllegalMonitorStateException。signal與notify一樣,挑選一個線程進行喚醒,signalAll與notifyAll一樣,喚醒所有等待的線程,但這些線程被喚醒后都需要重新競爭鎖,獲取鎖后才會從await調用中返回。

用法示例

ReentrantLock實現了newCondition方法,通過它,我們來看下條件的基本用法。我們實現與67節類似的例子WaitThread,一個線程啟動后,在執行一項操作前,等待主線程給它指令,收到指令后才執行,示例代碼為:

public class WaitThread extends Thread {
    private volatile boolean fire = false;
    private Lock lock = new ReentrantLock();
    private Condition condition = lock.newCondition();

    @Override
    public void run() {
        try {
            lock.lock();
            try {
                while (!fire) {
                    condition.await();
                }
            } finally {
                lock.unlock();
            }
            System.out.println("fired");
        } catch (InterruptedException e) {
            Thread.interrupted();
        }
    }

    public void fire() {
        lock.lock();
        try {
            this.fire = true;
            condition.signal();
        } finally {
            lock.unlock();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        WaitThread waitThread = new WaitThread();
        waitThread.start();
        Thread.sleep(1000);
        System.out.println("fire");
        waitThread.fire();
    }
}

需要特別注意的是,不要將signal/signalAll與notify/notifyAll混淆,notify/notifyAll是Object中定義的方法,Condition對象也有,稍不注意就會誤用,比如,對上面例子中的fire方法,可能會寫為:

public void fire() {
    lock.lock();
    try {
        this.fire = true;
        condition.notify();
    } finally {
        lock.unlock();
    }
}

寫成這樣,編譯器不會報錯,但運行時會拋出IllegalMonitorStateException,因為notify的調用不在synchronized語句內。

同樣,避免將鎖與synchronzied混用,那樣非常令人混淆,比如:

public void fire() {
    synchronized(lock){
        this.fire = true;
        condition.signal();
    }
}

記住,顯式條件與顯式鎖配合,wait/notify與synchronized配合。

生產者/消費者模式

67節,我們用wait/notify實現了生產者/消費者模式,我們提到了wait/notify的一個局限,它只能有一個條件等待隊列,分析等待條件也很復雜。在生產者/消費者模式中,其實有兩個條件,一個與隊列滿有關,一個與隊列空有關。使用顯式鎖,可以創建多個條件等待隊列。下面,我們用顯式鎖/條件重新實現下其中的阻塞隊列,代碼為:

static class MyBlockingQueue<E> {
    private Queue<E> queue = null;
    private int limit;
    private Lock lock = new ReentrantLock();
    private Condition notFull  = lock.newCondition();
    private Condition notEmpty = lock.newCondition();


    public MyBlockingQueue(int limit) {
        this.limit = limit;
        queue = new ArrayDeque<>(limit);
    }

    public void put(E e) throws InterruptedException {
        lock.lockInterruptibly();
        try{
            while (queue.size() == limit) {
                notFull.await();
            }
            queue.add(e);
            notEmpty.signal();    
        }finally{
            lock.unlock();
        }
    }

    public E take() throws InterruptedException {
        lock.lockInterruptibly();
        try{
            while (queue.isEmpty()) {
                notEmpty.await();
            }
            E e = queue.poll();
            notFull.signal();
            return e;    
        }finally{
            lock.unlock();
        }
    }
}

定義了兩個等待條件:不滿(notFull)、不空(notEmpty),在put方法中,如果隊列滿,則在noFull上等待,在take方法中,如果隊列空,則在notEmpty上等待,put操作后通知notEmpty,take操作后通知notFull。

這樣,代碼更為清晰易讀,同時避免了不必要的喚醒和檢查,提高了效率。Java並發包中的類ArrayBlockingQueue就采用了類似的方式實現。

實現原理
ConditionObject
理解了顯式條件的概念和用法,我們來看下ReentrantLock是如何實現它的,其newCondition()的代碼為:

public Condition newCondition() {
    return sync.newCondition();
}

sync是ReentrantLock的內部類對象,其newCondition()代碼為:

final ConditionObject newCondition() {
    return new ConditionObject();
}

ConditionObject是AQS中定義的一個內部類,不了解AQS請參看上節。ConditionObject的實現也比較復雜,我們通過一些主要代碼來簡要探討其實現原理。ConditionObject內部也有一個隊列,表示條件等待隊列,其成員聲明為:

//條件隊列的頭節點
private transient Node firstWaiter;
//條件隊列的尾節點
private transient Node lastWaiter;

ConditionObject是AQS的成員內部類,它可以直接訪問AQS中的數據,比如AQS中定義的鎖等待隊列。

我們看下幾個方法的實現,先看await方法。

await實現分析

下面是await方法的代碼,我們通過添加注釋解釋其基本思路。

public final void await() throws InterruptedException {
    // 如果等待前中斷標志位已被設置,直接拋異常
    if (Thread.interrupted())
        throw new InterruptedException();
    // 1.為當前線程創建節點,加入條件等待隊列
    Node node = addConditionWaiter();
    // 2.釋放持有的鎖
    int savedState = fullyRelease(node);
    int interruptMode = 0;
    // 3.放棄CPU,進行等待,直到被中斷或isOnSyncQueue變為true
    // isOnSyncQueue為true表示節點被其他線程從條件等待隊列
    // 移到了外部的鎖等待隊列,等待的條件已滿足
    while (!isOnSyncQueue(node)) {
        LockSupport.park(this);
        if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
            break;
    }
    // 4.重新獲取鎖
    if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
        interruptMode = REINTERRUPT;
    if (node.nextWaiter != null) // clean up if cancelled
        unlinkCancelledWaiters();
    // 5.處理中斷,拋出異常或設置中斷標志位
    if (interruptMode != 0)
        reportInterruptAfterWait(interruptMode);
}

awaitNanos實現分析

awaitNanos與await的實現是基本類似的,區別主要是會限定等待的時間,如下所示:

public final long awaitNanos(long nanosTimeout) throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    Node node = addConditionWaiter();
    int savedState = fullyRelease(node);
    long lastTime = System.nanoTime();
    int interruptMode = 0;
    while (!isOnSyncQueue(node)) {
        if (nanosTimeout <= 0L) {
            //等待超時,將節點從條件等待隊列移到外部的鎖等待隊列
            transferAfterCancelledWait(node);
            break;
        }
        //限定等待的最長時間
        LockSupport.parkNanos(this, nanosTimeout);
        if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
            break;

        long now = System.nanoTime();
        //計算下次等待的最長時間
        nanosTimeout -= now - lastTime;
        lastTime = now;
    }
    if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
        interruptMode = REINTERRUPT;
    if (node.nextWaiter != null)
        unlinkCancelledWaiters();
    if (interruptMode != 0)
        reportInterruptAfterWait(interruptMode);
    return nanosTimeout - (System.nanoTime() - lastTime);
}

signal實現分析

signal方法代碼為:

public final void signal() {
    //驗證當前線程持有鎖
    if (!isHeldExclusively())
        throw new IllegalMonitorStateException();
    //調用doSignal喚醒等待隊列中第一個線程
    Node first = firstWaiter;
    if (first != null)
        doSignal(first);
}

doSignal的代碼就不列舉了,其基本邏輯是:

  1. 將節點從條件等待隊列移到鎖等待隊列
  2. 調用LockSupport.unpark將線程喚醒

小結

本節介紹了顯式條件的用法和實現原理。它與顯式鎖配合使用,與wait/notify相比,可以支持多個條件隊列,代碼更為易讀,效率更高,使用時注意不要將signal/signalAll誤寫為notify/notifyAll。

70節到本節,我們介紹了Java並發包的基礎 - 原子變量和CAS、顯式鎖和條件,基於這些,Java並發包還提供了很多更為易用的高層數據結構、工具和服務,從下一節開始,我們先探討一些並發數據結構。

(與其他章節一樣,本節所有代碼位於 https://github.com/swiftma/program-logic)

----------------

未完待續,查看最新文章,敬請關注微信公眾號“老馬說編程”(掃描下方二維碼),從入門到高級,深入淺出,老馬和你一起探索Java編程及計算機技術的本質。用心原創,保留所有版權。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM