Java 線程間通信 —— 等待 / 通知機制



本文部分摘自《Java 並發編程的藝術》


volatile 和 synchronize 關鍵字

每個處於運行狀態的線程,如果僅僅是孤立地運行,那么它產生的作用很小,如果多個線程能夠相互配合完成工作,則將帶來更大的價值

Java 支持多個線程同時訪問一個對象或者對象的成員變量,使用 volatile 關鍵字可以保證被修飾變量的可見性,意味着任一線程對該變量的任何修改,其他線程都可以立即感知到

synchronize 關鍵字可以修飾方法或者同步塊,它主要確保多個線程在同一時刻,只能有一個線程處於方法或者同步塊中,它保證了線程對變量訪問的可見性和排他性。synchronize 關鍵字的實現,本質是對一個對象的監視器(monitor)進行獲取,而這個獲取過程是排他的,也就是同一時刻只能有一個線程獲取到由 synchronize 所保護對象的監視器

任何一個對象都擁有自己的監視器,任意一個線程對 Object 的訪問(Object 由 synchronize 保護)的訪問,首先要獲得 Object 的監視器。如果獲取失敗,線程進入同步隊列,線程狀態變為 BLOCKED。當訪問 Object 的前驅(獲得了鎖的線程)釋放了鎖,則該釋放操作將喚醒阻塞在同步隊列中的線程,使其重新嘗試獲取監視器


等待 - 通知機制

一個線程修改了一個對象的值,另一個線程感知到變化,然后進行相應的操作,前者是生產者,后者是消費者,這種通信方式實現了解耦,更具伸縮性。在 Java 中為了實現類似的功能,我們可以讓消費者線程不斷地循環檢查變量是否符合預期,條件滿足則退出循環,從而完成消費者的工作

while(value != desire) {
    Thread.sleep(1000);
}
doSomething();

睡眠一段時間的目的是防止過快的無效嘗試,這種實現方式看似能滿足需求,但存在兩個問題:

  • 難以確保及時性

    如果睡眠時間太長,就難以及時發現條件已經變化

  • 難以降低開銷

    如果降低睡眠時間,又會消耗更多的處理器資源

使用 Java 提供了內置的等待 - 通知機制能夠很好地解決上述問題,等待 - 通知的相關方法是任意 Java 對象都具備的

方法名稱 描述
notify() 通知一個在對象上等待的線程,使其從 wait() 方法返回,返回的前提是該線程獲取到了對象的鎖
notifyAll() 通知所有等待在該對象上的線程
wait() 調用該方法的線程進入 WAITING 狀態,只有等待另外的線程通知或被中斷才返回,調用此方法會釋放對象的鎖
wait(long) 超時等待一段時間,參數時間是毫秒
wait(long, int) 對於超時時間更細粒度的控制,可以達到納秒

等待 - 通知機制,是指一個線程 A 調用了對象 O 的 wait() 方法進入等待狀態,而另一個線程 B 調用了對象 O 的 notify() 或者 notifyAll() 方法,線程 A 收到通知后從對象 O 的 wait() 方法返回,進而執行后續操作。上述兩個線程通過對象 O 來完成交互,而對象上的 wait() 和 notify/notifyAll() 的關系就如同開關信號一樣,用來完成等待方和通知方之間的交互工作

下述例子中,創建兩個線程 WaitThread 和 NotifyThread,前者檢查 flag 值是否為 false,如果符合要求,進行后續操作,否則在 lock 上等待,后者在睡眠一段時間后對 lock 進行通知

public class WaitNotify {

    static boolean flag = true;
    static Object lock = new Object();

    public static void main(String[] args) throws InterruptedException {
        Thread waitThread = new Thread(new Wait(), "WaitThread");
        waitThread.start();
        TimeUnit.SECONDS.sleep(1);
        Thread notifyThread = new Thread(new Notify(), "NotifyThread");
        notifyThread.start();
    }

    static class Wait implements Runnable {

        @Override
        public void run() {
            // 加鎖,擁有 lock 的 Monitor
            synchronized (lock) {
                // 繼續 wait,同時釋放 lock 的鎖
                while (flag) {
                    try {
                        System.out.println(Thread.currentThread() + "flag is true. wait @ "
                                + new SimpleDateFormat("HH:mm:ss").format(new Date()));
                        lock.wait();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
                // 完成工作
                System.out.println(Thread.currentThread() + "flag is false. running @ "
                    + new SimpleDateFormat("HH:mm:ss").format(new Date()));
            }
        }
    }

    static class Notify implements Runnable {

        @Override
        public void run() {
            // 加鎖,擁有 lock 的 Monitor
            synchronized (lock) {
                // 獲取 lock 的鎖,然后進行通知,通知時不會釋放 lock 的鎖
                // 直到當前線程釋放 lock 后,WaitThread 才能從 wait 方法中返回
                System.out.println(Thread.currentThread() + " hold lock. notify @ "
                        + new SimpleDateFormat("HH:mm:ss").format(new Date()));
                lock.notifyAll();
                flag = false;
                SleepUtils.second(5);
            }
            // 再次加鎖
            synchronized (lock) {
                System.out.println(Thread.currentThread() + " hold lock again. sleep @ "
                        + new SimpleDateFormat("HH:mm:ss").format(new Date()));
                SleepUtils.second(5);
            }
        }
    }
}

運行結果如下

上述結果的第三行和第四行順序可能會互換,下面簡單描述一下代碼的執行過程

  1. WaitThread 線程先啟動,NotifyThread 線程后啟動,由於中間有睡眠一秒的操作,所以 WaitThread 線程首先獲得鎖
  2. WaitThread 線程循環判斷條件是否滿足,不滿足則調用執行 lock.wait() 方法,釋放 lock 對象上的鎖,進入 lock 對象的等待隊列中,進入等待狀態
  3. 由於 WaitThread 線程釋放了鎖,所以 NotifyThread 獲得 lock 對象上的鎖,執行 lock.notifyAll() 方法,但並不會立即釋放鎖,只是通知所有等待在 lock 上的線程可以參與競爭鎖了(notify 也同理),並把 flag 設為 false,本段代碼執行結束,NotifyThread 線程釋放鎖,此時 WaitThread 線程和 NotifyThread 線程共同競爭 lock 的鎖
  4. 無論誰先拿到鎖,WaitThread 線程和 NotifyThread 線程都能順利完成任務

等待 - 通知機制的經典范式

從上節的內容中,我們可以提煉出等待 - 通知機制的經典范式,該范式分為兩部分,分別針對等待方(消費方)和通知方(生產者)

等待方遵循如下原則:

  • 獲取對象上的鎖
  • 如果條件不滿足,調用對象的 wait() 方法,被通知后仍要檢查條件
  • 條件滿足則執行對應的邏輯

偽代碼如下:

synchronized(對象) {
	while(條件不滿足) {
    	對象.wait();
    }
    對應的處理邏輯
}

通知方遵循如下原則:

  • 獲取對象上的鎖
  • 改變條件
  • 通知所有等待在對象上的線程

偽代碼如下:

synchronized(對象) {
	改變條件
    對象.notifyAll();
}

Thread.join() 的使用

如果一個線程 A 執行了 thread.join() 語句,其含義是:當前線程 A 等待 thread 線程終止之后才從 thread.join() 返回

下面的例子中,創建 10 個線程,編號為 0 ~ 9,每個線程線程調用前一個線程的 join() 方法,也就是線程 0 結束了,線程 1 才能從 join() 方法中返回,而線程 0 需要等待 main 線程結束

public class Join {

    public static void main(String[] args) throws InterruptedException {
        Thread previous = Thread.currentThread();
        for (int i = 0; i < 10; i++) {
            // 每個線程擁有前一個線程的引用,需要等待前一個線程終止,才能從等待中返回
            Thread thread = new Thread(new Domino(previous), String.valueOf(i));
            thread.start();
            previous = thread;
        }
        TimeUnit.SECONDS.sleep(5);
        System.out.println(Thread.currentThread().getName() + " terminate.");
    }

    static class Domino implements Runnable {

        private Thread thread;

        public Domino(Thread thread) {
            this.thread = thread;
        }

        @Override
        public void run() {
            try {
                thread.join();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println(Thread.currentThread().getName() + " terminate.");
        }
    }
}

輸出如下

從上述輸出可以看到,每個線程等待前驅線程終止后,才從 join() 方法返回,這里涉及了等待 - 通知機制(等待前驅線程結束,接收前驅線程結束通知)

Thread.join() 源碼簡化如下

public final synchronized void join(long millis) throws InterruptedException {
    	// 省略前面代碼...
        if (millis == 0) {
            while (isAlive()) {
                wait(0);
            }
        }
    	// 省略后面代碼...
    }

假設當前線程是 main 線程,main 線程調用 thread.join() 方法,則 main 線程會獲取 thread 線程對象上的鎖,並循環判斷 thread 線程是否還存活,如果存活,調用 wait 方法釋放鎖並等待,如果 thread 線程已經結束,則 thread 線程會自動調用自身 notifyAll() 方法通知所有等待在自身線程對象上的線程,則 main 線程可以繼續執行后續操作



免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM