(手寫生產者消費者模型,寫BlockingQueue較簡便 )
1、背景
生產者生產數據到緩沖區中,消費者從緩沖區中取數據。
如果緩沖區已經滿了,則生產者線程阻塞;
如果緩沖區為空,那么消費者線程阻塞。
2、方式一:synchronized、wait和notify
定義Resouce資源類,類中定義資源池大小。資源類的add()和remove()方法是synchronized 的。生產者/消費者線程持有一個資源類Resouce的成員變量,Main方法中通過構造函數將Resouce類傳入,線程run方法中操作Resouce類的add,remove方法

package producerConsumer; //wait 和 notify public class ProducerConsumerWithWaitNofity { public static void main(String[] args) { Resource resource = new Resource(); //生產者線程 ProducerThread p1 = new ProducerThread(resource); ProducerThread p2 = new ProducerThread(resource); ProducerThread p3 = new ProducerThread(resource); //消費者線程 ConsumerThread c1 = new ConsumerThread(resource); //ConsumerThread c2 = new ConsumerThread(resource); //ConsumerThread c3 = new ConsumerThread(resource); p1.start(); p2.start(); p3.start(); c1.start(); //c2.start(); //c3.start(); } } /** * 公共資源類 * @author * */ class Resource{//重要 //當前資源數量 private int num = 0; //資源池中允許存放的資源數目 private int size = 10; /** * 從資源池中取走資源 */ public synchronized void remove(){ if(num > 0){ num--; System.out.println("消費者" + Thread.currentThread().getName() + "消耗一件資源," + "當前線程池有" + num + "個"); notifyAll();//通知生產者生產資源 }else{ try { //如果沒有資源,則消費者進入等待狀態 wait(); System.out.println("消費者" + Thread.currentThread().getName() + "線程進入等待狀態"); } catch (InterruptedException e) { e.printStackTrace(); } } } /** * 向資源池中添加資源 */ public synchronized void add(){ if(num < size){ num++; System.out.println(Thread.currentThread().getName() + "生產一件資源,當前資源池有" + num + "個"); //通知等待的消費者 notifyAll(); }else{ //如果當前資源池中有10件資源 try{ wait();//生產者進入等待狀態,並釋放鎖 System.out.println(Thread.currentThread().getName()+"線程進入等待"); }catch(InterruptedException e){ e.printStackTrace(); } } } } /** * 消費者線程 */ class ConsumerThread extends Thread{ private Resource resource; public ConsumerThread(Resource resource){ this.resource = resource; } @Override public void run() { while(true){ try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } resource.remove(); } } } /** * 生產者線程 */ class ProducerThread extends Thread{ private Resource resource; public ProducerThread(Resource resource){ this.resource = resource; } @Override public void run() { //不斷地生產資源 while(true){ try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } resource.add(); } } }
3、方式二:lock和condition的await、signalAll

package producerConsumer; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; /** * 使用Lock 和 Condition解決生產者消費者問題 * @author tangzhijing * */ public class LockCondition { public static void main(String[] args) { Lock lock = new ReentrantLock(); Condition producerCondition = lock.newCondition(); Condition consumerCondition = lock.newCondition(); Resource2 resource = new Resource2(lock,producerCondition,consumerCondition); //生產者線程 ProducerThread2 producer1 = new ProducerThread2(resource); //消費者線程 ConsumerThread2 consumer1 = new ConsumerThread2(resource); ConsumerThread2 consumer2 = new ConsumerThread2(resource); ConsumerThread2 consumer3 = new ConsumerThread2(resource); producer1.start(); consumer1.start(); consumer2.start(); consumer3.start(); } } /** * 消費者線程 */ class ConsumerThread2 extends Thread{ private Resource2 resource; public ConsumerThread2(Resource2 resource){ this.resource = resource; //setName("消費者"); } public void run(){ while(true){ try { Thread.sleep((long) (1000 * Math.random())); } catch (InterruptedException e) { e.printStackTrace(); } resource.remove(); } } } /** * 生產者線程 * @author tangzhijing * */ class ProducerThread2 extends Thread{ private Resource2 resource; public ProducerThread2(Resource2 resource){ this.resource = resource; setName("生產者"); } public void run(){ while(true){ try { Thread.sleep((long) (1000 * Math.random())); } catch (InterruptedException e) { e.printStackTrace(); } resource.add(); } } } /** * 公共資源類 * @author tangzhijing * */ class Resource2{ private int num = 0;//當前資源數量 private int size = 10;//資源池中允許存放的資源數目 private Lock lock; private Condition producerCondition; private Condition consumerCondition; public Resource2(Lock lock, Condition producerCondition, Condition consumerCondition) { this.lock = lock; this.producerCondition = producerCondition; this.consumerCondition = consumerCondition; } /** * 向資源池中添加資源 */ public void add(){ lock.lock(); try{ if(num < size){ num++; System.out.println(Thread.currentThread().getName() + "生產一件資源,當前資源池有" + num + "個"); //喚醒等待的消費者 consumerCondition.signalAll(); }else{ //讓生產者線程等待 try { producerCondition.await(); System.out.println(Thread.currentThread().getName() + "線程進入等待"); } catch (InterruptedException e) { e.printStackTrace(); } } }finally{ lock.unlock(); } } /** * 從資源池中取走資源 */ public void remove(){ lock.lock(); try{ if(num > 0){ num--; System.out.println("消費者" + Thread.currentThread().getName() + "消耗一件資源," + "當前資源池有" + num + "個"); producerCondition.signalAll();//喚醒等待的生產者 }else{ try { consumerCondition.await(); System.out.println(Thread.currentThread().getName() + "線程進入等待"); } catch (InterruptedException e) { e.printStackTrace(); }//讓消費者等待 } }finally{ lock.unlock(); } } }
4、方式三:BlockingQueue
定義Resouce資源類,資源類持有一個BlockingQueue。生產者/消費者線程持有一個資源類Resouce的成員變量,Main方法中通過構造函數將Resouce類傳入,線程run方法中操作Resouce類的add,remove方法,add,remove調用Queue的put()和take()

package producerConsumer; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; //使用阻塞隊列BlockingQueue解決生產者消費者 public class BlockingQueueConsumerProducer { public static void main(String[] args) { Resource3 resource = new Resource3(); //生產者線程 ProducerThread3 p = new ProducerThread3(resource); //多個消費者 ConsumerThread3 c1 = new ConsumerThread3(resource); ConsumerThread3 c2 = new ConsumerThread3(resource); ConsumerThread3 c3 = new ConsumerThread3(resource); p.start(); c1.start(); c2.start(); c3.start(); } } /** * 消費者線程 * @author tangzhijing * */ class ConsumerThread3 extends Thread { private Resource3 resource3; public ConsumerThread3(Resource3 resource) { this.resource3 = resource; //setName("消費者"); } public void run() { while (true) { try { Thread.sleep((long) (1000 * Math.random())); } catch (InterruptedException e) { e.printStackTrace(); } resource3.remove(); } } } /** * 生產者線程 * @author tangzhijing * */ class ProducerThread3 extends Thread{ private Resource3 resource3; public ProducerThread3(Resource3 resource) { this.resource3 = resource; //setName("生產者"); } public void run() { while (true) { try { Thread.sleep((long) (1000 * Math.random())); } catch (InterruptedException e) { e.printStackTrace(); } resource3.add(); } } } class Resource3{ private BlockingQueue<Integer> resourceQueue = new LinkedBlockingQueue<>(10); /** * 向資源池中添加資源 */ public void add(){ try { resourceQueue.put(1); //1當做生產和消費的Integer資源 System.out.println("生產者" + Thread.currentThread().getName() + "生產一件資源," + "當前資源池有" + resourceQueue.size() + "個資源"); } catch (InterruptedException e) { e.printStackTrace(); } } /** * 向資源池中移除資源 */ public void remove(){ try { resourceQueue.take(); System.out.println("消費者" + Thread.currentThread().getName() + "消耗一件資源," + "當前資源池有" + resourceQueue.size() + "個資源"); } catch (InterruptedException e) { e.printStackTrace(); } } }
為什么用put和take:
為什么用put和take:https://blog.csdn.net/qiuchaoxi/article/details/80359462