-
生產者消費者模式是一個十分經典的多線程協作的模式,弄懂生產者消費者問題能夠讓我們對多線程編程的理解更加深刻。
所謂生產者消費者問題,實際上主要是包含了兩類線程:
一類是生產者線程用於生產數據
一類是消費者線程用於消費數據
為了解耦生產者和消費者的關系,通常會采用共享的數據區域,就像是一個倉庫
生產者生產數據之后直接放置在共享數據區中,並不需要關心消費者的行為
消費者只需要從共享數據區中去獲取數據,並不需要關心生產者的行為
-
Object類的等待和喚醒方法
方法名 | 說明 |
---|---|
void wait() | 導致當前線程等待,直到另一個線程調用該對象的 notify()方法或 notifyAll()方法 |
void notify() | 喚醒正在等待對象監視器的單個線程 |
void notifyAll() | 喚醒正在等待對象監視器的所有線程 |
-
-
桌子類(Desk):定義表示包子數量的變量,定義鎖對象變量,定義標記桌子上有無包子的變量
-
生產者類(Cooker):實現Runnable接口,重寫run()方法,設置線程任務
1.判斷是否有包子,決定當前線程是否執行
2.如果有包子,就進入等待狀態,如果沒有包子,繼續執行,生產包子
3.生產包子之后,更新桌子上包子狀態,喚醒消費者消費包子
-
消費者類(Foodie):實現Runnable接口,重寫run()方法,設置線程任務
1.判斷是否有包子,決定當前線程是否執行
2.如果沒有包子,就進入等待狀態,如果有包子,就消費包子
3.消費包子后,更新桌子上包子狀態,喚醒生產者生產包子
-
測試類(Demo):里面有main方法,main方法中的代碼步驟如下
創建生產者線程和消費者線程對象
分別開啟兩個線程
-

public class Desk { //定義一個標記 //true 就表示桌子上有漢堡包的,此時允許吃貨執行 //false 就表示桌子上沒有漢堡包的,此時允許廚師執行 public static boolean flag = false; //漢堡包的總數量 public static int count = 10; //鎖對象 public static final Object lock = new Object(); } public class Cooker extends Thread { // 生產者步驟: // 1,判斷桌子上是否有漢堡包 // 如果有就等待,如果沒有才生產。 // 2,把漢堡包放在桌子上。 // 3,叫醒等待的消費者開吃。 @Override public void run() { while(true){ synchronized (Desk.lock){ if(Desk.count == 0){ break; }else{ if(!Desk.flag){ //生產 System.out.println("廚師正在生產漢堡包"); Desk.flag = true; Desk.lock.notifyAll(); }else{ try { Desk.lock.wait(); } catch (InterruptedException e) { e.printStackTrace(); } } } } } } } public class Foodie extends Thread { @Override public void run() { // 1,判斷桌子上是否有漢堡包。 // 2,如果沒有就等待。 // 3,如果有就開吃 // 4,吃完之后,桌子上的漢堡包就沒有了 // 叫醒等待的生產者繼續生產 // 漢堡包的總數量減一 //套路: //1. while(true)死循環 //2. synchronized 鎖,鎖對象要唯一 //3. 判斷,共享數據是否結束. 結束 //4. 判斷,共享數據是否結束. 沒有結束 while(true){ synchronized (Desk.lock){ if(Desk.count == 0){ break; }else{ if(Desk.flag){ //有 System.out.println("吃貨在吃漢堡包"); Desk.flag = false; Desk.lock.notifyAll(); Desk.count--; }else{ //沒有就等待 //使用什么對象當做鎖,那么就必須用這個對象去調用等待和喚醒的方法. try { Desk.lock.wait(); } catch (InterruptedException e) { e.printStackTrace(); } } } } } } } public class Demo { public static void main(String[] args) { /*消費者步驟: 1,判斷桌子上是否有漢堡包。 2,如果沒有就等待。 3,如果有就開吃 4,吃完之后,桌子上的漢堡包就沒有了 叫醒等待的生產者繼續生產 漢堡包的總數量減一*/ /*生產者步驟: 1,判斷桌子上是否有漢堡包 如果有就等待,如果沒有才生產。 2,把漢堡包放在桌子上。 3,叫醒等待的消費者開吃。*/ Foodie f = new Foodie(); Cooker c = new Cooker(); f.start(); c.start(); } }
-
-
將Desk類中的變量,采用面向對象的方式封裝起來
-
生產者和消費者類中構造方法接收Desk類對象,之后在run方法中進行使用
-
創建生產者和消費者線程對象,構造方法中傳入Desk類對象
-
開啟兩個線程
-

public class Desk { //定義一個標記 //true 就表示桌子上有漢堡包的,此時允許吃貨執行 //false 就表示桌子上沒有漢堡包的,此時允許廚師執行 //public static boolean flag = false; private boolean flag; //漢堡包的總數量 //public static int count = 10; //以后我們在使用這種必須有默認值的變量 // private int count = 10; private int count; //鎖對象 //public static final Object lock = new Object(); private final Object lock = new Object(); public Desk() { this(false,10); // 在空參內部調用帶參,對成員變量進行賦值,之后就可以直接使用成員變量了 } public Desk(boolean flag, int count) { this.flag = flag; this.count = count; } public boolean isFlag() { return flag; } public void setFlag(boolean flag) { this.flag = flag; } public int getCount() { return count; } public void setCount(int count) { this.count = count; } public Object getLock() { return lock; } @Override public String toString() { return "Desk{" + "flag=" + flag + ", count=" + count + ", lock=" + lock + '}'; } } public class Cooker extends Thread { private Desk desk; public Cooker(Desk desk) { this.desk = desk; } // 生產者步驟: // 1,判斷桌子上是否有漢堡包 // 如果有就等待,如果沒有才生產。 // 2,把漢堡包放在桌子上。 // 3,叫醒等待的消費者開吃。 @Override public void run() { while(true){ synchronized (desk.getLock()){ if(desk.getCount() == 0){ break; }else{ //System.out.println("驗證一下是否執行了"); if(!desk.isFlag()){ //生產 System.out.println("廚師正在生產漢堡包"); desk.setFlag(true); desk.getLock().notifyAll(); }else{ try { desk.getLock().wait(); } catch (InterruptedException e) { e.printStackTrace(); } } } } } } } public class Foodie extends Thread { private Desk desk; public Foodie(Desk desk) { this.desk = desk; } @Override public void run() { // 1,判斷桌子上是否有漢堡包。 // 2,如果沒有就等待。 // 3,如果有就開吃 // 4,吃完之后,桌子上的漢堡包就沒有了 // 叫醒等待的生產者繼續生產 // 漢堡包的總數量減一 //套路: //1. while(true)死循環 //2. synchronized 鎖,鎖對象要唯一 //3. 判斷,共享數據是否結束. 結束 //4. 判斷,共享數據是否結束. 沒有結束 while(true){ synchronized (desk.getLock()){ if(desk.getCount() == 0){ break; }else{ //System.out.println("驗證一下是否執行了"); if(desk.isFlag()){ //有 System.out.println("吃貨在吃漢堡包"); desk.setFlag(false); desk.getLock().notifyAll(); desk.setCount(desk.getCount() - 1); }else{ //沒有就等待 //使用什么對象當做鎖,那么就必須用這個對象去調用等待和喚醒的方法. try { desk.getLock().wait(); } catch (InterruptedException e) { e.printStackTrace(); } } } } } } } public class Demo { public static void main(String[] args) { /*消費者步驟: 1,判斷桌子上是否有漢堡包。 2,如果沒有就等待。 3,如果有就開吃 4,吃完之后,桌子上的漢堡包就沒有了 叫醒等待的生產者繼續生產 漢堡包的總數量減一*/ /*生產者步驟: 1,判斷桌子上是否有漢堡包 如果有就等待,如果沒有才生產。 2,把漢堡包放在桌子上。 3,叫醒等待的消費者開吃。*/ Desk desk = new Desk(); Foodie f = new Foodie(desk); Cooker c = new Cooker(desk); f.start(); c.start(); } }
-
ArrayBlockingQueue: 底層是數組,有界
LinkedBlockingQueue: 底層是鏈表,無界.但不是真正的無界,最大為int的最大值
-
BlockingQueue的核心方法:
put(anObject): 將參數放入隊列,如果放不進去會阻塞
take(): 取出第一個數據,取不到會阻塞
public class Demo02 { public static void main(String[] args) throws Exception { // 創建阻塞隊列的對象,容量為 1 ArrayBlockingQueue<String> arrayBlockingQueue = new ArrayBlockingQueue<>(1); // 存儲元素 arrayBlockingQueue.put("漢堡包"); // 取元素 System.out.println(arrayBlockingQueue.take()); System.out.println(arrayBlockingQueue.take()); // 取不到會阻塞 System.out.println("程序結束了"); } }
-
-
生產者類(Cooker):實現Runnable接口,重寫run()方法,設置線程任務
1.構造方法中接收一個阻塞隊列對象
2.在run方法中循環向阻塞隊列中添加包子
3.打印添加結果
-
消費者類(Foodie):實現Runnable接口,重寫run()方法,設置線程任務
1.構造方法中接收一個阻塞隊列對象
2.在run方法中循環獲取阻塞隊列中的包子
3.打印獲取結果
-
測試類(Demo):里面有main方法,main方法中的代碼步驟如下
創建阻塞隊列對象
創建生產者線程和消費者線程對象,構造方法中傳入阻塞隊列對象
分別開啟兩個線程
-
public class Cooker extends Thread { private ArrayBlockingQueue<String> bd; public Cooker(ArrayBlockingQueue<String> bd) { this.bd = bd; } // 生產者步驟: // 1,判斷桌子上是否有漢堡包 // 如果有就等待,如果沒有才生產。 // 2,把漢堡包放在桌子上。 // 3,叫醒等待的消費者開吃。 @Override public void run() { while (true) { try { bd.put("漢堡包"); System.out.println("廚師放入一個漢堡包"); } catch (InterruptedException e) { e.printStackTrace(); } } } } public class Foodie extends Thread { private ArrayBlockingQueue<String> bd; public Foodie(ArrayBlockingQueue<String> bd) { this.bd = bd; } @Override public void run() { // 1,判斷桌子上是否有漢堡包。 // 2,如果沒有就等待。 // 3,如果有就開吃 // 4,吃完之后,桌子上的漢堡包就沒有了 // 叫醒等待的生產者繼續生產 // 漢堡包的總數量減一 //套路: //1. while(true)死循環 //2. synchronized 鎖,鎖對象要唯一 //3. 判斷,共享數據是否結束. 結束 //4. 判斷,共享數據是否結束. 沒有結束 while (true) { try { String take = bd.take(); System.out.println("吃貨將" + take + "拿出來吃了"); } catch (InterruptedException e) { e.printStackTrace(); } } } } public class Demo { public static void main(String[] args) { ArrayBlockingQueue<String> bd = new ArrayBlockingQueue<>(1); Foodie f = new Foodie(bd); Cooker c = new Cooker(bd); f.start(); c.start(); } }