全網最詳細的AbstractQueuedSynchronizer(AQS)源碼剖析(一)AQS基礎


AbstractQueuedSynchronizer(以下簡稱AQS)的內容確實有點多,博主考慮再三,還是決定把它拆成三期。原因有三,一是放入同一篇博客勢必影響閱讀體驗,而是為了表達對這個偉大基礎並發組件的崇敬之情。第三點其實是為了偷懶。
又扯這么多沒用的,還是直接步入正題吧~

AQS介紹

AQS是一個抽象類,它是實現多種並發同步工具的核心組件。比如大名鼎鼎的可重入鎖ReentrantLock),它的底層實現就是借助內部類Sync,而Sync類就是繼承了AQS並實現了AQS定義的若干鈎子方法。這些並發同步工具包括:

設計模式上來看,AQS主要使用的是模板方法模式(Template Method Pattern)。它提供了若干鈎子方法供子類實現(如tryAcquiretryRelease等),AQS的模板方法(如acquirerelease等)會調用這些鈎子方法。子類使用AQS的方式就是直接調用AQS的模板方法,並重寫這些模板方法涉及到的特定鈎子方法即可。不需要調用的鈎子方法可以不用重寫,AQS為它們均提供了默認實現拋出UnsupportedOperationException異常

此外,AQS也提供了其他一些方法供子類調用,如getStatehasQueuedPredecessors等方法,方便子類獲取、判斷同步器的狀態

什么是鈎子方法?
鈎子方法的概念源於模板方法模式,這種模式是在一個方法中定義了算法的骨架,某些關鍵步驟會交給子類去實現。模板方法在不改變算法本身結構的情況下,允許子類自定義其中一些關鍵步驟
這些關鍵步驟可以由父類定義成方法,這些方法可以是抽象方法,或鈎子方法

  • 抽象方法:父類定義但不實現,由abstract關鍵字標識
  • 鈎子方法:父類定義且實現,但這種實現一般都是空實現,並沒有任何意義,這么做只是為了方便子類根據需要重寫特定的鈎子方法,而不用實現所有的鈎子方法

AQS的核心思想:

  • 使用一個volatile int變量state(也被稱為資源),進行同步控制,但是state在不同的同步工具實現中具有不同的語義。另外配合Unsafe類提供的CAS操作,原子性地修改state值,保證其線程安全性
  • AQS內部維護了一個同步隊列,用來管理排隊的線程。另外需要借助LockSupport類提供的線程阻塞、喚醒方法
作者: 酒冽        出處: https://www.cnblogs.com/frankiedyz/p/15673957.html
版權:本文版權歸作者和博客園共有
轉載:歡迎轉載,但未經作者同意,必須保留此段聲明;必須在文章中給出原文連接;否則 必究法律責任

AQS的基本結構

狀態state

AQS使用volatile int變量state來作為核心狀態,所有的同步控制都是圍繞這個state來進行的,volatile保證其內存可見性,並使用CAS確保state的修改是原子的。volatile和CAS同時存在,就保證了state線程安全性

對於不同的同步工具實現來說,語義是不同的,如下:

  • ReentratntLock:表示當前線程獲取鎖的重入次數,0表示鎖空閑
  • ReentrantReadWriteLockstate的高16位表示讀鎖數量,低16位表示寫鎖數量
  • CountDownLatch:表示當前的計數值
  • Semaphore:表示當前可用信號量的個數

針對state這個核心狀態,AQS提供了getStatesetState等多個獲取、修改方法,源碼如下:

private volatile int state;

protected final int getState() {
    return state;
}

protected final void setState(int newState) {
    state = newState;
}

protected final boolean compareAndSetState(int expect, int update) {
    // See below for intrinsics setup to support this
    return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}
作者: 酒冽        出處: https://www.cnblogs.com/frankiedyz/p/15673957.html
版權:本文版權歸作者和博客園共有
轉載:歡迎轉載,但未經作者同意,必須保留此段聲明;必須在文章中給出原文連接;否則 必究法律責任

同步隊列

Node類

AQS內部維護了一個同步隊列(網上有些文章會叫它為CLH隊列,至於為啥叫這個我也不知道-_-||,但不重要~)。隊列中的每個節點都是Node類型其源碼如下:

static final class Node {
    
    static final Node SHARED = new Node();
    static final Node EXCLUSIVE = null;

    static final int CANCELLED =  1;
    static final int SIGNAL    = -1;
    static final int CONDITION = -2;
    static final int PROPAGATE = -3;

    volatile int waitStatus;

    volatile Node prev;
    volatile Node next;

    volatile Thread thread;

    Node nextWaiter;

    final boolean isShared() {
        return nextWaiter == SHARED;
    }

    final Node predecessor() throws NullPointerException {
        Node p = prev;
        if (p == null)
            throw new NullPointerException();
        else
            return p;
    }

    Node() {    // Used to establish initial head or SHARED marker
    }

    Node(Thread thread, Node mode) {     // Used by addWaiter
        this.nextWaiter = mode;
        this.thread = thread;
    }

    Node(Thread thread, int waitStatus) { // Used by Condition
        this.waitStatus = waitStatus;
        this.thread = thread;
    }
}

prevnext用於保存該節點的前驅、后繼節點,表明這個同步隊列是一個雙向隊列

Nodethread域保存了對應的線程,只有在創建時賦值,使用完要null掉,以方便GC

Node使用SHAREDEXCLUSIVE兩個常量來標記該線程是由於獲取共享資源、互斥資源失敗,而被阻塞並放入到同步隊列中進行等待

Node使用waitStatus來記錄當前線程的等待狀態,通過CAS進行修改。它的取值可以是:

  • CANCELLED:表示該節點由於超時中斷而被取消。該狀態不會再轉變為其他狀態,而且該節點的線程再也不會被阻塞
  • SIGNAL:表示其后繼節點(后面相鄰的那個節點)需要被喚醒,即該線程被釋放或被取消時,必須喚醒其后繼節點
  • CONDITION:表示該節點的線程在條件隊列中等待,而非在同步隊列中。如果該條件變量signal該節點后,該節點會被轉移到同步隊列中參與資源競爭
  • PROPAGATE:只有在共享模式下才會被用到,表示無條件傳播狀態。引入這個狀態是為了解決共享模式下並發釋放而引起的線程掛起的bug,這里不多解釋,網上有文章給出了更加詳細的解釋,見下方

AQS:為什么需要PROPAGATE?
AQS源碼深入分析之共享模式-你知道為什么AQS中要有PROPAGATE這個狀態嗎?

同步隊列的結構

AQS中維護了一個同步隊列,它通過兩個指針標記隊頭隊尾,分別是headtail,源碼如下:

private transient volatile Node head;

private transient volatile Node tail;

該隊列的出入規則遵循FIFO(First In, First Out)

注意:如果該同步隊列非空,那么head其實並不是指向第一個線程對應的Node,而是指向一個空的Node

接下來讓我們剖析一下AQS針對這個同步隊列設計的入隊、出隊算法

入隊算法

入隊事件主要在線程嘗試獲取資源失敗時觸發。當線程嘗試獲取資源失敗之后,會將該線程加入到同步隊列的隊尾

入隊算法的源碼見AQS的addWaiter方法,如下:

// mode可以是Node.EXCLUSIVE或Node.SHARED
private Node addWaiter(Node mode) {
    Node node = new Node(Thread.currentThread(), mode);
    // Try the fast path of enq; backup to full enq on failure
    Node pred = tail;
    if (pred != null) {
        node.prev = pred;
        if (compareAndSetTail(pred, node)) {
            pred.next = node;
            return node;
        }
    }
    enq(node);
    return node;
}

首先為該線程創建一個Node節點,mode可以是Node.EXCLUSIVENode.SHARED,表示兩種不同的模式。
之后直接CAS試圖將其入隊。這里注意,如果隊列本身為空,或CAS競爭失敗,才會進入enq方法。這里addWaiter方法出於性能考慮,先嘗試快捷的入隊方式,不成功才執行enq方法

enq方法是完整的入隊邏輯,源碼如下:

private Node enq(final Node node) {
    for (;;) {
        Node t = tail;
        if (t == null) { // 如果隊列為空,則將head和tail初始化為同一個空Node
            if (compareAndSetHead(new Node()))
                tail = head;
        } else {
            node.prev = t;
            if (compareAndSetTail(t, node)) {	// 不斷CAS直到成功為止
                t.next = node; 
                return t;
            }
        }
    }
}

enq中的代碼都包含在for循環中,如果CAS失敗,就會不斷循環CAS直到成功為止

注意,這段代碼也體現出同步隊列的三個特點

  • 入隊都是從隊尾
  • 進入隊列的操作都是CAS操作,保證了線程安全性
  • 如果隊列為空,則headtail都為null;如果不為空,head指向的節點並不是第一個線程對應的節點,而是一個啞節點

出隊算法

出隊事件主要發生在:位於同步隊列中的線程再次獲取資源,並成功獲得時

出隊算法在AQS中並沒有直接對應的方法,而是零散分布在某些方法中。因為獲取資源失敗而被阻塞的線程被喚醒后,會重新嘗試獲取資源。如果獲取成功,則會執行出隊邏輯

例如,在acquireQueued方法中,就包含了出隊事件:

final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
            final Node p = node.predecessor();
            if (p == head && tryAcquire(arg)) {
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return interrupted;
            }
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
    }
}

private void setHead(Node node) {
    head = node;
    node.thread = null;
    node.prev = null;
}

出隊的邏輯體現在第6-9行,此時p指向head指向的空節點,而node是隊首元素(不是第一個空節點)
首先調用setHead方法,將head指向node、將nodethread域、prev域置空,然后將headnext域置空,以方便該節點的GC

節點的取消

線程會因為超時或中斷而被取消,之后不會再參與鎖的競爭,會等待GC

取消的過程見cancelAcquire方法,該方法的調用時機都是在獲取資源失敗之后,而失敗就是由於超時或中斷。其源碼如下:

private void cancelAcquire(Node node) {
    if (node == null)
        return;
    node.thread = null;    // 將thread域置空以方便GC

    // 向前遍歷並跳過被取消的Node
    Node pred = node.prev;
    while (pred.waitStatus > 0)
        node.prev = pred = pred.prev;
    Node predNext = pred.next;
    node.waitStatus = Node.CANCELLED;

    // 如果是tail,那么將tail修改為pred
    if (node == tail && compareAndSetTail(node, pred)) {
        compareAndSetNext(pred, predNext, null);
    } else {
        int ws;
        if (pred != head &&
            ((ws = pred.waitStatus) == Node.SIGNAL || (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
            pred.thread != null) {
            // 如果node的next需要signal,那么就將pred的next設為node的next
            Node next = node.next;
            if (next != null && next.waitStatus <= 0)
                compareAndSetNext(pred, predNext, next);
        } else {
            unparkSuccessor(node);
        }
        node.next = node; // help GC
    }
}

總之,cancelAcquire方法就是將目標節點nodethread域置空,並將waitStatus置為CANCELLED

這里有一個問題:node的后繼節點nextprev指針仍然指向node,沒有更新為pred,這不僅語義上是錯誤的,而且會阻礙node被GC。那么何時進行更新?
答:任何其他線程嘗試獲取鎖失敗之后,都會被放入同步隊列,然后調用shouldParkAfterFailedAcquire方法判斷是否應該被阻塞。如果發現當前節點的前驅節點被置為CANCELLED,就會執行:

do {
    node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);

此外,cancelAcquire方法也會做類似的操作,如下:

Node pred = node.prev;
while (pred.waitStatus > 0)
	node.prev = pred = pred.prev;

這兩處都會更新被取消節點的后繼節點的prev指針,所以前面說到的的問題根本不存在

注意:cancelAcquire的調用時機一般都是在獲取鎖邏輯后面的finally塊中,如果獲取失敗就會調用cancelAcquire方法。獲取失敗的原因主要有兩個,中斷或超時

總結:

  • 節點被取消的原因:獲取鎖超時或在獲取的過程中被中斷
  • 取消節點的主要邏輯:將其waitStatus修改為CANCELLED。再將節點thread域置空,將指向它的next指針指向其后繼節點,以方便GC

好了,到這里為止,我們就完成了對AQS基本結構的分析。這里如果有不懂的地方,可以暫時跳過,等看完后續博客再回頭看這篇,應該就能明白了
下一篇我們會逐步剖析AQS如何實現對資源的獲取和釋放,go go go!

全網最詳細的AbstractQueuedSynchronizer(AQS)源碼剖析(一)AQS基礎
全網最詳細的AbstractQueuedSynchronizer(AQS)源碼剖析(二)資源的獲取和釋放
全網最詳細的AbstractQueuedSynchronizer(AQS)源碼剖析(三)條件變量


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM