生產者-消費者(producer-consumer)問題,也稱作有界緩沖區(bounded-buffer)問題,兩個進程共享一個公共的固定大小的緩沖區。
其中一個是生產者,用於將消息放入緩沖區;另外一個是消費者,用於從緩沖區中取出消息。
問題出現在當緩沖區已經滿了,而此時生產者還想向其中放入一個新的數據項的情形,其解決方法是讓生產者此時進行休眠,等待消費者從緩沖區中取走了一個或者多個數據后再去喚醒它。
同樣地,當緩沖區已經空了,而消費者還想去取消息,此時也可以讓消費者進行休眠,等待生產者放入一個或者多個數據時再喚醒它。
Condition 將 Object 監視器方法(wait、notify 和 notifyAll)分解成截然不同的對象,以便通過將這些對象與任意 Lock 實現組合使用,為每個對象提供多個等待 set (wait-set)。
其中,Lock 替代了 synchronized 方法和語句的使用,Condition 替代了 Object 監視器方法的使用。
在Condition中,用await()替換wait(),用signal()替換notify(),用signalAll()替換notifyAll(),傳統線程的通信方式,Condition都可以實現,這里注意,Condition是被綁定到Lock上的,要創建一個Lock的Condition必須用newCondition()方法。
這樣看來,Condition和傳統的線程通信沒什么區別,Condition的強大之處在於它可以為多個線程間建立不同的Condition,下面引入API中的一段代碼,加以說明。
代碼:
class BoundedBuffer { final Lock lock = new ReentrantLock();//鎖對象 final Condition notFull = lock.newCondition();//寫線程條件 final Condition notEmpty = lock.newCondition();//讀線程條件 final Object[] items = new Object[100];//緩存隊列 int putptr/*寫索引*/, takeptr/*讀索引*/, count/*隊列中存在的數據個數*/; public void put(Object x) throws InterruptedException { lock.lock(); try { while (count == items.length)//如果隊列滿了 notFull.await();//阻塞寫線程 items[putptr] = x;//賦值 if (++putptr == items.length) putptr = 0;//如果寫索引寫到隊列的最后一個位置了,那么置為0 ++count;//個數++ notEmpty.signal();//喚醒讀線程 } finally { lock.unlock(); } } public Object take() throws InterruptedException { lock.lock(); try { while (count == 0)//如果隊列為空 notEmpty.await();//阻塞讀線程 Object x = items[takeptr];//取值 if (++takeptr == items.length) takeptr = 0;//如果讀索引讀到隊列的最后一個位置了,那么置為0 --count;//個數-- notFull.signal();//喚醒寫線程 return x; } finally { lock.unlock(); } } }
這是一個處於多線程工作環境下的緩存區,緩存區提供了兩個方法,put和take,put是存數據,take是取數據,內部有個緩存隊列,具體變量和方法說明見代碼,
這個緩存區類實現的功能:有多個線程往里面存數據和從里面取數據,其緩存隊列(先進先出后進后出)能緩存的最大數值是100,多個線程間是互斥的,
當緩存隊列中存儲的值達到100時,將寫線程阻塞,並喚醒讀線程,當緩存隊列中存儲的值為0時,將讀線程阻塞,並喚醒寫線程,這也是ArrayBlockingQueue的內部實現。
下面分析一下代碼的執行過程:
1. 一個寫線程執行,調用put方法;
2. 判斷count是否為100,顯然沒有100;
3. 繼續執行,存入值;
4. 判斷當前寫入的索引位置++后,是否和100相等,相等將寫入索引值變為0,並將count+1;
5. 僅喚醒讀線程阻塞隊列中的一個;
6. 一個讀線程執行,調用take方法;
7. ……
8. 僅喚醒寫線程阻塞隊列中的一個。
這就是多個Condition的強大之處,假設緩存隊列中已經存滿,那么阻塞的肯定是寫線程,喚醒的肯定是讀線程,相反,阻塞的肯定是讀線程,喚醒的肯定是寫線程,那么假設只有一個Condition會有什么效果呢,緩存隊列中已經存滿,這個Lock不知道喚醒的是讀線程還是寫線程了,如果喚醒的是讀線程,皆大歡喜,如果喚醒的是寫線程,那么線程剛被喚醒,又被阻塞了,這時又去喚醒,這樣就浪費了很多時間。
Condition的應用:
import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; public class Main { static class NumberWrapper { public int value = 1; } public static void main(String[] args) { //初始化可重入鎖 final Lock lock = new ReentrantLock(); //第一個條件當屏幕上輸出到3 final Condition reachThreeCondition = lock.newCondition(); //第二個條件當屏幕上輸出到6 final Condition reachSixCondition = lock.newCondition(); //NumberWrapper只是為了封裝一個數字,一邊可以將數字對象共享,並可以設置為final //注意這里不要用Integer, Integer 是不可變對象 final NumberWrapper num = new NumberWrapper(); //初始化A線程 Thread threadA = new Thread(new Runnable() { @Override public void run() { //需要先獲得鎖 lock.lock(); try { System.out.println("threadA start write"); //A線程先輸出前3個數 while (num.value <= 3) { System.out.println(num.value); num.value++; } //輸出到3時要signal,告訴B線程可以開始了 reachThreeCondition.signal(); } finally { lock.unlock(); } lock.lock(); try { //等待輸出6的條件 reachSixCondition.await(); System.out.println("threadA start write"); //輸出剩余數字 while (num.value <= 9) { System.out.println(num.value); num.value++; } } catch (InterruptedException e) { e.printStackTrace(); } finally { lock.unlock(); } } }); Thread threadB = new Thread(new Runnable() { @Override public void run() { try { lock.lock(); while (num.value <= 3) { //等待3輸出完畢的信號 reachThreeCondition.await(); } } catch (InterruptedException e) { e.printStackTrace(); } finally { lock.unlock(); } try { lock.lock(); //已經收到信號,開始輸出4,5,6 System.out.println("threadB start write"); while (num.value <= 6) { System.out.println(num.value); num.value++; } //4,5,6輸出完畢,告訴A線程6輸出完了 reachSixCondition.signal(); } finally { lock.unlock(); } } }); //啟動兩個線程 threadB.start(); threadA.start(); } }

threadA start write 1 2 3 threadB start write 4 5 6 threadA start write 7 8 9
基本思路就是首先要A線程先寫1,2,3,這時候B線程應該等待reachThredCondition信號,而當A線程寫完3之后就通過signal告訴B線程“我寫到3了,該你了”,
這時候A線程要等reachSixCondition信號,同時B線程得到通知,開始寫4,5,6,寫完4,5,6之后B線程通知A線程reachSixCondition條件成立了,這時候A線程就開始寫剩下的7,8,9了。
Java官方提供的例子:
import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; public class Main { public static void main(String[] args) { final BoundedBuffer boundedBuffer = new BoundedBuffer(); Thread t1 = new Thread(new Runnable() { @Override public void run() { System.out.println("t1 run"); for (int i=0;i<20;i++) { try { System.out.println("putting.."); boundedBuffer.put(Integer.valueOf(i)); } catch (InterruptedException e) { e.printStackTrace(); } } } }) ; Thread t2 = new Thread(new Runnable() { @Override public void run() { for (int i=0;i<20;i++) { try { Object val = boundedBuffer.take(); System.out.println(val); } catch (InterruptedException e) { e.printStackTrace(); } } } }) ; t1.start(); t2.start(); } /** * BoundedBuffer 是一個定長100的集合,當集合中沒有元素時,take方法需要等待,直到有元素時才返回元素 * 當其中的元素數達到最大值時,要等待直到元素被take之后才執行put的操作 * @author yukaizhao * */ static class BoundedBuffer { final Lock lock = new ReentrantLock(); final Condition notFull = lock.newCondition(); final Condition notEmpty = lock.newCondition(); final Object[] items = new Object[100]; int putptr, takeptr, count; public void put(Object x) throws InterruptedException { System .out.println("put wait lock"); lock.lock(); System.out.println("put get lock"); try { while (count == items.length) { System.out.println("buffer full, please wait"); notFull.await(); } items[putptr] = x; if (++putptr == items.length) putptr = 0; ++count; notEmpty.signal(); } finally { lock.unlock(); } } public Object take() throws InterruptedException { System.out.println("take wait lock"); lock.lock(); System.out.println("take get lock"); try { while (count == 0) { System.out.println("no elements, please wait"); notEmpty.await(); } Object x = items[takeptr]; if (++takeptr == items.length) takeptr = 0; --count; notFull.signal(); return x; } finally { lock.unlock(); } } } }

t1 run putting.. take wait lock take get lock no elements, please wait put wait lock put get lock putting.. put wait lock put get lock putting.. put wait lock put get lock putting.. put wait lock put get lock putting.. put wait lock put get lock putting.. put wait lock put get lock putting.. put wait lock put get lock putting.. put wait lock put get lock putting.. put wait lock put get lock putting.. put wait lock put get lock putting.. put wait lock put get lock putting.. put wait lock put get lock putting.. put wait lock put get lock putting.. put wait lock put get lock putting.. put wait lock put get lock putting.. put wait lock put get lock putting.. put wait lock put get lock putting.. put wait lock put get lock putting.. put wait lock put get lock putting.. put wait lock put get lock 0 take wait lock take get lock 1 take wait lock take get lock 2 take wait lock take get lock 3 take wait lock take get lock 4 take wait lock take get lock 5 take wait lock take get lock 6 take wait lock take get lock 7 take wait lock take get lock 8 take wait lock take get lock 9 take wait lock take get lock 10 take wait lock take get lock 11 take wait lock take get lock 12 take wait lock take get lock 13 take wait lock take get lock 14 take wait lock take get lock 15 take wait lock take get lock 16 take wait lock take get lock 17 take wait lock take get lock 18 take wait lock take get lock 19
也可以修改t1,t2線程里面的循環,改成大於100,可能會碰到集合已滿的情況。
這個示例中BoundedBuffer是一個固定長度的集合,
這個在其put操作時,如果發現長度已經達到最大長度,那么要等待notFull信號才能繼續put,如果得到notFull信號會像集合中添加元素,並且put操作會發出notEmpty的信號,
而在其take方法中如果發現集合長度為空,那么會等待notEmpty的信號,接受到notEmpty信號才能繼續take,同時如果拿到一個元素,那么會發出notFull的信號。
Condition與Object中的wati,notify,notifyAll區別:
1.Condition中的await()方法相當於Object的wait()方法,Condition中的signal()方法相當於Object的notify()方法,Condition中的signalAll()相當於Object的notifyAll()方法。
不同的是,Object中的這些方法是和同步鎖捆綁使用的;而Condition是需要與互斥鎖/共享鎖捆綁使用的。
2.Condition它更強大的地方在於:能夠更加精細的控制多線程的休眠與喚醒。對於同一個鎖,我們可以創建多個Condition,在不同的情況下使用不同的Condition。
例如,假如多線程讀/寫同一個緩沖區:當向緩沖區中寫入數據之后,喚醒"讀線程";當從緩沖區讀出數據之后,喚醒"寫線程";並且當緩沖區滿的時候,"寫線程"需要等待;當緩沖區為空時,"讀線程"需要等待。
如果采用Object類中的wait(),notify(),notifyAll()實現該緩沖區,當向緩沖區寫入數據之后需要喚醒"讀線程"時,不可能通過notify()或notifyAll()明確的指定喚醒"讀線程",而只能通過notifyAll喚醒所有線程(但是notifyAll無法區分喚醒的線程是讀線程,還是寫線程)。 但是,通過Condition,就能明確的指定喚醒讀線程。
http://outofmemory.cn/java/java.util.concurrent/lock-reentrantlock-condition
http://blog.csdn.net/ghsau/article/details/7481142