Java並發之等待/通知機制



1 前言

本篇文章默認大家對synchronizedReentrantLock有一定了解。

1.1 先來段代碼放松一下

下面一段簡單的代碼,主要是通過3個線程對count進行累計來進行模擬多線程的場景。

/**
 * zhongxianyao
 */
public class Test {
    private static final int N = 3;
    private int count = 0;

    public void doSomething() {
        // 實際業務中,這里可能是遠程獲取數據之類的耗時操作
        for (int i=0; i<1000_000; i++) {
            synchronized (this) {
                count ++;
            }
        }
    }

    public static void main(String[] args) throws Exception {
        Test test = new Test();
        for (int i=0; i<N; i++) {
            Runnable runnable = () -> test.doSomething();
            new Thread(runnable).start();
        }

        Thread.sleep(1000);
        System.out.println(test.count);
    }

}

在多線程編程中,一旦調用start()后,什么時候真正分配CPU時間片運行是不確定的,運行多久也是不確定的,所以有時候可能根據經驗,預估一下程序的運行時間,然后進行sleep,最后獲取結果。但這種方式太low了,有沒有那么一種方式,當程序獲取到結果后進行通知呢?下面將引出今天要講的等待/通知機制。


2 Object wait()/notify()

2.1 一段入門代碼

先來一段代碼看一下wait()/notify()的基本用法

/**
 * zhongxianyao
 */
public class Test {

    private static final int N = 3;
    private int count = 0;
    private int finishCount = 0;

    public void doSomething() {
        for (int i=0; i<1000_000; i++) {
            synchronized (this) {
                count ++;
            }
        }
        synchronized (this) {
            finishCount ++;
            notify();
        }
    }

    public static void main(String[] args) {
        Test test = new Test();
        for (int i=0; i<N; i++) {
            Runnable runnable = () -> test.doSomething();
            new Thread(runnable).start();
        }

        synchronized (test) {
            try {
                while (test.finishCount != N) {
                    test.wait();
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        }

        System.out.println(test.count);
    }

}

結果輸出3000000,結果是正確,是自己想要的。

2.2 問題三連擊

a.為什么官方說wait() 要放在while里面?

接口描述如下

As in the one argument version, interrupts and spurious wakeups are possible, and this method should always be used in a loop:
     synchronized (obj) {
         while (<condition does not hold>)
             obj.wait();
         ... // Perform action appropriate to condition
     }

翻譯一下:在一個論點版本中,中斷跟虛假喚醒是可能,所以這個方法應始終放在一個循環中。

加上一句自己的解釋:一般在項目中,一個線程不可能無緣無故等待,總是需要在某種條件下進行等待,而且其他線程喚醒這個線程的時候,可能用的是notifyAll(),數據被其他線程消費了,這里需要在判斷一下是否滿足特定的條件再繼續運行。

b.為什么wait()必須在同步方法/代碼塊中調用?

解釋1:wait()本身設計的邏輯就是在釋放鎖進行等待,如果沒有獲取鎖,談何釋放。

解釋2:通常在wait()的方法前面都會有while語句的判斷,在這兩條語句中會有時間間隔,可能會破壞程序,需要加上synchronized同步代碼塊來保證原子操作。

c.為什么wait(), notify() 和 notifyAll()是定義在Object里面而不是在Thread里面?

因為wait()等方法都是鎖級別操作,再者Java提供的鎖都是對象級別的而不是線程級別的,每個對象都有鎖。如果wait()方法定義在Thread類中,線程正在等待的是哪個鎖就不明顯了。

2.3 wait(long timeout)

在上面的例子中,如果notify();那行代碼刪除,wait()改為wait(100),如下,那么程序是否可以獲取到正確的結果呢?

/**
 * zhongxianyao
 */
public class Test {

    private static final int N = 3;
    private int count = 0;
    private int finishCount = 0;

    public void doSomething() {
        for (int i=0; i<1000_000; i++) {
            synchronized (this) {
                count ++;
            }
        }
        synchronized (this) {
            finishCount ++;
            //notify();
        }
    }

    public static void main(String[] args) {
        Test test = new Test();
        for (int i=0; i<N; i++) {
            Runnable runnable = () -> test.doSomething();
            new Thread(runnable).start();
        }

        synchronized (test) {
            try {
                while (test.finishCount != N) {
                    test.wait(100);
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        }

        System.out.println(test.count);
    }

}

運行結果是3000000,是正確的結果,看了一下文檔,發現這個字段跟直覺理解的不一樣,直覺告訴我,這個是最長等多久,等太久了就InterruptedException,結果不是。這個方法設置的時間,是說等待多久就喚醒自己。


3 Condition await()/signal()

3.1 用Condition進行替換

下面的代碼,把前一個例子中的synchronized代碼塊,換成了lock()/unlock,notify()換成了condition.signal(),wait()換成了condition.await()。運行結果也是正確的。

/**
 * zhongxianyao
 */
public class Test {

    private static final int N = 3;
    private int count = 0;
    private int finishCount = 0;
    private Lock lock = new ReentrantLock();
    private Condition condition = lock.newCondition();

    public void doSomething() {
        for (int i=0; i<1000_000; i++) {
            synchronized (this) {
                count ++;
            }
        }
        lock.lock();
        finishCount ++;
        if (finishCount == N) {
            condition.signal();
        }
        lock.unlock();
    }

    public static void main(String[] args) {
        Test test = new Test();
        for (int i=0; i<N; i++) {
            Runnable runnable = () -> test.doSomething();
            new Thread(runnable).start();
        }

        test.lock.lock();
        try {
            test.condition.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            test.lock.unlock();
        }

        System.out.println(test.count);
    }

}

3.2 signal()方法后不建議添加邏輯

public class ConditionTest {

    public static void main(String[] args) {
        ReentrantLock lock = new ReentrantLock();
        Condition condition = lock.newCondition();

        new Thread(() -> {
            try {
                long time = System.currentTimeMillis();
                lock.lock();
                System.out.println("await start");
                condition.await();
                System.out.println("await end " + (System.currentTimeMillis() - time) + "ms");
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                lock.unlock();
            }
        }, "Thread-await").start();

        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        
        new Thread(() -> {
            try {
                lock.lock();
                System.out.println("signal start");
                TimeUnit.SECONDS.sleep(5);
                condition.signal();
                System.out.println("signal end");
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                lock.unlock();
                System.out.println("signal unlock");
            }
        }, "Thread-signal").start();

    }
}

多次運行,結果都是一樣的,如下:

await start
signal start
signal end
signal unlock
await end 5005ms

從運行結果可以看出,await()后,鎖就釋放了,但signal()后,鎖不釋放,一定要在unlock()之后,鎖才釋放,await()才會往下執行。

既然喚醒了其他線程,又不釋放鎖,可以調整喚醒的時機。一般在實際代碼中,也是不建議signal()方法后添加邏輯,應該直接釋放鎖。

同理,上面的notify()也是在synchronized代碼塊結束后,wait()后面的語句才能真正執行。

3.3 boolean await(long time, TimeUnit unit)

把上面的condition.await()改為condition.await(1, TimeUnit.SECONDS),然后獲取返回值,運行結果返回的是false

這個時候,如果把TimeUnit.SECONDS.sleep(5)condition.signal()這兩行代碼順序調換一下,那么await的返回值就是true

再看到官方文檔對這個返回值的描述,如下

{@code false} if the waiting time detectably elapsed
before return from the method, else {@code true}

翻譯過來,大致意思就是“如果等待時間可以在方法返回之前檢測到返回false,否則返回true”。但實際測試結果卻是await()被喚醒的時候,而不是方法返回的時候。


4 區別

  • Object wait() notify() 搭配synchronized使用
  • Condition await() signal() 搭配Lock使用
  • Object notify() 是隨機喚醒一個
  • Condition signal() 是喚醒第一個await()的線程
  • Object wait()有虛假喚醒,而Condition await() 沒有

5 參考文檔


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM