AbstractQueuedSynchronizer源碼解讀--續篇之Condition


1. 背景

在之前的AbstractQueuedSynchronizer源碼解讀中,介紹了AQS的基本概念、互斥鎖、共享鎖、AQS對同步隊列狀態流轉管理、線程阻塞與喚醒等內容。其中並不涉及Condition相關的內容。本文主要介紹AQS中Condition的實現即ConditionObject類的源碼。
Condition在JUC中使用很多,最常見的就是各種BlockingQueue了。

2. Condition是什么

java.util.concurrent.locks.Condition是JUC提供的與Java的Object中wait/notify/notifyAll類似功能的一個接口,通過此接口,線程可以在某個特定的條件下等待/喚醒。
與wait/notify/notifyAll操作需要獲得對象監視器類似,一個Condition實例與某個互斥鎖綁定,在此Condition實例進行等待/喚醒操作的調用也需要獲得互斥鎖,線程被喚醒后需要再次獲取到鎖,否則將繼續等待。
而與原生的wait/notify/notifyAll等API不同的地方在於,JUC提供的Condition具有更豐富的功能,例如等待可以響應/不響應中斷,可以設定超時時間或是等待到某個具體時間點。
此外一把互斥鎖可以綁定多個Condition,這意味着在同一把互斥鎖上競爭的線程可以在不同的條件下等待,喚醒時可以根據條件來喚醒線程,這是Object中的wait/notify/notifyAll不具備的機制

3. 代碼解讀

3.1 套路

JUC中Condition接口的主要實現類是AQS的內部類ConditionObject,它內部維護了一個隊列,我們可以稱之為條件隊列,在某個Condition上等待的線程被signal/signalAll后,ConditionObject會將對應的節點轉移到外部類AQS的等待隊列中,線程需要獲取到AQS等待隊列的鎖,才可以繼續恢復執行后續的用戶代碼。

這里給出一個流程:

await流程:
1. 創建節點加入到條件隊列
2. 釋放互斥鎖
3. 只要沒有轉移到同步隊列就阻塞(等待其他線程調用signal/signalAll或是被中斷)
4. 重新獲取互斥鎖
signal流程:
1. 將隊列中第一個節點轉移到同步隊列
2. 根據情況決定是否要喚醒對應線程


這里以我之前在[AbstractQueuedSynchronizer源碼解讀]畫的AQS狀態流轉圖來說明下:
如果一個節點通過ConditionObject#await等方法調用初始化后,在被喚醒之后,會將狀態切換至0,也即無狀態,隨后進入AQS的同步隊列,此后就與一般的爭鎖無異了。

3.2 await方法

public final void await() throws InterruptedException {
    // 對中斷敏感。
    if (Thread.interrupted())
        throw new InterruptedException();
    // 加到條件隊列中。
    Node node = addConditionWaiter();
    // 完全釋放互斥鎖(無論鎖是否可以重入),如果沒有持鎖,會拋出異常。
    int savedState = fullyRelease(node);
    int interruptMode = 0;
    /*
     * 只要仍未轉移到同步隊列就阻塞。
     * 轉移的情況如下:
     * 1. 其它線程調用signal將當前線程節點轉移到同步隊列並喚醒當前線程。
     * 2. 其它線程調用signalAll。
     * 3. 其它線程中斷了當前線程,當前線程會自行嘗試進入同步隊列。
     */
    while (!isOnSyncQueue(node)) {
        LockSupport.park(this);
        /*
         * 獲取中斷模式。
         * 在線程從park中被喚醒的時候,需要判斷是否此時被中斷,若中斷則嘗試轉移到同步隊列。
         * 1. 中斷且自行進入同步隊列,返回THROW_IE(值-1),后續需要拋出InterruptedException。
         * 2. 中斷且未能自行進入同步隊列,則說明有線程調用signal/signalAll喚醒線程並嘗試轉移到同步隊列,
         *     返回REINTERRUPT,后續重新中斷線程。
         * 3. 線程未被中斷,返回0,此時需要重試循環判斷是否上了同步隊列。
         */
        if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
            break;
    }
    // 重新獲取互斥鎖過程中如果中斷並且interruptMode不為"拋出異常",設置為REINTERRUPT。
    if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
        interruptMode = REINTERRUPT;
    if (node.nextWaiter != null) // clean up if cancelled
        unlinkCancelledWaiters();
    // 如果線程發生過中斷則根據THROW_IE或是REINTERRUPT分別拋出異常或者重新中斷。
    if (interruptMode != 0)
        reportInterruptAfterWait(interruptMode);
}

private Node addConditionWaiter() {
    Node t = lastWaiter;
    /*
     * 如果條件隊列中最后一個waiter節點狀態為取消,
     * 則調用unlinkCancelledWaiters清理隊列。
     */
    if (t != null && t.waitStatus != Node.CONDITION) {
        unlinkCancelledWaiters();
        // 重讀lastWaiter。
        t = lastWaiter;
    }
    Node node = new Node(Thread.currentThread(), Node.CONDITION);
    // t如果為null, 初始化firstWaiter為當前節點。
    if (t == null)
        firstWaiter = node;
    else
        // 將隊尾的next連接到node。
        t.nextWaiter = node;
    lastWaiter = node;
    return node;
}

/**
 * 移除隊列中所有取消節點。
 */
private void unlinkCancelledWaiters() {
    Node t = firstWaiter;
    // 記錄上一個非取消節點。
    Node trail = null;
    while (t != null) {
        Node next = t.nextWaiter;
        if (t.waitStatus != Node.CONDITION) {
            // 斷開
            t.nextWaiter = null;
            if (trail == null)
                // 如果trail為null,取當前節點的后繼作為頭節點的值(next可以為null)。
                firstWaiter = next;
            else
                // 否則把trail連接到當前節點的后繼。
                trail.nextWaiter = next;
            // 如果當前節點沒有后繼了, 更新lastWaiter為trail, 即上一個非取消節點。
            if (next == null)
                lastWaiter = trail;
        }
        else
            trail = t;
        t = next;
    }
}

final boolean isOnSyncQueue(Node node) {
    /*
     * 節點狀態為CONDITION一定是在條件隊列,
     * 或者如果prev為null也一定是在條件隊列。
     *
     * 同步隊列里的節點prev為null只可能是獲取到鎖后調用setHead清為null,
     * 新入隊的節點prev值是不會為null的。
     * 另外,條件隊列里節點是用nextWaiter來維護的,不用next和prev。
     */
    if (node.waitStatus == Node.CONDITION || node.prev == null)
        return false;
    /*
     * 如果next不為null,一定是在同步隊列的。
     * 這里值得一提的是在AQS的cancelAcquire方法中,
     * 一個節點將自己移除出隊列的時候會把自己的next域指向自己。
     * 即node.next = node;     
     *
     * 從GC效果上來看node.next = node和node.next = null無異,
     * 但是這對此處next不為null一定在同步隊列上來說,
     * 這樣可以將節點在同步隊列上被取消的情況與普通情況歸一化判斷。
     */
    if (node.next != null)
        return true;
    /*
     * 有可能node.prev的值不為null,但還沒在隊列中,因為入隊時CAS隊列的tail可能失敗。
     * 這是從tail向前遍歷一次,確定是否已經在同步隊列上。
     */
    return findNodeFromTail(node);
}

/**
 * 從隊列尾部向前遍歷判斷節點是否在隊列中。
 */
private boolean findNodeFromTail(Node node) {
    Node t = tail;
    for (;;) {
        if (t == node)
            return true;
        if (t == null)
            return false;
        t = t.prev;
    }
}

private int checkInterruptWhileWaiting(Node node) {
    /*
     * 1. 線程未中斷返回0
     * 2. 線程中斷且入同步隊列成功,返回THROW_IE表示后續要拋出InterruptedException。
     * 3. 線程中斷且未能入同步隊列(由於被signal方法喚醒),則返回REINTERRUPT表示后續重新中斷。
     */
    return Thread.interrupted() ?
        (transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) :
        0;
}

final boolean transferAfterCancelledWait(Node node) {
    if (compareAndSetWaitStatus(node, Node.CONDITION, 0)) {
        enq(node);
        return true;
    }
    /*
     * 上面CAS失敗的原因是signal()方法被調用,狀態已經被搶先更新了。
     * 這時需要自旋等待節點成功進入同步隊列,否則會影響后續的重新獲取鎖acquireQueued()方法。
     * 因為acquireQueued必須在節點成功入隊后才可以調用。
     */
    while (!isOnSyncQueue(node))
        Thread.yield();
    return false;
}

/**
 * THROW_IE則拋出InterruptedException,
 * REINTERRUPT則重新中斷當前線程。
 */
private void reportInterruptAfterWait(int interruptMode)
        throws InterruptedException {
        if (interruptMode == THROW_IE)
            throw new InterruptedException();
        else if (interruptMode == REINTERRUPT)
            selfInterrupt();
}

static void selfInterrupt() {
    Thread.currentThread().interrupt();
}

3.3 signal/signalAll方法

public final void signal() {
    // 檢查互斥鎖持有情況。
    if (!isHeldExclusively())
        throw new IllegalMonitorStateException();
    Node first = firstWaiter;
    if (first != null)
        doSignal(first);
}

private void doSignal(Node first) {
    do {
        // 將firstWaiter設置為后繼節點,如果為null,則置lastWaiter為null。
        if ( (firstWaiter = first.nextWaiter) == null)
            lastWaiter = null;
        // 斷開連接。
        first.nextWaiter = null;
        /*
         * 如果轉移失敗並且下一個節點不為null,則重試。
         * 在這里轉移失敗只可能因為節點被取消。
         */
    } while (!transferForSignal(first) &&
            (first = firstWaiter) != null);
}

public final void signalAll() {
    // 檢查互斥鎖持有情況。
    if (!isHeldExclusively())
        throw new IllegalMonitorStateException();
    Node first = firstWaiter;
    if (first != null)
        doSignalAll(first);
}

private void doSignalAll(Node first) {
    // 將firstWaiter和lastWaiter先清為null。
    lastWaiter = firstWaiter = null;
    // 從first開始一直遍歷到第一個null節點。
    do {
        Node next = first.nextWaiter;
        first.nextWaiter = null;
        transferForSignal(first);
        first = next;
    } while (first != null);
}

final boolean transferForSignal(Node node) {
    // 必須將狀態從CONDITION流轉到0,如果失敗則說明節點被取消,因為這里不會存在signal的競爭。
    if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
        return false;

    // 插入同步隊列。
    Node p = enq(node);
    int ws = p.waitStatus;
    /*
     * 如果前驅節點狀態為取消或者無法將狀態CAS到SIGNAL(比如可能前驅在此期間被取消了),
     * 則需要喚醒參數node節點對應的線程,使其能開始嘗試爭鎖。
     *
     * 如果將前驅狀態切到SIGNAL了,則由相應線程在釋放鎖之后喚醒node節點對應線程。
     */
    if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
        LockSupport.unpark(node.thread);
    return true;
}

4. 思考與總結

至此,已經對ConditionObject的await/signal/signalAll方法源碼進行了分析。對於中斷不敏感的awaitUninterruptibly, 帶有時限的awaitNanos由於大致套路與await無異,不作冗述。
ConditionObject的firstWaiter/lastWaiter以及AQS.Node的nextWaiter都是沒有volatile修飾的。這是因為ConditionObject假設在await/signal/signalAll等方法的調用是已經持有互斥鎖的。

個人認為ConditionObject這樣的設計是有個問題的。即按照Condition接口的定義,在不持鎖情況下調用await由子類決定如何處理,通常是拋出InterruptedException。但如果同時有持鎖和不持鎖的線程調用await方法,可能會對ConditionObject的內部隊列造成破壞,后果就是有些成功調用await方法的線程可能永遠沒有辦法被喚醒,因為無法通過隊列追溯到它們。也就是非法調用會拋出異常,但仍然會對內部數據結構造成破壞,這其實是有一些不合理的,至少是可以改進的地方。
最簡單的處理方式是,對於不持鎖的請求拋出異常,不應該依靠await -> fullyRelease這一步來拋出異常,此時按照流程已經調用過addConditionWaiter了。可以在await這類方法前面與signal/signalAll一樣預檢查一次持鎖情況:

if (!isHeldExclusively())
    throw new IllegalMonitorStateException();

整體而言,ConditionObject中代碼對GC友好,邏輯縝密,讀過之后受益匪淺。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM