JAVA並發(1)-AQS(億點細節)


AQS(AbstractQueuedSynchronizer), 可以說的誇張點,並發包中的幾乎所有類都是基於AQS的。

一起揭開AQS的面紗

1. 介紹

為依賴 FIFO阻塞隊列阻塞鎖相關同步器(semaphores, events等)的實現提供一個框架。 為那些依賴於原子state的同步器提供基礎(CyclicBarrier、CountDownLatch等). 支持獨占模式共享模式, 不同的模式需要實現不同的方法.

引用這位大佬的圖 http://www.cnblogs.com/waterystone/p/4920797.html
AQS整體結構
這個圖是AQS整體結構,從圖中可以看到,AQS維護着一個阻塞隊列(當線程獲取資源失敗時,就會進入該隊列,等待,直到被喚醒), state是一個共享的資源。

2. 源碼剖析

我們先看看AQS的類圖,
AQS類圖

內部類: Node,阻塞隊列維護的元素;ConditionObject, 支持獨占模式下的子類用作Condition實現, 后面會講到。先看看Node的結構。

static final class Node {
	/** 表明阻塞一個共享模式的結點 */
        static final Node SHARED = new Node();
        /** 表明阻塞一個獨占模式的結點 */
        static final Node EXCLUSIVE = null;

        /** waitStatus value to indicate thread has cancelled */
	// 發生的情況: 中斷或者超時
        static final int CANCELLED = 1;

        /** 表明后繼節點需要被喚醒 */
	// 發生的情況: 后繼結點被park()阻塞,當目前的結點釋放或取消,必須要unpark它的后繼結點
        static final int SIGNAL = -1;

        /** waitStatus value to indicate thread is waiting on condition */
	// 發生的情況: 當前結點在條件隊列中(后面會講解)
        static final int CONDITION = -2;

        /**
         * waitStatus value to indicate the next acquireShared should
         * unconditionally propagate
         */
        // 發生的情況: 執行releaseShared后,需要傳播釋放其他結點。
        // 在doReleaseShared中進行設置。(后面講解這個狀態的必要性)
        // 在共享模式下,才會用到
        static final int PROPAGATE = -3;

	/** 結點的狀態, 初始值為0*/
	volatile int waitStatus;

	volatile Node prev;

	volatile Node next;

	volatile Thread thread;
	 ....
}

tip: waitStatus > 0, 即 CANCELLED, 此時的結點不正常。

AQS使用了模板模式, 自主選擇重新定義以下方法

  • tryAcquire - 獨占模式
  • tryRelease - 獨占模式
  • tryAcquireShared - 共享模式
  • tryReleaseShared - 共享模式
  • isHeldExclusively

調用這些方法,都會引發UnsupportedOperationException,后面的文章將通過其子類,來講解它們的實現。

有了這些知識后,我們從下面這幾個關鍵的共有方法入手去講解AQS

  • acquire(int arg) - 獨占模式
  • release(int arg) - 獨占模式
  • acquireShared(int arg) - 共享模式
  • releaseShared(int arg) - 共享模式

2.1 acquire

獨占模式下的獲取資源,忽略中斷。調用tryAcquire至少一次,若成功就返回。否則,將線程入隊,並可能反復阻塞和接觸阻塞,並調用tryAcquire直至成功。此方法可以用於實現 Lock.lock

    public final void acquire(int arg) {
	// (2.1.1)
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }

tryAcquire在后面文章結合子類進行分析。
代碼(2.1.1)中,此時調用了tryAcquire,獲取資源失敗,返回false,繼續執行后續方法。

addWaiter -- Queuing utilities

使用當前線程和給定mode, 新建一個Node,並且將新建Node入隊

    private Node addWaiter(Node mode) {
        Node node = new Node(Thread.currentThread(), mode);
        // Try the fast path of enq; backup to full enq on failure
        Node pred = tail;
        if (pred != null) {
           // (2.1.2) 記住這個地方, 后面有個知識點會用到
            node.prev = pred;
           // (2.1.3)
            if (compareAndSetTail(pred, node)) {
                pred.next = node;
                return node;
            }
        }
        // 插入node入隊,初始化如果有必要
        // 執行到這的情況:
        // 1. 阻塞隊列為空時,即tail == null
        // 2. 代碼(2.1.3), CAS失敗
        enq(node);
        return node;
    }

enq -- Queuing utilities

一直循環直到node入隊成功

    private Node enq(final Node node) {
        for (;;) {
            Node t = tail;
            // 初始時,head與tail都為空,需要新建一個哨兵結點
            if (t == null) { // Must initialize
                if (compareAndSetHead(new Node()))
                    tail = head;
            } else {
            // 插入結點
                node.prev = t;
                if (compareAndSetTail(t, node)) {
                    t.next = node;
                    return t;
                }
            }
        }
    }

經過了上面的操作,目前的線程已經加入了隊尾,此時做的事情就是阻塞自己,等待資源釋放並且獲取,然后執行自己的操作

acquireQueued

以獨占且不可中斷的模式,獲取已經在阻塞隊列中的線程。若在等待時被中斷,返回true

final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            // 不斷自旋直到獲取到資源或被park
            for (;;) {
                final Node p = node.predecessor();
                // 若node的前繼結點是head,執行tryAcquire,嘗試獲取資源(可能剛好釋放了資源,就可以不用阻塞)
                // (2.1.4)
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;
                }
                // 檢測是否需要被park,若需要就進行park,並返回在等待時是否被中斷
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            //  失敗,發生的情況 1. 被中斷 2. 超時.(其他方法調用該方法時,會發生)
            if (failed)
                cancelAcquire(node);
        }
    }

shouldParkAfterFailedAcquire -- Utilities for various versions of acquire

對獲取資源失敗的node,檢測並獲取結點。返回true,如果線程需要阻塞

private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
        int ws = pred.waitStatus;
        // 前驅結點的狀態為SIGNAL,此時就可以安心的park,等待前驅節點釋放資源,然后喚醒自己
        if (ws == Node.SIGNAL)
            return true;

        // 此時ws為CANCELLED
        if (ws > 0) {
            /*
             * 前驅結點狀態為CANCELLED,跳過前驅結點並重試, 直到前驅節點不為CANCELLED
             */
            do {
                node.prev = pred = pred.prev;
            } while (pred.waitStatus > 0);
            pred.next = node;
        } else {
            /*
             * waitStatus此時的值要么是0要么是PROPAGATE.
             * 需要把前驅結點狀態設置為SIGNAL
             */
            compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
        }
        return false;
    }

cancelAcquire -- Utilities for various versions of acquire

取消正在進行的獲取資源的嘗試

    private void cancelAcquire(Node node) {
        // Ignore if node doesn't exist
        if (node == null)
            return;

        node.thread = null;

        // Skip cancelled predecessors
        Node pred = node.prev;
        while (pred.waitStatus > 0)
            node.prev = pred = pred.prev;

        // 獲取過濾后的前驅的后驅節點
        Node predNext = pred.next;

        node.waitStatus = Node.CANCELLED;

        // node為tail,就直接從隊列中刪除
        if (node == tail && compareAndSetTail(node, pred)) {
            compareAndSetNext(pred, predNext, null);
        } else {
            int ws;

            // 1. 前驅結點不是頭結點,該結點不是老二
            // 2. 滿足以下任意一個條件:
                  // 2.1 此時的前驅結點狀態為SIGNAL
                  // 2.2 此時的前驅結點狀態為PROPAGATE或CONDITION,成功將狀態設置為SIGNAL
            // 3. 前驅節點的任務不為空
            // 滿足上面的條件,就將節點的前驅結點的next 指向 節點的后驅結點
            if (pred != head &&
                ((ws = pred.waitStatus) == Node.SIGNAL || (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
                  pred.thread != null) {
                Node next = node.next;
                if (next != null && next.waitStatus <= 0)
                    compareAndSetNext(pred, predNext, next);
            } else {
                // 此時結點剛好是老二;
                // 代碼(2.1.4) 可以看出,頭結點要么是哨兵結點,要么是已經獲取到資源的結點。
                // 此時喚醒node的后驅結點,是為了防止后驅結點一直阻塞
                unparkSuccessor(node);
            }

            node.next = node; // help GC
        }
    }

unparkSuccessor

后驅結點存在一個在正在等待的結點,則喚醒它

private void unparkSuccessor(Node node) {

        int ws = node.waitStatus;

        // 將當前的結點的waitStatus設置為0,失去SIGNAL、PROPAGATE的含義
        if (ws < 0)
            compareAndSetWaitStatus(node, ws, 0);

        // (2.1.5)
        Node s = node.next;
        if (s == null || s.waitStatus > 0) {
            s = null;
            // [問題一] 為什么node的后驅結點為空,重新尋找是從后往前找
            // 只要waitStatus <= 0, 都有機會被喚醒
            for (Node t = tail; t != null && t != node; t = t.prev)
                // (2.1.6)
                if (t.waitStatus <= 0)
                    s = t;
        }
        if (s != null)
            LockSupport.unpark(s.thread);
    }

2.1.1 acquire流程圖

根據上面的分析,整一個流程圖
acquire流程圖

2.1.2 小結

根據acquire流程圖,一句話小結其流程,嘗試獲取資源,失敗則將新建node(當前線程及獨占模式)入隊,檢測自己是否是老二,是老二就再一次嘗試獲取資源,成功就返回中斷標志,不是老二就設置為SIGNAL,park自己,然后安心等待被喚醒。

2.2 release

獨占模式下的釋放資源。解除阻塞一個或多個線程,當tryRelease返回true時。此方法可以用於實現 Lock.unlock

    public final boolean release(int arg) {
        // tryRelease返回true,繼續下面操作。
        if (tryRelease(arg)) {
            Node h = head;
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h);
            return true;
        }
        return false;
    }

2.2.1 release流程圖

因為release(int arg)的主要流程是在tryReleaseunparkSuccessor中,但是tryRelease又是在子類中實現,所以該流程圖也可以看作unparkSuccessor的流程圖
release流程圖

2.2.2 小結

根據release流程圖, 一句話小結其流程, 釋放資源,喚醒后驅沒有被取消的結點。

下面講講AQS的另一種模式,共享模式

2.3 acquireShared

共享模式下獲取資源,忽略中斷。至少調用tryAcquireShared一次,成功就返回。否則,線程將入隊,可能會重復的阻塞和解除阻塞,直到調用tryAcquireShared成功。成功獲取到資源,將會喚醒后驅結點,若資源滿足。

    public final void acquireShared(int arg) {
        // 在子類探究tryAcquireShared
        if (tryAcquireShared(arg) < 0)
            doAcquireShared(arg);
    }

對tryAcquireShared返回的參數,進行簡單的介紹

  • 返回負數表示失敗;
  • 返回零,隨后的線程都不能獲取到資源
  • 返回正數,隨后的線程可以獲取到資源

此時tryAcquireShared的返回值是小於零,表示獲取資源失敗,進行下一步處理

doAcquireShared

獲取資源在不可中斷的模式下

private void doAcquireShared(int arg) {
        final Node node = addWaiter(Node.SHARED);
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                final Node p = node.predecessor();
                if (p == head) {
                    int r = tryAcquireShared(arg);
                    // 若前驅結點是頭結點且剛好釋放了,嘗試獲取資源
                    // (2.3.1)
                    if (r >= 0) {
                        // 將當前node設置為head,並且嘗試喚醒node的后驅結點
                        setHeadAndPropagate(node, r);
                        p.next = null; // help GC
                        if (interrupted)
                            selfInterrupt();
                        failed = false;
                        return;
                    }
                }
                // 跟獨占模式的處理是一樣的
                // (2.3.2)
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

setHeadAndPropagate

設置隊列的頭結點,達到條件就喚醒后面的結點.

private void setHeadAndPropagate(Node node, int propagate) {
        Node h = head; // Record old head for check below
        setHead(node);
        /*
         * Try to signal next queued node if:
         *   Propagation was indicated by caller,
         *     or was recorded (as h.waitStatus either before
         *     or after setHead) by a previous operation
         *     (note: this uses sign-check of waitStatus because
         *      PROPAGATE status may transition to SIGNAL.)
         * and
         *   The next node is waiting in shared mode,
         *     or we don't know, because it appears null
         *
         * The conservatism in both of these checks may cause
         * unnecessary wake-ups, but only when there are multiple
         * racing acquires/releases, so most need signals now or soon
         * anyway.
         */

        // tips: h == null、(h = head) == null、s == null,是為了防止空指針的發生

        // 執行到下面的情況:
        // 1. propagate是tryAcquireShared返回的值。propagate(資源) > 0, 表示還有資源可以喚醒后面的結點。
        // 否則,此時propagate = 0, 結合代碼(2.3.1)
        // 2. 舊的head的waitStatus < 0
        // 舊的頭結點釋放了資源,執行了代碼(2.3.4), 此時的waitStatus 為PROPAGATE(初始化為0)
        // 3. 此時的head已經是當前結點了,后面若有結點(此時后面的結點在park),
        // 那么新head的waitStatus肯定為SIGNAL, 結合代碼(2.3.2)

        // 情況3,有可能會發生沒必要的喚醒,因為此時去喚醒新head的后驅結點,但是此時還沒
        // 有釋放資源,它后驅結點喚醒后,去獲取資源,獲取失敗,又被park.
        // 源碼注釋說到,雖然有爭議,但是大多數情況下,需要去喚醒
        // (2.3.3)
        if (propagate > 0 || h == null || h.waitStatus < 0 ||
            (h = head) == null || h.waitStatus < 0) {
            Node s = node.next;
            if (s == null || s.isShared())
                doReleaseShared();
        }
    }

代碼(2.3.3)中, 為什么不只用propagate來判斷是否喚醒后驅結點 [問題二]

doReleaseShared

共享模式下主要的釋放資源的邏輯,喚醒后驅結點,確保線程不被掛起

 private void doReleaseShared() {
        /*
         * Ensure that a release propagates, even if there are other
         * in-progress acquires/releases.  This proceeds in the usual
         * way of trying to unparkSuccessor of head if it needs
         * signal. But if it does not, status is set to PROPAGATE to
         * ensure that upon release, propagation continues.
         * Additionally, we must loop in case a new node is added
         * while we are doing this. Also, unlike other uses of
         * unparkSuccessor, we need to know if CAS to reset status
         * fails, if so rechecking.
         */
        for (;;) {
            Node h = head;
            if (h != null && h != tail) {
                int ws = h.waitStatus;
                if (ws == Node.SIGNAL) {
                    // (2.3.4)
                    if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                        continue;            // loop to recheck cases
                    unparkSuccessor(h);
                }
                // (2.3.5)
                else if (ws == 0 &&
                         !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                    continue;                // loop on failed CAS
            }
            if (h == head)                   // loop if head changed
                break;
        }
    }

2.3.1 acquireShared的流程圖

image

2.3.2 小結

一句話小結acquireShared的流程,嘗試獲取資源,若獲取到資源,資源還有剩余就去繼續喚醒后驅結點,若嘗試獲取資源失敗,就park自己,等待被喚醒。 跟acquire相比,最大的區別就是,獲取到資源acquireShared,還會去嘗試喚醒其后驅結點

2.4 releaseShared

Releases in shared mode. Implemented by unblocking one or more threads if tryReleaseShared returns true.

    // 代碼比較簡單,就不分析了~
    public final boolean releaseShared(int arg) {
        if (tryReleaseShared(arg)) {
            doReleaseShared();
            return true;
        }
        return false;
    }

2.5 問題解答

[問題一] 為什么在喚醒后驅結點時,node的后驅結點為空,需要重新從后往前找

   // (2.1.2) 記住這個地方, 后面有個知識點會用到
   node.prev = pred; // 1
   // (2.1.3)
   if (compareAndSetTail(pred, node)) { // 2
      pred.next = node; // 3
      return node;
   }

仔細觀察代碼(2.1.2) 和(2.1.3), 此時添加結點相當於有三步,都不是原子性的,當執行到第二步時,就要喚醒后驅結點了,此時新增的結點只設置了前驅結點,隊列設置了尾結點,但是沒有設置后驅結點,如果從前往后查找的話,可能會丟失符合要求的結點。

[問題二],代碼(2.3.3)中, 為什么不只用propagate來判斷是否喚醒后驅結點。
請看這位大佬的博客 講的非常詳細
大致意思就是,
我們假設有A、B、C、D四個線程,前兩個釋放資源的線程,后兩個是爭搶資源的線程,此時只有A或B釋放了資源,C、D才可以被喚醒,假設我們不看PROPAGTE
時刻一: A線程釋放資源,執行代碼 (2.3.4),head的waitStatus從SIGNAL(-1)變為了0
時刻二: C線程獲取到資源,執行到代碼(2.3.1), tryAcquireShared返回0
時刻三: B線程線程釋放資源,執行代碼 (2.3.4),因為此時未改變頭結點,head的waitStatus為0,不能unparkSuccessor
時刻四: 此時C執行到代碼(2.3.3),propagate(tryAcquireShared返回值)為0,C也不會去喚醒后驅結點,D線程就永遠GG了

引用doReleaseShared注釋中的一句話

status is set to PROPAGATE to ensure that upon release, propagation continues.

2.6 Condition

使用synchronized時,線程間通信使用wait, notify and notifyAll;而使用AQS實現的lock,線程間的通信就使用Condition中的await、signal...。Condition與Lock結合使用,同一個lock對應多個Condition。

sync queue & condition queue

public class ConditionObject implements Condition...

在AQS中,已經對Condition的方法進行了實現,子類想使用的話,只需要調用ConditionObject就行了

        final ConditionObject newCondition() {
            return new ConditionObject();
        }

本來想跟着源碼走,簡簡單單介紹一下Condition,但是源碼有幾處細節,讓我頭禿,在網上搜索別人的博客,這篇博客解開了我的疑惑,對Condition介紹的非常詳細,寫的非常的完美~

根據大佬的博客,那我們下面簡單講解Condition的兩個常用方法

  • await
  • signal

2.6.1 await & signal

導致目前線程阻塞直到被喚醒或中斷;調用await后,會將當前的線程封裝成node,加入到條件隊列

        public final void await() throws InterruptedException {
            if (Thread.interrupted())
                throw new InterruptedException();
            // 添加結點到條件隊列中
            Node node = addConditionWaiter();
            // 釋放當前線程持有的鎖
            int savedState = fullyRelease(node);
            int interruptMode = 0;
            // 不在同步隊列中,就park
            // (2.6.1)
            while (!isOnSyncQueue(node)) {
                // 執行到這,當前線程會立即掛起
                LockSupport.park(this);
                // 運行到這的話,情況: 1. signal 2. 中斷
                // 檢驗中斷
                // (2.6.2)
                if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                    break;
            }
            // 第二部分
            if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
                interruptMode = REINTERRUPT;
            if (node.nextWaiter != null) // clean up if cancelled
                unlinkCancelledWaiters();
            if (interruptMode != 0)
                reportInterruptAfterWait(interruptMode);
        }

為了講清楚代碼(2.6.1)之后的邏輯,我們先看看signal的源碼

將condition queue中等待最長的結點轉移到sync queue中去,去爭搶資源

        public final void signal() {
            if (!isHeldExclusively())
                throw new IllegalMonitorStateException();
            Node first = firstWaiter;
            if (first != null)
                doSignal(first);
        }

此時,執行signal的主要邏輯

        private void doSignal(Node first) {
            do {
                if ( (firstWaiter = first.nextWaiter) == null)
                    lastWaiter = null;
                // 將后驅結點置空
                // (2.6.3)
                first.nextWaiter = null;
            } while (!transferForSignal(first) &&
                     (first = firstWaiter) != null);
        }

將condition queue中的一個結點轉移到sync queue中去

    final boolean transferForSignal(Node node) {
        // 這里表示當前已經被取消了。
        // (2.6.4)
        if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
            return false;

        // 將當前結點放入sync queue的末尾, 此時返回的是當前結點的前驅結點(一定要注意)
        Node p = enq(node);
        int ws = p.waitStatus;
        // 前驅結點被取消,或者設置SIGNAL狀態失敗,就直接喚醒當前線程, 喚醒 = 有資格去競爭鎖了
        // enq返回的是前驅結點,我人傻了,看成是返回當前線程,就一直覺得邏輯不對。
        if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
            LockSupport.unpark(node.thread);
        return true;
    }

這里signal的邏輯就講完了,總結一下:

  1. condition queue中找出等待時間最長且未被取消的結點, 轉移到sync queue的隊尾去,同時要在condition queue中刪除該結點
  2. 若在sync queue中的該結點的前驅結點被取消了或設置SIGNAL狀態失敗,要直接喚醒它,叫它去競爭鎖。

signalAll的主要邏輯和signal是一樣的,差別就是signalAll會把所有在condition queue中的結點轉移到sync queue中去,並清空所有在condition queue中的結點,下面只貼一下signalAll的主要代碼,

        private void doSignalAll(Node first) {
            lastWaiter = firstWaiter = null;
            do {
                Node next = first.nextWaiter;
                first.nextWaiter = null;
                transferForSignal(first);
                first = next;
            } while (first != null);
        }

我們再次回到await中去

        public final void await() throws InterruptedException {
            if (Thread.interrupted())
                throw new InterruptedException();
            // 添加結點到條件隊列中
            Node node = addConditionWaiter();
            // 釋放當前線程持有的鎖
            int savedState = fullyRelease(node);
            int interruptMode = 0;
            // 不在同步隊列中,就park
            // (2.6.1)
            while (!isOnSyncQueue(node)) {
                // 執行到這,當前線程會立即掛起
                LockSupport.park(this);
                // 運行到這的話,情況: 1. signal 2. 中斷
                // 檢驗中斷
                // (2.6.2)
                if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                    break;
            }
            // 第二部分
            if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
                interruptMode = REINTERRUPT;
            if (node.nextWaiter != null) // clean up if cancelled
                unlinkCancelledWaiters();
            if (interruptMode != 0)
                reportInterruptAfterWait(interruptMode);
        }

代碼(2.6.1),繼續講解
isOnSyncQueue

一開始在條件隊列中,現在在sync queue中等待重新獲取資源,如果有這種的node就返回true

    final boolean isOnSyncQueue(Node node) {
        if (node.waitStatus == Node.CONDITION || node.prev == null)
            return false;
        if (node.next != null) // If has successor, it must be on queue
            return true;
        /*
         * node.prev can be non-null, but not yet on queue because
         * the CAS to place it on queue can fail. So we have to
         * traverse from tail to make sure it actually made it.  It
         * will always be near the tail in calls to this method, and
         * unless the CAS failed (which is unlikely), it will be
         * there, so we hardly ever traverse much.
         */
        return findNodeFromTail(node);
    }

findNodeFromTail

從尾部找尋結點

    private boolean findNodeFromTail(Node node) {
        Node t = tail;
        for (;;) {
            // 結合下面的enq代碼以及圖思考一下,會明白此方法的意義的
            if (t == node)
                return true;
            if (t == null)
                return false;
            t = t.prev;
        }
    }

node.waitStatus == Node.CONDITION, 表示當前結點肯定在condition queue中。

為何是上面的那些條件?
我們上面看了轉移到sync queue是用的enq方法

    private Node enq(final Node node) {
        for (;;) {
            Node t = tail;
            if (t == null) { // Must initialize
                if (compareAndSetHead(new Node()))
                    tail = head;
            } else {
                // (2.6.5)
                node.prev = t;
                if (compareAndSetTail(t, node)) {
                    t.next = node;
                    return t;
                }
            }
        }
    }

結合代碼(2.6.5),思考一下就知道isOnSyncQueue中條件設置的道理了,但是為何需要findNodeFromTail啦? 這是需要補充一個知識點了,在多個線程執行enq時,只有一個線程會設置為tail成功,其余的都只是設置prev,就可能會出現下面圖片中的情況,'多個尾巴'一直不斷自旋,最后會變成一個正常的鏈表。
enq造成的多個尾巴
此時線程的狀態是,調用await后,將結點添加到條件隊列中,且釋放了自己持有的所有資源,並將自己park,此時等待被signal或者被中斷。

        public final void await() throws InterruptedException {
            if (Thread.interrupted())
                throw new InterruptedException();
            // 添加結點到條件隊列中
            Node node = addConditionWaiter();
            // 釋放當前線程持有的鎖
            int savedState = fullyRelease(node);
            int interruptMode = 0;
            // 不在同步隊列中,就park
            // (2.6.1)
            while (!isOnSyncQueue(node)) {
                // 執行到這,當前線程會立即掛起
                LockSupport.park(this);             ///  我們在這被掛起了
                // 運行到這的話,情況: 1. signal 2. 中斷
                // 檢驗中斷
                // (2.6.2)
                if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                    break;
            }
            // 第二部分
            if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
                interruptMode = REINTERRUPT;
            if (node.nextWaiter != null) // clean up if cancelled
                unlinkCancelledWaiters();
            if (interruptMode != 0)
                reportInterruptAfterWait(interruptMode);
        }

執行到代碼(2.6.2), 我們直到可能是被signal或被中斷了。現在要解決的是,

  1. 是否被中斷?
  2. 何時被中斷?
  3. 中斷如何處理?

我們帶着這三個問題,繼續出發~

補充一個小知識點,AQS定義了三種情況中斷的值

  • THROW_IE, signal前被中斷,要拋出InterruptedException
  • REINTERRUPT, signal后被中斷
  • 0, 未被中斷

關於REINTERRUPT這個中斷,可以理解成,吃飯,吃完了但是還有一個菜沒有上,問服務員,“如果沒有炒,就不要了”,但是服務員告訴,菜已經下鍋,所以這時候的中斷就是REINTERRUPT,中斷的太晚了。 -- 例子來自上面的那篇博客

我們繼續看向代碼(2.6.2)
checkInterruptWhileWaiting

        private int checkInterruptWhileWaiting(Node node) {
            return Thread.interrupted() ?
                (transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) :
                0;
        }

若被中斷Thread.interrupted肯定為true

transferAfterCancelledWait

    final boolean transferAfterCancelledWait(Node node) {
        // 中斷情況一: 此時結點還在condition queue中,肯定是signal前就被中斷了
        if (compareAndSetWaitStatus(node, Node.CONDITION, 0)) {
            // 這里還要加入到sync queue中,獲取到鎖才能拋出錯,繼續往后看
            enq(node);
            return true;
        }
        /*
         * If we lost out to a signal(), then we can't proceed
         * until it finishes its enq().  Cancelling during an
         * incomplete transfer is both rare and transient, so just
         * spin.
         */
         // 中斷情況二: 這里的情況就是signal后,node還沒有執行enq,畢竟執行signal到執行enq還有幾個步驟
         // 此時就自旋,等待node轉移到sync node中就行了
        while (!isOnSyncQueue(node))
            Thread.yield();
        return false;
    }

上面的代碼已經注明了,各種情況的發生時機,此時我們來到了await的第二部分~

        public final void await() throws InterruptedException {
            if (Thread.interrupted())
                throw new InterruptedException();
            // 添加結點到條件隊列中
            Node node = addConditionWaiter();
            // 釋放當前線程持有的鎖
            int savedState = fullyRelease(node);
            int interruptMode = 0;
            // 不在同步隊列中,就park
            // (2.6.1)
            while (!isOnSyncQueue(node)) {
                // 執行到這,當前線程會立即掛起
                LockSupport.park(this);             ///  我們在這被掛起了
                // 運行到這的話,情況: 1. signal 2. 中斷
                // 檢驗中斷
                // (2.6.2)
                if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                    break;
            }
            // 第二部分
            // acquireQueued獲取到鎖,並返回是否被中斷。
            // 有一種情況需要提一下,acquireQueued返回true,上面的interruptMode = 0,
            // 表示signal后,在獲取鎖的時候被中斷了
            if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
                interruptMode = REINTERRUPT;

            // node還在condition上,說明是被取消了的node,清除它
            if (node.nextWaiter != null) // clean up if cancelled
                unlinkCancelledWaiters();
            if (interruptMode != 0)
                // 對上面得到的interruptMode做出回應
                reportInterruptAfterWait(interruptMode);
        }

reportInterruptAfterWait

        private void reportInterruptAfterWait(int interruptMode)
            throws InterruptedException {
            if (interruptMode == THROW_IE)
                throw new InterruptedException();
            else if (interruptMode == REINTERRUPT)
                selfInterrupt();
        }

2.6.2 await與signal的流程圖

await流程圖


signal流程圖

2.6.3 小結

先講講await的流程,看起流程圖有點嚇人,其實很多步驟是對不同時機的中斷操作的記錄
當await被執行,下面簡單總結下await的流程

  1. 當前線程與CONDITION狀態封裝成node,加入到condition queue的末尾
  2. 釋放線程之前獲取的所有資源
  3. 若不在sync queue中,阻塞自己,等待被signal被中斷
  4. 獲取中斷操作的時機,並記錄表示何時中斷的值(interruptMode)
  5. 不管是怎么被喚醒的,都要去競爭資源,直到獲得資源為止
  6. 最后對不同的中斷值,作出不同的操作

signal的流程就相對於簡單一點

  1. 獲取condition queue的頭結點
  2. 檢驗是否被取消,若被取消,就獲取頭結點的后驅結點,以此類推;
  3. 將結點從condition queue中轉移到sync queue中,而且會從condition queue中刪除該節點
  4. 若結點插入sync queue,得到的前驅結點,被取消了,或者CAS前驅結點狀態為SIGNAL
    失敗,將直接unpark當前線程

3. 總結

Doug Lea,太秀了。AQS中有很多細枝末節的東西,只有自己去讀了源碼,理解為何這樣做,你才會明白才會真正讀懂AQS。
關於學習和寫AQS文章時,看了一些博客,為我解答了自己的疑惑,慢慢加油,我也要向這些大佬看齊~

4. 參考

~~# 5. 面試中問題
這是我的一個想法,若我博客中寫過的知識,在面試中有問到過,我會記錄下來,沒有就是目前還沒遇到過


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM