在Java多線程編程中,經常會需要我們控制並發流程,等其他線程執行完畢,或者分階段執行。Java在1.5的juc中引入了CountDownLatch
和CyclicBarrier
,1.7中又引入了Phaser
。
CountDownLatch
A synchronization aid that allows one or more threads to wait until a set of operations being performed in other threads completes.
一個或多個線程等待其他線程完成一系列操作后才執行。
流程圖:
基本使用:
使用兩個 countdown latches的示例。
第一個是開始信號,在發出執行命令前,阻止線程開始執行。
第二個是完成信號,直到所有線程執行完畢,主線程再開始執行。
class Driver { // ...
void main() throws InterruptedException {
CountDownLatch startSignal = new CountDownLatch(1);
CountDownLatch doneSignal = new CountDownLatch(N);
for (int i = 0; i < N; ++i) // create and start threads
new Thread(new Worker(startSignal, doneSignal)).start();
doSomethingElse(); // don't let run yet
startSignal.countDown(); // let all threads proceed
doSomethingElse();
doneSignal.await(); // wait for all to finish
}
}
class Worker implements Runnable {
private final CountDownLatch startSignal;
private final CountDownLatch doneSignal;
Worker(CountDownLatch startSignal, CountDownLatch doneSignal) {
this.startSignal = startSignal;
this.doneSignal = doneSignal;
}
public void run() {
try {
startSignal.await();
doWork();
doneSignal.countDown();
} catch (InterruptedException ex) {} // return;
}
void doWork() { ... }
}}
CyclicBarrier
A synchronizati on aid that allows a set of threads to all wait for each other to reach a common barrier point.
多個線程互相等待,直到到達同一個同步點,再繼續一起執行。CyclicBarrier適用於多個線程有固定的多步需要執行,線程間互相等待,當都執行完了,在一起執行下一步。
流程圖:
基本使用:
public class CyclicBarrierTest {
static CyclicBarrier c = new CyclicBarrier(2, new A());
public static void main(String[] args) {
new Thread(new Runnable() {
@Override
public void run() {
try {
c.await();
} catch (Exception e) {
}
System.out.println(1);
}
}).start();
try {
c.await();
} catch (Exception e) {
}
System.out.println(2);
}
static class A implements Runnable {
@Override
public void run() {
System.out.println(3);
}
}
}
以上的例子利用了CyclicBarrier提供的一個更高級的構造函數CyclicBarrier(int parties, Runnable barrierAction),用於在線程到達屏障時,優先執行barrierAction,方便處理更復雜的業務場景。
Phaser
A reusable synchronization barrier, similar in functionality to* {@link java.util.concurrent.CyclicBarrier CyclicBarrier} and* {@link java.util.concurrent.CountDownLatch CountDownLatch}* but supporting more flexible usage.
Phaser
擁有與CyclicBarrier
和CountDownLatch
類似的功勞.但是這個類提供了更加靈活的應用。它支持任務在多個點都進行同步,支持動態調整注冊任務的數量。當然你也可以使用CountDownLatch,但你必須創建多個CountDownLatch對象,每一階段都需要一個CountDownLatch對象。
流程圖:
基本使用:
public class Match {
// 模擬了100米賽跑,10名選手,只等裁判一聲令下。當所有人都到達終點時,比賽結束。
public static void main(String[] args) throws InterruptedException {
final Phaser phaser=new Phaser(1) ;
// 十名選手
for (int index = 0; index < 10; index++) {
phaser.register();
new Thread(new player(phaser),"player"+index).start();
}
System.out.println("Game Start");
//注銷當前線程,比賽開始
phaser.arriveAndDeregister();
//是否非終止態一直等待
while(!phaser.isTerminated()){
}
System.out.println("Game Over");
}
}
class player implements Runnable{
private final Phaser phaser ;
player(Phaser phaser){
this.phaser=phaser;
}
@Override
public void run() {
try {
// 第一階段——等待創建好所有線程再開始
phaser.arriveAndAwaitAdvance();
// 第二階段——等待所有選手准備好再開始
Thread.sleep((long) (Math.random() * 10000));
System.out.println(Thread.currentThread().getName() + " ready");
phaser.arriveAndAwaitAdvance();
// 第三階段——等待所有選手准備好到達,到達后,該線程從phaser中注銷,不在進行下面的階段。
Thread.sleep((long) (Math.random() * 10000));
System.out.println(Thread.currentThread().getName() + " arrived");
phaser.arriveAndDeregister();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}