全網最詳細的AbstractQueuedSynchronizer(AQS)源碼剖析(二)資源的獲取和釋放


上期的《全網最詳細的AbstractQueuedSynchronizer(AQS)源碼剖析(一)AQS基礎》中介紹了什么是AQS,以及AQS的基本結構。有了這些概念做鋪墊之后,我們就可以正式地看看AQS是如何通過state(以下也稱資源)和同步隊列,實現線程之間的同步功能了
那么線程之間是如何同步呢?其實就是通過資源的獲取和釋放來進行同步。如果獲取到就繼續運行,獲取不到就放入同步隊列阻塞等待,釋放就是交出獲得的資源,並釋放同步隊列中需要被喚醒的線程。對,就是這么簡單!
本篇我們繼續深入AQS內部,一起來看看線程是怎么利用AQS來獲取、釋放資源的~

獲取資源

AQS獲取資源是通過各種acquire方法。不同acquire方法之間存在區別,如下:

  • acquire:以互斥模式獲取資源,忽略中斷
  • acquireInterruptibly:以互斥模式獲取資源,響應中斷
  • acquireShared:以共享模式獲取資源,忽略中斷
  • acquireSharedInterruptibly:以共享模式獲取資源,響應中斷

獲取互斥資源

忽略中斷的acquire方法

acquire方法是獲取互斥資源,忽略中斷。如果獲取成功,直接返回,否則該線程會進入同步隊列阻塞等待。源碼如下:

public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}

acquire是一個模板方法,定義為final方法防止子類重寫。其中的鈎子方法tryAcquire需要子類去實現。
如果tryAcquire返回true,說明嘗試獲取成功,直接返回即可。如果tryAcquire返回false,說明嘗試獲取失敗,會調用addWaiter方法進入等待隊列。該方法的解析見上一篇博客全網最詳細的AbstractQueuedSynchronizer(AQS)源碼剖析(一)AQS基礎
執行完addWaiter方法后,該線程就處於同步隊列中了(queued),接下來就會調用acquireQueued方法

acquireQueued方法為一個已經位於同步隊列的線程,以互斥模式獲取資源,不響應中斷但是會記錄中斷狀態。源碼如下:

final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
            final Node p = node.predecessor();		// 獲取node的前一個節點
            if (p == head && tryAcquire(arg)) {		// 如果p是head,說明node是隊列頭,可以競爭資源
                setHead(node);				// 將node出隊
                p.next = null; // help GC
                failed = false;
                return interrupted;
            }
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

acquireQueued方法代碼主要都包含在一個for循環中。如果發現node是隊首節點,就會再次嘗試獲取資源。如果此時獲取成功,就直接出隊並返回,不用阻塞等待,這里體現了同步隊列先進先出的特點
如果不是隊首節點,或者是再次嘗試獲取資源又雙叒叕失敗了,則調用shouldParkAfterFailedAcquire方法判斷當前線程是否應該被阻塞

shouldParkAfterFailedAcquire方法會檢查當前線程是否應該被阻塞,如果是就返回true,否則返回false。其源碼如下:

// 調用此方法必須保證pred是node的直接前驅,即node.prev == pred
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    int ws = pred.waitStatus;
    if (ws == Node.SIGNAL)
        /*
        * This node has already set status asking a release
        * to signal it, so it can safely park.
        */
        return true;
    if (ws > 0) {
        // 如果前面的Node都被cancel了,那么就跳過這些Node
        do {
            node.prev = pred = pred.prev;
        } while (pred.waitStatus > 0);
        pred.next = node;
    } else {
        /*
        * waitStatus must be 0 or PROPAGATE.  Indicate that we
        * need a signal, but don't park yet.  Caller will need to
        * retry to make sure it cannot acquire before parking.
        */
        compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
    }
    return false;
}

只有當node的直接前驅節點等待狀態waitStatusSIGNAL時,才會認為該線程應該被阻塞。否則還需要回到acquireQueuedfor循環中重新檢查,不會立即阻塞

我畫了一張shouldParkAfterFailedAcquire執行流程圖,如下:

那么會不會有一種可能:shouldParkAfterFailedAcquire方法一直返回false,始終認為該線程不應該阻塞,那么該線程就會一直占用CPU資源,“忙等”
其實一般來說是不會的,原因見上面示意圖中的紫色文字部分

再回到acquireQueued方法中,如果shouldParkAfterFailedAcquire判斷該線程,並返回了true,就需要執行parkAndCheckInterrupt將該線程阻塞,源碼如下:

private final boolean parkAndCheckInterrupt() {
    LockSupport.park(this);
    return Thread.interrupted();
}

parkAndCheckInterrupt中借助了工具類LockSuppport將線程阻塞。阻塞過程中如果該線程被設置了中斷狀態,雖然中斷不會導致阻塞立即被喚醒,但是線程的中斷狀態會被記錄下來,並作為該方法的返回值

總體來說,acquireQueued方法的執行流程如下圖所示:

再回到acquire方法中。如果acquire失敗而阻塞等待的過程中被中斷,那么等它被喚醒並成功獲得資源之后,會立即調用setInterrupt方法設置線程的中斷狀態。setInterrupt的源碼如下:

static void selfInterrupt() {
    Thread.currentThread().interrupt();
}

最后補充一點,acquire方法除了會在線程獲取互斥資源時被調用,也會被條件等待方法await方法調用,具體分析見本系列最后一期博客全網最詳細的AbstractQueuedSynchronizer(AQS)源碼剖析(三)條件變量

響應中斷的acquireInterruptibly方法

acquireInterruptibly用於獲取互斥資源。顧名思義,這個方法響應中斷,即如果在調用過程中發生了中斷,會拋出中斷異常中止資源的獲取。其源碼如下:

public final void acquireInterruptibly(int arg) throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    if (!tryAcquire(arg))
        doAcquireInterruptibly(arg);
}

acquireInterruptibly方法首先會檢查中斷狀態,如果沒有發生中斷,才會繼續向下執行,否則拋出中斷異常
接下來執行鈎子方法tryAcquire,如果獲取成功則直接返回,否則獲取失敗,執行doAcquireInterruptibly方法:

private void doAcquireInterruptibly(int arg) throws InterruptedException {
    final Node node = addWaiter(Node.EXCLUSIVE);
    boolean failed = true;
    try {
        for (;;) {
            final Node p = node.predecessor();
            if (p == head && tryAcquire(arg)) {
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return;
            }
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                throw new InterruptedException();
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

doAcquireInterruptibly會先調用addWaiter方法,將當前線程加入隊尾。之后的邏輯和acquireQueued類似,就是在for循環中,先判斷當前節點是否是頭節點,如果是則再次嘗試獲取資源。如果不是隊首或者獲取失敗,則調用shouldParkAfterFailedAcquire方法判斷該線程是否應該被阻塞。如果不是就進入下一輪循環。如果需要被阻塞,則調用parkAndCheckInterrupt方法將其阻塞。如果阻塞過程中發生中斷,則當該線程被喚醒后回到doAcquireInterruptibly中,會拋出中斷異常,並調用cancelAcquire執行取消節點的邏輯

doAcquireInterruptiblyacquireQueued的區別有兩點

  • acquireQueued調用之前,當前線程就已經被放入同步隊列;而doAcquireInterruptibly沒有,需要自己調用addWaiter方法
  • acquireQueued中不會因發生中斷而拋出中斷異常、取消節點,只會記錄是否發生中斷並返回;而doAcquireInterruptibly會響應中斷,拋出中斷異常,並取消該線程對應的節點
作者: 酒冽        出處: https://www.cnblogs.com/frankiedyz/p/15674098.html
版權:本文版權歸作者和博客園共有
轉載:歡迎轉載,但未經作者同意,必須保留此段聲明;必須在文章中給出原文連接;否則 必究法律責任

獲取共享資源

忽略中斷的acquireShared方法

acquireShared是以共享模式獲取資源,並且忽略中斷。源碼如下:

public final void acquireShared(int arg) {
    if (tryAcquireShared(arg) < 0)
        doAcquireShared(arg);
}

該方法首先會調用鈎子方法tryAcquireShared嘗試獲取共享資源,如果獲取成功則直接返回,否則獲取失敗,調用doAcquireShared方法:

private void doAcquireShared(int arg) {
    final Node node = addWaiter(Node.SHARED);
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
            final Node p = node.predecessor();
            if (p == head) {
                int r = tryAcquireShared(arg);
                if (r >= 0) {						// 表示tryAcquireShared獲取成功
                    
                    // 設置head,並判斷是否需要喚醒后繼線程。如果需要則喚醒,並保證傳播
                    setHeadAndPropagate(node, r);
                    p.next = null; // help GC
                    if (interrupted)
                        selfInterrupt();
                    failed = false;
                    return;
                }
            }
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

這里也會調用addWaiter將當前線程加入同步隊列,不過這里的Node是共享模式(Node.SHARED
在接下來的for循環中,如果當前線程位於隊首,則再次嘗試獲取資源。如果獲取成功,則調用setHeadAndPropagate方法,處理中斷之后返回

其中setHeadAndPropagate方法的作用是彈出隊頭,並檢測其后繼節點是否需要被喚醒,如果需要的話就喚醒,並確保傳播。源碼如下;

private void setHeadAndPropagate(Node node, int propagate) {
    Node h = head; // Record old head for check below
    setHead(node);
    // 這個復雜的if條件判斷就是用於判斷:后繼節點的線程是否要被喚醒
    // propagate > 0 表示允許后續節點繼續獲取共享資源
    if (propagate > 0 || h == null || h.waitStatus < 0 ||
        (h = head) == null || h.waitStatus < 0) {
        Node s = node.next;
        if (s == null || s.isShared())
            doReleaseShared();		// 喚醒后繼的共享模式的線程,並確保狀態傳播下去
    }
}

在共享模式下,一個線程獲取資源成功后,可能會引起后繼等待獲取共享資源的線程。注意,這里是后繼而非同步隊列中所有后面的。在這一點上,不同於互斥資源的獲取,共享資源的獲取更像是一人得道,雞犬升天

如果在setHeadAndPropagate中發現存在后繼線程需要被釋放,則調用doReleaseShared方法將它釋放,並確保傳播,它也是releaseShared方法的核心,該方法會在后面講解釋放共享資源時給出解析,這里暫時不分析

確保傳播的含義:
保證被喚醒的線程可以繼續喚醒它的后繼線程。如果每個線程都能確保傳播,那么所有應該被釋放的后繼線程都能得到釋放(類似於遞歸釋放

總的來說,acquireShared的流程與acquire基本一致,最大的區別在於:獲取共享資源成功后,可能需要喚醒后繼的多個線程。而獲取互斥資源成功后,不需要喚醒其他任何線程

響應中斷的acquireSharedInterruptibly方法

acquireSharedInterruptibly方法用於獲取共享資源,但是該方法會響應中斷,即在獲取過程中接收到中斷信號,會拋出中斷異常。其源碼如下:

public final void acquireSharedInterruptibly(int arg) throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    if (tryAcquireShared(arg) < 0)
        doAcquireSharedInterruptibly(arg);
}

acquireInterruptibly一樣,acquireSharedInterruptibly也會先檢查線程的中斷狀態是否已經被設置。如果設置則直接拋出中斷異常
接下來會調用鈎子方法tryAcquireShared嘗試獲取共享資源,獲取成功則直接返回,獲取失敗就會調用doAcquireSharedInterruptibly方法:

private void doAcquireSharedInterruptibly(int arg) throws InterruptedException {
    final Node node = addWaiter(Node.SHARED);
    boolean failed = true;
    try {
        for (;;) {
            final Node p = node.predecessor();
            if (p == head) {
                int r = tryAcquireShared(arg);
                if (r >= 0) {
                    setHeadAndPropagate(node, r);
                    p.next = null; // help GC
                    failed = false;
                    return;
                }
            }
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                throw new InterruptedException();
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

不多解釋,直接上圖吧!下面是doAcquireSharedInterruptibly方法的執行流程圖

doAcquireSharedInterruptibly方法和doAcquireShared方法大體上差不多,區別僅在於前者響應中斷並會拋出中斷異常,而后者忽略中斷,只記錄中斷狀態並返回

作者: 酒冽        出處: https://www.cnblogs.com/frankiedyz/p/15674098.html
版權:本文版權歸作者和博客園共有
轉載:歡迎轉載,但未經作者同意,必須保留此段聲明;必須在文章中給出原文連接;否則 必究法律責任

釋放資源

AQS釋放資源是通過各種release方法。不同release之間存在區別,如下:

  • release:以獨占模式釋放對象
  • releaseShared:以共享模式釋放對象

這些釋放資源的方法都不存在響應中斷的區別,都是忽略中斷的,因為線程在釋放資源的時候被中斷可能引起意外的錯誤

釋放互斥資源

AQS使用release方法釋放互斥資源,源碼如下:

public final boolean release(int arg) {
    if (tryRelease(arg)) {
        Node h = head;
        if (h != null && h.waitStatus != 0)
            unparkSuccessor(h);
        return true;
    }
    return false;
}

該方法會先調用鈎子方法tryRelease,如果釋放失敗則直接返回false,如果釋放成功,則調用unparkSuccessor方法喚醒隊首線程,並返回true

unparkSuccessor方法是喚醒線程的主要邏輯。源碼如下:

private void unparkSuccessor(Node node) {
    
    // 如果status < 0(表明可能需要signal),先清除狀態(設為0)
    int ws = node.waitStatus;
    if (ws < 0)
        compareAndSetWaitStatus(node, ws, 0);

    // 一般來說后繼需要unpark的節點就是next節點
    // 但是如果next被cancel或為null,則需要從后向前遍歷,直到找到有效的后繼節點
    Node s = node.next;
    if (s == null || s.waitStatus > 0) {
        s = null;
        for (Node t = tail; t != null && t != node; t = t.prev)
            if (t.waitStatus <= 0)
                s = t;
    }
    if (s != null)
        LockSupport.unpark(s.thread);
}

該方法的作用是喚醒node有效后繼節點。有效指的是跳過那些被cancel的節點。 由於同步隊列是FIFO的,所以node一定是head

作者: 酒冽        出處: https://www.cnblogs.com/frankiedyz/p/15674098.html
版權:本文版權歸作者和博客園共有
轉載:歡迎轉載,但未經作者同意,必須保留此段聲明;必須在文章中給出原文連接;否則 必究法律責任

釋放共享資源

releaseShared用於釋放共享資源,源碼如下:

public final boolean releaseShared(int arg) {
    if (tryReleaseShared(arg)) {
        doReleaseShared();
        return true;
    }
    return false;
}

該方法首先調用鈎子方法tryReleaseShared嘗試釋放資源,如果失敗則直接返回false,如果成功則執行doReleaseShared方法喚醒后繼的其他共享模式線程同時確保傳播,最后返回true

doReleaseShared方法在前面的acquireShared -> setHeadAndPropagate中出現過,該方法的作用是在共享模式下喚醒后繼線程,並確保傳播。其源碼如下:

private void doReleaseShared() {
    for (;;) {
        Node h = head;
        if (h != null && h != tail) {
            int ws = h.waitStatus;
            if (ws == Node.SIGNAL) {
                // 如果需要,則喚醒后繼線程,同時設置waitStatus為0
                if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                    continue;            	// loop to recheck cases
                unparkSuccessor(h);		// 喚醒后繼線程
            }
            else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))		// 設置PROPAGATE狀態,保證喚醒可以傳播下去
                continue;                // loop on failed CAS
        }
        // 如果上述的執行過程沒有被別的線程打擾,那就退出,否則重新loop
        if (h == head)                   // loop if head changed
            break;
    }
}
作者: 酒冽        出處: https://www.cnblogs.com/frankiedyz/p/15674098.html
版權:本文版權歸作者和博客園共有
轉載:歡迎轉載,但未經作者同意,必須保留此段聲明;必須在文章中給出原文連接;否則 必究法律責任

AQS的應用

AQS的應用就不用我多吹了吧,那些個JUC里面的大名鼎鼎的可重入鎖、讀寫鎖,底層實現都是基於AQS

如果想要自己使用AQS實現某個並發工具,也很簡單,只需要繼承AQS,並實現一些特定方法即可~

繼承AQS的注意點

  • 如果要使用AQS中的互斥資源同步方法,需要手動實現tryAcquiretryRelease方法
  • 如果要使用AQS中的共享資源同步方法,需要手動實現tryAcquireSharedtryReleaseShared方法
  • 如果要使用AQS中的條件變量,需要實現isHeldExclusively方法

應用實踐:實現非可重入鎖

非可重入鎖NonReentrantLock定義了一個內部工具類Sync實現關於鎖的操作,而Sync則繼承了AQS。實現的代碼如下:

點擊查看代碼
public class NonReentrantLock implements Lock {

    private final Sync sync = new Sync();

    private static class Sync extends AbstractQueuedSynchronizer {
        @Override
        protected boolean tryAcquire(int arg) {
            if (arg != 1) {
                throw new IllegalArgumentException();
            }
            if (compareAndSetState(0, 1)) {
                return true;
            }
            return false;
        }

        @Override
        protected boolean tryRelease(int arg) {
            if (arg != 1) {
                throw new IllegalArgumentException();
            }
            if (getState() == 0) {
                throw new IllegalMonitorStateException();
            }
            setState(0);
            return true;
        }

        @Override
        protected boolean isHeldExclusively() {
            return getState() == 1;
        }

        Condition newCondition() {
            return new ConditionObject();
        }
    }
    
    @Override
    public void lock() {
        sync.acquire(1);
    }

    @Override
    public void lockInterruptibly() throws InterruptedException {
        sync.acquireInterruptibly(1);
    }

    @Override
    public boolean tryLock() {
        return sync.tryAcquire(1);
    }

    @Override
    public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
        return sync.tryAcquireNanos(1, unit.toNanos(time));
    }

    @Override
    public void unlock() {
        sync.release(1);
    }

    @Override
    public Condition newCondition() {
        return sync.newCondition();
    }
}

最后來做個總結

AQS針對互斥資源、共享資源的獲取和釋放,提供了不同的方法。而獲取資源的方法也可以分為響應中斷和忽略中斷,釋放資源都是忽略中斷的

AQS正是通過資源state)的釋放和獲取,配合同步隊列讓線程排隊等待,以FIFO的方式讓競爭資源失敗的線程阻塞、喚醒

這些釋放、獲取方法都是AQS提供給子類去調用的模板方法,其中的一些關鍵步驟均設計為了鈎子方法,讓子類可以個性化定制

正是有了AQS這個強大的后盾,才能誕生出那么多實用的並發同步工具類。不得不說,AQS是真的

好了,能看到這里的讀者,相信已經掌握了AQS的基本結構,以及AQS是獲取、釋放資源的原理
我這里其實並沒有剖析所有AQS提供的資源獲取方法,還有兩個可超時方法tryAcquireNanostryAcquireSharedNanos沒有分析,但是基本上和其他獲取資源方法是類似的,只是多了一個超時而取消的邏輯,感興趣的讀者可以打開AQS源碼自己分析
接下來的就是AQS的最后一篇了,我們來看看AQS里面的條件隊列是怎么實現的

全網最詳細的AbstractQueuedSynchronizer(AQS)源碼剖析(一)AQS基礎
全網最詳細的AbstractQueuedSynchronizer(AQS)源碼剖析(二)資源的獲取和釋放
全網最詳細的AbstractQueuedSynchronizer(AQS)源碼剖析(三)條件變量


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM