Java並發編程系列-AbstractQueuedSynchronizer


原創作品,可以轉載,但是請標注出處地址:https://www.cnblogs.com/V1haoge/p/10566625.html

一、概述

AbstractQueuedSynchronizer簡稱為AQS,是並發包中用於實現並發工具的基礎類,非常明顯,它是一個抽象類。

它提供了一個依賴於FIFO隊列的框架用於實現各種阻塞鎖與同步器。

它依賴於一個int值來表示狀態,並定義了獲取和修改該狀態值的原子方法,具體的同步器需要實現該抽象類,並且使用它定義的這些原子方法來操作狀態值。

它的實現類一般作為待實現的同步器的靜態內部類而存在,用來提供一些方法來實現同步器的功能。

我們可以將其看作是基礎的同步器,並不是具體的某一個同步器,而是同步器的一個抽象。

二、源碼解析

2.1 繼承體系解析

首先來看看其繼承體系:

public abstract class AbstractQueuedSynchronizer
    extends AbstractOwnableSynchronizer
    implements java.io.Serializable {}

可以看到它繼承了AbstractOwnableSynchronizer抽象類,這個類很簡單,我們可以整體來看看:

// 就是一個簡單的獨占式同步器,持有被獨占擁有的線程
public abstract class AbstractOwnableSynchronizer
    implements java.io.Serializable {
    private static final long serialVersionUID = 3737899427754241961L;
    // 供子類調用的構造器
    protected AbstractOwnableSynchronizer() { }
    // 表示獨占擁有的線程,下面是其get和set方法
    private transient Thread exclusiveOwnerThread;
    protected final void setExclusiveOwnerThread(Thread thread) {
        exclusiveOwnerThread = thread;
    }
    protected final Thread getExclusiveOwnerThread() {
        return exclusiveOwnerThread;
    }
}

2.2 內部類解析

2.2.1 Node

靜態內部類Node用於將要加入同步隊列的線程封裝成為隊列節點。這個隊列采用雙向鏈表實現,支持先進先出。

(1)修飾符:
static final class Node {}

該靜態內部類被final修飾,表明作者希望其不被繼承修改。

(2)字段:
static final class Node {
    // 兩個節點標記,用於標識節點對應的線程獲取鎖的模式,是共享式獲取,還是獨享式獲取
    static final Node SHARED = new Node();// 共享模式的節點標記
    static final Node EXCLUSIVE = null;// 獨享模式的節點標記
    // 四個節點狀態,其實還有一個狀態為0-表示當前節點在同步隊列中,等待着獲取鎖
    static final int CANCELLED =  1;// 表示當前節點封裝的線程被中斷或者超時
    static final int SIGNAL    = -1;// 表示當前節點的后繼節點需要被喚醒(unpark)
    static final int CONDITION = -2;// 表示當前節點位於等待隊列中,在等待條件滿足
    static final int PROPAGATE = -3;// 表示當前場景下后續的acquireShared能夠得以執行??
    // 節點狀態,其值就是上面定義的這四個狀態值再加上0
    volatile int waitStatus;
    // 同步隊列的節點指針
    volatile Node prev;// 雙向鏈表中節點指向前節點的指針
    volatile Node next;// 雙向鏈表中節點指向后節點的指針
    // 節點封裝的執行線程
    volatile Thread thread;
    // 等待隊列的節點指針
    Node nextWaiter;// 單向鏈表中節點指向后節點的指針
}

節點狀態:

  • 0:默認狀態,表示節點是同步隊列中等待獲取鎖的線程的節點
  • 1:CANCELLED,表示節點被取消,原因可能是超時或者被中斷,一旦置於該狀態,則不再改變
  • -1:SIGNAL,表示當前節點的后繼節點被阻塞(或即將被阻塞)(使用park),因此當前線程釋放鎖或者被取消執行時需要喚醒(unpark)后繼節點
  • -2:CONDITION,表示當前節點位於等待隊列中,當節點被轉移到同步隊列的時候,狀態值會被更新為0
  • -3:PROPAGATE,表示持續的傳播releaseShared操作
(3)構造器:
static final class Node {
    Node() {}
    Node(Thread thread, Node mode) {
        this.nextWaiter = mode;
        this.thread = thread;
    }
    Node(Thread thread, int waitStatus) {
        this.waitStatus = waitStatus;
        this.thread = thread;
    }
}

三個構造器各有用處:

  • Node():用戶初始化頭結點,或者創建共享標記SHARED
  • Node(Thread thread, Node mode):給同步隊列添加新節點時使用,用於構造新節點
  • Node(Thread thread, int waitStatus):給等待隊列添加新節點時使用,用於構造新節點

注意:上面的構造器中的mode(模式)屬於Node類型,它有兩種模式SHARED和EXCLUSIVE,分別表示共享模式和獨享模式。而waitStatus表示的是節點狀態。

(4)方法:
static final class Node {
    // 校驗當前節點是否是共享模式
    final boolean isShared() {
        return nextWaiter == SHARED;
    }
    // 獲取前置節點,必須為非null
    final Node predecessor() throws NullPointerException {
        Node p = prev;
        if (p == null)
            throw new NullPointerException();
        else
            return p;
    }
}

方法解析:

isShared方法主要用於校驗當前節點的鎖獲取模式,是共享還是獨享,實現方式采用nextWaiter與SHARED比較,參照上面的第二個構造器的實現,我們可以知道在新增一個節點的時候,會對節點的nextWaiter進行賦值,而所賦的值正好是新增節點的模式標記,可以說nextWaiter持有節點的模式標記,那么拿其來與SHARED進行比較就是很顯然的事情了。

predecessor方法用於獲取前置節點,主要是在當前置節點不可為null時使用,這樣當前置節點為null,就會拋出空指針。

2.2.2 Condition

Condition並非AQS中的內部類,而是其內部類ConditionObject的父接口,為了后面的ConditionObject,我們提前了解下Condition。

Condition是一個接口,旨在定義一系列針對獲取鎖的線程的操作,實現類似於Object類中wait/notify的功能。我們通過其方法定義可以明顯感覺到這一點。

public interface Condition {
    // 使當前線程等待,知道被喚醒或者中斷,注意需要在臨界區使用,執行該方法之后該線程持有的鎖將被釋放,線程處於等待狀態
    // 四種情況下會退出等待狀態:被signal喚醒,被signalAll喚醒,被interrupt喚醒(需要當前線程可以響應中斷),發生偽喚醒
    void await() throws InterruptedException;
    // 使當前線程等待,直到被喚醒(不響應中斷),注意要在臨界區使用,執行該方法之后該線程持有的鎖將被釋放,線程處於等待狀態
    // 三種情況下會退出等待狀態:被signal喚醒,被signalAll喚醒,發生偽喚醒
    void awaitUninterruptibly();
    // 使當前線程等待,知道被喚醒或者中斷或者超時,注意需要在臨界區使用,執行該方法之后該線程持有的鎖將被釋放,線程處於等待狀態
    // 五種情況下會退出等待狀態:被signal喚醒,被signalAll喚醒,被interrupt喚醒(需要當前線程可以響應中斷),超時,發生偽喚醒
    // nanosTimeout表示當前線程要等待的時間長度
    // 該方法返回一個正數表示線程被提前喚醒,返回一個負數或0表示等待超時
    long awaitNanos(long nanosTimeout) throws InterruptedException;
    // 同上,不同在於上面的只能傳參為納秒值,該方法可以通過單位隨便傳值
    boolean await(long time, TimeUnit unit) throws InterruptedException;
    // 使當前線程等待,知道被喚醒或者中斷或者過了截止日期,注意需要在臨界區使用,執行該方法之后該線程持有的鎖將被釋放,線程處於等待狀態
    // 退出等待狀態的情況同上,只是這里傳參為一個固定的時間點,線程等待到這個時間點將自動蘇醒
    boolean awaitUntil(Date deadline) throws InterruptedException;
    // 喚醒等待隊列中的一個線程,該線程從await返回時必須獲取到鎖
    void signal();
    // 喚醒等待隊列中的所有線程,每個線程從await返回時必須獲取到鎖
    void signalAll();
}

2.2.3 ConditionObject

ConditionObject是Condition的實現類,在AQS中以普通內部類的方式存在。

ConditionObject內部維護了一個單向鏈表實現的等待隊列,隊列的節點與AQS中同步隊列的節點類型一致,均為上面的內部類Node類型。

下面我們來仔細看看這個類:

(1)體系結構
public class ConditionObject implements Condition, java.io.Serializable {}

該類實現了Condition接口和Serializable接口,擁有序列化功能

(2)字段
public class ConditionObject implements Condition, java.io.Serializable {
    // 序列化ID
    private static final long serialVersionUID = 1173984872572414699L;
    // 等待隊列頭結點指針
    private transient Node firstWaiter;
    // 等待隊列尾節點指針
    private transient Node lastWaiter;
    // 中斷模式
    private static final int REINTERRUPT =  1;// 退出等待隊列時重新中斷
    private static final int THROW_IE    = -1;// 退出等待隊列時拋出InterruptedException異常
}

我們可以看到類的五個字段中除了三個靜態字段之外,剩下的兩個被transient修飾,也就是說雖然該類支持序列化,但是序列化無值。

(3)方法

ConditionObject中的公共方法其實就是對Condition接口中定義方法的實現,下面我們逐個分析:

await()

public class ConditionObject implements Condition, java.io.Serializable {
    public final void await() throws InterruptedException {
        // 1-響應中斷,同時會清除中斷標記
        if (Thread.interrupted())
            throw new InterruptedException();
        // 2-將當前線程封裝成Node節點並添加到等待隊列尾部
        Node node = addConditionWaiter();
        // 3-釋放當前線程所占用的lock,在釋放的過程中會喚醒同步隊列中的下一個節點
        int savedState = fullyRelease(node);
        int interruptMode = 0;
        // 4-阻塞當前線程,直到被中斷或者被喚醒
        while (!isOnSyncQueue(node)) {// 校驗當前線程是否被喚醒(是否被轉移到同步隊列),如果已喚醒則退出循環
            LockSupport.park(this);// 阻塞當前線程
            if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)// 校驗當前線程是否被中斷
                break;// 如果被中斷則退出循環
        }
        // 5-自旋等待獲取到同步狀態(即獲取到lock)
        if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
            interruptMode = REINTERRUPT;
        if (node.nextWaiter != null) // clean up if cancelled
            unlinkCancelledWaiters();
        //  6-處理被中斷的情況
        if (interruptMode != 0)
            reportInterruptAfterWait(interruptMode);
    }
}

方法解析:

  • 第一步:優先響應中斷,首先校驗當前線程是否被中斷,如果被中斷則拋出InterruptedException異常,否則下一步;
  • 第二步:調用addConditionWaiter()方法,目的是將當前線程封裝成為Node節點並添加到等待隊列的尾部,源碼如下:
public class ConditionObject implements Condition, java.io.Serializable {
    private Node addConditionWaiter() {
        Node t = lastWaiter;// 保存尾節點
        // If lastWaiter is cancelled, clean out.
        // 如果尾節點線程被取消,則清除之
        if (t != null && t.waitStatus != Node.CONDITION) {
            unlinkCancelledWaiters();// 清除等待隊列中所有的被取消的線程節點
            t = lastWaiter;
        }
        // 將當前線程封裝成為等待隊列的Node節點
        Node node = new Node(Thread.currentThread(), Node.CONDITION);
        if (t == null)
            // 如果等待隊列為空,則將新節點作為頭節點
            firstWaiter = node;
        else
            // 否則將新節點作為新的尾節點添加到等待隊列中
            t.nextWaiter = node;
        // 更新尾節點指針
        lastWaiter = node;
        return node;
    }
}

這個方法里面除了封裝節點和添加節點之外,還有針對等待隊列進行清理的流程,主要是為了清理被取消的線程節點

  • 第三步:調用fullyRelease(node)方法,用於釋放當前線程所持有的鎖並喚醒同步隊列的下一節點,詳情可見AQS方法解析部分;
public abstract class AbstractQueuedSynchronizer
    extends AbstractOwnableSynchronizer
    implements java.io.Serializable {
    final int fullyRelease(Node node) {
        boolean failed = true;
        try {
            int savedState = getState();// 獲取同步狀態state值
            // 執行release方法,嘗試釋放當前線程持有的共享狀態,並喚醒下一個線程
            if (release(savedState)) {
                failed = false;
                return savedState;
            } else {
                throw new IllegalMonitorStateException();
            }
        } finally {
            if (failed)
                node.waitStatus = Node.CANCELLED;
        }
    }
}
  • 第四步:調用LockSupport.park(this)阻塞當前線程,一但消除被中斷后者線程被喚醒轉移到同步隊列,則退出循環,繼續下一步;

這里涉及到一個中斷模式的問題。中斷模式之前提到過,有兩種:REINTERRUPT和THROW_IE,分別表示針對被中斷的線程在退出等待隊列時的處理方式,前者重新中斷,后者則拋出異常。
此處interruptMode表示的就是中斷模式的值,初始賦值為0,然后通過checkInterruptWhileWaiting(node)方法不斷的進行校驗,其源碼如下:

public class ConditionObject implements Condition, java.io.Serializable {
    private int checkInterruptWhileWaiting(Node node) {
        return Thread.interrupted() ?
            (transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) :
            0;
    }
}

如果線程被中斷則通過方法transferAfterCancelledWait(node)判斷線程是否是在被喚醒之前被中斷,如果是則返回true,否則返回false;如果返回true則采用THROW_IN模式,否則采用REINTERRUPT模式。無論是上面的哪一種模式都代表線程被中斷了,那么此處interruptMode就不再是0,那么條件成立,break退出循環。除此之外transferAfterCancelledWait(node)方法無論返回true還是false,都會將現場節點轉移到同步隊列中

  • 第五步:當前線程已經被轉移到同步隊列中,然后開始自旋以獲取同步狀態,待其獲取到同步狀態(鎖)之后,返回該線程是否被中斷,如果被中斷,再根據其中斷模式進行整理,如何整理呢,主要就是如果當前中斷模式是THROW_IE模式,則保持不變,否則一律修改成REINTERRUPT模式,之后會再次進行一次同步隊列節點清理。
  • 第六步:最后針對不同的中斷模式進行中斷處理,如果是THROW_IN則拋出異常,如果是REINTERRUPT則再次進行中斷。

awaitNanos(long):

public class ConditionObject implements Condition, java.io.Serializable {
    public final long awaitNanos(long nanosTimeout)
            throws InterruptedException {
        // 1-優先響應中斷
        if (Thread.interrupted())
            throw new InterruptedException();
        // 2-將當前線程封裝成Node節點並添加到等待隊列尾部
        Node node = addConditionWaiter();
        // 3-釋放當前線程所占用的lock,在釋放的過程中會喚醒同步隊列中的下一個節點
        int savedState = fullyRelease(node);
        final long deadline = System.nanoTime() + nanosTimeout;// 計算截止時間點
        int interruptMode = 0;
        // 4-阻塞當前線程,直到被中斷或者被喚醒或者超時
        // 4-1 校驗當前線程是否被喚醒,如果沒有進入循環體
        while (!isOnSyncQueue(node)) {
            // 4-2 如果超時時間小於等於0,則表示線程立即超時,然后進行線程節點轉移處理,並結束循環
            if (nanosTimeout <= 0L) {
                transferAfterCancelledWait(node);// 轉移線程節點
                break;
            }
            // 4-3 如果超時設置時間nanosTimeout大於等於spinForTimeoutThreshold,則進行定時阻塞當前線程
            if (nanosTimeout >= spinForTimeoutThreshold)
                LockSupport.parkNanos(this, nanosTimeout);
            // 4-4 如果線程被中斷,則轉移線程到同步隊列,並結束循環
            if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                break;
            // 每次循環都會計算新的nanosTimeout值,然后在下次循環的時候設置阻塞的時限
            nanosTimeout = deadline - System.nanoTime();
        }
        // 5-自旋等待獲取到同步狀態(即獲取到lock)
        if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
            interruptMode = REINTERRUPT;
        if (node.nextWaiter != null)
            unlinkCancelledWaiters();
        //  6-處理被中斷的情況
        if (interruptMode != 0)
            reportInterruptAfterWait(interruptMode);
        return deadline - System.nanoTime();
    }
}

方法解析:

這個方法的流程與上面的await基本一致,只是在第4步中添加了關於超時判斷的邏輯,這里就着重看一下這一部分,其余部分不再贅述。

包括兩個部分的內容,第一是開始的校驗,如果設置的超時時間小於等於0,表示線程等待立即超時,然后立即轉移到同步隊列尾部,嘗試獲取鎖;第二是如果設置的超時時間大於等於spinForTimeoutThreshold的值,則將當前線程阻塞指定的時間,這個時間會隨着循環的次數不斷的減小。

另外的兩個等待方法awaitUntil(Date deadline)和await(long time, TimeUnit unit)就不再贅述了,原理完全一致,有一個不同的是awaitUninterruptibly()方法:

awaitUninterruptibly()

public class ConditionObject implements Condition, java.io.Serializable {
    public final void awaitUninterruptibly() {
        // 1-將當前線程封裝成Node節點並添加到等待隊列尾部
        Node node = addConditionWaiter();
        // 2-釋放當前線程所占用的lock,在釋放的過程中會喚醒同步隊列中的下一個節點
        int savedState = fullyRelease(node);
        boolean interrupted = false;
        // 3-阻塞當前線程,直到被喚醒
        while (!isOnSyncQueue(node)) {
            LockSupport.park(this);// 阻塞當前線程
            if (Thread.interrupted())
                interrupted = true;
        }
        // 4-自旋嘗試獲取同步鎖
        if (acquireQueued(node, savedState) || interrupted)
            selfInterrupt();
    }
}

其實就是不響應中斷的等待方法,從源碼中可以看出,雖然不響應中斷,但是仍然保存着中斷標志。

下面就來看看喚醒的方法:

signal()

public class ConditionObject implements Condition, java.io.Serializable {
    public final void signal() {
        // 1-校驗當前線程時候獨享式持有共享鎖,如果不持有則拋出異常
        if (!isHeldExclusively())
            throw new IllegalMonitorStateException();
        Node first = firstWaiter;// 保存等待隊列首節點
        // 2-如果隊列不為空,則執行頭節點喚醒操作
        if (first != null)
            doSignal(first);
    }
    private void doSignal(Node first) {
        do {
            // 3-如果等待隊列只有一個節點,則將lastWaiter更新為null
            if ( (firstWaiter = first.nextWaiter) == null)
                lastWaiter = null;
            first.nextWaiter = null;
            // 4-嘗試將線程節點從等待隊列轉移到同步隊列,如果成功則結束循環,如果失敗則再次判斷firstWaiter首節點是否為null,如果不是null,則再次循環,否則結束循環
        } while (!transferForSignal(first) &&
                 (first = firstWaiter) != null);
    }
}

方法解析:

  • 第一步:校驗當前線程時候獨享式持有共享鎖,如果不持有則拋出異常
  • 第二步:如果隊列不為空,則執行頭節點喚醒操作
  • 第三步:如果等待隊列只有一個節點(頭節點),則將lastWaiter更新為null
  • 第四步:嘗試將線程節點從等待隊列轉移到同步隊列,如果成功則結束循環,如果失敗則再次判斷firstWaiter首節點是否為null,如果不是null,則再次循環,否則結束循環

signalAll()

public class ConditionObject implements Condition, java.io.Serializable {
    public final void signalAll() {
        // 1-校驗當前線程時候獨享式持有共享鎖,如果不持有則拋出異常
        if (!isHeldExclusively())
            throw new IllegalMonitorStateException();
        Node first = firstWaiter;
        // 2-如果隊列不為空,則執行節點喚醒操作
        if (first != null)
            doSignalAll(first);
    }
    private void doSignalAll(Node first) {
        lastWaiter = firstWaiter = null;// 要喚醒所有線程節點,那么等待隊列就是被清空,那么就將這兩個指針置為null
        // 3-針對等待隊列中的節點一個一個進行喚醒操作
        do {
            Node next = first.nextWaiter;// 保存二節點
            first.nextWaiter = null;
            transferForSignal(first);// 將首節點轉移到同步隊列
            first = next;// 重置首節點,將二節點作為新的首節點
        } while (first != null);
    }
}

2.3 靜態內容解析

public abstract class AbstractQueuedSynchronizer
    extends AbstractOwnableSynchronizer
    implements java.io.Serializable {
    private static final Unsafe unsafe = Unsafe.getUnsafe();// 注入Unsafe實例
    private static final long stateOffset;// 同步狀態偏移量
    private static final long headOffset;// 等待隊列的頭結點偏移量
    private static final long tailOffset;// 等待隊列的尾節點偏移量
    private static final long waitStatusOffset;// 節點等待狀態偏移量
    private static final long nextOffset;// 節點的下級節點偏移量
    static {
        try {
            // 獲取這五個字段的內存偏移量並保存到各自的字段中
            stateOffset = unsafe.objectFieldOffset
                (AbstractQueuedSynchronizer.class.getDeclaredField("state"));
            headOffset = unsafe.objectFieldOffset
                (AbstractQueuedSynchronizer.class.getDeclaredField("head"));
            tailOffset = unsafe.objectFieldOffset
                (AbstractQueuedSynchronizer.class.getDeclaredField("tail"));
            waitStatusOffset = unsafe.objectFieldOffset
                (Node.class.getDeclaredField("waitStatus"));
            nextOffset = unsafe.objectFieldOffset
                (Node.class.getDeclaredField("next"));
        } catch (Exception ex) { throw new Error(ex); }
    }
}

從這一部分內容可以看出來AQS底層和ConcurrentHashMap一樣是使用CAS來實現原子操作的。

這一部分就是引入Unsafe來實現原子以上幾個字段的原子更新。知道即可。

2.4 字段解析

AQS中字段不多,如下所示:

public abstract class AbstractQueuedSynchronizer
    extends AbstractOwnableSynchronizer
    implements java.io.Serializable {
    private transient volatile Node head;// 等待隊列的頭結點
    private transient volatile Node tail;// 等待隊列的尾節點
    private volatile int state;// 同步狀態,初始為0,獲取鎖時會加1,釋放鎖時減1,當重入鎖時也會加1
    static final long spinForTimeoutThreshold = 1000L;// 自旋時限1000納秒
}

這里的head和tail分別指向的是同步器的同步隊列的頭結點與尾節點。這個同步隊列采用雙向鏈表實現,其節點就是之前介紹的內部類中的Node類型。

state表示同步狀態,初始為0,表示未被持有,當其被某線程持有時,就會增加1,而且這個也是實現重入的基礎,當該線程再次獲取當前鎖時,只需要state加1即可,每釋放一個鎖,state-1,直到state等於0時,該同步鎖為完全釋放。

spinForTimeoutThreshold是一個內置的快速自旋時限,當設置的超時時間小於這個值的時候,無需再執行等待設置,直接進入快速自旋即可,原因在於 spinForTimeoutThreshold 已經非常小了,非常短的時間等待無法做到十分精確,如果這時再次進行超時等待,相反會讓nanosTimeout 的超時從整體上面表現得不是那么精確,所以在超時非常短的場景中,AQS會進行無條件的快速自旋。

2.5 方法解析

AQS中的方法可以粗分為四類:獲取同步狀態方法、釋放同步狀態方法、隊列檢驗方法、隊列監控方法,我們羅列一個表格來簡單介紹下這些方法:

分類 序號 方法 說明 備注
獲取同步狀態方法 1 final void acquire(int arg) 獨享獲取同步狀態,不響應中斷
獲取同步狀態方法 2 final void acquireInterruptibly(int arg) 獨享獲取同步狀態,響應中斷
獲取同步狀態方法 3 final boolean tryAcquireNanos(int arg, long nanosTimeout) 獨享獲取同步狀態,響應中斷,響應超時
獲取同步狀態方法 4 final void acquireShared(int arg) 共享獲取同步狀態,不響應中斷
獲取同步狀態方法 5 final void acquireSharedInterruptibly(int arg) 共享獲取同步狀態,響應中斷
獲取同步狀態方法 6 final boolean tryAcquireSharedNanos(int arg, long nanosTimeout) 共享獲取同步狀態,響應中斷,響應超時
釋放同步狀態方法 7 final boolean release(int arg) 獨享釋放同步狀態
釋放同步狀態方法 8 final void acquireShared(int arg) 共享釋放同步狀態
隊列檢驗方法 9 final boolean hasQueuedThreads() 校驗同步隊列中是否有線程在等待獲取同步狀態
隊列檢驗方法 10 final boolean hasContended() 校驗是否有線程爭用過此同步器(同步隊列是否為空)
隊列檢驗方法 11 final boolean isQueued(Thread thread) 校驗給定線程是否在同步隊列之上
隊列檢驗方法 12 final boolean hasQueuedPredecessors() 校驗是否有線程等待獲取同步狀態比當前線程時間長(同步隊列中是都有前節點)
隊列檢驗方法 13 final boolean owns(ConditionObject condition) 校驗給定的condition是否是使用當前同步器作為鎖
隊列檢驗方法 14 final boolean hasWaiters(ConditionObject condition) 校驗等待隊列是否有等待線程
隊列監控方法 15 final int getWaitQueueLength(ConditionObject condition) 獲取等待隊列中線程數量
隊列監控方法 16 final Collection getWaitingThreads(ConditionObject condition) 獲取等待隊列中等待線程的集合
隊列監控方法 17 final Thread getFirstQueuedThread() 獲取同步隊列中的頭節點線程
隊列監控方法 18 final int getQueueLength() 獲取同步隊列中線程數量
隊列監控方法 19 final Collection getQueuedThreads() 獲取同步隊列中線程的集合
隊列監控方法 20 final Collection getExclusiveQueuedThreads() 獲取同步隊列中欲獨享獲取同步狀態的線程集合
隊列監控方法 21 final Collection getSharedQueuedThreads() 獲取同步隊列中欲共享獲取同步狀態的線程集合

這些方法中重點就是獲取同步狀態方法和釋放同步狀態方法,下面我們也重點就看下這些個方法的實現:

acquire(int)

該方法表示獨享式獲取同步狀態,但不響應中斷,源碼如下:

public abstract class AbstractQueuedSynchronizer
    extends AbstractOwnableSynchronizer
    implements java.io.Serializable {
    public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }
}

該方法中調用了四個方法來完成功能,依次為:

  • tryAcquire(int):一個模板方法,授權子類來實現,主要用於嘗試獨享式獲取同步狀態。
  • addWaiter(Node):將當前線程封裝成Node節點,添加到同步隊列尾部
  • acquireQueued(Node,int):自旋獲取鎖,獲取成功后返回等待過程中是否被中斷過
  • selfInterrupt():進行中斷處理

解析:首先嘗試獨享式獲取同步狀態,如果獲取到了就結束,
如果未獲取到則將線程封裝成為Node節點並添加到同步隊列尾部,然后自旋以獲取同步狀態,
一旦獲取到同步狀態,退出自旋,並返回當前線程在自旋期間是否被中斷過,如果被中斷過則再次自我中斷,
為什么需要再次自我中斷呢,這只是為了保留中斷現場,因為在自旋結束進行中斷校驗時使用的是Thread.interrupted(),
該方法會導致中斷狀態被清除。

tryAcquire方法是一個模板方法,需要在AQS的子類中實現,默認的實現只是拋出了一個異常

public abstract class AbstractQueuedSynchronizer
    extends AbstractOwnableSynchronizer
    implements java.io.Serializable {
    protected boolean tryAcquire(int arg) {
        throw new UnsupportedOperationException();
    }
}

addWaiter方法源碼:

public abstract class AbstractQueuedSynchronizer
    extends AbstractOwnableSynchronizer
    implements java.io.Serializable {
    private Node addWaiter(Node mode) {
        // 將當前線程與同步狀態獲取模式封裝成為Node節點
        Node node = new Node(Thread.currentThread(), mode);
        // Try the fast path of enq; backup to full enq on failure
        // 嘗試快速進行一次enq操作,將新節點設置為同步地列尾節點,
        // 如果成功會結束方法但如果不成功,可以由下面的enq方法來執行,
        // 這個enq方法可以通過無限循環的方法直到執行成功
        Node pred = tail;
        if (pred != null) {
            node.prev = pred;
            if (compareAndSetTail(pred, node)) {
                pred.next = node;
                return node;
            }
        }
        // 將新節點添加到同步隊列中
        enq(node);
        return node;
    }
    // 將新節點添加到同步隊列中
    private Node enq(final Node node) {
        for (;;) {
            Node t = tail;
            if (t == null) { // Must initialize
                // 這一步主要是針對同步隊列未初始化時進行的初始化操作,初始化完成后下次循環就會執行新節點的添加操作
                if (compareAndSetHead(new Node()))
                    tail = head;
            } else {
                // 將之前的尾節點設置為新節點的前節點,然后原子更新尾節點為新節點
                node.prev = t;
                if (compareAndSetTail(t, node)) {
                    t.next = node;
                    return t;
                }
            }
        }
    }
}

解析:很明顯上面的addWaiter方法中出現了添加新節點到同步隊列的邏輯,而在之后的enq方法中再次出現,
主要目的就是為了能在執行enq方法之前可以先進行一次嘗試,看能否一次執行成功,若成功,則皆大歡喜,
不必走下面的邏輯,若不成功,再走enq方法,來通過無限循環的方式強制執行成功。所以前面的邏輯可以看成是一次簡單的enq操作。

acquireQueued方法源碼:

public abstract class AbstractQueuedSynchronizer
    extends AbstractOwnableSynchronizer
    implements java.io.Serializable {
    final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;// 默認失敗
        try {
            boolean interrupted = false;// 中斷標記
            for (;;) {// 無限循環以自旋
                final Node p = node.predecessor();// 獲取前置節點
                // 如果前節點是頭節點,並且當前線程獲取同步狀態成功,則將當前節點置為頭節點
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC,這里去除以前的節點對當前節點的引用,當前節點對象不再被使用后可以被GC清理
                    failed = false;// 表示成功
                    return interrupted;
                }
                // 如果前置節點不是頭節點,或者當前節點線程未獲取到同步狀態,則將嘗試將前置節點狀態更新為SIGNAL,並阻塞當前線程
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }
}

解析:以無限循環的方法自旋,每次循環都嘗試獨享式獲取同步狀態,如果獲取到了同步狀態,
那么將當前節點置為頭節點;如果前置節點不是頭節點或者未獲取到同步狀態則嘗試將前置節點的狀態更新為SIGNAL,並阻塞當前線程(park),
這種情況下,當前線程需要被喚醒才能繼續執行,當被喚醒之后可以再次循環,嘗試獲取同步狀態,如果不成功,將會再次阻塞,等待再次被喚醒。

AbstractQueuedSynchronizer方法源碼:

public abstract class AbstractQueuedSynchronizer
    extends AbstractOwnableSynchronizer
    implements java.io.Serializable {
    private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
        int ws = pred.waitStatus;// 獲取前置節點的狀態
        if (ws == Node.SIGNAL)
            // 表示后置線程節點(當前節點需要被喚醒)
            return true;
        if (ws > 0) {
            // 表示前置節點線程被取消,那么清理被取消的線程節點
            do {
                node.prev = pred = pred.prev;
            } while (pred.waitStatus > 0);
            pred.next = node;
        } else {
            /*
             * waitStatus must be 0 or PROPAGATE.  Indicate that we
             * need a signal, but don't park yet.  Caller will need to
             * retry to make sure it cannot acquire before parking.
             */
            // 嘗試將前置節點的狀態置為SIGNAL,只有置為SIGNAL之后才能返回true.
            compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
        }
        return false;
    }
}

解析:這個方法主要目的就是為了將前置節點狀態置為SIGNAL,這個狀態意思是它后面的那個節點被阻塞了,
需要被喚醒,可見這個狀態就是一個標記,標記着后面節點需要被喚醒。

parkAndCheckInterrupt方法源碼:

public abstract class AbstractQueuedSynchronizer
    extends AbstractOwnableSynchronizer
    implements java.io.Serializable {
    private final boolean parkAndCheckInterrupt() {
        LockSupport.park(this);// 阻塞當前線程
        return Thread.interrupted();
    }
}

解析:一旦線程執行到這一步,那么當前線程就會阻塞,后面的return暫時就不會執行。只有在被喚醒之后才能接着返回中斷校驗的結果。

總結:acquire方法首先嘗試獨享式獲取同步狀態(tryAcquire),獲取失敗的情況下需要將當前線程封裝成為一個Node節點,
然后首先嘗試將其設置為同步隊列的為節點,如果失敗,則自旋直到成功為止,然后進行自旋判斷當前節點是否第二節點,如果是,
則嘗試獲取同步狀態,如果成功,將當前節點置為頭節點;否則如果當前節點不是第二節點,或者獲取同步狀態失敗,
則將前置節點狀態置為SIGNAL,然后阻塞(park)當前線程,等待被喚醒,喚醒之后會重復自旋,判斷節點是否第二節點和嘗試獲取同步狀態,
如果還不成功,那么就再次阻塞...

acquireInterruptibly(int)

該方法表示獨享式獲取同步狀態,響應中斷,源碼如下:

public abstract class AbstractQueuedSynchronizer
    extends AbstractOwnableSynchronizer
    implements java.io.Serializable {
    public final void acquireInterruptibly(int arg)
            throws InterruptedException {
        // 中斷校驗,會清除中斷狀態
        if (Thread.interrupted())
            throw new InterruptedException();
        // 嘗試獨享式獲取同步狀態,如果失敗則嘗試中斷的獲取。
        if (!tryAcquire(arg))
            doAcquireInterruptibly(arg);
    }
    // 中斷的獲取同步狀態
    private void doAcquireInterruptibly(int arg)
        throws InterruptedException {
        // 首先將當前線程封裝成為Node節點,並保存到同步隊列尾部
        final Node node = addWaiter(Node.EXCLUSIVE);
        boolean failed = true;
        try {
            // 自旋,邏輯桶acquire
            for (;;) {
                final Node p = node.predecessor();
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return;
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    throw new InterruptedException();
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }
}

解析:一開始就進行中斷校驗,如果未被中斷則嘗試獨享式獲取同步狀態,獲取失敗后則封裝線程為Node節點並保存到同步隊列,然后自旋,邏輯與acquire種的acquireQueued方法邏輯一致,不再贅述。

tryAcquireNanos(int, long)

該方法表示獨享式獲取同步狀態,響應中斷,響應超時,源碼如下:

public abstract class AbstractQueuedSynchronizer
    extends AbstractOwnableSynchronizer
    implements java.io.Serializable {
    public final boolean tryAcquireNanos(int arg, long nanosTimeout)
            throws InterruptedException {
        // 首先響應中斷,進行中斷校驗,若被中斷,拋出異常
        if (Thread.interrupted())
            throw new InterruptedException();
        return tryAcquire(arg) ||
            doAcquireNanos(arg, nanosTimeout);// 超時獲取
    }
    // 超時獲取
    private boolean doAcquireNanos(int arg, long nanosTimeout)
            throws InterruptedException {
        // 如果超時時間小於等於0,則直接超時,返回false
        if (nanosTimeout <= 0L)
            return false;
        final long deadline = System.nanoTime() + nanosTimeout;// 計算截止時間點
        final Node node = addWaiter(Node.EXCLUSIVE);// 封裝線程節點,並添加到同步隊列
        boolean failed = true;
        try {
            for (;;) {// 自旋
                final Node p = node.predecessor();
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return true;
                }
                nanosTimeout = deadline - System.nanoTime();// 計算剩余超時時間
                // 如果剩余超時時間小於等於0,這說明超時,返回false
                if (nanosTimeout <= 0L)
                    return false;
                if (shouldParkAfterFailedAcquire(p, node) &&// 將前置節點狀態置為SIGNAL
                    nanosTimeout > spinForTimeoutThreshold)// 剩余超時時間大於快速自旋時限(1000納秒)
                    LockSupport.parkNanos(this, nanosTimeout);// 限時阻塞當前線程,超時時間為剩余超時時間
                // 再次響應中斷,進行中斷校驗,若被中斷直接拋出異常
                if (Thread.interrupted())
                    throw new InterruptedException();
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }
}

spinForTimeoutThreshold:這是系統內置的一個常量,設置為1000納秒,這是一個很短的時間,如果要阻塞的剩余時間小於這個值,就沒有必要再執行阻塞,直接進入快速自旋過程。

解析:整體邏輯基本與前面的兩種類似,不同之處在於增加了針對超時時間處理的邏輯。

與acquireInterruptibly類似,一開始就進行中斷校驗,若被中斷則拋出異常,否則嘗試獨享式獲取同步狀態,
獲取成功,則返回true,如果獲取失敗,則將線程封裝成Node節點保存到同步隊列,然后計算截止時間點(當前時間+超時時間),
然后開始自旋,自旋的邏輯中前半部分與之前相同,只有在前置節點不是頭節點或者獲取同步狀態失敗的情況下邏輯發生了改變,
先計算剩余超時時間nanosTimeout(截止時間點-當前時間),然后將前置節點的狀態置為SIGNAL,判斷剩余超時時間是否大於
spinForTimeoutThreshold,如果大於則限時阻塞當前線程,否則快速自旋即可。

acquireShared(int)

該方法表示共享式獲取同步狀態,不響應中斷,源碼如下:

public abstract class AbstractQueuedSynchronizer
    extends AbstractOwnableSynchronizer
    implements java.io.Serializable {
    public final void acquireShared(int arg) {
        if (tryAcquireShared(arg) < 0)
            doAcquireShared(arg);
    }
}

解析:首先嘗試共享式獲取同步狀態,如果獲取失敗(返回負值),則執行doAcquireShared方法。

tryAcquireShared方法源碼:

public abstract class AbstractQueuedSynchronizer
    extends AbstractOwnableSynchronizer
    implements java.io.Serializable {
    protected int tryAcquireShared(int arg) {
        throw new UnsupportedOperationException();
    }
}

該方法是一個模板方法,需要子類來完善邏輯。但大致意義如下,如果獲取失敗返回負數(-1),如果是該同步狀態被首次共享獲取成功,返回0,非首次獲取成功,則返回正數(1)

doAcquireShared方法源碼:

public abstract class AbstractQueuedSynchronizer
    extends AbstractOwnableSynchronizer
    implements java.io.Serializable {
    private void doAcquireShared(int arg) {
        // 將線程封裝成功節點,保存到同步隊列
        final Node node = addWaiter(Node.SHARED);
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {// 自旋
                final Node p = node.predecessor();// 獲取前置節點
                if (p == head) {
                    // 如果前置節點為頭節點
                    int r = tryAcquireShared(arg);
                    if (r >= 0) {
                        // 如果成功獲取到同步狀態,則將當前節點置為頭節點,並進行傳播喚醒
                        setHeadAndPropagate(node, r);
                        p.next = null; // help GC
                        if (interrupted)
                            selfInterrupt();
                        failed = false;
                        return;
                    }
                }
                // 如果前置節點非頭節點或者獲取同步狀態失敗,則將前置節點設置為SIGNAL,然后阻塞當前線程
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }
    private void setHeadAndPropagate(Node node, int propagate) {
        Node h = head; // 預存原始頭節點
        setHead(node);// 將當前節點置為頭節點
        // propagate可為0或1,0表示同步狀態被首次獲取,1表示被多次獲取
        // h為原始頭節點
        // head為新頭節點
        if (propagate > 0 || h == null || h.waitStatus < 0 ||
            (h = head) == null || h.waitStatus < 0) {
            Node s = node.next;// 獲取下級節點s
            // 如果后繼節點不存在或者后繼節點是共享式的,則喚醒后繼節點
            if (s == null || s.isShared())
                doReleaseShared();// 喚醒后繼節點
        }
    }
}

解析:該方法的邏輯相對於acquireQueued只是稍有變動,大致意思是相同的。不同之處在於此處涉及到一個傳播(Propagate)。
所謂的傳播,其實是在當前節點共享式獲取到同步狀態之后,檢查其后置節點是否也是在等待共享式獲取同步狀態,若是,則將喚醒其后置節點。

doReleaseShared源碼:

public abstract class AbstractQueuedSynchronizer
    extends AbstractOwnableSynchronizer
    implements java.io.Serializable {
    private void doReleaseShared() {
        for (;;) {// 自旋
            Node h = head;// 獲取頭節點
            if (h != null && h != tail) {// 如果隊列中存在多個節點的話
                int ws = h.waitStatus;// 頭節點狀態ws
                // 如果頭節點狀態為SIGNAL,則將其
                if (ws == Node.SIGNAL) {// 說明其后繼節點線程被阻塞,需要喚醒
                    if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))// 首先將頭節點狀態重置為0
                        continue;// 如果重置頭節點狀態操作失敗則重試
                    unparkSuccessor(h);// 然后進行后繼節點喚醒
                }
                // 如果頭節點狀態為0,則將其狀態更新為PROPAGATE
                else if (ws == 0 &&
                         !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                    continue;// 頭節點更新操作失敗則重試
            }
            if (h == head)
                break;// 頭節點發生變化則退出自旋
        }
    }
    private void unparkSuccessor(Node node) {
        int ws = node.waitStatus;
        if (ws < 0)
            compareAndSetWaitStatus(node, ws, 0);
        Node s = node.next;// 獲取后繼節點s
        if (s == null || s.waitStatus > 0) {
            // 如果s為null或者其狀態為取消,則從后遍歷隊列節點,找到node節點之后的首個未被取消的節點t,賦給s
            s = null;
            for (Node t = tail; t != null && t != node; t = t.prev)
                if (t.waitStatus <= 0)
                    s = t;
        }
        if (s != null)
            LockSupport.unpark(s.thread);// 執行s節點線程的喚醒操作
    }
}

解析:doReleaseShared方法被兩處調用,一為此處,另一為releaseShared方法,這個是用來共享式釋放同步狀態的方法。
doReleaseShared方法的作用就是為了喚醒后繼節點,主要邏輯如下:首先獲取頭節點的狀態ws,如果ws是SIGNAL,
表示后繼節點需要被喚醒,然后自旋將頭節點狀態更新為0,並執行后繼節點喚醒操作,這里要確保喚醒的是頭節點之后首個
未被取消的線程節點,喚醒之后,后繼節點的線程開始繼續執行,當前線程也繼續執行;如果ws是0,則將頭節點的狀態更新為PROPAGATE,
來確保同步狀態可以順利傳播(因為如果ws為SIGNAL,會自動喚醒下一個節點,而0則不會,所有將其更新為PROPAGATE,表示共享式獲取的傳播)
被喚醒的線程會重置頭節點,一旦重置,當前線程在最后校驗頭節點那一步就會成功,然后執行break退出自旋。

一般來說這里喚醒的主要目的是為了喚醒一個共享式獲取同步狀態的線程節點,它會直接獲取到同步狀態;但也存在特殊情況,比如
這個節點線程被取消了,導致喚醒了一個獨享式獲取的線程節點,那么在這個線程被喚醒后嘗試獨享式獲取同步狀態的時候會獲取不到
(因為同步狀態被共享式獲取的線程持久,而且可能是多個)從而再次進入阻塞。

其實喚醒的主要來源還是靠同步狀態釋放操作來發起的。

acquireSharedInterruptibly(int)

該方法表示共享式獲取同步狀態,響應中斷,源碼如下:

public abstract class AbstractQueuedSynchronizer
    extends AbstractOwnableSynchronizer
    implements java.io.Serializable {
    public final void acquireSharedInterruptibly(int arg)
            throws InterruptedException {
        // 首先響應中斷
        if (Thread.interrupted())
            throw new InterruptedException();
        // 嘗試共享式獲取同步狀態,失敗則執行doAcquireSharedInterruptibly方法
        if (tryAcquireShared(arg) < 0)
            doAcquireSharedInterruptibly(arg);
    }
    // 可中斷的共享式獲取同步狀態
    private void doAcquireSharedInterruptibly(int arg)
        throws InterruptedException {
        // 首先封裝線程節點,保存到同步隊列尾部
        final Node node = addWaiter(Node.SHARED);
        boolean failed = true;
        try {
            for (;;) {// 自旋
                final Node p = node.predecessor();// 獲取前置節點
                if (p == head) {
                    int r = tryAcquireShared(arg);
                    if (r >= 0) {
                        setHeadAndPropagate(node, r);
                        p.next = null; // help GC
                        failed = false;
                        return;
                    }
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    // 如果發生了中斷則拋出異常
                    throw new InterruptedException();
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }
}

解析:這個方法與acquireShared幾乎一致,僅僅是在處理中斷的問題上有點區別,所以不再贅述。

tryAcquireSharedNanos(int, long)

該方法表示共享式獲取同步狀態,響應中斷,響應超時,源碼如下:

public abstract class AbstractQueuedSynchronizer
    extends AbstractOwnableSynchronizer
    implements java.io.Serializable {
    public final boolean tryAcquireSharedNanos(int arg, long nanosTimeout)
            throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException();
        return tryAcquireShared(arg) >= 0 ||
            doAcquireSharedNanos(arg, nanosTimeout);
    }
    private boolean doAcquireSharedNanos(int arg, long nanosTimeout)
            throws InterruptedException {
        // 如果超時時間小於等於0,則直接超時,返回false
        if (nanosTimeout <= 0L)
            return false;
        // 計算超時截止時間點(當前時間+超時時間)
        final long deadline = System.nanoTime() + nanosTimeout;
        // 封裝節點並保存隊列
        final Node node = addWaiter(Node.SHARED);
        boolean failed = true;
        try {
            for (;;) {// 自旋
                final Node p = node.predecessor();
                if (p == head) {
                    int r = tryAcquireShared(arg);
                    if (r >= 0) {
                        setHeadAndPropagate(node, r);
                        p.next = null; // help GC
                        failed = false;
                        return true;
                    }
                }
                // 計算剩余的超時時間
                nanosTimeout = deadline - System.nanoTime();
                // 如果剩余超時時間小於等於0,直接超時,返回false
                if (nanosTimeout <= 0L)
                    return false;
                // 將前置節點置為SIGNAL,然后校驗剩余超時時間,如果不足spinForTimeoutThreshold,則進入快速自旋,否則執行阻塞
                if (shouldParkAfterFailedAcquire(p, node) &&
                    nanosTimeout > spinForTimeoutThreshold)
                    LockSupport.parkNanos(this, nanosTimeout);
                // 再次響應中斷
                if (Thread.interrupted())
                    throw new InterruptedException();
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }
    }

解析:基本雷同,可以參考共享式獲取同步狀態的方法和獨享式響應中斷超時的獲取方法。

release(int)

該方法表示獨享式釋放同步狀態,源碼如下:

public abstract class AbstractQueuedSynchronizer
    extends AbstractOwnableSynchronizer
    implements java.io.Serializable {
    public final boolean release(int arg) {
        // 首先嘗試獨享式釋放同步狀態
        if (tryRelease(arg)) {
            Node h = head;// 頭節點
            // 頭節點存在且狀態不為0,則喚醒其后繼節點
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h);
            return true;
        }
        // 釋放失敗返回false
        return false;
    }
}

解析:首先調用tryRelease來嘗試獨享式釋放同步狀態,如果成功,則根據頭節點的狀態來決定是否喚醒后繼節點,
頭節點為0則不喚醒。喚醒操作通過調用unparkSuccessor方法來實現,具體邏輯之前已有描述,這里總結一下:
其實就是喚醒頭節點之后的首個未被取消的節點線程,這個線程可能是獨享式的也可能是共享式的。

tryRelease源碼:

public abstract class AbstractQueuedSynchronizer
    extends AbstractOwnableSynchronizer
    implements java.io.Serializable {
    protected boolean tryRelease(int arg) {
        throw new UnsupportedOperationException();
    }
}

解析:tryRelease方法是一個模板方法,同樣需要子類來實現。

releaseShared(int)

該方法表示共享式釋放同步狀態,源碼如下:

public abstract class AbstractQueuedSynchronizer
    extends AbstractOwnableSynchronizer
    implements java.io.Serializable {
    public final boolean releaseShared(int arg) {
        // 嘗試共享式釋放同步狀態,成功后喚醒后繼節點
        if (tryReleaseShared(arg)) {
            doReleaseShared();
            return true;
        }
        return false;
    }
}

解析:很簡單,其中的doReleaseShared方法我們也了解了。

tryReleaseShared源碼:

public abstract class AbstractQueuedSynchronizer
    extends AbstractOwnableSynchronizer
    implements java.io.Serializable {
    protected boolean tryReleaseShared(int arg) {
        throw new UnsupportedOperationException();
    }
}

解析:和前面的那幾個模板方法一樣,需要子類來實現。

剩下的方法都是一些校驗和監控的方法,並不涉及重點邏輯,不再贅述,下面做一個總結

三、總結

總結:

  1. AQS同步器內部維護了一個底層為雙向鏈表的同步隊列,用於保存那些獲取同步狀態失敗的線程。每個AQS同步器還可以關聯多個Condition,其中每個Condition內部維護了一個底層為單向鏈表的等待隊列,用於保存那些基於特定條件而陷入等待的線程。
  2. 內部類Node描述的是同步隊列和等待隊列中節點的類型。節點有兩點需要注意,那就是節點的模式與狀態
    • 節點模式:
      • EXCLUSIVE:獨享式
      • SHARED:共享式
    • 節點狀態:
      • 0:初始狀態,該狀態下不會喚醒后繼節點
      • CANCELLED(1):取消狀態,節點線程被中斷或超時
      • SIGNAL(-1):喚醒狀態,表示該節點的后繼節點被阻塞,需要喚醒
      • CONDITION(-2):表示當前節點位於等待隊列中,在等待條件滿足
      • PROPAGATE(-3):表示共享式獲取同步狀態的傳播
  3. 內部類ConditionObject是Condition的實現類,作為附着在同步器上的一個功能,可用可不用;它提供了一些方法來執行等待和喚醒操作:
    • 等待操作:
      • await():響應中斷
      • awaitNanos(long):響應中斷,響應超時
      • awaitUninterruptibly():不響應中斷,不響應超時
    • 喚醒操作:
      • signal()
      • signalAll()
  4. AQS同步器提供了多個方法從來輔助實現同步狀態的獲取與釋放:
    • 獨享式獲取:
      • acquire(int):不響應中斷,不響應超時
      • acquireInterruptibly(int):響應中斷
      • tryAcquireNanos(int, long):響應中斷,響應超時
    • 獨享式釋放:
      • release(int)
    • 共享式獲取:
      • acquireShared(int):不響應中斷,不響應超時
      • acquireSharedInterruptibly(int):響應中斷
      • tryAcquireSharedNanos(int, long):響應中斷,響應超時
    • 共享式釋放:
      • releaseShared(int)

參考:


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM