java多線程:線程間通信——生產者消費者模型



一、背景 && 定義


多線程環境下,只要有並發問題,就要保證數據的安全性,一般指的是通過 synchronized 來進行同步。

另一個問題是,多個線程之間如何協作呢

我們看一個倉庫出貨問題(更具體一些,快餐店直接放好炸貨的架子,不過每次只放一份)

  1. 假設倉庫中只能存放一件商品,生產者將生產出來的產品放入倉庫,消費者將倉庫中產品取走進行消費;
  2. 如果倉庫中沒有商品,那么生產者將產品放入倉庫,否則停止生產並等待,直到倉庫中的產品被消費者取走為止;
  3. 如果倉庫中放有產品,消費者可快速取走並消費,否則停止消費並等待,直到倉庫中再次放入產品為止。

這其實就是一個線程同步問題。生產者和消費者共享同一個資源,並且生產者和消費者之間互相依賴,互為條件。

如果一個快餐店:
先點單,餐出來之后再收錢。這種模式叫BIO-阻塞IO模式。
如果一個快餐店:
先收錢,收完錢消費者在旁邊等。這種就是生產者-消費者模式。

這類問題里,同步的候只有 synchronized 是不夠的,因為他雖然能解決資源的共享問題,實現資源的同步更新,但是無法在不同線程之間進行消息傳遞(通信)。

所以只有我們之前所說的加鎖排隊是不夠的,還要有通知

定義:

生產者和消費者在同一時間段內共用同一個存儲空間,生產者往存儲空間中添加產品,消費者從存儲空間中取走產品,當存儲空間為空時,消費者阻塞,當存儲空間滿時,生產者阻塞。

為了解決雙方能力不等而等待的問題,引入對應的解決方案。生產者消費者模型是一種並發協作模型。


二、解決方式介紹


2.1 管程法

  1. 生產者:負責生產數據的模塊(模塊可能是方法、對象、線程、進程);
  2. 消費者:負責處理數據的模塊(模塊可能是方法、對象、線程、進程);
  3. 緩沖區:消費者不能直接使用生產者的數據,它們之間有個“緩沖區”(緩沖區一般是隊列)。

生產者和消費者都是通過緩沖區進行數據的 放 和 拿 。

這樣的話,一來可以避免旱的旱死,澇的澇死的問題:不管哪一方過快或者過慢,緩沖區始終有一部分數據;二來能夠達到生產者和消費者的解耦,不再直接通信,從而提高效率。

因為容器相當於一個輸送商品的管道,所以成為管程法

2.2 信號燈法

采用類似紅燈綠燈的模式,決定車走還是人走。

  • 管程法使用容器的狀態來控制,數據在容器中;
  • 而信號燈法只是用信號來給生產者和消費者提醒,他們的交互數據並不由信號燈來保管。

2.3 Object類

jdk 里面 Object 類老早就有提供解決線程間通信的問題的方法:

  1. wait():表示線程一直等待,直到其他線程通知(也就是調用了notify或者notifyAll方法),與sleep不同,會釋放鎖;
  2. wait(long timeout):指定時間;
  3. notify():喚醒一個處於等待狀態的線程;
  4. notifyAll():喚醒同一個對象上所有調用 wait() 方法的線程,優先級別高的線程優先調度。

這幾個方法都是在同步方法或者同步代碼塊中使用,否則會拋出異常。

(很多面試題問 Java 的 Object 類有哪些方法,都是希望得到關於這塊的答案,引到多線程)


三、管程法實現


管程法實現的四個角色:

  1. 生產者和消費者都是多線程;
  2. 中間的緩沖區應該是一個容器,並且需要的是一個並發容器,java.util.concurrent包里面已經提供了;
  3. 資源,也就是各個角色來回交換的商品。

利用 Object 類的幾個方法,來實現管程法,以下是代碼示例:

/**
* 協作模型:生產者消費者模型實現:管程法
*/
public class Cooperation1 {
    public static void main(String[] args) {
        Container container = new Container();
        new Producer(container).start();
        new Consumer(container).start();
    }
}

/**
* 生產者
*/
class Producer extends Thread{
    Container container;
    public Producer(Container container){
        this.container = container;
    }
    @Override
    public void run() {
        //生產過程
        for (int i=0; i<10; i++){
            System.out.println("生產第 " + i + " 個饅頭");
            container.push(new Hamburger(i));
        }
    }
}

/**
* 消費者
*/
class Consumer extends Thread{
    Container container;
    public Consumer(Container container){
        this.container = container;
    }
    @Override
    public void run() {
        //消費過程
        for (int i=0; i<10; i++){
            System.out.println("消費第 " + container.pop().id + " 個饅頭");
        }
    }
}

/**
* 緩沖區,操作商品,並和生產者、消費者交互
*/
class Container{
    Hamburger[] food = new Hamburger[10];
    private int count = 0;
    //存儲:生產
    public synchronized void push(Hamburger hamburger){
        if (count == food.length){
            try {
                this.wait();//阻塞,但是等待消費者通知后會解除
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        food[count++] = hamburger;
        this.notifyAll();//說明存在數據了,通知消費者消費
    }
    //獲取:消費
    public synchronized Hamburger pop(){
        if (count ==0 ){
            try {
                this.wait();//阻塞,直到生產者通知后會解除
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        Hamburger ans = food[--count];
        this.notifyAll();//存在空余空間了,通知生產者生產
        return ans;
    }
}

/**
* 商品
*/
class Hamburger{
    int id;
    public Hamburger(int id) {
        this.id = id;
    }
}

其中的核心有這么幾點:

  1. 容器相當於一個棧,是后進先出的;
  2. 容器的兩個方法對於資源的操作,一個和生產者交互,一個和消費者交互,除了 synchronized 修飾,因為兩個方法是互斥的,所以利用 wait 和 notify 方法使他們完成阻塞和解除阻塞;
  3. 生產者和容器交互,添加數據;
  4. 消費者和容器交互,刪除數據。

前面關於 線程的阻塞問題,生命周期里的阻塞,完整的可能情況,就包含這里的阻塞情況:


四、信號燈法實現


和上一種通過容器的容量讓線程之間互相通知的方法不同,信號燈法沒有用數據緩存的方式,而是用信號燈來指示雙方,對方是否已經准備好了要和你通信。

下面是一個 電視直播和觀眾的代碼示例,通過信號燈,通知演員和觀眾直播,確保演員在演的時候,讓觀眾來看。

/**
* 協作模型:生產者消費者實現:信號燈法
*/
public class Cooperation2 {
    public static void main(String[] args) {
        TV tv = new TV();
        new Actor(tv).start();
        new Fans(tv).start();
    }
}
/**
* 生產者:演員
*/
class Actor extends Thread{
    TV tv;
    public Actor(TV tv){
        this.tv = tv;
    }
    @Override
    public void run() {
        for (int i=0; i<10; i++){
            if (i%2 == 0){
                this.tv.play("節目 " + i);
            }else{
                this.tv.play("廣告 " + i);
            }
        }
    }
}
/**
* 消費者:觀眾
*/
class Fans extends Thread{
    TV tv;
    public Fans(TV tv){
        this.tv = tv;
    }
    @Override
    public void run() {
        for (int i=0; i<10; i++){
            tv.watch();
        }
    }
}

/**
* 共同資源:電視直播
*/
class TV{
    String voice;
    //信號燈,如果為真則演員准備,觀眾等待
    //如果為假,則觀眾就位,演員等待
    boolean flag = true;

    //表演方法:針對生產者
    public synchronized void play(String voice){
        //演員等待
        if (!flag){
            try {
                this.wait();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        this.voice = voice;
        System.out.println("表演 "+voice +" ing");
        //喚醒觀眾
        this.notifyAll();
        this.flag = !flag;
    }

    //觀看方法:針對消費者
    public synchronized void watch(){
        //觀眾等待
        if (flag){
            try {
                this.wait();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        System.out.println("觀看 " + voice +" ing");
        this.notifyAll();
        this.flag = !flag;
    }
}

可以看到,相比管程法的核心區別是:

TV 沒有用一個容器存儲數據,只是通過生產者是否生產,來決定信號燈的標志,以此通知消費者來消費。

顯然這兩種實現方法,有不同的適用場景,那就是決定於生產者消費者是否有數據溝通。


五、虛假喚醒問題


對於上面的代碼,一個生產者和一個消費者的模型,程序是沒有問題的。

但是如果說我們把生產者和消費者線程的個數都增加的話,就會出現問題,可能會出現多生產(一個消費者買到的時候得到的number>1)、或出現邏輯錯誤(一個消費者買的時候number<0),這個執行結果碰運氣, 可能不會出現,也可能出現很離譜的結果。

為什么呢?

因為 Object 類的 wait 和 notifyall 方法,可能會產生虛假喚醒的情況。所謂虛假喚醒,就是我們采用 if 進行的這種寫法:

  1. 如果只有兩個線程,那么一個 wait ,等待另一個喚醒它,就能夠完成生產者和消費者的協作;
  2. 但如果是再多個線程,當某一個線程執行完畢,調用 notifyAll:
    1. 本來我們希望的是只喚醒對立面的角色;
    2. 但是 notifyall 喚醒了所有的線程;
    3. 結合 if 的 bug, 被喚醒之后,if 已經判斷過了,因此會跳出去,導致好幾個生產者都執行++,或者好幾個消費者都執行--。
    4. 這就叫虛假喚醒,本質就是 if 值判斷一次,進入分支體之后就失效。

解決方案,其實就是官方文檔給出的使用 wait 和 notify 時候伴隨條件的推薦寫法,那就是使用 while 循環 而不是 if。

這樣無論多少個線程,能保證生產者和消費者問題的安全性。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM