什么是生產者/消費者模式?
某個模塊負責產生數據,這些數據由另一個模塊來負責處理(此處的模塊是廣義的,可以是類、函數、線程、進程等)。產生數據的模塊,就形象地稱為生產者;而處理數據的模塊,就稱為消費者。在生產者與消費者之間在加個緩沖區,我們形象的稱之為倉庫,生產者負責往倉庫了進商品,而消費者負責從倉庫里拿商品,這就構成了生產者消費者模式。結構圖如下:
生產者消費者模式有如下幾個優點:
1、解耦
由於有緩沖區的存在,生產者和消費者之間不直接依賴,耦合度降低。
2、支持並發
由於生產者與消費者是兩個獨立的並發體,他們之間是用緩沖區作為橋梁連接,生產者只需要往緩沖區里丟數據,就可以繼續生產下一個數據,而消費者只需要從緩沖區了拿數據即可,這樣就不會因為彼此的處理速度而發生阻塞。
3、支持忙閑不均
緩沖區還有另一個好處。如果制造數據的速度時快時慢,緩沖區的好處就體現出來 了。當數據制造快的時候,消費者來不及處理,未處理的數據可以暫時存在緩沖區中。 等生產者的制造速度慢下來,消費者再慢慢處理掉。
生產者-消費者模型准確說應該是“生產者-倉儲-消費者”模型,這樣的模型遵循如下的規則:
1、生產者僅僅在倉儲未滿時候生產,倉滿則停止生產。
2、消費者僅僅在倉儲有產品時候才能消費,倉空則等待。
3、當消費者發現倉儲沒產品可消費時候會通知生產者生產。
4、生產者在生產出可消費產品時候,應該通知等待的消費者去消費
此模型將要結合java.lang.Object的wait與notify、notifyAll方法來實現以上的需求。實例代碼如下:
創建所謂的“倉庫”,此類是(本質上:共同訪問的)共享數據區域
//此類是(本質上:共同訪問的)共享數據區域 public class SyncStack { private String[] str = new String[10]; private int index; //供生產者調用 public synchronized void push(String sst){ if(index == sst.length()){ try { wait(); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } } this.notify();//喚醒在此對象監視器上等待的單個線程 str[index] = sst; index++; } //供消費者調用 public synchronized String pop(){ if(index == 0){ try { wait(); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } } this.notify(); index--; String product = str[index]; return product; } //就是定義一個返回值為數組的方法,返回的是一個String[]引用 public String[] pro(){ return str; } }
創建生產者:
public class Producter implements Runnable { private SyncStack stack; public Producter(SyncStack stack){ this.stack = stack; } public void run(){ for(int i = 0;i<stack.pro().length;i++){ String producter = "產品" + i; stack.push(producter); System.out.println("生產了:" + producter); try { Thread.sleep(200); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } } } }
創建消費者:
public class Consumer implements Runnable { private SyncStack stack; public Consumer(SyncStack stack){ this.stack = stack; } public void run(){ for(int i=0;i<stack.pro().length;i++){ String consumer = stack.pop(); System.out.println("消費了:" + consumer ); try { Thread.sleep(400); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } } } }
測試類:
public class TestDeam { public static void main(String[] args) { SyncStack stack = new SyncStack(); Consumer p = new Consumer(stack); Producter c = new Producter(stack); new Thread(p).start(); new Thread(c).start(); } }
測試結果:
生產了:產品0
消費了:產品0
生產了:產品1
生產了:產品2
消費了:產品2
生產了:產品3
消費了:產品3
生產了:產品4
生產了:產品5
生產了:產品6
消費了:產品5
生產了:產品7
消費了:產品6
消費了:產品7
生產了:產品8
生產了:產品9
消費了:產品8
消費了:產品9
消費了:產品4
消費了:產品1