Java多線程——Condition條件


簡介

Condition中的await()方法相當於Object的wait()方法,Condition中的signal()方法相當於Object的notify()方法,Condition中的signalAll()相當於Object的notifyAll()方法。

不同的是,Object中的wait(),notify(),notifyAll()方法是和"同步鎖"(synchronized關鍵字)捆綁使用的;而Condition是需要與"互斥鎖"/"共享鎖"捆綁使用的。

簡單應用:

 

Condition的實現分析

Condition是同步器AbstractQueuedSynchronized的內部類,因為Condition的操作需要獲取相關的鎖,所以作為同步器的內部類比較合理。每個Condition對象都包含着一個隊列(等待隊列),該隊列是Condition對象實現等待/通知功能的關鍵。

等待隊列:

等待隊列是一個FIFO的隊列,隊列的每一個節點都包含了一個線程引用,該線程就是在Condition對象上等待的線程,如果一個線程調用了await()方法,該線程就會釋放鎖、構造成節點進入等待隊列並進入等待狀態。

這里的節點定義也就是AbstractQueuedSynchronizer.Node的定義。

一個Condition包含一個等待隊列,Condition擁有首節點(firstWaiter)和尾節點(lastWaiter)。當前線程調用Condition.await()方法時,將會以當前線程構造節點,並將節點從尾部加入等待隊列。

在Object的監視器模型上,一個對象擁有一個同步隊列和等待隊列,而Lock(同步器)擁有一個同步隊列和多個等待隊列。

等待(await):

調用Condition的await()方法,會使當前線程進入等待隊列並釋放鎖,同時線程狀態變為等待狀態。

從隊列的角度來看,相當於同步隊列的首節點(獲取了鎖的節點)移動到Condition的等待隊列中。

當等待隊列中的節點被喚醒,則喚醒節點的線程開始嘗試獲取同步狀態。如果不是通過Condition.signal()方法喚醒,而是對等待線程進行中斷,則拋出InterruptedException。

復制代碼+ View code
public final void await() throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    // 添加至等待隊列中
    Node node = addConditionWaiter();
    // 釋放同步狀態,釋放鎖
    long savedState = fullyRelease(node);
    int interruptMode = 0;
    while (!isOnSyncQueue(node)) {
        LockSupport.park(this);
        if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
            break;
    }
    if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
        interruptMode = REINTERRUPT;
    if (node.nextWaiter != null) // clean up if cancelled
        unlinkCancelledWaiters();
    if (interruptMode != 0)
        reportInterruptAfterWait(interruptMode);
}
復制代碼

通知(signal):

調用Condition的signal()方法,將會喚醒在等待隊列中從首節點開始搜索未解除Condition的節點,在喚醒節點之前,會將節點移到同步隊列中。

Condition的signalAll()方法,相當於對等待隊列中的每個節點均執行一次signal()方法,將等待隊列中的節點全部移動到同步隊列中,並喚醒每個節點的線程。

復制代碼+ View code
public final void signal() {
    if (!isHeldExclusively())
        throw new IllegalMonitorStateException();
    Node first = firstWaiter;
    if (first != null)
        doSignal(first);
}

public final void signalAll() {
    if (!isHeldExclusively())
        throw new IllegalMonitorStateException();
    Node first = firstWaiter;
    if (first != null)
        doSignalAll(first);
}
復制代碼

栗子

經典問題,消費者/生產者:

復制代碼+ View code
package ConsumerAndProduce;

import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

/**
 * Created by zhengbinMac on 2017/2/20.
 */
class Depot {
    private int capacity;
    private int size;
    private Lock lock;
    private Condition consumerCond;
    private Condition produceCond;

    public Depot(int capacity) {
        this.capacity = capacity;
        this.size = 0;
        this.lock = new ReentrantLock();
        this.consumerCond = lock.newCondition();
        this.produceCond = lock.newCondition();
    }

    public void produce(int val) {
        lock.lock();
        try {
            int left = val;
            while (left > 0) {
                while (size >= capacity) {
                    produceCond.await();
                }
                int produce = (left+size) > capacity ? (capacity-size) : left;
                size += produce;
                left -= produce;
                System.out.println(Thread.currentThread().getName() + ", ProduceVal=" + val + ", produce=" + produce + ", size=" + size);
                consumerCond.signalAll();
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            lock.unlock();
        }
    }

    public void consumer(int val) {
        lock.lock();
        try {
            int left = val;
            while (left > 0) {
                while (size <= 0) {
                    consumerCond.await();
                }
                int consumer = (size <= left) ? size : left;
                size -= consumer;
                left -= consumer;
                System.out.println(Thread.currentThread().getName() + ", ConsumerVal=" + val + ", consumer=" + consumer + ", size=" + size);
                produceCond.signalAll();
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            lock.unlock();
        }
    }
}
class Consumer {
    private Depot depot;
    public Consumer(Depot depot) {
        this.depot = depot;
    }

    public void consumerThing(final int amount) {
        new Thread(new Runnable() {
            public void run() {
                depot.consumer(amount);
            }
        }).start();
    }
}
class Produce {
    private Depot depot;
    public Produce(Depot depot) {
        this.depot = depot;
    }

    public void produceThing(final int amount) {
        new Thread(new Runnable() {
            public void run() {
                depot.produce(amount);
            }
        }).start();
    }
}
public class Entrepot {
    public static void main(String[] args) {
        // 倉庫
        Depot depot = new Depot(100);
        // 消費者
        Consumer consumer = new Consumer(depot);
        // 生產者
        Produce produce = new Produce(depot);
        produce.produceThing(5);
        consumer.consumerThing(5);
        produce.produceThing(2);
        consumer.consumerThing(5);
        produce.produceThing(3);
    }
}
復制代碼

某次輸出:

復制代碼+ View code
Thread-0, ProduceVal=5, produce=5, size=5
Thread-1, ConsumerVal=5, consumer=5, size=0
Thread-2, ProduceVal=2, produce=2, size=2
Thread-3, ConsumerVal=5, consumer=2, size=0
Thread-4, ProduceVal=3, produce=3, size=3
Thread-3, ConsumerVal=5, consumer=3, size=0
復制代碼

輸出結果中,Thread-3出現兩次,就是因為要消費5個產品,但倉庫中只有2個產品,所以先將庫存的2個產品全部消費,然后這個線程進入等待隊列,等待生產,隨后生產出了3個產品,生產者生產后又執行signalAll方法將等待隊列中所有的線程都喚醒,Thread-3繼續消費還需要的3個產品。

參考資料:

《Java並發編程的藝術》 - 5.6 Condition接口

Java多線程系列--“JUC鎖”06之 Condition條件


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM