java中CyclicBarrier的使用
CyclicBarrier是java 5中引入的線程安全的組件。它有一個barrier的概念,主要用來等待所有的線程都執行完畢,然后再去執行特定的操作。
假如我們有很多個線程,每個線程都計算出了一些數據,然后我們需要等待所有的線程都執行完畢,再把各個線程計算出來的數據加起來,的到最終的結果,那么我們就可以使用CyclicBarrier。
CyclicBarrier的方法
我們先看下CyclicBarrier的構造函數:
public CyclicBarrier(int parties, Runnable barrierAction) {
if (parties <= 0) throw new IllegalArgumentException();
this.parties = parties;
this.count = parties;
this.barrierCommand = barrierAction;
}
public CyclicBarrier(int parties) {
this(parties, null);
}
CyclicBarrier有兩個構造函數,第一個只接受一個參數,表示需要統一行動的線程個數。第二個參數叫做barrierAction,表示出發barrier是需要執行的方法。
其中barrierAction是一個Runnable,我們可以在其中定義最后需要執行的工作。
再看下重要await方法:
public int await() throws InterruptedException, BrokenBarrierException {
try {
return dowait(false, 0L);
} catch (TimeoutException toe) {
throw new Error(toe); // cannot happen
}
}
public int await(long timeout, TimeUnit unit)
throws InterruptedException,
BrokenBarrierException,
TimeoutException {
return dowait(true, unit.toNanos(timeout));
}
await也有兩個方法,一個是帶時間參數的,一個是不帶時間參數的。
await本質上調用了lock.newCondition().await()方法。
因為有多個parties,下面我們考慮兩種情況。
- 該線程不是最后一個調用await的線程
在這種情況下,該線程將會進入等待狀態,直到下面的情況發送:
- 最后一個線程調用await()
- 其他線程中斷了當前線程
- 其他線程中斷了其他正在等待的線程
- 其他線程在等待barrier的時候超時
- 其他線程在該barrier上調用的reset()方法
如果該線程在調用await()的時候已經設置了interrupted的狀態,或者在等待的時候被interrupted,那么將會拋出InterruptedException異常,並清除中斷狀態。(這里和Thread的interrupt()方法保持一致)
如果任何線程正在等待狀態中,這時候barrier被重置。或者在線程調用await方法或者正在等待中,barrier被broken,那么將會拋出BrokenBarrierException。
如果任何線程在等待的時候被中斷,那么所有其他等待的線程將會拋出BrokenBarrierException,barrier將會被置為broken狀態。
- 如果該線程是最后一個調用await方法的
在這種情況,如果barrierAction不為空,那么該線程將會在其他線程繼續執行前調用這個barrierAction。
如果該操作拋出異常,那么barrier的狀態將會被置為broken狀態。
再看看這個reset() 方法:
public void reset() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
breakBarrier(); // break the current generation
nextGeneration(); // start a new generation
} finally {
lock.unlock();
}
}
該方法將會將barrier置為broken狀態,並且開啟一個新的generation,來進行下一輪的操作。
CyclicBarrier的使用
我們在子線程中生成一個隨機的整數隊列,當所有的線程都生成完畢之后,我們再將生成的整數全都加起來。看下怎么實現。
定義生成整數隊列的子線程:
public class CyclicBarrierUsage implements Runnable {
private CyclicBarrier cyclicBarrier;
private List<List<Integer>> partialResults;
private Random random = new Random();
public CyclicBarrierUsage(CyclicBarrier cyclicBarrier,List<List<Integer>> partialResults){
this.cyclicBarrier=cyclicBarrier;
this.partialResults=partialResults;
}
@Override
public void run() {
String thisThreadName = Thread.currentThread().getName();
List<Integer> partialResult = new ArrayList<>();
// Crunch some numbers and store the partial result
for (int i = 0; i < 10; i++) {
Integer num = random.nextInt(10);
System.out.println(thisThreadName
+ ": Crunching some numbers! Final result - " + num);
partialResult.add(num);
}
partialResults.add(partialResult);
try {
System.out.println(thisThreadName
+ " waiting for others to reach barrier.");
cyclicBarrier.await();
} catch (InterruptedException e) {
// ...
} catch (BrokenBarrierException e) {
// ...
}
}
}
上面的子線程接收外部傳入的cyclicBarrier和保存數據的partialResults,並在運行完畢調用cyclicBarrier.await()來等待其他線程執行完畢。
看下CyclicBarrier的構建:
CyclicBarrier cyclicBarrier=new CyclicBarrier(5,()->{
String thisThreadName = Thread.currentThread().getName();
System.out.println(
thisThreadName + ": Computing sum of 5 workers, having 10 results each.");
int sum = 0;
for (List<Integer> threadResult : partialResults) {
System.out.print("Adding ");
for (Integer partialResult : threadResult) {
System.out.print(partialResult+" ");
sum += partialResult;
}
System.out.println();
}
System.out.println(thisThreadName + ": Final result = " + sum);
});
在CyclicBarrier中,我們定義了一個BarrierAction來做最后數據的匯總處理。
運行:
for (int i = 0; i < 5; i++) {
Thread worker = new Thread(new CyclicBarrierUsage(cyclicBarrier,partialResults));
worker.setName("Thread " + i);
worker.start();
}
輸出結果如下:
Spawning 5 worker threads to compute 10 partial results each
Thread 0: Crunching some numbers! Final result - 5
Thread 0: Crunching some numbers! Final result - 3
Thread 1: Crunching some numbers! Final result - 1
Thread 0: Crunching some numbers! Final result - 7
Thread 1: Crunching some numbers! Final result - 8
Thread 0: Crunching some numbers! Final result - 4
Thread 0: Crunching some numbers! Final result - 6
Thread 0: Crunching some numbers! Final result - 9
Thread 1: Crunching some numbers! Final result - 3
Thread 2: Crunching some numbers! Final result - 1
Thread 0: Crunching some numbers! Final result - 0
Thread 2: Crunching some numbers! Final result - 9
Thread 1: Crunching some numbers! Final result - 3
Thread 2: Crunching some numbers! Final result - 7
Thread 0: Crunching some numbers! Final result - 2
Thread 2: Crunching some numbers! Final result - 6
Thread 1: Crunching some numbers! Final result - 6
Thread 2: Crunching some numbers! Final result - 5
Thread 0: Crunching some numbers! Final result - 0
Thread 2: Crunching some numbers! Final result - 1
Thread 1: Crunching some numbers! Final result - 5
Thread 2: Crunching some numbers! Final result - 1
Thread 0: Crunching some numbers! Final result - 7
Thread 2: Crunching some numbers! Final result - 8
Thread 1: Crunching some numbers! Final result - 2
Thread 2: Crunching some numbers! Final result - 4
Thread 0 waiting for others to reach barrier.
Thread 2: Crunching some numbers! Final result - 0
Thread 2 waiting for others to reach barrier.
Thread 1: Crunching some numbers! Final result - 7
Thread 1: Crunching some numbers! Final result - 6
Thread 1: Crunching some numbers! Final result - 9
Thread 1 waiting for others to reach barrier.
Thread 3: Crunching some numbers! Final result - 9
Thread 3: Crunching some numbers! Final result - 3
Thread 3: Crunching some numbers! Final result - 8
Thread 3: Crunching some numbers! Final result - 8
Thread 3: Crunching some numbers! Final result - 1
Thread 3: Crunching some numbers! Final result - 8
Thread 3: Crunching some numbers! Final result - 0
Thread 3: Crunching some numbers! Final result - 5
Thread 3: Crunching some numbers! Final result - 9
Thread 3: Crunching some numbers! Final result - 1
Thread 3 waiting for others to reach barrier.
Thread 4: Crunching some numbers! Final result - 2
Thread 4: Crunching some numbers! Final result - 2
Thread 4: Crunching some numbers! Final result - 5
Thread 4: Crunching some numbers! Final result - 5
Thread 4: Crunching some numbers! Final result - 3
Thread 4: Crunching some numbers! Final result - 7
Thread 4: Crunching some numbers! Final result - 4
Thread 4: Crunching some numbers! Final result - 8
Thread 4: Crunching some numbers! Final result - 4
Thread 4: Crunching some numbers! Final result - 3
Thread 4 waiting for others to reach barrier.
Thread 4: Computing sum of 5 workers, having 10 results each.
Adding 5 3 7 4 6 9 0 2 0 7
Adding 1 9 7 6 5 1 1 8 4 0
Adding 1 8 3 3 6 5 2 7 6 9
Adding 9 3 8 8 1 8 0 5 9 1
Adding 2 2 5 5 3 7 4 8 4 3
Thread 4: Final result = 230
Process finished with exit code 0
本文的例子可以參考https://github.com/ddean2009/learn-java-concurrency/tree/master/CyclicBarrier
更多教程請參考 flydean的博客