Java Multithreading Series -- "JUC Locks" 10: CyclicBarrier Principles and Examples


 

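Overview

This chapter introduces CyclicBarrier, a synchronization aid in the JUC (java.util.concurrent) package. It covers:
Introduction to CyclicBarrier
CyclicBarrier data structure
CyclicBarrier source code analysis (based on JDK 1.7.0_40)
CyclicBarrier examples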

When reposting, please credit the source: http://www.cnblogs.com/skywang12345/p/3533995.html

 

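Introduction to CyclicBarrier

CyclicBarrier is a synchronization aid that allows a set of threads to wait for each other until all of them reach a common barrier point. Because the barrier can be reused after the waiting threads are released, it is called a cyclic barrier.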

 

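Compare CountDownLatch with CyclicBarrier:
(01) CountDownLatch lets one or more threads wait until other threads finish their work; CyclicBarrier lets N threads wait for each other.
(02) A CountDownLatch's counter cannot be reset; a CyclicBarrier's counter is restored after the barrier trips (or when reset() is called), so the barrier can be used again. This is why it is called a cyclic barrier.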
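
The reuse difference described in (02) can be observed with a small experiment. The following is a minimal sketch and not part of the original article: the class BarrierReuseSketch and its method names are hypothetical, and the lambda syntax assumes Java 8 or later.

```java
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.atomic.AtomicInteger;

public class BarrierReuseSketch {

    /** Sends `workers` threads through the same barrier `rounds` times; returns the number of trips. */
    public static int countTrips(final int workers, final int rounds) {
        final AtomicInteger trips = new AtomicInteger();
        // The barrier action runs once each time the barrier trips.
        final CyclicBarrier barrier = new CyclicBarrier(workers, () -> trips.incrementAndGet());
        Thread[] threads = new Thread[workers];
        for (int i = 0; i < workers; i++) {
            threads[i] = new Thread(() -> {
                try {
                    for (int r = 0; r < rounds; r++) {
                        barrier.await(); // the same barrier object is reused every round
                    }
                } catch (InterruptedException | BrokenBarrierException e) {
                    Thread.currentThread().interrupt();
                }
            });
            threads[i].start();
        }
        for (Thread t : threads) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return trips.get();
    }

    /** A latch that has reached zero stays at zero; extra countDown() calls have no effect. */
    public static long latchCountAfterExtraCountDown() {
        CountDownLatch latch = new CountDownLatch(1);
        latch.countDown();
        latch.countDown();
        return latch.getCount();
    }

    public static void main(String[] args) {
        System.out.println("barrier trips: " + countTrips(3, 4));                 // barrier trips: 4
        System.out.println("latch count: " + latchCountAfterExtraCountDown());    // latch count: 0
    }
}
```

The barrier trips once per round with no extra setup between rounds, whereas the latch offers no way to raise its count again.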


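CyclicBarrier function list

CyclicBarrier(int parties)
    Creates a new CyclicBarrier that trips when the given number of parties (threads) are waiting on it; no predefined action is performed when it trips.
CyclicBarrier(int parties, Runnable barrierAction)
    Creates a new CyclicBarrier that trips when the given number of parties (threads) are waiting on it, and runs the given barrier action when it trips; the action is performed by the last thread to enter the barrier.
int await()
    Waits until all parties have invoked await on this barrier.
int await(long timeout, TimeUnit unit)
    Waits until all parties have invoked await on this barrier, or the specified waiting time elapses.
int getNumberWaiting()
    Returns the number of parties currently waiting at the barrier.
int getParties()
    Returns the number of parties required to trip this barrier.
boolean isBroken()
    Queries whether this barrier is in a broken state.
void reset()
    Resets the barrier to its initial state.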
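
To make the query and reset functions in the list concrete, here is a hedged sketch that exercises getParties(), getNumberWaiting(), reset() and isBroken() on one barrier. The class BarrierApiSketch and its tour() method are hypothetical names introduced only for illustration; lambda syntax assumes Java 8 or later.

```java
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;

public class BarrierApiSketch {

    /** Walks through the query methods and reset(); returns a one-line log of what was observed. */
    public static String tour() {
        final CyclicBarrier barrier = new CyclicBarrier(2);
        final String[] waiterOutcome = new String[1];
        StringBuilder log = new StringBuilder();

        log.append("parties=").append(barrier.getParties());
        log.append(" waitingBefore=").append(barrier.getNumberWaiting());

        Thread waiter = new Thread(() -> {
            try {
                barrier.await();
                waiterOutcome[0] = "released";
            } catch (InterruptedException | BrokenBarrierException e) {
                waiterOutcome[0] = e.getClass().getSimpleName();
            }
        });
        waiter.start();

        // Spin until the waiter has arrived at the barrier.
        while (barrier.getNumberWaiting() < 1) {
            Thread.yield();
        }
        log.append(" waitingDuring=").append(barrier.getNumberWaiting());

        // reset() breaks the current generation (waking the waiter) and starts a fresh one.
        barrier.reset();
        try {
            waiter.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        log.append(" waiter=").append(waiterOutcome[0]);
        log.append(" brokenAfterReset=").append(barrier.isBroken());
        return log.toString();
    }

    public static void main(String[] args) {
        System.out.println(tour());
    }
}
```

The waiting thread leaves with BrokenBarrierException because reset() was called while it was parked, yet isBroken() reports false afterwards since the barrier is back in its initial state.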

 

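CyclicBarrier data structure

[Figure: UML class diagram of CyclicBarrier]

CyclicBarrier contains a ReentrantLock object named lock and a Condition object named trip; it is implemented on top of an exclusive lock. The source code analysis below shows how.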
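
Before reading the JDK source, it may help to see how little is needed to build a barrier from a lock and a condition. The following MiniBarrier is a simplified sketch written for this purpose, not the JDK implementation: it deliberately omits the broken state, timeouts, the barrier action, and cleanup after interruption, and all names in it are hypothetical.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

public class MiniBarrier {
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition trip = lock.newCondition();
    private final int parties;
    private int count;                          // parties that have not arrived yet
    private Object generation = new Object();   // its identity marks the current round

    public MiniBarrier(int parties) {
        if (parties <= 0) throw new IllegalArgumentException();
        this.parties = parties;
        this.count = parties;
    }

    /** Blocks until `parties` threads have called await(); returns the arrival index. */
    public int await() throws InterruptedException {
        lock.lock();
        try {
            final Object g = generation;
            int index = --count;
            if (index == 0) {                   // last arrival: wake everyone, start the next round
                trip.signalAll();
                count = parties;
                generation = new Object();
                return 0;
            }
            while (g == generation) {           // the loop also guards against spurious wakeups
                trip.await();
            }
            return index;
        } finally {
            lock.unlock();
        }
    }

    /** Demo helper: n threads pass the barrier twice; returns the number of completed await() calls. */
    public static int demo(final int n) {
        final MiniBarrier barrier = new MiniBarrier(n);
        final AtomicInteger passed = new AtomicInteger();
        Thread[] threads = new Thread[n];
        for (int i = 0; i < n; i++) {
            threads[i] = new Thread(() -> {
                try {
                    barrier.await();
                    passed.incrementAndGet();
                    barrier.await();
                    passed.incrementAndGet();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            threads[i].start();
        }
        for (Thread t : threads) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return passed.get();
    }

    public static void main(String[] args) {
        System.out.println(demo(3)); // 6
    }
}
```

Comparing the saved generation with the current one lets a waiter tell "my round has finished" apart from a wakeup that belongs to a later round; the real class uses the same idea and adds failure handling on top.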

 

CyclicBarrier source code analysis (based on JDK 1.7.0_40)

Full CyclicBarrier source (based on JDK 1.7.0_40)

/*
 * ORACLE PROPRIETARY/CONFIDENTIAL. Use is subject to license terms.
 */

/*
 * Written by Doug Lea with assistance from members of JCP JSR-166
 * Expert Group and released to the public domain, as explained at
 * http://creativecommons.org/publicdomain/zero/1.0/
 */

package java.util.concurrent;
import java.util.concurrent.locks.*;

/**
 * A synchronization aid that allows a set of threads to all wait for
 * each other to reach a common barrier point.  CyclicBarriers are
 * useful in programs involving a fixed sized party of threads that
 * must occasionally wait for each other. The barrier is called
 * <em>cyclic</em> because it can be re-used after the waiting threads
 * are released.
 *
 * <p>A <tt>CyclicBarrier</tt> supports an optional {@link Runnable} command
 * that is run once per barrier point, after the last thread in the party
 * arrives, but before any threads are released.
 * This <em>barrier action</em> is useful
 * for updating shared-state before any of the parties continue.
 *
 * <p><b>Sample usage:</b> Here is an example of
 *  using a barrier in a parallel decomposition design:
 * <pre>
 * class Solver {
 *   final int N;
 *   final float[][] data;
 *   final CyclicBarrier barrier;
 *
 *   class Worker implements Runnable {
 *     int myRow;
 *     Worker(int row) { myRow = row; }
 *     public void run() {
 *       while (!done()) {
 *         processRow(myRow);
 *
 *         try {
 *           barrier.await();
 *         } catch (InterruptedException ex) {
 *           return;
 *         } catch (BrokenBarrierException ex) {
 *           return;
 *         }
 *       }
 *     }
 *   }
 *
 *   public Solver(float[][] matrix) {
 *     data = matrix;
 *     N = matrix.length;
 *     barrier = new CyclicBarrier(N,
 *                                 new Runnable() {
 *                                   public void run() {
 *                                     mergeRows(...);
 *                                   }
 *                                 });
 *     for (int i = 0; i < N; ++i)
 *       new Thread(new Worker(i)).start();
 *
 *     waitUntilDone();
 *   }
 * }
 * </pre>
 * Here, each worker thread processes a row of the matrix then waits at the
 * barrier until all rows have been processed. When all rows are processed
 * the supplied {@link Runnable} barrier action is executed and merges the
 * rows. If the merger
 * determines that a solution has been found then <tt>done()</tt> will return
 * <tt>true</tt> and each worker will terminate.
 *
 * <p>If the barrier action does not rely on the parties being suspended when
 * it is executed, then any of the threads in the party could execute that
 * action when it is released. To facilitate this, each invocation of
 * {@link #await} returns the arrival index of that thread at the barrier.
 * You can then choose which thread should execute the barrier action, for
 * example:
 * <pre>  if (barrier.await() == 0) {
 *     // log the completion of this iteration
 *   }</pre>
 *
 * <p>The <tt>CyclicBarrier</tt> uses an all-or-none breakage model
 * for failed synchronization attempts: If a thread leaves a barrier
 * point prematurely because of interruption, failure, or timeout, all
 * other threads waiting at that barrier point will also leave
 * abnormally via {@link BrokenBarrierException} (or
 * {@link InterruptedException} if they too were interrupted at about
 * the same time).
 *
 * <p>Memory consistency effects: Actions in a thread prior to calling
 * {@code await()}
 * <a href="package-summary.html#MemoryVisibility"><i>happen-before</i></a>
 * actions that are part of the barrier action, which in turn
 * <i>happen-before</i> actions following a successful return from the
 * corresponding {@code await()} in other threads.
 *
 * @since 1.5
 * @see CountDownLatch
 *
 * @author Doug Lea
 */
public class CyclicBarrier {
    /**
     * Each use of the barrier is represented as a generation instance.
     * The generation changes whenever the barrier is tripped, or
     * is reset. There can be many generations associated with threads
     * using the barrier - due to the non-deterministic way the lock
     * may be allocated to waiting threads - but only one of these
     * can be active at a time (the one to which <tt>count</tt> applies)
     * and all the rest are either broken or tripped.
     * There need not be an active generation if there has been a break
     * but no subsequent reset.
     */
    private static class Generation {
        boolean broken = false;
    }

    /** The lock for guarding barrier entry */
    private final ReentrantLock lock = new ReentrantLock();
    /** Condition to wait on until tripped */
    private final Condition trip = lock.newCondition();
    /** The number of parties */
    private final int parties;
    /* The command to run when tripped */
    private final Runnable barrierCommand;
    /** The current generation */
    private Generation generation = new Generation();

    /**
     * Number of parties still waiting. Counts down from parties to 0
     * on each generation.  It is reset to parties on each new
     * generation or when broken.
     */
    private int count;

    /**
     * Updates state on barrier trip and wakes up everyone.
     * Called only while holding lock.
     */
    private void nextGeneration() {
        // signal completion of last generation
        trip.signalAll();
        // set up next generation
        count = parties;
        generation = new Generation();
    }

    /**
     * Sets current barrier generation as broken and wakes up everyone.
     * Called only while holding lock.
     */
    private void breakBarrier() {
        generation.broken = true;
        count = parties;
        trip.signalAll();
    }

    /**
     * Main barrier code, covering the various policies.
     */
    private int dowait(boolean timed, long nanos)
        throws InterruptedException, BrokenBarrierException,
               TimeoutException {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            final Generation g = generation;

            if (g.broken)
                throw new BrokenBarrierException();

            if (Thread.interrupted()) {
                breakBarrier();
                throw new InterruptedException();
            }

            int index = --count;
            if (index == 0) {  // tripped
                boolean ranAction = false;
                try {
                    final Runnable command = barrierCommand;
                    if (command != null)
                        command.run();
                    ranAction = true;
                    nextGeneration();
                    return 0;
                } finally {
                    if (!ranAction)
                        breakBarrier();
                }
            }

            // loop until tripped, broken, interrupted, or timed out
            for (;;) {
                try {
                    if (!timed)
                        trip.await();
                    else if (nanos > 0L)
                        nanos = trip.awaitNanos(nanos);
                } catch (InterruptedException ie) {
                    if (g == generation && ! g.broken) {
                        breakBarrier();
                        throw ie;
                    } else {
                        // We're about to finish waiting even if we had not
                        // been interrupted, so this interrupt is deemed to
                        // "belong" to subsequent execution.
                        Thread.currentThread().interrupt();
                    }
                }

                if (g.broken)
                    throw new BrokenBarrierException();

                if (g != generation)
                    return index;

                if (timed && nanos <= 0L) {
                    breakBarrier();
                    throw new TimeoutException();
                }
            }
        } finally {
            lock.unlock();
        }
    }

    /**
     * Creates a new <tt>CyclicBarrier</tt> that will trip when the
     * given number of parties (threads) are waiting upon it, and which
     * will execute the given barrier action when the barrier is tripped,
     * performed by the last thread entering the barrier.
     *
     * @param parties the number of threads that must invoke {@link #await}
     *        before the barrier is tripped
     * @param barrierAction the command to execute when the barrier is
     *        tripped, or {@code null} if there is no action
     * @throws IllegalArgumentException if {@code parties} is less than 1
     */
    public CyclicBarrier(int parties, Runnable barrierAction) {
        if (parties <= 0) throw new IllegalArgumentException();
        this.parties = parties;
        this.count = parties;
        this.barrierCommand = barrierAction;
    }

    /**
     * Creates a new <tt>CyclicBarrier</tt> that will trip when the
     * given number of parties (threads) are waiting upon it, and
     * does not perform a predefined action when the barrier is tripped.
     *
     * @param parties the number of threads that must invoke {@link #await}
     *        before the barrier is tripped
     * @throws IllegalArgumentException if {@code parties} is less than 1
     */
    public CyclicBarrier(int parties) {
        this(parties, null);
    }

    /**
     * Returns the number of parties required to trip this barrier.
     *
     * @return the number of parties required to trip this barrier
     */
    public int getParties() {
        return parties;
    }

    /**
     * Waits until all {@linkplain #getParties parties} have invoked
     * <tt>await</tt> on this barrier.
     *
     * <p>If the current thread is not the last to arrive then it is
     * disabled for thread scheduling purposes and lies dormant until
     * one of the following things happens:
     * <ul>
     * <li>The last thread arrives; or
     * <li>Some other thread {@linkplain Thread#interrupt interrupts}
     * the current thread; or
     * <li>Some other thread {@linkplain Thread#interrupt interrupts}
     * one of the other waiting threads; or
     * <li>Some other thread times out while waiting for barrier; or
     * <li>Some other thread invokes {@link #reset} on this barrier.
     * </ul>
     *
     * <p>If the current thread:
     * <ul>
     * <li>has its interrupted status set on entry to this method; or
     * <li>is {@linkplain Thread#interrupt interrupted} while waiting
     * </ul>
     * then {@link InterruptedException} is thrown and the current thread's
     * interrupted status is cleared.
     *
     * <p>If the barrier is {@link #reset} while any thread is waiting,
     * or if the barrier {@linkplain #isBroken is broken} when
     * <tt>await</tt> is invoked, or while any thread is waiting, then
     * {@link BrokenBarrierException} is thrown.
     *
     * <p>If any thread is {@linkplain Thread#interrupt interrupted} while waiting,
     * then all other waiting threads will throw
     * {@link BrokenBarrierException} and the barrier is placed in the broken
     * state.
     *
     * <p>If the current thread is the last thread to arrive, and a
     * non-null barrier action was supplied in the constructor, then the
     * current thread runs the action before allowing the other threads to
     * continue.
     * If an exception occurs during the barrier action then that exception
     * will be propagated in the current thread and the barrier is placed in
     * the broken state.
     *
     * @return the arrival index of the current thread, where index
     *         <tt>{@link #getParties()} - 1</tt> indicates the first
     *         to arrive and zero indicates the last to arrive
     * @throws InterruptedException if the current thread was interrupted
     *         while waiting
     * @throws BrokenBarrierException if <em>another</em> thread was
     *         interrupted or timed out while the current thread was
     *         waiting, or the barrier was reset, or the barrier was
     *         broken when {@code await} was called, or the barrier
     *         action (if present) failed due an exception.
     */
    public int await() throws InterruptedException, BrokenBarrierException {
        try {
            return dowait(false, 0L);
        } catch (TimeoutException toe) {
            throw new Error(toe); // cannot happen;
        }
    }

    /**
     * Waits until all {@linkplain #getParties parties} have invoked
     * <tt>await</tt> on this barrier, or the specified waiting time elapses.
     *
     * <p>If the current thread is not the last to arrive then it is
     * disabled for thread scheduling purposes and lies dormant until
     * one of the following things happens:
     * <ul>
     * <li>The last thread arrives; or
     * <li>The specified timeout elapses; or
     * <li>Some other thread {@linkplain Thread#interrupt interrupts}
     * the current thread; or
     * <li>Some other thread {@linkplain Thread#interrupt interrupts}
     * one of the other waiting threads; or
     * <li>Some other thread times out while waiting for barrier; or
     * <li>Some other thread invokes {@link #reset} on this barrier.
     * </ul>
     *
     * <p>If the current thread:
     * <ul>
     * <li>has its interrupted status set on entry to this method; or
     * <li>is {@linkplain Thread#interrupt interrupted} while waiting
     * </ul>
     * then {@link InterruptedException} is thrown and the current thread's
     * interrupted status is cleared.
     *
     * <p>If the specified waiting time elapses then {@link TimeoutException}
     * is thrown. If the time is less than or equal to zero, the
     * method will not wait at all.
     *
     * <p>If the barrier is {@link #reset} while any thread is waiting,
     * or if the barrier {@linkplain #isBroken is broken} when
     * <tt>await</tt> is invoked, or while any thread is waiting, then
     * {@link BrokenBarrierException} is thrown.
     *
     * <p>If any thread is {@linkplain Thread#interrupt interrupted} while
     * waiting, then all other waiting threads will throw {@link
     * BrokenBarrierException} and the barrier is placed in the broken
     * state.
     *
     * <p>If the current thread is the last thread to arrive, and a
     * non-null barrier action was supplied in the constructor, then the
     * current thread runs the action before allowing the other threads to
     * continue.
     * If an exception occurs during the barrier action then that exception
     * will be propagated in the current thread and the barrier is placed in
     * the broken state.
     *
     * @param timeout the time to wait for the barrier
     * @param unit the time unit of the timeout parameter
     * @return the arrival index of the current thread, where index
     *         <tt>{@link #getParties()} - 1</tt> indicates the first
     *         to arrive and zero indicates the last to arrive
     * @throws InterruptedException if the current thread was interrupted
     *         while waiting
     * @throws TimeoutException if the specified timeout elapses
     * @throws BrokenBarrierException if <em>another</em> thread was
     *         interrupted or timed out while the current thread was
     *         waiting, or the barrier was reset, or the barrier was broken
     *         when {@code await} was called, or the barrier action (if
     *         present) failed due an exception
     */
    public int await(long timeout, TimeUnit unit)
        throws InterruptedException,
               BrokenBarrierException,
               TimeoutException {
        return dowait(true, unit.toNanos(timeout));
    }

    /**
     * Queries if this barrier is in a broken state.
     *
     * @return {@code true} if one or more parties broke out of this
     *         barrier due to interruption or timeout since
     *         construction or the last reset, or a barrier action
     *         failed due to an exception; {@code false} otherwise.
     */
    public boolean isBroken() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return generation.broken;
        } finally {
            lock.unlock();
        }
    }

    /**
     * Resets the barrier to its initial state.  If any parties are
     * currently waiting at the barrier, they will return with a
     * {@link BrokenBarrierException}. Note that resets <em>after</em>
     * a breakage has occurred for other reasons can be complicated to
     * carry out; threads need to re-synchronize in some other way,
     * and choose one to perform the reset.  It may be preferable to
     * instead create a new barrier for subsequent use.
     */
    public void reset() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            breakBarrier();   // break the current generation
            nextGeneration(); // start a new generation
        } finally {
            lock.unlock();
        }
    }

    /**
     * Returns the number of parties currently waiting at the barrier.
     * This method is primarily useful for debugging and assertions.
     *
     * @return the number of parties currently blocked in {@link #await}
     */
    public int getNumberWaiting() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return parties - count;
        } finally {
            lock.unlock();
        }
    }
}

CyclicBarrier is implemented with a ReentrantLock (an exclusive lock) and a Condition. Below we analyze its two core functions: the constructor and await().

 

1. Constructors

CyclicBarrier has two constructors: CyclicBarrier(int parties) and CyclicBarrier(int parties, Runnable barrierAction). The first delegates to the second, whose source is shown below.

public CyclicBarrier(int parties, Runnable barrierAction) {
    if (parties <= 0) throw new IllegalArgumentException();
    // parties is the number of threads that must reach the barrier before it trips.
    this.parties = parties;
    // count is the number of parties that have not yet arrived in the current generation.
    this.count = parties;
    // barrierCommand is the action run once all parties have reached the barrier.
    this.barrierCommand = barrierAction;
}
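
The two observable effects of this constructor, the argument check and the stored barrier action, can be tried out in a single thread. This is a small sketch under the assumption of Java 8 or later; BarrierConstructorSketch and its method names are hypothetical.

```java
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;

public class BarrierConstructorSketch {

    /** Returns true if a non-positive party count is rejected by the constructor. */
    public static boolean rejectsNonPositiveParties() {
        try {
            new CyclicBarrier(0);
            return false;
        } catch (IllegalArgumentException expected) {
            return true;
        }
    }

    /** With parties == 1 the caller is also the last arrival, so it runs the action itself. */
    public static boolean actionRunsInLastArrival() {
        final Thread[] actionThread = new Thread[1];
        CyclicBarrier barrier =
            new CyclicBarrier(1, () -> actionThread[0] = Thread.currentThread());
        try {
            barrier.await(); // trips immediately because only one party is required
        } catch (InterruptedException | BrokenBarrierException e) {
            return false;
        }
        return actionThread[0] == Thread.currentThread();
    }

    public static void main(String[] args) {
        System.out.println(rejectsNonPositiveParties()); // true
        System.out.println(actionRunsInLastArrival());   // true
    }
}
```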

 

2. The wait function

The await() method in CyclicBarrier.java is as follows:

public int await() throws InterruptedException, BrokenBarrierException {
    try {
        return dowait(false, 0L);
    } catch (TimeoutException toe) {
        throw new Error(toe); // cannot happen;
    }
}

Note: await() delegates to dowait(), whose source follows.

 

private int dowait(boolean timed, long nanos)
    throws InterruptedException, BrokenBarrierException,
           TimeoutException {
    final ReentrantLock lock = this.lock;
    // Acquire the exclusive lock.
    lock.lock();
    try {
        // Save the current generation.
        final Generation g = generation;

        // If the current generation is already broken, throw an exception.
        if (g.broken)
            throw new BrokenBarrierException();

        // If the current thread has been interrupted, break the barrier via
        // breakBarrier(), which wakes all threads waiting on it.
        if (Thread.interrupted()) {
            breakBarrier();
            throw new InterruptedException();
        }

        // Decrement the count.
        int index = --count;
        // index == 0 means that parties threads have reached the barrier.
        if (index == 0) {  // tripped
            boolean ranAction = false;
            try {
                // Run barrierCommand if it is not null.
                final Runnable command = barrierCommand;
                if (command != null)
                    command.run();
                ranAction = true;
                // Wake all waiting threads and start a new generation.
                nextGeneration();
                return 0;
            } finally {
                if (!ranAction)
                    breakBarrier();
            }
        }

        // Block until parties threads have reached the barrier, the current
        // thread is interrupted, or the wait times out.
        for (;;) {
            try {
                // An untimed wait uses await(); a timed wait uses awaitNanos().
                if (!timed)
                    trip.await();
                else if (nanos > 0L)
                    nanos = trip.awaitNanos(nanos);
            } catch (InterruptedException ie) {
                // The thread was interrupted while waiting.
                if (g == generation && ! g.broken) {
                    breakBarrier();
                    throw ie;
                } else {
                    Thread.currentThread().interrupt();
                }
            }

            // If the generation this thread arrived in is broken, throw an exception.
            if (g.broken)
                throw new BrokenBarrierException();

            // If a new generation has started, return index.
            if (g != generation)
                return index;

            // For a timed wait whose time has run out, break the barrier via
            // breakBarrier(), which wakes all threads waiting on it.
            if (timed && nanos <= 0L) {
                breakBarrier();
                throw new TimeoutException();
            }
        }
    } finally {
        // Release the exclusive lock.
        lock.unlock();
    }
}

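Note: dowait() blocks the current thread until parties threads have reached the barrier, the current thread is interrupted, the wait times out, or the barrier is broken or reset by another thread.
(01) generation is a member variable of CyclicBarrier, defined as follows:

private Generation generation = new Generation();

private static class Generation {
    boolean broken = false;
}

In CyclicBarrier, threads of the same batch belong to the same generation, that is, the same Generation object; the generation field records which generation is current.
When parties threads have reached the barrier, generation is replaced with a new object.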

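(02) If the current thread has been interrupted, that is, Thread.interrupted() returns true, the barrier is broken via breakBarrier(). The source of breakBarrier() is:

private void breakBarrier() {
    generation.broken = true;
    count = parties;
    trip.signalAll();
}

breakBarrier() sets the broken flag of the current generation to true, which marks that Generation as broken; it also sets count = parties, re-initializing count; finally, it calls signalAll() to wake all threads waiting on the CyclicBarrier.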
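
The effect of this interrupt check can be reproduced without any real contention. In the sketch below (hypothetical class InterruptBreakSketch, Java 8 or later assumed), a worker thread sets its own interrupt flag and then calls await() twice on a barrier that nobody else uses.

```java
import java.util.Arrays;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;

public class InterruptBreakSketch {

    /** Returns {outcome of first await, isBroken() afterwards, outcome of second await}. */
    public static String[] run() {
        final CyclicBarrier barrier = new CyclicBarrier(2);
        final String[] seen = new String[3];
        Thread worker = new Thread(() -> {
            Thread.currentThread().interrupt();          // set the interrupt flag before arriving
            seen[0] = awaitOutcome(barrier);             // interrupted on entry: barrier gets broken
            seen[1] = String.valueOf(barrier.isBroken());
            seen[2] = awaitOutcome(barrier);             // the generation is already broken here
        });
        worker.start();
        try {
            worker.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return seen;
    }

    private static String awaitOutcome(CyclicBarrier barrier) {
        try {
            barrier.await();
            return "released";
        } catch (InterruptedException | BrokenBarrierException e) {
            return e.getClass().getSimpleName();
        }
    }

    public static void main(String[] args) {
        // prints [InterruptedException, true, BrokenBarrierException]
        System.out.println(Arrays.toString(run()));
    }
}
```

The first call fails with InterruptedException and leaves the barrier broken; because Thread.interrupted() also clears the flag, the second call gets past the interrupt check and fails on the g.broken test instead.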

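(03) The count is decremented (--count), and the result is checked to see whether parties threads have reached the barrier, that is, whether index is 0.
When index == 0, barrierCommand is run if it is not null; barrierCommand is the Runnable passed in when the CyclicBarrier was created. Then nextGeneration() is called to start a new generation. The source of nextGeneration() is:

private void nextGeneration() {
    trip.signalAll();
    count = parties;
    generation = new Generation();
}

It first calls signalAll() to wake all threads waiting on the CyclicBarrier, then re-initializes count, and finally replaces generation with a new object.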
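
Since index is the value returned by await(), the countdown in this step can be observed from the outside. The following sketch (hypothetical class ArrivalIndexSketch, Java 8 or later assumed) collects the return values of await() from all parties and sorts them.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;

public class ArrivalIndexSketch {

    /** Lets `parties` threads meet at one barrier and returns their arrival indices, sorted. */
    public static List<Integer> arrivalIndices(final int parties) {
        final List<Integer> indices = Collections.synchronizedList(new ArrayList<Integer>());
        final CyclicBarrier barrier = new CyclicBarrier(parties);
        Thread[] threads = new Thread[parties];
        for (int i = 0; i < parties; i++) {
            threads[i] = new Thread(() -> {
                try {
                    // await() returns --count as seen by this thread: parties - 1 for the
                    // first arrival, 0 for the last one (the thread that trips the barrier).
                    indices.add(barrier.await());
                } catch (InterruptedException | BrokenBarrierException e) {
                    Thread.currentThread().interrupt();
                }
            });
            threads[i].start();
        }
        for (Thread t : threads) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        List<Integer> sorted = new ArrayList<Integer>(indices);
        Collections.sort(sorted);
        return sorted;
    }

    public static void main(String[] args) {
        System.out.println(arrivalIndices(3)); // [0, 1, 2]
    }
}
```

Which thread receives which index depends on scheduling, but each value from 0 to parties - 1 is handed out exactly once per generation because the decrement happens under the lock.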

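(04) In the for(;;) loop, timed indicates whether the current thread is performing a timed wait. If not, the thread waits via trip.await(); otherwise it calls trip.awaitNanos() for a timed wait.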
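
A timed wait that expires does more than fail in the calling thread: it goes through breakBarrier(), so other waiters are affected as well. The sketch below illustrates this under stated assumptions (hypothetical class TimedWaitSketch, an arbitrary 200 ms timeout, Java 8 or later); the barrier needs three parties but only two ever arrive.

```java
import java.util.Arrays;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class TimedWaitSketch {

    /** Returns {outcome of the timed waiter, outcome of the untimed waiter}. */
    public static String[] run() {
        final CyclicBarrier barrier = new CyclicBarrier(3); // the third party never arrives
        final String[] seen = new String[2];

        Thread untimed = new Thread(() -> {
            try {
                barrier.await();                            // dowait(false, 0L)
                seen[1] = "released";
            } catch (InterruptedException | BrokenBarrierException e) {
                seen[1] = e.getClass().getSimpleName();
            }
        });
        untimed.start();

        try {
            barrier.await(200, TimeUnit.MILLISECONDS);      // dowait(true, nanos)
            seen[0] = "released";
        } catch (InterruptedException | BrokenBarrierException | TimeoutException e) {
            seen[0] = e.getClass().getSimpleName();
        }

        try {
            untimed.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return seen;
    }

    public static void main(String[] args) {
        // prints [TimeoutException, BrokenBarrierException]
        System.out.println(Arrays.toString(run()));
    }
}
```

The untimed waiter sees BrokenBarrierException whether it reached the barrier before or after the timeout, because both the entry check and the wake-up path test g.broken.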

 

CyclicBarrier usage examples

Example 1
Create 5 threads; they continue running only after all of them have reached the barrier.

import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.BrokenBarrierException;

public class CyclicBarrierTest1 {

    private static final int SIZE = 5;
    private static CyclicBarrier cb;

    public static void main(String[] args) {

        cb = new CyclicBarrier(SIZE);

        // Create 5 threads
        for (int i = 0; i < SIZE; i++)
            new InnerThread().start();
    }

    static class InnerThread extends Thread {
        public void run() {
            try {
                System.out.println(Thread.currentThread().getName() + " wait for CyclicBarrier.");

                // Arrive at the barrier and wait for the other parties
                cb.await();

                // Reached only after all 5 parties have arrived
                System.out.println(Thread.currentThread().getName() + " continued.");
            } catch (BrokenBarrierException e) {
                e.printStackTrace();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

Output (one possible run; the order of lines depends on thread scheduling)

Thread-1 wait for CyclicBarrier.
Thread-2 wait for CyclicBarrier.
Thread-3 wait for CyclicBarrier.
Thread-4 wait for CyclicBarrier.
Thread-0 wait for CyclicBarrier.
Thread-0 continued.
Thread-4 continued.
Thread-2 continued.
Thread-3 continued.
Thread-1 continued.

Explanation: the main thread creates 5 threads, each of which calls cb.await(). They all wait until every one of them has reached the barrier, and only then do they continue running.

 

Example 2

Create 5 threads; when all 5 have reached the barrier, a given task is run.

import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.BrokenBarrierException;

public class CyclicBarrierTest2 {

    private static final int SIZE = 5;
    private static CyclicBarrier cb;

    public static void main(String[] args) {

        // The Runnable is the barrier action, run once when the barrier trips
        cb = new CyclicBarrier(SIZE, new Runnable () {
            public void run() {
                System.out.println("CyclicBarrier's parties is: "+ cb.getParties());
            }
        });

        // Create 5 threads
        for (int i = 0; i < SIZE; i++)
            new InnerThread().start();
    }

    static class InnerThread extends Thread {
        public void run() {
            try {
                System.out.println(Thread.currentThread().getName() + " wait for CyclicBarrier.");

                // Arrive at the barrier and wait for the other parties
                cb.await();

                // Reached only after all 5 parties have arrived
                System.out.println(Thread.currentThread().getName() + " continued.");
            } catch (BrokenBarrierException e) {
                e.printStackTrace();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

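Output (one possible run; the order of lines depends on thread scheduling)

Thread-1 wait for CyclicBarrier.
Thread-2 wait for CyclicBarrier.
Thread-3 wait for CyclicBarrier.
Thread-4 wait for CyclicBarrier.
Thread-0 wait for CyclicBarrier.
CyclicBarrier's parties is: 5
Thread-0 continued.
Thread-4 continued.
Thread-2 continued.
Thread-3 continued.
Thread-1 continued.

Explanation: the main thread creates 5 threads, each of which calls cb.await(). They all wait until every one of them has reached the barrier; at that point the Runnable registered when cb was created is run by the last thread to arrive, and then all threads continue.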

 


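More reading

1. Java Multithreading Series -- "JUC Locks" 01: Framework

2. Java Multithreading Series -- "JUC Locks" 02: The Mutex ReentrantLock

3. Java Multithreading Series -- "JUC Locks" 03: Fair Lock (Part 1)

4. Java Multithreading Series -- "JUC Locks" 04: Fair Lock (Part 2)

5. Java Multithreading Series -- "JUC Locks" 05: Nonfair Lock

6. Java Multithreading Series -- "JUC Locks" 06: Condition

7. Java Multithreading Series -- "JUC Locks" 07: LockSupport

8. Java Multithreading Series -- "JUC Locks" 08: Shared Lock and ReentrantReadWriteLock

9. Java Multithreading Series -- "JUC Locks" 09: CountDownLatch Principles and Examples

10. Java Multithreading Series Index (xx articles in total)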

 

