微信公眾號:房東的小黑黑
路途隨遙遠,將來更美好
學海無涯,大家一起加油!
Condition條件隊列
當我們進行線程間的通信時,可以使用ReetrantLock
與Condition
相結合,其中的await()
和signal()
方法進行線程間的阻塞與喚醒。我將詳細的解釋其中的機制。
ConditionObjectConditionObject
是實現條件隊列的關鍵,每個ConditionObject對象都維護一個單獨的條件等待對列。一個AQS中可以有多個條件隊列,但是只有一個同步隊列。
該博客的描述挺好的,有的部分是引用里面的。AQS解析與實戰
條件隊列與同步隊列的聯系

1) 調用了
await()
方法的線程,會被加入到
ConditionObject
等待隊列中,並且喚醒同步隊列中head節點的下一個節點
2) 線程在某個ConditionObject對象上調用
signal()
方法后,等待隊列中的firstWaiter會被加入到同步隊列中,等待被喚醒。
3)當線程調用
unLock()
方法解鎖后,同步隊列中的head節點的下一個節點會被喚醒。
public class ConditionObject implements Condition, java.io.Serializable {
private transient Node firstWaiter;
private transient Node lastWaiter;
每個ConditionObject
對應一個條件隊列,它記錄該隊列的頭節點和尾節點。
條件隊列是單向的,而同步隊列是雙向的,會有前驅指針。
await()方法
public final void await() throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
Node node = addConditionWaiter();
int savedState = fullyRelease(node);
int interruptMode = 0;
while (!isOnSyncQueue(node)) {
LockSupport.park(this);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null) // clean up if cancelled
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
}
private Node addConditionWaiter() {
Node t = lastWaiter;
// If lastWaiter is cancelled, clean out.
if (t != null && t.waitStatus != Node.CONDITION) {
unlinkCancelledWaiters();
t = lastWaiter;
}
Node node = new Node(Thread.currentThread(), Node.CONDITION);
if (t == null)
firstWaiter = node;
else
t.nextWaiter = node;
lastWaiter = node;
return node;
}
整體的流程如下:
- 執行await()時,會新創建一個節點並放入到該條件隊列尾部。
- 然后釋放鎖,並喚醒同步隊列中的Head節點的后一個節點。
- 然后while循環,將該節點阻塞,直達該節點被放入到同步節點或者被中斷了,才退出循環。
- 退出循環后,開始調用
acquireQueued()
不斷嘗試拿鎖。 - 拿到鎖后,會清空條件隊列中被取消的節點。
public final void signal() {
//如果當前線程不是持有該鎖的線程,拋出異常
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
Node first = firstWaiter;
if (first != null)
doSignal(first);
}
private void doSignal(Node first) {
do {
if ( (firstWaiter = first.nextWaiter) == null)
lastWaiter = null;
first.nextWaiter = null;
} while (!transferForSignal(first) &&
(first = firstWaiter) != null);
}
final boolean transferForSignal(Node node) {
if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
return false;
Node p = enq(node);
int ws = p.waitStatus;
if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
LockSupport.unpark(node.thread);
return true;
}
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) { // Must initialize
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
主要過程:
將條件隊列中的頭節點放入到同步隊列尾部,並獲取它在同步隊列中的前驅節點
如果他前驅節點的狀態是取消狀態或者設置設置前驅節點為Signal失敗,則喚醒當前節點,
喚醒后會執行在await()
方法阻塞后的代碼,會進行不斷嘗試獲取鎖。

線程的中斷
當我們想執行線程中斷時,一般會調用interrupt
方法,對於初學者可能以為線程會立即中斷執行,但是如果線程處於運行狀態,不會受影響,會繼續執行下去。為什么會出現這種情況呢?
java的線程中斷機制,主要是依靠中斷標志位實現的,當調用interrupt
方法時,會將中斷標志位設置為ture,然后針對線程此時的不同線程狀態,會有不同的處理。具體的處理情況可以閱讀這篇文章: java中斷詳細介紹及其對各種線程狀態的影響分析
執行中斷后,當線程處於waiting
狀態會拋出中斷異常並將清空中斷標志位。
前面寫了一篇AQS的文章,介紹了acqure()
方法,最后一步調用了selfInterrupt()
,這是當前線程進行中斷。一般我們加鎖會調動lock()
方法,但是實際上還有一個方法lockInterruptibly()
。我將通過ReetrantLock
類介紹它倆的區別。
final void lock() {
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);
}
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
static void selfInterrupt() {
Thread.currentThread().interrupt();
}
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
return Thread.interrupted();
}
首先分析下lock()
方法,首先如果state
為0,那么通過CAS機制將state
設置為1,然后將當前線程設置為鎖的擁有者;其他情況則調用acquire(1)
嘗試加鎖,具體的嘗試加鎖機制就不介紹了,在之前文章寫過,它是一個自旋操作,如果獲取失敗,會將線程park
掛起,進入阻塞狀態,當其他線程執行完,會喚醒該線程,執行向下執行,這時會執行Thread.interrupted()
,會判斷當前線程是否被中斷,如果該線程拿到鎖,會執行selfInterrupt()
方法,將當前線程進行中斷,但是之后自己設計的邏輯中沒有如果沒有處理中斷的代碼,那么它會一直執行下去,或者將當前線程設置為waitiing
狀態,就會拋出中斷異常,並消除中斷標志。
public void lockInterruptibly() throws InterruptedException {
sync.acquireInterruptibly(1);
}
public final void acquireInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
if (!tryAcquire(arg))
doAcquireInterruptibly(arg);
}
private void doAcquireInterruptibly(int arg)
throws InterruptedException {
final Node node = addWaiter(Node.EXCLUSIVE);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
現在再介紹一下lockInterruptibly
的機制。
首先它會判斷當前線程是否中斷,如果有,就馬上拋出異常;沒有中斷,則開始嘗試獲得鎖,獲取失敗,將該節點放入到同步隊列中,並執行自旋操作:
1)如果該節點的前驅節點頭節點,並且該節點獲取到了鎖,則設置當前節點是頭節點,並返回
2)其他情況,就需要將它的前置節點設置為Siginal
,並將當前節點所屬的線程掛起,當重新喚醒后,如果發生中斷,會直接拋出異常,並將該節點從同步隊列中刪除