生產者消費者模式通過一個阻塞隊列來解決兩者之間的強耦合問題。阻塞隊列相當於一個緩沖區,平衡消費者和生產者的處理能力。
阻塞隊列有數據——生產者不生產,阻塞隊列沒數據——消費者不消費
一、synchronized+wait+notifyAll
生產
package com.ProductCusromer.method2; import java.util.List; import java.util.Random; /** * @author Millet * @date 2020/3/30 17:59 */ public class Product2 implements Runnable { private List list; public Product2(List list) { this.list = list; } //多個生產者對應多個消費者 @Override public void run() { Random random = new Random(50);//相當於商品 while (true){ synchronized (list){ while(list.size()>0){ //集合中有數據就停止寫入,等待消費者消費完 try { list.wait(); } catch (InterruptedException e) { e.printStackTrace(); } } try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } //執行到這說明集合沒有數據了 list.add((random.nextInt(50))); //提示 System.out.println(Thread.currentThread().getName()+"線程生產了:"+ list.get(0)); list.notifyAll(); } } } }
消費者
package com.ProductCusromer.method2; import java.util.List; /** * @author Millet * @date 2020/3/30 18:07 */ public class Consumer2 implements Runnable { private List list; public Consumer2(List list) { this.list = list; } @Override public void run() { while(true){ synchronized (list){ while(list.size()<=0){//適合多生產者多消費者 try { list.wait(); } catch (InterruptedException e) { e.printStackTrace(); } } try { Thread.sleep(500); } catch (InterruptedException e) { e.printStackTrace(); } //提示 System.err.println(Thread.currentThread().getName()+"線程消費了:"+ list.remove(0)); list.notifyAll(); } } } }
二、ReetrantLock+Condition
生產者
package com.ProductCusromer.method3; import java.util.List; import java.util.Random; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; /** * @author Millet * @date 2020/3/30 17:59 */ public class Product3 implements Runnable { private List list; private Lock mLock; private Condition mCondition; public Product3(List list, Lock lock, Condition condition) { this.list = list; this.mLock = lock; this.mCondition = condition; } //一個生產者對應一個消費者 @Override public void run() { Random random = new Random(50);//相當於商品 while (true){ mLock.lock(); if(list.size()>0){ //集合中有數據就停止寫入,等待消費者消費完 try { mCondition.await(); } catch (InterruptedException e) { e.printStackTrace(); } } try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } //執行到這說明集合沒有數據了 list.add((random.nextInt(50))); //提示 System.out.println(Thread.currentThread().getName()+"線程生產了:"+ list.get(0)); mCondition.signalAll(); mLock.unlock(); } } }
消費者
package com.ProductCusromer.method2; import java.util.List; /** * @author Millet * @date 2020/3/30 18:07 */ public class Consumer2 implements Runnable { private List list; public Consumer2(List list) { this.list = list; } @Override public void run() { while(true){ synchronized (list){ while(list.size()<=0){//適合多生產者多消費者 try { list.wait(); } catch (InterruptedException e) { e.printStackTrace(); } } try { Thread.sleep(500); } catch (InterruptedException e) { e.printStackTrace(); } //提示 System.err.println(Thread.currentThread().getName()+"線程消費了:"+ list.remove(0)); list.notifyAll(); } } } }
三、BlockingQueue實現
生產者
package com.ProductCusromer.method3; import java.util.concurrent.BlockingQueue; /** * @author Millet * @date 2020/3/30 21:28 */ public class Product4 implements Runnable { private BlockingQueue queue; private String name; public Product4(String name, BlockingQueue queue) { this.queue = queue; this.name = name; } @Override public void run() { while(true){ try { int val = (int) Math.random(); queue.put(val); System.out.println(name + "生產:"+val+".當前隊列長度:"+ queue.size()); } catch (InterruptedException e) { e.printStackTrace(); } } } }
消費者
package com.ProductCusromer.method3; import java.util.concurrent.BlockingDeque; import java.util.concurrent.BlockingQueue; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; /** * @author Millet * @date 2020/3/30 21:28 */ public class Consumer4 implements Runnable{ private BlockingQueue queue; private String name; public Consumer4(String name, BlockingQueue queue) { this.queue = queue; this.name = name; } @Override public void run() { try { while (true){ int val = (int) queue.take(); System.out.println(name + "消費:"+val+".當前隊列長度:"+ queue.size()); } } catch (InterruptedException e) { e.printStackTrace(); } } }
測試
package com.ProductCusromer.method3; import com.ProductCusromer.method1.Consumer1; import com.ProductCusromer.method1.Product1; import javax.persistence.criteria.CriteriaBuilder; import java.util.List; import java.util.concurrent.BlockingQueue; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.LinkedBlockingDeque; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; /** * @author Millet * @date 2020/3/30 20:33 */ public class Main { public static void main(String[] args) { BlockingQueue<Integer> queue = new LinkedBlockingQueue<>(20); Product4 producer1 = new Product4("生產1號", queue); Product4 producer2 = new Product4("生產2號", queue); Product4 producer3 = new Product4("生產3號", queue); Consumer4 consumer1 = new Consumer4("消費1號", queue); Consumer4 consumer2 = new Consumer4("消費2號", queue); // 開始producer線程進行生產 new Thread(producer1).start(); new Thread(producer2).start(); new Thread(producer3).start(); // 開始consumer線程進行消費。 new Thread(consumer1).start(); new Thread(consumer2).start(); } }
結果