一、通過synchronize 中的 wait 和 notify 實現
【1】我們可以將生產者和消費者需要的方法寫在公共類中
1 package com.yintong.concurrent; 2 3 import java.util.LinkedList; 4 5 public class Concurrentcomm { 6 //常量 7 private static int MAX_VALUE = 10; 8 //可以理解為緩存 9 LinkedList<String> linkedList = new LinkedList<>(); 10 Object object = new Object(); 11 /* 12 * 生產者方法 13 */ 14 public void product() throws Exception { 15 synchronized(linkedList) { 16 while(MAX_VALUE == linkedList.size()) { 17 System.out.println("倉庫已滿,【生產者】: 暫時不能執行生產任務!"); 18 linkedList.wait(); 19 } 20 linkedList.push(" 李四 "); 21 System.out.println("【生產者】:生產了一個產品\t【現倉儲量為】:" + linkedList.size()); 22 linkedList.notifyAll(); 23 } 24 } 25 /* 26 * 消費者方法 27 */ 28 public void customer() throws Exception { 29 /* 30 * 根據jdk的void notifyAll()的描述,“解除那些在該對象上調用wait()方法的線程的阻塞狀態。該方法只能在同步方法或同步塊內部調用。 31 * 如果當前線程不是對象所得持有者, 32 * 該方法拋出一個java.lang.IllegalMonitorStateException 異常” 33 * so我們使用同一把鎖 34 */ 35 synchronized (linkedList) { 36 //多線程判斷中使用 while 不要使用 if 否則會出現虛假喚醒問題 37 while(linkedList.size() == 0) { 38 System.out.println("倉庫無貨,【消費者】: 暫時不能執行消費任務!"); 39 linkedList.wait(); 40 } 41 linkedList.pop(); 42 System.out.println("【消費者】:消費了一個產品\t【現倉儲量為】:" + linkedList.size()); 43 linkedList.notifyAll(); 44 } 45 } 46 }
【2】在 main 函數中調用生產者和消費者方法,並加限制即可
1 /** 2 * @author zzx 3 * @desc 生產者與消費者 4 * 5 */ 6 public class Concurrent { 7 //常量 8 private static int MAX_VALUE = 100; 9 10 public static void main(String[] args) { 11 Concurrentcomm con = new Concurrentcomm(); 12 new Thread(new Runnable() { 13 14 @Override 15 public void run() { 16 try { 17 for (int i = 0; i < MAX_VALUE; i++) { 18 Thread.sleep(0); 19 con.product(); 20 } 21 } catch (Exception e) { 22 // TODO Auto-generated catch block 23 e.printStackTrace(); 24 } 25 } 26 }).start(); 27 // 消費者 28 new Thread(new Runnable() { 29 30 @Override 31 public void run() { 32 try { 33 Thread.sleep(10); 34 for (int i = 0; i < MAX_VALUE; i++) { 35 con.customer(); 36 } 37 } catch (Exception e) { 38 e.printStackTrace(); 39 } 40 } 41 }).start(); 42 } 43 }
【3】簡單的生產者與消費者模式就完成了,可以看下運行的結果
二、通過 Lock 中的 await 與 signalAll 實現
【1】我們將公共的屬性和方法放在 Resouce 類中,在資源類中使用 Lock 中的 lock()進行加鎖,控制並發操作。使用 await()方法阻塞線程。使用 signalAll()喚醒線程。
1 /** 2 * 通過 Lock 實現生產者與消費者 3 * 資源類:將公共的資源放在一個單獨的類中,可以將其看做一個產品,自身就就有生產和消費的能力(方法) 4 */ 5 public class ProductAndConsumer { 6 public static void main(String[] args) { 7 Resouce resouce = new Resouce(); 8 //生產者 9 new Thread(()->{ 10 for (int i=1;i<=5;i++) { 11 resouce.product(); 12 } 13 },String.valueOf("生產者")) .start(); 14 15 //消費者 16 new Thread(()->{ 17 for (int i=1;i<=5;i++){ 18 resouce.consumer(); 19 } 20 },String.valueOf("消費者")).start(); 21 } 22 } 23 //資源類 24 class Resouce { 25 private int MAX_VALUE = 3; 26 private int MIN_VALUE = 0; 27 private int number = 0; 28 private Lock lock = new ReentrantLock(); 29 private Condition condition = lock.newCondition(); 30 31 //生產者 32 public void product(){ 33 try { 34 lock.lock(); 35 //如果生產的數量大於最大值則阻塞 36 while(number >= MAX_VALUE){ 37 condition.await(); 38 } 39 number++; 40 System.out.println("【生產者】:生產了一個產品\t【現倉儲量為】:" + number); 41 condition.signalAll(); 42 } catch (InterruptedException e) { 43 e.printStackTrace(); 44 }finally { 45 lock.unlock(); 46 } 47 } 48 49 //消費者 50 public void consumer(){ 51 try { 52 lock.lock(); 53 //如果消費的值=0則阻塞 54 while(number <= MIN_VALUE){ 55 condition.await(); 56 } 57 number--; 58 System.out.println("【消費者】:消費了一個產品\t【現倉儲量為】:" + number); 59 condition.signalAll(); 60 } catch (InterruptedException e) { 61 e.printStackTrace(); 62 }finally { 63 lock.unlock(); 64 } 65 } 66 }
【2】輸出結果展示:
三、synchronized 和 Lock 的區別
【1】原始構成:synchronized 是關鍵字屬於 JVM 層面。底層通過 monitorenter(進入)monitorexit(退出)實現。底層是通過 monitor 對象完成,其實 wait/notify 等方法也依賴於 monitor 對象,只有在同步塊或方法中才能調用 wait/notify 等方法。Lock 是具體類(java.util.concurrent.locks.Lock)是 API 層面的鎖。
【2】使用方法:synchronized 不需要用戶手動釋放鎖,當 synchronized 代碼執行完后,系統會自動釋放鎖。ReentrantLock 則需要用戶手動釋放鎖,若未主動釋放鎖,就可能導致出現死鎖的現象。
【3】等待是否中斷:synchronized 不可中斷,除非拋出異常或者正常運行完成。ReentrantLock 可中斷,1)、設置超時時間 tryLock(long timeout,TimeUnit unit) 2)、lockInterruptibly() 放在代碼塊中,調用 interrupt() 方法可中斷。
【4】加鎖是否公平:synchronized 非公平鎖。ReentrantLock 兩者都可以,默認是非公平鎖,構造方法可以傳入 boolean 值,true 為公平鎖,false 為非公平鎖。
【5】鎖綁定多個條件 Condition:synchronized 沒有。ReentrantLock 用來實現分組喚醒需要喚醒的線程們,可以精確喚醒,而不是像 synchronized 要么隨機喚醒一個線程要么喚醒全部線程。
四、通過阻塞隊列實現生產者與消費者
【1】通過blockQueue 中的 put/take 方法實現生產者與消費者,具體實現如下:當生產者使用put 生產到指定的隊列大小3時,就會阻塞當前線程。這是消費者線程會通過 take 方法消費隊列中的消息。當隊列中沒有消息時,會阻塞,直到有消息消費。
1 public class BlockProductConsumer { 2 public static void main(String[] args) { 3 MyResouce resouce = new MyResouce(new ArrayBlockingQueue(3)); 4 //生產者線程 5 new Thread(()->{ 6 for(int i=1;i<=10;i++){ 7 resouce.product(); 8 } 9 },"生產者").start(); 10 11 //消費者線程 12 new Thread(()->{ 13 for(int i=1;i<=10;i++){ 14 try { 15 resouce.consumer(); 16 } catch (InterruptedException e) { 17 e.printStackTrace(); 18 } 19 } 20 },"消費者").start(); 21 22 try { 23 TimeUnit.SECONDS.sleep(1); 24 resouce.stop(); 25 } catch (InterruptedException e) { 26 e.printStackTrace(); 27 } 28 } 29 } 30 31 32 /** 33 * 公共資源類 34 */ 35 class MyResouce{ 36 //標記 while 無限循環 37 private volatile boolean FLAG = true; 38 //隊列中存入的數值 39 private AtomicInteger atomicInteger = new AtomicInteger(); 40 //組合一個阻塞隊列,通過構造器傳入 41 private BlockingQueue blockingQueue; 42 public MyResouce(BlockingQueue blockingQueue) { 43 this.blockingQueue = blockingQueue; 44 } 45 46 //生產者 47 public void product(){ 48 try { 49 while (FLAG){ 50 blockingQueue.put(String.valueOf(atomicInteger.incrementAndGet())); 51 System.out.println("生產者生產第"+blockingQueue.size()+"個產品"); 52 } 53 } catch (InterruptedException e) { 54 e.printStackTrace(); 55 } 56 } 57 58 //消費者 59 public void consumer() throws InterruptedException { 60 while (FLAG){ 61 blockingQueue.take(); 62 System.out.println("消費者消費第"+(blockingQueue.size()+1)+"個產品"); 63 } 64 } 65 66 public void stop(){ 67 FLAG = false; 68 System.out.println("========================"); 69 } 70 }
【2】效果展示: