生產者消費者問題是線程模型中的經典問題:生產者和消費者在同一時間段內共用同一存儲空間,生產者向空間里生產數據,而消費者取走數據。
1. 使用阻塞隊列實現生產者消費者模式
生產者:
1 public class Producer implements Runable{ 2 private final BlockingQueue sharedQueue; 3 public Producer(BlockingQueue sharedQueue){ 4 this.sharedQueue = sharedQueue; 5 } 6 7 publlic void run(){ 8 for(i=0;i<10;i++){ 9 try{ 10 System.out.println("Producer:"+i); 11 sharedQueue.put(i); 12 }catch(InterruptedException ex){ 13 System.out.println("exception:"+ex); 14 } 15 } 16 } 17 }
消費者:
public class Consumer implements Runable{ private final BlockingQueue sharedQueue; public Consumer(BlockingQueue sharedQueue){ this.sharedQueue = sharedQueue } public void run(){ while(true){ try{ System.out.println("Consumed:"+sharedQueue.take()); }catch(InterruptedException ex){ System.out.println("Exception:"+ex); } } } }
生產者消費者模式:
public class ProducerConsumerPattern { private static final Logger logger = public static void main(String[] args) { //阻塞隊列 BlockingQueue sharedQueue = new LinkedBlockingDeque(); //創建生產者和消費者,共享隊列 Thread prodThread = new Thread(new Producer(sharedQueue)); Thread consThread = new Thread(new Consumer(sharedQueue)); //開啟生產者和消費者進程 prodThread.start(); consThread.start(); } }
BlockingQueue是一個阻塞隊列,它的存取可以保證只有一個線程在進行,所以根據邏輯,生產者在內存滿的時候進行等待,並且喚醒消費者隊列,反過來消費者在飢餓狀態下等待並喚醒生產者進行生產。
2. wait/notify方法實現
/** * 生產者消費者模式:使用Object.wait() / notify()方法實現 */ public class ProducerConsumer { private static final int CAPACITY = 5; public static void main(String args[]){ Queue<Integer> queue = new LinkedList<Integer>(); Thread producer1 = new Producer("P-1", queue, CAPACITY); Thread producer2 = new Producer("P-2", queue, CAPACITY); Thread consumer1 = new Consumer("C1", queue, CAPACITY); Thread consumer2 = new Consumer("C2", queue, CAPACITY); Thread consumer3 = new Consumer("C3", queue, CAPACITY); producer1.start(); producer2.start(); consumer1.start(); consumer2.start(); consumer3.start(); } /** * 生產者 */ public static class Producer extends Thread{ private Queue<Integer> queue; String name; int maxSize; int i = 0; public Producer(String name, Queue<Integer> queue, int maxSize){ super(name); this.name = name; this.queue = queue; this.maxSize = maxSize; } @Override public void run(){ while(true){ synchronized(queue){ while(queue.size() == maxSize){ try { System.out .println("Queue is full, Producer[" + name + "] thread waiting for " + "consumer to take something from queue."); queue.wait(); } catch (Exception ex) { ex.printStackTrace(); } } System.out.println("[" + name + "] Producing value : +" + i); queue.offer(i++); queue.notifyAll(); try {
Thread.sleep(new Random().nextInt(1000));//模擬隨機生產 } catch (InterruptedException e) { e.printStackTrace(); } } } } } /** * 消費者 */ public static class Consumer extends Thread{ private Queue<Integer> queue; String name; int maxSize; public Consumer(String name, Queue<Integer> queue, int maxSize){ super(name); this.name = name; this.queue = queue; this.maxSize = maxSize; } @Override public void run(){ while(true){ synchronized(queue){ while(queue.isEmpty()){ try { System.out.println("Queue is empty, Consumer[" + name + "] thread is waiting for Producer"); queue.wait(); } catch (Exception ex) { ex.printStackTrace(); } } int x = queue.poll(); System.out.println("[" + name + "] Consuming value : " + x); queue.notifyAll(); try { Thread.sleep(new Random().nextInt(1000));//模擬隨機消費 } catch (InterruptedException e) { e.printStackTrace(); } } } } } }
wait()
:當緩沖區已滿/空時,生產者/消費者線程停止自己的執行,放棄鎖,使自己處於等待狀態,讓其他線程執行。notify()
:當生產者/消費者向緩沖區放入/取出一個產品時,向其他等待的線程發出可執行的通知,同時放棄鎖,使自己處於等待狀態。
3. 使用Lock和Condition的await() / signal()方法
/** * 生產者消費者模式:使用Lock和Condition實現 * {@link java.util.concurrent.locks.Lock} * {@link java.util.concurrent.locks.Condition} */ public class ProducerConsumerByLock { private static final int CAPACITY = 5; private static final Lock lock = new ReentrantLock(); private static final Condition fullCondition = lock.newCondition(); //隊列滿的條件 private static final Condition emptyCondition = lock.newCondition(); //隊列空的條件 public static void main(String args[]){ Queue<Integer> queue = new LinkedList<Integer>(); Thread producer1 = new Producer("P-1", queue, CAPACITY); Thread producer2 = new Producer("P-2", queue, CAPACITY); Thread consumer1 = new Consumer("C1", queue, CAPACITY); Thread consumer2 = new Consumer("C2", queue, CAPACITY); Thread consumer3 = new Consumer("C3", queue, CAPACITY); producer1.start(); producer2.start(); consumer1.start(); consumer2.start(); consumer3.start(); } /** * 生產者 */ public static class Producer extends Thread{ private Queue<Integer> queue; String name; int maxSize; int i = 0; public Producer(String name, Queue<Integer> queue, int maxSize){ super(name); this.name = name; this.queue = queue; this.maxSize = maxSize; } @Override public void run(){ while(true){ //獲得鎖 lock.lock(); while(queue.size() == maxSize){ try { System.out .println("Queue is full, Producer[" + name + "] thread waiting for " + "consumer to take something from queue."); //條件不滿足,生產阻塞 fullCondition.await(); } catch (InterruptedException ex) { ex.printStackTrace(); } } System.out.println("[" + name + "] Producing value : +" + i); queue.offer(i++); //喚醒其他所有生產者、消費者 fullCondition.signalAll(); emptyCondition.signalAll(); //釋放鎖 lock.unlock(); try { Thread.sleep(new Random().nextInt(1000)); } catch (InterruptedException e) { e.printStackTrace(); } } } } /** * 消費者 */ public static class Consumer extends Thread{ private Queue<Integer> queue; String name; int maxSize; public Consumer(String name, Queue<Integer> queue, int maxSize){ super(name); this.name = name; this.queue = queue; this.maxSize = maxSize; } @Override public void run(){ while(true){ //獲得鎖 lock.lock(); while(queue.isEmpty()){ try { System.out.println("Queue is empty, Consumer[" + name + "] thread is waiting for Producer"); //條件不滿足,消費阻塞 emptyCondition.await(); } catch (Exception ex) { ex.printStackTrace(); } } int x = queue.poll(); System.out.println("[" + name + "] Consuming value : " + x); //喚醒其他所有生產者、消費者 fullCondition.signalAll(); emptyCondition.signalAll(); //釋放鎖 lock.unlock(); try { Thread.sleep(new Random().nextInt(1000)); } catch (InterruptedException e) { e.printStackTrace(); } } } } }
4. 總結
實現生產者消費者模式有3點:
1. 拿什么作為緩沖區,給生產者和消費者解耦,平衡了生產者和消費者的處理能力。一般使用隊列
2. 構建生產者,隊列滿使得生產者線程阻塞
3. 構建消費者 ,隊列空使得消費者現成阻塞
參考:https://blog.csdn.net/u010983881/article/details/78554671