若有不正之處請多多諒解,並歡迎批評指正。
請尊重作者勞動成果,轉載請標明原文鏈接:
http://www.cnblogs.com/go2sea/p/5615531.html
CyclicBarrier是java.util.concurrent包中提供的同步工具。通過這個工具我們可以實現n個線程相互等待。我們可以通過參數指定達到公共屏障點之后的行為。
先上源碼:

1 package java.util.concurrent; 2 import java.util.concurrent.locks.*; 3 4 public class CyclicBarrier { 5 6 private static class Generation { 7 boolean broken = false; 8 } 9 10 private final ReentrantLock lock = new ReentrantLock(); 11 private final Condition trip = lock.newCondition(); 12 private final int parties; 13 private final Runnable barrierCommand; 14 private Generation generation = new Generation(); 15 private int count; 16 17 private void nextGeneration() { 18 // signal completion of last generation 19 trip.signalAll(); 20 // set up next generation 21 count = parties; 22 generation = new Generation(); 23 } 24 25 26 private void breakBarrier() { 27 generation.broken = true; 28 count = parties; 29 trip.signalAll(); 30 } 31 32 private int dowait(boolean timed, long nanos) 33 throws InterruptedException, BrokenBarrierException, TimeoutException { 34 final ReentrantLock lock = this.lock; 35 lock.lock(); 36 try { 37 final Generation g = generation; 38 39 //小概率事件:該線程在等待鎖的過程中,barrier被破壞 40 if (g.broken) 41 throw new BrokenBarrierException(); 42 43 //小概率事件:該線程在等待鎖的過程中被中斷 44 if (Thread.interrupted()) { 45 breakBarrier(); 46 throw new InterruptedException(); 47 } 48 49 int index = --count; 50 //當有parties個線程到達barrier 51 if (index == 0) { // tripped 52 boolean ranAction = false; 53 try { 54 final Runnable command = barrierCommand; 55 //如果設置了barrierCommand,令最后到達的barrier的線程執行它 56 if (command != null) 57 command.run(); 58 ranAction = true; 59 nextGeneration(); 60 return 0; 61 } finally { 62 //注意:當執行barrierCommand出現異常時,ranAction派上用場 63 if (!ranAction) 64 breakBarrier(); 65 } 66 } 67 68 // loop until tripped, broken, interrupted, or timed out 69 for (;;) { 70 try { 71 if (!timed) 72 trip.await(); 73 else if (nanos > 0L) 74 //注意:nanos值標識了是否超時,后續用這個nanos值判斷是否breakBarrier 75 nanos = trip.awaitNanos(nanos); 76 } catch (InterruptedException ie) { 77 if (g == generation && ! g.broken) { 78 breakBarrier(); 79 throw ie; 80 } else { 81 //小概率事件:該線程被中斷,進入鎖等待隊列 82 //在等待過程中,另一個線程更新或破壞了generation 83 //當該線程獲取鎖之后,應重置interrupt標志而不是拋出異常 84 //原因在於:它中斷的太晚了,generation已更新或破壞,它拋出InterruptedException的時機已經過去, 85 //兩種情況: 86 //①g被破壞。已經有一個線程拋出了InterruptedException(也只能由第一個拋),與它同時等待的都拋BrokenBarrierException(后續檢查broken標志會拋)。 87 //②g被更新:此時拋異常沒意義(后續檢查g更新后會return index),這里重置interrupt標志,讓線程繼續執行,讓這個標志由上層處理 88 Thread.currentThread().interrupt(); 89 } 90 } 91 92 //barrier被破壞,拋出異常 93 if (g.broken) 94 throw new BrokenBarrierException(); 95 96 //barrier正常進入下一循環,上一代await的線程繼續執行 97 if (g != generation) 98 return index; 99 100 //只要有一個超時,就breakBarrier,后續線程拋的就是barrier損壞異常 101 if (timed && nanos <= 0L) { 102 breakBarrier(); 103 throw new TimeoutException(); 104 } 105 } 106 } finally { 107 lock.unlock(); 108 } 109 } 110 111 112 public CyclicBarrier(int parties, Runnable barrierAction) { 113 if (parties <= 0) throw new IllegalArgumentException(); 114 this.parties = parties; 115 this.count = parties; 116 this.barrierCommand = barrierAction; 117 } 118 119 public CyclicBarrier(int parties) { 120 this(parties, null); 121 } 122 123 124 public int getParties() { 125 return parties; 126 } 127 128 129 public int await() throws InterruptedException, BrokenBarrierException { 130 try { 131 return dowait(false, 0L); 132 } catch (TimeoutException toe) { 133 throw new Error(toe); // cannot happen; 134 } 135 } 136 137 138 public int await(long timeout, TimeUnit unit) 139 throws InterruptedException, 140 BrokenBarrierException, 141 TimeoutException { 142 return dowait(true, unit.toNanos(timeout)); 143 } 144 145 146 public boolean isBroken() { 147 final ReentrantLock lock = this.lock; 148 lock.lock(); 149 try { 150 return generation.broken; 151 } finally { 152 lock.unlock(); 153 } 154 } 155 156 public void reset() { 157 final ReentrantLock lock = this.lock; 158 lock.lock(); 159 try { 160 breakBarrier(); // break the current generation 161 nextGeneration(); // start a new generation 162 } finally { 163 lock.unlock(); 164 } 165 } 166 167 public int getNumberWaiting() { 168 final ReentrantLock lock = this.lock; 169 lock.lock(); 170 try { 171 return parties - count; 172 } finally { 173 lock.unlock(); 174 } 175 } 176 }
我們先來看一下CyclicBarrier的成員變量:
1 private final ReentrantLock lock = new ReentrantLock(); 2 private final Condition trip = lock.newCondition(); 3 private final int parties; 4 private final Runnable barrierCommand; 5 private Generation generation = new Generation(); 6 private int count;
CyclicBarrier是通過獨占鎖lock和Condition對象trip來實現的,成員parties表示必須有parties個線程到達barrier,成員barrierCommand表示當parties個線程到達之后要執行的代碼,成員count表示離觸發barrierCommand還差count個線程(還有count個線程未到達barrier),成員generation表示當前的“代數”,“cyclic”表示可循環使用,generation是對一次循環的標識。注意:Generation是CyclicBarrier的一個私有內部類,他只有一個成員變量來標識當前的barrier是否已“損壞”:
1 private static class Generation { 2 boolean broken = false; 3 }
構造函數
1 public CyclicBarrier(int parties, Runnable barrierAction) { 2 if (parties <= 0) throw new IllegalArgumentException(); 3 this.parties = parties; 4 this.count = parties; 5 this.barrierCommand = barrierAction; 6 } 7
8 public CyclicBarrier(int parties) { 9 this(parties, null); 10 }
CyclicBarrier提供了兩種構造函數,沒有指定barrierCommand的構造函數是調用第二個構造函數實現的。第二個構造函數有兩個參數:parties和barrierAction,分別用來初始化成員parties和barrierCommand。注意,parties必須大於0,否則會拋出IllegalArgumentException。
await()方法
1 public int await() throws InterruptedException, BrokenBarrierException { 2 try { 3 return dowait(false, 0L); 4 } catch (TimeoutException toe) { 5 throw new Error(toe); // cannot happen; 6 } 7 }
await方法是由調用dowait方法實現的,兩個參數分別代表是否定時等待和等待的時長。
doawait()方法
1 private int dowait(boolean timed, long nanos) 2 throws InterruptedException, BrokenBarrierException, TimeoutException { 3 final ReentrantLock lock = this.lock; 4 lock.lock(); 5 try { 6 final Generation g = generation; 7
8 //小概率事件:該線程在等待鎖的過程中,barrier被破壞
9 if (g.broken) 10 throw new BrokenBarrierException(); 11
12 //小概率事件:該線程在等待鎖的過程中被中斷
13 if (Thread.interrupted()) { 14 breakBarrier(); 15 throw new InterruptedException(); 16 } 17
18 int index = --count; 19 //當有parties個線程到達barrier
20 if (index == 0) { // tripped
21 boolean ranAction = false; 22 try { 23 final Runnable command = barrierCommand; 24 //如果設置了barrierCommand,令最后到達的barrier的線程執行它
25 if (command != null) 26 command.run(); 27 ranAction = true; 28 nextGeneration(); 29 return 0; 30 } finally { 31 //注意:當執行barrierCommand出現異常時,ranAction派上用場
32 if (!ranAction) 33 breakBarrier(); 34 } 35 } 36
37 // loop until tripped, broken, interrupted, or timed out
38 for (;;) { 39 try { 40 if (!timed) 41 trip.await(); 42 else if (nanos > 0L) 43 //注意:nanos值標識了是否超時,后續用這個nanos值判斷是否breakBarrier
44 nanos = trip.awaitNanos(nanos); 45 } catch (InterruptedException ie) { 46 if (g == generation && ! g.broken) { 47 breakBarrier(); 48 throw ie; 49 } else { 50 //小概率事件:該線程被中斷,進入鎖等待隊列 51 //在等待過程中,另一個線程更新或破壞了generation 52 //當該線程獲取鎖之后,應重置interrupt標志而不是拋出異常 53 //原因在於:它中斷的太晚了,generation已更新或破壞,它拋出InterruptedException的時機已經過去, 54 //兩種情況: 55 //①g被破壞:已有一個線程拋出InterruptedException(只能由第一個拋),與它同時等待的都拋BrokenBarrierException(后續檢查broken標志會拋)。 56 //②g被更新:此時拋異常沒意義(后續檢查g更新后會return index),這里重置interrupt標志,讓線程繼續執行,讓這個標志由上層處理
57 Thread.currentThread().interrupt(); 58 } 59 } 60
61 //barrier被破壞,拋出異常
62 if (g.broken) 63 throw new BrokenBarrierException(); 64
65 //barrier正常進入下一循環,上一代await的線程繼續執行
66 if (g != generation) 67 return index; 68
69 //只要有一個超時,就breakBarrier,后續線程拋的就是barrier損壞異常
70 if (timed && nanos <= 0L) { 71 breakBarrier(); 72 throw new TimeoutException(); 73 } 74 } 75 } finally { 76 lock.unlock(); 77 } 78 }
dowait方法是CyclicBarrier的精華。應該重點來理解。
方法開頭首先申請鎖,然后做了兩個判斷:g.broken和Thread.interrupted(),這兩個判斷是分別處理兩種小概率的事件:①該線程在等待鎖的過程中,barrier被破壞②該線程在等待鎖的過程中被中斷。這兩個事件應拋出相應的異常。接下來dowait方法修改了令count減1,如果此時count減為0,說明已經有parties個線程到達barrier,這時由最后到達barrier的線程去執行barrierCommand。注意,這里設置了一個布爾值ranAction,作用是來標識barrierCommand是否被正確執行完畢,如果執行失敗,finally中會執行breakBarrier操作。如果count尚未減為0,則在Condition對象trip上執行await操作,注意:這里有一個InterruptedException的catch子句。當前線程在await中被中斷時,會拋出InterruptedException,這時候如果g==generation&&!g.broken的話,我們執行breakBarrier操作,同時拋出這個異常;如果g!=generation或者g.broken==true的話,我們的操作是重置interrupt標志而不是拋出這個異常。這么做的原因我們分兩種情況討論:
①g被破壞,這也是一個小概率事件,當前線程被中斷后進入鎖等待隊列,此時另一個線程由於某種原因(超時或者被中斷)在他之前獲取了鎖並執行了breakBarrier方法,那么當前線程持有鎖之后就不應再拋InterruptedException,邏輯上應該處理barrier被破壞事件,事實上在后續g.broken的檢查中,他會拋出一個BrokenBarrierException。而當前的InterruptedException被我們捕獲卻沒有做出處理,所以執行interrupt方法重置中斷標志,交由上層程序處理。
②g被更新:說明當前線程在即將完成等待之際被中斷,此時拋異常沒意義(后續檢查g更新后會return index),這里重置interrupt標志,讓線程繼續執行,讓這個標志由上層處理。
后續對g.broken和g!=generation的判斷,分表代表了被喚醒線程(非最后一個到達barrier的線程,也不是被中斷或第一個超時的線程)的兩種退出方法的方式:第一種是以barrier被破壞告終(然后拋異常),第二個是barrier等到parties個線程,壽終正寢(返回該線程的到達次序index)。
最后一個if是第一個超時線程執行breakBarrier操作並跑出異常。最后finally子句要釋放鎖。
至此,整個doawait方法流程就分析完畢了,我們可以發現,在barrier上等待的線程,如果以拋異常結束的話,只有第一個線程會拋InterruptedException或TimeoutException並執行breakBarrier操作,其他等待線程只能拋BrokenBarrierException,邏輯上這也是合理的:一個barrier只能因超時或中斷被破壞一次。
1 private void nextGeneration() { 2 trip.signalAll(); 3 count = parties; 4 generation = new Generation(); 5 } 6
7 private void breakBarrier() { 8 generation.broken = true; 9 count = parties; 10 trip.signalAll(); 11 }
doawait方法中用到的nextGeneration方法將所有等待線程喚醒,更新generation對象,復位count,進入下一輪任務。breakBarrier方法將generation狀態值為broken,復位count(這個復位看上去沒有用,但實際上,在broken之后reset之前,如果調用getNumberWaiting方法查看等待線程數的話,復位count是合理的),並喚醒所有等待線程。在調用reset更新generation之前,barrier將處於不可用狀態。
reset()方法
1 public void reset() { 2 final ReentrantLock lock = this.lock; 3 lock.lock(); 4 try { 5 breakBarrier(); // break the current generation
6 nextGeneration(); // start a new generation
7 } finally { 8 lock.unlock(); 9 } 10 }
reset方法先break當執行breakBarrier操作(如果有線程在barrier上等待,調用reset會導致BrokenBarrierException),再更新generation對象。