java基礎之AQS


Java開發中,我們的應用程序經常會使用多線程提高程序的運行效率,多線程情況下訪問線程共享變量可能會帶來並發問題,此時就需要並發鎖解決並發問題。Java提供了兩種類型的並發控制機制:synchonrized關鍵字和AQS框架,二者各有優勢,不過在加鎖解鎖場景比較靈活的情況下,我們往往會采用AQS框架來解決並發問題。本文會對Java中的AQS框架的結構和源碼進行簡單介紹。本文大多數內容參考了這篇博客

AQS結構

AQS的全稱是AbstractQueuedSynchronizer(抽象的隊列式的同步器),AQS定義了一套多線程訪問共享資源的同步器框架,許多同步類實現都依賴於它,如常用的ReentrantLock/Semaphore/CountDownLatch等。

如下圖所示,AQS主要包含兩部分內容:共享資源和等待隊列。AQS底層已經對這兩部分內容提供了很多方法。

  1. 共享資源:共享資源是一個volatile的int類型變量。
  2. 等待隊列:等待隊列是一個線程安全的隊列,當線程拿不到鎖時,會被park並放入隊列。
  3. 新線程:非公平情況下,新線程會先嘗試直接獲取資源,獲取不到才進入隊列。

AQS結構

核心思想

同步器的核心方法是acquire和release操作,其背后的思想也比較簡潔明確。

acquire操作是這樣的:

// acquire操作
while (當前同步器的狀態不允許獲取操作) {
    如果當前線程不在隊列中,則將其插入隊列
    阻塞當前線程
}
如果線程位於隊列中,則將其移出隊列

release操作是這樣的:

更新同步器的狀態
if (新的狀態允許某個被阻塞的線程獲取成功)
    解除隊列中一個或多個線程的阻塞狀態

從這兩個操作中的思想中我們可以提取出三大關鍵操作:同步器的狀態變更、線程阻塞和釋放、插入和移出隊列。所以為了實現這兩個操作,需要協調三大關鍵操作引申出來的三個基本組件:

  • 同步器狀態的原子性管理;
  • 線程阻塞與解除阻塞;
  • 隊列的管理;

同步器狀態的原子性管理

AQS類使用單個int(32位)來保存同步狀態,並暴露出getState、setState以及compareAndSet操作來讀取和更新這個同步狀態。其中屬性state被聲明為volatile,並且通過使用CAS指令來實現compareAndSetState,使得當且僅當同步狀態擁有一個一致的期望值的時候,才會被原子地設置成新值,這樣就達到了同步狀態的原子性管理,確保了同步狀態的原子性、可見性和有序性。

線程阻塞與解除阻塞

直到JSR166,阻塞線程和解除線程阻塞都是基於Java的內置管程,沒有其它非基於Java內置管程的API可以用來達到阻塞線程和解除線程阻塞。唯一可以選擇的是Thread.suspend和Thread.resume,但是它們都有無法解決的競態問題,所以也沒法用,目前該方法基本已被拋棄。具體不能用的原因可以官方給出的答復。

j.u.c.locks包提供了LockSupport類來解決這個問題。方法LockSupport.park阻塞當前線程直到有個LockSupport.unpark方法被調用。unpark的調用是沒有被計數的,因此在一個park調用前多次調用unpark方法只會解除一個park操作。另外,它們作用於每個線程而不是每個同步器。一個線程在一個新的同步器上調用park操作可能會立即返回,因為在此之前可以有多余的unpark操作。但是,在缺少一個unpark操作時,下一次調用park就會阻塞。雖然可以顯式地取消多余的unpark調用,但並不值得這樣做。在需要的時候多次調用park會更高效。park方法同樣支持可選的相對或絕對的超時設置,以及與JVM的Thread.interrupt結合 ,可通過中斷來unpark一個線程。

隊列的管理

整個框架的核心就是如何管理線程阻塞隊列,該隊列是嚴格的FIFO隊列,因此不支持線程優先級的同步。同步隊列的最佳選擇是自身沒有使用底層鎖來構造的非阻塞數據結構,業界主要有兩種選擇,一種是MCS鎖,另一種是CLH鎖。其中CLH一般用於自旋,但是相比MCS,CLH更容易實現取消和超時,所以同步隊列選擇了CLH作為實現的基礎。

CLH隊列實際並不那么像隊列,它的出隊和入隊與實際的業務使用場景密切相關。它是一個鏈表隊列,通過AQS的兩個字段head(頭節點)和tail(尾節點)來存取,這兩個字段是volatile類型,初始化的時候都指向了一個空節點。

條件隊列

上一節的隊列其實是AQS的同步隊列,這一節的隊列是條件隊列,隊列的管理除了有同步隊列,還有條件隊列。AQS只有一個同步隊列,但是可以有多個條件隊列。AQS框架提供了一個ConditionObject類,給維護獨占同步的類以及實現Lock接口的類使用。

ConditionObject類實現了Condition接口,Condition接口提供了類似Object管程式的方法,如await、signal和signalAll操作,還擴展了帶有超時、檢測和監控的方法。ConditionObject類有效地將條件與其它同步操作結合到了一起。該類只支持Java風格的管程訪問規則,這些規則中,當且僅當當前線程持有鎖且要操作的條件(condition)屬於該鎖時,條件操作才是合法的。這樣,一個ConditionObject關聯到一個ReentrantLock上就表現的跟內置的管程(通過Object.wait等)一樣了。兩者的不同僅僅在於方法的名稱、額外的功能以及用戶可以為每個鎖聲明多個條件。

ConditionObject類和AQS共用了內部節點,有自己單獨的條件隊列。signal操作是通過將節點從條件隊列轉移到同步隊列中來實現的,沒有必要在需要喚醒的線程重新獲取到鎖之前將其喚醒。

源碼分析

我們主要通過獨占式同步狀態的獲取和釋放、共享式同步狀態的獲取和釋放來看下AQS是如何實現的。

獨占式同步狀態的獲取

獨占式同步狀態調用的方法是acquire,代碼如下:

public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}

上述代碼主要完成了同步狀態獲取、節點構造、加入同步隊列以及在同步隊列中自旋等待的相關工作,其主要邏輯是:首先調用子類實現的tryAcquire方法,該方法保證線程安全的獲取同步狀態,如果同步狀態獲取失敗,則構造獨占式同步節點(同一時刻只能有一個線程成功獲取同步狀態)並通過addWaiter方法將該節點加入到同步隊列的尾部,最后調用acquireQueued方法,使得該節點以自旋的方式獲取同步狀態。如果獲取不到則阻塞節點中的線程,而被阻塞線程的喚醒主要依靠前驅節點的出隊或阻塞線程被中斷來實現。

下面來首先來看下節點構造和加入同步隊列是如何實現的。代碼如下:

private Node addWaiter(Node mode) {
        // 當前線程構造成Node節點
        Node node = new Node(Thread.currentThread(), mode);
        // Try the fast path of enq; backup to full enq on failure
        // 嘗試快速在尾節點后新增節點 提升算法效率 先將尾節點指向pred
        Node pred = tail;
        if (pred != null) {
            //尾節點不為空  當前線程節點的前驅節點指向尾節點
            node.prev = pred;
            //並發處理 尾節點有可能已經不是之前的節點 所以需要CAS更新
            if (compareAndSetTail(pred, node)) {
                //CAS更新成功 當前線程為尾節點 原先尾節點的后續節點就是當前節點
                pred.next = node;
                return node;
            }
        }
        //第一個入隊的節點或者是尾節點后續節點新增失敗時進入enq
        enq(node);
        return node;
    }
private Node enq(final Node node) {
        for (;;) {
            Node t = tail;
            if (t == null) { // Must initialize
                //尾節點為空  第一次入隊  設置頭尾節點一致 同步隊列的初始化
                if (compareAndSetHead(new Node()))
                    tail = head;
            } else {
                //所有的線程節點在構造完成第一個節點后 依次加入到同步隊列中
                node.prev = t;
                if (compareAndSetTail(t, node)) {
                    t.next = node;
                    return t;
                }
            }
        }
    }

節點進入同步隊列之后,就進入了一個自旋的過程,每個線程節點都在自省地觀察,當條件滿足,獲取到了同步狀態,就可以從這個自旋過程中退出,否則依舊留在這個自旋過程中並會阻塞節點的線程,代碼如下:

final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                //獲取當前線程節點的前驅節點
                final Node p = node.predecessor();
                //前驅節點為頭節點且成功獲取同步狀態
                if (p == head && tryAcquire(arg)) {
                    //設置當前節點為頭節點
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;
                }
                //是否阻塞
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

再來看看shouldParkAfterFailedAcquire和parkAndCheckInterrupt是怎么來阻塞當前線程的,代碼如下:

private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
        //前驅節點的狀態決定后續節點的行為
     int ws = pred.waitStatus;
        if (ws == Node.SIGNAL)
            /*前驅節點為-1 后續節點可以被阻塞
             * This node has already set status asking a release
             * to signal it, so it can safely park.
             */
            return true;
        if (ws > 0) {
            /*
             * Predecessor was cancelled. Skip over predecessors and
             * indicate retry.
             */
            do {
                node.prev = pred = pred.prev;
            } while (pred.waitStatus > 0);
            pred.next = node;
        } else {
            /*前驅節點是初始或者共享狀態就設置為-1 使后續節點阻塞
             * waitStatus must be 0 or PROPAGATE.  Indicate that we
             * need a signal, but don't park yet.  Caller will need to
             * retry to make sure it cannot acquire before parking.
             */
            compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
        }
        return false;
    }
private final boolean parkAndCheckInterrupt() {
        //阻塞線程
        LockSupport.park(this);
        return Thread.interrupted();
    }

獨占式同步獲取鎖示例圖

獨占式同步獲取鎖示例圖

獨占式同步狀態的釋放

當同步狀態獲取成功之后,當前線程從acquire方法返回,對於鎖這種並發組件而言,就意味着當前線程獲取了鎖。有獲取同步狀態的方法,就存在其對應的釋放方法,該方法為release,現在來看下這個方法的實現,代碼如下:

public final boolean release(int arg) {
        if (tryRelease(arg)) {//同步狀態釋放成功
            Node h = head;
            if (h != null && h.waitStatus != 0)
                //直接釋放頭節點
                unparkSuccessor(h);
            return true;
        }
        return false;
    }
private void unparkSuccessor(Node node) {
        /*
         * If status is negative (i.e., possibly needing signal) try
         * to clear in anticipation of signalling.  It is OK if this
         * fails or if status is changed by waiting thread.
         */
        int ws = node.waitStatus;
        if (ws < 0)
            compareAndSetWaitStatus(node, ws, 0);

        /*尋找符合條件的后續節點
         * Thread to unpark is held in successor, which is normally
         * just the next node.  But if cancelled or apparently null,
         * traverse backwards from tail to find the actual
         * non-cancelled successor.
         */
        Node s = node.next;
        if (s == null || s.waitStatus > 0) {
            s = null;
            for (Node t = tail; t != null && t != node; t = t.prev)
                if (t.waitStatus <= 0)
                    s = t;
        }
        if (s != null)
            //喚醒后續節點
            LockSupport.unpark(s.thread);
    }

獨占式釋放是非常簡單而且明確的。

總結下獨占式同步狀態的獲取和釋放:在獲取同步狀態時,同步器維護一個同步隊列,獲取狀態失敗的線程都會被加入到隊列中並在隊列中進行自旋;移出隊列的條件是前驅節點為頭節點且成功獲取了同步狀態。在釋放同步狀態時,同步器調用tryRelease方法釋放同步狀態,然后喚醒頭節點的后繼節點。

共享式同步狀態的獲取

共享式同步狀態調用的方法是acquireShared,代碼如下:

public final void acquireShared(int arg) {
        //獲取同步狀態的返回值大於等於0時表示可以獲取同步狀態
        //小於0時表示可以獲取不到同步狀態  需要進入隊列等待
        if (tryAcquireShared(arg) < 0)
            doAcquireShared(arg);
    }
private void doAcquireShared(int arg) {
        //和獨占式一樣的入隊操作
        final Node node = addWaiter(Node.SHARED);
        boolean failed = true;
        try {
            boolean interrupted = false;
            //自旋
            for (;;) {
                final Node p = node.predecessor();
                if (p == head) {
                    int r = tryAcquireShared(arg);
                    if (r >= 0) {
                        //前驅結點為頭節點且成功獲取同步狀態 可退出自旋
                        setHeadAndPropagate(node, r);
                        p.next = null; // help GC
                        if (interrupted)
                            selfInterrupt();
                        failed = false;
                        return;
                    }
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }
private void setHeadAndPropagate(Node node, int propagate) {
        Node h = head; // Record old head for check below
        //退出自旋的節點變成首節點
        setHead(node);
       
        if (propagate > 0 || h == null || h.waitStatus < 0 ||
            (h = head) == null || h.waitStatus < 0) {
            Node s = node.next;
            if (s == null || s.isShared())
                doReleaseShared();
        }
    }

與獨占式一樣,共享式獲取也需要釋放同步狀態,通過調用releaseShared方法可以釋放同步狀態,代碼如下:

public final boolean releaseShared(int arg) {
        //釋放同步狀態
        if (tryReleaseShared(arg)) {
            //喚醒后續等待的節點
            doReleaseShared();
            return true;
        }
        return false;
    }
private void doReleaseShared() {
        /*
         * Ensure that a release propagates, even if there are other
         * in-progress acquires/releases.  This proceeds in the usual
         * way of trying to unparkSuccessor of head if it needs
         * signal. But if it does not, status is set to PROPAGATE to
         * ensure that upon release, propagation continues.
         * Additionally, we must loop in case a new node is added
         * while we are doing this. Also, unlike other uses of
         * unparkSuccessor, we need to know if CAS to reset status
         * fails, if so rechecking.
         */
        //自旋
    for (;;) {
            Node h = head;
            if (h != null && h != tail) {
                int ws = h.waitStatus;
                if (ws == Node.SIGNAL) {
                    if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                        continue;            // loop to recheck cases
                    //喚醒后續節點
            unparkSuccessor(h);
                }
                else if (ws == 0 &&
                         !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                    continue;                // loop on failed CAS
            }
            if (h == head)                   // loop if head changed
                break;
        }
    }

unparkSuccessor方法和獨占式是一樣的。

AQS的應用

AQS被大量的應用在了同步工具上。

ReentrantLock:ReentrantLock類使用AQS同步狀態來保存鎖重復持有的次數。當鎖被一個線程獲取時,ReentrantLock也會記錄下當前獲得鎖的線程標識,以便檢查是否是重復獲取,以及當錯誤的線程試圖進行解鎖操作時檢測是否存在非法狀態異常。ReentrantLock也使用了AQS提供的ConditionObject,還向外暴露了其它監控和監測相關的方法。

ReentrantReadWriteLock:ReentrantReadWriteLock類使用AQS同步狀態中的16位來保存寫鎖持有的次數,剩下的16位用來保存讀鎖的持有次數。WriteLock的構建方式同ReentrantLock。ReadLock則通過使用acquireShared方法來支持同時允許多個讀線程。

Semaphore:Semaphore類(信號量)使用AQS同步狀態來保存信號量的當前計數。它里面定義的acquireShared方法會減少計數,或當計數為非正值時阻塞線程;tryRelease方法會增加計數,在計數為正值時還要解除線程的阻塞。

CountDownLatch:CountDownLatch類使用AQS同步狀態來表示計數。當該計數為0時,所有的acquire操作(對應到CountDownLatch中就是await方法)才能通過。

FutureTask:FutureTask類使用AQS同步狀態來表示某個異步計算任務的運行狀態(初始化、運行中、被取消和完成)。設置(FutureTask的set方法)或取消(FutureTask的cancel方法)一個FutureTask時會調用AQS的release操作,等待計算結果的線程的阻塞解除是通過AQS的acquire操作實現的。

SynchronousQueues:SynchronousQueues類使用了內部的等待節點,這些節點可以用於協調生產者和消費者。同時,它使用AQS同步狀態來控制當某個消費者消費當前一項時,允許一個生產者繼續生產,反之亦然。

除了這些j.u.c提供的工具,還可以基於AQS自定義符合自己需求的同步器。

我是御狐神,歡迎大家關注我的微信公眾號

qrcode_for_gh_83670e17bbd7_344-2021-09-04-10-55-16

本文最先發布至微信公眾號,版權所有,禁止轉載!


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM