並發編程的基石——AQS類



本博客系列是學習並發編程過程中的記錄總結。由於文章比較多,寫的時間也比較散,所以我整理了個目錄貼(傳送門),方便查閱。

並發編程系列博客傳送門


本文參考了[Java多線程進階(六)—— J.U.C之locks框架:AQS綜述(1)]和Java技術之AQS詳解兩篇文章。

AQS 簡介

AbstractQueuedSynchronizer (簡稱AQS)類是整個 JUC包的核心類。JUC 中的ReentrantLockReentrantReadWriteLock CountDownLatchSemaphoreLimitLatch等同步工具都是基於AQS實現的。

AQS 分離出了構建同步器時的通用關注點,這些關注點主要包括如下:

  • 資源是可以被同時訪問?還是在同一時間只能被一個線程訪問?(共享/獨占功能)
  • 訪問資源的線程如何進行並發管理?(等待隊列)
  • 如果線程等不及資源了,如何從等待隊列退出?(超時/中斷)

這些關注點都是圍繞着資源——同步狀態(synchronization state)來展開的,AQS將這些通用的關注點封裝成了一個個模板方法,讓子類可以直接使用。

AQS 留給用戶的只有兩個問題

  • 什么是資源
  • 什么情況下資源是可以被訪問的

這樣一來,定義同步器的難度就大大降低了。用戶只要解決好上面兩個問題,就能構建出一個性能優秀的同步器。

下面是幾個常見的同步器對資源的定義:

同步器 資源的定義
ReentrantLock 資源表示獨占鎖。State為0表示鎖可用;為1表示被占用;為N表示重入的次數
ReentrantReadWriteLock 資源表示共享的讀鎖和獨占的寫鎖。state邏輯上被分成兩個16位的unsigned short,分別記錄讀鎖被多少線程使用和寫鎖被重入的次數。
CountDownLatch 資源表示倒數計數器。State為0表示計數器歸零,所有線程都可以訪問資源;為N表示計數器未歸零,所有線程都需要阻塞。
Semaphore 資源表示信號量或者令牌。State≤0表示沒有令牌可用,所有線程都需要阻塞;大於0表示由令牌可用,線程每獲取一個令牌,State減1,線程沒釋放一個令牌,State加1。

AQS 原理

上面一節中介紹到 AQS 抽象出了三個關注點,下面就具體看下 AQS 是如果解決這三個問題的。

同步狀態的管理

同步狀態,其實就是資源。AQS使用單個int(32位)來保存同步狀態,並暴露出getState、setState以及compareAndSetState操作來讀取和更新這個狀態。

private volatile int state;
  
protected final int getState() {
    return state;
}

protected final void setState(int newState) {
    state = newState;
}

protected final boolean compareAndSetState(int expect, int update) {
    // See below for intrinsics setup to support this
    return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}

線程的阻塞和喚醒

在JDK1.5之前,除了內置的監視器機制外,沒有其它方法可以安全且便捷得阻塞和喚醒當前線程。

JDK1.5以后,java.util.concurrent.locks包提供了LockSupport類來作為線程阻塞和喚醒的工具。

等待隊列

等待隊列,是AQS框架的核心,整個框架的關鍵其實就是如何在並發狀態下管理被阻塞的線程。

等待隊列是嚴格的FIFO隊列,是Craig,Landin和Hagersten鎖(CLH鎖)的一種變種,采用雙向循環鏈表實現,因此也叫CLH隊列。

1. 節點定義

CLH隊列中的結點是對線程的包裝,結點一共有兩種類型:獨占(EXCLUSIVE)和共享(SHARED)。

每種類型的結點都有一些狀態,其中獨占結點使用其中的CANCELLED(1)、SIGNAL(-1)、CONDITION(-2),共享結點使用其中的CANCELLED(1)、SIGNAL(-1)、PROPAGATE(-3)。

結點狀態 描述
CANCELLED 1 取消。表示后驅結點被中斷或超時,需要移出隊列
SIGNAL -1 發信號。表示后驅結點被阻塞了(當前結點在入隊后、阻塞前,應確保將其prev結點類型改為SIGNAL,以便prev結點取消或釋放時將當前結點喚醒。)
CONDITION -2 Condition專用。表示當前結點在Condition隊列中,因為等待某個條件而被阻塞了
PROPAGATE -3 傳播。適用於共享模式(比如連續的讀操作結點可以依次進入臨界區,設為PROPAGATE有助於實現這種迭代操作。)
INITIAL 0 默認。新結點會處於這種狀態

AQS使用CLH隊列實現線程的結構管理,而CLH結構正是用前一結點某一屬性表示當前結點的狀態,之所以這種做是因為在雙向鏈表的結構下,這樣更容易實現取消和超時功能。

next指針:用於維護隊列順序,當臨界區的資源被釋放時,頭結點通過next指針找到隊首結點。

prev指針:用於在結點(線程)被取消時,讓當前結點的前驅直接指向當前結點的后驅完成出隊動作。

static final class Node {
    
    // 共享模式結點
    static final Node SHARED = new Node();
    
    // 獨占模式結點
    static final Node EXCLUSIVE = null;

    static final int CANCELLED =  1;

    static final int SIGNAL    = -1;

    static final int CONDITION = -2;

    static final int PROPAGATE = -3;

    /**
    * INITAL:      0 - 默認,新結點會處於這種狀態。
    * CANCELLED:   1 - 取消,表示后續結點被中斷或超時,需要移出隊列;
    * SIGNAL:      -1- 發信號,表示后續結點被阻塞了;(當前結點在入隊后、阻塞前,應確保將其prev結點類型改為SIGNAL,以便prev結點取消或釋放時將當前結點喚醒。)
    * CONDITION:   -2- Condition專用,表示當前結點在Condition隊列中,因為等待某個條件而被阻塞了;
    * PROPAGATE:   -3- 傳播,適用於共享模式。(比如連續的讀操作結點可以依次進入臨界區,設為PROPAGATE有助於實現這種迭代操作。)
    * 
    * waitStatus表示的是后續結點狀態,這是因為AQS中使用CLH隊列實現線程的結構管理,而CLH結構正是用前一結點某一屬性表示當前結點的狀態,這樣更容易實現取消和超時功能。
    */
    volatile int waitStatus;

    // 前驅指針
    volatile Node prev;

    // 后驅指針
    volatile Node next;

    // 結點所包裝的線程
    volatile Thread thread;

    // Condition隊列使用,存儲condition隊列中的后繼節點
    Node nextWaiter;

    Node() {
    }

    Node(Thread thread, Node mode) { 
        this.nextWaiter = mode;
        this.thread = thread;
    }
}

2. 隊列定義

對於CLH隊列,當線程請求資源時,如果請求不到,會將線程包裝成結點,將其掛載在隊列尾部。

下面結合代碼一起看下節點進入隊列的過程。

   private Node enq(final Node node) {
        for (;;) {
            Node t = tail;   // 1
            if (t == null) { // Must initialize
                if (compareAndSetHead(new Node()))  // 2 
                    tail = head;
            } else {
                node.prev = t; // 3
                if (compareAndSetTail(t, node)) { // 4
                    t.next = node;
                    return t;
                }
            }
        }
    }

如上代碼在第一次循環中,當要在AQS隊列尾部插入元素時,AQS隊列狀態如下圖中(default)所示。也就是隊列頭、尾節點都指向null;當執行代碼(1)后節點t指向了尾部節點,這時候隊列狀態如圖中(I)所示。

這時候t為null,故執行代碼(2),使用CAS算法設置一個哨兵節點為頭節點,如果CAS設置成功,則讓尾部節點也指向哨兵節點,這時候隊列狀態如圖中(II)所示。

到現在為止只插入了一個哨兵節點,還需要插入node節點,所以在第二次循環后執行到代碼(1),這時候隊列狀態如圖中(III)所示;然后執行代碼(3)設置node的前驅節點為尾部節點,這時候隊列狀態如圖中(IV)所示;

然后通過CAS算法設置node節點為尾部節點,CAS成功后隊列狀態如圖中(V)所示;

CAS成功后再設置原來的尾部節點的后驅節點為node,這時候就完成了雙向鏈表的插入,此時隊列狀態如圖中(VI)所示。

AQS 的方法介紹

用戶需要自己重寫的方法

上面介紹到 AQS 已經幫用戶解決了同步器定義過程中的大部分問題,只將下面兩個問題丟給用戶解決:

  • 什么是資源
  • 什么情況下資源是可以被訪問的

具體的,AQS 是通過暴露以下 API 來讓用戶解決上面的問題的。

鈎子方法 描述
tryAcquire 獨占方式。嘗試獲取資源,成功則返回true,失敗則返回false。
tryRelease 獨占方式。嘗試釋放資源,成功則返回true,失敗則返回false。
tryAcquireShared 共享方式。嘗試獲取資源。負數表示失敗;0表示成功,但沒有剩余可用資源;正數表示成功,且有剩余資源。
tryReleaseShared 共享方式。嘗試釋放資源,如果釋放后允許喚醒后續等待結點返回true,否則返回false。
isHeldExclusively 該線程是否正在獨占資源。只有用到condition才需要去實現它。

如果你需要實現一個自己的同步器,一般情況下只要繼承 AQS ,並重寫 AQS 中的這個幾個方法就行了。至於具體線程等待隊列的維護(如獲取資源失敗入隊/喚醒出隊等),AQS已經在頂層實現好了。要不怎么說Doug Lea貼心呢。

需要注意的是:如果你沒在子類中重寫這幾個方法就直接調用了,會直接拋出異常。所以,在你調用這些方法之前必須重寫他們。不使用的話可以不重寫。

AQS 提供的一系列模板方法

查看 AQS 的源碼我們就可以發現這個類提供了很多方法,看起來讓人“眼花繚亂”的。但是最主要的兩類方法就是獲取資源的方法和釋放資源的方法。因此我們抓住主要矛盾就行了:

  • public final void acquire(int arg) // 獨占模式的獲取資源
  • public final boolean release(int arg) // 獨占模式的釋放資源
  • public final void acquireShared(int arg) // 共享模式的獲取資源
  • public final boolean releaseShared(int arg) // 共享模式的釋放資源

acquire(int)方法

該方法以獨占方式獲取資源,如果獲取到資源,線程繼續往下執行,否則進入等待隊列,直到獲取到資源為止,且整個過程忽略中斷的影響。該方法是獨占模式下線程獲取共享資源的頂層入口。

public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}

下面分析下這個acquire方法的具體執行流程:

step1:首先這個方法調用了用戶自己實現的方法tryAcquire方法嘗試獲取資源,如果這個方法返回true,也就是表示獲取資源成功,那么整個acquire方法就執行結束了,線程繼續往下執行;

step2:如果tryAcquir方法返回false,也就表示嘗試獲取資源失敗。這時acquire方法會先調用addWaiter方法將當前線程封裝成Node類並加入一個FIFO的雙向隊列的尾部。

step3:再看acquireQueued這個關鍵方法。首先要注意的是這個方法中哪個無條件的for循環,這個for循環說明acquireQueued方法一直在自旋嘗試獲取資源。進入for循環后,首先判斷了當前節點的前繼節點是不是頭節點,如果是的話就再次嘗試獲取資源,獲取資源成功的話就直接返回false(表示未被中斷過)

假如還是沒有獲取資源成功,判斷是否需要讓當前節點進入waiting狀態,經過 shouldParkAfterFailedAcquire這個方法判斷,如果需要讓線程進入waiting狀態的話,就調用LockSupport的park方法讓線程進入waiting狀態。進入waiting狀態后,這線程等待被interupt或者unpark(在release操作中會進行這樣的操作,可以參見后面的代碼)。這個線程被喚醒后繼續執行for循環來嘗試獲取資源。

final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                final Node p = node.predecessor();
                //首先判斷了當前節點的前繼節點是不是頭節點,如果是的話就再次嘗試獲取資源,
                //獲取資源成功的話就直接返回false(表示未被中斷過)
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;
                }
                //判斷是否需要讓當前節點進入waiting狀態
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    // 如果在整個等待過程中被中斷過,則返回true,否則返回false。
                    // 如果線程在等待過程中被中斷過,它是不響應的。只是獲取資源后才再進行自我中斷selfInterrupt(),將中斷補上。
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

以上就是acquire方法的簡單分析。

單獨看這個方法的話可能會不太清晰,結合ReentrantLockReentrantReadWriteLock CountDownLatchSemaphoreLimitLatch等同步工具看這個代碼的話就會好理解很多。

release(int)方法

release(int)方法是獨占模式下線程釋放共享資源的頂層入口。它會釋放指定量的資源,如果徹底釋放了(即state=0),它會喚醒等待隊列里的其他線程來獲取資源。

public final boolean release(int arg) {
    if (tryRelease(arg)) {
        Node h = head;
        if (h != null && h.waitStatus != 0)
            unparkSuccessor(h);
        return true;
    }
    return false;
}

//上面已經講過了,需要用戶自定義實現
protected boolean tryRelease(int arg) {
    throw new UnsupportedOperationException();
}

private void unparkSuccessor(Node node) {
    /*
         * If status is negative (i.e., possibly needing signal) try
         * to clear in anticipation of signalling.  It is OK if this
         * fails or if status is changed by waiting thread.
         */
    int ws = node.waitStatus;
    if (ws < 0)
        compareAndSetWaitStatus(node, ws, 0);

    /*
         * Thread to unpark is held in successor, which is normally
         * just the next node.  But if cancelled or apparently null,
         * traverse backwards from tail to find the actual
         * non-cancelled successor.
         */
    Node s = node.next;
    if (s == null || s.waitStatus > 0) {
        s = null;
        for (Node t = tail; t != null && t != node; t = t.prev)
            if (t.waitStatus <= 0)
                s = t;
    }
    if (s != null)
        LockSupport.unpark(s.thread);
}

與acquire()方法中的tryAcquire()類似,tryRelease()方法也是需要獨占模式的自定義同步器去實現的。正常來說,tryRelease()都會成功的,因為這是獨占模式,該線程來釋放資源,那么它肯定已經拿到獨占資源了,直接減掉相應量的資源即可(state-=arg),也不需要考慮線程安全的問題。

但要注意它的返回值,上面已經提到了,release()是根據tryRelease()的返回值來判斷該線程是否已經完成釋放掉資源了!所以自義定同步器在實現時,如果已經徹底釋放資源(state=0),要返回true,否則返回false。

unparkSuccessor(Node)方法用於喚醒等待隊列中下一個線程。這里要注意的是,下一個線程並不一定是當前節點的next節點,而是下一個可以用來喚醒的線程,如果這個節點存在,調用unpark()方法喚醒

總之,release()是獨占模式下線程釋放共享資源的頂層入口。它會釋放指定量的資源,如果徹底釋放了(即state=0),它會喚醒等待隊列里的其他線程來獲取資源。(需要注意的是隊列中被喚醒的線程不一定能立馬獲取資源,因為資源在釋放后可能立馬被其他線程(不是在隊列中等待的線程)搶掉了

acquireShared(int)方法

acquireShared(int)方法是共享模式下線程獲取共享資源的頂層入口。它會獲取指定量的資源,獲取成功則直接返回,獲取失敗則進入等待隊列,直到獲取到資源為止,整個過程忽略中斷。

public final void acquireShared(int arg) {
    //tryAcquireShared需要用戶自定義實現
    if (tryAcquireShared(arg) < 0)
        doAcquireShared(arg);
}

可以發現,這個方法的關鍵實現其實是獲取資源失敗后,怎么管理線程。也就是doAcquireShared的邏輯。

//不響應中斷
private void doAcquireShared(int arg) {
    final Node node = addWaiter(Node.SHARED);
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
            final Node p = node.predecessor();
            if (p == head) {
                int r = tryAcquireShared(arg);
                if (r >= 0) {
                    setHeadAndPropagate(node, r);
                    p.next = null; // help GC
                    if (interrupted)
                        selfInterrupt();
                    failed = false;
                    return;
                }
            }
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

可以看出,doAcquireShared的邏輯和acquireQueued的邏輯差不多。將當前線程加入等待隊列尾部休息,直到其他線程釋放資源喚醒自己,自己成功拿到相應量的資源后才返回。

簡單總結下acquireShared的流程:

step1:tryAcquireShared()嘗試獲取資源,成功則直接返回;

step2:失敗則通過doAcquireShared()進入等待隊列park(),直到被unpark()/interrupt()並成功獲取到資源才返回。整個等待過程也是忽略中斷的。

releaseShared(int)方法

releaseShared(int)方法是共享模式下線程釋放共享資源的頂層入口。它會釋放指定量的資源,如果成功釋放且允許喚醒等待線程,它會喚醒等待隊列里的其他線程來獲取資源。

public final boolean releaseShared(int arg) {
    if (tryReleaseShared(arg)) {
        doReleaseShared();
        return true;
    }
    return false;
}

釋放掉資源后,喚醒后繼。跟獨占模式下的release()相似,但有一點稍微需要注意:獨占模式下的tryRelease()在完全釋放掉資源(state=0)后,才會返回true去喚醒其他線程,這主要是基於獨占下可重入的考量;而共享模式下的releaseShared()則沒有這種要求,共享模式實質就是控制一定量的線程並發執行,那么擁有資源的線程在釋放掉部分資源時就可以喚醒后繼等待結點。

參考


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM