Java並發之CyclicBarrier、CountDownLatch、Phaser


在Java多線程編程中,經常會需要我們控制並發流程,等其他線程執行完畢,或者分階段執行。Java在1.5的juc中引入了CountDownLatchCyclicBarrier,1.7中又引入了Phaser

CountDownLatch

A synchronization aid that allows one or more threads to wait until a set of operations being performed in other threads completes.

一個或多個線程等待其他線程完成一系列操作后才執行。

流程圖

基本使用:

使用兩個 countdown latches的示例。

第一個是開始信號,在發出執行命令前,阻止線程開始執行。

第二個是完成信號,直到所有線程執行完畢,主線程再開始執行。

class Driver { // ...
    void main() throws InterruptedException {
        CountDownLatch startSignal = new CountDownLatch(1);
        CountDownLatch doneSignal = new CountDownLatch(N);

        for (int i = 0; i < N; ++i) // create and start threads
            new Thread(new Worker(startSignal, doneSignal)).start();

        doSomethingElse();            // don't let run yet
        startSignal.countDown();      // let all threads proceed
        doSomethingElse();
        doneSignal.await();           // wait for all to finish
    }
}

class Worker implements Runnable {
    private final CountDownLatch startSignal;
    private final CountDownLatch doneSignal;
    Worker(CountDownLatch startSignal, CountDownLatch doneSignal) {
        this.startSignal = startSignal;
        this.doneSignal = doneSignal;
    }
    public void run() {
        try {
            startSignal.await();
            doWork();
            doneSignal.countDown();
        } catch (InterruptedException ex) {} // return;
    }

    void doWork() { ... }
}}

CyclicBarrier

A synchronizati on aid that allows a set of threads to all wait for each other to reach a common barrier point.

多個線程互相等待,直到到達同一個同步點,再繼續一起執行。CyclicBarrier適用於多個線程有固定的多步需要執行,線程間互相等待,當都執行完了,在一起執行下一步。

流程圖:

基本使用:

public class CyclicBarrierTest {

    static CyclicBarrier c = new CyclicBarrier(2, new A());

    public static void main(String[] args) {
        new Thread(new Runnable() {

            @Override
            public void run() {
                try {
                    c.await();
                } catch (Exception e) {

                }
                System.out.println(1);
            }
        }).start();

        try {
            c.await();
        } catch (Exception e) {

        }
        System.out.println(2);
    }

    static class A implements Runnable {
        @Override
        public void run() {
            System.out.println(3);
        }
    }
}

以上的例子利用了CyclicBarrier提供的一個更高級的構造函數CyclicBarrier(int parties, Runnable barrierAction),用於在線程到達屏障時,優先執行barrierAction,方便處理更復雜的業務場景。

Phaser

A reusable synchronization barrier, similar in functionality to* {@link java.util.concurrent.CyclicBarrier CyclicBarrier} and* {@link java.util.concurrent.CountDownLatch CountDownLatch}* but supporting more flexible usage.

Phaser擁有與CyclicBarrierCountDownLatch類似的功勞.但是這個類提供了更加靈活的應用。它支持任務在多個點都進行同步,支持動態調整注冊任務的數量。當然你也可以使用CountDownLatch,但你必須創建多個CountDownLatch對象,每一階段都需要一個CountDownLatch對象。

流程圖:

基本使用:

public class Match {

    // 模擬了100米賽跑,10名選手,只等裁判一聲令下。當所有人都到達終點時,比賽結束。
    public static void main(String[] args) throws InterruptedException {

        final Phaser phaser=new Phaser(1) ;
        // 十名選手
        for (int index = 0; index < 10; index++) {
            phaser.register();
            new Thread(new player(phaser),"player"+index).start();
        }
        System.out.println("Game Start");
        //注銷當前線程,比賽開始
        phaser.arriveAndDeregister();
        //是否非終止態一直等待
        while(!phaser.isTerminated()){
        }
        System.out.println("Game Over");
    }
}
class player implements Runnable{

    private  final Phaser phaser ;

    player(Phaser phaser){
        this.phaser=phaser;
    }
    @Override
    public void run() {
        try {
            // 第一階段——等待創建好所有線程再開始
            phaser.arriveAndAwaitAdvance();

            // 第二階段——等待所有選手准備好再開始
            Thread.sleep((long) (Math.random() * 10000));
            System.out.println(Thread.currentThread().getName() + " ready");
            phaser.arriveAndAwaitAdvance();

            // 第三階段——等待所有選手准備好到達,到達后,該線程從phaser中注銷,不在進行下面的階段。
            Thread.sleep((long) (Math.random() * 10000));
            System.out.println(Thread.currentThread().getName() + " arrived");
            phaser.arriveAndDeregister();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM