我要拿Offer之AQS條件隊列及中斷機制


微信公眾號:房東的小黑黑
路途隨遙遠,將來更美好
學海無涯,大家一起加油!

Condition條件隊列

當我們進行線程間的通信時,可以使用ReetrantLockCondition相結合,其中的await()signal()方法進行線程間的阻塞與喚醒。我將詳細的解釋其中的機制。

ConditionObject
ConditionObject是實現條件隊列的關鍵,每個ConditionObject對象都維護一個單獨的條件等待對列。一個AQS中可以有多個條件隊列,但是只有一個同步隊列。
該博客的描述挺好的,有的部分是引用里面的。AQS解析與實戰
條件隊列與同步隊列的聯系


1) 調用了 await()方法的線程,會被加入到 ConditionObject等待隊列中,並且喚醒同步隊列中head節點的下一個節點
2) 線程在某個ConditionObject對象上調用 signal()方法后,等待隊列中的firstWaiter會被加入到同步隊列中,等待被喚醒。
3)當線程調用 unLock()方法解鎖后,同步隊列中的head節點的下一個節點會被喚醒。

public class ConditionObject implements Conditionjava.io.Serializable {
        private transient Node firstWaiter;
        private transient Node lastWaiter;

每個ConditionObject對應一個條件隊列,它記錄該隊列的頭節點和尾節點。
條件隊列是單向的,而同步隊列是雙向的,會有前驅指針。

await()方法

public final void await() throws InterruptedException {
            if (Thread.interrupted())
                throw new InterruptedException();
            Node node = addConditionWaiter();
            int savedState = fullyRelease(node);
            int interruptMode = 0;
            while (!isOnSyncQueue(node)) {
                LockSupport.park(this);
                if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                    break;
            }
            if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
                interruptMode = REINTERRUPT;
            if (node.nextWaiter != null// clean up if cancelled
                unlinkCancelledWaiters();
            if (interruptMode != 0)
                reportInterruptAfterWait(interruptMode);
        }
private Node addConditionWaiter() {
            Node t = lastWaiter;
            // If lastWaiter is cancelled, clean out.
            if (t != null && t.waitStatus != Node.CONDITION) {
                unlinkCancelledWaiters();
                t = lastWaiter;
            }
            Node node = new Node(Thread.currentThread(), Node.CONDITION);
            if (t == null)
                firstWaiter = node;
            else
                t.nextWaiter = node;
            lastWaiter = node;
            return node;
        }

整體的流程如下:

  • 執行await()時,會新創建一個節點並放入到該條件隊列尾部。
  • 然后釋放鎖,並喚醒同步隊列中的Head節點的后一個節點。
  • 然后while循環,將該節點阻塞,直達該節點被放入到同步節點或者被中斷了,才退出循環。
  • 退出循環后,開始調用acquireQueued()不斷嘗試拿鎖。
  • 拿到鎖后,會清空條件隊列中被取消的節點。
public final void signal() {
           //如果當前線程不是持有該鎖的線程,拋出異常
            if (!isHeldExclusively())
                throw new IllegalMonitorStateException();
            Node first = firstWaiter;
            if (first != null)
                doSignal(first);
        }
private void doSignal(Node first) {
            do {
                if ( (firstWaiter = first.nextWaiter) == null)
                    lastWaiter = null;
                first.nextWaiter = null;
            } while (!transferForSignal(first) &&
                     (first = firstWaiter) != null);
        }
final boolean transferForSignal(Node node) {
        if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
            return false;
        Node p = enq(node);
        int ws = p.waitStatus;
        if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
            LockSupport.unpark(node.thread);
        return true;
    }
private Node enq(final Node node) {
        for (;;) {
            Node t = tail;
            if (t == null) { // Must initialize
                if (compareAndSetHead(new Node()))
                    tail = head;
            } else {
                node.prev = t;
                if (compareAndSetTail(t, node)) {
                    t.next = node;
                    return t;
                }
            }
        }
    }

主要過程:
將條件隊列中的頭節點放入到同步隊列尾部,並獲取它在同步隊列中的前驅節點
如果他前驅節點的狀態是取消狀態或者設置設置前驅節點為Signal失敗,則喚醒當前節點,
喚醒后會執行在await()方法阻塞后的代碼,會進行不斷嘗試獲取鎖。

線程的中斷

當我們想執行線程中斷時,一般會調用interrupt方法,對於初學者可能以為線程會立即中斷執行,但是如果線程處於運行狀態,不會受影響,會繼續執行下去。為什么會出現這種情況呢?

java的線程中斷機制,主要是依靠中斷標志位實現的,當調用interrupt方法時,會將中斷標志位設置為ture,然后針對線程此時的不同線程狀態,會有不同的處理。具體的處理情況可以閱讀這篇文章: java中斷詳細介紹及其對各種線程狀態的影響分析

執行中斷后,當線程處於waiting狀態會拋出中斷異常並將清空中斷標志位。
前面寫了一篇AQS的文章,介紹了acqure()方法,最后一步調用了selfInterrupt(),這是當前線程進行中斷。一般我們加鎖會調動lock()方法,但是實際上還有一個方法lockInterruptibly()。我將通過ReetrantLock類介紹它倆的區別。

final void lock() {
            if (compareAndSetState(01))
                setExclusiveOwnerThread(Thread.currentThread());
            else
                acquire(1);
}
public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }
static void selfInterrupt() {
        Thread.currentThread().interrupt();
    }
private final boolean parkAndCheckInterrupt() {
        LockSupport.park(this);
        return Thread.interrupted();
    }

首先分析下lock()方法,首先如果state為0,那么通過CAS機制將state設置為1,然后將當前線程設置為鎖的擁有者;其他情況則調用acquire(1)嘗試加鎖,具體的嘗試加鎖機制就不介紹了,在之前文章寫過,它是一個自旋操作,如果獲取失敗,會將線程park掛起,進入阻塞狀態,當其他線程執行完,會喚醒該線程,執行向下執行,這時會執行Thread.interrupted(),會判斷當前線程是否被中斷,如果該線程拿到鎖,會執行selfInterrupt()方法,將當前線程進行中斷,但是之后自己設計的邏輯中沒有如果沒有處理中斷的代碼,那么它會一直執行下去,或者將當前線程設置為waitiing狀態,就會拋出中斷異常,並消除中斷標志。

public void lockInterruptibly() throws InterruptedException {
        sync.acquireInterruptibly(1);
    }

public final void acquireInterruptibly(int arg)
            throws InterruptedException 
{
        if (Thread.interrupted())
            throw new InterruptedException();
        if (!tryAcquire(arg))
            doAcquireInterruptibly(arg);
    }

private void doAcquireInterruptibly(int arg)
        throws InterruptedException 
{
        final Node node = addWaiter(Node.EXCLUSIVE);
        boolean failed = true;
        try {
            for (;;) {
                final Node p = node.predecessor();
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null// help GC
                    failed = false;
                    return;
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    throw new InterruptedException();
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

現在再介紹一下lockInterruptibly的機制。
首先它會判斷當前線程是否中斷,如果有,就馬上拋出異常;沒有中斷,則開始嘗試獲得鎖,獲取失敗,將該節點放入到同步隊列中,並執行自旋操作:
1)如果該節點的前驅節點頭節點,並且該節點獲取到了鎖,則設置當前節點是頭節點,並返回
2)其他情況,就需要將它的前置節點設置為Siginal,並將當前節點所屬的線程掛起,當重新喚醒后,如果發生中斷,會直接拋出異常,並將該節點從同步隊列中刪除

![](https://user-gold-cdn.xitu.io/2020/3/16/170e2b4f018e361d?w=900&h=500&f=png&s=91488)


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM