在生產者/消費者模型中,生產者Producer負責生產數據,而消費者Consumer負責使用數據。多個生產者線程會在同一時間運行,生產數據,並放到內存中一個共享的區域。期間,多個消費者線程讀取內存共享區,消費里面的數據。
分析
在下面Java應用程序中,生產者線程向一個線程安全的堆棧緩沖區中寫(PUSH)數據,消費者從該堆棧緩沖區中讀(POP)數據,這樣,這個程序中同時運行的兩個線程共享同一個堆棧緩沖區資源。
類Producer是生產者模型,其中的run方法中定義了生產者線程所做的操作,循環調用push()方法,將生產的100個字母送入堆棧中,每次執行完push操作后,調用sleep方法睡眠一段隨機時間。
類Consumer是消費者模型,循環調用pop方法,從堆棧取出一個字母,一共取100次,每次執行完push操作后,調用sleep方法睡眠一段隨機時間

同步堆棧類SynchronizedStack
package com.ailk.biapp.ci.ProducerAndConsumer; public class SynchronizedStack { private int index = 0; private int size = 100; //共享內存區 private char[] data; public SynchronizedStack(int size){ System.out.println("棧被創建"); this.size = size; data = new char[size]; } /** * 生產數據 * * @param c */ public synchronized void push(char c){ while (index == size){ try{ System.err.println("生產數據滿了"); this.wait();//等待,直到有數據出棧 }catch(InterruptedException e){ Thread.currentThread().interrupt(); e.printStackTrace(); } } data[index] = c; index++; this.notify();//通知其他線程把數據出棧 } /** * 消費數據 * * @return */ public synchronized char pop(){ while (index == 0){ try{ System.err.println("棧空了"); this.wait();// 等待,直到有數據出棧 }catch(InterruptedException e){ Thread.currentThread().interrupt(); e.printStackTrace(); } } index --;//指針向下移動 char ch = data[index]; this.notify();//通知其他線程把數據入棧 return ch; } //顯示堆棧內容 public synchronized void print(){ for(int i = 0; i < data.length; i++){ System.out.println(data[i]); } System.out.println(); this.notify();// 通知其它線程顯示堆棧內容 } }
生產者Product
package com.ailk.biapp.ci.ProducerAndConsumer; public class Producer implements Runnable{ private SynchronizedStack stack; public Producer(SynchronizedStack s){ stack = s; } public void run(){ char ch; for(int i = 0; i< 100; i++){ //隨機產生100個字符 ch = (char) (Math.random() * 26 + 'A'); stack.push(ch); System.out.println("Produced:" + ch); try{ //每一個字符線程就休眠一下 Thread.sleep((int) (Math.random() * 1000)); }catch (InterruptedException e) { } } } }
消費者Consumer
package com.ailk.biapp.ci.ProducerAndConsumer; public class Consumer implements Runnable{ private SynchronizedStack stack; public Consumer(SynchronizedStack s){ stack = s; } public void run() { char ch; for(int i = 0 ; i < 100; i++){ ch = stack.pop(); System.out.println("Consumed:" + ch); } try{ Thread.sleep((int) (Math.random() * 1000)); }catch(InterruptedException e){ } } }
測試:
package com.ailk.biapp.ci.ProducerAndConsumer; public class ProductConsumerTest { public static void main(String args[]){ // 下面的消費者類對象和生產者類對象所操作的是同一個同步堆棧對象 SynchronizedStack stack = new SynchronizedStack(5); Runnable source = new Producer(stack); Runnable sink = new Consumer(stack); Thread t1 = new Thread(source); Thread t2 = new Thread(sink); t1.start(); t2.start(); } }

借鑒於:http://www.cnblogs.com/linjiqin/archive/2011/04/15/2016820.html
