並發編程之 wait notify 方法剖析


前言

2018 元旦快樂。

摘要:

  1. notify wait 如何使用?
  2. 為什么必須在同步塊中?
  3. 使用 notify wait 實現一個簡單的生產者消費者模型
  4. 底層實現原理

1. notify wait 如何使用?

今天我們要學習或者說分析的是 Object 類中的 wait notify 這兩個方法,其實說是兩個方法,這兩個方法包括他們的重載方法一共有5個,而Object 類中一共才 12 個方法,可見這2個方法的重要性。我們先看看 JDK 中的代碼:

public final native void notify();

public final native void notifyAll();
 
public final void wait() throws InterruptedException {
    wait(0);
}

public final native void wait(long timeout) throws InterruptedException;

public final void wait(long timeout, int nanos) throws InterruptedException {
    if (timeout < 0) {
        throw new IllegalArgumentException("timeout value is negative");
    }

    if (nanos < 0 || nanos > 999999) {
        throw new IllegalArgumentException(
                            "nanosecond timeout value out of range");
    }

    if (nanos > 0) {
        timeout++;
    }

    wait(timeout);
}

就是這五個方法。其中有3個方法是 native 的,也就是由虛擬機本地的c代碼執行的。有2個 wait 重載方法最終還是調用了 wait(long) 方法。

首先還是 know how。來一個最簡單的例子,看看如何使用這兩個方法。

package cn.think.in.java.two;

import java.util.concurrent.TimeUnit;

public class WaitNotify {

  final static Object lock = new Object();

  public static void main(String[] args) {

    new Thread(new Runnable() {
      @Override
      public void run() {
        System.out.println("線程 A 等待拿鎖");
        synchronized (lock) {
          try {
            System.out.println("線程 A 拿到鎖了");
            TimeUnit.SECONDS.sleep(1);
            System.out.println("線程 A 開始等待並放棄鎖");
            lock.wait();
            System.out.println("被通知可以繼續執行 則 繼續運行至結束");
          } catch (InterruptedException e) {
          }
        }
      }
    }, "線程 A").start();

    new Thread(new Runnable() {
      @Override
      public void run() {
        System.out.println("線程 B 等待鎖");
        synchronized (lock) {
          System.out.println("線程 B 拿到鎖了");
          try {
            TimeUnit.SECONDS.sleep(5);
          } catch (InterruptedException e) {
          }
          lock.notify();
          System.out.println("線程 B 隨機通知 Lock 對象的某個線程");
        }
      }
    }, "線程 B").start();
  }


}

運行結果:

線程 A 等待拿鎖
線程 B 等待鎖
線程 A 拿到鎖了
線程 A 開始等待並放棄鎖
線程 B 拿到鎖了
線程 B 隨機通知 Lock 對象的某個線程
被通知可以繼續執行 則 繼續運行至結束

在上面的代碼中,線程 A 和 B 都會搶這個 lock 對象的鎖,A 的運氣比較好(也可能使 B 拿到鎖),他先拿到了鎖,然后調用了 wait 方法,放棄了鎖,並掛起了自己,這個時候等待鎖的 B 就拿到了鎖,然后通知了A,但是請注意,通知完畢之后,B 線程並沒有執行完同步代碼塊中的代碼,因此,A 還是拿不到鎖的,因此無法運行,等到B線程執行完畢,出了同步塊,這個時候 A 線程才被激活得以繼續執行。

使用 wait 方法和 notify 方法可以使 2 個無關的線程進行通信。也就是面試題中常提到的線程之間如何通信。

如果沒有 wait 方法和 noitfy 方法,我們如何讓兩個線程通信呢?簡單的辦法就是讓某個線程循環去檢查某個標記變量,比如:

while (value != flag) {
  Thread.sleep(1000);
}
doSomeing();

上面的這段代碼在條件不滿足使就睡眠一段時間,這樣做到目的是防止過快的”無效嘗試“,這種方式看似能夠實現所需的功能,但是卻存在如下問題:

  1. 難以確保及時性。因為等待的1000時間會導致時間差。
  2. 難以降低開銷,如果確保了及時性,休眠時間縮短,將大大消耗CPU。

但是有了Java 自帶的 wait 方法 和 notify 方法,一切迎刃而解。官方說法是等待/通知機制。一個線程在等待,另一個線程可以通知這個線程,實現了線程之間的通信。

2. 為什么必須在同步塊中?

注意,這兩個方法的使用必須是在 synchroized 同步塊中,並且在當前對象的同步塊中,如果在 A 對象的方法中調用 B 對象的 wait 或者 notify 方法,虛擬機會拋出 IllegalMonitorStateException,非法的監視器異常,因為你這個線程持有的監視器和你調用的監視器的不是一個對象。

那么為什么這兩個方法一定要在同步塊中呢?

這里要說一個專業名詞:競態條件。什么是競太條件呢?

當兩個線程競爭同一資源時,如果對資源的訪問順序敏感,就稱存在競態條件。

競態條件會導致程序在並發情況下出現一些bugs。多線程對一些資源的競爭的時候就會產生競態條件,如果首先要執行的程序競爭失敗排到后面執行了,那么整個程序就會出現一些不確定的bugs。這種bugs很難發現而且會重復出現,這是因為線程間會隨機競爭。

假設有2個線程,分別是生產者和消費者,他們有各自的任務。

1.1生產者檢查條件(如緩存滿了)-> 1.2生產者必須等待
2.1消費者消費了一個單位的緩存 -> 2.2重新設置了條件(如緩存沒滿) -> 2.3調用notifyAll()喚醒生產者

我們希望的順序是: 1.1->1.2->2.1->2.2->2.3
但是由於CPU執行是隨機的,可能會導致 2.3 先執行,1.2 后執行,這樣就會導致生產者永遠也醒不過來了!

所以我們必須對流程進行管理,也就是同步,通過在同步塊中並結合 wait 和 notify 方法,我們可以手動對線程的執行順序進行調整。

3. 使用 notify wait 實現一個簡單的生產者消費者模型

雖然很多書中都不建議我們直接使用 notify 和 wait 方法進行並發編程,但仍然需要我們重點掌握。樓主寫了一個簡單的生產者消費者例子:

簡單的緩存類:


public class Queue {

  final int num;
  final List<String> list;
  boolean isFull = false;
  boolean isEmpty = true;


  public Queue(int num) {
    this.num = num;
    this.list = new ArrayList<>();
  }


  public synchronized void put(String value) {
    try {
      if (isFull) {
        System.out.println("putThread 暫停了,讓出了鎖");
        this.wait();
        System.out.println("putThread 被喚醒了,拿到了鎖");
      }

      list.add(value);
      System.out.println("putThread 放入了" + value);
      if (list.size() >= num) {
        isFull = true;
      }
      if (isEmpty) {
        isEmpty = false;
        System.out.println("putThread 通知 getThread");
        this.notify();
      }
    } catch (InterruptedException e) {
      e.printStackTrace();
    }
  }

  public synchronized String get(int index) {
    try {
      if (isEmpty) {
        System.err.println("getThread 暫停了,並讓出了鎖");
        this.wait();
        System.err.println("getThread 被喚醒了,拿到了鎖");
      }

      String value = list.get(index);
      System.err.println("getThread 獲取到了" + value);
      list.remove(index);

      Random random = new Random();
      int randomInt = random.nextInt(5);
      if (randomInt == 1) {
        System.err.println("隨機數等於1, 清空集合");
        list.clear();
      }

      if (getSize() < num) {
        if (getSize() == 0) {
          isEmpty = true;
        }
        if (isFull) {
          isFull = false;
          System.err.println("getThread 通知 putThread 可以添加了");
          Thread.sleep(10);
          this.notify();
        }
      }
      return value;


    } catch (InterruptedException e) {
      e.printStackTrace();
    }
    return null;
  }


  public int getSize() {
    return list.size();
  }


生產者線程:

class PutThread implements Runnable {

  Queue queue;

  public PutThread(Queue queue) {
    this.queue = queue;
  }

  @Override
  public void run() {
    int i = 0;
    for (; ; ) {
      i++;
      queue.put(i + "號");

    }
  }
}

消費者線程:

class GetThread implements Runnable {

  Queue queue;

  public GetThread(Queue queue) {
    this.queue = queue;
  }

  @Override
  public void run() {
    for (; ; ) {
      for (int i = 0; i < queue.getSize(); i++) {
        try {
          Thread.sleep(1000);
        } catch (InterruptedException e) {
          e.printStackTrace();
        }
        String value = queue.get(i);

      }
    }
  }
}

大家有興趣可以跑跑看,能夠加深這兩個方法的理解,實際上,JDK 內部的阻塞隊列也是類似這種實現,但是,不是用的 synchronized ,而是使用的重入鎖。

基本上經典的生產者消費者模式的有着如下規則:

等待方遵循如下規則:

  1. 獲取對象的鎖。
  2. 如果條件不滿足,那么調用對象的 wait 方法,被通知后仍要檢查條件。
  3. 條件滿足則執行相應的邏輯。

對應的偽代碼入下:

synchroize( 對象 ){
    while(條件不滿足){
      對象.wait();
    }
    對應的處理邏輯......
}

通知方遵循如下規則:

  1. 獲得對象的鎖。
  2. 改變條件。
  3. 通知所有等待在對象上的線程。

對應的偽代碼如下:

synchronized(對象){
  改變條件
  對象.notifyAll();
}

4. 底層實現原理

知道了如何使用,就得知道他的原理到底是什么?

首先我們看,使用這兩個方法的順序一般是什么?

  1. 使用 wait ,notify 和 notifyAll 時需要先對調用對象加鎖。
  2. 調用 wait 方法后,線程狀態有 Running 變為 Waiting,並將當前線程放置到對象的 等待隊列
  3. notify 或者 notifyAll 方法調用后, 等待線程依舊不會從 wait 返回,需要調用 noitfy 的線程釋放鎖之后,等待線程才有機會從 wait 返回。
  4. notify 方法將等待隊列的一個等待線程從等待隊列種移到同步隊列中,而 notifyAll 方法則是將等待隊列種所有的線程全部移到同步隊列,被移動的線程狀態由 Waiting 變為 Blocked。
  5. 從 wait 方法返回的前提是獲得了調用對象的鎖。

從上述細節可以看到,等待/通知機制依托於同步機制,其目的就是確保等待線程從 wait 方法返回后能夠感知到通知線程對變量做出的修改。

該圖描述了上面的步驟:

WaitThread 獲得了對象的鎖,調用對象的 wait 方法,放棄了鎖,進入的等待隊列,然后 NotifyThread 拿到了對象的鎖,然后調用對象的 notify 方法,將 WatiThread 移動到同步隊列中,最后,NotifyThread 執行完畢,釋放鎖, WaitThread 再次獲得鎖並從 wait 方法返回繼續執行。

到這里,關於應用層面的 wait 和 notify 基本就差不多了,后面的是關於虛擬機層面的拋磚引玉,涉及到 Java 的內置鎖實現,synchronized 關鍵字底層實現,JVM 源碼。算是本文的擴展吧。

注意:我們看到圖中出現了 Monitor 這個詞,也就是監視器,實際上,在 JDK 的注釋中,也有 The current thread must own this object's monitor 這句話,當前線程必須擁有該對象的監視器。

如果我們編譯這段含有 synchronized 關鍵字的代碼,就會發現有一段代碼被 monitorenter 指令和 monitorexit 指令括住了,這就是 synchronized 在編譯期間做的事情,那么,在字節碼被執行的時侯,該指令對應的 c 代碼將會被執行。這里,我們必須打住,這里已經開始涉及到 synchronized 的相關原理了,本篇文章不會討論這個。

wait noitfy 的答案都在 Java HotSpot 虛擬機的 C 代碼中。但 R 大告訴我們不要輕易閱讀虛擬機源碼,眾多細節可能會掩蓋抽象,導致學習效率不高。如果同學們有興趣,有大神寫了3篇文章專門從 HotSpot 中解析源碼,地址:

Java的wait()、notify()學習三部曲之一:JVM源碼分析
Java的wait()、notify()學習三部曲之二:修改JVM源碼看參數
Java的wait()、notify()學習三部曲之三:修改JVM源碼控制搶鎖順序
還有狼哥的 JVM源碼分析之Object.wait/notify實現.

上面四篇文章都從 JVM 的源碼層面解析了 wait ,notify 的實現原理,非常清楚。

拾遺

  1. wait(long) 方法,該方法參數是毫秒,也就是說,如果線程等待了指定的毫秒數,就會自動返回該線程。
  2. wait(long, int)方法,該方法增加了納秒級別的設置,算法是,前面的毫秒加上后面的納秒,注意,是直接加一毫秒。
  3. notify 方法調用后,如果等待的線程很多,JDK 源碼中說將會隨機找一個,但是 JVM 的源碼中實際上是找第一個。
  4. notifyAll 和 notify 不會立即生效,必須等到調用方執行完同步代碼塊,放棄鎖之后才起作用。

總結

好了,關於 wait noitfy 的使用和基本原理就介紹到這里,不知道大家發現沒有,並發和虛擬機高度相關。因此,可以說,學習並發的過程就是學習虛擬機的過程。而閱讀虛擬機里的 openjdk 代碼讓人頭大,但不管怎么樣,丑媳婦遲早見公婆,openjdk 代碼是一定要看的,加油!!!!


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM