鎖原理 - AQS 源碼分析:有了 synchronized 為什么還要重復造輪子


鎖原理 - AQS 源碼分析:有了 synchronized 為什么還要重復造輪子

並發編程之美系列目錄:https://www.cnblogs.com/binarylei/p/9569428.html

在前兩篇文章中,我們主要是分析在並發編程問題上,計算機硬件、操作系統和編程語言分別提供了那些支持:

  1. 計算機硬件: CAS 原子操作是基於已有的狀態更新成另一個狀態,如 count++ 就是基於原有的 count 值進行更新。有了這個狀態,也就可以實現更上層的信號量和鎖。CAS 最底層其實也是使用內存屏障。
  2. 操作系統:主要是信號量和鎖。將共享變量 S 的 PV 操作封裝起來,我們就可以基於這個共享變量 S 實現線程的 "等待-通知" 機制,這就是信號量。其中鎖是信號量為 1 的特殊場景。
  3. 編程語言:在信號量的基礎是封裝條件變量,這就是管程。管程解決了並發編程領域的兩個核心問題:互斥和同步。因此,JDK 也選擇使用管程實現 AQS 和 synchronized。

有了這些基礎,我們繼續分析一下 AbstractQueuedSynchronizer(簡稱 AQS) 的源碼實現。AQS 涉及以下主要幾個知識點:

  1. 為什么需要 AQS:Java 已經在語言層次提供 synchronized 鎖,為什么要在 SDK 層次提供 AQS 鎖?
  2. AQS 實現原理:管程在 Java 中的應用?
  3. AQS 可見性問題:AQS 是 Java SDK 層次提供的鎖,它是如何保證可見性的 - volatile 內存語義(JSR-133)?
  4. CLH 隊列鎖:如何使用鏈表實現 CAS 原子性操作?

1. 為什么需要 AQS

性能是否可以成為“重復造輪子”的理由呢?Java1.5 中 synchronized 性能不如 AQS,但 1.6 之后,synchronized 做了很多優化,將性能追了上來。顯然性能不能重復造輪子的理由,因為性能問題優化一下就可以了,完全沒必要“重復造輪子”。

在前面在介紹死鎖問題的時候,我們知道可以通過破壞死鎖產生的條件從而避免死鎖,但這個方案 synchronized 沒有辦法解決。原因是 synchronized 申請資源的時候,如果申請不到,線程直接進入阻塞狀態,也釋放不了線程已經占有的資源。我們需要新的方案解決這問題。

如果我們重新設計一把互斥鎖去解決這個問題,那該怎么設計呢?AQS 提供了以下方案:

  1. 能夠響應中斷。synchronized 一旦進入阻塞狀態,就無法被中斷。但如果阻塞狀態的線程能夠響應中斷信號,能夠被喚醒。這樣就破壞了不可搶占條件了。
  2. 支持超時。如果線程在一段時間之內沒有獲取到鎖,不是進入阻塞狀態,而是返回一個錯誤,那這個線程也有機會釋放曾經持有的鎖。這樣也能破壞不可搶占條件。
  3. 非阻塞地獲取鎖。如果嘗試獲取鎖失敗,並不進入阻塞狀態,而是直接返回,那這個線程也有機會釋放曾經持有的鎖。這樣也能破壞不可搶占條件。

這三種方案可以全面彌補 synchronized 的問題。這三個方案就是“重復造輪子”的主要原因,體現在 API 上,就是 Lock 接口的三個方法。詳情如下:

// 支持中斷的API
void lockInterruptibly() throws InterruptedException;
// 支持超時的API
boolean tryLock(long time, TimeUnit unit) throws InterruptedException;
// 支持非阻塞獲取鎖的API
boolean tryLock();

2. AQS 必備知識

2.1 AQS 實現原理:管程

推薦文章:鎖原理 - 信號量 vs 管程:JDK 為什么選擇管程?

並發編程的兩大核心問題:一是互斥,即同一時刻只允許一個線程訪問共享資源;二是同步,即線程之間的 "wait-notify" 機制。管程能夠解決這兩大問題。Java SDK 並發包通過 Lock 和 Condition 兩個接口來實現管程,其中 Lock 用於解決互斥問題,Condition 用於解決同步問題。

2.2 AQS 可見性問題:volatile

你可能認為,鎖的可見性不是顯而易見的問題嗎?還真沒這么簡單,這涉及到 JMM 提供的 happens-before 原則。

  1. 順序性規則:單線程中,每個操作 happens-before 於任意后續操作。
  2. synchronized 規則:對一個鎖的解鎖,happens-before 於隨后對這個鎖的加鎖。
  3. volatile 規則:對一個 volatile 域的寫,happens-before 於任意后續對這個 volatile 域的讀。
  4. 傳遞性規則:如果A happens-before B,且B happens-before C,那么A happens-before C。

synchronized 之所以能夠保證有序性,是因為滿足了 synchronized 規則,那 AQS 又滿足了那條規則呢?如果我們仔細分析一下 JUC 並發工具類,可以發現一個通用化的實現模式:

  1. 聲明共享變量為 volatile。
  2. 使用 CAS 的原子條件更新來實現線程之間的同步。
  3. 配合以 volatile 的讀/寫和 CAS 所具有的 volatile 讀和寫的內存語義來實現線程之間的通信。

說明: AQS 就是根據上述模式實現的,在 lock.lock() 獲取鎖時,會讀取 volatile 變量,同時 lock.unlock() 釋放鎖時,會修改 volatile 變量。這樣滿足了 volatile 規則,所以 AQS 能夠保證可見性。

volatile 最開始只能保證可見性,禁止指定重排這個語義是在 JSR-133 加強的。禁止指定重排指,對一個 volatile 變量的讀,總是能看到(任意線程)對這個 volatile 變量最后的寫入。這樣,volatile 讀其實和 lock.lock() 具有相同的語義,volatile 寫具有 lock.unlock() 語義。

2.3 CLH 隊列鎖

CLH 隊列鎖是一種利用 CAS 實現的無鎖隊列。

  1. 為什么選擇鏈表。二叉樹相對鏈表的操作要復雜很多,需要左旋右旋來保持樹的平衡,也是說二叉樹需要鎖住很多結點才行。但鏈表非常簡單,通常只需要操作一個結點即可。
  2. 插入:和普通插入一樣,一次 CAS 即可。
  3. 刪除:和普通刪除不一樣,node 結點刪除時,如果直接設置 pre.next = next,可能有結點正在插入到 node.next,這樣會造成數據不安全。既然一次 CAS 不行,那就兩次 CAS:第一次是邏輯刪除,先標記 node 已經刪除;第二次是物理刪除,真正從鏈表上刪除結點。

AQS 中的同步隊列在 CLH 的基礎上做了改進。CLH 是單身鏈表,但 AQS 使用雙向鏈表,但要注意的是目前並不存在雙向鏈表的原子性算法,AQS 也保證 node.prev 域的原子性,並不能保證 node.next 域的原子性。如果通過 tail 反向遍歷可以查找到所有的結點,但從 head 正向遍歷則不一定了,node.next 域只是起輔助作用。

3. AQS 源碼分析 - Lock

AQS 包含 Lock 和 Condition 兩個接口來實現管程,其中 Lock 用於解決互斥問題,Condition 用於解決同步問題。

3.1 鎖狀態

AQS 使用共享變量 state 來管理鎖的狀態,很顯然 state 必須使用 volatile 修辭,AQS 提供的如下三個方法來訪問或修改同步狀態:

  • getState():獲取當前同步狀態。
  • setState(int newState):設置當前同步狀態。
  • compareAndSetState(int expect, int update):使用 CAS 設置當前狀態,該方法能夠保證狀態
    設置的原子性。

思考1:state 為什么要提供 setState 和 compareAndSetState 兩種修改狀態的方法?

這個問題,關鍵是修改狀態時是否存在數據競爭,如果有則必須使用 compareAndSetState。

  • lock.lock() 獲取鎖時會發生數據競爭,必須使用 CAS 來保障線程安全,也就是 compareAndSetState 方法。
  • lock.unlock() 釋放鎖時,線程已經獲取到鎖,沒有數據競爭,也就可以直接使用 setState 修改鎖的狀態。

3.2 同步隊列

同步隊列(syncQueue)結構

說明: 首先,需要注意的是:如果沒有鎖競爭,線程可以直接獲取到鎖,就不會進入同步隊列。也就說,沒有鎖競爭時,同步隊列(syncQueue)是空的,當存在鎖競爭時,線程會進入到同步隊列中。一旦進入到同步隊列中,就會有線程切換。

同步隊列特點:

  • 同步隊列頭結點是哨兵結點,表示獲取鎖對應的線程結點。
  • 當獲取鎖時,其前驅結點必定為頭結點。獲取鎖后,需要將頭結點指向當前線程對應的結點。
  • 當釋放鎖時,需要通過 unparkSuccessor 方法喚醒頭結點的后繼結點。

標准的 CHL 無鎖隊列是單向鏈表,同步隊列(syncQueue) 在 CHL 基礎上做了改進:

  1. 同步隊列是雙向鏈表。事實上,和二叉樹一樣,雙向鏈表目前也沒有無鎖算法的實現。雙向鏈表需要同時設置前驅和后繼結點,這兩次操作只能保證一個是原子性的。
  2. node.pre 一定可以遍歷所有結點,是線程安全的,而后繼結點 node.next 則是線程不安全的。也就是說,node.pre 一定可以遍歷整個鏈表,而 node.next 則不一定。至於為什么選擇前驅結點而不是后繼結點,會在 "第五部分 - AQS 無鎖隊列" 中進一步分析。

3.3 線程狀態

volatile int waitStatus; // 結點狀態
volatile Node prev;      // 同步隊列:互斥等待隊列 Lock
volatile Node next;      // 同步隊列
volatile Thread thread;  // 阻塞的線程
Node nextWaiter;         // 等待隊列:條件等待 Condition

Node 結點是對每一個等待獲取資源的線程的封裝,其包含了需要同步的線程本身及其等待狀態,如是否被阻塞、是否等待喚醒、是否已經被取消等。變量 waitStatus 則表示當前 Node 結點的等待狀態,共有 5 種取值CANCELLED、SIGNAL、CONDITION、PROPAGATE、INITIAL。

  • CANCELLED(1):表示當前結點已取消調度。因為超時或者中斷,結點會被設置為取消狀態,進入該狀態后的結點將不會再變化。注意,只有 CANCELLED 是正值,因此正值表示結點已被取消,而負值表示有效等待狀態。
  • SIGNAL(-1):表示后繼結點在等待當前結點喚醒。后繼結點入隊時,會將前繼結點的狀態更新為 SIGNAL。
  • CONDITION(-2):表示結點等待在Condition上,當其他線程調用了Condition的signal()方法后,CONDITION狀態的結點將從等待隊列轉移到同步隊列中,等待獲取同步鎖。
  • PROPAGATE(-3):共享模式下,前繼結點不僅會喚醒其后繼結點,同時也可能會喚醒后繼的后繼結點。
  • INITIAL(0):新結點入隊時的默認狀態。
Node 結點的主要狀態變化過程

說明: 大致可以分為兩種場景,Lock 和 Condition 兩種。

  1. Lock 對應的狀態變化:
    • lock.lock() 獲取鎖時:
      • 如果線程能獲取到鎖,就不會進行等待隊列,進而也就不會有之后的各種狀態變化。
      • 如果不能獲取到鎖,此時就會進行同步隊列(syncQueue),進行同步隊列后會前其驅結點的狀態改為 SIGNAL。注意,是修改前驅結點的狀態為 SIGNAL,表示需要喚醒后繼結點。
      • 當然,在其前驅結點的狀態改為 SIGNAL 前,線程可能就被中斷、超時、喚醒。此時,會直接修改當前結點的狀態為 CANCELLED。
    • lock.unlock() 釋放鎖時:
      • 釋放鎖時,需要通過 unparkSuccessor 方法喚醒后繼結點。喚醒后繼結點后,會將 head 指針移動到該后繼結點,也就刪除頭結點。
  2. Condition 對應的狀態變化:
    • condition.await 進行等待隊列時:首先,線程會釋放鎖並喚醒后繼結點。然后,將當前線程進入到等待隊列(watiQueue)中,同時結點的狀態變成 CONDITION。
    • condition.signal 喚醒等待線程:首先,將結點的狀態修改為 INITIAL,如果失敗則說明結點已經取消,不需要處理,繼續輪詢下一個結點。然后,將該結點的前驅節點狀態修改為 SIGNAL,否則直接喚醒該線程。

AQS 支持阻塞、響應中斷、鎖超時、非阻塞獲取鎖四種場景,我們就以最常用的阻塞方式獲取鎖為例。

與獲取鎖有關的方法如下:

  • acquire:lock.lock() 方法用於獲取鎖。
  • tryAcquire:具體獲取鎖的策略,由子類實現。
  • addWaiter:通過 enq 方法添加到同步隊列中。需要注意的是 addWaiter 方法會嘗試一次添加到同步隊列中,如果不成功,再調用 enq 自旋添加到同步隊列中。
  • acquireQueued:線程進入同步隊列后,會將該線程掛起,直到有甚至線程喚醒該線程。
  • shouldParkAfterFailedAcquire:將前驅結點的狀態修改成 SIGNAL,同時會清理已經 CANCELLED 的結點。注意,只有前驅結點的狀態為 SIGNAL,當它釋放鎖時才會喚醒后繼結點。
  • parkAndCheckInterrupt:掛起線程,並判斷線程在自旋過程中,是否被中斷過。

與釋放鎖有關的方法如下:

  • release:lock.unlock() 方法用於釋放鎖。
  • tryRelease:具體釋放鎖的策略,由子類實現。
  • unparkSuccessor:喚醒后繼結點。

3.2 acquire

public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}

說明: tryAcquire 嘗試獲取鎖,如果成功,就不用進入同步隊列。否則,就需要通過 acquireQueued 進入等待隊列。

final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {   // 自旋修改前驅結點的狀態為SIGNAL,然后掛起線程,直到被喚醒和搶占鎖成功
            final Node p = node.predecessor();     // p表示前驅結點
            if (p == head && tryAcquire(arg)) {    // 1. 搶占鎖成功,喚醒該線程
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return interrupted;
            }
            if (shouldParkAfterFailedAcquire(p, node) && // 2. 自旋修改前驅結點的狀態為SIGNAL
                parkAndCheckInterrupt())                 // 3. 掛起當前線程,並判斷是否被中斷
                interrupted = true;
        }
    } finally {
        if (failed)   // 異常則將結點狀態設置為CANCELLED
            cancelAcquire(node);
    }
}

說明: acquireQueued 進入同步隊列中,直到線程被喚醒。

  1. 修改前驅結點狀態為 SIGNAL:只有前驅結點的狀態為 SIGNAL 時,才能喚醒后繼結點。shouldParkAfterFailedAcquire 修改前驅結點狀態成功返回 true,否則不斷嘗試。
  2. 掛起當前線程:parkAndCheckInterrupt 方法通過 LockSupport.park 掛起當前線程,並返回線程是否被中斷,也就是可以響應中斷操作。
  3. 線程被喚醒:當其它線程釋放鎖時,會喚醒后繼結點。如果這個線程的前驅結點是頭結點,並搶占鎖成功,就會被喚醒,否則繼續被掛起。

3.3 release

public final boolean release(int arg) {
    if (tryRelease(arg)) {
        Node h = head;
        if (h != null && h.waitStatus != 0)
            unparkSuccessor(h);
        return true;
    }
    return false;
}

說明: 釋放鎖相比獲取鎖要簡單一些,因為此時線程已經獲取到鎖,可以不使用 CAS 原子性操作。

private void unparkSuccessor(Node node) {
  
    // 1. node表示需要刪除的結點,將其狀態重新設置為INITIAL。允許失敗?
    int ws = node.waitStatus;
    if (ws < 0)
        compareAndSetWaitStatus(node, ws, 0);

    // 2. 這一段比較難理解,涉及到同步隊列的線程安全問題,目前就記住一點就可以:
    //    node.prev是線程安全的,而node.next則不是線程安全的
    Node s = node.next;
    if (s == null || s.waitStatus > 0) {
        s = null;
        for (Node t = tail; t != null && t != node; t = t.prev)
            if (t.waitStatus <= 0)
                s = t;
    }
    // 3. 喚醒同步線程,當然這個線程不一定能搶占到鎖。比如非公平鎖
    if (s != null)
        LockSupport.unpark(s.thread);
}

說明: unparkSuccessor 方法用於喚醒 node 的后繼結點,有幾個小細節需要關注一下:

  1. node 表示頭結點,也就是當前獲取鎖的線程。
  2. 雖然,通過 LockSupport.unpark 喚醒了后繼結點,但該線程不一定能爭搶到鎖。
  3. 一旦后繼結點爭搶到鎖,就會向頭指針向后移動。

4. AQS 源碼分析 - Condition

管程的兩個主要功能,我們已經分析了 Lock 是如何解決互斥問題,下面再看一下 Condition 是如何解決同步問題。條件同步 Condition 的典型用法如下:

Lock lock = new ReentrantLock();
Condition condition = lock.newCondition();

// 等待
public void conditionWait() throws InterruptedException {
    lock.lock();
    try {
        while(條件不滿足)
            condition.await();
    } finally {
        lock.unlock();
    }
}

// 通知
public void conditionSignal() throws InterruptedException {
    lock.lock();
    try {
        condition.signalAll();
    } finally {
        lock.unlock();
    }
}

4.1 等待隊列

Condition 等待隊列(waitQueue)要比 Lock 同步隊列(syncQueue)簡單很多,最重要的原因是 waitQueue 的操作都是在獲取鎖的線程中執行,不存在數據競爭的問題。

Condition 等待隊列結構

ConditionObject 重要的方法說明:

  • await:阻塞線程並放棄鎖,加入到等待隊列中。
  • signal:喚醒等待線程,沒有特殊的要求,盡量使用 signalAll。
  • addConditionWaiter:將結點(狀態為 CONDITION)添加到等待隊列 waitQueue 中,不存在鎖競爭。
  • fullyRelease:釋放鎖,並喚醒后繼等待線程。
  • isOnSyncQueue:根據結點是否在同步隊列上,判斷等待線程是否已經被喚醒。
  • acquireQueued:Lock 接口中的方法,通過同步隊列方法競爭鎖。
  • unlinkCancelledWaiters:清理取消等待的線程。

4.2 await

public final void await() throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    Node node = addConditionWaiter();           // 1. 添加到等待隊列中
    int savedState = fullyRelease(node);        // 2. 釋放鎖
    int interruptMode = 0;
    while (!isOnSyncQueue(node)) {              // 3. 判斷線程是否在同步隊列中
        LockSupport.park(this);
        if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
            break;
    }
    // 4. 重新競爭鎖
    if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
        interruptMode = REINTERRUPT;
    if (node.nextWaiter != null) // clean up if cancelled
        unlinkCancelledWaiters();
    if (interruptMode != 0)
        reportInterruptAfterWait(interruptMode);
}

說明: 條件同步調用 await 方法后主要完成以下幾件事。

  1. 添加到等待隊列:將該線程添加到等待隊列后,初始狀態為 CONDITION。
  2. 釋放鎖:調用 unparkSuccessor 喚醒后繼結點。
  3. 阻塞:如果調用 signal 喚醒等待線程,該線程就會從等待隊列移動到同步隊列。isOnSyncQueue 判斷該結點是否已經在 syncQueue 中。
  4. 重新競爭鎖:acquireQueued 在分析 Lock 接口時已經分析過,重新競爭鎖。

4.3 signal

private void doSignal(Node first) {
    do {
        if ( (firstWaiter = first.nextWaiter) == null)
            lastWaiter = null;
        first.nextWaiter = null;
    } while (!transferForSignal(first) &&     // 如果喚醒失敗,就一直向下喚醒
             (first = firstWaiter) != null);
}

說明: doSignal 喚醒等待的線程,transferForSignal 都是真正喚醒等待線程的方法。如果該線程已經被喚醒或取消,則繼續喚醒下一個線程。

final boolean transferForSignal(Node node) {
    // 1. 結點的狀態必須是CONDITION。如果是其它狀態,則要么已經喚醒,或已經取消
    if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
        return false;

    // 2. 重新加入到同步隊列中,競爭鎖
    Node p = enq(node);
    int ws = p.waitStatus;
    // 3. 設置前驅結點的狀態為SIGNAL
    if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
        LockSupport.unpark(node.thread);
    return true;
}

說明: transferForSignal 喚醒等待的線程,重新加入到 syncQueue 同步隊列來競爭鎖。

  1. 等待線程的狀態必須是 CONDITION,否則該等待線程已經被喚醒或取消。
  2. enq 方法重新加入到同步隊列中,競爭鎖。
  3. 前驅結點的狀態必須是 SIGNAL。如果線程不能獲取到鎖,acquireQueued 自旋過程中會通過 shouldParkAfterFailedAcquire 修改其前驅結點的狀態直到 SIGNAL。

5. AQS 無鎖同步隊列

AQS 中同步隊列是雙向鏈表,node.prev 和 node.next 不可能同時通過 CAS 保證其原子性。AQS 中選擇了 node.prev 前驅結點的原子性,而 node.next 后繼結點則是輔助結點。

同步隊列(syncQueue)結構

5.1 為什么是前驅

思考1:AQS 為什么選擇 node.prev 前驅結點的原子性,而 node.next 后繼結點則是輔助結點?

  • next 域:需要修改二處來保證原子性,一是 tail.next;二是 tail 指針。
  • prev 域:只需要修改一處來保證原子性,就是 tail 指針。你可能會說不需要修改 node.prev 嗎?當然需要,但 node 還沒添加到鏈表中,其 node.prev 修改並沒有鎖競爭的問題,將 tail 指針指向 node 時,如果失敗會通過自旋不斷嘗試。
前驅和后驅原子性操作對比

說明: 通過上圖,前驅結點只需要一次原子性操作就可以,而后繼結點則需要二次原子性操作,復雜性就會大提升,這就是 AQS 選擇前驅結點進行原子性操作的原因。


思考2:AQS 明知道 node.next 有可見性問題,為什么還要設計成雙向鏈表?

喚醒同步線程時,如果有后繼結點,那么時間復雜為 O(1)。否則只能只反向遍歷,時間復雜度為 O(n)。

以下兩種情況,則認為 node.next 不可靠,需要從 tail 反向遍歷。

  1. node.next=null:可能結點剛剛插入鏈表中,node.next 仍為空。此時有其它線程通過 unparkSuccessor 來喚醒該線程。
  2. node.next.waitStatus>0:結點已經取消,next 值可能已經改變。

思考3:AQS 同步隊列什么時候刪除結點?

  • 入隊:lock.lock() 獲取鎖失敗時,會將線程添加到同步隊列中。tail 結點總是存在鎖競爭的問題。
  • 出隊:即物理刪除。acquireQueued 自旋(圖 5)時:
    • 獲取鎖成功時,會將頭結點移除,同時將 head 重新指向新結點。也就是 node.prev 鏈打斷了。head 結點基本上不存在鎖競爭問題。因為只有在初始化時 head 頭結點存在鎖競爭,之后都是持有鎖的線程在修改 head 結點。
    • 結點自旋時,如果 shouldParkAfterFailedAcquire 和 cancelAcquire 方法,發現結點已經被取消,則會剔除已經被取消的結點,node.prev 鏈同樣被打斷了。需要注意的是,node 結點自旋時,修改 node 自身的屬性沒有鎖競爭,但如果修改其它結點的屬性則會存在鎖競爭。
  • 取消:即邏輯刪除。如果線程被中斷、超時,那么會將線程的狀態修改為 CANCELLED,查找時會忽略該結點。同時會修改 node.next,但不會修改 node.prev,直到其它線程獲取鎖重新設置 head 才會打斷 node.prev 鏈。

總結: 物理刪除,acquireQueued 自旋時會修改 node.prev。而邏輯刪除,會先標記為 CANCELLED 狀態,並修改 node.next。

5.2 添加結點

private Node enq(final Node node) {
    for (;;) {
        Node t = tail;
        if (t == null) { // Must initialize
            if (compareAndSetHead(new Node()))
                tail = head;
        } else {
            node.prev = t;                     // ① 不存在鎖競爭(因為還沒有添加到鏈表中)
            if (compareAndSetTail(t, node)) {  // ② node.prev一定是對其它線程可見的
                t.next = node;                 // ③ 可能存在並發操作,此時t.next=null
                return t;
            }
        }
    }
}

說明: 線程進入等待隊列時,node.prev 是絕對線程安全的,但 node.next 就不一定了。如果線程只好在此時被喚醒,unparkSuccessor 通過 prev.next 就無法查找到該結點,只能反向遍歷。

5.3 刪除結點

刪除結點,這里指的是物理刪除,修改 node.prev 域,只有在 prev 鏈中斷開才能真正的刪除。

(1)acquireQueued 獲取鎖時,需要重新設置頭結點,會將 node.prev=null:

private void setHead(Node node) {
    head = node;
    node.thread = null;
    node.prev = null;
}

說明: head 頭結點只有在獲取鎖才會更新,所以不需要 CAS 原子性操作。

(2)shouldParkAfterFailedAcquire(cancelAcquire 方法類似) 清除取消結點代碼如下:

do {
    node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;

說明: node 結點所在的線程自旋時,修改自身的屬性 node.prev 不存在鎖競爭,但如果修改其它結點的屬性(eg pred.next)則會存在鎖競爭。

5.4 取消結點

取消結點,這里指的是邏輯刪除,將結點的狀態標記為 CANCELLED,同時修改 node.next 域。取消結點操作比較復雜,因為要考慮取消的結點可能為尾結點、中間結點、頭結點三種情況。

private void cancelAcquire(Node node) {
    node.thread = null;

    // 1. 查找前驅結點:忽略CANCELLED結點
    Node pred = node.prev;
    while (pred.waitStatus > 0)
        node.prev = pred = pred.prev;
    Node predNext = pred.next;
    
    // 2. 邏輯刪除:此時已經無法從同步隊列中查找到該結點,直到其它線程獲取鎖時會真正物理刪除
    node.waitStatus = Node.CANCELLED;

    // 3.1 被取消的結點是尾結點:直接將 tail.next=null
    if (node == tail && compareAndSetTail(node, pred)) {
        compareAndSetNext(pred, predNext, null);
    } else {
        int ws;
        // 3.2 被取消的結點是中間結點:前驅結點必須改成SIGNAL狀態,否則直接喚醒線程
        if (pred != head &&
            ((ws = pred.waitStatus) == Node.SIGNAL ||
             (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
            pred.thread != null) {
            Node next = node.next;
            if (next != null && next.waitStatus <= 0)
                compareAndSetNext(pred, predNext, next);
        // 3.3 被取消的結點是頭結點:直接喚醒后繼結點
        } else {
            unparkSuccessor(node);
        }
        node.next = node; // help GC
    }
}

說明: cancelAcquire 唯一可以確定的是將 node.waitStatus 修改成 CANCELLED。如果被取消的是頭結點時,需要喚醒后繼結點。至於取消的結點是尾結點或中間結點,並不能保證操作成功與否。

從上圖可以看到:

  • 取消尾結點:設置 tail=pred 且 pred.next=null,但這兩個操作都不能保證成功。
  • 取消中間結點:確保 pred.waitStatus=SIGNAL,如果成功則設置 pred.next=node.next,否則直接喚醒后繼結點。
  • 取消頭結點或設置前驅結點狀態為 SIGNAL 失敗:直接喚醒后繼結點。

總結: cancelAcquire 方法只是邏輯刪除,將結點狀態標記為 CANCELLED,同時可以修改 node.next 域。從這我們也可以看到為什么 unparkSuccessor 方法喚醒后繼結點時,如果后繼結點已經 CANCELLED,就需要從 tail 反向遍歷結點,因為 next 域可能已經被修改。

5.5 喚醒結點

喚醒結點時,需要查找后繼有效結點。如果 next=null 或 next.waitStatus>0 則需要反向遍歷。

private void unparkSuccessor(Node node) {
    ...
    Node s = node.next;
    if (s == null || s.waitStatus > 0) {
        s = null;
        for (Node t = tail; t != null && t != node; t = t.prev)
            if (t.waitStatus <= 0)
                s = t;
    }
}

說明: node.next 是輔助結點,存在可見性問題,但 node.prev 一定可以遍歷所有的結點。

參考:


每天用心記錄一點點。內容也許不重要,但習慣很重要!


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM