AbstractQueuedSynchronizer 詳解


一、AQS的概念及使用

  Java並發編程核心在於 java.concurrent.util 包而juc當中的大多數同步器實現都是圍繞着共同的基礎行為,比如等待隊列、條件隊列、獨占獲取、共享獲取等,而這個行為的抽象就是基於 AbstractQueuedSynchronizer 簡稱AQS,AQS定義了一套多線程訪問共享資源的同步器框架,是一個依賴狀態(state)的同步器。

  子類們必須定義改變state變量的protected方法,這些方法定義了state是如何被獲取或釋放的。鑒於此,本類中的其他方法執行所有的排隊和阻塞機制。子類也可以維護其他的state變量,但是為了保證同步,必須原子地操作這些變量

  AbstractQueuedSynchronizer會把所有的請求線程構成一個CLH隊列,當一個線程執行完畢(lock.unlock())時會激活自己的后繼節點,但正在執行的線程並不在隊列中,而那些等待執行的線程全部處於阻塞狀態。

  AQS是一個同步器,設計模式是模板模式。核心數據結構:雙向鏈表 + state(鎖狀態);底層操作:CAS

 Java.concurrent.util當中同步器的實現如Lock,Latch,Barrier 等,都是基於AQS框架實現 ;

  • 一般通過定義內部類Sync繼承AQS ;
  • 將同步器所有調用都映射到Sync對應的方法; 

ReentrantLock主要方法:

  • lock()獲得鎖
  • lockInterruptibly()獲得鎖,但優先響應中斷
  • tryLock()嘗試獲得鎖,成功返回true,否則false,該方法不等待,立即返回
  • tryLock(long time,TimeUnit unit)在給定時間內嘗試獲得鎖
  • unlock()釋放鎖

Condition:await()、signal()方法分別對應之前的Object的wait()和notify()

  • 和重入鎖一起使用
  • await()是當前線程等待同時釋放鎖
  • awaitUninterruptibly()不會在等待過程中響應中斷
  • signal()用於喚醒一個在等待的線程,還有對應的singalAll()方法

二、AQS源碼分析

我們以 ReentrantLock(獨占鎖)作為切入點來學習 AbstractQueuedSynchronizer;

1、ReentrantLock 和 AbstractQueuedSynchronizer 的部分代碼

ReentrantLock的內部類 Sync 繼承了 AbstractQueuedSynchronizer 類, Sync的兩個子類又實現了非公平鎖和公平鎖;

class ReentrantLock { class Sync extends AbstractQueuedSynchronizer{ abstract void lock(); //加鎖, 子類需求重寫該方法
 } //非公平鎖, 默認
    class NonfairSync extends Sync { //省略 lock()的實現
 } //公平鎖
    static final class FairSync extends Sync { //省略 lock()的實現
 } }

 

 AQS源碼中的屬性:

public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer { private transient volatile Node head; //頭結點 //阻塞的尾節點,每個新的節點進來,都插入到最后,也就形成了一個鏈表
    private transient volatile Node tail; //代表當前鎖的狀態,0代表沒有被占用,大於0代表有線程持有當前鎖,這個值可以大於1,是因為鎖可以重入,每次重入都加1 private volatile int state; 
//AbstractOwnableSynchronizer(AQS的父類)的屬性,代表當前持有獨占鎖的線程,舉個最重要的使用例子,因為鎖可以重入 private transient Thread exclusiveOwnerThread; }

AQS源碼中的內部 Node:

static final class Node { static final Node SHARED = new Node();    //標記節點為共享模式
    static final Node EXCLUSIVE = null;       //標記節點為獨占模式
    
    /* 下面的4個常量是給waitStatus用的 */
    static final int CANCELLED = 1static final int SIGNAL = -1static final int CONDITION = -2static final int PROPAGATE = -3; /** * 標記當前節點的信號量狀態 (1,0,-1,-2,-3)5種狀態 * 使用CAS更改狀態,volatile保證線程可見性,高並發場景下, * 即被一個線程修改后,狀態會立馬讓其他線程可見。 */
    volatile int waitStatus; volatile Node prev;    //前驅節點,當前節點加入到同步隊列中被設置
    volatile Node next;    //后繼節點
    volatile Thread thread;  //節點同步狀態的線程 }

2、鎖實現(加鎖 Lock.lock())

非公平鎖:無論CLH隊列中是否有節點,當前線程都要和隊列頭的節點去競爭一下鎖;若競爭到鎖,則該線程去持有鎖;若沒有競爭到鎖,則放入到CLH隊列尾部;

公平鎖無論CLH隊列中是否有節點,當前線程都是去放到隊列的尾部;

2.1 非公平鎖實現

static final class NonfairSync extends Sync { final void lock() { if (compareAndSetState(0, 1)) //用CAS算法嘗試獲取鎖
            setExclusiveOwnerThread(Thread.currentThread());  //當前線程占用鎖
        else acquire(1); } }

(1)若沒有加鎖(state == 0),則直接讓當前線程占有鎖;

(2)若已經加鎖了(state > 0),則執行 AbstractQueuedSynchronizer.acquire(int arg)方法,代碼如下;

public final void acquire(int arg) { if (!tryAcquire(arg) && //嘗試獲取鎖
 acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); }
<1> 使用 NonfairSync.tryAcquire(arg)方法去嘗試獲取鎖,它調用了 Sync.nonfairTryAcquire(int acquires) 方法;
<2> 若嘗試獲取鎖失敗,則調用 AbstractQueuedSynchronizer.addWaiter(Node.EXLUSIVE) 方法將當前線程放入到CLH隊列;
<3>
AbstractQueuedSynchronizer.acquireQueued()方法把已經追加到隊列的線程節點(addWaiter方法返回值)進行阻塞;

Sync.nonfairTryAcquire
該方法主要是去嘗試獲取鎖(加鎖)
final boolean nonfairTryAcquire(int acquires) { //acquires = 1
    final Thread current = Thread.currentThread(); int c = getState(); if (c == 0) { if (compareAndSetState(0, acquires)) { //unsafe操作,cas修改state狀態
            setExclusiveOwnerThread(current); //獨占狀態鎖持有者指向當前線程
            return true; } } else if (current == getExclusiveOwnerThread()) { //state狀態不為0,鎖持有者是當前線程,則state+1
        int nextc = c + acquires; if (nextc < 0) // overflow
            throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } return false; //加鎖失敗
}

a. 首先判斷當前狀態,若 c==0 說明沒有線程占用該鎖,並在占用鎖成功之后將鎖指向當前線程;

b. 如果 c != 0 說明有線程正擁有了該鎖,而且若占用該鎖就是當前線程(鎖重入),則將 state 加 1;這段的代碼只是簡單地++acquires,並修改status值,是因為沒有競爭獲取鎖的本身就是當前線程,所以通過setStatus修改state,而非CAS。

 

AbstractQueuedSynchronizer.addWaiter

addWaiter方法負責把當前無法獲得鎖的線程包裝為一個Node節點添加到隊尾:

private Node addWaiter(Node mode) { // 將當前線程構建成Node類型
    Node node = new Node(Thread.currentThread(), mode); Node pred = tail; if (pred != null) {   //若當前尾節點不是null
        node.prev = pred; //將當前節點的pre指向tail節點
        if (compareAndSetTail(pred, node)) { // CAS將節點插入同步隊列的尾部
            pred.next = node; return node; } } enq(node); //將節點加入CLH同步隊列
    return node; }

其中參數mode是獨占鎖還是共享鎖,默認為Node.EXCLUSIVE(null,獨占鎖。追加到隊尾的動作分兩步:

 <1> 如果當前隊尾已經存在(tail!=null),則使用CAS把當前線程更新為Tail

 <2> 如果當前Tail為 null 或則線程調用CAS設置隊尾失敗,則通過enq方法繼續設置Tail

enq方法如下:
private Node enq(final Node node) { for (;;) { Node t = tail; if (t == null) { // Must initialize //隊列為空需要初始化,創建空的頭節點
            if (compareAndSetHead(new Node())) tail = head; } else { node.prev = t; //set尾部節點
            if (compareAndSetTail(t, node)) {//當前節點置為尾部
                t.next = node; //前驅節點的next指針指向當前節點
                return t; } } } }

  該方法就是循環調用CAS,即使有高並發的場景,無限循環將會最終成功把當前線程追加到隊尾(或設置隊頭)。總而言之,addWaiter的目的就是通過CAS把當前現在追加到隊尾,並返回包裝后的Node實例。

把線程要包裝為Node對象的主要原因,除了用Node構造供虛擬隊列外,還用Node包裝了各種線程狀態;

 

AbstractQueuedSynchronizer.acquireQueued

acquireQueued的主要作用是把已經追加到隊列的線程節點(addWaiter方法返回值)進行阻塞,但阻塞前又通過 tryAccquire 重試是否能獲得鎖,如果重試成功能則無需阻塞,這就是非公平鎖。

//已經在隊列當中的Thread節點,准備阻塞等待獲取鎖
final boolean acquireQueued(final Node node, int arg) { boolean failed = true; try { boolean interrupted = false; for (;;) {//死循環
            final Node p = node.predecessor();//找到當前結點的前驅結點
            if (p == head && tryAcquire(arg)) {// 如果前驅結點是頭結點,才tryAcquire,其他結點是沒有機會tryAcquire的。
                setHead(node);//獲取同步狀態成功,將當前結點設置為頭結點。
                p.next = null; // help GC
                failed = false; return interrupted; } /** * 如果前驅節點不是Head,通過shouldParkAfterFailedAcquire判斷是否應該阻塞 * 前驅節點信號量為-1,當前線程可以安全被parkAndCheckInterrupt用來阻塞線程 */
            if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) //阻塞線程  interrupted = true; } } finally { if (failed) cancelAcquire(node); } }

仔細看看這個方法是個無限循環,感覺如果 p == head && tryAcquire(arg)條件不滿足循環將永遠無法結束,當然不會出現死循環,奧秘在於第12行的parkAndCheckInterrupt會把當前線程掛起,從而阻塞住線程的調用棧

程序執行到 ① 之后會就會阻塞當前的線程T1;若這個T1線程是放在CLH 隊列頭的,當有其他線程將鎖釋放之后會去喚醒這個線程T1;線程T1會繼續自旋,執行到 處,會再次去嘗試獲取鎖;


下面是 shouldParkAfterFailedAcquire方法
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { int ws = pred.waitStatus; if (ws == Node.SIGNAL) //若前驅結點的狀態是SIGNAL,意味着當前結點可以被安全地park
        return true; if (ws > 0) { // 前驅節點狀態如果被取消狀態,將被移除出隊列
        do { node.prev = pred = pred.prev; } while (pred.waitStatus > 0); pred.next = node; } else { /* 當前驅節點waitStatus為 0 or PROPAGATE狀態時, * 將其設置為SIGNAL狀態,然后當前結點才可以可以被安全地park */ compareAndSetWaitStatus(pred, ws, Node.SIGNAL); } return false; }

檢查原則:

規則1:如果前繼的節點狀態為SIGNAL,表明當前節點需要unpark,則返回成功,此時acquireQueued方法的第12行(parkAndCheckInterrupt)將導致線程阻塞

規則2:如果前繼節點狀態為CANCELLED(ws>0),說明前置節點已經被放棄,則回溯到一個非取消的前繼節點,返回false,acquireQueued方法的無限循環將遞歸調用該方法,直至規則1返回true,導致線程阻塞

規則3:如果前繼節點狀態為非SIGNAL、非CANCELLED,則設置前繼的狀態為SIGNAL,返回false后進入acquireQueued的無限循環,與規則2同

總體看來,shouldParkAfterFailedAcquire就是靠前繼節點判斷當前線程是否應該被阻塞,如果前繼節點處於CANCELLED狀態,則順便刪除這些節點重新構造隊列。

 

3、解鎖( Lock.unlock() )

解鎖代碼相對簡單,主要體現在AbstractQueuedSynchronizer.release 和 Sync.tryRelease方法中:

AbstractQueuedSynchronizer.release方法

public final boolean release(int arg) { if (tryRelease(arg)) { Node h = head; if (h != null && h.waitStatus != 0) unparkSuccessor(h); return true; } return false; }

release的語義在於:如果可以釋放鎖,則喚醒隊列第一個線程(Head);

Sync.tryRelease 方法

protected final boolean tryRelease(int releases) { int c = getState() - releases; if (Thread.currentThread() != getExclusiveOwnerThread()) throw new IllegalMonitorStateException(); boolean free = false; if (c == 0) { free = true; setExclusiveOwnerThread(null); } setState(c); return free; }

tryRelease語義很明確:如果線程多次鎖定,則進行多次釋放,直至status==0則真正釋放鎖,所謂釋放鎖即設置status為0,因為無競爭所以沒有使用CAS。

AbstractQueuedSynchronizer.unparkSuccessor 方法

用於喚醒隊列中的第一個線程(head)

private void unparkSuccessor(Node node) { //獲取wait狀態
    int ws = node.waitStatus; if (ws < 0) compareAndSetWaitStatus(node, ws, 0);// 將等待狀態waitStatus設置為初始值0

    /** 若后繼結點為空,或狀態為CANCEL(已失效),則從后尾部往前遍歷找到最前的一個處於正常阻塞狀態的結點進行喚醒 */ Node s = node.next; if (s == null || s.waitStatus > 0) { s = null; for (Node t = tail; t != null && t != node; t = t.prev) if (t.waitStatus <= 0) s = t; } if (s != null) LockSupport.unpark(s.thread);//喚醒線程
}

 

I.  若有T1、T2兩個線程的時候,所以有一個線程要會放入到隊列中,CLH會創建一個節點(pre=null,thread=null,next=下級節點),head 和 tail 都指向該節點;

II.  AQS的喚醒不會喚醒隊列中的所有節點,而是依次去喚醒的;notify 和 notifyall 不能去指定線程喚醒的;

III. AQS中的線程阻塞是使用 Unsafe 魔術類當中的 park() 和 unpark() 做的;

 參考:https://www.jianshu.com/p/279baac48960


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM