操作系統中的經典問題——生產者消費者問題(兩種方式實現)


操作系統中的經典問題——生產者消費者問題(兩種方式實現)

1、問題引入:什么是生產者消費者問題?

生產者消費者問題(英語:Producer-consumer problem),也稱有限緩沖問題(英語:Bounded-buffer problem),是一個多線程同步問題的經典案例。該問題描述了共享固定大小緩沖區的兩個線程——即所謂的“生產者”和“消費者”——在實際運行時會發生的問題。生產者的主要作用是生成一定量的數據放到緩沖區中,然后重復此過程。與此同時,消費者也在緩沖區消耗這些數據。該問題的關鍵就是要保證生產者不會在緩沖區滿時加入數據,消費者也不會在緩沖區中空時消耗數據。
.

要解決該問題,就必須讓生產者在緩沖區滿時休眠(要么干脆就放棄數據),等到下次消費者消耗緩沖區中的數據的時候,生產者才能被喚醒,開始往緩沖區添加數據。同樣,也可以讓消費者在緩沖區空時進入休眠,等到生產者往緩沖區添加數據之后,再喚醒消費者。通常采用進程間通信的方法解決該問題。如果解決方法不夠完善,則容易出現死鎖的情況。出現死鎖時,兩個線程都會陷入休眠,等待對方喚醒自己。該問題也能被推廣到多個生產者和消費者的情形。

2、問題分析

該問題需要注意的幾點:

  1. 在緩沖區為空時,消費者不能再進行消費
  2. 在緩沖區為滿時,生產者不能再進行生產
  3. 在一個線程進行生產或消費時,其余線程不能再進行生產或消費等操作,即保持線程間的同步
  4. 注意條件變量與互斥鎖的順序

由於前兩點原因,因此需要保持線程間的同步,即一個線程消費(或生產)完,其他線程才能進行競爭CPU,獲得消費(或生產)的機會。對於這一點,可以使用條件變量進行線程間的同步:生產者線程在product之前,需要wait直至獲取自己所需的信號量之后,才會進行product的操作;同樣,對於消費者線程,在consume之前需要wait直到沒有線程在訪問共享區(緩沖區),再進行consume的操作,之后再解鎖並喚醒其他可用阻塞線程。

在訪問共享區資源時,為避免多個線程同時訪問資源造成混亂,需要對共享資源加鎖,從而保證某一時刻只有一個線程在訪問共享資源。

3、第一類:使用synchronized關鍵字來進行實現

3.1、生產者、消費者問題,歸根到底也都是線程間的通信問題。一個處於活動狀態,一個處於等待喚醒狀態。

3.2、這里設共享的資源為一個int數,調用方法進行加一、減一的操作模擬操作系統對共享資源的分配。

3.3、實現對操作的資源進行共享,主要有三步,判斷等待、業務、通知,因此可以將資源類的代碼實現如下:(代碼如果看不懂,詳情請看注釋,注釋詳盡)

/**資源類**/
class Data {
    private int number = 0;

    /**
     * 判斷等待、業務、通知
     */

    //+1
    public synchronized void increment() throws InterruptedException {
        if (number != 0) {
            // 等待
            this.wait();
        }
        number++;
        System.out.println(Thread.currentThread().getName() + "=>" + number);
        // 通知其他線程,我+1完畢了
        this.notifyAll();
    }
    //-1
    public synchronized void decrement() throws InterruptedException {
        if (number == 0) {
            //等待
            this.wait();
        }
        number--;
        System.out.println(Thread.currentThread().getName() + "=>" + number);
        // 通知其他線程,我-1完畢了
        this.notifyAll();
    }
}

3.4、編寫兩個線程,進行測試,看是否能有序的進行資源的共享

package com.xgp.pc;

/**
 * 線程之間的通信問題:生產者和消費者問題! 等待喚醒 通知
 * 線程交替問題 A  B 操作同一個變量  num = 0
 * A num+1
 * B num-1
 * @author 薛國鵬
 */
@SuppressWarnings("all")
public class A {
    public static void main(String[] args) {
        Data data = new Data();

        new Thread(() -> {for(int i = 0;i < 10;i++) {
            try {
                data.increment();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        },"A").start();
        new Thread(() -> {for(int i = 0;i < 10;i++) {
            try {
                data.decrement();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        },"B").start();
    }
}

運行結果:

A=>1
B=>0
A=>1
B=>0
A=>1
B=>0
A=>1
B=>0
A=>1
B=>0
A=>1
B=>0
A=>1
B=>0
A=>1
B=>0
A=>1
B=>0
A=>1
B=>0

進程完成,退出碼 0

3.5、有運行結果可以看出,當系統中只有兩個線程時,能夠有序的進行進行對臨界資源的共享。此時,將兩個線程改為4個線程(兩個增加操作,兩個減少操作)再進行測試

package com.xgp.pc;

/**
 * 線程之間的通信問題:生產者和消費者問題! 等待喚醒 通知
 * 線程交替問題 A  B 操作同一個變量  num = 0
 * A num+1
 * B num-1
 * @author 薛國鵬
 */
@SuppressWarnings("all")
public class A {
    public static void main(String[] args) {
        Data data = new Data();

        new Thread(() -> {for(int i = 0;i < 10;i++) {
            try {
                data.increment();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        },"A").start();
        new Thread(() -> {for(int i = 0;i < 10;i++) {
            try {
                data.decrement();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        },"B").start();
        new Thread(() -> {for(int i = 0;i < 10;i++) {
            try {
                data.increment();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        },"C").start();
        new Thread(() -> {for(int i = 0;i < 10;i++) {
            try {
                data.decrement();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        },"D").start();
    }
}

運行結果為:

A=>1
B=>0
A=>1
B=>0
C=>1
A=>2
D=>1
D=>0
A=>1
C=>2
B=>1
B=>0
C=>1
A=>2
D=>1
D=>0
A=>1
C=>2
B=>1
B=>0
C=>1
A=>2
D=>1
D=>0
A=>1
C=>2
B=>1
B=>0
C=>1
A=>2
D=>1
D=>0
A=>1
C=>2
B=>1
B=>0
C=>1
D=>0
C=>1
D=>0

進程完成,退出碼 0

由運行的結果可以看出,當有四個線程時,此時的資源分配共享出現了問題。

3.6、分析:

假如:當一個減法操作結束時,此時會通知喚醒其他三個線程我減一完畢了,此時共享資源的值為0,另一個減一線程判斷的number=0,自然會進行等待,改線程肯定不是問題所在。兩個加一線程,判斷的結果是number=1,於是都被喚醒了,當第一個加一線程完成加一操作后,第二個加一線程隨即跟隨其后完成加一操作。問題的關鍵就是,當第一個加一線程完成加一操作后,第二個線程因為之前的判斷被喚醒了,而且后面並沒有其他判斷使他沉睡了,因此在值發生改變后,仍然進行了加一操作。所以,我們得限制各個進行隨時隨刻得檢測共享資源得變化,在發生變化時立即判斷是否喚醒還是等待,因此代碼中不應該使用if判斷,而應該使用while進行循環判斷,雖然,這在一定程度上會占用一定得系統性能。改進后的代碼如下:

/**資源類**/
class Data {
    private int number = 0;

    /**
     * 判斷等待、業務、通知
     */

    //+1
    public synchronized void increment() throws InterruptedException {
        while (number != 0) {
            // 等待
            this.wait();
        }
        number++;
        System.out.println(Thread.currentThread().getName() + "=>" + number);
        // 通知其他線程,我+1完畢了
        this.notifyAll();
    }
    //-1
    public synchronized void decrement() throws InterruptedException {
        while (number == 0) {
            //等待
            this.wait();
        }
        number--;
        System.out.println(Thread.currentThread().getName() + "=>" + number);
        // 通知其他線程,我-1完畢了
        this.notifyAll();
    }
}

此時,開啟四個線程進行測試的結果正常

A=>1
B=>0
A=>1
B=>0
C=>1
B=>0
A=>1
D=>0
C=>1
B=>0
A=>1
D=>0
C=>1
B=>0
A=>1
D=>0
C=>1
B=>0
A=>1
D=>0
C=>1
B=>0
A=>1
D=>0
C=>1
B=>0
A=>1
D=>0
C=>1
B=>0
A=>1
D=>0
C=>1
B=>0
A=>1
D=>0
C=>1
D=>0
C=>1
D=>0

進程完成,退出碼 0

4、使用java的JUC來實現

package com.xgp.pc;

import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

@SuppressWarnings("all")
public class B {
    public static void main(String[] args) {
        Data2 data = new Data2();

        new Thread(() -> {for(int i = 0;i < 10;i++) {
            try {
                data.increment();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        },"A").start();
        new Thread(() -> {for(int i = 0;i < 10;i++) {
            try {
                data.decrement();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        },"B").start();
        new Thread(() -> {for(int i = 0;i < 10;i++) {
            try {
                data.increment();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        },"C").start();
        new Thread(() -> {for(int i = 0;i < 10;i++) {
            try {
                data.decrement();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        },"D").start();
    }
}

/**資源類**/
class Data2 {
    private int number = 0;

    /**
     * 判斷等待、業務、通知
     */
    Lock lock = new ReentrantLock();
    //鎖監視器(取代了對象監視器的使用)
    Condition condition = lock.newCondition();

    //+1
    public void increment() throws InterruptedException {
        lock.lock();
        try {
            while (number != 0) {
                // 等待
                condition.await();      //等待
            }
            number++;
            System.out.println(Thread.currentThread().getName() + "=>" + number);
            // 通知其他線程,我+1完畢了
            condition.signalAll();
        }catch (Exception e) {
            e.printStackTrace();
        }finally {
            lock.unlock();
        }
    }
    //-1
    public void decrement() throws InterruptedException {
        lock.lock();

        try {
            while (number == 0) {
                //等待
                condition.await();
            }
            number--;
            System.out.println(Thread.currentThread().getName() + "=>" + number);
            // 通知其他線程,我-1完畢了
            condition.signalAll();
        }catch (Exception e) {
            e.printStackTrace();
        }finally {
            lock.unlock();
        }

    }
}

運行結果:

A=>1
B=>0
A=>1
B=>0
A=>1
B=>0
A=>1
B=>0
A=>1
B=>0
A=>1
B=>0
A=>1
B=>0
A=>1
B=>0
A=>1
B=>0
A=>1
B=>0
C=>1
D=>0
C=>1
D=>0
C=>1
D=>0
C=>1
D=>0
C=>1
D=>0
C=>1
D=>0
C=>1
D=>0
C=>1
D=>0
C=>1
D=>0
C=>1
D=>0

進程完成,退出碼 0

通過該方式也同樣可以實現資源的共享,並且此方式還有許多優點,能過將控制權更大程度的交給代碼的編寫者。相當於汽車中的手動擋,雖然缺少自動,但是會的話開的更快。

5、使用java的JUC來實現點對點喚醒

package com.xgp.pc;

import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
@SuppressWarnings("all")
public class C {
    public static void main(String[] args) {

        Data3 data = new Data3();

        new Thread(() -> {for(int i = 0;i < 10;i++) {
            try {
                data.printA();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
        },"A").start();
        new Thread(() -> {for(int i = 0;i < 10;i++) {
            try {
                data.printB();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
        },"B").start();
        new Thread(() -> {for(int i = 0;i < 10;i++) {
            try {
                data.printC();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
        },"C").start();
    }
}


class Data3 {

    private int number = 1;

    private Lock lock = new ReentrantLock();
    //鎖監視器(取代了對象監視器的使用)
    private Condition condition1 = lock.newCondition();
    private Condition condition2 = lock.newCondition();
    private Condition condition3 = lock.newCondition();

    public void printA() {
        lock.lock();

        try {
            while (number != 1) {
                condition1.await();
            }
            System.out.println(Thread.currentThread().getName());
            //喚醒指定的B
            number = 2;
            condition2.signal();
        }catch (Exception e) {
            e.printStackTrace();
        }finally {
            lock.unlock();
        }
    }
    public void printB() {
        lock.lock();

        try {
            while (number != 2) {
                condition2.await();
            }
            System.out.println(Thread.currentThread().getName());
            //喚醒指定的B
            number = 3;
            condition3.signal();
        }catch (Exception e) {
            e.printStackTrace();
        }finally {
            lock.unlock();
        }
    }
    public void printC() {
        lock.lock();

        try {
            while (number != 3) {
                condition3.await();
            }
            System.out.println(Thread.currentThread().getName());
            //喚醒指定的B
            number = 1;
            condition1.signal();
        }catch (Exception e) {
            e.printStackTrace();
        }finally {
            lock.unlock();
        }
    }
}

運行結果:

A
B
C
A
B
C
A
B
C
A
B
C
A
B
C
A
B
C
A
B
C
A
B
C
A
B
C
A
B
C

進程完成,退出碼 0


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM