線程間通信
線程之間除了同步互斥,還要考慮通信。在Java5之前我們的通信方式為:wait 和 notify。Condition的優勢是支持多路等待,即可以定義多個Condition,每個condition控制線程的一條執行通路。傳統方式只能是一路等待
Condition提供不同於Object 監視器方法的行為和語義,如受保證的通知排序,或者在執行通知時不需要保持一個鎖。
Condition接口
Condition 將 Object 監視器方法(wait、notify 和 notifyAll)分解成截然不同的對象,以便通過將這些對象與任意 Lock 實現組合使用,為每個對象提供多個等待 set(wait-set)。其中,Lock 替代了 synchronized 方法和語句的使用,Condition 替代了 Object 監視器方法的使用。
public interface Condition {
void await() throws InterruptedException; void awaitUninterruptibly(); long awaitNanos(long nanosTimeout) throws InterruptedException; boolean await(long time, TimeUnit unit) throws InterruptedException; boolean awaitUntil(Date deadline) throws InterruptedException; void signal(); void signalAll(); }
說明:
await()方法會使當前線程等待,同時釋放當前鎖,當其他線程中使用signal()時或者signalAll()方法時,線程會重新獲得鎖並繼續執行。或者當線程被中斷時,也能跳出等待。這和Object.wait()方法很相似。
awaitUninterruptibly()方法與await()方法基本相同,但awaitUninterruptibly()方法不會在等待過程中響應中斷。
singal()方法用於喚醒一個在等待中的線程。相對的singalAll()方法會喚醒所有在等待中的線程。這和Obejct.notify()方法類似。
condition.await()方法必須在lock.lock()與lock.unlock()方法之間調用。
獲取Condition
Condition實例實質上被綁定到一個鎖上。一個鎖內部可以有多個Condition,即有多路等待和通知。要為特定 Lock 實例獲得 Condition 實例,請使用其 newCondition() 方法。
Condition newCondition() 返回用來與當前Lock實例一起使用的Condition 實例。
類似於 object.wait()和object.notify()的功能。object.wait()與object.notify()需要結合synchronized使用。Condition需要結合ReentrantLock使用。
"虛假喚醒"
所謂"虛假喚醒",即其他地方的代碼觸發了condition.signal(),喚醒condition上等待的線程。但被喚醒的線程仍然不滿足執行條件。
condition通常與條件語句一起使用:
if(!條件){ condition.await(); //不滿足條件,當前線程等待; }
更好的方法是使用while:
while(!條件){ condition.await(); //不滿足條件,當前線程等待; }
在等待Condition時,允許發生"虛假喚醒",這通常作為對基礎平台語義的讓步。若使用"if(!條件)"則被"虛假喚醒"的線程可能繼續執行。所以"while(!條件)"可以防止"虛假喚醒"。建議總是假定這些"虛假喚醒"可能發生,因此總是在一個循環中等待。
例:緩沖隊列的實現。
import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; class BoundedBuffer { final Lock lock = new ReentrantLock();// 鎖對象 final Condition notFull = lock.newCondition(); //寫線程條件 final Condition notEmpty = lock.newCondition();//讀線程條件 final Object[] items = new Object[100];// 初始化一個長度為100的隊列 int putptr/* 寫索引 */, takeptr/* 讀索引 */, count/* 隊列中存在的數據個數 */; public void put(Object x) throws InterruptedException { lock.lock(); //獲取鎖 try { while (count == items.length) notFull.await();// 當計數器count等於隊列的長度時,不能再插入,因此等待。阻塞寫線程。 items[putptr] = x;//賦值 putptr++; if (putptr == items.length) putptr = 0;// 若寫索引寫到隊列的最后一個位置了,將putptr置為0。 count++; // 每放入一個對象就將計數器加1。 notEmpty.signal(); // 一旦插入就喚醒取數據線程。 } finally { lock.unlock(); // 最后釋放鎖 } } public Object take() throws InterruptedException { lock.lock(); // 獲取鎖 try { while (count == 0) notEmpty.await(); // 如果計數器等於0則等待,即阻塞讀線程。 Object x = items[takeptr]; // 取值 takeptr++; if (takeptr == items.length) takeptr = 0; //若讀鎖應讀到了隊列的最后一個位置了,則讀鎖應置為0;即當takeptr達到隊列長度時,從零開始取 count++; // 每取一個將計數器減1。 notFull.signal(); //枚取走一個就喚醒存線程。 return x; } finally { lock.unlock();// 釋放鎖 } } }
此即Condition的強大之處,假設緩存隊列中已經存滿,那么阻塞的肯定是寫線程,喚醒的肯定是讀線程,相反,阻塞的肯定是讀線程,喚醒的肯定是寫線程,那么假設只有一個Condition會有什么效果呢,緩存隊列中已經存滿,這個Lock不知道喚醒的是讀線程還是寫線程了,如果喚醒的是讀線程,皆大歡喜,如果喚醒的是寫線程,那么線程剛被喚醒,又被阻塞了,這時又去喚醒,這樣就浪費了很多時間。
例:經典問題:三個線程依次打印ABC,代碼示例如下:
import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; class Business { private Lock lock = new ReentrantLock(); private Condition conditionA = lock.newCondition(); private Condition conditionB = lock.newCondition(); private Condition conditionC = lock.newCondition(); private String type = "A"; //內部狀態 /* * 方法的基本要求為: * 1、該方法必須為原子的。 * 2、當前狀態必須滿足條件。若不滿足,則等待;滿足,則執行業務代碼。 * 3、業務執行完畢后,修改狀態,並喚醒指定條件下的線程。 */ public void printA() { lock.lock(); //鎖,保證了線程安全。 try { while (type != "A") { //type不為A, try { conditionA.await(); //將當前線程阻塞於conditionA對象上,將被阻塞。 } catch (InterruptedException e) { e.printStackTrace(); } } //type為A,則執行。 System.out.println(Thread.currentThread().getName() + " 正在打印A"); type = "B"; //將type設置為B。 conditionB.signal(); //喚醒在等待conditionB對象上的一個線程。將信號傳遞出去。 } finally { lock.unlock(); //解鎖 } } public void printB() { lock.lock(); //鎖 try { while (type != "B") { //type不為B, try { conditionB.await(); //將當前線程阻塞於conditionB對象上,將被阻塞。 } catch (InterruptedException e) { e.printStackTrace(); } } //type為B,則執行。 System.out.println(Thread.currentThread().getName() + " 正在打印B"); type = "C"; //將type設置為C。 conditionC.signal(); //喚醒在等待conditionC對象上的一個線程。將信號傳遞出去。 } finally { lock.unlock(); //解鎖 } } public void printC() { lock.lock(); //鎖 try { while (type != "C") { try { conditionC.await(); } catch (InterruptedException e) { e.printStackTrace(); } } System.out.println(Thread.currentThread().getName() + " 正在打印C"); type = "A"; conditionA.signal(); } finally { lock.unlock(); //解鎖 } } } public class Test{ public static void main(String[] args) { final Business business = new Business();//業務對象。 //線程1號,打印10次A。 Thread ta = new Thread(new Runnable() { @Override public void run() { for(int i=0;i<10;i++){ business.printA(); } } }); //線程2號,打印10次B。 Thread tb = new Thread(new Runnable() { @Override public void run() { for(int i=0;i<10;i++){ business.printB(); } } }); //線程3號,打印10次C。 Thread tc = new Thread(new Runnable() { @Override public void run() { for(int i=0;i<10;i++){ business.printC(); } } }); //執行3條線程。 ta.start(); tb.start(); tc.start(); } }
執行代碼,控制台依次顯示了A、B、C,10次。可以看到3條線程之間共享Business類中的資源type,且3條線程之間進行了有效的協調。
