Condition接口
在並發編程中,每個Java對象都存在一組監視器方法,如wait()、notify()以及notifyAll()方法,通過這些方法,我們可以實現線程間通信與協作(也稱為等待喚醒機制),如生產者-消費者模式,而且這些方法必須配合着synchronized關鍵字使用。
與synchronized的等待喚醒機制相比,Condition具有更多的靈活性以及精確性,這是因為notify()在喚醒線程時是隨機(同一個鎖),而Condition則可通過多個Condition實例對象建立更加精細的線程控制,也就帶來了更多靈活性:
-
通過Condition能夠精細的控制多線程的休眠與喚醒。
-
對於一個鎖,可以為多個線程間建立不同的Condition。
Condition是一個接口:
public interface Condition {
/**
* 使當前線程進入等待狀態,直到【被通知(signal)】或【中斷】
* 當其他線程調用singal()或singalAll()方法時,該線程將被喚醒
* 當其他線程調用interrupt()方法中斷當前線程
* await()相當於synchronized等待喚醒機制中的wait()方法
*/
void await() throws InterruptedException;
// 當前線程進入等待狀態,直到被喚醒,該方法【不響應中斷要求】
void awaitUninterruptibly();
// 調用該方法,當前線程進入等待狀態,直到【被喚醒】或【被中斷】或【超時】
// 其中nanosTimeout指的等待超時時間,單位納秒
long awaitNanos(long nanosTimeout) throws InterruptedException;
// 同awaitNanos,但可以指明時間單位
boolean await(long time, TimeUnit unit) throws InterruptedException;
// 調用該方法當前線程進入等待狀態,直到被喚醒、中斷或到達某個時間期限(deadline)
// 如果沒到指定時間就被喚醒,返回true,其他情況返回false
boolean awaitUntil(Date deadline) throws InterruptedException;
// 喚醒一個等待在Condition上的線程,
// 該線程從等待方法返回前,必須獲取與Condition相關聯的鎖,功能與notify()相同
void signal();
// 喚醒所有等待在Condition上的線程,
// 該線程從等待方法返回前,必須獲取與Condition相關聯的鎖,功能與notifyAll()相同
void signalAll();
}
同步隊列與等待隊列
AQS中存在兩種隊列,一種是同步隊列,一種是等待隊列,而等待隊列就相對於Condition而言的。在使用Condition前必須獲得鎖,同時在Condition的等待隊列上的結點與前面同步隊列的結點是同一個類即Node,其結點的waitStatus的值為CONDITION=1。
同步隊列與等待隊列的關系:

每個Condition都對應着一個等待隊列,也就是說如果一個鎖上創建了多個Condition對象,那么也就存在多個等待隊列。等待隊列是一個FIFO的隊列,在隊列中每一個節點都包含了一個線程的引用,而該線程就是Condition對象上等待的線程。
當一個線程調用了await()相關的方法,那么該線程將會釋放鎖,並構建一個Node節點封裝當前線程的相關信息,加入到等待隊列中進行等待,直到被喚醒、中斷、超時才從隊列中移出。
Condition中的等待隊列模型如下:
Node節點的數據結構,在等待隊列中使用的變量與同步隊列是不同的,Condtion中等待隊列的結點,只有直接指向的后繼結點,並沒有指明前驅結點,而且使用的變量是nextWaiter而不是next。
等待隊列中結點的狀態只有兩種即CANCELLED和CONDITION,前者表示線程已結束,需要從等待隊列中移除,后者表示條件結點等待被喚醒。
每個Codition對象對應於一個等待隊列,也就是說AQS中只能存在一個同步隊列,但可擁有多個等待隊列。
newCondition
public class ReentrantLock implements Lock, java.io.Serializable {
public Condition newCondition() {
// 使用自定義的條件
return sync.newCondition();
}
}
public class MyMutex implements Lock {
private static class MySync extends AbstractQueuedSynchronizer {
/**
* 主要用於等待/通知機制,每個condition都有一個與之對應的條件等待隊列
* @return condition
*/
Condition newCondition() {
return new ConditionObject();
}
}
}
public class ReentrantLock implements Lock, java.io.Serializable {
final ConditionObject newCondition() {
return new ConditionObject();
}
}
AQS#ConditionObject
ConditionObject是Condition的實現類,該類就定義在了AQS中。在實現類ConditionObject中有兩個結點,分別是firstWaiter和lastWaiter,firstWaiter代表等待隊列第一個等待結點,lastWaiter代表等待隊列最后一個等待結點:
public abstract class AbstractQueuedSynchronizer
extends AbstractOwnableSynchronizer implements java.io.Serializable {
public class ConditionObject implements Condition, java.io.Serializable {
/** First node of condition queue. */
private transient Node firstWaiter;
/** Last node of condition queue. */
private transient Node lastWaiter;
public ConditionObject() { }
// ...
}
}
所以,只需要來看一下ConditionObject實現的await/signal方法來使用這兩個成員變量就可以了。
await()
await()方法主要做了3件事:
-
一是調用
addConditionWaiter()方法將當前線程封裝成node結點加入等待隊列; -
二是調用
fullyRelease(node)方法釋放同步狀態,並喚醒后繼結點的線程; -
三是調用
isOnSyncQueue(node)方法判斷結點是否在同步隊列中。注意是個while循環,如果同步隊列中沒有該結點就直接掛起該線程,需要明白的是如果線程被喚醒后就調用acquireQueued(node, savedState)執行自旋操作爭取鎖,即當前線程結點從等待隊列轉移到同步隊列並開始努力獲取鎖。
public abstract class AbstractQueuedSynchronizer
extends AbstractOwnableSynchronizer implements java.io.Serializable {
public class ConditionObject implements Condition, java.io.Serializable {
/** First node of condition queue. */
private transient Node firstWaiter;
/** Last node of condition queue. */
private transient Node lastWaiter;
public ConditionObject() { }
// ...
public final void await() throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
Node node = addConditionWaiter(); // 1.構建Node節點,並加入到等待隊列中
int savedState = fullyRelease(node); // 2.釋放當前線程鎖,即釋放同步狀態
int interruptMode = 0;
while (!isOnSyncQueue(node)) { // 3.判斷結點是否在同步隊列(SyncQueue)中,即是否被喚醒
LockSupport.park(this); // 掛起當前線程
// 判斷是否被中斷喚醒,如果是退出循環
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
// 被喚醒后執行自旋操作爭取獲得鎖,同時判斷線程是否被中斷
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null) // clean up if cancelled
unlinkCancelledWaiters(); // 清理等待隊列中不為CONDITION狀態的結點
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
}
}
}
執行addConditionWaiter()添加到等待隊列:
public abstract class AbstractQueuedSynchronizer
extends AbstractOwnableSynchronizer implements java.io.Serializable {
public class ConditionObject implements Condition, java.io.Serializable {
/** First node of condition queue. */
private transient Node firstWaiter;
/** Last node of condition queue. */
private transient Node lastWaiter;
/**
* Adds a new waiter to wait queue.
* @return its new wait node
*/
private Node addConditionWaiter() {
Node t = lastWaiter;
// If lastWaiter is cancelled, clean out.
if (t != null && t.waitStatus != Node.CONDITION) { // 判斷是否為結束狀態的結點並移除
unlinkCancelledWaiters();
t = lastWaiter;
}
// 新構建的節點的waitStatus是CONDITION,注意不是0或SIGNAL了
Node node = new Node(Thread.currentThread(), Node.CONDITION);
// 構建單向同步隊列
if (t == null)
firstWaiter = node; // 加入等待隊列
else
t.nextWaiter = node;
lastWaiter = node;
return node;
}
}
}
為什么這里是單向隊列,也沒有使用CAS來保證加入隊列的安全性呢?
因為await是Lock范式try中使用的,說明已經獲取到鎖了,所以就沒必要使用CAS了。至於是單向,因為這里還不涉及到競爭鎖,只是做一個條件等待隊列。
線程已經按相應的條件加入到了條件等待隊列中,那如何再嘗試獲取鎖呢?
signal/signalAll
public abstract class AbstractQueuedSynchronizer
extends AbstractOwnableSynchronizer implements java.io.Serializable {
public class ConditionObject implements Condition, java.io.Serializable {
/** First node of condition queue. */
private transient Node firstWaiter;
/** Last node of condition queue. */
private transient Node lastWaiter;
public final void signal() {
if (!isHeldExclusively()) // 判斷是否持有獨占鎖,如果不是拋出異常
throw new IllegalMonitorStateException();
Node first = firstWaiter;
if (first != null) // 喚醒等待隊列第一個結點的線程
doSignal(first);
}
public final void signalAll() {
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
Node first = firstWaiter;
if (first != null)
doSignalAll(first);
}
}
}
這里signal()方法做了兩件事:
-
一是判斷當前線程是否持有獨占鎖,沒有就拋出異常,從這點也可以看出只有獨占模式先采用等待隊列,而共享模式下是沒有等待隊列的,也就沒法使用Condition。
-
二是喚醒等待隊列的第一個結點,即執行
doSignal(first)。
doSignal/doSignalAll
doSignal(first)方法中做了兩件事:
-
一是從條件等待隊列移除被喚醒的節點,然后
重新維護條件等待隊列的firstWaiter和lastWaiter的指向。 -
二是將從等待隊列移除的結點加入同步隊列(在
transferForSignal()方法中完成的),如果進入到同步隊列失敗,並且條件等待隊列還有不為空的節點,則繼續循環喚醒后續其他結點的線程。
public abstract class AbstractQueuedSynchronizer
extends AbstractOwnableSynchronizer implements java.io.Serializable {
public class ConditionObject implements Condition, java.io.Serializable {
/** First node of condition queue. */
private transient Node firstWaiter;
/** Last node of condition queue. */
private transient Node lastWaiter;
private void doSignal(Node first) {
do {
// 移除條件等待隊列中的第一個結點,
// 如果后繼結點為null,那么說沒有其他結點,將尾結點也設置為null
if ( (firstWaiter = first.nextWaiter) == null)
lastWaiter = null;
first.nextWaiter = null;
// 如果被通知節點沒有進入到同步隊列,並且條件等待隊列還有不為空的節點,則繼續循環通知后續結點
} while (!transferForSignal(first) &&
(first = firstWaiter) != null);
}
private void doSignalAll(Node first) {
lastWaiter = firstWaiter = null;
/*
* 循環判斷是否還有nextWaiter,
* 如果有就像signal操作一樣,將其從【條件等待隊列】中移到【同步隊列】中
*/
do {
Node next = first.nextWaiter;
first.nextWaiter = null;
transferForSignal(first);
first = next;
} while (first != null);
}
}
final boolean transferForSignal(Node node) {
// 嘗試設置喚醒結點的waitStatus為0,即初始化狀態
// 如果設置失敗,說明當前結點node的waitStatus已不為CONDITION狀態,那么只能是結束狀態了,因此返回false
// 返回doSignal()方法中繼續喚醒其他結點的線程,注意這里並不涉及並發問題,
// 所以CAS操作失敗只可能是預期值不為CONDITION,
// 而不是多線程設置導致預期值變化,畢竟操作該方法的線程是持有鎖的。
if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
return false;
// 加入同步隊列並返回前驅結點p
Node p = enq(node);
int ws = p.waitStatus;
// 判斷前驅結點是否為結束結點(CANCELLED=1)或者
// 在設置前驅節點狀態為Node.SIGNAL狀態失敗時,喚醒被通知節點代表的線程
if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
// 喚醒node結點的線程
LockSupport.unpark(node.thread);
return true;
}
}
總結:signal的喚醒過程
signal()被調用后,先判斷當前線程是否持有獨占鎖,如果有,那么喚醒當前Condition對象中等待隊列的第一個結點的線程,並從等待隊列中移除該結點,移動到同步隊列中:
-
如果加入同步隊列失敗,那么繼續
循環喚醒等待隊列中的其他結點的線程; -
如果成功加入同步隊列,那么如果其前驅節點是已結束的節點或者設置前驅節點狀態為Node.SIGNAL狀態失敗,則通過
LockSupport.unpark()喚醒被通知節點代表的線程,到此signal()任務完成。
被喚醒后的線程,將從前面的await()方法中的while循環中退出,因為此時該線程的結點已在同步隊列中,那么while (!isOnSyncQueue(node))將不在符合循環條件,進而調用AQS的acquireQueued()方法加入獲取同步狀態的競爭中,這就是等待喚醒機制的整個流程實現原理,流程如下圖所示(注意無論是同步隊列還是等待隊列使用的Node數據結構都是同一個,不過是使用的內部變量不同罷了):

生產者-消費者Condition
package condition;
import java.util.LinkedList;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.IntStream;
import static java.lang.Thread.currentThread;
import static java.util.concurrent.ThreadLocalRandom.current;
public class TestCondition {
// 定義顯示鎖
private static final ReentrantLock lock = new ReentrantLock();
// 創建與顯式鎖關聯的Condition對象
private static final Condition condition = lock.newCondition();
// 鏈表
private static final LinkedList<Long> list = new LinkedList<>();
// 鏈表最大容量為100
private static final int CAPACITY = 100;
// 定義數據的初始值
private static long i = 0;
/**
* 生產者方法
*/
private static void produce() {
// 獲取鎖
lock.lock();
try {
// 當鏈表中數據量達到100時,生產者線程將被阻塞,加入與Condition關聯的wait隊列中
while (list.size() >= CAPACITY) {
condition.await();
}
// 當鏈表中數據量不足100時,生產新的數據
i++;
list.addLast(i);
System.out.println(currentThread().getName() + " 生產了數據 " + i);
// 1. 通知其他線程
condition.signalAll();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
// 釋放鎖
lock.unlock();
}
}
/**
* 消費者方法
*/
private static void consume() {
lock.lock();
try {
// 當list中數據為空時,消費者線程將被阻塞加入與Condition關聯的wait隊列
while (list.isEmpty()) {
condition.await();
}
// 消費數據
Long value = list.removeFirst();
System.out.println(currentThread().getName() + " 消費了數據 " + value);
// 2.通知其他線程
condition.signalAll();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
private static void sleep() {
try {
TimeUnit.SECONDS.sleep(current().nextInt(5));
} catch (InterruptedException e) {
e.printStackTrace();
}
}
public static void main(String[] args) {
// 啟動10個生產者線程
IntStream.range(0, 10).forEach(i ->
new Thread(
() -> {
for (; ; ) {
produce();
sleep();
}
}, "Producer-" + i
).start()
);
// 啟動5個消費者線程
IntStream.range(0, 5).forEach(i ->
new Thread(
() -> {
for (; ; ) {
consume();
sleep();
}
}, "Consumer-" + i
).start()
);
}
}
輸出結果:
...
Producer-6 生產了數據 8
Producer-4 生產了數據 9
Producer-8 生產了數據 10
Consumer-0 消費了數據 1
Consumer-1 消費了數據 2
Consumer-2 消費了數據 3
Consumer-4 消費了數據 4
Consumer-3 消費了數據 5
Producer-2 生產了數據 11
Producer-6 生產了數據 12
...
注釋1和2處condition.signalAll()喚醒的是與Condition關聯的阻塞隊列中的所有阻塞線程。由於使用的是唯一的一個Condition實例,因此,生產者喚醒的有可能是與Condition關聯的wait隊列中的生產者線程。假設此時生產者線程被喚醒並搶到了CPU的調度獲得了執行權,但又發現隊列已滿再次進入阻塞。這樣的線程上下文開銷實際上是沒有意義的,甚至會影響性能(多線程下的線程上下文切換開銷是非常大的性能損耗)。
因此,需要使用兩個Condition對象,一個用於隊列已滿臨界值條件的處理,另外一個用於隊列為空的臨界值條件的處理。此時,在生產者中喚醒的阻塞線程只能是消費者線程;在消費者中喚醒的也只能是生產者線程:
package condition;
import java.util.LinkedList;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.IntStream;
import static java.lang.Thread.currentThread;
import static java.util.concurrent.ThreadLocalRandom.current;
public class TestCondition {
// 定義顯示鎖
private static final ReentrantLock lock = new ReentrantLock();
// 創建與顯式鎖關聯的Condition對象
private static final Condition Full_Condition = lock.newCondition();
private static final Condition EMPTY_Condition = lock.newCondition();
// 鏈表
private static final LinkedList<Long> list = new LinkedList<>();
// 鏈表最大容量為100
private static final int CAPACITY = 100;
// 定義數據的初始值
private static long i = 0;
/**
* 生產者方法
*/
private static void produce() {
// 獲取鎖
lock.lock();
try {
// 當鏈表中數據量達到100時,生產者線程將被阻塞,加入Full_Condition wait隊列中
while (list.size() >= CAPACITY) {
Full_Condition.await();
}
// 當鏈表中數據量不足100時,生產新的數據
i++;
list.addLast(i);
System.out.println(currentThread().getName() + " 生產了數據 " + i);
// 1. 生產者線程通知消費者線程
EMPTY_Condition.signalAll();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
// 釋放鎖
lock.unlock();
}
}
/**
* 消費者方法
*/
private static void consume() {
lock.lock();
try {
// 當list中數據為空時,消費者線程將被阻塞加入EMPTY_Condition wait隊列
while (list.isEmpty()) {
EMPTY_Condition.await();
}
// 消費數據
Long value = list.removeFirst();
System.out.println(currentThread().getName() + " 消費了數據 " + value);
// 2.消費者線程通知生產者線程
Full_Condition.signalAll();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
private static void sleep() {
try {
TimeUnit.SECONDS.sleep(current().nextInt(5));
} catch (InterruptedException e) {
e.printStackTrace();
}
}
public static void main(String[] args) {
// 啟動10個生產者線程
IntStream.range(0, 10).forEach(i ->
new Thread(
() -> {
for (; ; ) {
produce();
sleep();
}
}, "Producer-" + i
).start()
);
// 啟動5個消費者線程
IntStream.range(0, 5).forEach(i ->
new Thread(
() -> {
for (; ; ) {
consume();
sleep();
}
}, "Consumer-" + i
).start()
);
}
}
輸出結果:
...
Producer-6 生產了數據 7
Producer-5 生產了數據 8
Producer-8 生產了數據 9
Producer-9 生產了數據 10
Consumer-0 消費了數據 1
Consumer-3 消費了數據 2
Consumer-4 消費了數據 3
Consumer-1 消費了數據 4
Consumer-2 消費了數據 5
Consumer-0 消費了數據 6
Producer-8 生產了數據 11
...
