栅栏类似于闭锁,它能阻塞一组线程直到某个事件发生。
栅栏与闭锁的关键区别 CyclicBarrier和CountDownLatch的区别
在于,所有线程必须同时到达栅栏位置,才能继续执行。闭锁用于等待事件,而栅栏用于等待其他线程。
我自己写了几个例子,加上自己的理解总结出几个不同。
1. CyclicBarrier 方法多,可以用reset()方法来重置CyclicBarrier,让栅栏可以反复用。而CountDownLatch如果count变为0了,那么只能保持在这个0的最终状态,不能重新再用。
2. CyclicBarrier 是让一组线程等待某个事件发生,如果发生了,这组线程可以继续执行;CountDownLatch是一个线程或多个线程等待一组线程执行完毕。不同的点就在于当count变为0之后,CyclicBarrier是让这组线程不再阻塞,而继续执行;而CountDownLatch是让等待的线程不阻塞,继续执行。
下面是CyclicBarrier例子。
有三个类。下面是一个开会的例子。首先是组员到达会议室。等到所有组员都到了之后,领导才开始开会。
MeetingLeaderTask: 领导线程。
OpenMeetingTask, 组员线程。
TestOpenMeeting 测试线程

package com.citi.test.mutiplethread.demo5; public class MeetingLeaderTask implements Runnable { @Override public void run() { System.out.println("**********领导开始开会***********"); try { Thread.sleep(2000); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } } }

package com.citi.test.mutiplethread.demo5; import java.util.concurrent.BrokenBarrierException; import java.util.concurrent.CyclicBarrier; public class OpenMeetingTask implements Runnable { private final CyclicBarrier barrier; private final String name; private final int arriveTime; public OpenMeetingTask(CyclicBarrier barrier,String name,int arriveTime) { this.barrier=barrier; this.name=name; this.arriveTime=arriveTime; } @Override public void run() { try { Thread.sleep(arriveTime*1000); System.out.println(name+"到达会议室"); barrier.await(); System.out.println(name+"开始开会。。。"); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } catch (BrokenBarrierException e) { // TODO Auto-generated catch block e.printStackTrace(); } } }

package com.citi.test.mutiplethread.demo5; import java.util.concurrent.CyclicBarrier; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; public class TestOpenMeeting { public static void main(String[] args) { CyclicBarrier barrier=new CyclicBarrier(3,new MeetingLeaderTask()); // Executor executor=Executors.newFixedThreadPool(3); ExecutorService executor=Executors.newFixedThreadPool(3); executor.execute(new OpenMeetingTask(barrier,"C罗", 5)); executor.execute(new OpenMeetingTask(barrier,"小罗", 3)); executor.execute(new OpenMeetingTask(barrier,"卡卡", 1)); executor.shutdown(); } }
下面是代码原理 。主要讲解几个重要方法,还有成员变量的意义。

1 /**
2 * A synchronization aid that allows a set of threads to all wait for
3 * each other to reach a common barrier point. CyclicBarriers are
4 * useful in programs involving a fixed sized party of threads that
5 * must occasionally wait for each other. The barrier is called
6 * <em>cyclic</em> because it can be re-used after the waiting threads
7 * are released.
8 循环屏障是一个同步工具类,允许一系列的线程互相等待直到到达一个公共的屏障点。
9 栅栏在涉及固定大小的线程组必须偶尔互相等待的程序中很有用。
10 屏障被叫做循环的是因为它可以在等待的线程被释放之后反复利用。
11 *
12 * <p>A <tt>CyclicBarrier</tt> supports an optional {@link Runnable} command
13 * that is run once per barrier point, after the last thread in the party
14 * arrives, but before any threads are released.
15 * This <em>barrier action</em> is useful
16 * for updating shared-state before any of the parties continue.
17 *
18 * <p><b>Sample usage:</b> Here is an example of
19 * using a barrier in a parallel decomposition design:
20 * <pre>
21 * class Solver {
22 * final int N;
23 * final float[][] data;
24 * final CyclicBarrier barrier;
25 *
26 * class Worker implements Runnable {
27 * int myRow;
28 * Worker(int row) { myRow = row; }
29 * public void run() {
30 * while (!done()) {
31 * processRow(myRow);
32 *
33 * try {
34 * barrier.await();
35 * } catch (InterruptedException ex) {
36 * return;
37 * } catch (BrokenBarrierException ex) {
38 * return;
39 * }
40 * }
41 * }
42 * }
43 *
44 * public Solver(float[][] matrix) {
45 * data = matrix;
46 * N = matrix.length;
47 * barrier = new CyclicBarrier(N,
48 * new Runnable() {
49 * public void run() {
50 * mergeRows(...);
51 * }
52 * });
53 * for (int i = 0; i < N; ++i)
54 * new Thread(new Worker(i)).start();
55 *
56 * waitUntilDone();
57 * }
58 * }
59 * </pre>
60 * Here, each worker thread processes a row of the matrix then waits at the
61 * barrier until all rows have been processed. When all rows are processed
62 * the supplied {@link Runnable} barrier action is executed and merges the
63 * rows. If the merger
64 * determines that a solution has been found then <tt>done()</tt> will return
65 * <tt>true</tt> and each worker will terminate.
66 *
67 * <p>If the barrier action does not rely on the parties being suspended when
68 * it is executed, then any of the threads in the party could execute that
69 * action when it is released. To facilitate this, each invocation of
70 * {@link #await} returns the arrival index of that thread at the barrier.
71 * You can then choose which thread should execute the barrier action, for
72 * example:
73 * <pre> if (barrier.await() == 0) {
74 * // log the completion of this iteration
75 * }</pre>
76 *
77 * <p>The <tt>CyclicBarrier</tt> uses an all-or-none breakage model
78 * for failed synchronization attempts: If a thread leaves a barrier
79 * point prematurely because of interruption, failure, or timeout, all
80 * other threads waiting at that barrier point will also leave
81 * abnormally via {@link BrokenBarrierException} (or
82 * {@link InterruptedException} if they too were interrupted at about
83 * the same time).
84 *
85 * <p>Memory consistency effects: Actions in a thread prior to calling
86 * {@code await()}
87 * <a href="package-summary.html#MemoryVisibility"><i>happen-before</i></a>
88 * actions that are part of the barrier action, which in turn
89 * <i>happen-before</i> actions following a successful return from the
90 * corresponding {@code await()} in other threads.
91 *
92 * @since 1.5
93 * @see CountDownLatch
94 *
95 * @author Doug Lea
96 */
97 public class CyclicBarrier {
98 /**
99 * Each use of the barrier is represented as a generation instance.
100 * The generation changes whenever the barrier is tripped, or
101 * is reset. There can be many generations associated with threads
102 * using the barrier - due to the non-deterministic way the lock
103 * may be allocated to waiting threads - but only one of these
104 * can be active at a time (the one to which <tt>count</tt> applies)
105 * and all the rest are either broken or tripped.
106 * There need not be an active generation if there has been a break
107 * but no subsequent reset.
108 每用一次屏障就意味着建立一个代的实例。 这个意思我理解的是 每一次所有线程都到达屏障点了的这个过程就对应一个代的实例。
109 世代会改变无论是所有线程到达屏障点或者被重置。
110 在线程调用屏障时会有很多代关联。由于锁会以不确定的方式被分配来等待线程,但是只有这些中的一个在一次是激活的。并且所有剩下的不是在broken就是在阻塞。如果有一个break,没有后续重置,则不必有一个活跃的世代。
111
112
113 */
114 private static class Generation {
115 boolean broken = false;
116 }
117
118
119 /** The lock for guarding barrier entry
120 可重入锁。
121 */
122 private final ReentrantLock lock = new ReentrantLock();
123 /** Condition to wait on until tripped
124 线程拦截器*/
125 private final Condition trip = lock.newCondition();
126 /** The number of parties
127 要拦截线程的数量
128 */
129 private final int parties;
130 /** The command to run when tripped
131 当所有线程到达屏障点时,要执行的线程。
132 */
133 private final Runnable barrierCommand;
134 /** The current generation 当前代,是栅栏的内部类,有一个broken属性。表示broken的状态
135 什么时候这个状态会被设置成true呢,
136 通过看源码可以看到:
137 1.在线程被中断的时候,
138 2.在调用reset方法时
139
140 归根结底就是通过generation来标志线程是否被中断或者是否被重置,如果被中断了或者被重置了,那么线程在调用await方法
141 时,就会进行相应的逻辑处理,会抛出BrokenBarrierException这个异常。
142 */
143 private Generation generation = new Generation();
144
145 /**
146 * Number of parties still waiting. Counts down from parties to 0
147 * on each generation. It is reset to parties on each new
148 * generation or when broken.
149 处于等待到屏障点的线程数量。每当有一个线程到达屏障点时,count值就会减1,直到等于0.
150 当一次新的运算开始后,count值会被重置为parties。
151
152 这个类有一个方法getNumberWaiting, 这个方法是返回到达屏障点处于等待的线程数量。
153 而这个count 是表示等待到达屏障点的线程数量,就是没有到达屏障点的线程数量。
154 */
155 private int count;
156
157 /**
158 * Waits until all {@linkplain #getParties parties} have invoked
159 * <tt>await</tt> on this barrier.
160 * 等待直到所有的线程都调用了这个屏障的await方法。
161 * <p>If the current thread is not the last to arrive then it is
162 * disabled for thread scheduling purposes and lies dormant until
163 * one of the following things happens:
164 如果当前线程不是最后一个到达的,那么它会为了线程调度和休眠变得不可用直到下面中的一个条件发生:
165 * <ul>
166 * <li>The last thread arrives; or
167 * <li>Some other thread {@linkplain Thread#interrupt interrupts}
168 * the current thread; or
169 * <li>Some other thread {@linkplain Thread#interrupt interrupts}
170 * one of the other waiting threads; or
171 * <li>Some other thread times out while waiting for barrier; or
172 * <li>Some other thread invokes {@link #reset} on this barrier.
173 * </ul>
174 *最后一个线程到达,或者其他线程打断当前线程;
175 或者其他线程打断等待中的线程中的一个。
176 或者其他线程在等待屏障的时候超时了。
177 或者其他线程调用了屏障的reset方法。
178 * <p>If the current thread:
179 * <ul>
180 * <li>has its interrupted status set on entry to this method; or
181 * <li>is {@linkplain Thread#interrupt interrupted} while waiting
182 * </ul>
183 * then {@link InterruptedException} is thrown and the current thread's
184 * interrupted status is cleared.
185 *如果当前线程在进入这个方法时候有打断状态;或者当等待的时候被打断,则抛出InterruptedException
186 中断异常,并且当前线程的中断状态被清空。
187 * <p>If the barrier is {@link #reset} while any thread is waiting,
188 * or if the barrier {@linkplain #isBroken is broken} when
189 * <tt>await</tt> is invoked, or while any thread is waiting, then
190 * {@link BrokenBarrierException} is thrown.
191 *如果屏障当任何线程在等待的时候被重置,
192 或者await方法被调用时,屏障被破坏掉。
193 或者当任何线程等待时 会抛出中断异常。
194 * <p>If any thread is {@linkplain Thread#interrupt interrupted} while waiting,
195 * then all other waiting threads will throw
196 * {@link BrokenBarrierException} and the barrier is placed in the broken
197 * state.
198 *如果任何线程在等待的时候被中断,所有其他等待的线程会抛出中断异常,并且屏障会变成broken状态。
199 * <p>If the current thread is the last thread to arrive, and a
200 * non-null barrier action was supplied in the constructor, then the
201 * current thread runs the action before allowing the other threads to
202 * continue.
203 如果当前线程是最后一个到达的线程,并且一个非空的屏障操作在构造方法中被提供,
204 那么在允许其他线程继续执行之前,当前线程会执行这个动作。
205 * If an exception occurs during the barrier action then that exception
206 * will be propagated in the current thread and the barrier is placed in
207 * the broken state.
208 *如果在屏障等待期间有异常发生,这个异常会在当前线程中传播,并且屏障会处于broken状态。
209 * @return the arrival index of the current thread, where index
210 * <tt>{@link #getParties()} - 1</tt> indicates the first
211 * to arrive and zero indicates the last to arrive
212 * @throws InterruptedException if the current thread was interrupted
213 * while waiting
214 * @throws BrokenBarrierException if <em>another</em> thread was
215 * interrupted or timed out while the current thread was
216 * waiting, or the barrier was reset, or the barrier was
217 * broken when {@code await} was called, or the barrier
218 * action (if present) failed due an exception.
219 */
220 public int await() throws InterruptedException, BrokenBarrierException {
221 try {
222 return dowait(false, 0L);
223 } catch (TimeoutException toe) {
224 throw new Error(toe); // cannot happen;
225 }
226 }
227 /**
228 这个方法是比较重要的方法。 会涵盖各种的策略。
229 * Main barrier code, covering the various policies.
230 */
231 private int dowait(boolean timed, long nanos)
232 throws InterruptedException, BrokenBarrierException,
233 TimeoutException {
234 //获取独占锁。
235 final ReentrantLock lock = this.lock;
236 lock.lock();
237 try {
238 //取得当前代的引用
239 final Generation g = generation;
240 //如果该代损坏了,抛出异常。
241 if (g.broken)
242 throw new BrokenBarrierException();
243 //如果被打断,抛出异常。
244 if (Thread.interrupted()) {
245 //则将损坏状态设置为true,并通知其他阻塞在此栅栏上的线程。
246 breakBarrier();
247 throw new InterruptedException();
248 }
249
250 //这个count表示等待的线程数,假如第一个线程进来,就把count减减,赋值给index。如果index不为0,表示不是最后一个
251 //线程进来,则走下面的自旋过程。如果index为0时,表示最后一个线程进来,那么这时候需要执行if里面的过程。
252 // index==0时,
253 //如果是第一个线程进入到这个方法。假设在调用时候传入10,则这个index就是9.往下看,可以看到如果
254 int index = --count;
255 if (index == 0) { // tripped
256 //是否运行到达屏障点时要调用的线程,即barrierCommand。
257 boolean ranAction = false;
258 try {
259 //这个地方barrierCommand表示所有的线程到达屏障点时,要运行的线程。
260 final Runnable command = barrierCommand;
261 //如果在调用cyclicBarrier时,没有指定到达屏障点要执行的线程,则command为null, 就不用执行了,反之,需要执行这个线程。
262 if (command != null)
263 command.run();
264 //无论如何都设置为true。但是这个地方我有点没看明白,如果这个地方无论如何都设置为true的话,那么在finally中如果这个值
265 //为false时候,会执行breakBarrier方法。那么什么时候才能为false呢?
266 ranAction = true;
267 //这个方法是唤醒所有由于condition.await方法等待的线程。
268 //重置count为parties,就是把等待线程的数量又设置为栅栏能屏障线程初始数量。
269 //将generation重新创建。
270 nextGeneration();
271 return 0;
272 } finally {
273 //最终执行下面的逻辑。
274 //判断是否运行了barrierCommand线程。如果是,则
275 if (!ranAction)
276 //设置generation的broken为true,重置count数量,唤醒所有线程。
277 breakBarrier();
278 }
279 }
280
281 // loop until tripped, broken, interrupted, or timed out
282 for (;;) {
283 try {
284 //如果调用的时候不是调用的有timeout时长的方法时,timed为false,所以走trip.await().
285 //如果调用的时候是调用有timeout时长的方法,则timed为true,所以走else里面的流程,等待指定时间。
286 if (!timed)
287 trip.await();
288 else if (nanos > 0L)
289 nanos = trip.awaitNanos(nanos);
290 } catch (InterruptedException ie) {
291 //如果是当前代,并且broken为false
292 if (g == generation && ! g.broken) {
293 breakBarrier();
294 throw ie;
295 } else {
296 // We're about to finish waiting even if we had not
297 // been interrupted, so this interrupt is deemed to
298 // "belong" to subsequent execution.
299 Thread.currentThread().interrupt();
300 }
301 }
302
303 if (g.broken)
304 throw new BrokenBarrierException();
305
306 if (g != generation)
307 return index;
308
309 if (timed && nanos <= 0L) {
310 breakBarrier();
311 throw new TimeoutException();
312 }
313 }
314 } finally {
315 lock.unlock();
316 }
317 }
318
319 /**
320 * Sets current barrier generation as broken and wakes up everyone.
321 破坏栅栏,设置当前代的broken状态为true,设置等待线程的数量为 并且唤醒所有线程。
322 * Called only while holding lock.
323 */
324 private void breakBarrier() {
325 generation.broken = true;
326 count = parties;
327 trip.signalAll();
328 }
329
330 /**
331 * Updates state on barrier trip and wakes up everyone.
332 * Called only while holding lock.
333 什么时候调用呢,
334 1.在所有线程到达屏障点之后会调用
335 2. 在调用reset方法时。
336 有什么用处?
337 唤醒所有等待的线程。重置count,创建新的代。
338 */
339 private void nextGeneration() {
340 // signal completion of last generation
341 trip.signalAll();
342 // set up next generation
343 count = parties;
344 generation = new Generation();
345 }
346
347 /**
348 * Resets the barrier to its initial state. If any parties are
349 * currently waiting at the barrier, they will return with a
350 * {@link BrokenBarrierException}. Note that resets <em>after</em>
351 * a breakage has occurred for other reasons can be complicated to
352 * carry out; threads need to re-synchronize in some other way,
353 * and choose one to perform the reset. It may be preferable to
354 * instead create a new barrier for subsequent use.
355 */
356 public void reset() {
357 final ReentrantLock lock = this.lock;
358 lock.lock();
359 try {
360 breakBarrier(); // break the current generation
361 nextGeneration(); // start a new generation
362 } finally {
363 lock.unlock();
364 }
365 }
其中对一个地方有疑问。
265 //为false时候,会执行breakBarrier方法。那么什么时候才能为false呢?
这一行代码,如果有人直到怎么回事,请留下评论。
主要是用ReentrantLock和Condition来实现栅栏的。
参考资料: https://blog.csdn.net/qq_38293564/article/details/80558157
https://www.jianshu.com/p/ddaecc027ba4
https://blog.csdn.net/qq_39241239/article/details/87030142
这个帖子讲的很通俗易懂。