JUC回顧之-CyclicBarrier底層實現和原理


1.CyclicBarrier 字面意思是可循環(Cyclic)使用的屏障(Barrier)。它要做的事情是讓一組線程到達一個屏障(同步點)時被阻塞,直到最后一個線程到達屏障時候,屏障才會開門。所有被屏障攔截的線程才會運行。

 

 

2.常用的方法:

 

 
CyclicBarrier(int parties)
創建一個新的 CyclicBarrier,它將在給定數量的參與者(線程)處於等待狀態時啟動,但它不會在啟動 barrier 時執行預定義的操作。

CyclicBarrier(int parties, Runnable barrierAction)
創建一個新的 CyclicBarrier,它將在給定數量的參與者(線程)處於等待狀態時啟動,並在啟動 barrier 時執行給定的屏障操作,該操作由最后一個進入 barrier 的線程執行。

int await()
在所有參與者都已經在此 barrier 上調用 await 方法之前,將一直等待。

int await(long timeout, TimeUnit unit)
在所有參與者都已經在此屏障上調用 await 方法之前將一直等待,或者超出了指定的等待時間。

int getNumberWaiting()
返回當前在屏障處等待的參與者數目。

int getParties()
返回要求啟動此 barrier 的參與者數目。

boolean isBroken()
查詢此屏障是否處於損壞狀態。

void reset() 
將屏障重置為其初始狀態。如果調用了該函數,則在等待的線程將會拋出BrokenBarrierException異常。

3.底層原理實現

 

 

CyclicBarrier是由ReentrantLock可重入鎖和Condition共同實現的。

具體實現源碼如下:

//1.CyclicBarrier構造方法
public CyclicBarrier(int parties, Runnable barrierAction) {
    if (parties <= 0) throw new IllegalArgumentException();
    // parties表示“必須同時到達barrier的線程個數”。
    this.parties = parties;
    // count表示“處在等待狀態的線程個數”。
    this.count = parties;
    // barrierCommand表示“parties個線程到達barrier時,會執行的動作”。
    this.barrierCommand = barrierAction;
}
1. await源碼分析:
 

public int await() throws InterruptedException, BrokenBarrierException { try { return dowait(false, 0L); } catch (TimeoutException toe) { throw new Error(toe); // cannot happen; } }
private int dowait(boolean timed, long nanos)
    throws InterruptedException, BrokenBarrierException,
           TimeoutException {
    final ReentrantLock lock = this.lock;
    // 獲取“獨占鎖(lock)”
    lock.lock();
    try {
        // 保存“當前的generation”
        final Generation g = generation;

        // 若“當前generation已損壞”,則拋出異常。
        if (g.broken)
            throw new BrokenBarrierException();

        // 如果當前線程被中斷,則通過breakBarrier()終止CyclicBarrier,喚醒CyclicBarrier中所有等待線程。
        if (Thread.interrupted()) {
            breakBarrier();
            throw new InterruptedException();
        }

       // 將“count計數器”-1
       int index = --count;
       // 如果index=0,則意味着“有parties個線程到達barrier”。
       if (index == 0) {  // tripped
           boolean ranAction = false;
           try {
               // 如果barrierCommand不為null,則執行該動作。
               final Runnable command = barrierCommand;
               if (command != null)
                   command.run();
               ranAction = true;
               // 喚醒所有等待線程,並更新generation。
               nextGeneration();
               return 0;
           } finally {
               if (!ranAction)
                   breakBarrier();
           }
       }

        // 當前線程一直阻塞,直到“有parties個線程到達barrier” 或 “當前線程被中斷” 或 “超時”這3者之一發生,
        // 當前線程才繼續執行。
        for (;;) {
            try {
                // 如果不是“超時等待”,則調用await()進行等待;否則,調用awaitNanos()進行等待。
                if (!timed)
                    trip.await();
                else if (nanos > 0L)
                    nanos = trip.awaitNanos(nanos);
            } catch (InterruptedException ie) {
                // 如果等待過程中,線程被中斷,則執行下面的函數。
                if (g == generation && ! g.broken) {
                    breakBarrier();
                    throw ie;
                } else {
                    Thread.currentThread().interrupt();
                }
            }

            // 如果“當前generation已經損壞”,則拋出異常。
            if (g.broken)
                throw new BrokenBarrierException();

            // 如果“generation已經換代”,則返回index。
            if (g != generation)
                return index;

            // 如果是“超時等待”,並且時間已到,則通過breakBarrier()終止CyclicBarrier,喚醒CyclicBarrier中所有等待線程,並拋出TimeoutException異常
            if (timed && nanos <= 0L) {
                breakBarrier();
                throw new TimeoutException();
            }
        }
    } finally {
        // 釋放“獨占鎖(lock)”
        lock.unlock();
    }
}

 

實例如下:

1.await()方法使用

public class CyclicBarrierTest {

    private static int SIZE = 5;

    private static CyclicBarrier cb;

    public static void main(String[] args) {
        cb = new CyclicBarrier(SIZE);
        for (int i = 0; i < SIZE; i++) {
            new MyTask().start();
        }

    }

    static class MyTask extends Thread {
        @Override
        public void run() {
            try {

                System.out.println("線程" + Thread.currentThread().getName() + "正在執行同一個任務");
                // 以睡眠來模擬幾個線程執行一個任務的時間
                Thread.sleep(1000);
                System.out.println("線程" + Thread.currentThread().getName() + "執行任務完成,等待其他線程執行完畢");
                // 用來掛起當前線程,直至所有線程都到達barrier狀態再同時執行后續任務;
                cb.await();

            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (BrokenBarrierException e) {
                e.printStackTrace();
            }
            System.out.println("所有線程寫入完畢");

        }

    }

}

結果

 

線程Thread-0正在執行同一個任務
線程Thread-2正在執行同一個任務
線程Thread-1正在執行同一個任務
線程Thread-3正在執行同一個任務
線程Thread-4正在執行同一個任務
線程Thread-3執行任務完成,等待其他線程執行完畢
線程Thread-4執行任務完成,等待其他線程執行完畢
線程Thread-0執行任務完成,等待其他線程執行完畢
線程Thread-1執行任務完成,等待其他線程執行完畢
線程Thread-2執行任務完成,等待其他線程執行完畢
所有線程寫入完畢
所有線程寫入完畢
所有線程寫入完畢
所有線程寫入完畢
所有線程寫入完畢

 

 

2.使用 barrier.await(2000, TimeUnit.MILLISECONDS)方法:

public class CyclicBarrierWriteDataWaitTimeOutTest {

    private static final int THREAD_NUM = 5;

    private static final Random random = new Random();

    public static void main(String[] args) throws InterruptedException {

        // 使用構造方法:public CyclicBarrier(int parties, Runnable barrierAction)
        // 參數parties表示一共有多少線程參與這次“活動”,barrierAction是可選的,用來指定當所有線程都完成這些必須的“任務”之后要干的其他事情
        CyclicBarrier barrier = new CyclicBarrier(THREAD_NUM, new Runnable() {

            @Override
            public void run() {
                // 最后寫完數據的線程,會執行這個任務
                System.out.println(Thread.currentThread().getId() + ":所有線程寫數據完畢!");
            }
        });

        // 啟動5個線程,寫數據
        for (int i = 0; i < THREAD_NUM; i++) {
            if (i < THREAD_NUM - 1) {
                Thread t = new Thread(new MyTask(barrier));
                t.start();
            } else {
                // 最后一個線程延遲3秒執行
                Thread.sleep(3000);
                Thread t = new Thread(new MyTask(barrier));
                t.start();
            }

        }

    }

    /**
     * 
     * (線程類)
     *
     * <p>
     * 修改歷史:                                            <br>  
     * 修改日期            修改人員       版本             修改內容<br>  
     * -------------------------------------------------<br>  
     * 2016年8月26日 上午11:21:39   user     1.0        初始化創建<br>
     * </p> 
     *
     * @author        Peng.Li 
     * @version        1.0  
     * @since        JDK1.7
     */
    static class MyTask extends Thread {

        private CyclicBarrier barrier;

        public MyTask(CyclicBarrier barrier) {
            this.barrier = barrier;
        }

        @Override
        public void run() {

            int time = random.nextInt(1000);
            System.out.println(Thread.currentThread().getId() + ":需要" + time + "毫秒的時間寫入數據");

            try {
                Thread.sleep(time);
            } catch (InterruptedException e1) {
                e1.printStackTrace();
            }
            System.out.println(Thread.currentThread().getId() + ":寫入數據完畢,等待其他線程寫入數據");
            try {
                // 用來掛起當前線程,直至所有線程都到達barrier狀態再同時執行后續任務;
                // 等待所有線程都調用過此函數才能繼續后續動作
                // 只等待2s,那么最后一個線程3秒才執行,則必定會超時
                barrier.await(2000, TimeUnit.MILLISECONDS);
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (BrokenBarrierException e) {
                e.printStackTrace();
            } catch (TimeoutException e) {
                e.printStackTrace();
            }
            System.out.println(Thread.currentThread().getId() + ":所有線程寫入數據完畢,繼續處理其他任務...");

        }

    }

}

從結果分析:

可以看到,前面四個線程等待最后一個線程超時了,這個時候這四個線程不會再等待最后一個線程寫入完畢,而是直接拋出BrokenBarrierException

異常,繼續執行后續的動作。最后一個線程完成寫入數據操作后也繼續了后續的動作。

需要理解的是:最后一個線程發生超時的異常,其他的線程不會繼續等待,而是去執行其他的任務。

Thread-1:需要650毫秒的時間寫入數據
Thread-3:需要297毫秒的時間寫入數據
Thread-5:需要755毫秒的時間寫入數據
Thread-7:需要79毫秒的時間寫入數據
Thread-7:寫入數據完畢,等待其他線程寫入數據
Thread-3:寫入數據完畢,等待其他線程寫入數據
Thread-1:寫入數據完畢,等待其他線程寫入數據
Thread-5:寫入數據完畢,等待其他線程寫入數據
Thread-7:所有線程寫入數據完畢,繼續處理其他任務...
Thread-1:所有線程寫入數據完畢,繼續處理其他任務...
java.util.concurrent.TimeoutException
Thread-3:所有線程寫入數據完畢,繼續處理其他任務...
Thread-5:所有線程寫入數據完畢,繼續處理其他任務...
    at java.util.concurrent.CyclicBarrier.dowait(CyclicBarrier.java:250)
    at java.util.concurrent.CyclicBarrier.await(CyclicBarrier.java:427)
    at concurrentMy.CountAndCyclicAndSemaphore.cyclibarrier.test.CyclicBarrierWriteDataWaitTimeOutTest$MyTask.run(CyclicBarrierWriteDataWaitTimeOutTest.java:299)
    at java.lang.Thread.run(Thread.java:745)
java.util.concurrent.BrokenBarrierException
    at java.util.concurrent.CyclicBarrier.dowait(CyclicBarrier.java:243)
    at java.util.concurrent.CyclicBarrier.await(CyclicBarrier.java:427)
    at concurrentMy.CountAndCyclicAndSemaphore.cyclibarrier.test.CyclicBarrierWriteDataWaitTimeOutTest$MyTask.run(CyclicBarrierWriteDataWaitTimeOutTest.java:299)
    at java.lang.Thread.run(Thread.java:745)
java.util.concurrent.BrokenBarrierException
    at java.util.concurrent.CyclicBarrier.dowait(CyclicBarrier.java:243)
    at java.util.concurrent.CyclicBarrier.await(CyclicBarrier.java:427)
    at concurrentMy.CountAndCyclicAndSemaphore.cyclibarrier.test.CyclicBarrierWriteDataWaitTimeOutTest$MyTask.run(CyclicBarrierWriteDataWaitTimeOutTest.java:299)
    at java.lang.Thread.run(Thread.java:745)
java.util.concurrent.BrokenBarrierException
    at java.util.concurrent.CyclicBarrier.dowait(CyclicBarrier.java:243)
    at java.util.concurrent.CyclicBarrier.await(CyclicBarrier.java:427)
    at concurrentMy.CountAndCyclicAndSemaphore.cyclibarrier.test.CyclicBarrierWriteDataWaitTimeOutTest$MyTask.run(CyclicBarrierWriteDataWaitTimeOutTest.java:299)
    at java.lang.Thread.run(Thread.java:745)
Thread-9:需要164毫秒的時間寫入數據
Thread-9:寫入數據完畢,等待其他線程寫入數據java.util.concurrent.BrokenBarrierException
Thread-9:所有線程寫入數據完畢,繼續處理其他任務...

    at java.util.concurrent.CyclicBarrier.dowait(CyclicBarrier.java:200)
    at java.util.concurrent.CyclicBarrier.await(CyclicBarrier.java:427)
    at concurrentMy.CountAndCyclicAndSemaphore.cyclibarrier.test.CyclicBarrierWriteDataWaitTimeOutTest$MyTask.run(CyclicBarrierWriteDataWaitTimeOutTest.java:299)
    at java.lang.Thread.run(Thread.java:745)

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM