操作系統中的經典問題——生產者消費者問題(兩種方式實現)
1、問題引入:什么是生產者消費者問題?
生產者消費者問題(英語:Producer-consumer problem),也稱有限緩沖問題(英語:Bounded-buffer problem),是一個多線程同步問題的經典案例。該問題描述了共享固定大小緩沖區的兩個線程——即所謂的“生產者”和“消費者”——在實際運行時會發生的問題。生產者的主要作用是生成一定量的數據放到緩沖區中,然后重復此過程。與此同時,消費者也在緩沖區消耗這些數據。該問題的關鍵就是要保證生產者不會在緩沖區滿時加入數據,消費者也不會在緩沖區中空時消耗數據。
.
要解決該問題,就必須讓生產者在緩沖區滿時休眠(要么干脆就放棄數據),等到下次消費者消耗緩沖區中的數據的時候,生產者才能被喚醒,開始往緩沖區添加數據。同樣,也可以讓消費者在緩沖區空時進入休眠,等到生產者往緩沖區添加數據之后,再喚醒消費者。通常采用進程間通信的方法解決該問題。如果解決方法不夠完善,則容易出現死鎖的情況。出現死鎖時,兩個線程都會陷入休眠,等待對方喚醒自己。該問題也能被推廣到多個生產者和消費者的情形。
2、問題分析
該問題需要注意的幾點:
- 在緩沖區為空時,消費者不能再進行消費
- 在緩沖區為滿時,生產者不能再進行生產
- 在一個線程進行生產或消費時,其余線程不能再進行生產或消費等操作,即保持線程間的同步
- 注意條件變量與互斥鎖的順序
由於前兩點原因,因此需要保持線程間的同步,即一個線程消費(或生產)完,其他線程才能進行競爭CPU,獲得消費(或生產)的機會。對於這一點,可以使用條件變量進行線程間的同步:生產者線程在product之前,需要wait直至獲取自己所需的信號量之后,才會進行product的操作;同樣,對於消費者線程,在consume之前需要wait直到沒有線程在訪問共享區(緩沖區),再進行consume的操作,之后再解鎖並喚醒其他可用阻塞線程。
在訪問共享區資源時,為避免多個線程同時訪問資源造成混亂,需要對共享資源加鎖,從而保證某一時刻只有一個線程在訪問共享資源。
3、第一類:使用synchronized關鍵字來進行實現
3.1、生產者、消費者問題,歸根到底也都是線程間的通信問題。一個處於活動狀態,一個處於等待喚醒狀態。
3.2、這里設共享的資源為一個int數,調用方法進行加一、減一的操作模擬操作系統對共享資源的分配。
3.3、實現對操作的資源進行共享,主要有三步,判斷等待、業務、通知,因此可以將資源類的代碼實現如下:(代碼如果看不懂,詳情請看注釋,注釋詳盡)
/**資源類**/
class Data {
private int number = 0;
/**
* 判斷等待、業務、通知
*/
//+1
public synchronized void increment() throws InterruptedException {
if (number != 0) {
// 等待
this.wait();
}
number++;
System.out.println(Thread.currentThread().getName() + "=>" + number);
// 通知其他線程,我+1完畢了
this.notifyAll();
}
//-1
public synchronized void decrement() throws InterruptedException {
if (number == 0) {
//等待
this.wait();
}
number--;
System.out.println(Thread.currentThread().getName() + "=>" + number);
// 通知其他線程,我-1完畢了
this.notifyAll();
}
}
3.4、編寫兩個線程,進行測試,看是否能有序的進行資源的共享
package com.xgp.pc;
/**
* 線程之間的通信問題:生產者和消費者問題! 等待喚醒 通知
* 線程交替問題 A B 操作同一個變量 num = 0
* A num+1
* B num-1
* @author 薛國鵬
*/
@SuppressWarnings("all")
public class A {
public static void main(String[] args) {
Data data = new Data();
new Thread(() -> {for(int i = 0;i < 10;i++) {
try {
data.increment();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
},"A").start();
new Thread(() -> {for(int i = 0;i < 10;i++) {
try {
data.decrement();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
},"B").start();
}
}
運行結果:
A=>1
B=>0
A=>1
B=>0
A=>1
B=>0
A=>1
B=>0
A=>1
B=>0
A=>1
B=>0
A=>1
B=>0
A=>1
B=>0
A=>1
B=>0
A=>1
B=>0
進程完成,退出碼 0
3.5、有運行結果可以看出,當系統中只有兩個線程時,能夠有序的進行進行對臨界資源的共享。此時,將兩個線程改為4個線程(兩個增加操作,兩個減少操作)再進行測試
package com.xgp.pc;
/**
* 線程之間的通信問題:生產者和消費者問題! 等待喚醒 通知
* 線程交替問題 A B 操作同一個變量 num = 0
* A num+1
* B num-1
* @author 薛國鵬
*/
@SuppressWarnings("all")
public class A {
public static void main(String[] args) {
Data data = new Data();
new Thread(() -> {for(int i = 0;i < 10;i++) {
try {
data.increment();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
},"A").start();
new Thread(() -> {for(int i = 0;i < 10;i++) {
try {
data.decrement();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
},"B").start();
new Thread(() -> {for(int i = 0;i < 10;i++) {
try {
data.increment();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
},"C").start();
new Thread(() -> {for(int i = 0;i < 10;i++) {
try {
data.decrement();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
},"D").start();
}
}
運行結果為:
A=>1
B=>0
A=>1
B=>0
C=>1
A=>2
D=>1
D=>0
A=>1
C=>2
B=>1
B=>0
C=>1
A=>2
D=>1
D=>0
A=>1
C=>2
B=>1
B=>0
C=>1
A=>2
D=>1
D=>0
A=>1
C=>2
B=>1
B=>0
C=>1
A=>2
D=>1
D=>0
A=>1
C=>2
B=>1
B=>0
C=>1
D=>0
C=>1
D=>0
進程完成,退出碼 0
由運行的結果可以看出,當有四個線程時,此時的資源分配共享出現了問題。
3.6、分析:
假如:當一個減法操作結束時,此時會通知喚醒其他三個線程我減一完畢了,此時共享資源的值為0,另一個減一線程判斷的number=0,自然會進行等待,改線程肯定不是問題所在。兩個加一線程,判斷的結果是number=1,於是都被喚醒了,當第一個加一線程完成加一操作后,第二個加一線程隨即跟隨其后完成加一操作。問題的關鍵就是,當第一個加一線程完成加一操作后,第二個線程因為之前的判斷被喚醒了,而且后面並沒有其他判斷使他沉睡了,因此在值發生改變后,仍然進行了加一操作。所以,我們得限制各個進行隨時隨刻得檢測共享資源得變化,在發生變化時立即判斷是否喚醒還是等待,因此代碼中不應該使用if判斷,而應該使用while進行循環判斷,雖然,這在一定程度上會占用一定得系統性能。改進后的代碼如下:
/**資源類**/
class Data {
private int number = 0;
/**
* 判斷等待、業務、通知
*/
//+1
public synchronized void increment() throws InterruptedException {
while (number != 0) {
// 等待
this.wait();
}
number++;
System.out.println(Thread.currentThread().getName() + "=>" + number);
// 通知其他線程,我+1完畢了
this.notifyAll();
}
//-1
public synchronized void decrement() throws InterruptedException {
while (number == 0) {
//等待
this.wait();
}
number--;
System.out.println(Thread.currentThread().getName() + "=>" + number);
// 通知其他線程,我-1完畢了
this.notifyAll();
}
}
此時,開啟四個線程進行測試的結果正常
A=>1
B=>0
A=>1
B=>0
C=>1
B=>0
A=>1
D=>0
C=>1
B=>0
A=>1
D=>0
C=>1
B=>0
A=>1
D=>0
C=>1
B=>0
A=>1
D=>0
C=>1
B=>0
A=>1
D=>0
C=>1
B=>0
A=>1
D=>0
C=>1
B=>0
A=>1
D=>0
C=>1
B=>0
A=>1
D=>0
C=>1
D=>0
C=>1
D=>0
進程完成,退出碼 0
4、使用java的JUC來實現
package com.xgp.pc;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
@SuppressWarnings("all")
public class B {
public static void main(String[] args) {
Data2 data = new Data2();
new Thread(() -> {for(int i = 0;i < 10;i++) {
try {
data.increment();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
},"A").start();
new Thread(() -> {for(int i = 0;i < 10;i++) {
try {
data.decrement();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
},"B").start();
new Thread(() -> {for(int i = 0;i < 10;i++) {
try {
data.increment();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
},"C").start();
new Thread(() -> {for(int i = 0;i < 10;i++) {
try {
data.decrement();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
},"D").start();
}
}
/**資源類**/
class Data2 {
private int number = 0;
/**
* 判斷等待、業務、通知
*/
Lock lock = new ReentrantLock();
//鎖監視器(取代了對象監視器的使用)
Condition condition = lock.newCondition();
//+1
public void increment() throws InterruptedException {
lock.lock();
try {
while (number != 0) {
// 等待
condition.await(); //等待
}
number++;
System.out.println(Thread.currentThread().getName() + "=>" + number);
// 通知其他線程,我+1完畢了
condition.signalAll();
}catch (Exception e) {
e.printStackTrace();
}finally {
lock.unlock();
}
}
//-1
public void decrement() throws InterruptedException {
lock.lock();
try {
while (number == 0) {
//等待
condition.await();
}
number--;
System.out.println(Thread.currentThread().getName() + "=>" + number);
// 通知其他線程,我-1完畢了
condition.signalAll();
}catch (Exception e) {
e.printStackTrace();
}finally {
lock.unlock();
}
}
}
運行結果:
A=>1
B=>0
A=>1
B=>0
A=>1
B=>0
A=>1
B=>0
A=>1
B=>0
A=>1
B=>0
A=>1
B=>0
A=>1
B=>0
A=>1
B=>0
A=>1
B=>0
C=>1
D=>0
C=>1
D=>0
C=>1
D=>0
C=>1
D=>0
C=>1
D=>0
C=>1
D=>0
C=>1
D=>0
C=>1
D=>0
C=>1
D=>0
C=>1
D=>0
進程完成,退出碼 0
通過該方式也同樣可以實現資源的共享,並且此方式還有許多優點,能過將控制權更大程度的交給代碼的編寫者。相當於汽車中的手動擋,雖然缺少自動,但是會的話開的更快。
5、使用java的JUC來實現點對點喚醒
package com.xgp.pc;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
@SuppressWarnings("all")
public class C {
public static void main(String[] args) {
Data3 data = new Data3();
new Thread(() -> {for(int i = 0;i < 10;i++) {
try {
data.printA();
} catch (Exception e) {
e.printStackTrace();
}
}
},"A").start();
new Thread(() -> {for(int i = 0;i < 10;i++) {
try {
data.printB();
} catch (Exception e) {
e.printStackTrace();
}
}
},"B").start();
new Thread(() -> {for(int i = 0;i < 10;i++) {
try {
data.printC();
} catch (Exception e) {
e.printStackTrace();
}
}
},"C").start();
}
}
class Data3 {
private int number = 1;
private Lock lock = new ReentrantLock();
//鎖監視器(取代了對象監視器的使用)
private Condition condition1 = lock.newCondition();
private Condition condition2 = lock.newCondition();
private Condition condition3 = lock.newCondition();
public void printA() {
lock.lock();
try {
while (number != 1) {
condition1.await();
}
System.out.println(Thread.currentThread().getName());
//喚醒指定的B
number = 2;
condition2.signal();
}catch (Exception e) {
e.printStackTrace();
}finally {
lock.unlock();
}
}
public void printB() {
lock.lock();
try {
while (number != 2) {
condition2.await();
}
System.out.println(Thread.currentThread().getName());
//喚醒指定的B
number = 3;
condition3.signal();
}catch (Exception e) {
e.printStackTrace();
}finally {
lock.unlock();
}
}
public void printC() {
lock.lock();
try {
while (number != 3) {
condition3.await();
}
System.out.println(Thread.currentThread().getName());
//喚醒指定的B
number = 1;
condition1.signal();
}catch (Exception e) {
e.printStackTrace();
}finally {
lock.unlock();
}
}
}
運行結果:
A
B
C
A
B
C
A
B
C
A
B
C
A
B
C
A
B
C
A
B
C
A
B
C
A
B
C
A
B
C
進程完成,退出碼 0