一、引言
一般我們在使用鎖的Condition時,我們一般都是這么使用,以ReentrantLock為例,
ReentrantLock lock = new ReentrantLock();
Condition condition = lock.newCondition();
lock.lock();
try{
condition.await();
}finally{
lock.unlock();
}
lock.lock();
try{
condition.signal();
}finally{
lock.unlock();
}
從上面可以知道,我們調用Condition的await和signal方法必須是在獲取得到鎖的情況下,首先我們以這個為基礎,先不管是如何獲取得到鎖的,那么上面的程序在condition.await()時阻塞當前調用的線程,而調用 condition.signal()方法的時候可能喚起一個正在await阻塞的線程,我這里說的是可能不是一定。為什么這么說,我們來看下await()方法主要做了什么事情。
二、分析
下面是await 方法的在jdk8的源碼,
下面是await 方法的在jdk8的源碼,
public final void await() throws InterruptedException {
if (Thread.interrupted()) // ①
throw new InterruptedException();
Node node = addConditionWaiter(); //②
int savedState = fullyRelease(node); //③
int interruptMode = 0;
while (!isOnSyncQueue(node)) { //④
LockSupport.park(this);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) // ⑤
break;
}
if (acquireQueued(node, savedState) && interruptMode != THROW_IE) //⑥
interruptMode = REINTERRUPT;
if (node.nextWaiter != null) // clean up if cancelled ⑦
unlinkCancelledWaiters();
if (interruptMode != 0) // ⑧
reportInterruptAfterWait(interruptMode);
}
執行過程如下,
① ,判斷當前顯示是否被interrupt? 是的話,會拋出中斷異常InterruptedException,
② ,調用addConditionWaiter方法,主要是new 一個以當前線程為數據的節點Node,然后添加到condition條件隊列里。
③ ,調用fullyRelease方法,該方法內部調用release方法,而release方法主要是在AQS隊列里面喚起第一個節點(即head的next節點)的線程(如果head后繼節點存在的話)。這時,在ASQ同步器內至少有2個活動的線程(一個是當前線程(可能是頭節點),另一個是喚起的線程(如果存在))。如果喚起失敗,會拋異常IllegalMonitorStateException。注:當存在喚起的線程的時候,這個線程就可以去爭取獲取鎖。
④ ,通過isOnSyncQueue方法判斷該節點是否在AQS隊列中,當調用await時候,肯定不在AQS上,因為addConditionWaiter方法是new 一個新的Node.接着會進入while循環里面。調用 LockSupport.park(this);阻塞當前線程,相當於釋放鎖。
⑤ ,當當前線程被喚起的時候(可能是 另一個await線程喚起的第一個節點可能是這個線程,或者在signal下,前驅節點已經cancel時,第一個firstWaiter節點是該當前節點),需要判斷是否被中斷,存儲在interruptMode,如果被中斷則break,否則checkInterruptWhileWaiting返回0,那么會接着判斷node節點是否在AQS中,如果還是不在的話,park當前線程,否則跳出while循環。那么node節點是什么時候被加入到AQS上的,答案是在signal方法上。
⑥ ,當node節點在AQS隊列時,我們需要獲取鎖,只有當前線程的節點Node在AQS隊列上,才能去爭取鎖。爭取鎖就是通過調用acquireQueued方法。等下來分析下acquireQueued方法。
⑦ , 如果當前節點的node.nextWaiter不為空,說明還有其他線程在該condition上,並且當前的線程已經獲取鎖,接着清除條件隊列上的cancel類型節點
⑧ , 如果interruptMode 是InterruptedException類型或者REINTERRUPT類型。則進行相應的拋中斷異常或者線程自我中斷標志位設置。
接着,來分析下signal方法
public final void signal() {
if (!isHeldExclusively()) // ①
throw new IllegalMonitorStateException();
Node first = firstWaiter;
if (first != null)
doSignal(first); // ②
}
執行過程如下,
① ,如果當前線程不是持有該condition的鎖,那么執行拋IllegalMonitorStateException異常。
② ,調用doSignal方法,並且條件隊列的首節點傳入。
private void doSignal(Node first) {
do {
if ( (firstWaiter = first.nextWaiter) == null) // ①
lastWaiter = null;
first.nextWaiter = null;
} while (!transferForSignal(first) && (first = firstWaiter) != null); // ②
}
① ,這種條件隊列firstWaiter指針為next節點,因為當前的節點需要被移除條件隊列,並且next節點為空,那么lastWaiter置為null,說明是空條件隊列,接着把first.nextWaiter=null,說明移除了條件隊列
② ,在這里有2步操作,一是transferForSignal,二是 first = firstWaiter,如果我們當前first節點入AQS隊列成功,那么transferForSignal返回true,則doSignal的while循環結束,
如果當前的first節點入AQS返回失敗,則需要next的節點重新signal,保證有一個成功的firstWaiter節點入AQS隊列。接着來看下transferForSignal 方法主要做了什么事情。
final boolean transferForSignal(Node node) {
/*
* If cannot change waitStatus, the node has been cancelled.
*/
if (!compareAndSetWaitStatus(node, Node.CONDITION, 0)) // ①
return false;
/*
* Splice onto queue and try to set waitStatus of predecessor to
* indicate that thread is (probably) waiting. If cancelled or
* attempt to set waitStatus fails, wake up to resync (in which
* case the waitStatus can be transiently and harmlessly wrong).
*/
Node p = enq(node); // ②
int ws = p.waitStatus;
if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL)) // ③
LockSupport.unpark(node.thread);
return true;
}
① ,首先傳入的節點node是條件隊列的第一個節點(在外部已經移除),改變其狀態CONDITION,為初始狀態0,如果改變失敗,說明該節點已經不是條件節點,直接返回false,doSignal方法重新調用新firstWaiter節點入AQS隊列,
② ,把首節點入AQS節點,enq()方法返回的是入節點的前驅節點。從這里核心方法可以知道,signal() 方法的作用其實只是把等待隊列中第一個非取消節點轉移到AQS的同步隊列尾部。轉移后的節點很可能正在在同步隊列阻塞着,什么時候喚醒,取決於它的前驅節點是否是頭節點。
③ ,如果當前前驅節點的waitStatus>0(說明是CANCELLED狀態),前驅節點已經Canncel(說明前驅節點已經中斷等情況),則可以調用LockSupport.unpark(node.thread)喚起線程,則await方法的park返回可以立即返回,預先將AQS同步隊列中取消的節點移除掉,而不用等到獲取同步狀態失敗的時候再去判斷了,起到一定的優化作用。
最后來分析下獲取鎖方法 acquireQueued
執行如下:
final boolean acquireQueued(final Node node, int arg) { // ①
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) { //②
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) // ③
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
① ,傳入node 為需要獲取鎖的節點,arg為之前state狀態.
② ,如果傳入的節點的前驅節點是head節點,說明當前節點是AQS隊列的首節點,可以嘗試去獲取鎖,即我們需要是要實現的同步語義方法tryAcquire,如果同步語義獲取鎖成功,則設置當前頭節點為頭節點。
這里注意返回的值得語義是是否發生中斷,而不是獲取鎖是否成功。
③ ,調用 shouldParkAfterFailedAcquire方法,該方法用來判斷獲取鎖失敗后是否需要park當前線程,如果需要park線程,則接着判斷該線程是否有中斷標志。
接着我們來看下shouldParkAfterFailedAcquire 方法。
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
if (ws == Node.SIGNAL) // ①
/*
* This node has already set status asking a release
* to signal it, so it can safely park.
*/
return true;
if (ws > 0) { // ②
/*
* Predecessor was cancelled. Skip over predecessors and
* indicate retry.
*/
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
/*
* waitStatus must be 0 or PROPAGATE. Indicate that we
* need a signal, but don't park yet. Caller will need to
* retry to make sure it cannot acquire before parking.
*/
compareAndSetWaitStatus(pred, ws, Node.SIGNAL); //③
}
return false;
}
① 前驅節點pred的waitStatus為SIGNAL,說明當前節點有效,返回true,代表需要park當前線程,
② 前驅節點已經取消,則刪除去取消掉的前驅節點,返回false,外面繼續for循環獲取鎖
③ 處在該條件語句的前驅節點的waitStatus必定是 0,或者是傳播PROPAGATE,則設置傳播節點為SIGNAL,然后返回false,則接着去for循環獲取鎖,並且失敗的時候,調用shouldParkAfterFailedAcquire時知道前驅為SIGNAL(之前由③設置),則需要park線程。
從這里可以知道transferForSignal方法中,!compareAndSetWaitStatus(p, ws, Node.SIGNAL)語句,如果對其前驅節點設置Node.SIGNAL失敗,則不需要等到acquireQueued去判斷是否需要park線程,直接unpark線程即可
