CyclicBarrier 柵欄 原理,應用場景


柵欄類似於閉鎖,它能阻塞一組線程直到某個事件發生。

柵欄與閉鎖的關鍵區別 CyclicBarrier和CountDownLatch的區別

在於,所有線程必須同時到達柵欄位置,才能繼續執行。閉鎖用於等待事件,而柵欄用於等待其他線程。

我自己寫了幾個例子,加上自己的理解總結出幾個不同。

1. CyclicBarrier 方法多,可以用reset()方法來重置CyclicBarrier,讓柵欄可以反復用。而CountDownLatch如果count變為0了,那么只能保持在這個0的最終狀態,不能重新再用。

2. CyclicBarrier 是讓一組線程等待某個事件發生,如果發生了,這組線程可以繼續執行;CountDownLatch是一個線程或多個線程等待一組線程執行完畢。不同的點就在於當count變為0之后,CyclicBarrier是讓這組線程不再阻塞,而繼續執行;而CountDownLatch是讓等待的線程不阻塞,繼續執行。

 下面是CyclicBarrier例子。

有三個類。下面是一個開會的例子。首先是組員到達會議室。等到所有組員都到了之后,領導才開始開會。

MeetingLeaderTask:  領導線程。

OpenMeetingTask, 組員線程。

TestOpenMeeting 測試線程

package com.citi.test.mutiplethread.demo5;

public class MeetingLeaderTask implements Runnable {
    @Override
    public void run() {
        System.out.println("**********領導開始開會***********");
        try {
            Thread.sleep(2000);
        } catch (InterruptedException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
    }
}
View Code
package com.citi.test.mutiplethread.demo5;

import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;

public class OpenMeetingTask implements Runnable {
    private final CyclicBarrier barrier;
    private final String name;
    private final int arriveTime;

    public OpenMeetingTask(CyclicBarrier barrier,String name,int arriveTime) {
        this.barrier=barrier;
        this.name=name;
        this.arriveTime=arriveTime;
    }

    @Override
    public void run() {
        try {
            Thread.sleep(arriveTime*1000);
            System.out.println(name+"到達會議室");
            barrier.await();
            System.out.println(name+"開始開會。。。");
        } catch (InterruptedException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        } catch (BrokenBarrierException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
    }
}
View Code
package com.citi.test.mutiplethread.demo5;

import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class TestOpenMeeting {
    public static void main(String[] args) {
        CyclicBarrier barrier=new CyclicBarrier(3,new MeetingLeaderTask());
//        Executor executor=Executors.newFixedThreadPool(3);
        ExecutorService executor=Executors.newFixedThreadPool(3);
        executor.execute(new OpenMeetingTask(barrier,"C羅", 5));
        executor.execute(new OpenMeetingTask(barrier,"小羅", 3));
        executor.execute(new OpenMeetingTask(barrier,"卡卡", 1));
        executor.shutdown();
    }
}
View Code

下面是代碼原理 。主要講解幾個重要方法,還有成員變量的意義。

  1 /**
  2  * A synchronization aid that allows a set of threads to all wait for
  3  * each other to reach a common barrier point.  CyclicBarriers are
  4  * useful in programs involving a fixed sized party of threads that
  5  * must occasionally wait for each other. The barrier is called
  6  * <em>cyclic</em> because it can be re-used after the waiting threads
  7  * are released.
  8  循環屏障是一個同步工具類,允許一系列的線程互相等待直到到達一個公共的屏障點。
  9  柵欄在涉及固定大小的線程組必須偶爾互相等待的程序中很有用。
 10  屏障被叫做循環的是因為它可以在等待的線程被釋放之后反復利用。
 11  *
 12  * <p>A <tt>CyclicBarrier</tt> supports an optional {@link Runnable} command
 13  * that is run once per barrier point, after the last thread in the party
 14  * arrives, but before any threads are released.
 15  * This <em>barrier action</em> is useful
 16  * for updating shared-state before any of the parties continue.
 17  *
 18  * <p><b>Sample usage:</b> Here is an example of
 19  *  using a barrier in a parallel decomposition design:
 20  * <pre>
 21  * class Solver {
 22  *   final int N;
 23  *   final float[][] data;
 24  *   final CyclicBarrier barrier;
 25  *
 26  *   class Worker implements Runnable {
 27  *     int myRow;
 28  *     Worker(int row) { myRow = row; }
 29  *     public void run() {
 30  *       while (!done()) {
 31  *         processRow(myRow);
 32  *
 33  *         try {
 34  *           barrier.await();
 35  *         } catch (InterruptedException ex) {
 36  *           return;
 37  *         } catch (BrokenBarrierException ex) {
 38  *           return;
 39  *         }
 40  *       }
 41  *     }
 42  *   }
 43  *
 44  *   public Solver(float[][] matrix) {
 45  *     data = matrix;
 46  *     N = matrix.length;
 47  *     barrier = new CyclicBarrier(N,
 48  *                                 new Runnable() {
 49  *                                   public void run() {
 50  *                                     mergeRows(...);
 51  *                                   }
 52  *                                 });
 53  *     for (int i = 0; i < N; ++i)
 54  *       new Thread(new Worker(i)).start();
 55  *
 56  *     waitUntilDone();
 57  *   }
 58  * }
 59  * </pre>
 60  * Here, each worker thread processes a row of the matrix then waits at the
 61  * barrier until all rows have been processed. When all rows are processed
 62  * the supplied {@link Runnable} barrier action is executed and merges the
 63  * rows. If the merger
 64  * determines that a solution has been found then <tt>done()</tt> will return
 65  * <tt>true</tt> and each worker will terminate.
 66  *
 67  * <p>If the barrier action does not rely on the parties being suspended when
 68  * it is executed, then any of the threads in the party could execute that
 69  * action when it is released. To facilitate this, each invocation of
 70  * {@link #await} returns the arrival index of that thread at the barrier.
 71  * You can then choose which thread should execute the barrier action, for
 72  * example:
 73  * <pre>  if (barrier.await() == 0) {
 74  *     // log the completion of this iteration
 75  *   }</pre>
 76  *
 77  * <p>The <tt>CyclicBarrier</tt> uses an all-or-none breakage model
 78  * for failed synchronization attempts: If a thread leaves a barrier
 79  * point prematurely because of interruption, failure, or timeout, all
 80  * other threads waiting at that barrier point will also leave
 81  * abnormally via {@link BrokenBarrierException} (or
 82  * {@link InterruptedException} if they too were interrupted at about
 83  * the same time).
 84  *
 85  * <p>Memory consistency effects: Actions in a thread prior to calling
 86  * {@code await()}
 87  * <a href="package-summary.html#MemoryVisibility"><i>happen-before</i></a>
 88  * actions that are part of the barrier action, which in turn
 89  * <i>happen-before</i> actions following a successful return from the
 90  * corresponding {@code await()} in other threads.
 91  *
 92  * @since 1.5
 93  * @see CountDownLatch
 94  *
 95  * @author Doug Lea
 96  */
 97  public class CyclicBarrier {
 98      /**
 99      * Each use of the barrier is represented as a generation instance.
100      * The generation changes whenever the barrier is tripped, or
101      * is reset. There can be many generations associated with threads
102      * using the barrier - due to the non-deterministic way the lock
103      * may be allocated to waiting threads - but only one of these
104      * can be active at a time (the one to which <tt>count</tt> applies)
105      * and all the rest are either broken or tripped.
106      * There need not be an active generation if there has been a break
107      * but no subsequent reset.
108      每用一次屏障就意味着建立一個代的實例。 這個意思我理解的是 每一次所有線程都到達屏障點了的這個過程就對應一個代的實例。
109      世代會改變無論是所有線程到達屏障點或者被重置。
110      在線程調用屏障時會有很多代關聯。由於鎖會以不確定的方式被分配來等待線程,但是只有這些中的一個在一次是激活的。並且所有剩下的不是在broken就是在阻塞。如果有一個break,沒有后續重置,則不必有一個活躍的世代。
111      
112      
113      */
114     private static class Generation {
115         boolean broken = false;
116     }
117 
118     
119     /** The lock for guarding barrier entry 
120     可重入鎖。
121     */
122     private final ReentrantLock lock = new ReentrantLock();
123     /** Condition to wait on until tripped
124         線程攔截器*/
125     private final Condition trip = lock.newCondition();
126     /** The number of parties
127      要攔截線程的數量
128     */
129     private final int parties;
130     /** The command to run when tripped 
131     當所有線程到達屏障點時,要執行的線程。
132     */
133     private final Runnable barrierCommand;
134     /** The current generation 當前代,是柵欄的內部類,有一個broken屬性。表示broken的狀態
135     什么時候這個狀態會被設置成true呢, 
136     通過看源碼可以看到:
137     1.在線程被中斷的時候,
138     2.在調用reset方法時
139     
140     歸根結底就是通過generation來標志線程是否被中斷或者是否被重置,如果被中斷了或者被重置了,那么線程在調用await方法
141     時,就會進行相應的邏輯處理,會拋出BrokenBarrierException這個異常。
142     */
143     private Generation generation = new Generation();
144 
145     /**
146      * Number of parties still waiting. Counts down from parties to 0
147      * on each generation.  It is reset to parties on each new
148      * generation or when broken.
149     處於等待到屏障點的線程數量。每當有一個線程到達屏障點時,count值就會減1,直到等於0.
150     當一次新的運算開始后,count值會被重置為parties。
151     
152     這個類有一個方法getNumberWaiting, 這個方法是返回到達屏障點處於等待的線程數量。
153     而這個count 是表示等待到達屏障點的線程數量,就是沒有到達屏障點的線程數量。
154      */
155     private int count;
156      
157 /**
158  * Waits until all {@linkplain #getParties parties} have invoked
159  * <tt>await</tt> on this barrier.
160  * 等待直到所有的線程都調用了這個屏障的await方法。
161  * <p>If the current thread is not the last to arrive then it is
162  * disabled for thread scheduling purposes and lies dormant until
163  * one of the following things happens:
164  如果當前線程不是最后一個到達的,那么它會為了線程調度和休眠變得不可用直到下面中的一個條件發生:
165  * <ul>
166  * <li>The last thread arrives; or
167  * <li>Some other thread {@linkplain Thread#interrupt interrupts}
168  * the current thread; or
169  * <li>Some other thread {@linkplain Thread#interrupt interrupts}
170  * one of the other waiting threads; or
171  * <li>Some other thread times out while waiting for barrier; or
172  * <li>Some other thread invokes {@link #reset} on this barrier.
173  * </ul>
174  *最后一個線程到達,或者其他線程打斷當前線程;
175  或者其他線程打斷等待中的線程中的一個。
176  或者其他線程在等待屏障的時候超時了。
177  或者其他線程調用了屏障的reset方法。
178  * <p>If the current thread:
179  * <ul>
180  * <li>has its interrupted status set on entry to this method; or
181  * <li>is {@linkplain Thread#interrupt interrupted} while waiting
182  * </ul>
183  * then {@link InterruptedException} is thrown and the current thread's
184  * interrupted status is cleared.
185  *如果當前線程在進入這個方法時候有打斷狀態;或者當等待的時候被打斷,則拋出InterruptedException
186  中斷異常,並且當前線程的中斷狀態被清空。
187  * <p>If the barrier is {@link #reset} while any thread is waiting,
188  * or if the barrier {@linkplain #isBroken is broken} when
189  * <tt>await</tt> is invoked, or while any thread is waiting, then
190  * {@link BrokenBarrierException} is thrown.
191  *如果屏障當任何線程在等待的時候被重置,
192  或者await方法被調用時,屏障被破壞掉。
193  或者當任何線程等待時 會拋出中斷異常。
194  * <p>If any thread is {@linkplain Thread#interrupt interrupted} while waiting,
195  * then all other waiting threads will throw
196  * {@link BrokenBarrierException} and the barrier is placed in the broken
197  * state.
198  *如果任何線程在等待的時候被中斷,所有其他等待的線程會拋出中斷異常,並且屏障會變成broken狀態。
199  * <p>If the current thread is the last thread to arrive, and a
200  * non-null barrier action was supplied in the constructor, then the
201  * current thread runs the action before allowing the other threads to
202  * continue.
203  如果當前線程是最后一個到達的線程,並且一個非空的屏障操作在構造方法中被提供,
204  那么在允許其他線程繼續執行之前,當前線程會執行這個動作。
205  * If an exception occurs during the barrier action then that exception
206  * will be propagated in the current thread and the barrier is placed in
207  * the broken state.
208  *如果在屏障等待期間有異常發生,這個異常會在當前線程中傳播,並且屏障會處於broken狀態。
209  * @return the arrival index of the current thread, where index
210  *         <tt>{@link #getParties()} - 1</tt> indicates the first
211  *         to arrive and zero indicates the last to arrive
212  * @throws InterruptedException if the current thread was interrupted
213  *         while waiting
214  * @throws BrokenBarrierException if <em>another</em> thread was
215  *         interrupted or timed out while the current thread was
216  *         waiting, or the barrier was reset, or the barrier was
217  *         broken when {@code await} was called, or the barrier
218  *         action (if present) failed due an exception.
219  */
220 public int await() throws InterruptedException, BrokenBarrierException {
221     try {
222         return dowait(false, 0L);
223     } catch (TimeoutException toe) {
224         throw new Error(toe); // cannot happen;
225     }
226 }
227 /**
228 這個方法是比較重要的方法。 會涵蓋各種的策略。
229  * Main barrier code, covering the various policies.
230  */
231 private int dowait(boolean timed, long nanos)
232     throws InterruptedException, BrokenBarrierException,
233            TimeoutException {
234     //獲取獨占鎖。
235     final ReentrantLock lock = this.lock;
236     lock.lock();
237     try {
238         //取得當前代的引用
239         final Generation g = generation;
240         //如果該代損壞了,拋出異常。
241         if (g.broken)
242             throw new BrokenBarrierException();
243         //如果被打斷,拋出異常。
244         if (Thread.interrupted()) {
245             //則將損壞狀態設置為true,並通知其他阻塞在此柵欄上的線程。
246             breakBarrier();
247             throw new InterruptedException();
248         }
249 
250        //這個count表示等待的線程數,假如第一個線程進來,就把count減減,賦值給index。如果index不為0,表示不是最后一個
251        //線程進來,則走下面的自旋過程。如果index為0時,表示最后一個線程進來,那么這時候需要執行if里面的過程。
252        // index==0時, 
253        //如果是第一個線程進入到這個方法。假設在調用時候傳入10,則這個index就是9.往下看,可以看到如果
254        int index = --count;
255        if (index == 0) {  // tripped
256             //是否運行到達屏障點時要調用的線程,即barrierCommand。
257            boolean ranAction = false;
258            try {
259                //這個地方barrierCommand表示所有的線程到達屏障點時,要運行的線程。
260                final Runnable command = barrierCommand;
261                //如果在調用cyclicBarrier時,沒有指定到達屏障點要執行的線程,則command為null, 就不用執行了,反之,需要執行這個線程。
262                if (command != null)
263                    command.run();
264                //無論如何都設置為true。但是這個地方我有點沒看明白,如果這個地方無論如何都設置為true的話,那么在finally中如果這個值
265                //為false時候,會執行breakBarrier方法。那么什么時候才能為false呢?
266                ranAction = true;
267                //這個方法是喚醒所有由於condition.await方法等待的線程。
268                //重置count為parties,就是把等待線程的數量又設置為柵欄能屏障線程初始數量。
269                //將generation重新創建。
270                nextGeneration();
271                return 0;
272            } finally {
273                //最終執行下面的邏輯。
274                //判斷是否運行了barrierCommand線程。如果是,則
275                if (!ranAction)
276                    //設置generation的broken為true,重置count數量,喚醒所有線程。
277                    breakBarrier();
278            }
279        }
280 
281         // loop until tripped, broken, interrupted, or timed out
282         for (;;) {
283             try {
284                 //如果調用的時候不是調用的有timeout時長的方法時,timed為false,所以走trip.await().
285                 //如果調用的時候是調用有timeout時長的方法,則timed為true,所以走else里面的流程,等待指定時間。
286                 if (!timed)
287                     trip.await();
288                 else if (nanos > 0L)
289                     nanos = trip.awaitNanos(nanos);
290             } catch (InterruptedException ie) {
291                 //如果是當前代,並且broken為false
292                 if (g == generation && ! g.broken) {
293                     breakBarrier();
294                     throw ie;
295                 } else {
296                     // We're about to finish waiting even if we had not
297                     // been interrupted, so this interrupt is deemed to
298                     // "belong" to subsequent execution.
299                     Thread.currentThread().interrupt();
300                 }
301             }
302 
303             if (g.broken)
304                 throw new BrokenBarrierException();
305 
306             if (g != generation)
307                 return index;
308 
309             if (timed && nanos <= 0L) {
310                 breakBarrier();
311                 throw new TimeoutException();
312             }
313         }
314     } finally {
315         lock.unlock();
316     }
317 }
318 
319 /**
320  * Sets current barrier generation as broken and wakes up everyone.
321  破壞柵欄,設置當前代的broken狀態為true,設置等待線程的數量為 並且喚醒所有線程。
322  * Called only while holding lock.
323  */
324 private void breakBarrier() {
325     generation.broken = true;
326     count = parties;
327     trip.signalAll();
328 }
329 
330 /**
331  * Updates state on barrier trip and wakes up everyone.
332  * Called only while holding lock.
333  什么時候調用呢,
334  1.在所有線程到達屏障點之后會調用
335  2. 在調用reset方法時。
336  有什么用處?
337  喚醒所有等待的線程。重置count,創建新的代。
338  */
339 private void nextGeneration() {
340     // signal completion of last generation
341     trip.signalAll();
342     // set up next generation
343     count = parties;
344     generation = new Generation();
345 }
346 
347 /**
348  * Resets the barrier to its initial state.  If any parties are
349  * currently waiting at the barrier, they will return with a
350  * {@link BrokenBarrierException}. Note that resets <em>after</em>
351  * a breakage has occurred for other reasons can be complicated to
352  * carry out; threads need to re-synchronize in some other way,
353  * and choose one to perform the reset.  It may be preferable to
354  * instead create a new barrier for subsequent use.
355  */
356 public void reset() {
357     final ReentrantLock lock = this.lock;
358     lock.lock();
359     try {
360         breakBarrier();   // break the current generation
361         nextGeneration(); // start a new generation
362     } finally {
363         lock.unlock();
364     }
365 }
View Code

 其中對一個地方有疑問。

265                //為false時候,會執行breakBarrier方法。那么什么時候才能為false呢?
這一行代碼,如果有人直到怎么回事,請留下評論。

主要是用ReentrantLock和Condition來實現柵欄的。

參考資料: https://blog.csdn.net/qq_38293564/article/details/80558157

https://www.jianshu.com/p/ddaecc027ba4

https://blog.csdn.net/qq_39241239/article/details/87030142

這個帖子講的很通俗易懂。

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM