13.死磕Java並發-----J.U.C之Condition
12.Condition使用總結
11.Java並發編程系列之十七:Condition接口
===
13.死磕Java並發-----J.U.C之Condition
此篇博客所有源碼均來自JDK 1.8
在沒有Lock之前,我們使用synchronized來控制同步,配合Object的wait()、notify()系列方法可以實現等待/通知模式。在Java SE5后,Java提供了Lock接口,相對於Synchronized而言,Lock提供了條件Condition,對線程的等待、喚醒操作更加詳細和靈活。下圖是Condition與Object的監視器方法的對比(摘自《Java並發編程的藝術》):
Condition提供了一系列的方法來對阻塞和喚醒線程:
- await() :造成當前線程在接到信號或被中斷之前一直處於等待狀態。
- await(long time, TimeUnit unit) :造成當前線程在接到信號、被中斷或到達指定等待時間之前一直處於等待狀態。
- awaitNanos(long nanosTimeout) :造成當前線程在接到信號、被中斷或到達指定等待時間之前一直處於等待狀態。返回值表示剩余時間,如果在nanosTimesout之前喚醒,那么返回值 = nanosTimeout - 消耗時間,如果返回值 <= 0 ,則可以認定它已經超時了。
- awaitUninterruptibly() :造成當前線程在接到信號之前一直處於等待狀態。【注意:該方法對中斷不敏感】。
- awaitUntil(Date deadline) :造成當前線程在接到信號、被中斷或到達指定最后期限之前一直處於等待狀態。如果沒有到指定時間就被通知,則返回true,否則表示到了指定時間,返回返回false。
- signal():喚醒一個等待線程。該線程從等待方法返回前必須獲得與Condition相關的鎖。
- signal()All:喚醒所有等待線程。能夠從等待方法返回的線程必須獲得與Condition相關的鎖。
Condition是一種廣義上的條件隊列。他為線程提供了一種更為靈活的等待/通知模式,線程在調用await方法后執行掛起操作,直到線程等待的某個條件為真時才會被喚醒。Condition必須要配合鎖一起使用,因為對共享狀態變量的訪問發生在多線程環境下。一個Condition的實例必須與一個Lock綁定,因此Condition一般都是作為Lock的內部實現。
Condtion的實現
獲取一個Condition必須要通過Lock的newCondition()方法。該方法定義在接口Lock下面,返回的結果是綁定到此 Lock 實例的新 Condition 實例。Condition為一個接口,其下僅有一個實現類ConditionObject,由於Condition的操作需要獲取相關的鎖,而AQS則是同步鎖的實現基礎,所以ConditionObject則定義為AQS的內部類。定義如下:
public class ConditionObject implements Condition, java.io.Serializable { }
等待隊列
每個Condition對象都包含着一個FIFO隊列,該隊列是Condition對象通知/等待功能的關鍵。在隊列中每一個節點都包含着一個線程引用,該線程就是在該Condition對象上等待的線程。我們看Condition的定義就明白了:
public class ConditionObject implements Condition, java.io.Serializable { private static final long serialVersionUID = 1173984872572414699L; //頭節點 private transient Node firstWaiter; //尾節點 private transient Node lastWaiter; public ConditionObject() { } /** 省略方法 **/ }
從上面代碼可以看出Condition擁有首節點(firstWaiter),尾節點(lastWaiter)。當前線程調用await()方法,將會以當前線程構造成一個節點(Node),並將節點加入到該隊列的尾部。結構如下:
Node里面包含了當前線程的引用。Node定義與AQS的CLH同步隊列的節點使用的都是同一個類(AbstractQueuedSynchronized.Node靜態內部類)。
Condition的隊列結構比CLH同步隊列的結構簡單些,新增過程較為簡單只需要將原尾節點的nextWaiter指向新增節點,然后更新lastWaiter即可。
等待
調用Condition的await()方法會使當前線程進入等待狀態,同時會加入到Condition等待隊列同時釋放鎖。當從await()方法返回時,當前線程一定是獲取了Condition相關連的鎖。
public final void await() throws InterruptedException { // 當前線程中斷 if (Thread.interrupted()) throw new InterruptedException(); //當前線程加入等待隊列 Node node = addConditionWaiter(); //釋放鎖 long savedState = fullyRelease(node); int interruptMode = 0; /** * 檢測此節點的線程是否在同步隊上,如果不在,則說明該線程還不具備競爭鎖的資格,則繼續等待 * 直到檢測到此節點在同步隊列上 */ while (!isOnSyncQueue(node)) { //線程掛起 LockSupport.park(this); //如果已經中斷了,則退出 if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) break; } //競爭同步狀態 if (acquireQueued(node, savedState) && interruptMode != THROW_IE) interruptMode = REINTERRUPT; //清理下條件隊列中的不是在等待條件的節點 if (node.nextWaiter != null) // clean up if cancelled unlinkCancelledWaiters(); if (interruptMode != 0) reportInterruptAfterWait(interruptMode); }
此段代碼的邏輯是:首先將當前線程新建一個節點同時加入到條件隊列中,然后釋放當前線程持有的同步狀態。然后則是不斷檢測該節點代表的線程釋放出現在CLH同步隊列中(收到signal信號之后就會在AQS隊列中檢測到),如果不存在則一直掛起,否則參與競爭同步狀態。
加入條件隊列(addConditionWaiter())源碼如下:
private Node addConditionWaiter() { Node t = lastWaiter; //尾節點 //Node的節點狀態如果不為CONDITION,則表示該節點不處於等待狀態,需要清除節點 if (t != null && t.waitStatus != Node.CONDITION) { //清除條件隊列中所有狀態不為Condition的節點 unlinkCancelledWaiters(); t = lastWaiter; } //當前線程新建節點,狀態CONDITION Node node = new Node(Thread.currentThread(), Node.CONDITION); /** * 將該節點加入到條件隊列中最后一個位置 */ if (t == null) firstWaiter = node; else t.nextWaiter = node; lastWaiter = node; return node; }
該方法主要是將當前線程加入到Condition條件隊列中。當然在加入到尾節點之前會清楚所有狀態不為Condition的節點。
fullyRelease(Node node),負責釋放該線程持有的鎖。
final long fullyRelease(Node node) { boolean failed = true; try { //節點狀態--其實就是持有鎖的數量 long savedState = getState(); //釋放鎖 if (release(savedState)) { failed = false; return savedState; } else { throw new IllegalMonitorStateException(); } } finally { if (failed) node.waitStatus = Node.CANCELLED; } }
isOnSyncQueue(Node node):如果一個節點剛開始在條件隊列上,現在在同步隊列上獲取鎖則返回true
final boolean isOnSyncQueue(Node node) { //狀態為Condition,獲取前驅節點為null,返回false if (node.waitStatus == Node.CONDITION || node.prev == null) return false; //后繼節點不為null,肯定在CLH同步隊列中 if (node.next != null) return true; return findNodeFromTail(node); }
unlinkCancelledWaiters():負責將條件隊列中狀態不為Condition的節點刪除
private void unlinkCancelledWaiters() { Node t = firstWaiter; Node trail = null; while (t != null) { Node next = t.nextWaiter; if (t.waitStatus != Node.CONDITION) { t.nextWaiter = null; if (trail == null) firstWaiter = next; else trail.nextWaiter = next; if (next == null) lastWaiter = trail; } else trail = t; t = next; } }
通知
調用Condition的signal()方法,將會喚醒在等待隊列中等待最長時間的節點(條件隊列里的首節點),在喚醒節點前,會將節點移到CLH同步隊列中。
public final void signal() { //檢測當前線程是否為擁有鎖的獨 if (!isHeldExclusively()) throw new IllegalMonitorStateException(); //頭節點,喚醒條件隊列中的第一個節點 Node first = firstWaiter; if (first != null) doSignal(first); //喚醒 }
該方法首先會判斷當前線程是否已經獲得了鎖,這是前置條件。然后喚醒條件隊列中的頭節點。
doSignal(Node first):喚醒頭節點
private void doSignal(Node first) { do { //修改頭結點,完成舊頭結點的移出工作 if ( (firstWaiter = first.nextWaiter) == null) lastWaiter = null; first.nextWaiter = null; } while (!transferForSignal(first) && (first = firstWaiter) != null); }
doSignal(Node first)主要是做兩件事:1.修改頭節點,2.調用transferForSignal(Node first) 方法將節點移動到CLH同步隊列中。transferForSignal(Node first)源碼如下:
final boolean transferForSignal(Node node) { //將該節點從狀態CONDITION改變為初始狀態0, if (!compareAndSetWaitStatus(node, Node.CONDITION, 0)) return false; //將節點加入到syn隊列中去,返回的是syn隊列中node節點前面的一個節點 Node p = enq(node); int ws = p.waitStatus; //如果結點p的狀態為cancel 或者修改waitStatus失敗,則直接喚醒 if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL)) LockSupport.unpark(node.thread); return true; }
整個通知的流程如下:
- 判斷當前線程是否已經獲取了鎖,如果沒有獲取則直接拋出異常,因為獲取鎖為通知的前置條件。
- 如果線程已經獲取了鎖,則將喚醒條件隊列的首節點
- 喚醒首節點是先將條件隊列中的頭節點移出,然后調用AQS的enq(Node node)方法將其安全地移到CLH同步隊列中
- 最后判斷如果該節點的同步狀態是否為Cancel,或者修改狀態為Signal失敗時,則直接調用LockSupport喚醒該節點的線程。
總結
一個線程獲取鎖后,通過調用Condition的await()方法,會將當前線程先加入到條件隊列中,然后釋放鎖,最后通過isOnSyncQueue(Node node)方法不斷自檢看節點是否已經在CLH同步隊列了,如果是則嘗試獲取鎖,否則一直掛起。當線程調用signal()方法后,程序首先檢查當前線程是否獲取了鎖,然后通過doSignal(Node first)方法喚醒CLH同步隊列的首節點。被喚醒的線程,將從await()方法中的while循環中退出來,然后調用acquireQueued()方法競爭同步狀態。
Condition的應用
只知道原理,如果不知道使用那就坑爹了,下面是用Condition實現的生產者消費者問題:
public class ConditionTest { private LinkedList<String> buffer; //容器 private int maxSize ; //容器最大 private Lock lock; private Condition fullCondition; private Condition notFullCondition; ConditionTest(int maxSize){ this.maxSize = maxSize; buffer = new LinkedList<String>(); lock = new ReentrantLock(); fullCondition = lock.newCondition(); notFullCondition = lock.newCondition(); } public void set(String string) throws InterruptedException { lock.lock(); //獲取鎖 try { while (maxSize == buffer.size()){ notFullCondition.await(); //滿了,添加的線程進入等待狀態 } buffer.add(string); fullCondition.signal(); } finally { lock.unlock(); //記得釋放鎖 } } public String get() throws InterruptedException { String string; lock.lock(); try { while (buffer.size() == 0){ fullCondition.await(); } string = buffer.poll(); notFullCondition.signal(); } finally { lock.unlock(); } return string; } }
12.Condition使用總結
一、介紹
Condition是在java 1.5中才出現的,它用來替代傳統的Object的wait()、notify()實現線程間的協作,相比使用Object的wait()、notify(),使用Condition1的await()、signal()這種方式實現線程間協作更加安全和高效。簡單說,他的作用是使得某些線程一起等待某個條件(Condition),只有當該條件具備(signal 或者 signalAll方法被調用)時,這些等待線程才會被喚醒,從而重新爭奪鎖。
二、使用
Condition是個接口,基本的方法就是await()和signal()方法;
Condition依賴於Lock接口,生成一個Condition的基本代碼是lock.newCondition()
調用Condition的await()和signal()方法,都必須在lock保護之內,就是說必須在lock.lock()和lock.unlock之間才可以使用
Conditon中的await()對應Object的wait();
Condition中的signal()對應Object的notify();
Condition中的signalAll()對應Object的notifyAll()。
三、示例代碼
- package com.meituan.hyt.test4;
- import java.util.concurrent.locks.Condition;
- import java.util.concurrent.locks.ReentrantLock;
- public class Main {
- public static void main(String[] args) {
- final ReentrantLock reentrantLock = new ReentrantLock();
- final Condition condition = reentrantLock.newCondition();
- new Thread(new Runnable() {
- @Override
- public void run() {
- reentrantLock.lock();
- System.out.println(Thread.currentThread().getName() + "拿到鎖了");
- System.out.println(Thread.currentThread().getName() + "等待信號");
- try {
- condition.await();
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- System.out.println(Thread.currentThread().getName() + "拿到信號");
- reentrantLock.unlock();
- }
- }, "線程1").start();
- new Thread(new Runnable() {
- @Override
- public void run() {
- reentrantLock.lock();
- System.out.println(Thread.currentThread().getName() + "拿到鎖了");
- try {
- Thread.sleep(3000);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- System.out.println(Thread.currentThread().getName() + "發出信號");
- condition.signalAll();
- reentrantLock.unlock();
- }
- }, "線程2").start();
- }
- }
運行結果:
線程1拿到鎖了
線程1等待信號
線程2拿到鎖了
線程2發出信號
線程1拿到信號
四、原理
我們知道Lock的本質是AQS,AQS自己維護的隊列是當前等待資源的隊列,AQS會在資源被釋放后,依次喚醒隊列中從前到后的所有節點,使他們對應的線程恢復執行,直到隊列為空。而Condition自己也維護了一個隊列,該隊列的作用是維護一個等待signal信號的隊列。但是,兩個隊列的作用不同的,事實上,每個線程也僅僅會同時存在以上兩個隊列中的一個,流程是這樣的:
1. 線程1調用reentrantLock.lock時,嘗試獲取鎖。如果成功,則返回,從AQS的隊列中移除線程;否則阻塞,保持在AQS的等待隊列中。
2. 線程1調用await方法被調用時,對應操作是被加入到Condition的等待隊列中,等待signal信號;同時釋放鎖。
3. 鎖被釋放后,會喚醒AQS隊列中的頭結點,所以線程2會獲取到鎖。
4. 線程2調用signal方法,這個時候Condition的等待隊列中只有線程1一個節點,於是它被取出來,並被加入到AQS的等待隊列中。注意,這個時候,線程1 並沒有被喚醒,只是被加入AQS等待隊列。
5. signal方法執行完畢,線程2調用unLock()方法,釋放鎖。這個時候因為AQS中只有線程1,於是,線程1被喚醒,線程1恢復執行。
所以:
發送signal信號只是將Condition隊列中的線程加到AQS的等待隊列中。只有到發送signal信號的線程調用reentrantLock.unlock()釋放鎖后,這些線程才會被喚醒。
可以看到,整個協作過程是靠結點在AQS的等待隊列和Condition的等待隊列中來回移動實現的,Condition作為一個條件類,很好的自己維護了一個等待信號的隊列,並在適時的時候將結點加入到AQS的等待隊列中來實現的喚醒操作。
await源碼:
- public final void await() throws InterruptedException {
- // 1.如果當前線程被中斷,則拋出中斷異常
- if (Thread.interrupted())
- throw new InterruptedException();
- // 2.將節點加入到Condition隊列中去,這里如果lastWaiter是cancel狀態,那么會把它踢出Condition隊列。
- Node node = addConditionWaiter();
- // 3.調用tryRelease,釋放當前線程的鎖
- long savedState = fullyRelease(node);
- int interruptMode = 0;
- // 4.為什么會有在AQS的等待隊列的判斷?
- // 解答:signal操作會將Node從Condition隊列中拿出並且放入到等待隊列中去,在不在AQS等待隊列就看signal是否執行了
- // 如果不在AQS等待隊列中,就park當前線程,如果在,就退出循環,這個時候如果被中斷,那么就退出循環
- while (!isOnSyncQueue(node)) {
- LockSupport.park(this);
- if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
- break;
- }
- // 5.這個時候線程已經被signal()或者signalAll()操作給喚醒了,退出了4中的while循環
- // 自旋等待嘗試再次獲取鎖,調用acquireQueued方法
- if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
- interruptMode = REINTERRUPT;
- if (node.nextWaiter != null)
- unlinkCancelledWaiters();
- if (interruptMode != 0)
- reportInterruptAfterWait(interruptMode);
- }
1.將當前線程加入Condition鎖隊列。特別說明的是,這里不同於AQS的隊列,這里進入的是Condition的FIFO隊列。
2.釋放鎖。這里可以看到將鎖釋放了,否則別的線程就無法拿到鎖而發生死鎖。
3.自旋(while)掛起,直到被喚醒(signal把他重新放回到AQS的等待隊列)或者超時或者CACELLED等。
4.獲取鎖(acquireQueued)。並將自己從Condition的FIFO隊列中釋放,表明自己不再需要鎖(我已經拿到鎖了)。
signal就是喚醒Condition隊列中的第一個非CANCELLED節點線程,而signalAll就是喚醒所有非CANCELLED節點線程,本質是將節點從Condition隊列中取出來一個還是所有節點放到AQS的等待隊列。盡管所有Node可能都被喚醒,但是要知道的是仍然只有一個線程能夠拿到鎖,其它沒有拿到鎖的線程仍然需要自旋等待,就上上面提到的第4步(acquireQueued)。
11.Java並發編程系列之十七:Condition接口
通過前面的文章,我們知道任何一個Java對象,都擁有一組監視器方法,主要包括wait()、notify()、notifyAll()方法,這些方法與synchronized關鍵字配合使用可以實現等待/通知機制。而且前面我們已經使用這種方式實現了生產者-消費者模式。類似地,Condition接口也提供類似的Object的監視器的方法,主要包括await()、signal()、signalAll()方法,這些方法與Lock鎖配合使用也可以實現等待/通知機制。
相比Object實現的監視器方法,Condition接口的監視器方法具有一些Object所沒有的特性:
- Condition接口可以支持多個等待隊列,在前面已經提到一個Lock實例可以綁定多個Condition,所以自然可以支持多個等待隊列了
- Condition接口支持響應中斷,前面已經提到過
- Condition接口支持當前線程釋放鎖並進入等待狀態到將來的某個時間,也就是支持定時功能
使用Condition接口配合Lock鎖的使用實例如下:
Lock lock = new ReentrantLock(); Condition condition = lock.newCondition(); public void conditionWait() throws InterruptedException { lock.lock(); try { //.... condition.await(); }finally { lock.unlock(); } } public void conditionSignal(){ lock.lock(); try { //... condition.signal(); }finally { lock.unlock(); } }
一般而言,都會將Condition變量作為成員變量。當調用await方法后,當前線程會釋放鎖並進入Condition變量的等待隊列,而其他線程調用signal方法后,通知正在Condition變量等待隊列的線程從await方法返回,並且在返回前已經獲得了鎖。
與使用Object的監視器方法達到了同樣的效果,也許看不出Condition配合Lock鎖的優勢何在。但是在復雜多線程的編程中,這種方式可以體現出其優勢。所以一般使用的時候仍然是Object的監視器方法居多。
現在我們已經知道了如何配合Condition和Lock鎖實現等待/通知機制,那么我們使用這種方式實現生產者-消費者模式:
package com.rhwayfun.concurrency.r0405; import java.text.DateFormat; import java.text.SimpleDateFormat; import java.util.Date; import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; /** * Created by rhwayfun on 16-4-5. */ public class ConditionProducerConsumerDemo { //日期格式器 private static DateFormat format = new SimpleDateFormat("HH:mm:ss"); static class Info{ //作者 private String author; //標題 private String title; //是否開始生產的標志 private boolean produce = true; //Lock鎖 private Lock lock = new ReentrantLock(); //Condition變量 private Condition condition = lock.newCondition(); public Info(){} public Info(String author, String title) { this.author = author; this.title = title; } public String getAuthor() { return author; } public void setAuthor(String author) { this.author = author; } public String getTitle() { return title; } public void setTitle(String title) { this.title = title; } /** * 生產者執行的生產方法 * @param author * @param title * @throws InterruptedException */ public void set(String author,String title) throws InterruptedException { lock.lock(); try { //沒有開始生產就等待 while (!produce){ condition.await(); } //如果已經開始生產 this.setAuthor(author); TimeUnit.SECONDS.sleep(1); this.setTitle(title); //表示已經停止了生產可以取數據了 produce = false; //通知消費者 condition.signal(); }finally { lock.unlock(); } } /** * 消費者執行的消費方法 * @throws InterruptedException */ public void get() throws InterruptedException { lock.lockInterruptibly(); try { //如果已經開始生產就等待 while (produce){ condition.await(); } //如果沒有在生產就就可以取數據 System.out.println(Thread.currentThread().getName() + ":" + this.getAuthor() + "=" + this.getTitle() + " at " + format.format(new Date())); //表示我已經取了數據,生產者可以繼續生產 produce = true; //通知生產者 condition.signal(); }finally { lock.unlock(); } } } static class Producer implements Runnable{ private Info info; public Producer(Info info) { this.info = info; } public void run() { boolean flag = true; for (int i = 0; i < 5; i++){ if (flag){ try { info.set("authorA","titleA"); System.out.println(Thread.currentThread().getName() + ":" + info.getAuthor() + "=" + info.getTitle() + " at " + format.format(new Date())); TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); } flag = false; }else { try { info.set("authorB","titleB"); System.out.println(Thread.currentThread().getName() + ":" + info.getAuthor() + "=" + info.getTitle() + " at " + format.format(new Date())); TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); } flag = true; } } } } static class Consumer implements Runnable{ private Info info; public Consumer(Info info) { this.info = info; } public void run() { for (int i = 0; i < 5; i++){ try { info.get(); TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); } } } } public static void main(String[] args) throws InterruptedException { Info info = new Info(); Thread producer = new Thread(new Producer(info),"Producer"); Thread consumer = new Thread(new Consumer(info),"Consumer"); producer.start(); TimeUnit.SECONDS.sleep(1); consumer.start(); } }
運行結果如下: