Condition的await()和signal()流程


介紹

Conditionj.u.c包下提供的一個接口。
可以翻譯成 條件對象,其作用是線程先等待,當外部滿足某一條件時,在通過條件對象喚醒等待的線程。ArrayBlockingQueue就是通過Condition實現的。

先看一下Condition接口提供了哪些方法:

/**
 *  條件對象
 */
public interface Condition {

    /**
     * 讓線程進入等待,如果其他線程調用同一Condition對象的notify/notifyAll,那么等待的線程可能被喚醒
     */
    void await() throws InterruptedException;

    /**
     * 不拋出中斷異常的await方法
     */
    void awaitUninterruptibly();

    /**
     * 帶超時的await
     */
    long awaitNanos(long nanosTimeout) throws InterruptedException;

    /**
     * 帶超時的await(可指定時間單位)
     */
    boolean await(long time, TimeUnit unit) throws InterruptedException;

    /**
     * 帶超時的await(指定截止時間)
     */
    boolean awaitUntil(Date deadline) throws InterruptedException;

    /**
     * 喚醒等待的線程
     */
    void signal();

    /**
     * 喚醒所有線程
     */
    void signalAll();
}

Condition接口主要提供了兩類方法——讓線程等待的方法(await()等)和喚醒線程的方法(signal())。

AQS內部提供了Condition接口的實現——ConditionalObject。它內部的字段如下:

    private static final long serialVersionUID = 1173984872572414699L;
    //該ConditionObject維護的等待隊列的頭節點
    private transient Node firstWaiter;
    //該ConditionObject維護的等待隊列的尾節點
    private transient Node lastWaiter;

非常簡單,從上面的字段我們大概可以猜到Condition內部也維護了一個隊列。
上篇文章中,我們已經分析鎖實現的遠離就是通過節點構成隊列:讓隊列中除頭節點外的其他線程都被Park,當頭節點釋放鎖時,頭節點喚醒下一個節點(Unpark線程),同時更新頭節點。
舉一反三,我們推測Condition喚醒功能的原理也是通過維護隊列的節點。
接下來就通過分析源碼,(主要是await()signal()方法),驗證我們的猜測。


Condtion對象的獲取

Condition對象的獲取主要是通過Lock.newCondition()方法。
一個Lock對象可以返回多個Condition對象。
在對Condition進行等待或者喚醒前,都需要先持有Condition關聯Lock對象,否則會拋出IllegalMonitorStateException異常。


Condition.await()過程

public final void await() throws InterruptedException {
            //如果線程已經被標記為中斷,則拋出異常
            if (Thread.interrupted())
                throw new InterruptedException();
            //將線程添加進等待隊列
            //注意等待隊列和AQS維護的阻塞隊列是兩個不同的隊列
            //正常流程當線程能調用await(),說明線程此時擁有鎖,此時AQS的阻塞隊列中,線程應該在head節點
            Node node = addConditionWaiter();

            //釋放掉鎖(如果釋放失敗,NODE的waitStatus被更新為CANCELLED)
            //同時因為釋放掉了鎖,該線程在阻塞隊列中的節點也已經被移除
            int savedState = fullyRelease(node);

            //這里會將線程掛起,除非線程節點被移到AQS的阻塞隊列或是線程被外部中斷
            int interruptMode = 0;
            while (!isOnSyncQueue(node)) {
                LockSupport.park(this);
                //檢查是否是由於被中斷而喚醒,如果是,則跳出循環
                if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                    break;
            }
            //在阻塞隊列中嘗試獲取鎖
            if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
                interruptMode = REINTERRUPT;
            //節點已經在阻塞隊列中,與Condition的等待隊列聯系斷開
            //對於SIGNAL喚醒的線程而言,SIGNAL時除了將節點移到阻塞隊列,同時也清空了node.nextWaiter
            //而對於中斷喚醒的線程而言,只是將節點移到阻塞隊列,並沒有清空node.nextWaiter(因為此時線程不持有,操作等待線程並非線程安全)
            if (node.nextWaiter != null) // clean up if cancelled
                unlinkCancelledWaiters();
            //根據interruptMode 決定是否需要拋出異常
            if (interruptMode != 0)
                reportInterruptAfterWait(interruptMode);
        }

await()方法主要可以分為以下幾個過程:
1)線程將自己包裝成節點,並添加到Condition的阻塞隊列中
2)線程主動釋放掉鎖
3)線程進入自循環等待(主動通過LockSupport.park()),醒來時,檢查自己是否已經被移動至Lock的阻塞隊列
4)線程在阻塞隊列中等待,直到獲取鎖(線程在等待時可能又會被LockSupport.park()掛起)
5)線程獲取鎖,檢查自己在等待過程中(await()過程)是否有被中斷
6)如果有需要,則清理節點與等待隊列之間的聯系
7)根據中斷狀態確定是否需要拋出異常,以便讓await()的調用者可以響應線程的中斷狀態

從上面的流程,我們可以清楚的了解到以上步驟1和2,以及步驟5,6,7是持有鎖的,步驟3和4並沒有持有鎖。了解這一點很重要,因為涉及某些方法是否需要以CAS來保證線程安全。
了解了大體流程,接下來就逐步分析各個步驟。

步驟1.線程包裝成節點,添加進Condition的等待隊列

這一步驟主要是addConditionWaiter()過程

    private Node addConditionWaiter() {
            Node t = lastWaiter;

            // 找出該ConditionObject的等待隊列中 真正未被取消的最后一個節點,並更新為lastWaiter
            if (t != null && t.waitStatus != Node.CONDITION) {
                unlinkCancelledWaiters();
                t = lastWaiter;
            }

            //將該線程包裝成Node
            Node node = new Node(Thread.currentThread(), Node.CONDITION);
            //如果此時ConditionObject隊列為空,初始化鏈表且頭節點為node
            if (t == null)
                firstWaiter = node;
            else //否則將node添加隊尾
                t.nextWaiter = node;
            //更新鏈表尾節點為node
            lastWaiter = node;
            return node;
        }

主要找出等待隊列的最后一個節點,將線程包裝成Node,添加到隊列的隊尾。
這里要注意的一點是此時Node的waitStatusCONDITION。節點的waitStatus對判斷等待是否被取消很重要,在等待隊列中等待的節點狀態應該為CONDITION,如果狀態不為CONDITION,說明線程已經取消了等待(如果waitStatus為0說明被喚醒或中斷)。

步驟2.線程釋放鎖

釋放鎖的步驟比較簡單。主要通過fullRelease()更新AQSstate為0並且將AQS的擁有者置為null,同時喚醒阻塞隊列中的后繼節點。

final int fullyRelease(Node node) {
        boolean failed = true;
        try {
            //獲取鎖被持有的此時
            int savedState = getState();
            //讓鎖直接釋放被持有的次數
            if (release(savedState)) {
                failed = false;
                return savedState;
            } else {
                throw new IllegalMonitorStateException();
            }
        } finally {
            //如果釋放失敗了,則將節點標記為CANCELLED
            if (failed)
                node.waitStatus = Node.CANCELLED;
        }
    }

步驟3.線程掛起,進入循環等待

這一步比較關鍵,線程等待的動作都發生在這一步。

    int interruptMode = 0;
    while (!isOnSyncQueue(node)) {
            LockSupport.park(this);
            //檢查是否是由於被中斷而喚醒,如果是,則跳出循環
            if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                break;
    }

線程執行到這,會被LockSupport.park()掛起。
如果此時線程被喚醒,線程會檢查是否是因為中斷,如果發生中斷,還需要確定中斷是否發生在SIGNAL前(如果發生在SIGNAL前,之后線程需要拋出異常,讓外部響應)。
如果線程不是因為中斷而喚醒,線程需要確認節點是否已經被移動至AQS的等待隊列。如果沒有被移動,則繼續被掛起(防止假喚醒)。

checkInterruptWhileWaiting()就是用來檢測線程在等待的時候是否被中斷。

/**
         * 檢查線程在WAITING狀態期間,是否有被中斷
         * 如果沒有返回0;如果是在SIGNAL之前被中斷,返回 THROW_IE;如果在SIGNAL之后被中斷,則返回REINTERRUPT
         */
        private int checkInterruptWhileWaiting(Node node) {
            return Thread.interrupted() ?
                (transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) :
                0;
        }

如果線程被中斷,還需要通過transferAferCancelledWait判斷中斷是否發生在SIGNAL之前。

final boolean transferAfterCancelledWait(Node node) {
        //CAS操作,期待值是CONDITION,說明此時喚醒是被取消(中斷),因為如果是SIGNAL,那么waitingStatus不會CONDITION,而是0(可以見SIGNAL流程)
        if (compareAndSetWaitStatus(node, Node.CONDITION, 0)) {
            //即使是取消的,也需要移到AQS的阻塞隊列
            enq(node);
            return true;
        }

        //說明線程先收到了SIGNAL信號
        //此時要等SIGNAL信號處理完成
        while (!isOnSyncQueue(node))
            Thread.yield();
        return false;
    }

前文說了Node節點的waitStatus是一個很重要的狀態,從它可以推斷出線程節點發生了什么操作。
上述方法先用CAS嘗試將更新節點的waitStatus為0,期待值為CONDITION
如果嘗試成功,說明此時節點未被操作過(SIGNAL信號),線程是中斷喚醒的,此時需要通過enq()將節點添加AQS阻塞隊列,因為此時沒有鎖,所以enq()方法以CAS重試的方式保證線程安全。
如果嘗試失敗,說明線程收到了SIGNAL信號,節點將由負責SIGNAL的線程移動至阻塞隊列。這里為了避免線程返回過早,在判斷出線程還未移動至阻塞隊列的情況下,會通過Thread.yeild()讓出CPU時間。

步驟4.線程在阻塞隊列中重新等待鎖

這一步主要是通過acquireQueue()方法。該方法已經在上一篇文章中介紹過了,這里不過多介紹。
需要注意的一點是,即使線程在等待時被中斷,仍然需要在AQS的阻塞隊列中等待獲取鎖。因為外部沒有辦法在線程獲取鎖之前發現中斷狀態,而且即使線程拋出了中斷異常,此時線程也是持有鎖的,外部需要顯式的釋放。

步驟5.檢查並設置中斷狀態

這一步很簡單,主要就是通過步驟3中的checkInterruptWhilewaiting()方法返回值:0表示未中斷,-1表示中斷發生在SIGNAl之前,1表示中斷發生在SIGNAL之后。

步驟6.清理節點與等待隊列的聯系

這里有兩種情況,如果線程是因為SIGNAL喚醒的,在喚醒時調用signal()的線程已經清理了被喚醒節點與等待隊列的關系。 因為那時喚醒線程持有鎖,操作是安全的。
但是如果對於中斷被喚醒的線程,喚醒時是不持有鎖的,不能保證線程安全的清理喚醒節點與等待隊列的關系。因此就將等待清理工作放在了獲取鎖之后。

//此方法並不保證線程安全,因此調用此方法時,必須要在獲取鎖的情況下調用
        //此方法的目的是為了整理Condition的等待隊列,將非CONDITION狀態的節點從等待隊列中移除
        private void unlinkCancelledWaiters() {
            Node t = firstWaiter;
            Node trail = null;
            while (t != null) {
                Node next = t.nextWaiter;
                if (t.waitStatus != Node.CONDITION) {
                    t.nextWaiter = null;
                    if (trail == null)
                        firstWaiter = next;
                    else
                        trail.nextWaiter = next;
                    if (next == null)
                        lastWaiter = trail;
                }
                else
                    trail = t;
                t = next;
            }
        }

步驟7.確定中斷狀態,並決定是否拋出異常

這一步也很簡單,就是根據interruptMode來確定是否拋出異常,如果interruptMode值為THROW_IE,說明線程被喚醒前先被中斷,此時需要拋出InterruptedException


Condition.signal()過程

signal()過程比起await()要簡單很多。既然await()過程是將節點添加到等待隊列,那么signal()作為await()的逆過程,就是將節點從等待隊列重新移動到AQS阻塞隊列。

        /**
         * 喚醒等待節點
         * 主要的流程就是將節點從Condition的等待隊列移到AQS的阻塞隊列中,讓其重新等待鎖的獲取
         */
        public final void signal() {
            //先驗證喚醒者是否是鎖的持有者
            if (!isHeldExclusively())
                throw new IllegalMonitorStateException();
            //喚醒等待隊列的第一個節點
            Node first = firstWaiter;
            if (first != null)
                doSignal(first);
        }

為保證安全,先確定喚醒線程是否為鎖的持有者。
之后找出等待隊列中的頭節點,將其喚醒。

        /**
         * 喚醒操作(此時占有鎖,為線程安全),
         * 找出等待隊列中第一個真正要被喚醒的節點,移動到阻塞隊列,
         */
        private void doSignal(Node first) {
            do {
                //找出第一個需要喚醒的節點
                if ( (firstWaiter = first.nextWaiter) == null)
                    lastWaiter = null;
                //斷開和之后等待隊列的聯系,並更新等待隊列的頭節點
                first.nextWaiter = null;
            } while (!transferForSignal(first) &&
                     (first = firstWaiter) != null); //等待隊列的頭節點移動失敗 且 等待隊列中還有其他節點,則繼續嘗試其他節點
        }

因為頭節點可能已經被取消,所以這里會一直在等待隊列中從前往后找一個節點,開始喚醒。直到一個節點喚醒成功或者等待隊列中沒有節點需要喚醒。
上文也說了喚醒過程其實就是節點的移動過程。

    /**
     * 將節點從Condition的等待隊列移動到AQS的阻塞隊列(該方法只會在signal相關的方法中被調用)
     */
    final boolean transferForSignal(Node node) {
        //CAS操作更新節點的waitStatus,期待值為CONDITION
        //操作成功,說明沒有其他線程在操作這個節點
        if (!compareAndSetWaitStatus(node, Node.CONDITION, 0)) //CAS操作失敗,說明節點已經被中斷操作過,waitStatus已經變成了0
            return false;

        //這里說明CAS操作成功
        //應該是線程安全的
        //將節點添加到阻塞隊列的隊尾
        Node p = enq(node);
        //判斷此時阻塞隊列是否有前繼節點等待,有就Park線程,等待前繼節點喚醒
        int ws = p.waitStatus;
        if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
            LockSupport.unpark(node.thread);
        return true;
    }

transferForSignal()操作其實就是先通過CAS操作確定等待隊列中的節點還未被取消。如果CAS操作成功,則將其添加到阻塞隊列,如果此時阻塞隊列還需要等待鎖,則讓await()的線程繼續被掛起。將給線程喚醒的任務(此時已經是獲取鎖的任務,不再是之前的await()任務)交給阻塞隊列中的前繼節點。

await()以及signal()過程的流程圖

跟着流程圖在走一遍,可以幫助鞏固上述的知識點。
Condition的await()和signal()過程

總結

再借上篇文章重口味的比方梳理下這個流程。
當你排隊進了WC的包廂,想要方便時,你覺得太臟了,於是你在包廂內留了張紙條,希望有人能在廁所包廂干凈了在叫你過來上廁所(調用Condition的await ()),隨后主動讓出了包廂的使用權(釋放鎖)。后面在排隊等着方便的人便進去了。
你離開后,到另一個地方邊玩手機邊等(移動到Condition的阻塞隊列,並被Park),期間,也有一些人同樣覺得廁所太臟,跑了出來,在外面等,並排在了你的后面。
過了一段時間,有人把打電話給你說廁所已經變干凈了,可以去用了,你就重新回到廁所那排起了隊伍,等待輪到你用廁所(acquireQueue)。因為那個人僅通知了你,沒有通知其他因為嫌棄廁所臟,而跑出來的人,所以那些只能繼續在那等別人去叫他們。
如果你在等待的時候突然有人用其他方式把你喊了回去(外部未給signal卻被中斷),你又主動從在外面等待走到了廁所前去排隊等待繼續去用廁所,但是等你重新排到包廂時,發現測試還是很臟(Condition的 條件未滿足),你就會拋出異常。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM