AQS詳解


AQS詳解

AQS:提供原子式管理同步狀態,阻塞和喚醒線程功能以及隊列模型。

ReentrantLock

特性

  • 為可重入鎖,一個線程能夠對一個臨界資源重復加鎖。
  • 通過AQS實現鎖機制。
  • 支持響應中斷,超時和嘗試獲取鎖。
  • 必須使用unlock()釋放鎖。
  • 有公平鎖和非公平鎖。
  • 可以關聯多個條件隊列。

加鎖

非公平鎖:

  1. 若通過AQS設置變量state(同步狀態)成功,即獲取鎖成功,則將當前下線程設置為獨占線程。
  2. 若獲取失敗,則進入acquire()方法進行后續處理。

公平鎖:

  1. 進入acquire()方法進行后續處理。

AQS

核心思想

  • 若請求的共享資源空閑,則將當前請求的線程設置為有效的工作線程,並將共享資源設置為鎖定狀態。
  • 若共享資源被占用,則需要阻塞等待喚醒機制保證鎖的分配。

實現

  • 通過CLH隊列的變體:FIFO雙向隊列實現的。
  • 每個請求資源的線程被包裝成一個節點來實現鎖的分配。
  • 通過volatileint類型的成員變量state表示同步狀態。
  • 通過FIFO隊列完成資源獲取的排隊工作。
  • 通過CAS完成對state的修改。

節點Node

方法和屬性

方法或屬性 含義
waitStatus 當前節點在隊列中的狀態
thread 節點對應的線程
prev 前驅指針
next 后繼指針
predecessor 返回前驅節點
nextWaiter 指向下一個CONDITION狀態節點

waitStatus狀態:

  • 0:Node初始化后的默認值。
  • CANCELLED:為1,線程獲取鎖的請求已被取消。
  • CONDITION:為-2,節點在等待隊列中,等待喚醒。
  • PROPAGATE:為-3,線程處於SHARED狀態下使用。
  • SIGNAL:為-1,線程已准備,等待資源釋放。

線程的鎖模式:

  • SHARED:共享模式等待鎖。
  • EXCLUSIVE:獨占模式等待鎖。

state與鎖模式

獨占模式:

  • 初始化state=0
  • 試圖獲取同步狀態:若state為0,則設置為1,獲取鎖,進行后續操作。
  • state非0,則當前線程阻塞。

共享模式:

  • 初始化state=n(表示最多n個線程並發)
  • 試圖獲取同步狀態:若state大於0,則使用CAS對state進行自減操作,進行后續操作。
  • 若不大於0,則當前線程阻塞。

重寫AQS實現的方法

方法 描述
boolean isHeldExclusively() 該線程是否正在獨占資源
boolean tryAcquire(int arg) 獨占試圖獲取鎖,arg為獲取鎖的次數,獲取成功則返回true
boolean tryRelease(int arg) 獨占方式試圖釋放鎖,arg為釋放的鎖的次數,成功則返回true
int tryAcquireShared(int arg) 共享方式獲取鎖,負數則失敗,0表示成功,但沒有剩余可用資源,正數表示成功,有剩余資源
boolean tryReleaseShared(int arg) 共享方式釋放鎖,允許喚醒后續等待節點並返回true

注:
一般為獨占或共享方式,也可同時實現獨占和共享(ReentrantReadWriteLock)

ReentrantLock非公平鎖lock()方法執行流程:

lock

加鎖流程:

  1. 通過ReentrantLock的加鎖方法lock()進行加鎖。
  2. 調用內部類Synclock()方法,由於是抽象方法,則由ReentrantLock初始化選擇公平鎖和非公平鎖,執行相關內部類的lock()方法,從而執行AQS的acquire()方法。
  3. AQS的acquire()方法會執行tryAcquire()方法,由於tryAcquire()需要自定義,則會執行ReentrantLock中的tryAcquire()方法,根據是公平鎖還是非公平鎖,執行不同的tryAcquire()
  4. tryAcquire()為獲取鎖,若獲取失敗,則執行AQS后續策略。

解鎖流程:

  1. 通過ReentrantLock的解鎖方法unlock()進行解鎖。
  2. unlock()調用內部類Syncrelease()方法。
  3. release()會調用tryRelease()方法,其由ReentrantLock中的Sync實現。
  4. 釋放成功,其余由AQS進行處理。

從ReentrantLock到AQS

ReentrantLock中的lock()

// ReentrantLock
final void lock() {
    if (compareAndSetState(0, 1)) // CAS設置state
        setExclusiveOwnerThread(Thread.currentThread()); // 設置成功則當前線程獨占資源
    else
        acquire(1); // 設置失敗則后續處理
}

ReentrantLock中,lock()的實現邏輯為:

  • 試圖CAS設置狀態為1。
    • 若設置成功,則設置當前線程獨占資源。
    • 若設置失敗,則通過acquire()方法進一步處理。

acquire()方法實現:

  • 試圖獲取訪問tryAcquire(),若成功,則獲取鎖成功。
  • 若失敗,則加入等待隊列。
// AbstractQueuedSynchronizer
public final void acquire(int arg) {
    // 先嘗試獲取,獲取成功則獨占資源,否則進入等待隊列
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}

線程加入等待隊列

通過addWaiter(Node.EXCLUSIVE)將當前線程加入等待隊列。

加入流程:

  1. 由當前線程構造一個節點。
  2. 若等待隊列不為空時,則設置當前節點為隊列尾節點。
  3. 若隊列為空或者失敗時,則重復嘗試將該節點加入到隊列成為尾節點。

private Node addWaiter(Node mode) {
    Node node = new Node(Thread.currentThread(), mode); // 當前線程構造一個節點
    if (pred != null) { // 設置當前節點尾隊列的尾節點
        node.prev = pred;
        if (compareAndSetTail(pred, node)) {
            pred.next = node;
            return node;
        }
    }
    enq(node); // 節點加入隊列失敗,則循環嘗試加入隊列,直到成功
    return node;
}

// 隊列為空或加入隊列失敗,則循環嘗試加入隊列,直到成功
private Node enq(final Node node) {
    for (;;) {
        Node t = tail;
        if (t == null) { // 若隊列為空,則當前節點設為頭節點
            if (compareAndSetHead(new Node()))
                tail = head;
        } else {
            node.prev = t; // 若隊列非空,則當前節點加入隊列為尾節點
            if (compareAndSetTail(t, node)) {
                t.next = node;
                return t;
            }
        }
    }
}

注:

  • 若隊列為空,則構造一個空節點作為頭節點,然后將當前線程構造的節點加入作為尾節點。
  • 判斷等待隊列是否有有效節點:
    • 若頭節點等於尾節點,則返回false(沒有有效節點,當前節點可以爭奪共享資源).
    • 若不等於,則判斷頭節點的下一個節點是否不為null並且是否等於當前節點,若兩個條件均滿足,則返回false.
    • 否則返回true(隊列中有有效節點,當前線程進入隊列等待)

線程出隊列

  1. 獲取當前的前一個節點.
  2. 若前一個節點為頭節點head並且當前節點獲取鎖成功,則將當前節點設為head結點,當前線程執行后續操作.
  3. 若前一個節點非頭節點head或者當前結點獲取鎖失敗,則阻塞當前線程,等待被喚醒.
  4. 被喚醒后,循環重復上面步驟,直到成功獲取鎖.
  5. 若線程被中斷,則跳出循環,檢查是否獲取成功.成功則執行后續代碼,否則取消當前線程的獲取請求.
final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
            final Node p = node.predecessor(); // 獲取前一個節點
            if (p == head && tryAcquire(arg)) { // 若前一個節點為head,則試圖獲取,若獲取成功,則當前線程設為head
                setHead(node);
                p.next = null;
                failed = false;
                return interrupted;
            }
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt()) // 若前一個節點非head或獲取失敗,則阻塞當前線程,直到其被喚醒
                interrupted = true;
        }
    } finally {
        if (failed) // 若獲取失敗,取消當前線程的請求
            cancelAcquire(node);
    }
}

獲取失敗后進行阻塞檢查:

  • 若前一個節點處於喚醒狀態,則當前線程被阻塞.
  • 若前一個節點不處於阻塞狀態,則向前查找節點.
    • 若找到一個處於喚醒狀態的節點,則當前線程阻塞.
    • 若沒有喚醒狀態的結點,則設置當前結點喚醒,當前線程將循環試圖獲取鎖.
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    int ws = pred.waitStatus;
    if (ws == Node.SIGNAL) // 當前線程的前一個線程處於喚醒狀態,當前線程阻塞
        return true;
    if (ws > 0) { // 前一個線程的請求被取消,則刪除向前查找,將所有取消的線程從隊列刪除
        do {
            node.prev = pred = pred.prev;
        } while (pred.waitStatus > 0);
        pred.next = node;
    } else { // 當前線程設置為喚醒狀態
        compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
    }
    return false;
}

// 阻塞當前線程,返回當前線程的中斷狀態
private final boolean parkAndCheckInterrupt() {
    LockSupport.park(this);
    return Thread.interrupted();
}

線程移出隊列的流程:

移出步驟1
移出步驟2

取消線程請求

流程:

  1. 獲取當前節點的前驅節點.
  2. 若前驅節點的狀態為CANCELLED,則一直向前遍歷,找到第一個非CANCELED節點,設置當前節點為CANCELLED.
    • 若當前節點為尾節點:則設置前驅結點指向的下一個節點指針為null.
    • 若當前節點為head節點的后繼結點,則設置當前節點的下一個節點指針為null.
    • 若當前節點非尾節點並且非head節點的下一個節點,則設置當前結點的下一個結點指向當前結點的下一個節點(從而從前向后遍歷時跳過被CANCELLED的結點)

為什么只對next指針操作,而不對prev指針操作?
修改prev指針,可能導致prev指向一個已經被移出隊列的節點,存在安全問題.
shouldParkAfterFailedAcquire()方法內,會處理prev指針,使得CANCELLED的節點從隊列中刪除.

ReentrantLock的unlock()

解鎖流程:

protected final boolean tryRelease(int releases) {
    int c = getState() - releases;
    if (Thread.currentThread() != getExclusiveOwnerThread()) // 若解鎖的線程非占有資源的線程,則拋出異常
        throw new IllegalMonitorStateException();
    boolean free = false;
    if (c == 0) { // 若持有的線程全部釋放,則占有資源的線程設為null,更新狀態state
        free = true;
        setExclusiveOwnerThread(null);
    }
    setState(c);
    return free;
}

// 當前線程釋放鎖
public final boolean release(int arg) {
    if (tryRelease(arg)) {
        Node h = head;
        if (h != null && h.waitStatus != 0) // 若隊列非空或者當前線程不處於喚醒狀態,喚醒當前線程后面的一個線程
            unparkSuccessor(h);
        return true;
    }
    return false;
}

private void unparkSuccessor(Node node) {
    // node為當前線程的后一個等待線程
    int ws = node.waitStatus;
    if (ws < 0) // 若后一個等待線程未被取消,則設置其狀態為可獲取鎖
        compareAndSetWaitStatus(node, ws, 0);
    Node s = node.next;
    if (s == null || s.waitStatus > 0) { // 若當前線程下一個線程為null或者被取消
        s = null;
        // 從后向前查找第一個沒有被取消的等待線程,將其喚醒
        for (Node t = tail; t != null && t != node; t = t.prev)
            if (t.waitStatus <= 0)
                s = t;
    }
    if (s != null)
        LockSupport.unpark(s.thread);
}

為什么從后往前查找可喚醒的線程?
新節點入隊列時,是先將其prev指針指向隊尾節點,在將尾節點的next指向新節點,若從前向后查找可喚醒的線程,在這兩個步驟之間發生的查找線程操作會忽略新節點.
同時,在產生CANCELLED節點時,也是先斷開next指針,而prev指針.只有從后向前遍歷才能遍歷完全部的節點.

獲取鎖之后還要進行中斷響應:

  • 線程在等待資源后被喚醒,喚醒后不斷嘗試獲得鎖,直到搶到鎖為止.整個流程不會響應中斷,直到搶到鎖后檢查是否被中斷,若被中斷,則補充一次中斷.

小結

  1. 線程獲取鎖失敗后怎么樣?
    在等待隊列中等待,並繼續嘗試獲得鎖.
  2. 排隊隊列的數據類型?
    CLH變體的FIFO雙向隊列.
  3. 排隊的線程什么時候有機會獲得鎖?
    當前面的線程釋放鎖時,會喚醒后面等待的線程.
  4. 若等待的線程一直無法獲得鎖,需要一直等待嗎?
    線程對應的節點被設為CANCELLED狀態,並被清除出隊列.
  5. lock()方法通過acquire()方法進行加鎖,如何進行加鎖?
    acquire()調用tryAcquire()進行加鎖,具體由自定義同步器實現.

AQS應用

核心:

  1. state初始化為0,表示沒有任何線程持有鎖.
  2. 當有線程持有鎖時,state在原來值上加1,同一個線程多個獲得鎖,則多次加1.
  3. 如線程釋放鎖,則state減1,直到為0,表示線程釋放鎖.
同步工具 特點
ReentantLock 使用state保存鎖重復持有的次數,多次獲得鎖時其值遞增
Semaphore 使用AQS同步狀態保存信號量的當前計數,tryRelease增加計數,acquireShared減少計數
CountDownLatch 通過AQS同步狀態計數,計數為0,所有的acquire操作才可以通過
ReentrantReadWriteLock AQS同步狀態的16位保存寫鎖持有次數,剩下16位用於保存讀鎖持有次數
ThreadPoolExecutor 利用AQS同步狀態實現獨占線程變量設置

參考:


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM