CyclicBarrier是如何成為一個"柵欄"的


CyclicBarrier是一種類似於柵欄的存在,意思就是在柵欄開放之前你都只能被擋在柵欄的一側,當柵欄移除之后,之前被擋在一側的多個對象則同時開始動起來。

1. 如何使用CyclicBarrier

  在介紹其原理之前,先了解一下CyclicBarrier應該如何使用。

  假設現在有這樣的場景,我們需要開一個會議,需要張1、張2、張3三個人參加,
會議需要三個人都到齊之后才能開始,否則只能干等着;這個場景用CyclicBarrier可以很契合的模擬出來。代碼如下:

public static void main(String[] args) {
    // 線程池,每個線程代表一個人
    ThreadPoolExecutor executor = ThreadPoolProvider.getInstance();
    // 會議所需的人數為3
    CyclicBarrier barrier = new CyclicBarrier(3);

    executor.execute(() -> {
        try {
            System.err.println("張1到達會議室");
            barrier.await();
            System.err.println("會議開始,張1開始發言");
        } catch (InterruptedException | BrokenBarrierException e) {
            e.printStackTrace();
        }

    });

    executor.execute(() -> {
        try {
            System.err.println("張2到達會議室");
            barrier.await();
            System.err.println("會議開始,張2開始發言");
        } catch (InterruptedException | BrokenBarrierException e) {
            e.printStackTrace();
        }

    });

    executor.execute(() -> {
        try {
            System.err.println("張3先去個廁所,內急解決再去開會");
            TimeUnit.SECONDS.sleep(1);
            System.err.println("張3到達會議室");
            barrier.await();
            System.err.println("會議開始,張3開始發言");
        } catch (InterruptedException | BrokenBarrierException e) {
            e.printStackTrace();
        }

    });


    executor.shutdown();
}

結果圖:
例子圖
  通過上方代碼可以知道CyclicBarrier的幾點:

  1. 使用`await()`來表示完成了某些事情。(上方例子的表現為**到達了會議室**)
  2. 使用`await()`之后**當前線程就進入阻塞狀態**,需要**等待完全滿足`CyclicBarrier`的條件后喚醒**才能繼續接下來的操作。(上方例子中 **為3個人都到達會議室**)
  3. **在最后一個線程達到條件之后,之前阻塞的線程全部放開**,繼續接下來的操作。(上方例子為張3到達會議室)

  這個簡單的例子也讓我們了解CyclicBarrier的使用方法,那來看看其內部究竟是如何實現柵欄的效果的。

2. CyclicBarrier是如何成為"柵欄"的

  從第一節的代碼中我們也能看到,需要關注的就兩個地方

  1. 構造函數
  2. await()方法

只要了解這兩個方法的內部,相當於了解了CyclicBarrier的內部。
那在深入了解之前,先來看下CyclicBarrier的幾個變量,不用刻意去記,看代碼的時候知道這個東西做什么用的就行了:

lock:CyclicBarrier類創建的ReentrantLock實例,關於ReentrantLock不清楚的可以->傳送。

trip:lock中的conditionCyclicBarrier使用該變量來實現各線程之間的阻塞和同時喚醒。同樣,不明白condition作用的=>傳送門

parties:需要滿足條件(調用await方法)的總數,就是說當有parties個線程await()之后就會喚醒全部線程。

barrierCommand:一個Runnable變量,在await方法的調用次數到達總數parties之后,在喚醒全部線程之前執行其run()方法

generation:其內部類,可以理解為周期,周期內需要完成n個任務,只要一個任務失敗,當前周期的所有任務就算失敗,結束當前周期,再開啟下個周期。

count:當前周期剩余需要完成的任務數(剩余調用await方法的次數)

以下為源碼:

public class CyclicBarrier {
    // 內部類,可理解為周期
    private static class Generation {
        // 當前周期是否失敗
        boolean broken = false;
    }

    // 鎖的實例
    private final ReentrantLock lock = new ReentrantLock();
    // ReentrantLock的condition變量,用來控制線程喚醒和阻塞
    private final Condition trip = lock.newCondition();
    // 需要滿足條件的次數,即需要調用await方法的次數
    private final int parties;
    // 滿足條件次數達到parties之后,喚醒所有線程之前執行其 run()方法
    private final Runnable barrierCommand;
    // 當前周期
    private Generation generation = new Generation();
    // 剩余滿足條件次數
    private int count;
    
    // ...
}

  看完CyclicBarrier的幾個變量后,來看其具體的內部實現。

  首先來看構造函數,其構造函數有兩個,一個在達到條件總數(parties)后直接叫醒所有線程;另一個指定一個Runnable在達到條件總數后先執行其run()方法再叫醒。

  • 不指定Runnable,參數只有一個:需要達成的任務數
public CyclicBarrier(int parties) {
    // 直接調用另一個構造方法,Runnable傳null,表示不執行
    this(parties, null);
}
  • 指定Runnable的構造方法,賦值任務總數、剩余任務數、喚醒操作之前的Runnable
public CyclicBarrier(int parties, Runnable barrierAction) {
    if (parties <= 0) throw new IllegalArgumentException();
    // 任務總數
    this.parties = parties;
    // 剩余需要完成的任務數
    this.count = parties;
    // 喚醒之前執行的Runnable
    this.barrierCommand = barrierAction;
}

  在第一節我們使用的是第一個構造方法,來試試第二個

public static void main(String[] args) throws InterruptedException {

    ThreadPoolExecutor executor = ThreadPoolProvider.getInstance();
    /** =======增加Runnable,其他地方保持一致=============*/
    CyclicBarrier barrier = new CyclicBarrier(3, ()-> System.err.println("在會議開始之前,先給大家發下開會資料"));

    executor.execute(() -> {
        try {
            System.err.println("張1到達會議室");
            barrier.await();
            System.err.println("會議開始,張1開始發言");
        } catch (InterruptedException | BrokenBarrierException e) {
            e.printStackTrace();
        }

    });

    executor.execute(() -> {
        try {
            System.err.println("張2到達會議室");
            barrier.await();
            System.err.println("會議開始,張2開始發言");
        } catch (InterruptedException | BrokenBarrierException e) {
            e.printStackTrace();
        }

    });

    executor.execute(() -> {
        try {
            System.err.println("張3先去個廁所,內急解決再去開會");
            TimeUnit.SECONDS.sleep(1);
            System.err.println("張3到達會議室");
            barrier.await();
            System.err.println("會議開始,張3開始發言");
        } catch (InterruptedException | BrokenBarrierException e) {
            e.printStackTrace();
        }

    });


    executor.shutdown();
}

結果圖:

pic2

 看完構造函數,就算理解了一半CyclicBarrier了,接下來來看另一半——await();跟蹤代碼,看到是這樣的

public int await() throws InterruptedException, BrokenBarrierException {
    try {
        return dowait(false, 0L);
    } catch (TimeoutException toe) {
        throw new Error(toe); // cannot happen
    }
}

直接調用dowait方法,傳參為false0,意思就是不限時等待,除非線程被打斷或者喚醒。再進入dowait方法,這個方法就是CyclicBarrier的另一半,在下方的代碼中很清楚的寫了整個執行流程

/** 參數說明, timed:是否限時, nanos:限時時間*/
private int dowait(boolean timed, long nanos)
        throws InterruptedException, BrokenBarrierException, TimeoutException {
    // 鎖
    final ReentrantLock lock = this.lock;
    // 獲取鎖,如果失敗的話線程睡眠,進入同步隊列(AQS中的知識)
    lock.lock();
    try {
        /* 拿到鎖之后進入代碼處理邏輯*/
        
        // 當前周期
        final Generation g = generation;

        // 如果當前周期是失敗的,那么直接拋錯
        if (g.broken)
            throw new BrokenBarrierException();

        // 如果當前線程被打斷了,那么此次周期失敗,設置相關參數,然后拋錯
        if (Thread.interrupted()) {
            // 實現代碼在下行的注釋中,設置相關參數來提醒其他線程周期失敗了
            breakBarrier();
            /*
             * private void breakBarrier() {
             *     generation.broken = true;
             *     count = parties;
             *     // 喚醒condition中的所有線程
             *     trip.signalAll();
             * }
             */
            throw new InterruptedException();
        }

        // 如果成功了,那么剩余任務數(count)減1
        int index = --count;
        // 如果為0則表示達到剩余的任務數沒有了,達到CyclicBarrier的條件總數了,需要喚醒其他線程
        if (index == 0) {  
            boolean ranAction = false;
            try {
                // 喚醒之前的Runnable
                final Runnable command = barrierCommand;
                // 如果不為空的話執行其run方法
                if (command != null)
                    command.run();
                ranAction = true;
                // 開啟下個周期,這個方法是CyclicBarrier可以復用的原因,具體實現在下行注釋
                nextGeneration();
                /* private void nextGeneration() {
                 *     // 首先叫醒當前周期的其他線程,告訴其周期結束了,可以執行接下來的操作了
                 *     trip.signalAll();
                 *     // 然后開啟下個周期,剩余任務數重置
                 *     count = parties;
                 *     // 下個周期
                 *     generation = new Generation();
                 * }
                 */
                return 0;
            } finally {
                if (!ranAction)
                    breakBarrier();
            }
        }

        // 如果還不能結束本周期,就一直等待直到結束或者周期失敗
        for (;;) {
            try {
                // await的過程中是釋放鎖的
                // 不限時的話就一直等待直到被喚醒或者打斷
                if (!timed)
                    trip.await();
                else if (nanos > 0L)
                    // 否則的話等待一段時間后醒來
                    nanos = trip.awaitNanos(nanos);
            } catch (InterruptedException ie) {
                if (g == generation && ! g.broken) {
                    breakBarrier();
                    throw ie;
                } else {
                    // We're about to finish waiting even if we had not
                    // been interrupted, so this interrupt is deemed to
                    // "belong" to subsequent execution.
                    Thread.currentThread().interrupt();
                }
            }

            if (g.broken)
                throw new BrokenBarrierException();

            if (g != generation)
                return index;

            if (timed && nanos <= 0L) {
                breakBarrier();
                throw new TimeoutException();
            }
        }
    } finally {
        // 釋放鎖
        lock.unlock();
    }
}

  到這里就基本理解CyclicBarrier的內部實現了,其他像帶參數的await也是一樣邏輯,只不過是多了限時的條件而已。

  最后還有一個比較隱蔽的方法——reset(),先來看下內部類Generation定義注釋中的一段話。(這里感謝評論區中147110老哥對於此方法的提醒

/**
     * ...
     * The generation changes whenever the barrier is tripped, or
     * is reset.
     * ...
     */
    private static class Generation {
        boolean broken = false;
    }

  意思就是說當一個生命周期被中斷或者重置這兩種情況發生的時候會發生改變,那中斷的情況上面注釋也講了(任務線程中斷),那重置又是啥意思呢?

  來,我們把鏡頭給到await()方法的注釋,

/**
     * ...
     * <p>If the current thread is not the last to arrive then it is
     * disabled for thread scheduling purposes and lies dormant until
     * one of the following things happens:
     * <ul>
     * <li>The last thread arrives; or
     * <li>Some other thread {@linkplain Thread#interrupt interrupts}
     * the current thread; or
     * <li>Some other thread {@linkplain Thread#interrupt interrupts}
     * one of the other waiting threads; or
     * <li>Some other thread times out while waiting for barrier; or
     * <li>Some other thread invokes {@link #reset} on this barrier.
     * </ul>
     * ...
     */
    public int await() throws InterruptedException, BrokenBarrierException {
        // ...
    }

  這段注釋的大概意思就是,如果線程不是最后一個到達(完成任務)的,那么在下面的情況發生之前會進入休眠狀態:

  1. 最后一個線程到達了。
  2. 其他線程打斷了當前線程。
  3. 其他線程打斷了其他等待中的線程。
  4. 其他線程等待時間到了。
  5. 其他線程主動調用了reset方法。(划重點

  上面4條基本就是線程打斷或者完成的情況,屬於能改變生命周期的第一種情況,而第五點就是能改變生命周期的第二種情況——線程主動調用reset()方法。

	public void reset() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            // 打斷當前的生命周期,就是將broken設置為true並且重置狀態后喚起其他線程
            breakBarrier();   
            // 重啟一個新的周期
            nextGeneration();
        } finally {
            lock.unlock();
        }
    }

  okay,以上就是對CyclicBarrier一些比較重要的地方,不過其實如果你了解ReentrantLock的話,就知道CyclicBarrier整個就是對ReentrantLockcondition的活用而已。

3.總結

  整體來說CyclicBarrier的實現相對較簡單,說是ReentrantLockcondition的升級版也不為過。其關鍵點為兩個,一個為其構造函數,決定任務個數和喚醒前操作;另外一個點為await方法,在正常情況下每次await都會減少一個任務數(總數由構造方法決定),在任務數變為0的時候表示周期結束,需要喚醒condition的其他線程,而途中遇到失敗的話當前周期失敗,喚醒其他線程一起拋錯。



失敗不會讓你變得弱小,害怕失敗會。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM