Condition


Condition接口

在並發編程中,每個Java對象都存在一組監視器方法,如wait()notify()以及notifyAll()方法,通過這些方法,我們可以實現線程間通信與協作(也稱為等待喚醒機制),如生產者-消費者模式,而且這些方法必須配合着synchronized關鍵字使用。

與synchronized的等待喚醒機制相比,Condition具有更多的靈活性以及精確性,這是因為notify()在喚醒線程時是隨機(同一個鎖),而Condition則可通過多個Condition實例對象建立更加精細的線程控制,也就帶來了更多靈活性:

  • 通過Condition能夠精細的控制多線程的休眠與喚醒。

  • 對於一個鎖,可以為多個線程間建立不同的Condition。

Condition是一個接口:

public interface Condition {

 /**
  * 使當前線程進入等待狀態,直到【被通知(signal)】或【中斷】
  * 當其他線程調用singal()或singalAll()方法時,該線程將被喚醒
  * 當其他線程調用interrupt()方法中斷當前線程
  * await()相當於synchronized等待喚醒機制中的wait()方法
  */
 void await() throws InterruptedException;

 // 當前線程進入等待狀態,直到被喚醒,該方法【不響應中斷要求】
 void awaitUninterruptibly();

 // 調用該方法,當前線程進入等待狀態,直到【被喚醒】或【被中斷】或【超時】
 // 其中nanosTimeout指的等待超時時間,單位納秒
 long awaitNanos(long nanosTimeout) throws InterruptedException;

 // 同awaitNanos,但可以指明時間單位
 boolean await(long time, TimeUnit unit) throws InterruptedException;

 // 調用該方法當前線程進入等待狀態,直到被喚醒、中斷或到達某個時間期限(deadline)
 // 如果沒到指定時間就被喚醒,返回true,其他情況返回false
 boolean awaitUntil(Date deadline) throws InterruptedException;

 // 喚醒一個等待在Condition上的線程,
 // 該線程從等待方法返回前,必須獲取與Condition相關聯的鎖,功能與notify()相同
 void signal();

 // 喚醒所有等待在Condition上的線程,
 // 該線程從等待方法返回前,必須獲取與Condition相關聯的鎖,功能與notifyAll()相同
 void signalAll();
}

同步隊列與等待隊列

AQS中存在兩種隊列,一種是同步隊列,一種是等待隊列,而等待隊列就相對於Condition而言的。在使用Condition前必須獲得鎖,同時在Condition的等待隊列上的結點與前面同步隊列的結點是同一個類即Node,其結點的waitStatus的值為CONDITION=1

同步隊列等待隊列的關系:

每個Condition都對應着一個等待隊列,也就是說如果一個鎖上創建了多個Condition對象,那么也就存在多個等待隊列。等待隊列是一個FIFO的隊列,在隊列中每一個節點都包含了一個線程的引用,而該線程就是Condition對象上等待的線程

當一個線程調用了await()相關的方法,那么該線程將會釋放鎖,並構建一個Node節點封裝當前線程的相關信息,加入到等待隊列中進行等待,直到被喚醒中斷超時才從隊列中移出。

Condition中的等待隊列模型如下:

Node節點的數據結構,在等待隊列中使用的變量與同步隊列是不同的,Condtion中等待隊列的結點,只有直接指向的后繼結點,並沒有指明前驅結點,而且使用的變量是nextWaiter而不是next。

等待隊列中結點的狀態只有兩種即CANCELLEDCONDITION,前者表示線程已結束,需要從等待隊列中移除,后者表示條件結點等待被喚醒

每個Codition對象對應於一個等待隊列,也就是說AQS中只能存在一個同步隊列,但可擁有多個等待隊列

newCondition

public class ReentrantLock implements Lock, java.io.Serializable {
    public Condition newCondition() {
        // 使用自定義的條件
        return sync.newCondition();
    }
}

public class MyMutex implements Lock {
    private static class MySync extends AbstractQueuedSynchronizer {   
        /**
         * 主要用於等待/通知機制,每個condition都有一個與之對應的條件等待隊列
         * @return condition
         */
        Condition newCondition() {
            return new ConditionObject();
        }
    }
}

public class ReentrantLock implements Lock, java.io.Serializable {
    final ConditionObject newCondition() {
        return new ConditionObject();
    }
}

AQS#ConditionObject

ConditionObject是Condition的實現類,該類就定義在了AQS中。在實現類ConditionObject中有兩個結點,分別是firstWaiterlastWaiter,firstWaiter代表等待隊列第一個等待結點,lastWaiter代表等待隊列最后一個等待結點

public abstract class AbstractQueuedSynchronizer
    extends AbstractOwnableSynchronizer implements java.io.Serializable {
    
    public class ConditionObject implements Condition, java.io.Serializable {

        /** First node of condition queue. */
        private transient Node firstWaiter;
        /** Last node of condition queue. */
        private transient Node lastWaiter;

        public ConditionObject() { }
        // ...
    }
}

所以,只需要來看一下ConditionObject實現的await/signal方法來使用這兩個成員變量就可以了。

await()

await()方法主要做了3件事:

  • 一是調用addConditionWaiter()方法將當前線程封裝成node結點加入等待隊列

  • 二是調用fullyRelease(node)方法釋放同步狀態,並喚醒后繼結點的線程

  • 三是調用isOnSyncQueue(node)方法判斷結點是否在同步隊列中。注意是個while循環,如果同步隊列中沒有該結點就直接掛起該線程,需要明白的是如果線程被喚醒后就調用acquireQueued(node, savedState)執行自旋操作爭取鎖,即當前線程結點從等待隊列轉移到同步隊列並開始努力獲取鎖

public abstract class AbstractQueuedSynchronizer
    extends AbstractOwnableSynchronizer implements java.io.Serializable {
    
    public class ConditionObject implements Condition, java.io.Serializable {

        /** First node of condition queue. */
        private transient Node firstWaiter;
        /** Last node of condition queue. */
        private transient Node lastWaiter;

        public ConditionObject() { }
        // ...
        
        public final void await() throws InterruptedException {
            if (Thread.interrupted())
                throw new InterruptedException();
            Node node = addConditionWaiter(); // 1.構建Node節點,並加入到等待隊列中
            int savedState = fullyRelease(node); // 2.釋放當前線程鎖,即釋放同步狀態
            int interruptMode = 0;
            while (!isOnSyncQueue(node)) { // 3.判斷結點是否在同步隊列(SyncQueue)中,即是否被喚醒
                LockSupport.park(this); // 掛起當前線程
                // 判斷是否被中斷喚醒,如果是退出循環
                if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                    break;
            }
            // 被喚醒后執行自旋操作爭取獲得鎖,同時判斷線程是否被中斷
            if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
                interruptMode = REINTERRUPT;
            if (node.nextWaiter != null) // clean up if cancelled
                unlinkCancelledWaiters(); // 清理等待隊列中不為CONDITION狀態的結點
            if (interruptMode != 0)
                reportInterruptAfterWait(interruptMode);
        }
    }
}

執行addConditionWaiter()添加到等待隊列:

public abstract class AbstractQueuedSynchronizer
    extends AbstractOwnableSynchronizer implements java.io.Serializable {
    
    public class ConditionObject implements Condition, java.io.Serializable {

        /** First node of condition queue. */
        private transient Node firstWaiter;
        /** Last node of condition queue. */
        private transient Node lastWaiter;

        /**
         * Adds a new waiter to wait queue.
         * @return its new wait node
         */
        private Node addConditionWaiter() {
            Node t = lastWaiter;
            // If lastWaiter is cancelled, clean out.
            if (t != null && t.waitStatus != Node.CONDITION) { // 判斷是否為結束狀態的結點並移除
                unlinkCancelledWaiters();
                t = lastWaiter;
            }
            // 新構建的節點的waitStatus是CONDITION,注意不是0或SIGNAL了
            Node node = new Node(Thread.currentThread(), Node.CONDITION);
            // 構建單向同步隊列
            if (t == null)
                firstWaiter = node; // 加入等待隊列
            else
                t.nextWaiter = node;
            lastWaiter = node;
            return node;
        }
    }
}

為什么這里是單向隊列,也沒有使用CAS來保證加入隊列的安全性呢?

因為await是Lock范式try中使用的,說明已經獲取到鎖了,所以就沒必要使用CAS了。至於是單向,因為這里還不涉及到競爭鎖,只是做一個條件等待隊列。

線程已經按相應的條件加入到了條件等待隊列中,那如何再嘗試獲取鎖呢?

signal/signalAll

public abstract class AbstractQueuedSynchronizer
    extends AbstractOwnableSynchronizer implements java.io.Serializable {
    
    public class ConditionObject implements Condition, java.io.Serializable {

        /** First node of condition queue. */
        private transient Node firstWaiter;
        /** Last node of condition queue. */
        private transient Node lastWaiter;

        public final void signal() {
            if (!isHeldExclusively()) // 判斷是否持有獨占鎖,如果不是拋出異常
                throw new IllegalMonitorStateException();
            Node first = firstWaiter;
            if (first != null) // 喚醒等待隊列第一個結點的線程
                doSignal(first);
        }
        
        public final void signalAll() {
            if (!isHeldExclusively())
                throw new IllegalMonitorStateException();
            Node first = firstWaiter;
            if (first != null)
                doSignalAll(first);
        }
    }
}

這里signal()方法做了兩件事:

  • 一是判斷當前線程是否持有獨占鎖,沒有就拋出異常,從這點也可以看出只有獨占模式先采用等待隊列,而共享模式下是沒有等待隊列的,也就沒法使用Condition。

  • 二是喚醒等待隊列的第一個結點,即執行doSignal(first)

doSignal/doSignalAll

doSignal(first)方法中做了兩件事:

  • 一是條件等待隊列移除被喚醒的節點,然后重新維護條件等待隊列的firstWaiter和lastWaiter的指向。

  • 二是將從等待隊列移除的結點加入同步隊列(在transferForSignal()方法中完成的),如果進入到同步隊列失敗,並且條件等待隊列還有不為空的節點,則繼續循環喚醒后續其他結點的線程

public abstract class AbstractQueuedSynchronizer
    extends AbstractOwnableSynchronizer implements java.io.Serializable {
    
    public class ConditionObject implements Condition, java.io.Serializable {

        /** First node of condition queue. */
        private transient Node firstWaiter;
        /** Last node of condition queue. */
        private transient Node lastWaiter;

        private void doSignal(Node first) {
            do {
                // 移除條件等待隊列中的第一個結點,
             	// 如果后繼結點為null,那么說沒有其他結點,將尾結點也設置為null
                if ( (firstWaiter = first.nextWaiter) == null)
                    lastWaiter = null;
                first.nextWaiter = null;
             // 如果被通知節點沒有進入到同步隊列,並且條件等待隊列還有不為空的節點,則繼續循環通知后續結點
            } while (!transferForSignal(first) &&
                     (first = firstWaiter) != null);
        }
        
        private void doSignalAll(Node first) {
            lastWaiter = firstWaiter = null;
            /*
             * 循環判斷是否還有nextWaiter,
             * 如果有就像signal操作一樣,將其從【條件等待隊列】中移到【同步隊列】中
             */
            do {
                Node next = first.nextWaiter;
                first.nextWaiter = null;
                transferForSignal(first);
                first = next;
            } while (first != null);
        }
    }

    final boolean transferForSignal(Node node) {
        // 嘗試設置喚醒結點的waitStatus為0,即初始化狀態
        // 如果設置失敗,說明當前結點node的waitStatus已不為CONDITION狀態,那么只能是結束狀態了,因此返回false
        // 返回doSignal()方法中繼續喚醒其他結點的線程,注意這里並不涉及並發問題,
        // 所以CAS操作失敗只可能是預期值不為CONDITION,
    	// 而不是多線程設置導致預期值變化,畢竟操作該方法的線程是持有鎖的。
        if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
            return false;
	// 加入同步隊列並返回前驅結點p
        Node p = enq(node);
        int ws = p.waitStatus;
        // 判斷前驅結點是否為結束結點(CANCELLED=1)或者
        // 在設置前驅節點狀態為Node.SIGNAL狀態失敗時,喚醒被通知節點代表的線程
        if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
            // 喚醒node結點的線程
            LockSupport.unpark(node.thread);
        return true;
    }
}

總結:signal的喚醒過程

signal()被調用后,先判斷當前線程是否持有獨占鎖,如果有,那么喚醒當前Condition對象中等待隊列的第一個結點的線程,並從等待隊列中移除該結點,移動到同步隊列中:

  • 如果加入同步隊列失敗,那么繼續循環喚醒等待隊列中的其他結點的線程;

  • 如果成功加入同步隊列,那么如果其前驅節點是已結束的節點或者設置前驅節點狀態為Node.SIGNAL狀態失敗,則通過LockSupport.unpark()喚醒被通知節點代表的線程,到此signal()任務完成。

被喚醒后的線程,將從前面的await()方法中的while循環中退出,因為此時該線程的結點已在同步隊列中,那么while (!isOnSyncQueue(node))將不在符合循環條件,進而調用AQS的acquireQueued()方法加入獲取同步狀態的競爭中,這就是等待喚醒機制的整個流程實現原理,流程如下圖所示(注意無論是同步隊列還是等待隊列使用的Node數據結構都是同一個,不過是使用的內部變量不同罷了):

生產者-消費者Condition

package condition;

import java.util.LinkedList;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.IntStream;

import static java.lang.Thread.currentThread;
import static java.util.concurrent.ThreadLocalRandom.current;

public class TestCondition {
    // 定義顯示鎖
    private static final ReentrantLock lock = new ReentrantLock();
    // 創建與顯式鎖關聯的Condition對象
    private static final Condition condition = lock.newCondition();
    // 鏈表
    private static final LinkedList<Long> list = new LinkedList<>();
    // 鏈表最大容量為100
    private static final int CAPACITY = 100;
    // 定義數據的初始值
    private static long i = 0;

    /**
     * 生產者方法
     */
    private static void produce() {
        // 獲取鎖
        lock.lock();

        try {
            // 當鏈表中數據量達到100時,生產者線程將被阻塞,加入與Condition關聯的wait隊列中
            while (list.size() >= CAPACITY) {
                condition.await();
            }

            // 當鏈表中數據量不足100時,生產新的數據
            i++;
            list.addLast(i);
            System.out.println(currentThread().getName() + " 生產了數據 " + i);
            // 1. 通知其他線程
            condition.signalAll();

        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            // 釋放鎖
            lock.unlock();
        }
    }

    /**
     * 消費者方法
     */
    private static void consume() {
        lock.lock();

        try {
            // 當list中數據為空時,消費者線程將被阻塞加入與Condition關聯的wait隊列
            while (list.isEmpty()) {
                condition.await();
            }

            // 消費數據
            Long value = list.removeFirst();
            System.out.println(currentThread().getName() + " 消費了數據 " + value);
            // 2.通知其他線程
            condition.signalAll();

        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            lock.unlock();
        }
    }

    private static void sleep() {
        try {
            TimeUnit.SECONDS.sleep(current().nextInt(5));
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    public static void main(String[] args) {

        // 啟動10個生產者線程
        IntStream.range(0, 10).forEach(i ->
                new Thread(
                        () -> {
                            for (; ; ) {
                                produce();
                                sleep();
                            }
                        }, "Producer-" + i
                ).start()
        );

        // 啟動5個消費者線程
        IntStream.range(0, 5).forEach(i ->
                new Thread(
                        () -> {
                            for (; ; ) {
                                consume();
                                sleep();
                            }
                        }, "Consumer-" + i
                ).start()
        );
    }
}

輸出結果:

...
Producer-6 生產了數據 8
Producer-4 生產了數據 9
Producer-8 生產了數據 10
Consumer-0 消費了數據 1
Consumer-1 消費了數據 2
Consumer-2 消費了數據 3
Consumer-4 消費了數據 4
Consumer-3 消費了數據 5
Producer-2 生產了數據 11
Producer-6 生產了數據 12
...

注釋1和2處condition.signalAll()喚醒的是與Condition關聯的阻塞隊列中的所有阻塞線程。由於使用的是唯一的一個Condition實例,因此,生產者喚醒的有可能是與Condition關聯的wait隊列中的生產者線程。假設此時生產者線程被喚醒並搶到了CPU的調度獲得了執行權,但又發現隊列已滿再次進入阻塞。這樣的線程上下文開銷實際上是沒有意義的,甚至會影響性能(多線程下的線程上下文切換開銷是非常大的性能損耗)。

因此,需要使用兩個Condition對象,一個用於隊列已滿臨界值條件的處理,另外一個用於隊列為空的臨界值條件的處理。此時,在生產者中喚醒的阻塞線程只能是消費者線程在消費者中喚醒的也只能是生產者線程

package condition;

import java.util.LinkedList;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.IntStream;

import static java.lang.Thread.currentThread;
import static java.util.concurrent.ThreadLocalRandom.current;

public class TestCondition {
    // 定義顯示鎖
    private static final ReentrantLock lock = new ReentrantLock();
    // 創建與顯式鎖關聯的Condition對象
    private static final Condition Full_Condition = lock.newCondition();
    private static final Condition EMPTY_Condition = lock.newCondition();
    // 鏈表
    private static final LinkedList<Long> list = new LinkedList<>();
    // 鏈表最大容量為100
    private static final int CAPACITY = 100;
    // 定義數據的初始值
    private static long i = 0;

    /**
     * 生產者方法
     */
    private static void produce() {
        // 獲取鎖
        lock.lock();

        try {
            // 當鏈表中數據量達到100時,生產者線程將被阻塞,加入Full_Condition wait隊列中
            while (list.size() >= CAPACITY) {
                Full_Condition.await();
            }

            // 當鏈表中數據量不足100時,生產新的數據
            i++;
            list.addLast(i);
            System.out.println(currentThread().getName() + " 生產了數據 " + i);
            // 1. 生產者線程通知消費者線程
            EMPTY_Condition.signalAll();

        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            // 釋放鎖
            lock.unlock();
        }
    }

    /**
     * 消費者方法
     */
    private static void consume() {
        lock.lock();

        try {
            // 當list中數據為空時,消費者線程將被阻塞加入EMPTY_Condition wait隊列
            while (list.isEmpty()) {
                EMPTY_Condition.await();
            }

            // 消費數據
            Long value = list.removeFirst();
            System.out.println(currentThread().getName() + " 消費了數據 " + value);
            // 2.消費者線程通知生產者線程
            Full_Condition.signalAll();

        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            lock.unlock();
        }
    }

    private static void sleep() {
        try {
            TimeUnit.SECONDS.sleep(current().nextInt(5));
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    public static void main(String[] args) {

        // 啟動10個生產者線程
        IntStream.range(0, 10).forEach(i ->
                new Thread(
                        () -> {
                            for (; ; ) {
                                produce();
                                sleep();
                            }
                        }, "Producer-" + i
                ).start()
        );

        // 啟動5個消費者線程
        IntStream.range(0, 5).forEach(i ->
                new Thread(
                        () -> {
                            for (; ; ) {
                                consume();
                                sleep();
                            }
                        }, "Consumer-" + i
                ).start()
        );
    }
}

輸出結果:

...
Producer-6 生產了數據 7
Producer-5 生產了數據 8
Producer-8 生產了數據 9
Producer-9 生產了數據 10
Consumer-0 消費了數據 1
Consumer-3 消費了數據 2
Consumer-4 消費了數據 3
Consumer-1 消費了數據 4
Consumer-2 消費了數據 5
Consumer-0 消費了數據 6
Producer-8 生產了數據 11
...


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM