生產者消費者問題是研究多線程程序時繞不開的經典問題之一。
問題描述如下。使用一個商品的緩存池用來存放商品。當池子滿時,生產者不能往池子里加入商品;當池子空時,消費者不能從池子中取得商品。
使用Object的方法 wait() notify()/notifyAll()實現
獲取鎖和釋放鎖都是針對Object而言的,而和線程無關。試想如果和線程相關,那么一個線程就無法使用多個鎖。
import java.util.concurrent.Executor; import java.util.concurrent.Executors; public class Pool { private int MAX; private int cnt = 0; public Pool(int MAX) { this.MAX = MAX; } public synchronized void produce() throws InterruptedException { while (cnt == MAX) { wait(); } System.out.println("Produce one.. Now:" + ++cnt); notify(); } public synchronized void consume() throws InterruptedException { while (cnt == 0) { wait(); } System.out.println("Consume one.. Now:" + --cnt); notifyAll(); } public static void main(String[] args) { Pool pool = new Pool(6); Executor executor = Executors.newCachedThreadPool(); for (int i = 0; i < 10; i++) { executor.execute(() -> { try { pool.produce(); } catch (InterruptedException e) { e.printStackTrace(); } }); } for (int i = 0; i < 10; i++) { executor.execute(() -> { try { pool.consume(); } catch (InterruptedException e) { e.printStackTrace(); } }); } } }
使用 wait() notify()/notifyAll()的缺點在於在生產者喚醒消費者或者消費者喚醒生產者時,由於生產者和消費者使用同一個鎖,所以生產者也會將生產者喚醒,消費者也會將消費者喚醒。(這一點也被字節跳動的面試官問到啦TAT)
舉例:假設現在池子滿了,然后有3個生產者被阻塞,現在一個消費者拿走一個item,調用notify,此時一個被阻塞的生產者被喚醒了。這個生產者向池子里放入一個產品,並執行notify意圖喚醒被阻塞的消費者,此時這個notify有可能喚醒另外2個被阻塞的生產者中的一個。
Condition可以指定多個等待的條件,因此使用Condition可以解決這一問題。
使用ReentrantLock和Condition的await() signal()/signalAll()實現
import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; public class Pool { private int capacity; private Object[] queue; private int count; private Lock lock = new ReentrantLock(); private Condition prod = lock.newCondition(); private Condition cons = lock.newCondition(); public Pool(int capacity) { this.capacity = capacity; queue = new Object[capacity]; count = 0; } public void produce(Object o) throws InterruptedException { lock.lock(); try { while (count == capacity) { prod.await(); } queue[count++] = o; System.out.println(Thread.currentThread().getName() + " produce " + o.toString() + ". current count: " + count); cons.signal(); } finally { lock.unlock(); } } public Object consume() throws InterruptedException { lock.lock(); try { while (count == 0) { cons.await(); } Object res = queue[--count]; queue[count] = null; System.out.println(Thread.currentThread().getName() + " consume " + res.toString() + ". current count: " + count); prod.signal(); return res; } finally { lock.unlock(); } } public static void main(String[] args) { Pool pool = new Pool(10); ExecutorService executorService = Executors.newCachedThreadPool(); int cnt = 20; while (cnt-- > 0) { executorService.execute(() -> { try { pool.produce(new Object()); } catch (InterruptedException e) { e.printStackTrace(); } }); executorService.execute(() -> { try { pool.consume(); } catch (InterruptedException e) { e.printStackTrace(); } }); } } }