java中CyclicBarrier的使用



java中CyclicBarrier的使用

CyclicBarrier是java 5中引入的線程安全的組件。它有一個barrier的概念,主要用來等待所有的線程都執行完畢,然后再去執行特定的操作。

假如我們有很多個線程,每個線程都計算出了一些數據,然后我們需要等待所有的線程都執行完畢,再把各個線程計算出來的數據加起來,的到最終的結果,那么我們就可以使用CyclicBarrier。

CyclicBarrier的方法

我們先看下CyclicBarrier的構造函數:

    public CyclicBarrier(int parties, Runnable barrierAction) {
        if (parties <= 0) throw new IllegalArgumentException();
        this.parties = parties;
        this.count = parties;
        this.barrierCommand = barrierAction;
    }

        public CyclicBarrier(int parties) {
        this(parties, null);
    }

CyclicBarrier有兩個構造函數,第一個只接受一個參數,表示需要統一行動的線程個數。第二個參數叫做barrierAction,表示出發barrier是需要執行的方法。

其中barrierAction是一個Runnable,我們可以在其中定義最后需要執行的工作。

再看下重要await方法:

    public int await() throws InterruptedException, BrokenBarrierException {
        try {
            return dowait(false, 0L);
        } catch (TimeoutException toe) {
            throw new Error(toe); // cannot happen
        }
    }

        public int await(long timeout, TimeUnit unit)
        throws InterruptedException,
               BrokenBarrierException,
               TimeoutException {
        return dowait(true, unit.toNanos(timeout));
    }

await也有兩個方法,一個是帶時間參數的,一個是不帶時間參數的。

await本質上調用了lock.newCondition().await()方法。

因為有多個parties,下面我們考慮兩種情況。

  1. 該線程不是最后一個調用await的線程

在這種情況下,該線程將會進入等待狀態,直到下面的情況發送:

  • 最后一個線程調用await()
  • 其他線程中斷了當前線程
  • 其他線程中斷了其他正在等待的線程
  • 其他線程在等待barrier的時候超時
  • 其他線程在該barrier上調用的reset()方法

如果該線程在調用await()的時候已經設置了interrupted的狀態,或者在等待的時候被interrupted,那么將會拋出InterruptedException異常,並清除中斷狀態。(這里和Thread的interrupt()方法保持一致)

如果任何線程正在等待狀態中,這時候barrier被重置。或者在線程調用await方法或者正在等待中,barrier被broken,那么將會拋出BrokenBarrierException。

如果任何線程在等待的時候被中斷,那么所有其他等待的線程將會拋出BrokenBarrierException,barrier將會被置為broken狀態。

  1. 如果該線程是最后一個調用await方法的

在這種情況,如果barrierAction不為空,那么該線程將會在其他線程繼續執行前調用這個barrierAction。

如果該操作拋出異常,那么barrier的狀態將會被置為broken狀態。

再看看這個reset() 方法:

    public void reset() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            breakBarrier();   // break the current generation
            nextGeneration(); // start a new generation
        } finally {
            lock.unlock();
        }
    }

該方法將會將barrier置為broken狀態,並且開啟一個新的generation,來進行下一輪的操作。

CyclicBarrier的使用

我們在子線程中生成一個隨機的整數隊列,當所有的線程都生成完畢之后,我們再將生成的整數全都加起來。看下怎么實現。

定義生成整數隊列的子線程:

public class CyclicBarrierUsage implements Runnable {

    private CyclicBarrier cyclicBarrier;
    private List<List<Integer>> partialResults;
    private Random random = new Random();

    public CyclicBarrierUsage(CyclicBarrier cyclicBarrier,List<List<Integer>> partialResults){
        this.cyclicBarrier=cyclicBarrier;
        this.partialResults=partialResults;
    }

    @Override
    public void run() {
        String thisThreadName = Thread.currentThread().getName();
        List<Integer> partialResult = new ArrayList<>();

        // Crunch some numbers and store the partial result
        for (int i = 0; i < 10; i++) {
            Integer num = random.nextInt(10);
            System.out.println(thisThreadName
                    + ": Crunching some numbers! Final result - " + num);
            partialResult.add(num);
        }

        partialResults.add(partialResult);
        try {
            System.out.println(thisThreadName
                    + " waiting for others to reach barrier.");
            cyclicBarrier.await();
        } catch (InterruptedException e) {
            // ...
        } catch (BrokenBarrierException e) {
            // ...
        }
    }

}

上面的子線程接收外部傳入的cyclicBarrier和保存數據的partialResults,並在運行完畢調用cyclicBarrier.await()來等待其他線程執行完畢。

看下CyclicBarrier的構建:

CyclicBarrier cyclicBarrier=new CyclicBarrier(5,()->{
            String thisThreadName = Thread.currentThread().getName();

            System.out.println(
                    thisThreadName + ": Computing sum of 5 workers, having 10 results each.");
            int sum = 0;

            for (List<Integer> threadResult : partialResults) {
                System.out.print("Adding ");
                for (Integer partialResult : threadResult) {
                    System.out.print(partialResult+" ");
                    sum += partialResult;
                }
                System.out.println();
            }
            System.out.println(thisThreadName + ": Final result = " + sum);
        });

在CyclicBarrier中,我們定義了一個BarrierAction來做最后數據的匯總處理。

運行:

        for (int i = 0; i < 5; i++) {
            Thread worker = new Thread(new CyclicBarrierUsage(cyclicBarrier,partialResults));
            worker.setName("Thread " + i);
            worker.start();
        }

輸出結果如下:

Spawning 5 worker threads to compute 10 partial results each
Thread 0: Crunching some numbers! Final result - 5
Thread 0: Crunching some numbers! Final result - 3
Thread 1: Crunching some numbers! Final result - 1
Thread 0: Crunching some numbers! Final result - 7
Thread 1: Crunching some numbers! Final result - 8
Thread 0: Crunching some numbers! Final result - 4
Thread 0: Crunching some numbers! Final result - 6
Thread 0: Crunching some numbers! Final result - 9
Thread 1: Crunching some numbers! Final result - 3
Thread 2: Crunching some numbers! Final result - 1
Thread 0: Crunching some numbers! Final result - 0
Thread 2: Crunching some numbers! Final result - 9
Thread 1: Crunching some numbers! Final result - 3
Thread 2: Crunching some numbers! Final result - 7
Thread 0: Crunching some numbers! Final result - 2
Thread 2: Crunching some numbers! Final result - 6
Thread 1: Crunching some numbers! Final result - 6
Thread 2: Crunching some numbers! Final result - 5
Thread 0: Crunching some numbers! Final result - 0
Thread 2: Crunching some numbers! Final result - 1
Thread 1: Crunching some numbers! Final result - 5
Thread 2: Crunching some numbers! Final result - 1
Thread 0: Crunching some numbers! Final result - 7
Thread 2: Crunching some numbers! Final result - 8
Thread 1: Crunching some numbers! Final result - 2
Thread 2: Crunching some numbers! Final result - 4
Thread 0 waiting for others to reach barrier.
Thread 2: Crunching some numbers! Final result - 0
Thread 2 waiting for others to reach barrier.
Thread 1: Crunching some numbers! Final result - 7
Thread 1: Crunching some numbers! Final result - 6
Thread 1: Crunching some numbers! Final result - 9
Thread 1 waiting for others to reach barrier.
Thread 3: Crunching some numbers! Final result - 9
Thread 3: Crunching some numbers! Final result - 3
Thread 3: Crunching some numbers! Final result - 8
Thread 3: Crunching some numbers! Final result - 8
Thread 3: Crunching some numbers! Final result - 1
Thread 3: Crunching some numbers! Final result - 8
Thread 3: Crunching some numbers! Final result - 0
Thread 3: Crunching some numbers! Final result - 5
Thread 3: Crunching some numbers! Final result - 9
Thread 3: Crunching some numbers! Final result - 1
Thread 3 waiting for others to reach barrier.
Thread 4: Crunching some numbers! Final result - 2
Thread 4: Crunching some numbers! Final result - 2
Thread 4: Crunching some numbers! Final result - 5
Thread 4: Crunching some numbers! Final result - 5
Thread 4: Crunching some numbers! Final result - 3
Thread 4: Crunching some numbers! Final result - 7
Thread 4: Crunching some numbers! Final result - 4
Thread 4: Crunching some numbers! Final result - 8
Thread 4: Crunching some numbers! Final result - 4
Thread 4: Crunching some numbers! Final result - 3
Thread 4 waiting for others to reach barrier.
Thread 4: Computing sum of 5 workers, having 10 results each.
Adding 5 3 7 4 6 9 0 2 0 7 
Adding 1 9 7 6 5 1 1 8 4 0 
Adding 1 8 3 3 6 5 2 7 6 9 
Adding 9 3 8 8 1 8 0 5 9 1 
Adding 2 2 5 5 3 7 4 8 4 3 
Thread 4: Final result = 230

Process finished with exit code 0

本文的例子可以參考https://github.com/ddean2009/learn-java-concurrency/tree/master/CyclicBarrier

更多教程請參考 flydean的博客


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM