生產者和消費者模型
1. 什么是生產者和消費者模型
生產者消費者模型具體來講,就是在一個系統中,存在生產者和消費者兩種角色,他們通過內存緩沖區進行通信,生產者生產消費者需要的資料,消費者把資料做成產品。
再具體一點:
- 生產者生產數據到緩沖區中,消費者從緩沖區中取數據。
- 如果緩沖區已經滿了,則生產者線程阻塞。
- 如果緩沖區為空,那么消費者線程阻塞。
2. 如何實現
實現生產者消費者模型有兩種方式:
- 采用 wait—notify 方式實現生產者消費者模型(注意這里需要加同步鎖 synchronized)
- 采用 阻塞隊列 方式實現生產者消費者模式
3. wait-notify 方式
實現過程並不復雜,直接上代碼:
這里設置了生產者生產速度大於消費者消費速度(通過 sleep()
方法實現)。
緩沖區 BufferArea.java
:
public class BufferArea {
// 當前資源數量的計數值
private int currNum = 0;
// 資源池中允許存放的資源數目
private int maxSize = 10;
/**
* 從資源池中取走資源
*/
public synchronized void get() {
if (currNum > 0) {
currNum--;
System.out.println("Cosumer_" + Thread.currentThread().getName() + "消耗一件資源," + "當前緩沖區有" + currNum + "個");
// 通知生產者生產資源
notifyAll();
} else {
try {
// 如果沒有資源,則 Cosumer_ 進入等待狀態
System.out.println("Cosumer_" + Thread.currentThread().getName() + ": 當前緩沖區資源不足,進入等待狀態");
wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
/**
* 向緩沖區中添加資源
*/
public synchronized void put() {
// 若當前緩沖區內的資源計數小於最大 size 數,才加
if (currNum < maxSize) {
currNum++;
System.out.println(Thread.currentThread().getName() + "生產一件資源,當前資源池有" + currNum + "個");
// 通知等待的消費者
notifyAll();
} else {
// 若當前緩沖區的資源計數大於最大 size 數,則等待
try {
System.out.println(Thread.currentThread().getName() + "線程進入等待 << 當前緩沖區的資源計數大於最大 size 數");
// 生產者進入等待狀態,並釋放鎖
wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
生產者 Producer.java
:
public class Producer extends Thread {
private BlockQueueBufferArea mBufferArea;
public Producer(BlockQueueBufferArea bufferArea) {
this.mBufferArea = bufferArea;
setName("Producer_" + getName());
}
@Override
public void run() {
// 不斷的生產資源
while (true) {
sleepSomeTime();
mBufferArea.put();
}
}
private void sleepSomeTime() {
try {
Thread.sleep(200);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
消費者 Consumer
:
public class Consumer extends Thread {
private BlockQueueBufferArea mBufferArea;
public Consumer(BlockQueueBufferArea bufferArea) {
this.mBufferArea = bufferArea;
setName("Consumer_" + getName());
}
@Override
public void run() {
// 不斷的取出資源
while (true) {
sleepSomeTime();
mBufferArea.get();
}
}
private void sleepSomeTime() {
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
測試 Test.java
:
public class Test {
public static void main(String[] args) {
BlockQueueBufferArea bufferArea = new BlockQueueBufferArea();
Consumer consumer1 = new Consumer(bufferArea);
Consumer consumer2 = new Consumer(bufferArea);
Consumer consumer3 = new Consumer(bufferArea);
Producer producer1 = new Producer(bufferArea);
Producer producer2 = new Producer(bufferArea);
Producer producer3 = new Producer(bufferArea);
consumer1.start();
consumer2.start();
consumer3.start();
producer1.start();
producer2.start();
producer3.start();
}
}
打印結果如下:
ProducerThread-5生產一件資源,當前資源池有1個
ProducerThread-4生產一件資源,當前資源池有2個
ProducerThread-3生產一件資源,當前資源池有3個
ProducerThread-5生產一件資源,當前資源池有4個
ProducerThread-4生產一件資源,當前資源池有5個
ProducerThread-3生產一件資源,當前資源池有6個
ProducerThread-5生產一件資源,當前資源池有7個
ProducerThread-4生產一件資源,當前資源池有8個
ProducerThread-3生產一件資源,當前資源池有9個
ProducerThread-3生產一件資源,當前資源池有10個
ProducerThread-4線程進入等待 << 當前緩沖區的資源計數大於最大 size 數
ProducerThread-5線程進入等待 << 當前緩沖區的資源計數大於最大 size 數
ProducerThread-3線程進入等待 << 當前緩沖區的資源計數大於最大 size 數
>> 注釋:3個生產者線程生產滿了10個(maxSize)產品,然后就都進入了等待
Cosumer_Consumer_Thread-0消耗一件資源,當前緩沖區有9個
Cosumer_Consumer_Thread-1消耗一件資源,當前緩沖區有8個
Cosumer_Consumer_Thread-2消耗一件資源,當前緩沖區有7個
>> 注釋:3個消費者消費了3個產品
ProducerThread-3生產一件資源,當前資源池有8個
ProducerThread-5生產一件資源,當前資源池有9個
ProducerThread-4生產一件資源,當前資源池有10個
>> 注釋:生產者立馬又生產3個
...
>> 然后一直循環往復這個過程
4. 阻塞隊列方式
阻塞隊列的特點:
- 當隊列元素已滿的時候,阻塞插入操作
- 當隊列元素為空的時候,阻塞獲取操作
不同的阻塞隊列:
ArrayBlockingQueue 與 LinkedBlockingQueue 都是支持 FIFO (先進先出),但是 LinkedBlockingQueue 是無界的,而ArrayBlockingQueue 是有界的。
這里我們采用無界阻塞隊列來演示生產者消費者模式。
演示
還是設置生產者生產速度大於消費者消費速度(通過 sleep()
方法實現)
緩沖區 BlockQueueBufferArea.java
:
public class BlockQueueBufferArea {
BlockingQueue<Integer> mProductPoll = new LinkedBlockingQueue(10);
public void put() {
try {
System.out.println(Thread.currentThread().getName() + "產品池被放入了一個資源");
mProductPoll.put(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
public void get() {
try {
System.out.println(Thread.currentThread().getName() + "產品池被取走了一個資源");
mProductPoll.take();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
生產者 Producer.java
:
public class Producer extends Thread {
private BlockQueueBufferArea mBufferArea;
public Producer(BlockQueueBufferArea bufferArea) {
this.mBufferArea = bufferArea;
setName("Producer_" + getName());
}
@Override
public void run() {
// 不斷的生產資源
while (true) {
sleepSomeTime();
mBufferArea.put();
}
}
private void sleepSomeTime() {
try {
Thread.sleep(200);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
消費者 Consumer.java
:
public class Consumer extends Thread {
private BlockQueueBufferArea mBufferArea;
public Consumer(BlockQueueBufferArea bufferArea) {
this.mBufferArea = bufferArea;
setName("Consumer_" + getName());
}
@Override
public void run() {
// 不斷的取出資源
while (true) {
sleepSomeTime();
mBufferArea.get();
}
}
private void sleepSomeTime() {
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
測試 Test.java
:
public class Test {
public static void main(String[] args) {
BlockQueueBufferArea bufferArea = new BlockQueueBufferArea();
Consumer consumer1 = new Consumer(bufferArea);
Consumer consumer2 = new Consumer(bufferArea);
Consumer consumer3 = new Consumer(bufferArea);
Producer producer1 = new Producer(bufferArea);
Producer producer2 = new Producer(bufferArea);
Producer producer3 = new Producer(bufferArea);
consumer1.start();
consumer2.start();
consumer3.start();
producer1.start();
producer2.start();
producer3.start();
}
}
打印結果如下:
Producer_Thread-5產品池被放入了一個資源
Producer_Thread-4產品池被放入了一個資源
Producer_Thread-3產品池被放入了一個資源
Producer_Thread-3產品池被放入了一個資源
Producer_Thread-4產品池被放入了一個資源
Producer_Thread-5產品池被放入了一個資源
Producer_Thread-3產品池被放入了一個資源
Producer_Thread-4產品池被放入了一個資源
Producer_Thread-5產品池被放入了一個資源
Producer_Thread-3產品池被放入了一個資源
Producer_Thread-4產品池被放入了一個資源
Producer_Thread-5產品池被放入了一個資源
Producer_Thread-3產品池被放入了一個資源
Consumer_Thread-0產品池被取走了一個資源
Consumer_Thread-1產品池被取走了一個資源
Consumer_Thread-2產品池被取走了一個資源
Producer_Thread-4產品池被放入了一個資源
Producer_Thread-5產品池被放入了一個資源
Producer_Thread-3產品池被放入了一個資源