Java多線程之JUC包:AbstractQueuedSynchronizer(AQS)源碼學習筆記


若有不正之處請多多諒解,並歡迎批評指正。

請尊重作者勞動成果,轉載請標明原文鏈接:

http://www.cnblogs.com/go2sea/p/5618628.html 

 

AbstractQueuedSynchronizer(AQS)是一個同步器框架,在實現鎖的時候,一般會實現一個繼承自AQS的內部類sync,作為我們的自定義同步器。AQS內部維護了一個state成員和一個隊列。其中state標識了共享資源的狀態,隊列則記錄了等待資源的線程。以下這五個方法,在AQS中實現為直接拋出異常,這是我們自定義同步器需要重寫的方法:

①isHeldExclusively():該線程是否正在獨占資源。只有用到condition才需要去實現它。

②tryAcquire(int):獨占方式。嘗試獲取資源,成功則返回true,失敗返回false。

③tryRelease(int):獨占方式。嘗試釋放資源,成功則返回true,失敗返回false。

④tryAcquireShared(int):共享方式。嘗試獲取資源。成功返回true,失敗返回false。

⑤tryReleaseShared(int):共享方式。嘗試釋放資源,成功則返回true,失敗返回false。

其中isHeldExclusively需要在使用Condition時重寫,他在AQS中的調用全部發生在其內部類ConditionObject的方法中。②③和④⑤分別對應了AQS定義的兩種資源共享的方式:Exclusive&share,例如ReentrantLock就是一種獨占鎖,CountDownLatch和Semaphore是共享鎖。與CountDownLatch有一定相似性的CyclicBarrier並沒有自己的共享同步器,而是使用Lock和Condition來實現的(關於CyclicBarrier的詳解可以參考本人的另一篇博文http://www.cnblogs.com/go2sea/p/5615531.html)。

下面是一個簡單的獨占鎖的實現,它是不可重入的。它重寫了AQS的tryAcquire方法和tryRelease方法:

 1 import java.io.Serializable;
 2 import java.util.concurrent.TimeUnit;
 3 import java.util.concurrent.locks.AbstractQueuedSynchronizer;
 4 import java.util.concurrent.locks.Condition;
 5 import java.util.concurrent.locks.Lock;
 6 
 7 class Mutex implements Lock, Serializable {
 8     //自定義同步器,繼承自AQS
 9    private static class Sync extends AbstractQueuedSynchronizer {
10      //試圖獲取鎖,當state為0時能成功獲取,
11      public boolean tryAcquire(int acquires) {
12        assert acquires == 1; //這是一個對於state進行操作的量,含義自定義
13        if (compareAndSetState(0, 1)) {    //注意:這是一個原子操作
14          setExclusiveOwnerThread(Thread.currentThread());
15          return true;
16        }
17        return false;
18      }
19      //釋放鎖,此時state應為1,Mutex處於被獨占狀態
20      protected boolean tryRelease(int releases) {
21        assert releases == 1; // Otherwise unused
22        if (getState() == 0) throw new IllegalMonitorStateException();
23        setExclusiveOwnerThread(null);
24        setState(0);
25        return true;
26      }
27      //返回一個Condition
28      Condition newCondition() { return new ConditionObject(); }
29    }
30    
31    private final Sync sync = new Sync();
32    
33    public void lock()                { sync.acquire(1); }
34    public boolean tryLock()          { return sync.tryAcquire(1); }
35    public void unlock()              { sync.release(1); }
36    public Condition newCondition()   { return sync.newCondition(); }
37    public void lockInterruptibly() throws InterruptedException {
38      sync.acquireInterruptibly(1);
39    }
40    public boolean tryLock(long timeout, TimeUnit unit)
41        throws InterruptedException {
42      return sync.tryAcquireNanos(1, unit.toNanos(timeout));
43    }
44  }

我們可以看到,利用AQS實現一個簡單的自定義鎖看上去並不復雜,讓我們以此為例,來學習一下AQS的內部原理吧。

一、acquire 獲取鎖

我們先來看一下Mutex重寫的tryAcquire方法:

  //試圖獲取鎖,當state為0時能成功獲取,
     public boolean tryAcquire(int acquires) {
       assert acquires == 1; //這是一個對於state進行操作的量,含義自定義
       if (compareAndSetState(0, 1)) {    //注意:這是一個原子操作
         setExclusiveOwnerThread(Thread.currentThread());
         return true;
       }
       return false;
     }

注意:當我們初始化一個Sync的時候,如果沒有指定state的初值(無參數),那么state的默認初值是0。可以看到,方法開頭首先有一個斷言acquires==1,參數acquires代表要在state上做的改變的量(減去或增加),在Mutex中,我們定義state只有兩個狀態:0或1,0代表共享資源可以被獲取,1表示共享資源正在被占用,因此Mutex是不可重入的。實際上,自定義同步器通過重寫tryAcquire和tryRelease來定義state代表的意義和資源的共享方式,這是同步器的主要任務。Mutex的tryAcquire使用一個原子操作compareAndSetState來試圖獲取資源,這個原子操作由上層的AQS提供,如果成功,將當前線程設置為獨占線程並返回true。

Mutex的lock方法調用了sync的acquire方法,acquire方法實現為:

    public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }

它首先調用tryAquire去獲取共享資源,如果失敗,調用addWaiter將當前線程放入等待隊列,返回持有當前線程的Node對象,然后調用acquireQueued方法來監視等待隊列並獲取資源。acquireQueued方法會阻塞,直到成功獲取。注意,acquire方法不能及時響應中斷,只能在成功獲取鎖之后,再來處理。中斷當前線程的操作跑出的異常在acquireQueued方法中被捕獲,外部調用者沒能看到這個異常,因此調用selfInterrupt來重置中斷標識。

我們需要詳細了解addWaiter方法和acquireQueued方法,之后再來回顧acquire的過程,才能對整個獲取鎖的流程有比較詳細的了解。

我們先來看addWaiter方法:

    private Node addWaiter(Node mode) {
        Node node = new Node(Thread.currentThread(), mode);
        // Try the fast path of enq; backup to full enq on failure
        Node pred = tail;
        if (pred != null) {
            node.prev = pred;
            if (compareAndSetTail(pred, node)) {
                pred.next = node;
                return node;
            }
        }
        enq(node);
        return node;
    }

addWaiter首先將當前線程包裝在一個Node對象node中,然后獲取了一下隊列的尾節點,如果隊列不為空(tail不為null)的話,調用一個CAS函數試圖將node放入等待隊列的尾部,注意,此時可能發生競爭,如果有另外一個線程在兩個if之間搶先更新的隊列的尾節點,CAS操作將會失敗,這時會調用enq方法,繼續試圖將node放入隊列:

    private Node enq(final Node node) {
        for (;;) {
            Node t = tail;
            if (t == null) { // Must initialize
                if (compareAndSetHead(new Node()))
                    tail = head;
            } else {
                node.prev = t;
                if (compareAndSetTail(t, node)) {
                    t.next = node;
                    return t;
                }
            }
        }
    }

enq方法會循環檢測隊列,如果隊列為空,則調用CAS函數初始化隊列(此時node==head==tail),否則調用CAS函數將node放入隊列尾。注意,這兩個CAS是由AQS提供的原子操作。如果CAS失敗,enq會繼續循環檢測,直到成功將node入列。enq方法的這種方式有一個專用的名詞:CAS自旋,這種方式在AQS中有多處應用。這里有一個隱含的知識點,即tail是一個volatile成員,確保某個線程更新隊列后對其他線程的可見性。

注意:隊列為空的時候,第一個線程進入隊列的情況有點tricky:第一個發現隊列為空並初始化隊列(head節點)的線程不一定優先拿到資源。head節點被初始化后,當前線程需要下一次旋轉才有機會進入隊列,在這期間,完全有可能半路殺出程咬金,將當前線程與它初始化出的head節點無情分開。我們來總結一下,當隊列只有一個節點時(head=tail),有兩種情況:第一種是這個隊列剛剛被初始化,head並沒有持有任何線程對象。這個狀態不會持續太久,初始化隊列的線程有很大機會在下次自旋時把自己接到隊尾。第二種情況是,所有等待線程都已經獲得資源並繼續執行下去了,隊列僅有的節點是最后一個獲取共享資源的線程,等到下一個線程到達等待隊列並將它踢出隊列之后,它才有機會被回收。

enq執行完畢,我們已經成功把當前線程放入等待隊列,接下來的任務就是監視隊列,等待獲取資源。這個過程由acquireQueued方法實現:

    final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                final Node p = node.predecessor();
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

acquireQueued方法是一個很重要的方法,在分析這個方法之前,我們先來說一下AQS中的那個等待隊列。這個隊列實際上是一個CLH隊列,它保證了競爭資源的線程按到達順序來獲取資源,避免了飢餓的發生。CLH隊列的工作過程,就是acquireQueued方法的工作過程。很明顯,這又是一個自旋。首先,我們調用predecessor方法獲取當前線程的前驅節點,如果這個前驅是head節點,就緊接着調用tryAcquire去獲取共享資源,當然這是有可能失敗的,因為head節點可能剛剛“上位”,還沒有釋放資源。如果很幸運,我們拿到了資源,就調用setHead將node設置為隊列的頭結點,setHead方法同時會將node的prev置為null,緊接着將原先head的next也置為null,顯然這是為了讓其后續被回收。注意:acquireQueued方法在自旋過程中是不可被中斷的,當然它會檢測到中斷(在parkAndCheckInterrupt方法中檢測中斷標志),但並不會因此結束自旋,只能在獲得資源退出方法后,反饋給上層的方法:我剛剛被中斷了。還記得acquire方法中的selfInterrupt的調用嗎,就是為了“補上”這里沒有響應的中斷。

好,我們繼續往下。獲取資源失敗后(原因有二,head與我之間還有等待線程或者head節點的線程正在使用資源),調用shouldParkAfterFailedAcquire方法檢測是否該去“休息”下,畢竟一直自旋很累嘛。如果可以休息就調用parkAndCheckInterrupt放心去休息。我們先來看一下shuldParkAfterFailedAcquire:

    private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
        int ws = pred.waitStatus;
        if (ws == Node.SIGNAL)
            /*
             * This node has already set status asking a release
             * to signal it, so it can safely park.
             */
            return true;
        if (ws > 0) {
            /*
             * Predecessor was cancelled. Skip over predecessors and
             * indicate retry.
             */
            do {
                node.prev = pred = pred.prev;
            } while (pred.waitStatus > 0);
            pred.next = node;
        } else {
            /*
             * waitStatus must be 0 or PROPAGATE.  Indicate that we
             * need a signal, but don't park yet.  Caller will need to
             * retry to make sure it cannot acquire before parking.
             */
            compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
        }
        return false;
    }

我們首先了解一下waitStatus。Node對象維護了一個int成員waitStatus,他的可能取值如下:

static final int CANCELLED =  1;
static final int SIGNAL    = -1;
static final int CONDITION = -2;
static final int PROPAGATE = -3;

下面解釋一下每個值的含義

CANCELLED:因為超時或者中斷,結點會被設置為取消狀態,被取消狀態的結點不應該去競爭鎖,只能保持取消狀態不變,不能轉換為其他狀態。處於這種狀態的結點會被踢出隊列,被GC回收;
SIGNAL:表示這個結點的繼任結點被阻塞了,到時需要通知它;
CONDITION:表示這個結點在條件隊列中,因為等待某個條件而被阻塞;
PROPAGATE:使用在共享模式頭結點有可能處於這種狀態,表示鎖的下一次獲取可以無條件傳播;
0:None of the above,新結點會處於這種狀態。

在我們的Mutex的例子中,節點的waitStatus只可能有CANCELLED、SIGNAL和0三中狀態(事實上,獨占模式下所有不使用Condition的同步器都是這樣)。

我們繼續來分析shouldParkAfterFailedAcquire方法:

首先檢測下node的前驅節點pred,如果pred狀態已經被置為SIGNAL,直接返回true。否則,從node的前驅繼續往前找,直到找到一個waitStatus小於等於0的節點,設置該點為node的前驅(注意:此時node與這個節點之間的節點從等待隊列中被“摘下”,等待被回收了)並返回false。返回之后,上層的acquireQueued方法繼續自旋,再次進入shouldParkAfterFailedAcquire方法之后,如果發現node前驅不是取消狀態且waitStatus不等於SIGNAL,調用CAS函數進行注冊。注意:這個操作可能失敗,因此不能直接返回true,而是返回false由上層的自旋再次調用shouldParkAfterFailedAcquire直到確認注冊成功。

歷盡曲折,我們終於可以安心休息了:

    private final boolean parkAndCheckInterrupt() {
        LockSupport.park(this);
        return Thread.interrupted();
    }

parkAndCheckInterrupt方法十分簡單,他調用LockSupport的靜態方法park阻塞當前線程,直到被中斷,這次中斷會被acquireQueued記錄,但不會立即響應,直到自旋完成。注意:返回操作中的interrupted方法會將中斷標志復位,因此我們在上層需要將這個中斷“補上”,再一次:還記得大明湖邊的selfInterrupt嗎?

二、release 釋放鎖

我們先來看一下Mutex中重寫的tryRelease方法:

     //釋放鎖,此時state應為1,Mutex處於被獨占狀態
     protected boolean tryRelease(int releases) {
       assert releases == 1; // Otherwise unused
       if (getState() == 0) throw new IllegalMonitorStateException();
       setExclusiveOwnerThread(null);
       setState(0);
       return true;
     }

邏輯比較簡單,首先將獨占線程置為null,緊接着將state設置為0,這里不會發生資源競爭,因此不需要用CAS去設置state值,直接置0即可。

    public final boolean release(int arg) {
        if (tryRelease(arg)) {
            Node h = head;
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h);
            return true;
        }
        return false;
    }

好,我們開始分析release方法。首先調用tryRelease試圖釋放共享資源,緊接着檢測自己的waitStatus是否為SIGNAL,如果是的話,調用unparkSuccessor喚醒隊列中的下一個線程。獨占模式下,waitStatus!=0與waitStatus==-1等價(這里waitStatus不會為CANCELLED,因為已經獲取資源了)。如果不為SIGNAL,說明如果有下個等待線程,它正在自旋。所以直接返回true即可。我們來看下unparkSuccessor方法:

    private void unparkSuccessor(Node node) {
        /*
         * If status is negative (i.e., possibly needing signal) try
         * to clear in anticipation of signalling.  It is OK if this
         * fails or if status is changed by waiting thread.
         */
        int ws = node.waitStatus;
        if (ws < 0)
            compareAndSetWaitStatus(node, ws, 0);

        /*
         * Thread to unpark is held in successor, which is normally
         * just the next node.  But if cancelled or apparently null,
         * traverse backwards from tail to find the actual
         * non-cancelled successor.
         */
        Node s = node.next;
        if (s == null || s.waitStatus > 0) {
            s = null;
            for (Node t = tail; t != null && t != node; t = t.prev)
                if (t.waitStatus <= 0)
                    s = t;
        }
        if (s != null)
            LockSupport.unpark(s.thread);
    }

unparkSuccessor方法也會在共享模式的工作流程中被調用,因此方法開始做的判斷是有必要的。對於獨占模式而言,ws應該都是0。然后找到下一個需要被喚醒的線程並調用LockSupport的靜態方法unpark喚醒等待線程。

至此,我們比較詳細地了解了acquire&release的工作流程。

三、acquireShared 獲取鎖

下面,我們來學習下共享模式下的獲取&釋放鎖的工作流程。

    public final void acquireShared(int arg) {
        if (tryAcquireShared(arg) < 0)
            doAcquireShared(arg);
    }

acquireShared方法首先調用tryAcquireShared試圖獲取共享資源。tryAcquireShared的返回值表示剩余資源個數,負值表示獲取失敗,0表示獲取成功但已無剩余資源。如果獲取失敗,調用doAcquireShared方法完成獨占模式下類似的操作,后面我們會詳細分析。注意,doAcquireShared方法在等待資源的過程中也是不響應中斷的,它能覺察到中斷,但在成功獲取資源之前不會處理。

    private void doAcquireShared(int arg) {
        final Node node = addWaiter(Node.SHARED);
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                final Node p = node.predecessor();
                if (p == head) {
                    int r = tryAcquireShared(arg);
                    if (r >= 0) {
                        setHeadAndPropagate(node, r);
                        p.next = null; // help GC
                        if (interrupted)
                            selfInterrupt();
                        failed = false;
                        return;
                    }
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

doAcquireShared方法與acquireQueued方法相似,不同的地方在於,共享模式下成功獲取資源並將head指向自己之后,要檢查並試圖喚醒之后的等待線程。因為共享資源可能剩余,可以被后面的等待線程獲取。

    private void setHeadAndPropagate(Node node, int propagate) {
        Node h = head; // Record old head for check below
        setHead(node);
        if (propagate > 0 || h == null || h.waitStatus < 0 ||
            (h = head) == null || h.waitStatus < 0) {
            Node s = node.next;
            if (s == null || s.isShared())
                doReleaseShared();
        }
    }

setHeadAndPropagate中有一個長長的if,來判斷是否應該去試圖喚醒后面的線程。其中h==null的判斷筆者始終不能理解,因為查看代碼發現,之后隊列尚未初始化的時候為空,后續都不可能為空了。關於這點希望各位看官不吝指教。其他情況,propagate大於0,表示尚有資源可被獲取,顯然應該繼續判斷;而當h.waitStatus小於0時,它有兩種取值可能,SIGNAL和PROPAGATE,我們將在后面看到,這兩種情況都是應該繼續判斷。后續是對node的后繼進行的判斷,注意,node此時可能已經不是head節點了,因為這是共享模式,所以可能有一個node的后繼成功獲取資源后,把自己設為head,將node踢出了隊列。這種情況下node的后繼s是可能為null的,但貌似這種情況doReleaseShared的調用沒有意義。s.isShared的判斷主要是考慮到讀寫鎖的情況,在讀寫鎖的使用過程中,申請寫鎖(獨占模式)和申請讀鎖(共享模式)的線程可能同時存在,這個判斷發現后即線程是共享模式的時候,調用soReleaseShared方法喚醒他。

但總之,我們十分保守謹慎地調用了doReleaseShared方法試圖喚醒后繼線程:

    private void doReleaseShared() {
        for (;;) {
            Node h = head;
            if (h != null && h != tail) {
                int ws = h.waitStatus;
                if (ws == Node.SIGNAL) {
                    if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                        continue;            // loop to recheck cases
                    unparkSuccessor(h);
                }
                else if (ws == 0 &&
                         !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                    continue;                // loop on failed CAS
            }
            if (h == head)                   // loop if head changed
                break;
        }
    }

又是一個自旋。我們首先獲取head節點h,然后檢查它的waitStatus是否為SIGNAL,如果是的話,調用CAS將h的waitStatus設置為0,並調用unparkSuccessor喚醒下一個等待線程。注意,這里調用CAS方法而不是直接賦值,是因為在共享模式下,這里可能發生競爭。doReleaseShared方法可能由head節點在使用完共享資源后主動調用(后續在releaseShared方法中可以看到),也可能由剛剛“上位”的等待線程調用,在上位之后,原來的head線程已被踢出隊列。

因此,doReleaseShared方法的執行情況變得比較復雜,需要細致分析。

第一種情況,只有剛剛釋放資源的head線程調用,這時候沒有競爭,waitStatus是SIGNAL,就去喚醒下個線程,是0,就重置為PROPAGATE。

第二種情況,剛剛釋放完資源的舊head,和剛剛上位的新head同時調用doReleaseShared方法,這時候最新的head獲取的都是自己,若干被踢出的舊head獲取的可能是舊head,也可能是新head,這些被踢出的舊head線程也在根據自己獲取的head(不管新舊)的狀態進行CAS操作和unparkSuccessor操作,幸運的是(必須幸運啊。。),這些操作不會造成錯誤,只是多了一些喚醒而已(這些喚醒可能導致一個線程獲得資源,也可能是一個“虛晃”)。

我們可以發現,不管head引用怎樣更迭,最終新head的waitStatus都會被順利處理。注意,可能有多個舊head同時參與這個過程,都不影響正確性。

我們注意到,一個新head,在他剛上位的時候有機會調用一次setHeadAndPropagate進而調用doReleaseShared,在他釋放資源之后,又一次調用doReleaseShared(這次是必然的)。第一次調用時,不管新head的waitStatus是0還是SIGNAL,最終狀態都被PROPAGATE(當然,被踢出隊列的head可能還沒來得及設置成PROPAGATE,但新上位的head最終會被設置),這也符合PROPAGATE的語義:使用在共享模式頭結點有可能處於這種狀態,表示鎖的下一次獲取可以無條件傳播。

還有一個問題,它是由SIGNAL-->0-->PROPAGATE變化而來的,為什么不是SIGNAL-->PROPAGA這樣直接變化呢?原因是unparkSuccessor方法會試圖將當前node的waitStatus復位成0,如果我們直接SIGNAL-->PROPAGA后,那么又被復位成0,還需要一次CAS操作置為PROPAGATE。

四、releaseShared 釋放鎖

    public final boolean releaseShared(int arg) {
        if (tryReleaseShared(arg)) {
            doReleaseShared();
            return true;
        }
        return false;
    }

我們可以看到,調用tryReleaseShared成功釋放共享資源之后,最終要再次調用doReleaseShared試圖喚醒后面的等待線程。

五、ConditionObject

關於Condition的內容請看筆者的另一篇博文Condition源碼學習筆記

 

至此,我們對獨占模式和共享模式下、不響應中斷的、沒有等待時間參數的獲取資源和釋放資源的流程有了初步了解。這時去看JUC包中的鎖的源碼,相信會有更深的理解。

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM