概述
先來回顧一下java中的等待/通知機制
我們有時會遇到這樣的場景:線程A執行到某個點的時候,因為某個條件condition不滿足,需要線程A暫停;等到線程B修改了條件condition,使condition滿足了線程A的要求時,A再繼續執行。
自旋實現的等待通知
最簡單的實現方法就是將condition設為一個volatile的變量,當A線程檢測到條件不滿足時就自旋,類似下面:
public class Test { private static volatile int condition = 0; public static void main(String[] args) throws InterruptedException { Thread A = new Thread(new Runnable() { @Override public void run() { while (!(condition == 1)) { // 條件不滿足,自旋 } System.out.println("a executed"); } }); A.start(); Thread.sleep(2000); condition = 1; } }
這種方式的問題在於自旋非常耗費CPU資源,當然如果在自旋的代碼塊里加入Thread.sleep(time)將會減輕CPU資源的消耗,但是如果time設的太大,A線程就不能及時響應condition的變化,如果設的太小,依然會造成CPU的消耗。
Object提供的等待通知
因此,java在Object類里提供了wait()和notify()方法,使用方法如下:
class Test1 { private static volatile int condition = 0; private static final Object lock = new Object(); public static void main(String[] args) throws InterruptedException { Thread A = new Thread(new Runnable() { @Override public void run() { synchronized (lock) { while (!(condition == 1)) { try { lock.wait(); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } } System.out.println("a executed by notify"); } } }); A.start(); Thread.sleep(2000); condition = 1; synchronized (lock) { lock.notify(); } } }
通過代碼可以看出,在使用一個對象的wait()、notify()方法前必須要獲取這個對象的鎖。
當線程A調用了lock對象的wait()方法后,線程A將釋放持有的lock對象的鎖,然后將自己掛起,直到有其他線程調用notify()/notifyAll()方法或被中斷。可以看到在lock.wait()前面檢測condition條件的時候使用了一個while循環而不是if,那是因為當有其他線程把condition修改為滿足A線程的要求並調用notify()后,A線程會重新等待獲取鎖,獲取到鎖后才從lock.wait()方法返回,而在A線程等待鎖的過程中,condition是有可能再次變化的。
因為wait()、notify()是和synchronized配合使用的,因此如果使用了顯示鎖Lock,就不能用了。所以顯示鎖要提供自己的等待/通知機制,Condition應運而生。
顯示鎖提供的等待通知
我們用Condition實現上面的例子:
class Test2 { private static volatile int condition = 0; private static Lock lock = new ReentrantLock(); private static Condition lockCondition = lock.newCondition(); public static void main(String[] args) throws InterruptedException { Thread A = new Thread(new Runnable() { @Override public void run() { lock.lock(); try { while (!(condition == 1)) { lockCondition.await(); } } catch (InterruptedException e) { Thread.currentThread().interrupt(); } finally { lock.unlock(); } System.out.println("a executed by condition"); } }); A.start(); Thread.sleep(2000); condition = 1; lock.lock(); try { lockCondition.signal(); } finally { lock.unlock(); } } }
可以看到通過 lock.newCondition() 可以獲得到 lock 對應的一個Condition對象lockCondition ,lockCondition的await()、signal()方法分別對應之前的Object的wait()和notify()方法。整體上和Object的等待通知是類似的。
應用舉例
上面我們看到了Condition實現的等待通知和Object的等待通知是非常類似的,而Condition提供的等待通知功能更強大,最重要的一點是,一個lock對象可以通過多次調用 lock.newCondition() 獲取多個Condition對象,也就是說,在一個lock對象上,可以有多個等待隊列,而Object的等待通知在一個Object上,只能有一個等待隊列。用下面的例子說明,下面的代碼實現了一個阻塞隊列,當隊列已滿時,add操作被阻塞有其他線程通過remove方法刪除元素;當隊列已空時,remove操作被阻塞直到有其他線程通過add方法添加元素。
public class BoundedQueue1<T> { public List<T> q; //這個列表用來存隊列的元素 private int maxSize; //隊列的最大長度 private Lock lock = new ReentrantLock(); private Condition addConditoin = lock.newCondition(); private Condition removeConditoin = lock.newCondition(); public BoundedQueue1(int size) { q = new ArrayList<>(size); maxSize = size; } public void add(T e) { lock.lock(); try { while (q.size() == maxSize) { addConditoin.await(); } q.add(e); removeConditoin.signal(); //執行了添加操作后喚醒因隊列空被阻塞的刪除操作 } catch (InterruptedException e1) { Thread.currentThread().interrupt(); } finally { lock.unlock(); } } public T remove() { lock.lock(); try { while (q.size() == 0) { removeConditoin.await(); } T e = q.remove(0); addConditoin.signal(); //執行刪除操作后喚醒因隊列滿而被阻塞的添加操作 return e; } catch (InterruptedException e) { Thread.currentThread().interrupt(); return null; } finally { lock.unlock(); } } }
源碼分析
下面來分析Condition源碼
概述
之前我們介紹AQS的時候說過,AQS的同步排隊用了一個隱式的雙向隊列,同步隊列的每個節點是一個AbstractQueuedSynchronizer.Node實例。
Node的主要字段有:
- waitStatus:等待狀態,所有的狀態見下面的表格。
- prev:前驅節點
- next:后繼節點
- thread:當前節點代表的線程
- nextWaiter:Node既可以作為同步隊列節點使用,也可以作為Condition的等待隊列節點使用(將會在后面講Condition時講到)。在作為同步隊列節點時,nextWaiter可能有兩個值:EXCLUSIVE、SHARED標識當前節點是獨占模式還是共享模式;在作為等待隊列節點使用時,nextWaiter保存后繼節點。
狀態 | 值 | 含義 |
CANCELLED | 1 | 當前節點因為超時或中斷被取消同步狀態獲取,該節點進入該狀態不會再變化 |
SIGNAL | -1 | 標識后繼的節點處於阻塞狀態,當前節點在釋放同步狀態或被取消時,需要通知后繼節點繼續運行。每個節點在阻塞前,需要標記其前驅節點的狀態為SIGNAL。 |
CONDITION | -2 | 標識當前節點是作為等待隊列節點使用的。 |
PROPAGATE | -3 | |
0 | 0 | 初始狀態 |
Condition實現等待的時候內部也有一個等待隊列,等待隊列是一個隱式的單向隊列,等待隊列中的每一個節點也是一個AbstractQueuedSynchronizer.Node實例。
每個Condition對象中保存了firstWaiter和lastWaiter作為隊列首節點和尾節點,每個節點使用Node.nextWaiter保存下一個節點的引用,因此等待隊列是一個單向隊列。
每當一個線程調用Condition.await()方法,那么該線程會釋放鎖,構造成一個Node節點加入到等待隊列的隊尾。
等待
Condition.await()方法的源碼如下:
public final void await() throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); Node node = addConditionWaiter(); //構造一個新的等待隊列Node加入到隊尾 int savedState = fullyRelease(node); //釋放當前線程的獨占鎖,不管重入幾次,都把state釋放為0 int interruptMode = 0;
//如果當前節點沒有在同步隊列上,即還沒有被signal,則將當前線程阻塞 while (!isOnSyncQueue(node)) { LockSupport.park(this);
//后面的藍色代碼都是和中斷相關的,主要是區分兩種中斷:是在被signal前中斷還是在被signal后中斷,如果是被signal前就被中斷則拋出 InterruptedException,否則執行 Thread.currentThread().interrupt(); if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) //被中斷則直接退出自旋 break; }
//退出了上面自旋說明當前節點已經在同步隊列上,但是當前節點不一定在同步隊列隊首。acquireQueued將阻塞直到當前節點成為隊首,即當前線程獲得了鎖。然后await()方法就可以退出了,讓線程繼續執行await()后的代碼。 if (acquireQueued(node, savedState) && interruptMode != THROW_IE) interruptMode = REINTERRUPT; if (node.nextWaiter != null) // clean up if cancelled unlinkCancelledWaiters(); if (interruptMode != 0) reportInterruptAfterWait(interruptMode); }
final int fullyRelease(Node node) { boolean failed = true; try { int savedState = getState(); if (release(savedState)) { failed = false; return savedState; } else { throw new IllegalMonitorStateException(); } } finally { if (failed) node.waitStatus = Node.CANCELLED; } } final boolean isOnSyncQueue(Node node) {
//如果當前節點狀態是CONDITION或node.prev是null,則證明當前節點在等待隊列上而不是同步隊列上。之所以可以用node.prev來判斷,是因為一個節點如果要加入同步隊列,在加入前就會設置好prev字段。 if (node.waitStatus == Node.CONDITION || node.prev == null) return false;
//如果node.next不為null,則一定在同步隊列上,因為node.next是在節點加入同步隊列后設置的 if (node.next != null) // If has successor, it must be on queue return true; return findNodeFromTail(node); //前面的兩個判斷沒有返回的話,就從同步隊列隊尾遍歷一個一個看是不是當前節點。 } private boolean findNodeFromTail(Node node) { Node t = tail; for (;;) { if (t == node) return true; if (t == null) return false; t = t.prev; } }
final boolean acquireQueued(final Node node, int arg) { boolean failed = true; try { boolean interrupted = false; for (;;) { final Node p = node.predecessor(); if (p == head && tryAcquire(arg)) { setHead(node); p.next = null; // help GC failed = false; return interrupted; } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) cancelAcquire(node); } }
通知
Condition.signal() 方法的源碼如下:
public final void signal() { if (!isHeldExclusively()) throw new IllegalMonitorStateException(); //如果同步狀態不是被當前線程獨占,直接拋出異常。從這里也能看出來,Condition只能配合獨占類同步組件使用。 Node first = firstWaiter; if (first != null) doSignal(first); //通知等待隊列隊首的節點。 }
private void doSignal(Node first) { do { if ( (firstWaiter = first.nextWaiter) == null) lastWaiter = null; first.nextWaiter = null; } while (!transferForSignal(first) && //transferForSignal方法嘗試喚醒當前節點,如果喚醒失敗,則繼續嘗試喚醒當前節點的后繼節點。 (first = firstWaiter) != null); } final boolean transferForSignal(Node node) { //如果當前節點狀態為CONDITION,則將狀態改為0准備加入同步隊列;如果當前狀態不為CONDITION,說明該節點等待已被中斷,則該方法返回false,doSignal()方法會繼續嘗試喚醒當前節點的后繼節點 if (!compareAndSetWaitStatus(node, Node.CONDITION, 0)) return false; /* * Splice onto queue and try to set waitStatus of predecessor to * indicate that thread is (probably) waiting. If cancelled or * attempt to set waitStatus fails, wake up to resync (in which * case the waitStatus can be transiently and harmlessly wrong). */ Node p = enq(node); //將節點加入同步隊列,返回的p是節點在同步隊列中的先驅節點 int ws = p.waitStatus;
//如果先驅節點的狀態為CANCELLED(>0) 或設置先驅節點的狀態為SIGNAL失敗,那么就立即喚醒當前節點對應的線程,線程被喚醒后會執行acquireQueued方法,該方法會重新嘗試將節點的先驅狀態設為SIGNAL並再次park線程;如果當前設置前驅節點狀態為SIGNAL成功,那么就不需要馬上喚醒線程了,當它的前驅節點成為同步隊列的首節點且釋放同步狀態后,會自動喚醒它。
//其實筆者認為這里不加這個判斷條件應該也是可以的。只是對於CAS修改前驅節點狀態為SIGNAL成功這種情況來說,如果不加這個判斷條件,提前喚醒了線程,等進入acquireQueued方法了節點發現自己的前驅不是首節點,還要再阻塞,等到其前驅節點成為首節點並釋放鎖時再喚醒一次;而如果加了這個條件,線程被喚醒的時候它的前驅節點肯定是首節點了,線程就有機會直接獲取同步狀態從而避免二次阻塞,節省了硬件資源。 if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL)) LockSupport.unpark(node.thread); return true; }
Condition等待通知的本質
總的來說,Condition的本質就是等待隊列和同步隊列的交互:
當一個持有鎖的線程調用Condition.await()時,它會執行以下步驟:
- 構造一個新的等待隊列節點加入到等待隊列隊尾
- 釋放鎖,也就是將它的同步隊列節點從同步隊列隊首移除
- 自旋,直到它在等待隊列上的節點移動到了同步隊列(通過其他線程調用signal())或被中斷
- 阻塞當前節點,直到它獲取到了鎖,也就是它在同步隊列上的節點排隊排到了隊首。
當一個持有鎖的線程調用Condition.signal()時,它會執行以下操作:
從等待隊列的隊首開始,嘗試對隊首節點執行喚醒操作;如果節點CANCELLED,就嘗試喚醒下一個節點;如果再CANCELLED則繼續迭代。
對每個節點執行喚醒操作時,首先將節點加入同步隊列,此時await()操作的步驟3的解鎖條件就已經開啟了。然后分兩種情況討論:
- 如果先驅節點的狀態為CANCELLED(>0) 或設置先驅節點的狀態為SIGNAL失敗,那么就立即喚醒當前節點對應的線程,此時await()方法就會完成步驟3,進入步驟4.
- 如果成功把先驅節點的狀態設置為了SIGNAL,那么就不立即喚醒了。等到先驅節點成為同步隊列首節點並釋放了同步狀態后,會自動喚醒當前節點對應線程的,這時候await()的步驟3才執行完成,而且有很大概率快速完成步驟4.
總結
如果知道Object的等待通知機制,Condition的使用是比較容易掌握的,因為和Object等待通知的使用基本一致。
對Condition的源碼理解,主要就是理解等待隊列,等待隊列可以類比同步隊列,而且等待隊列比同步隊列要簡單,因為等待隊列是單向隊列,同步隊列是雙向隊列。
以下是筆者對等待隊列是單向隊列、同步隊列是雙向隊列的一些思考,歡迎提出不同意見:
之所以同步隊列要設計成雙向的,是因為在同步隊列中,節點喚醒是接力式的,由每一個節點喚醒它的下一個節點,如果是由next指針獲取下一個節點,是有可能獲取失敗的,因為虛擬隊列每添加一個節點,是先用CAS把tail設置為新節點,然后才修改原tail的next指針到新節點的。因此用next向后遍歷是不安全的,但是如果在設置新節點為tail前,為新節點設置prev,則可以保證從tail往前遍歷是安全的。因此要安全的獲取一個節點Node的下一個節點,先要看next是不是null,如果是null,還要從tail往前遍歷看看能不能遍歷到Node。
而等待隊列就簡單多了,等待的線程就是等待者,只負責等待,喚醒的線程就是喚醒者,只負責喚醒,因此每次要執行喚醒操作的時候,直接喚醒等待隊列的首節點就行了。等待隊列的實現中不需要遍歷隊列,因此也不需要prev指針。