condition是對線程進行控制管理的接口,具體實現是AQS的一個內部類ConditionObject,主要功能是控制線程的啟/停(這么說並不嚴格,還要有鎖的競爭排隊)。
condition主要方法:
void await() throws InterruptedException
|
進入等待,直到被通知或中斷
|
void awaitUninterruptibly()
|
進入等待,直到被通知,不響應中斷
|
long awaitNanos(long nanosTimeout) throws InterruptedException
|
等待xxx納秒
|
boolean awaitUntil(Date deadline) throws InterruptedException
|
等待,直到某個時間點
|
void signal()
|
喚醒
|
void signalAll()
|
喚醒所有等待在condition上的線程,會從等待隊列挨個signal()
|
使用示例:
通過實現一個有界隊列來深入理解Condition的使用方式。有界隊列:一種特殊的隊列,當隊列為空時,隊列的獲取操作將會阻塞獲取線程,直到隊列中有新增元素,當隊列已滿時,隊列的插入操作將會阻塞插入線程,
,直到隊列出現空位。
public class BoundedQueue<T> { private Object[] items; //添加的下標,刪除的下標和數組當前數量 private int addIndex, removeIndex, count; private Lock lock = new ReentrantLock(); private Condition empty = lock.newCondition(); private Condition full = lock.newCondition(); //構造方法 public BoundedQueue(int size){ items = new Object[size]; } //添加元素,如果數組滿,則添加線程進入等待,直到有空位 public void add(T t) throws InterruptedException{ lock.lock(); try { while (count == items.length) //改成if會如何 full.await(); items[addIndex] = t; if(++addIndex == items.length) addIndex = 0; ++count; empty.signal(); }finally { lock.unlock(); } } //從頭部刪除一個元素,如果數組空,則刪除線程進入等待狀態,直到添加新元素 public T remove() throws InterruptedException{ lock.lock(); try{ while (count == 0) empty.await(); Object x = items[removeIndex]; if(++removeIndex == items.length) removeIndex = 0; --count; full.signal(); return (T)x; }finally { lock.unlock(); } } }
在這里,阻塞隊列的數據存放是在items數組中,注意幾個下標的賦值操作,當addIndex到頭的時候,因為之前可能有remove操作,故items數組的頭部或者中間位置可能是空的,如果繼續添加數據,數據應添加在頭部,相當於形成了一個“環”,這就是19-20行的含義。至於16行代碼中的while替換成if,書中給出的解釋是:使用while目的是為了防止過早或意外的通知,只有條件符合才能退出循環。這個地方沒有想出相應的場景,僅從目前的代碼邏輯來說,換成if也是可以的,但如果考慮異常導致等待線程被喚醒,那阻塞隊列就無法正常工作了。
原理分析:
ConditionObject是同步器AQS的內部類,因為Condition的操作需要獲取相關聯的鎖,所以作為同步器的內部類也較為合理。每個Condition對象都包含着一個隊列(等待隊列),該隊列是condition對象實現等待/通知的功能的關鍵。
等待隊列:
等待隊列是一個FIFO的隊列,在隊列中的每個節點都包含了一個線程引用,該線程就是在Condition對象上等待的線程,如果一個線程調用了Condition.await()方法,那么該線程將會釋放鎖、構造成節點加入等待隊列並進入等待狀態。事實上,節點的定義復用了同步器中節點的定義,
也就是說,同步隊列和等待隊列中節點類型都是同步器的靜態內部類AbstractQueuedSynchronizer.Node。
如圖所示,Condition擁有首尾節點的引用,而新增節點只需要將原有的尾節點nextWaiter指向它,並且更新尾節點即可。
上述節點引用更新的過程並沒有使用CAS保證,原因在於調用await()方法的線程必定是獲取了鎖的線程,也就是說該過程是由鎖來保證線程安全的。
在Object的監視器模型上,一個對象擁有一個同步隊列和等待隊列,
而並發包中的Lock實現類擁有一個同步隊列和多個等待隊列:
如上圖所示,Condition的實現是同步器的內部類,因此每個Condition實例都能夠訪問同步器提供的方法,相當於每個Condition都擁有所屬同步器的引用。
等待:
調用Condition的await()方法(或者以await開頭的方法),會使當前線程進入等待隊列並釋放鎖,同時線程狀態變為等待狀態。當從await()方法返回時,當前線程一定獲取了Condition相關聯的鎖。
如果從隊列(同步隊列和等待隊列)的角度看await()方法,當調用await()方法時,相當於同步隊列的首節點(獲取了鎖的節點)移動到Condition的等待隊列中。
Condition的await()方法:
public final void await() throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); Node node = addConditionWaiter(); //當前線程加入等待隊列 int savedState = fullyRelease(node); //釋放同步狀態,也就是釋放鎖 int interruptMode = 0; while (!isOnSyncQueue(node)) { LockSupport.park(this); if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) break; } if (acquireQueued(node, savedState) && interruptMode != THROW_IE) interruptMode = REINTERRUPT; if (node.nextWaiter != null) // clean up if cancelled unlinkCancelledWaiters(); if (interruptMode != 0) reportInterruptAfterWait(interruptMode); }
調用該方法的線程成功獲取了鎖的線程,也就是同步隊列中的首節點,該方法會將當前線程構造成節點並加入等待隊列中,( 因為已經獲取了同步狀態,所以無需通過cas,在隊列尾部添加等待節點 )然后釋放同步狀態,喚醒同步隊列中的后繼節點,然后當前線程會進入等待狀態。
當等待隊列中的節點被喚醒,則喚醒節點的線程開始嘗試獲取同步狀態。如果不是通過其他線程調用Condition.signal()方法喚醒,而是對等待線程進行中斷,則會拋出InterruptedException。
如果從隊列的角度去看,當前線程加入Condition的等待隊列,如圖所示,同步隊列的首節點並不會直接加入等待隊列,而是通過addConditionWaiter()方法把當前線程構造成一個新的節點並將其加入等待隊列中:

通知:
調用Condition的signal()方法,將會喚醒在等待隊列中等待時間最長的節點(首節點),在喚醒節點之前,會將節點移到同步隊列末尾。
signal方法:
public final void signal() { if (!isHeldExclusively()) //是否獲取了鎖(重入鎖中是直接 return 獨占鎖線程==當前線程) throw new IllegalMonitorStateException(); Node first = firstWaiter; //等待隊列頭節點 if (first != null) doSignal(first); } private void doSignal(Node first) { do { if ( (firstWaiter = first.nextWaiter) == null) //等待隊列首節點的后繼節點為空,說明只有一個節點,那么尾節點也置空 lastWaiter = null; first.nextWaiter = null; } while (!transferForSignal(first) && (first = firstWaiter) != null); // 等待隊列首節點喚醒失敗,則喚醒下個節點 } final boolean transferForSignal(Node node) {// 將節點加入同步隊列 if (!compareAndSetWaitStatus(node, Node.CONDITION, 0)) // 設置節點狀態為0----(等待隊列狀態只能為-2,-3,用-2進行cas設置失敗,說明是-3) return false; Node p = enq(node); // 放入同步隊列尾部 int ws = p.waitStatus; if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL)) LockSupport.unpark(node.thread); return true; }
調用signal方法的前置條件是當前線程必須獲取了鎖,可以看到signal()方法進行了isHeldExclusively()檢查,也就是當前線程必須是獲取了鎖的線程。接着獲取等待隊列的首節點,將其移動到同步隊列並使用LockSupport喚醒節點中的線程。
節點從等待隊列移動到同步隊列的過程如下圖所示:

被喚醒后的線程,將從await()方法中的while循環中退出(isOnSyncQueue(Node node)方法返回true,節點已經在同步隊列中),進而調用同步器的acquireQueued()方法加入到獲取同步狀態的競爭中。成功獲取同步狀態之后,被喚醒的線程將從先前調用的await()方法返回,此時該線程已經成功地獲取了鎖。
Condition的signalAll()方法,相當於對等待隊列中的每個節點均執行一次signal()方法,效果就是將等待隊列中所有節點全部移動到同步隊列中,並喚醒每個節點的線程。
-----------------------------------------------------------------------------------
想到的問題:
1、wait,sleep,park都有啥區別,這個讓出cpu資源的操作底層實現是一樣的么?
wait是Object的方法,會釋放鎖,原理是monitor機制(搶占對象頭信息的鎖標志位),是個native方法,也就是是java的一套機制,底層是用c編寫的,具體如何掛起線程的邏輯未知。
sleep是Thread的方法,不會釋放鎖,也是個native方法,具體底層邏輯未知。
park是unsafe類的方法,這個unsafe類提供了很多直接操作內存的方法,這個park也是個native方法,openjdk源碼顯示底層調用了操作系統的函數,停掉了線程。wait跟sleep應該也是類似原理。
2、線程的喚醒是怎么實現的,在線程非常多的情況下,喚醒了就能馬上執行么?
喚醒肯定也是最終調用os的函數來實現的,線程非常多的情況下,硬件線程數固定,操作系統或者jvm肯定也要有相應機制來調用線程,這個喚醒只是讓線程“醒”了而已,醒了!=執行,所以,喚醒了可能並不能馬上執行,而是要等待別的線程執行完后進行搶占,成功后才能執行。