概述
CyclicBarrier是一個同步工具類,它允許一組線程互相等待,直到到達某個公共屏障點。與CountDownLatch不同的是該barrier在釋放等待線程后可以重用,所以稱它為循環(Cyclic)的屏障(Barrier)。
CyclicBarrier支持一個可選的Runnable命令,在一組線程中的最后一個線程到達之后(但在釋放所有線程之前),該命令只在每個屏障點運行一次。若在繼續所有參與線程之前更新共享狀態,此屏障操作很有用。
使用
提供的方法:
1 //parties表示屏障攔截的線程數量,當屏障撤銷時,先執行barrierAction,然后在釋放所有線程 2 public CyclicBarrier(int parties, Runnable barrierAction) 3 //barrierAction默認為null 4 public CyclicBarrier(int parties) 5 6 /* 7 *當前線程等待直到所有線程都調用了該屏障的await()方法 8 *如果當前線程不是將到達的最后一個線程,將會被阻塞。解除阻塞的情況有以下幾種 9 * 1)最后一個線程調用await() 10 * 2)當前線程被中斷 11 3)其他正在該CyclicBarrier上等待的線程被中斷 12 4)其他正在該CyclicBarrier上等待的線程超時 13 5)其他某個線程調用該CyclicBarrier的reset()方法 14 *如果當前線程在進入此方法時已經設置了該線程的中斷狀態或者在等待時被中斷,將拋出InterruptedException,並且清除當前線程的已中斷狀態。 15 *如果在線程處於等待狀態時barrier被reset()或者在調用await()時 barrier 被損壞,將拋出 BrokenBarrierException 異常。 16 *如果任何線程在等待時被中斷,則其他所有等待線程都將拋出 BrokenBarrierException 異常,並將 barrier 置於損壞狀態。 *如果當前線程是最后一個將要到達的線程,並且構造方法中提供了一個非空的屏障操作(barrierAction),那么在允許其他線程繼續運行之前,當前線程將運行該操作。如果在執行屏障操作過程中發生異常,則該異常將傳播到當前線程中,並將 barrier 置於損壞狀態。 17 * 18 *返回值為當前線程的索引,0表示當前線程是最后一個到達的線程 19 */ 20 public int await() throws InterruptedException, BrokenBarrierException 21 //在await()的基礎上增加超時機制,如果超出指定的等待時間,則拋出 TimeoutException 異常。如果該時間小於等於零,則此方法根本不會等待。 22 public int await(long timeout, TimeUnit unit) throws InterruptedException, BrokenBarrierException, TimeoutException 23 24 //將屏障重置為其初始狀態。如果所有參與者目前都在屏障處等待,則它們將返回,同時拋出一個BrokenBarrierException。 25 public void reset()
對於失敗的同步嘗試,CyclicBarrier 使用了一種要么全部要么全不 (all-or-none) 的破壞模式:如果因為中斷、失敗或者超時等原因,導致線程過早地離開了屏障點,那么在該屏障點等待的其他所有線程也將通過 BrokenBarrierException(如果它們幾乎同時被中斷,則用 InterruptedException)以反常的方式離開。
使用示例:
每個Worker處理矩陣中的一行,在處理完所有的行之前,該線程將一直在屏障處等待。在各個WOrker處理完所有行后,將執行提供的Runnable屏障操作。
1 class Solver { 2 final int N; //矩陣的行數 3 final float[][] data; //要處理的矩陣 4 final CyclicBarrier barrier; //循環屏障 5 6 class Worker implements Runnable { 7 int myRow; 8 Worker(int row) { myRow = row; } 9 public void run() { 10 while (!done()) { 11 processRow(myRow); //處理指定一行數據 12 13 try { 14 barrier.await(); //在屏障處等待直到 15 } catch (InterruptedException ex) { 16 return; 17 } catch (BrokenBarrierException ex) { 18 return; 19 } 20 } 21 } 22 } 23 24 public Solver(float[][] matrix) { 25 data = matrix; 26 N = matrix.length; 27 //初始化CyclicBarrier 28 barrier = new CyclicBarrier(N, new Runnable() { 29 public void run() { 30 mergeRows(...); //合並行 31 } 32 }); 33 for (int i = 0; i < N; ++i) 34 new Thread(new Worker(i)).start(); 35 36 waitUntilDone(); 37 } 38 }
實現原理
基於ReentrantLock和Condition機制實現。除了getParties()方法,CyclicBarrier的其他方法都需要獲取鎖。
域
1 /** The lock for guarding barrier entry */ 2 private final ReentrantLock lock = new ReentrantLock(); //可重入鎖 3 /** Condition to wait on until tripped */ 4 private final Condition trip = lock.newCondition(); 5 /** The number of parties */ 6 private final int parties; //攔截的線程數量 7 /* The command to run when tripped */ 8 private final Runnable barrierCommand; //當屏障撤銷時,需要執行的屏障操作 9 //當前的Generation。每當屏障失效或者開閘之后都會自動替換掉。從而實現重置的功能。 10 private Generation generation = new Generation(); 11 12 /** 13 * Number of parties still waiting. Counts down from parties to 0 14 * on each generation. It is reset to parties on each new 15 * generation or when broken. 16 */ 17 private int count; //還能阻塞的線程數(即parties-當前阻塞的線程數),當新建generation或generation被破壞時,count會被重置。因為對Count的操作都是在獲取鎖之后,所以不需要其他同步措施。 18 19 //靜態內聯類 20 private static class Generation { 21 boolean broken = false; //當前的屏障是否破壞 22 }
await()
1 public int await() throws InterruptedException, BrokenBarrierException { 2 try { 3 return dowait(false, 0L); 4 } catch (TimeoutException toe) { 5 throw new Error(toe); // cannot happen; 6 } 7 } 8 9 private int dowait(boolean timed, long nanos) 10 throws InterruptedException, BrokenBarrierException, 11 TimeoutException { 12 final ReentrantLock lock = this.lock; 13 lock.lock(); //獲取鎖 14 try { 15 //保存此時的generation 16 final Generation g = generation; 17 //判斷屏障是否被破壞 18 if (g.broken) 19 throw new BrokenBarrierException(); 20 //判斷線程是否被中斷,如果被中斷,調用breakBarrier()進行屏障破壞處理,並拋出InterruptedException 21 if (Thread.interrupted()) { 22 breakBarrier(); 23 throw new InterruptedException(); 24 } 25 26 int index = --count; //剩余count遞減,並賦值給線程索引,作為方法的返回值 27 //如果線程索引將為0,說明當前線程是最后一個到達的線程。執行可能存在的屏障操作 barrierCommand,設置下一個Generation。相當於每次開閘之后都進行了一次reset。 28 if (index == 0) { // tripped 29 boolean ranAction = false; 30 try { 31 final Runnable command = barrierCommand; 32 if (command != null) 33 command.run(); //同步執行barrierCommand 34 ranAction = true; 35 nextGeneration(); //執行成功設置下一個nextGeneration 36 return 0; 37 } finally { 38 if (!ranAction) //如果barrierCommand執行失敗,進行屏障破壞處理 39 breakBarrier(); 40 } 41 } 42 43 //如果當前線程不是最后一個到達的線程 44 // loop until tripped, broken, interrupted, or timed out 45 for (;;) { 46 try { 47 if (!timed) 48 trip.await(); //調用Condition的await()方法阻塞 49 else if (nanos > 0L) 50 nanos = trip.awaitNanos(nanos); //調用Condition的awaitNanos()方法阻塞 51 } catch (InterruptedException ie) { 52 //如果當前線程被中斷,則判斷是否有其他線程已經使屏障破壞。若沒有則進行屏障破壞處理,並拋出異常;否則再次中斷當前線程 53 if (g == generation && ! g.broken) { 54 breakBarrier(); 55 throw ie; 56 } else { 57 // We're about to finish waiting even if we had not 58 // been interrupted, so this interrupt is deemed to 59 // "belong" to subsequent execution. 60 Thread.currentThread().interrupt(); 61 //這種捕獲了InterruptException之后調用Thread.currentThread().interrupt()是一種通用的方式。其實就是為了保存中斷狀態,從而讓其他更高層次的代碼注意到這個中斷。 62 } 63 } 64 //如果屏障被破壞,當前線程拋BrokenBarrierException 65 if (g.broken) 66 throw new BrokenBarrierException(); 67 68 //如果已經換代,直接返回index(last thread已經執行的nextGeneration,但當前線程還沒有執行到該語句) 69 if (g != generation) 70 return index; 71 72 //超時,進行屏障破壞處理,並拋TimeoutException 73 if (timed && nanos <= 0L) { 74 breakBarrier(); 75 throw new TimeoutException(); 76 } 77 } 78 } finally { 79 lock.unlock(); //釋放鎖 80 } 81 } 82 83 //將當前屏障置為破壞狀態、重置count、並喚醒所有被阻塞的線程。 84 //必須先獲取鎖,才能調用此方法 85 private void breakBarrier() { 86 generation.broken = true; 87 count = parties; 88 trip.signalAll(); 89 } 90 91 //喚醒trip上等待的所有線程,設置下一個Generation 92 private void nextGeneration() { 93 // signal completion of last generation 94 trip.signalAll(); 95 // set up next generation 96 count = parties; 97 generation = new Generation(); 98 }
reset()
1 //重置屏障,先進行屏障破壞處理,再設置下一代generation 2 public void reset() { 3 final ReentrantLock lock = this.lock; 4 lock.lock(); 5 try { 6 breakBarrier(); // break the current generation 7 nextGeneration(); // start a new generation 8 } finally { 9 lock.unlock(); 10 } 11 }
CyclicBarrier與CountDownLatch比較
1)CountDownLatch:一個線程(或者多個),等待另外N個線程完成某個事情之后才能執行;CyclicBarrier:N個線程相互等待,任何一個線程完成之前,所有的線程都必須等待。
2)CountDownLatch:一次性的;CyclicBarrier:可以重復使用。
3)CountDownLatch基於AQS;CyclicBarrier基於鎖和Condition。本質上都是依賴於volatile和CAS實現的。
參考資料
JDK Doc
《Java並發編程的藝術》
