全網最詳細的AbstractQueuedSynchronizer(AQS)源碼剖析(三)條件變量


前兩期我們已經掌握了AQS的基本結構、以及AQS是如何釋放和獲取資源的。其實到這里,我們已經掌握了AQS作為同步器的全部功能
不過,有些情況使用同步功能不夠靈活,所以AQS又引入了操作系統中的另一個高度相關的概念——條件變量。由於條件變量的使用緊密依賴於AQS提供的釋放、獲取資源功能和同步隊列,因此都放在了AQS源碼中
能堅持看到這里的同學已經很不容易了,再接再厲,一起沖掉最后一座堡壘吧🦾🦾🦾

簡介

條件變量是什么

條件對象這一概念源自於操作系統,設計它是為了解決等待同步需求,實現線程間協作通信的一種機制。Java其實也已經內置了條件變量,它和監視器鎖是綁定在一起的,即Objectwaitnotify方法,使用這兩個方法就可以實現線程之間的協作

Java中的條件變量直到Java 5才出現,用它來代替傳統的Objectwaitnotify方法。相比waitnotifyConditionawaitsignal方法更加安全和高效,因為Condition是基於AQS實現的,加鎖、釋放鎖的效率更高

條件變量顧名思義就是表示某種條件的變量。不過需要說明的是,條件變量中的條件並沒有實際含義,僅僅只是一個標記,條件的含義需要代碼來賦予

Condition接口

Condition是一個接口,條件變量都實現了Condition接口。該接口的基本方法是awaitsignalsignalAll這些

不過Condition依賴於Lock接口,需要借助Lock.newCondition方法來創建條件變量。因此Condition必然是和某個Lock綁定在一起的,這就和awaitsignal一定會和Object監視器鎖綁定在一起。因此,Java中的條件變量必須配合鎖使用

Condition和Java內置的條件變量方法之間的對應關系如下:

  • Condition.await 等價於 Object.wait
  • Condition.signal 等價於 Object.notify

不多BB了,直接看源碼吧~

AQS之條件對象

AQS為條件變量的實現提供了95%以上的功能,Lock接口實現類一般只需要實現一下newCondition方法,以及AQS的isHeldExclusively方法,就可以直接使用條件變量了,你說方不方便!

AQS實現的方式就是直接提供了Condition的一個實現類——ConditionObject,我接下來就將其稱為條件對象(可能不太嚴謹)。ConditionObject是AQS的內部類,可見性為public

條件對象的結構

每個ConditionObject都維護了一個條件隊列,首尾節點分別由firstWaiterlastWaiter兩個域來管理:

/** First node of condition queue. */
private transient Node firstWaiter;
/** Last node of condition queue. */
private transient Node lastWaiter;

每個在條件隊列中等待的線程都由Node類來維護,它們之間通過Node類的nextWaiter相連,形成一個單向鏈表。每個NodewaitStatus都是Node.CONDITION,表明該線程正在某個條件變量的條件隊列中等待ing~

條件對象的創建

十分的朴實無華且簡單通透,要是所有代碼都這么簡單,那該有多好啊~

public ConditionObject() { }

正如前面所說,如果基於AQS實現的Lock接口實現類想要使用AQS提供的條件變量,只需要實現newCondition方法、isHeldExclusively方法即可。而實現newCondition方法一般只需要直接調用ConditionObject的構造方法即可,多么簡單!

isHeldExclusively方法會在條件喚醒(signalsignalAll)中被調用,因為只有在持有互斥資源的情況下,才允許使用條件變量,在持有共享資源的情況下是不允許使用的!

接下來主要剖析一下ConditionObject是如何實現Condition接口的兩大重要功能——條件等待條件喚醒

作者: 酒冽        出處: https://www.cnblogs.com/frankiedyz/p/15676704.html
版權:本文版權歸作者和博客園共有
轉載:歡迎轉載,但未經作者同意,必須保留此段聲明;必須在文章中給出原文連接;否則 必究法律責任

條件等待

await

await方法就是最基本的條件等待,它是可以被中斷的。源碼如下:

public final void await() throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    Node node = addConditionWaiter();
    int savedState = fullyRelease(node);
    int interruptMode = 0;
    while (!isOnSyncQueue(node)) {
        LockSupport.park(this);
        if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
            break;
    }
    // 能夠離開while循環,說明被signal或被中斷了,而且在同步隊列中,所以可以調用acquireQueued
    // 而且之前釋放了多少資源,現在就要還原回來,因此獲取的參數是之前保存的state
    if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
        interruptMode = REINTERRUPT;
    if (node.nextWaiter != null) // clean up if cancelled
        unlinkCancelledWaiters();
    if (interruptMode != 0)
        reportInterruptAfterWait(interruptMode);
}

addConditionWaiter方法會添加一個新節點到等待隊列的隊尾,該節點的waitStatusCONDITION。源碼如下:

private Node addConditionWaiter() {
    Node t = lastWaiter;
    // 如果lastWaiter被取消(CANCELLED),就移除它以及前面所有被取消的節點
    if (t != null && t.waitStatus != Node.CONDITION) {
        unlinkCancelledWaiters();
        t = lastWaiter;
    }
    Node node = new Node(Thread.currentThread(), Node.CONDITION);		// 為當前線程創建一個Node,waitStatus為CONDITION
    if (t == null)
        firstWaiter = node;
    else
        t.nextWaiter = node;
    lastWaiter = node;
    return node;
}

unlinkCancelledWaiters會將所有被取消的Node從條件隊列中移除,該方法只會在持有鎖的情況下才會被調用,源碼如下:

private void unlinkCancelledWaiters() {
    Node t = firstWaiter;
    Node trail = null;
    while (t != null) {
        Node next = t.nextWaiter;
        if (t.waitStatus != Node.CONDITION) {
            t.nextWaiter = null;
            if (trail == null)
                firstWaiter = next;
            else
                trail.nextWaiter = next;
            if (next == nul)
                lastWaiter = trail;
        }
        else
            trail = t;
        t = next;
    }
}

unlinkCancelledWaiters相當於是進行了一次正向遍歷,將那些waitStatus不為CONDITION(即為CANCELLED)的Node給移除,挺簡單的~

回到await方法,接下來調用fullyRelease方法保存當前的state,並調用一次release,最后返回保存的state。源碼如下:

final int fullyRelease(Node node) {
    boolean failed = true;
    try {
        int savedState = getState();
        if (release(savedState)) {
            failed = false;
            return savedState;
        } else {
            throw new IllegalMonitorStateException();
        }
    } finally {
        if (failed)
            node.waitStatus = Node.CANCELLED;	// 如果失敗就會將節點取消
    }
}

調用release就是為了完全釋放當前線程所持有的資源,防止死鎖。如果釋放失敗,說明當前線程壓根沒有獲取到資源就調用了await方法,這是不允許的,會拋出IllegalMonitorStateException異常。這也解決了我之前的疑惑——如何保證調用unlinkCancelledWaiters之前必須持有鎖呢? 這里就給出了答案!只有在當前線程持有資源的前提下,代碼才能正常執行下去

回到await方法,接下來調用isOnSyncQueue方法來檢查當前線程對應的Node同步隊列上還是條件隊列上,源碼如下:

final boolean isOnSyncQueue(Node node) {
    if (node.waitStatus == Node.CONDITION || node.prev == null)
        return false;
    if (node.next != null) // If has successor, it must be on queue
        return true;
    return findNodeFromTail(node);
}

該方法會先看目標nodewaitStatus是否為CONDITION,或者它的prev域是否為空,如果是,那么它一定在條件隊列上,返回false;如果目標nodenext域不為空,那么它一定在同步隊列上,返回true。如果以上兩種快捷判斷方法無法判定,那么需要調用findNodeFromTail進行暴力判定。總之,如果目標node在條件隊列上等待,則返回true,否則說明在同步隊列上,返回false

await中的while(!isOnSyncQueue)語義:如果當前節點所等待的條件沒有滿足,那么說明它還在條件隊列上等待,就會一直被困在while循環中不停地被阻塞,不能離開while循環去競爭獲取資源

findNodeFromTail方法會從tail開始向前遍歷,以確定目標node是否位於同步隊列上,很簡單哦~其源碼如下:

// 此方法只會被isOnSyncQueue方法調用
private boolean findNodeFromTail(Node node) {
    Node t = tail;
    for (;;) {
        if (t == node)
            return true;
        if (t == null)
            return false;
        t = t.prev;
    }
}

回到await方法,只有當node在條件隊列上等待,才會執行while循環。在循環中,將當前線程(即node對應的線程)阻塞。如果被喚醒,就調用checkInterruptWhileWaiting方法檢查是否在阻塞過程中被中斷,如果發生了中斷,該方法會返回非0值。while循環中如果發現被中斷,則退出循環

checkInterruptWhileWaiting方法即其調用到的transferAfterCancelledWait方法,它們的源碼如下:

/*
 * 不同返回值的含義:
 * 		0:沒有發生中斷
 * 		THROW_IE(-1):中斷發生在被signal之前
 * 		REINTERRUPT(1):中斷發生在被signal之后
 */
private int checkInterruptWhileWaiting(Node node) {
    return Thread.interrupted() ?
        (transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) :
    0;
}

/*
 * transferAfterCancelledWait有兩個作用:
 * 1、將目標node放到同步隊列上
 * 2、檢測node是否已經被signal,true表示還未被signal
final boolean transferAfterCancelledWait(Node node) {
    // 如果CAS成功,說明該node還未接收到signal信號
    if (compareAndSetWaitStatus(node, Node.CONDITION, 0)) {
        enq(node);
        return true;
    }
    // 如果上面if中的CAS失敗,說明node已經被signal
    // 那么就等待signal中的邏輯將node移動到同步隊列上去,這里什么也不做,自旋等待即可
    while (!isOnSyncQueue(node))
        Thread.yield();
    return false;
}

總之,如果線程在阻塞過程中沒有發生中斷,則checkInterruptWhileWaiting返回0,否則返回非0值
具體來說,如果發生中斷,就會調用transferAfterCancelledWait方法將node轉移到同步隊列,並檢測中斷是發生在被signal之前還是之后。如果發生在被signal之前則返回THROW_IE(-1),如果發生在被signal之后則返回REINTERRUPT(1)

回到await方法的while循環中,如果沒有被signal或被中斷就會一直阻塞。如果被signal就不滿足while條件,會退出循環;如果被中斷而不是被signal,會直接break離開while循環
退出while循環說明此時已經位於同步隊列(要么因為被signal而transfer到同步隊列,要么因為被中斷而調用transferAfterCancelledWait方法移動到同步隊列),可以直接調用acquireQueued方法(調用該方法的前提:位於同步隊列中)排隊獲取鎖。獲取雖然可能導致再次被阻塞,但這里是在同步隊列上的阻塞,而不是在條件隊列上的阻塞

退出處理是通過reportInterruptAfterWait方法,具體如何處理,取決於interruptMode的值。源碼如下:

private void reportInterruptAfterWait(int interruptMode) throws InterruptedException {
    if (interruptMode == THROW_IE)
        throw new InterruptedException();
    else if (interruptMode == REINTERRUPT)
        selfInterrupt();
}

如果interruptModeTHROW_IE,那么說明該線程並未真正收到signal信號,只是因為被中斷而被喚醒,所等待的條件並不一定被滿足,於是拋出中斷異常;如果是REINTERRUPT,調用selfInterrupt方法設置線程的中斷狀態

總的來說,await執行分為6步:
1、如果當前線程中斷,那么拋出中斷異常
2、為當前線程創建Node,並加入該條件變量的條件隊列隊尾
3、獲取並保存下當前state,並使用保存的state來調用release方法,如果失敗則拋出IllegalMonitorStateException異常
4、阻塞直到被signal或被中斷
5、利用保存的state,調用acquireQueued方法重新獲取狀態
6、如果第四步期間被中斷,則拋出中斷異常

超時await

await方法有一個重載版本,可以設置超時參數,如果超時也會導致等待停止。其源碼如下:

public final boolean await(long time, TimeUnit unit)
    throws InterruptedException {
    long nanosTimeout = unit.toNanos(time);
    if (Thread.interrupted())
        throw new InterruptedException();
    Node node = addConditionWaiter();
    int savedState = fullyRelease(node);
    final long deadline = System.nanoTime() + nanosTimeout;
    boolean timedout = false;
    int interruptMode = 0;
    while (!isOnSyncQueue(node)) {
        if (nanosTimeout <= 0L) {
            timedout = transferAfterCancelledWait(node);
            break;
        }
        if (nanosTimeout >= spinForTimeoutThreshold)
            LockSupport.parkNanos(this, nanosTimeout);
        if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
            break;
        nanosTimeout = deadline - System.nanoTime();
    }
    if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
        interruptMode = REINTERRUPT;
    if (node.nextWaiter != null)
        unlinkCancelledWaiters();
    if (interruptMode != 0)
        reportInterruptAfterWait(interruptMode);
    return !timedout;
}

帶**超時參數**的`await`和無參數`await`基本差不多,執行一共分6步,不過最后一步會**返回是否超時**: 1、如果當前線程中斷,那么拋出中斷異常 2、為當前線程創建`Node`,並加入該條件變量的條件隊列 隊尾 3、獲取並保存下當前`state`,並使用保存的`state`來調用`release`方法,如果失敗則拋出`IllegalMonitorStateException`異常 4、阻塞直到被`signal`或被中斷,或**超時** 5、利用保存的`state`,調用`acquireQueued`方法重新獲取狀態 6、如果第四步期間被中斷,則拋出中斷異常;如果第四步是**因為超時而停止阻塞**,則返回false,否則返回true

awaitUninterruptibly

調用awaitUninterruptibly方法在等待過程中,不會因為中斷而停止等待。源碼如下:

public final void awaitUninterruptibly() {
    Node node = addConditionWaiter();
    int savedState = fullyRelease(node);
    boolean interrupted = false;
    while (!isOnSyncQueue(node)) {
        LockSupport.park(this);
        if (Thread.interrupted())
            interrupted = true;
    }
    if (acquireQueued(node, savedState) || interrupted)
        selfInterrupt();
}

`awaitUninterruptibly`執行分4步: 1、為當前線程創建`Node`,並加入該條件變量的條件隊列 隊尾 2、獲取並保存下當前`state`,並使用保存的`state`來調用`release`方法,如果失敗則拋出`IllegalMonitorStateException`異常 3、阻塞直到被`signal` 4、利用保存的`state`,調用`acquireQueued`方法重新獲取狀態

awaitNanos

awaitNanos最多等待一定時間,可以被中斷。其源碼如下:

public final long awaitNanos(long nanosTimeout)
    throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    Node node = addConditionWaiter();
    int savedState = fullyRelease(node);
    final long deadline = System.nanoTime() + nanosTimeout;
    int interruptMode = 0;
    while (!isOnSyncQueue(node)) {
        // 如果超時,則將node從條件隊列轉移到同步隊列上去,並退出while循環
        if (nanosTimeout <= 0L) {
            transferAfterCancelledWait(node);
            break;
        }
        if (nanosTimeout >= spinForTimeoutThreshold)
            LockSupport.parkNanos(this, nanosTimeout);
        if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
            break;
        nanosTimeout = deadline - System.nanoTime();
    }
    if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
        interruptMode = REINTERRUPT;
    if (node.nextWaiter != null)
        unlinkCancelledWaiters();
    if (interruptMode != 0)
        reportInterruptAfterWait(interruptMode);
    return deadline - System.nanoTime();
}

`awaitNanos`執行分6步: 1、如果當前線程中斷,那么拋出中斷異常 2、為當前線程創建`Node`,並加入該條件變量的條件隊列 隊尾 3、獲取並保存下當前`state`,並使用保存的`state`來調用`release`方法,如果失敗則拋出`IllegalMonitorStateException`異常 4、阻塞直到被`signal`或被中斷或**超時** 5、利用保存的`state`,調用`acquireQueued`方法重新獲取狀態 6、如果第四步期間被中斷,則拋出中斷異常;最后**返回距離超時還剩多少納秒**

awaitUntil

類似於awaitNanos,會返回是否是因為超時而終止等待。源碼如下:

public final boolean awaitUntil(Date deadline)
    throws InterruptedException {
    long abstime = deadline.getTime();
    if (Thread.interrupted())
        throw new InterruptedException();
    Node node = addConditionWaiter();
    int savedState = fullyRelease(node);
    boolean timedout = false;
    int interruptMode = 0;
    while (!isOnSyncQueue(node)) {
        if (System.currentTimeMillis() > abstime) {
            timedout = transferAfterCancelledWait(node);
            break;
        }
        LockSupport.parkUntil(this, abstime);
        if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
            break;
    }
    if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
        interruptMode = REINTERRUPT;
    if (node.nextWaiter != null)
        unlinkCancelledWaiters();
    if (interruptMode != 0)
        reportInterruptAfterWait(interruptMode);
    return !timedout;
}

`awaitUntil`和`awaitNanos`基本差不多,執行一共分6步,不過最后一步會返回是否超時: 1、如果當前線程中斷,那么拋出中斷異常 2、為當前線程創建`Node`,並加入該條件變量的條件隊列 隊尾 3、獲取並保存下當前`state`,並使用保存的`state`來調用`release`方法,如果失敗則拋出`IllegalMonitorStateException`異常 4、阻塞直到被`signal`或被中斷或超時 5、利用保存的`state`,調用`acquireQueued`方法重新獲取狀態 6、如果第四步期間被中斷,則拋出中斷異常;如果第四步是 因為超時而停止阻塞,則返回false,否則返回true
作者: 酒冽        出處: https://www.cnblogs.com/frankiedyz/p/15676704.html
版權:本文版權歸作者和博客園共有
轉載:歡迎轉載,但未經作者同意,必須保留此段聲明;必須在文章中給出原文連接;否則 必究法律責任

條件喚醒

signal

signal用於喚醒某個等待在條件隊列的線程。源碼如下:

public final void signal() {
    if (!isHeldExclusively())
        throw new IllegalMonitorStateException();
    Node first = firstWaiter;
    if (first != null)
        doSignal(first);
}

首先調用isHeldExclusively方法來判斷當前線程是否持有互斥資源(isHeldExclusively方法只會在signalsignalAll中才會被調用),如果沒有則拋出IllegalMonitorStateException異常。接下來調用doSignal方法將等待最久(隊首)的線程從條件隊列轉移到同步隊列

這里說明一下,只有當線程持有互斥資源時,才支持使用條件變量。關於這點,可以參考全網最詳細的ReentrantReadWriteLock源碼剖析(萬字長文)
其中,寫鎖支持條件變量,而讀鎖不支持。因為寫鎖是互斥資源,而讀鎖是共享資源。原因也可以參見這篇博客里的解析,這里不再贅述

doSignal方法會從目標node(一般都是隊首)開始,將遇到的第一個未被取消的線程從條件隊列移除,並調用transferForSignal方法將其放到同步隊列隊尾去等待獲取資源。源碼如下:

private void doSignal(Node first) {
    do {
        if ( (firstWaiter = first.nextWaiter) == null)
            lastWaiter = null;
        first.nextWaiter = null;
    } while (!transferForSignal(first) && (first = firstWaiter) != null);
}

transferForSignal方法不僅會判斷目標node是否被取消,也會將目標node從條件隊列移動到同步隊列。源碼如下:

final boolean transferForSignal(Node node) {
    if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))		// 如果CAS失敗,說明node已經被取消了,直接返回false
        return false;

    Node p = enq(node);		// 入隊,返回node的前驅節點p
    int ws = p.waitStatus;
    // 入隊之后希望將前驅節點置為SIGNAL,表明node的線程正在苦等獲取資源
    // 如果前驅節點已被取消,或CAS修改前驅的waitStatus失敗,就干脆直接將node的線程喚醒,不多bb
    if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
        LockSupport.unpark(node.thread);
    return true;
}

總之,`signal`方法會將 最早的(非取消)線程移動到同步隊列去排隊獲取資源

signalAll

signalAll方法會將條件隊列中所有的線程都移動到同步隊列,讓它們去獲取資源。其源碼如下:

public final void signalAll() {
    if (!isHeldExclusively())
        throw new IllegalMonitorStateException();
    Node first = firstWaiter;
    if (first != null)
        doSignalAll(first);
}

該方法會調用doSignalAll方法將條件隊列中所有的線程都移除,並放入同步隊列。源碼如下:

private void doSignalAll(Node first) {
    lastWaiter = firstWaiter = null;		// 將條件隊列清空
    do {
        Node next = first.nextWaiter;
        first.nextWaiter = null;
        transferForSignal(first);
        first = next;
    } while (first != null);
}

總之,`signalAll`方法會將條件隊列中 所有的線程都移動到同步隊列去排隊獲取資源
作者: 酒冽        出處: https://www.cnblogs.com/frankiedyz/p/15676704.html
版權:本文版權歸作者和博客園共有
轉載:歡迎轉載,但未經作者同意,必須保留此段聲明;必須在文章中給出原文連接;否則 必究法律責任

條件變量的應用

實現生產者-消費者模型

使用上面的非可重入鎖NonReentrantLock的條件變量來實現簡單的生產者-消費者模型(單生產者、單消費者),實現代碼如下:

點擊查看代碼
public class ProducerConsumerModel {

    private static final NonReentrantLock lock = new NonReentrantLock();
    private static final Condition notFull = lock.newCondition();
    private static final Condition notEmpty = lock.newCondition();

    private static final Queue<String> queue = new LinkedList<>();
    private static final int queueLength = 10;        // 隊列長度

    public static void main(String[] args) {
        Thread producer = new Thread(() -> {
            int i = 0;
            while (true) {
                lock.lock();
                try {
                    while (queue.size() == queueLength) {
                        System.out.println("Queue is full!");
                        notFull.await();
                    }
                    Thread.sleep(2000);
                    System.out.println("Produce product: " + "product" + i);
                    queue.add("product" + i);
                    ++i;
                    notEmpty.signalAll();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } finally {
                    lock.unlock();
                }
            }
        });

        Thread consumer = new Thread(() -> {
            while (true) {
                lock.lock();
                try {
                    while (queue.isEmpty()) {
                        System.out.println("Queue is empty!");
                        notEmpty.await();
                    }
                    Thread.sleep(2000);
                    String product = queue.poll();
                    System.out.println("Consume product: " + product);
                    notFull.signalAll();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } finally {
                    lock.unlock();
                }
            }
        });

        producer.start();
        consumer.start();
    }
}

好了,AQS系列共三期,到此為止,Bye~

全網最詳細的AbstractQueuedSynchronizer(AQS)源碼剖析(一)AQS基礎
全網最詳細的AbstractQueuedSynchronizer(AQS)源碼剖析(二)資源的獲取和釋放
全網最詳細的AbstractQueuedSynchronizer(AQS)源碼剖析(三)條件變量


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM