Java多線程之JUC包:CyclicBarrier源碼學習筆記


若有不正之處請多多諒解,並歡迎批評指正。

請尊重作者勞動成果,轉載請標明原文鏈接:

http://www.cnblogs.com/go2sea/p/5615531.html

 

CyclicBarrier是java.util.concurrent包中提供的同步工具。通過這個工具我們可以實現n個線程相互等待。我們可以通過參數指定達到公共屏障點之后的行為。

先上源碼:

 

  1 package java.util.concurrent;
  2 import java.util.concurrent.locks.*;
  3 
  4 public class CyclicBarrier {
  5 
  6     private static class Generation {
  7         boolean broken = false;
  8     }
  9 
 10     private final ReentrantLock lock = new ReentrantLock();
 11     private final Condition trip = lock.newCondition();
 12     private final int parties;
 13     private final Runnable barrierCommand;
 14     private Generation generation = new Generation();
 15     private int count;
 16 
 17     private void nextGeneration() {
 18         // signal completion of last generation
 19         trip.signalAll();
 20         // set up next generation
 21         count = parties;
 22         generation = new Generation();
 23     }
 24 
 25 
 26     private void breakBarrier() {
 27         generation.broken = true;
 28         count = parties;
 29         trip.signalAll();
 30     }
 31 
 32     private int dowait(boolean timed, long nanos) 
 33             throws InterruptedException, BrokenBarrierException, TimeoutException {
 34         final ReentrantLock lock = this.lock;
 35         lock.lock();
 36         try {
 37             final Generation g = generation;
 38 
 39             //小概率事件:該線程在等待鎖的過程中,barrier被破壞
 40             if (g.broken)
 41                 throw new BrokenBarrierException();
 42 
 43             //小概率事件:該線程在等待鎖的過程中被中斷
 44             if (Thread.interrupted()) {
 45                 breakBarrier();
 46                 throw new InterruptedException();
 47             }
 48 
 49            int index = --count;
 50            //當有parties個線程到達barrier
 51            if (index == 0) {  // tripped
 52                 boolean ranAction = false;
 53                 try {
 54                    final Runnable command = barrierCommand;
 55                    //如果設置了barrierCommand,令最后到達的barrier的線程執行它
 56                    if (command != null)
 57                         command.run();
 58                     ranAction = true;
 59                     nextGeneration();
 60                     return 0;
 61                } finally {
 62                     //注意:當執行barrierCommand出現異常時,ranAction派上用場
 63                     if (!ranAction)
 64                         breakBarrier();
 65                }
 66            }
 67 
 68             // loop until tripped, broken, interrupted, or timed out
 69             for (;;) {
 70                 try {
 71                     if (!timed)
 72                         trip.await();
 73                     else if (nanos > 0L)
 74                         //注意:nanos值標識了是否超時,后續用這個nanos值判斷是否breakBarrier
 75                         nanos = trip.awaitNanos(nanos);
 76                 } catch (InterruptedException ie) {
 77                     if (g == generation && ! g.broken) {
 78                         breakBarrier();
 79                         throw ie;
 80                     } else {
 81                         //小概率事件:該線程被中斷,進入鎖等待隊列
 82                         //在等待過程中,另一個線程更新或破壞了generation
 83                         //當該線程獲取鎖之后,應重置interrupt標志而不是拋出異常
 84                         //原因在於:它中斷的太晚了,generation已更新或破壞,它拋出InterruptedException的時機已經過去,
 85                         //兩種情況:
 86                         //①g被破壞。已經有一個線程拋出了InterruptedException(也只能由第一個拋),與它同時等待的都拋BrokenBarrierException(后續檢查broken標志會拋)。
 87                         //②g被更新:此時拋異常沒意義(后續檢查g更新后會return index),這里重置interrupt標志,讓線程繼續執行,讓這個標志由上層處理
 88                         Thread.currentThread().interrupt();
 89                     }
 90                 }
 91 
 92                 //barrier被破壞,拋出異常
 93                 if (g.broken)
 94                     throw new BrokenBarrierException();
 95                 
 96                 //barrier正常進入下一循環,上一代await的線程繼續執行
 97                 if (g != generation)
 98                     return index;
 99                 
100                 //只要有一個超時,就breakBarrier,后續線程拋的就是barrier損壞異常
101                 if (timed && nanos <= 0L) {
102                     breakBarrier();
103                     throw new TimeoutException();
104                 }
105             }
106         } finally {
107             lock.unlock();
108         }
109     }
110 
111 
112     public CyclicBarrier(int parties, Runnable barrierAction) {
113         if (parties <= 0) throw new IllegalArgumentException();
114         this.parties = parties;
115         this.count = parties;
116         this.barrierCommand = barrierAction;
117     }
118 
119     public CyclicBarrier(int parties) {
120         this(parties, null);
121     }
122 
123 
124     public int getParties() {
125         return parties;
126     }
127 
128 
129     public int await() throws InterruptedException, BrokenBarrierException {
130         try {
131             return dowait(false, 0L);
132         } catch (TimeoutException toe) {
133             throw new Error(toe); // cannot happen;
134         }
135     }
136 
137 
138     public int await(long timeout, TimeUnit unit)
139         throws InterruptedException,
140                BrokenBarrierException,
141                TimeoutException {
142         return dowait(true, unit.toNanos(timeout));
143     }
144 
145     
146     public boolean isBroken() {
147         final ReentrantLock lock = this.lock;
148         lock.lock();
149         try {
150             return generation.broken;
151         } finally {
152             lock.unlock();
153         }
154     }
155 
156     public void reset() {
157         final ReentrantLock lock = this.lock;
158         lock.lock();
159         try {
160             breakBarrier();   // break the current generation
161             nextGeneration(); // start a new generation
162         } finally {
163             lock.unlock();
164         }
165     }
166 
167     public int getNumberWaiting() {
168         final ReentrantLock lock = this.lock;
169         lock.lock();
170         try {
171             return parties - count;
172         } finally {
173             lock.unlock();
174         }
175     }
176 }
View Code

 

我們先來看一下CyclicBarrier的成員變量:

1 private final ReentrantLock lock = new ReentrantLock();
2 private final Condition trip = lock.newCondition();
3 private final int parties;
4 private final Runnable barrierCommand;
5 private Generation generation = new Generation();
6 private int count;

 

CyclicBarrier是通過獨占鎖lock和Condition對象trip來實現的,成員parties表示必須有parties個線程到達barrier,成員barrierCommand表示當parties個線程到達之后要執行的代碼,成員count表示離觸發barrierCommand還差count個線程(還有count個線程未到達barrier),成員generation表示當前的“代數”,“cyclic”表示可循環使用,generation是對一次循環的標識。注意:Generation是CyclicBarrier的一個私有內部類,他只有一個成員變量來標識當前的barrier是否已“損壞”:

1 private static class Generation {
2     boolean broken = false;
3 }

 

構造函數

 1 public CyclicBarrier(int parties, Runnable barrierAction) {  2     if (parties <= 0) throw new IllegalArgumentException();  3     this.parties = parties;  4     this.count = parties;  5     this.barrierCommand = barrierAction;  6 }  7 
 8 public CyclicBarrier(int parties) {  9     this(parties, null); 10 }

CyclicBarrier提供了兩種構造函數,沒有指定barrierCommand的構造函數是調用第二個構造函數實現的。第二個構造函數有兩個參數:parties和barrierAction,分別用來初始化成員parties和barrierCommand。注意,parties必須大於0,否則會拋出IllegalArgumentException。

 

await()方法

1 public int await() throws InterruptedException, BrokenBarrierException {
2     try {
3         return dowait(false, 0L);
4     } catch (TimeoutException toe) {
5      throw new Error(toe); // cannot happen;
6     }
7 }

await方法是由調用dowait方法實現的,兩個參數分別代表是否定時等待和等待的時長。

 

doawait()方法

 1     private int dowait(boolean timed, long nanos)  2             throws InterruptedException, BrokenBarrierException, TimeoutException {  3         final ReentrantLock lock = this.lock;  4  lock.lock();  5         try {  6             final Generation g = generation;  7 
 8             //小概率事件:該線程在等待鎖的過程中,barrier被破壞
 9             if (g.broken) 10                 throw new BrokenBarrierException(); 11 
12             //小概率事件:該線程在等待鎖的過程中被中斷
13             if (Thread.interrupted()) { 14  breakBarrier(); 15                 throw new InterruptedException(); 16  } 17 
18            int index = --count; 19            //當有parties個線程到達barrier
20            if (index == 0) {  // tripped
21                 boolean ranAction = false; 22                 try { 23                    final Runnable command = barrierCommand; 24                    //如果設置了barrierCommand,令最后到達的barrier的線程執行它
25                    if (command != null) 26  command.run(); 27                     ranAction = true; 28  nextGeneration(); 29                     return 0; 30                } finally { 31                     //注意:當執行barrierCommand出現異常時,ranAction派上用場
32                     if (!ranAction) 33  breakBarrier(); 34  } 35  } 36 
37             // loop until tripped, broken, interrupted, or timed out
38             for (;;) { 39                 try { 40                     if (!timed) 41  trip.await(); 42                     else if (nanos > 0L) 43                         //注意:nanos值標識了是否超時,后續用這個nanos值判斷是否breakBarrier
44                         nanos = trip.awaitNanos(nanos); 45                 } catch (InterruptedException ie) { 46                     if (g == generation && ! g.broken) { 47  breakBarrier(); 48                         throw ie; 49                     } else { 50                         //小概率事件:該線程被中斷,進入鎖等待隊列 51                         //在等待過程中,另一個線程更新或破壞了generation 52                         //當該線程獲取鎖之后,應重置interrupt標志而不是拋出異常 53                         //原因在於:它中斷的太晚了,generation已更新或破壞,它拋出InterruptedException的時機已經過去, 54                         //兩種情況: 55                         //①g被破壞:已有一個線程拋出InterruptedException(只能由第一個拋),與它同時等待的都拋BrokenBarrierException(后續檢查broken標志會拋)。 56                         //②g被更新:此時拋異常沒意義(后續檢查g更新后會return index),這里重置interrupt標志,讓線程繼續執行,讓這個標志由上層處理
57  Thread.currentThread().interrupt(); 58  } 59  } 60 
61                 //barrier被破壞,拋出異常
62                 if (g.broken) 63                     throw new BrokenBarrierException(); 64                 
65                 //barrier正常進入下一循環,上一代await的線程繼續執行
66                 if (g != generation) 67                     return index; 68                 
69                 //只要有一個超時,就breakBarrier,后續線程拋的就是barrier損壞異常
70                 if (timed && nanos <= 0L) { 71  breakBarrier(); 72                     throw new TimeoutException(); 73  } 74  } 75         } finally { 76  lock.unlock(); 77  } 78     }

dowait方法是CyclicBarrier的精華。應該重點來理解。

方法開頭首先申請鎖,然后做了兩個判斷:g.broken和Thread.interrupted(),這兩個判斷是分別處理兩種小概率的事件:①該線程在等待鎖的過程中,barrier被破壞②該線程在等待鎖的過程中被中斷。這兩個事件應拋出相應的異常。接下來dowait方法修改了令count減1,如果此時count減為0,說明已經有parties個線程到達barrier,這時由最后到達barrier的線程去執行barrierCommand。注意,這里設置了一個布爾值ranAction,作用是來標識barrierCommand是否被正確執行完畢,如果執行失敗,finally中會執行breakBarrier操作。如果count尚未減為0,則在Condition對象trip上執行await操作,注意:這里有一個InterruptedException的catch子句。當前線程在await中被中斷時,會拋出InterruptedException,這時候如果g==generation&&!g.broken的話,我們執行breakBarrier操作,同時拋出這個異常;如果g!=generation或者g.broken==true的話,我們的操作是重置interrupt標志而不是拋出這個異常。這么做的原因我們分兩種情況討論:

①g被破壞,這也是一個小概率事件,當前線程被中斷后進入鎖等待隊列,此時另一個線程由於某種原因(超時或者被中斷)在他之前獲取了鎖並執行了breakBarrier方法,那么當前線程持有鎖之后就不應再拋InterruptedException,邏輯上應該處理barrier被破壞事件,事實上在后續g.broken的檢查中,他會拋出一個BrokenBarrierException。而當前的InterruptedException被我們捕獲卻沒有做出處理,所以執行interrupt方法重置中斷標志,交由上層程序處理。

②g被更新:說明當前線程在即將完成等待之際被中斷,此時拋異常沒意義(后續檢查g更新后會return index),這里重置interrupt標志,讓線程繼續執行,讓這個標志由上層處理。

后續對g.broken和g!=generation的判斷,分表代表了被喚醒線程(非最后一個到達barrier的線程,也不是被中斷或第一個超時的線程)的兩種退出方法的方式:第一種是以barrier被破壞告終(然后拋異常),第二個是barrier等到parties個線程,壽終正寢(返回該線程的到達次序index)。

最后一個if是第一個超時線程執行breakBarrier操作並跑出異常。最后finally子句要釋放鎖。

至此,整個doawait方法流程就分析完畢了,我們可以發現,在barrier上等待的線程,如果以拋異常結束的話,只有第一個線程會拋InterruptedException或TimeoutException並執行breakBarrier操作,其他等待線程只能拋BrokenBarrierException,邏輯上這也是合理的:一個barrier只能因超時或中斷被破壞一次。

 1 private void nextGeneration() {  2  trip.signalAll();  3     count = parties;  4     generation = new Generation();  5 }  6 
 7 private void breakBarrier() {  8     generation.broken = true;  9     count = parties; 10  trip.signalAll(); 11 }

doawait方法中用到的nextGeneration方法將所有等待線程喚醒,更新generation對象,復位count,進入下一輪任務。breakBarrier方法將generation狀態值為broken,復位count(這個復位看上去沒有用,但實際上,在broken之后reset之前,如果調用getNumberWaiting方法查看等待線程數的話,復位count是合理的),並喚醒所有等待線程。在調用reset更新generation之前,barrier將處於不可用狀態。

 

reset()方法

 1 public void reset() {  2     final ReentrantLock lock = this.lock;  3  lock.lock();  4     try {  5         breakBarrier();   // break the current generation
 6         nextGeneration(); // start a new generation
 7     } finally {  8  lock.unlock();  9  } 10 }

reset方法先break當執行breakBarrier操作(如果有線程在barrier上等待,調用reset會導致BrokenBarrierException),再更新generation對象。

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM