Java並發——使用Condition線程間通信


線程間通信

線程之間除了同步互斥,還要考慮通信。在Java5之前我們的通信方式為:wait 和 notify。Condition的優勢是支持多路等待,即可以定義多個Condition,每個condition控制線程的一條執行通路。傳統方式只能是一路等待

Condition提供不同於Object 監視器方法的行為和語義,如受保證的通知排序,或者在執行通知時不需要保持一個鎖。

 

Condition接口

Condition 將 Object 監視器方法(wait、notify 和 notifyAll)分解成截然不同的對象,以便通過將這些對象與任意 Lock 實現組合使用,為每個對象提供多個等待 set(wait-set)。其中,Lock 替代了 synchronized 方法和語句的使用,Condition 替代了 Object 監視器方法的使用。

public interface Condition {
    void await() throws InterruptedException; void awaitUninterruptibly(); long awaitNanos(long nanosTimeout) throws InterruptedException; boolean await(long time, TimeUnit unit) throws InterruptedException; boolean awaitUntil(Date deadline) throws InterruptedException; void signal(); void signalAll(); }

說明:

await()方法會使當前線程等待,同時釋放當前鎖,當其他線程中使用signal()時或者signalAll()方法時,線程會重新獲得鎖並繼續執行。或者當線程被中斷時,也能跳出等待。這和Object.wait()方法很相似。

awaitUninterruptibly()方法與await()方法基本相同,但awaitUninterruptibly()方法不會在等待過程中響應中斷。

singal()方法用於喚醒一個在等待中的線程。相對的singalAll()方法會喚醒所有在等待中的線程。這和Obejct.notify()方法類似。

condition.await()方法必須在lock.lock()與lock.unlock()方法之間調用。

 

獲取Condition

Condition實例實質上被綁定到一個鎖上。一個鎖內部可以有多個Condition,即有多路等待和通知。要為特定 Lock 實例獲得 Condition 實例,請使用其 newCondition() 方法。

Condition newCondition() 返回用來與當前Lock實例一起使用的Condition 實例。

類似於 object.wait()和object.notify()的功能。object.wait()與object.notify()需要結合synchronized使用。Condition需要結合ReentrantLock使用。

 

"虛假喚醒"

所謂"虛假喚醒",即其他地方的代碼觸發了condition.signal(),喚醒condition上等待的線程。但被喚醒的線程仍然不滿足執行條件。

condition通常與條件語句一起使用:

if(!條件){
    condition.await(); //不滿足條件,當前線程等待;
}

更好的方法是使用while:

while(!條件){
    condition.await(); //不滿足條件,當前線程等待;
}

在等待Condition時,允許發生"虛假喚醒",這通常作為對基礎平台語義的讓步。若使用"if(!條件)"則被"虛假喚醒"的線程可能繼續執行。所以"while(!條件)"可以防止"虛假喚醒"。建議總是假定這些"虛假喚醒"可能發生,因此總是在一個循環中等待。

 

例:緩沖隊列的實現。

import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

class BoundedBuffer {
    final Lock lock = new ReentrantLock();// 鎖對象
    final Condition notFull = lock.newCondition(); //寫線程條件
    final Condition notEmpty = lock.newCondition();//讀線程條件
    final Object[] items = new Object[100];// 初始化一個長度為100的隊列
    int putptr/* 寫索引 */, takeptr/* 讀索引 */, count/* 隊列中存在的數據個數 */;

    public void put(Object x) throws InterruptedException {
        lock.lock(); //獲取鎖
        try {
            while (count == items.length)
                notFull.await();// 當計數器count等於隊列的長度時,不能再插入,因此等待。阻塞寫線程。
            items[putptr] = x;//賦值
            putptr++;

            if (putptr == items.length)
                putptr = 0;// 若寫索引寫到隊列的最后一個位置了,將putptr置為0。
            count++; // 每放入一個對象就將計數器加1。
            notEmpty.signal(); // 一旦插入就喚醒取數據線程。
        } finally {
            lock.unlock(); // 最后釋放鎖
        }
    }

    public Object take() throws InterruptedException {
        lock.lock(); // 獲取鎖
        try {
            while (count == 0)
                notEmpty.await(); // 如果計數器等於0則等待,即阻塞讀線程。
            Object x = items[takeptr]; // 取值
            takeptr++;
            if (takeptr == items.length)
                takeptr = 0; //若讀鎖應讀到了隊列的最后一個位置了,則讀鎖應置為0;即當takeptr達到隊列長度時,從零開始取
            count++; // 每取一個將計數器減1。
            notFull.signal(); //枚取走一個就喚醒存線程。
            return x;
        } finally {
            lock.unlock();// 釋放鎖
        }
    }

}

此即Condition的強大之處,假設緩存隊列中已經存滿,那么阻塞的肯定是寫線程,喚醒的肯定是讀線程,相反,阻塞的肯定是讀線程,喚醒的肯定是寫線程,那么假設只有一個Condition會有什么效果呢,緩存隊列中已經存滿,這個Lock不知道喚醒的是讀線程還是寫線程了,如果喚醒的是讀線程,皆大歡喜,如果喚醒的是寫線程,那么線程剛被喚醒,又被阻塞了,這時又去喚醒,這樣就浪費了很多時間。

 

 

例:經典問題:三個線程依次打印ABC,代碼示例如下:

import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

class Business {
    private Lock lock = new ReentrantLock();
    private Condition conditionA = lock.newCondition();
    private Condition conditionB = lock.newCondition();
    private Condition conditionC = lock.newCondition();
    private String type = "A"; //內部狀態

    /*
     * 方法的基本要求為:
     * 1、該方法必須為原子的。
     * 2、當前狀態必須滿足條件。若不滿足,則等待;滿足,則執行業務代碼。
     * 3、業務執行完畢后,修改狀態,並喚醒指定條件下的線程。
     */
    public void printA() {
        lock.lock(); //鎖,保證了線程安全。
        try {
            while (type != "A") { //type不為A,
                try {
                    conditionA.await(); //將當前線程阻塞於conditionA對象上,將被阻塞。
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }

            //type為A,則執行。
            System.out.println(Thread.currentThread().getName() + " 正在打印A");
            type = "B"; //將type設置為B。
            conditionB.signal(); //喚醒在等待conditionB對象上的一個線程。將信號傳遞出去。
        } finally {
            lock.unlock(); //解鎖
        }
    }

    public void printB() {
        lock.lock(); //
        try {
            while (type != "B") { //type不為B,
                try {
                    conditionB.await(); //將當前線程阻塞於conditionB對象上,將被阻塞。
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }

            //type為B,則執行。
            System.out.println(Thread.currentThread().getName() + " 正在打印B");
            type = "C"; //將type設置為C。
            conditionC.signal(); //喚醒在等待conditionC對象上的一個線程。將信號傳遞出去。
        } finally {
            lock.unlock(); //解鎖
        }
    }

    public void printC() {
        lock.lock(); //
        try {
            while (type != "C") {
                try {
                    conditionC.await();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }

            System.out.println(Thread.currentThread().getName() + " 正在打印C");
            type = "A";
            conditionA.signal();
        } finally {
            lock.unlock(); //解鎖
        }
    }
}


public class Test{

    public static void main(String[] args) {
        final Business business = new Business();//業務對象。

        //線程1號,打印10次A。
        Thread ta = new Thread(new Runnable() {

            @Override
            public void run() {
                for(int i=0;i<10;i++){
                    business.printA();
                }
            }
        });

        //線程2號,打印10次B。
        Thread tb = new Thread(new Runnable() {

            @Override
            public void run() {
                for(int i=0;i<10;i++){
                    business.printB();
                }
            }
        });

        //線程3號,打印10次C。
        Thread tc = new Thread(new Runnable() {

            @Override
            public void run() {
                for(int i=0;i<10;i++){
                    business.printC();
                }
            }
        });

        //執行3條線程。
        ta.start();
        tb.start();
        tc.start();
    }

}

執行代碼,控制台依次顯示了A、B、C,10次。可以看到3條線程之間共享Business類中的資源type,且3條線程之間進行了有效的協調。

 

 

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM