Phaser這個類的使用場景為N個線程分階段並行的問題。有這么一個任務為“做3道題“,每個學生一個進程,5個學生可以並行做,這個就是常規的並發,但是如果加一個額外的 限制條件,必須等所有人都做完類第一題,才能開始做第二題,必須等所有人都做完了第二題,才能做第三題,這個問題就轉變成了分階段並發的問題,最適合用Phaser來解題,下面給出源代碼,大家可以自己嘗試:
MyPhaser.java
import java.util.concurrent.Phaser; public class MyPhaser extends Phaser { @Override protected boolean onAdvance(int phase, int registeredParties) { //在每個階段執行完成后回調的方法 switch (phase) { case 0: return studentArrived(); case 1: return finishFirstExercise(); case 2: return finishSecondExercise(); case 3: return finishExam(); default: return true; } } private boolean studentArrived(){ System.out.println("學生准備好了,學生人數:"+getRegisteredParties()); return false; } private boolean finishFirstExercise(){ System.out.println("第一題所有學生做完"); return false; } private boolean finishSecondExercise(){ System.out.println("第二題所有學生做完"); return false; } private boolean finishExam(){ System.out.println("第三題所有學生做完,結束考試"); return true; } }
StudentTask.java
import java.util.concurrent.Phaser; import java.util.concurrent.TimeUnit; public class StudentTask implements Runnable { private Phaser phaser; public StudentTask(Phaser phaser) { this.phaser = phaser; } @Override public void run() { System.out.println(Thread.currentThread().getName()+"到達考試"); phaser.arriveAndAwaitAdvance(); System.out.println(Thread.currentThread().getName()+"做第1題時間..."); doExercise1(); System.out.println(Thread.currentThread().getName()+"做第1題完成..."); phaser.arriveAndAwaitAdvance(); System.out.println(Thread.currentThread().getName()+"做第2題時間..."); doExercise2(); System.out.println(Thread.currentThread().getName()+"做第2題完成..."); phaser.arriveAndAwaitAdvance(); System.out.println(Thread.currentThread().getName()+"做第3題時間..."); doExercise3(); System.out.println(Thread.currentThread().getName()+"做第3題完成..."); phaser.arriveAndAwaitAdvance(); } private void doExercise1() { long duration = (long)(Math.random()*10); try { TimeUnit.SECONDS.sleep(duration); } catch (InterruptedException e) { e.printStackTrace(); } } private void doExercise2() { long duration = (long)(Math.random()*10); try { TimeUnit.SECONDS.sleep(duration); } catch (InterruptedException e) { e.printStackTrace(); } } private void doExercise3() { long duration = (long)(Math.random()*10); try { TimeUnit.SECONDS.sleep(duration); } catch (InterruptedException e) { e.printStackTrace(); } } }
Main.java
public class Main { public static void main(String[] args) { MyPhaser phaser = new MyPhaser(); StudentTask[] studentTask = new StudentTask[5]; for (int i = 0; i < studentTask.length; i++) { studentTask[i] = new StudentTask(phaser); phaser.register(); //注冊一次表示phaser維護的線程個數 } Thread[] threads = new Thread[studentTask.length]; for (int i = 0; i < studentTask.length; i++) { threads[i] = new Thread(studentTask[i], "Student "+i); threads[i].start(); } //等待所有線程執行結束 for (int i = 0; i < studentTask.length; i++) { try { threads[i].join(); } catch (InterruptedException e) { e.printStackTrace(); } } System.out.println("Phaser has finished:"+phaser.isTerminated()); } }
注意這里的arriveAndAwaitAdvance方法,
可以拆分成兩個方法:arrive(), awaitAdvance, 拆分的目的是可以在這兩個方法之間插入一些額外的代碼構造一個叫fuzzy barrier的概念,增加程序的並發性(程序和barrier的並發),來看一段解釋: