問題描述
在IT技術面試過程中,我們經常會遇到生產者消費者問題(Producer-consumer problem), 這是多線程並發協作問題的經典案例。場景中包含三個對象,生產者(Producer),消費者(Consumer)以及一個固定大小的緩沖區(Buffer)。生產者的主要作用是不斷生成數據放到緩沖區,消費者則從緩沖區不斷消耗數據。該問題的關鍵是如何線程安全的操作共享數據塊,保證生產者線程和消費者線程可以正確的更新數據塊,主要考慮 1. 生產者不會在緩沖區滿時加入數據. 2. 消費者應當停止在緩沖區時消耗數據. 3. 在同一時間應當只允許一個生產者或者消費者訪問共享緩沖區(這一點是對於互斥操作訪問共享區塊的要求)。
解決方案
解決問題以上問題通常有信號量,wait & notify, 管道或者阻塞隊列等幾種思路。本文以Java語言為例一一進行舉例講解。
信號量
信號量(Semaphore)也稱信號燈,是用來控制資源被同時訪問的個數,比如控制訪問數據庫最大連接數的數量,線程通過acquire()獲得連接許可,完成數據操作后,通過release()釋放許可。對於生產者消費者問題來說,為了滿足線程安全操作的要求,同一時間我們只允許一個線程訪問共享數據區,因此需要一個大小為1的信號量mutex來控制互斥操作。注意到我們還定義了notFull 和 notEmpty 信號量,notFull用於標識當前可用區塊的空間大小,當notFull size 大於0時表明"not full", producer 可以繼續生產,等於0時表示空間已滿,無法繼續生產;同樣,對於notEmpty信號量來說,大於0時表明 "not empty", consumer可以繼續消耗,等於0 時表明沒有產品,無法繼續消耗。notFull初始size 為5 (5個available空間可供生產),notEmpty初始為0(沒有產品可供消耗)。
/***
數據倉儲class,所有的producer和consumer共享這個class對象
**/
static class DataWareHouse {
//共享數據區
private final Queue<String> data = new LinkedList();
//非滿鎖
private final Semaphore notFull;
//非空鎖
private final Semaphore notEmpty;
//互斥鎖
private final Semaphore mutex;
public DataWareHouse(int capacity) {
this.notFull = new Semaphore(capacity);
this.notEmpty = new Semaphore(0);
mutex = new Semaphore(1);
}
public void offer(String x) throws InterruptedException {
notFull.acquire(); //producer獲取信號,notFull信號量減一
mutex.acquire(); //當前進程獲得信號,mutex信號量減1,其他線程被阻塞操作共享區塊data
data.add(x);
mutex.release(); //mutex信號量+1, 其他線程可以繼續信號操作共享區塊data
notEmpty.release(); //成功生產數據,notEmpty信號量加1
}
public String poll() throws InterruptedException {
notEmpty.acquire(); //notEmpty信號減一
mutex.acquire();
String result = data.poll();
mutex.release();
notFull.release(); //成功消耗數據, notFull信號量加1
return result;
}
}
/**Producer線程**/
static class Producer implements Runnable {
private final DataWareHouse dataWareHouse;
public Producer(final DataWareHouse dataWareHouse) {
this.dataWareHouse = dataWareHouse;
}
@Override
public void run() {
while (true) {
try {
Thread.sleep(100); //生產的速度慢於消耗的速率
String s = UUID.randomUUID().toString();
System.out.println("put data " + s);
dataWareHouse.offer(s);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
/**Consumer線程**/
static class Consumer implements Runnable {
private final DataWareHouse dataWareHouse;
public Consumer(final DataWareHouse dataWareHouse) {
this.dataWareHouse = dataWareHouse;
}
@Override
public void run() {
while (true) {
while (true) {
try {
System.out.println("get data " + dataWareHouse.poll());
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
}
//測試代碼
public static void main(String[] args) {
final DataWareHouse dataWareHouse = new DataWareHouse(5);
//三個producer 持續生產
for (int i = 0; i < 3; i++) {
Thread t = new Thread(new Producer(dataWareHouse));
t.start();
}
//三個consumer 持續消耗
for (int i = 0; i < 3; i++) {
Thread t = new Thread(new Consumer(dataWareHouse));
t.start();
}
}
Wait 和 Notify 機制
Java Object對象類中包含三個final methods來允許線程之間進行通信,告知資源的狀態。它們分別是wait(), notify(), 和notifyAll()。
wait(): 顧名思義告訴當前線程釋放鎖,陷入休眠狀態(waiting狀態),等待資源。wait 方法本身是一個native method,它在Java中的使用語法如下所示:
synchronized(lockObject )
{
while( ! condition )
{
lockObject.wait();
}
//take the action here;
}
notify(): 用於喚醒waiting狀態的線程, 同時釋放鎖,被喚醒的線程可以重新獲得鎖訪問資源。它的基本語法 如下
synchronized(lockObject)
{
//establish_the_condition;
lockObject.notify();
//any additional code if needed
}
notifyAll(): 不同於notify(),它用於喚醒所有處於waiting狀態的線程。語法如下:
synchronized(lockObject)
{
establish_the_condition;
lockObject.notifyAll();
}
說完了這三個方法,來看下如何使用wait & notify(All) 來解決我們的問題。新的DataWareHouse 類如下所示:
//producer類和consumer共享對象
static class DataWareHouse {
//共享數據區
private final Queue<String> data = new LinkedList();
private int capacity;
private int size = 0;
public DataWareHouse(int capacity) {
this.capacity = capacity;
}
public synchronized void offer(String x) throws InterruptedException {
while (size == capacity) { //當buffer滿時,producer進入waiting 狀態
this.wait(); //使用this對象來加鎖
}
data.add(x);
size++;
notifyAll(); //當buffer 有數據時,喚醒所有等待的consumer線程
}
public synchronized String poll() throws InterruptedException {
while (size == 0) {//當buffer為空時,consumer 進入等待狀態
this.wait();
}
String result = data.poll();
size--;
notifyAll(); //當數據被消耗,空間被釋放,通知所有等待的producer。
return result;
}
}
Note: 在方法上使用synchronized 等價於在方法體內使用synchronized(this),兩者都是使用this對象作為鎖。
生產者和消費者類,以及測試代碼和 信號量 section 相同,不做重復列舉了。
管道
管道Pipe是實現進程或者線程(線程之間通常通過共享內存實現通訊,而進程則通過scoket,管道,消息隊列等技術)之間通信常用方式,它連接輸入流和輸出流,基於生產者- 消費者模式構建的一種技術。具體實現可以通過創建一個管道輸入流對象和管道輸出流對象,然后將輸入流和輸出流就行鏈接,生產者通過往管道中寫入數據,而消費者在管道數據流中讀取數據,通過這種方式就實現了線程之間的互相通訊。
具體實現代碼如下所示
public class PipeSolution {
static class DataWareHouse implements Closeable {
private final PipedInputStream pis;
private final PipedOutputStream pos;
public DataWareHouse() throws IOException {
pis = new PipedInputStream();
pos = new PipedOutputStream();
pis.connect(pos); //連接管道
}
//向管道中寫入數據
public void offer(int val) throws IOException {
pos.write(val);
pos.flush();
}
//從管道中取數據.
public int poll() throws IOException {
//當管道中沒有數據,方法阻塞
return pis.read();
}
//關閉管道
@Override
public void close() throws IOException {
if (pis != null) {
pis.close();
}
if (pos != null) {
pos.close();
}
}
}
//consumer類
static class Consumer implements Runnable {
private final DataWareHouse dataWareHouse;
Consumer(DataWareHouse dataWareHouse) {
this.dataWareHouse = dataWareHouse;
}
@Override
public void run() {
try {
//消費者不斷從管道中讀取數據
while (true) {
int num = dataWareHouse.poll();
System.out.println("get data +" + num);
}
} catch (IOException e) {
throw new RuntimeException(e);
}
}
}
static class Producer implements Runnable {
private final DataWareHouse dataWareHouse;
private final Random random = new Random();
Producer(DataWareHouse dataWareHouse) {
this.dataWareHouse = dataWareHouse;
}
@Override
public void run() {
try {
//生產者不斷向管道中寫入數據
while (true) {
int num = random.nextInt(256);
dataWareHouse.offer(num);
System.out.println("put data +" + num);
Thread.sleep(1000);
}
} catch (Exception e) {
throw new RuntimeException(e);
}
}
public static void main(String[] args) throws IOException {
DataWareHouse dataWareHouse = new DataWareHouse();
new Thread(new Producer(dataWareHouse)).start();
new Thread(new Consumer(dataWareHouse)).start();
}
}
阻塞隊列
阻塞隊列(BlockingQueue),具有1. 當隊列滿了的時候阻塞入隊列操作 2. 當隊列空了的時候阻塞出隊列操作 3. 線程安全 的特性,因而阻塞隊列通常被視為實現生產消費者模式最便捷的工具,其中DataWareHouse類實現代碼如下:
static class DataWareHouse {
//共享數據區
private final BlockingQueue<String> blockingQueue;
public DataWareHouse(int capacity) {
this.blockingQueue = new ArrayBlockingQueue<>(capacity);
}
public void offer(String x) {
blockingQueue.put(x);
}
public String poll() {
return blockingQueue.take();
}
}
生產者和消費者類,以及測試代碼和 信號量 section 相同,在此不做重復列舉了。
總結
生產者消費者問題是面試中經常會遇到的題目,本文總結了幾種常見的實現方式,面試過程中通常不必要向面試官描述過多實現細節,說出每種實現方式的特點即可。希望能給大家帶來幫助。