java中Condition類的詳細介紹(詳解)


一 condition 介紹及demo
Condition是在java 1.5中才出現的,它用來替代傳統的Object的wait()、notify()實現線程間的協作,相比使用Object的wait()、notify(),使用Condition的await()、signal()這種方式實現線程間協作更加安全和高效。因此通常來說比較推薦使用Condition,阻塞隊列實際上是使用了Condition來模擬線程間協作。
Condition是個接口,基本的方法就是await()和signal()方法;Condition依賴於Lock接口,生成一個Condition的基本代碼是lock.newCondition();調用Condition的await()和signal()方法,都必須在lock保護之內,就是說必須在lock.lock()和lock.unlock之間才可以使用

  • Conditon中的await()對應Object的wait();
  • Condition中的signal()對應Object的notify();
  • Condition中的signalAll()對應Object的notifyAll()。

condition常見例子arrayblockingqueue。下面是demo:

package com.springboot.study.tests.conditions;

import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

/**
 * @Author: guodong
 * @Date: 2021/8/27 17:22
 * @Version: 1.0
 * @Description:
 */
public class ConTest {

    final Lock lock = new ReentrantLock();
    final Condition condition = lock.newCondition();

    public static void main(String[] args) {
        // TODO Auto-generated method stub
        ConTest test = new ConTest();
        Producer producer = test.new Producer();
        Consumer consumer = test.new Consumer();
        consumer.start();
        producer.start();
    }

    class Consumer extends Thread{

        @Override
        public void run() {
            consume();
        }

        private void consume() {
            try {
                lock.lock();
                System.out.println("我在等一個新信號" + Thread.currentThread().getName());
                condition.await();
            } catch (InterruptedException e) {
                e.printStackTrace();
                // TODO Auto-generated catch block
            } finally{
                System.out.println("拿到一個信號" + Thread.currentThread().getName());
                lock.unlock();
            }
        }
    }

    class Producer extends Thread{

        @Override
        public void run() {
            produce();
        }

        private void produce() {
            lock.lock();
            try {
                System.out.println("我拿到鎖" + Thread.currentThread().getName());
                condition.signalAll();
                System.out.println("我發出了一個信號:" + Thread.currentThread().getName());
            } finally{
                lock.unlock();
            }
        }
    }
        


}

運行結果:

我在等一個新信號Thread-1 我拿到鎖Thread-0 我發出了一個信號:Thread-0 拿到一個信號Thread-1

Condition的執行方式,是當在線程Consumer中調用await方法后,線程Consumer將釋放鎖,並且將自己沉睡,等待喚醒,線程Producer獲取到鎖后,開始做事,完畢后,調用Condition的signalall方法,喚醒線程Consumer,線程Consumer恢復執行。以上說明Condition是一個多線程間協調通信的工具類,使得某個,或者某些線程一起等待某個條件(Condition),只有當該條件具備( signal 或者 signalAll方法被帶調用)時 ,這些等待線程才會被喚醒,從而重新爭奪鎖。
Condition實現生產者、消費者模式:

package com.springboot.study.tests.conditions;

import java.util.PriorityQueue;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

/**
 * @Author: guodong
 * @Date: 2021/8/27 17:42
 * @Version: 1.0
 * @Description:
 */
public class ConTest2 {
    private int queueSize = 10;
    private PriorityQueue<Integer> queue = new PriorityQueue<Integer>(queueSize);
    private Lock lock = new ReentrantLock();
    private Condition notFull = lock.newCondition();
    private Condition notEmpty = lock.newCondition();

    public static void main(String[] args) throws InterruptedException  {
        ConTest2 test = new ConTest2();
        Producer producer = test.new Producer();
        Consumer consumer = test.new Consumer();
        producer.start();
        consumer.start();
        Thread.sleep(0);
        producer.interrupt();
        consumer.interrupt();
    }

    class Consumer extends Thread{
        @Override
        public void run() {
            consume();
        }

        volatile boolean flag=true;

        private void consume() {
            while(flag){
                lock.lock();
                try {
                    while(queue.isEmpty()){
                        try {
                            System.out.println("隊列空,等待數據");
                            notEmpty.await();
                        } catch (InterruptedException e) {
                            flag = false;
                        }
                    }

                    //每次移走隊首元素
                    queue.poll();
                    notFull.signal();
                    System.out.println("從隊列取走一個元素,隊列剩余" + queue.size()+"個元素");
                } finally{
                    lock.unlock();
                }
            }
        }
    }

    class Producer extends Thread{

        @Override
        public void run() {
            produce();
        }

        volatile boolean flag=true;

        private void produce() {
            while(flag){
                lock.lock();
                try {
                    while(queue.size() == queueSize){
                        try {
                            System.out.println("隊列滿,等待有空余空間");
                            notFull.await();
                        } catch (InterruptedException e) {
                            flag = false;
                        }
                    }

                    //每次插入一個元素
                    queue.offer(1);
                    notEmpty.signal();
                    System.out.println("向隊列取中插入一個元素,隊列剩余空間:"+(queueSize-queue.size()));
                } finally{
                    lock.unlock();
                }
            }
        }
    }

}

運行結果如下所示:

向隊列取中插入一個元素,隊列剩余空間:9 向隊列取中插入一個元素,隊列剩余空間:8 向隊列取中插入一個元素,隊列剩余空間:7 向隊列取中插入一個元素,隊列剩余空間:6 從隊列取走一個元素,隊列剩余3個元素 從隊列取走一個元素,隊列剩余2個元素 從隊列取走一個元素,隊列剩余1個元素 從隊列取走一個元素,隊列剩余0個元素 向隊列取中插入一個元素,隊列剩余空間:9 向隊列取中插入一個元素,隊列剩余空間:8 向隊列取中插入一個元素,隊列剩余空間:7 向隊列取中插入一個元素,隊列剩余空間:6 向隊列取中插入一個元素,隊列剩余空間:5 向隊列取中插入一個元素,隊列剩余空間:4 向隊列取中插入一個元素,隊列剩余空間:3 向隊列取中插入一個元素,隊列剩余空間:2 向隊列取中插入一個元素,隊列剩余空間:1 向隊列取中插入一個元素,隊列剩余空間:0 隊列滿,等待有空余空間 隊列滿,等待有空余空間 從隊列取走一個元素,隊列剩余9個元素 從隊列取走一個元素,隊列剩余8個元素 從隊列取走一個元素,隊列剩余7個元素 從隊列取走一個元素,隊列剩余6個元素 從隊列取走一個元素,隊列剩余5個元素 從隊列取走一個元素,隊列剩余4個元素 從隊列取走一個元素,隊列剩余3個元素 從隊列取走一個元素,隊列剩余2個元素 從隊列取走一個元素,隊列剩余1個元素 從隊列取走一個元素,隊列剩余0個元素 隊列空,等待數據 隊列空,等待數據 向隊列取中插入一個元素,隊列剩余空間:9 從隊列取走一個元素,隊列剩余0個元素

二 Condition接口
condition可以通俗的理解為條件隊列。當一個線程在調用了await方法以后,直到線程等待的某個條件為真的時候才會被喚醒。這種方式為線程提供了更加簡單的等待/通知模式。Condition必須要配合鎖一起使用,因為對共享狀態變量的訪問發生在多線程環境下。一個Condition的實例必須與一個Lock綁定,因此Condition一般都是作為Lock的內部實現。

  • await() :造成當前線程在接到信號或被中斷之前一直處於等待狀態。
  • await(long time, TimeUnit unit) :造成當前線程在接到信號、被中斷或到達指定等待時間之前一直處於等待狀態
  • awaitNanos(long nanosTimeout) :造成當前線程在接到信號、被中斷或到達指定等待時間之前一直處於等待狀態。返回值表示剩余時間,如果在nanosTimesout之前喚醒,那么返回值 = nanosTimeout - 消耗時間,如果返回值 <= 0 ,則可以認定它已經超時了。
  • awaitUninterruptibly() :造成當前線程在接到信號之前一直處於等待狀態。【注意:該方法對中斷不敏感】。
  • awaitUntil(Date deadline) :造成當前線程在接到信號、被中斷或到達指定最后期限之前一直處於等待狀態。如果沒有到指定時間就被通知,則返回true,否則表示到了指定時間,返回返回false。
  • signal() :喚醒一個等待線程。該線程從等待方法返回前必須獲得與Condition相關的鎖。
  • signal()All :喚醒所有等待線程。能夠從等待方法返回的線程必須獲得與Condition相關的鎖。

三 condition實現分析:

 

Condition接口包含了多種await方式和兩個通知方法;ConditionObject實現了Condition接口,是AbstractQueuedSynchronizer的內部類(因為Condition的操作都需要獲取想關聯的鎖);Reentrantlock的newCondition方法返回與某個lock實例相關的Condition對象

public abstract class AbstractQueuedLongSynchronizer
    extends AbstractOwnableSynchronizer
    implements java.io.Serializable {

結合上面的類圖,我們看到condition實現是依賴於aqs,而aqs是個抽象類。里面定義了同步器的基本框架,實現了基本的結構功能。只留有狀態條件的維護由具體同步器根據具體場景來定制,如常見的 ReentrantLock 、 RetrantReadWriteLock和CountDownLatch 等等,
3.1 等待隊列
Condition是AQS的內部類。每個Condition對象都包含一個隊列(等待隊列)。等待隊列是一個FIFO的隊列,在隊列中的每個節點都包含了一個線程引用,該線程就是在Condition對象上等待的線程,如果一個線程調用了Condition.await()方法,那么該線程將會釋放鎖、構造成節點加入等待隊列並進入等待狀態。AQS有一個同步隊列和多個等待隊列,節點都是Node。等待隊列的基本結構如下所示。

等待分為首節點和尾節點。當一個線程調用Condition.await()方法,將會以當前線程構造節點,並將節點從尾部加入等待隊列。新增節點就是將尾部節點指向新增的節點。節點引用更新本來就是在獲取鎖以后的操作,所以不需要CAS保證。同時也是線程安全的操作。(java.util.concurrent.locks.AbstractQueuedSynchronizer)

    public class ConditionObject implements Condition, java.io.Serializable {
        private static final long serialVersionUID = 1173984872572414699L;
        /** First node of condition queue. */
        private transient Node firstWaiter;
        /** Last node of condition queue. */
        private transient Node lastWaiter;

3.2 等待
當線程調用了Condition的await()方法以后。線程就作為隊列中的一個節點被加入到等待隊列中去了。同時會釋放鎖的擁有。當從await方法返回的時候。當前線程一定會獲取condition相關聯的鎖。
如果從隊列(同步隊列和等待隊列)的角度去看await()方法,當調用await()方法時,相當於同步隊列的首節點(獲取鎖的節點)移動到Condition的等待隊列中。
調用該方法的線程成功的獲取鎖的線程,也就是同步隊列的首節點,該方法會將當前線程構造成節點並加入到等待隊列中,然后釋放同步狀態,喚醒同步隊列中的后繼節點,然后當前線程會進入等待狀態。
當等待隊列中的節點被喚醒的時候,則喚醒節點的線程開始嘗試獲取同步狀態。如果不是通過 其他線程調用Condition.signal()方法喚醒,而是對等待線程進行中斷,則會拋出InterruptedException異常信息。

public final void await() throws InterruptedException {
    if (Thread.interrupted())
throw new InterruptedException();
Node node = addConditionWaiter(); //將當前線程包裝下后,
//添加到Condition自己維護的一個鏈表中。
int savedState = fullyRelease(node);//釋放當前線程占有的鎖,從demo中看到,
//調用await前,當前線程是占有鎖的

int interruptMode = 0;
     while (!isOnSyncQueue(node)) {//釋放完畢后,遍歷AQS的隊列,看當前節點是否在隊列中,
//不在 說明它還沒有競爭鎖的資格,所以繼續將自己沉睡。
//直到它被加入到隊列中,聰明的你可能猜到了,
//沒有錯,在singal的時候加入不就可以了?
LockSupport.park(this);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
//被喚醒后,重新開始正式競爭鎖,同樣,如果競爭不到還是會將自己沉睡,等待喚醒重新開始競爭。
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null)
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
}

結合代碼去看,同步隊列的首節點 並不會直接加入等待隊列,而是通過addConditionWaiter把當前線程構造成一個新節點並加入到等待隊列中。

        private Node addConditionWaiter() {
            Node t = lastWaiter;
            // If lastWaiter is cancelled, clean out.
            if (t != null && t.waitStatus != Node.CONDITION) {
                unlinkCancelledWaiters();
                t = lastWaiter;
            }
            Node node = new Node(Thread.currentThread(), Node.CONDITION);
            if (t == null)
                firstWaiter = node;
            else
                t.nextWaiter = node;
            lastWaiter = node;
            return node;
        }

3.3 通知
調用Condition的signal()方法,將會喚醒在等待隊列中等待最長時間的節點(條件隊列里的首節點),在喚醒節點前,會將節點移到同步隊列中。當前線程加入到等待隊列中如圖所示:

回到上面的demo,鎖被釋放后,線程Consumer開始沉睡,這個時候線程因為線程Consumer沉睡時,會喚醒AQS隊列中的頭結點,所所以線程Producer會開始競爭鎖,並獲取到,執行完后線程Producer會調用signal方法,“發出”signal信號,signal方法如下:

public final void signal() {
     if (!isHeldExclusively())
     throw new IllegalMonitorStateException();
     Node first = firstWaiter; //firstWaiter為condition自己維護的一個鏈表的頭結點,
                              //取出第一個節點后開始喚醒操作
     if (first != null)
     doSignal(first);
     }

在調用signal()方法之前必須先判斷是否獲取到了鎖(isHeldExclusively方法)。接着獲取等待隊列的首節點,將其移動到同步隊列並且利用LockSupport喚醒節點中的線程。被喚醒的線程將從await方法中的while循環中退出( while (!isOnSyncQueue(node)) { 方法返回true,節點已經在同步隊列中)。隨后調用同步器的acquireQueued()方法加入到同步狀態的競爭當中去。成功獲取到競爭的線程從先前調用await方法返回,此時該線程已經成功獲取了鎖。AQS的同步隊列與Condition的等待隊列,兩個隊列的作用是不同,事實上,每個線程也僅僅會同時存在以上兩個隊列中的一個,流程是這樣的:

注意:
1.線程producer調用signal方法,這個時候Condition的等待隊列中只有線程Consumer一個節點,於是它被取出來,並被加入到AQS的等待隊列中。 注意,這個時候,線程Consumer 並沒有被喚醒。
2.Sync是AQS的抽象子類,實現可重入和互斥的大部分功能。在Sync的子類中有FairSync和NonfairSync兩種代表公平鎖策略和非公平鎖策略。Sync lock方法留給子類去實現,NonfairSync的實現:

 /**
     * Sync object for non-fair locks
     */
    static final class NonfairSync extends Sync {
        private static final long serialVersionUID = 7316153563782823691L;

        /**
         * Performs lock.  Try immediate barge, backing up to normal
         * acquire on failure.
         */
        final void lock() {
            if (compareAndSetState(0, 1))
                setExclusiveOwnerThread(Thread.currentThread());
            else
                acquire(1);
        }

        protected final boolean tryAcquire(int acquires) {
            return nonfairTryAcquire(acquires);
        }
    }

其中如果一開始獲取鎖成功,是直接設置當前線程。否則執行acquire(1),也就是進入aqs等待隊列。這里不展開細節。可以這樣理解,整個協作過程是靠結點在AQS的等待隊列和Condition的等待隊列中來回移動實現的,每個隊列的意義不同,Condition作為一個條件類,很好的自己維護了一個等待信號的隊列,並在適時的時候將結點加入到AQS的等待隊列中來實現的喚醒操作。 

參考博客:
https://blog.csdn.net/a1439775520/article/details/98471610
https://www.cnblogs.com/gemine/p/9039012.html

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM