一、前言
有了前面分析的基礎,現在,接着分析CyclicBarrier源碼,CyclicBarrier類在進行多線程編程時使用很多,比如,你希望創建一組任務,它們並行執行工作,然后在進行下一個步驟之前等待,直至所有的任務都完成,和join很類似,下面,開始分析源碼。
二、CyclicBarrier數據結構
分析源碼可以知道,CyclicBarrier底層是基於ReentrantLock和AbstractQueuedSynchronizer來實現的,所以,CyclicBarrier的數據結構也依托於AQS的數據結構,在前面對AQS的分析中已經指出了其數據結構,在這里不再累贅。
三、CyclicBarrier源碼分析
3.1 類的繼承關系
public class CyclicBarrier {}
說明:可以看到CyclicBarrier沒有顯示繼承哪個父類或者實現哪個父接口,根據Java語言規定,可知其父類是Object。
3.2 類的內部類
CyclicBarrier類存在一個內部類Generation,每一次使用的CycBarrier可以當成Generation的實例,其源代碼如下
private static class Generation { boolean broken = false; }
說明:Generation類有一個屬性broken,用來表示當前屏障是否被損壞。
3.3 類的屬性

public class CyclicBarrier { /** The lock for guarding barrier entry */ // 可重入鎖 private final ReentrantLock lock = new ReentrantLock(); /** Condition to wait on until tripped */ // 條件隊列 private final Condition trip = lock.newCondition(); /** The number of parties */ // 參與的線程數量 private final int parties; /* The command to run when tripped */ // 由最后一個進入 barrier 的線程執行的操作 private final Runnable barrierCommand; /** The current generation */ // 當前代 private Generation generation = new Generation(); // 正在等待進入屏障的線程數量 private int count; }
說明:該屬性有一個為ReentrantLock對象,有一個為Condition對象,而Condition對象又是基於AQS的,所以,歸根到底,底層還是由AQS提供支持。
3.4 類的構造函數
1. CyclicBarrier(int, Runnable)型構造函數

public CyclicBarrier(int parties, Runnable barrierAction) { // 參與的線程數量小於等於0,拋出異常 if (parties <= 0) throw new IllegalArgumentException(); // 設置parties this.parties = parties; // 設置count this.count = parties; // 設置barrierCommand this.barrierCommand = barrierAction; }
說明:該構造函數可以指定關聯該CyclicBarrier的線程數量,並且可以指定在所有線程都進入屏障后的執行動作,該執行動作由最后一個進行屏障的線程執行。
2. CyclicBarrier(int)型構造函數

public CyclicBarrier(int parties) { // 調用含有兩個參數的構造函數 this(parties, null); }
說明:該構造函數僅僅執行了關聯該CyclicBarrier的線程數量,沒有設置執行動作。
3.5 核心函數分析
1. dowait函數
此函數為CyclicBarrier類的核心函數,CyclicBarrier類對外提供的await函數在底層都是調用該了doawait函數,其源代碼如下。

private int dowait(boolean timed, long nanos) throws InterruptedException, BrokenBarrierException, TimeoutException { // 保存當前鎖 final ReentrantLock lock = this.lock; // 鎖定 lock.lock(); try { // 保存當前代 final Generation g = generation; if (g.broken) // 屏障被破壞,拋出異常 throw new BrokenBarrierException(); if (Thread.interrupted()) { // 線程被中斷 // 損壞當前屏障,並且喚醒所有的線程,只有擁有鎖的時候才會調用 breakBarrier(); // 拋出異常 throw new InterruptedException(); } // 減少正在等待進入屏障的線程數量 int index = --count; if (index == 0) { // 正在等待進入屏障的線程數量為0,所有線程都已經進入 // 運行的動作標識 boolean ranAction = false; try { // 保存運行動作 final Runnable command = barrierCommand; if (command != null) // 動作不為空 // 運行 command.run(); // 設置ranAction狀態 ranAction = true; // 進入下一代 nextGeneration(); return 0; } finally { if (!ranAction) // 沒有運行的動作 // 損壞當前屏障 breakBarrier(); } } // loop until tripped, broken, interrupted, or timed out // 無限循環 for (;;) { try { if (!timed) // 沒有設置等待時間 // 等待 trip.await(); else if (nanos > 0L) // 設置了等待時間,並且等待時間大於0 // 等待指定時長 nanos = trip.awaitNanos(nanos); } catch (InterruptedException ie) { if (g == generation && ! g.broken) { // 等於當前代並且屏障沒有被損壞 // 損壞當前屏障 breakBarrier(); // 拋出異常 throw ie; } else { // 不等於當前帶后者是屏障被損壞 // We're about to finish waiting even if we had not // been interrupted, so this interrupt is deemed to // "belong" to subsequent execution. // 中斷當前線程 Thread.currentThread().interrupt(); } } if (g.broken) // 屏障被損壞,拋出異常 throw new BrokenBarrierException(); if (g != generation) // 不等於當前代 // 返回索引 return index; if (timed && nanos <= 0L) { // 設置了等待時間,並且等待時間小於0 // 損壞屏障 breakBarrier(); // 拋出異常 throw new TimeoutException(); } } } finally { // 釋放鎖 lock.unlock(); } }
說明:dowait方法的邏輯會進行一系列的判斷,大致流程如下。
2. nextGeneration函數
此函數在所有線程進入屏障后會被調用,即生成下一個版本,所有線程又可以重新進入到屏障中,其源代碼如下

private void nextGeneration() { // signal completion of last generation // 喚醒所有線程 trip.signalAll(); // set up next generation // 恢復正在等待進入屏障的線程數量 count = parties; // 新生一代 generation = new Generation(); }
在此函數中會調用AQS的signalAll方法,即喚醒所有等待線程。如果所有的線程都在等待此條件,則喚醒所有線程。其源代碼如下

public final void signalAll() { if (!isHeldExclusively()) // 不被當前線程獨占,拋出異常 throw new IllegalMonitorStateException(); // 保存condition隊列頭結點 Node first = firstWaiter; if (first != null) // 頭結點不為空 // 喚醒所有等待線程 doSignalAll(first); }
說明:此函數判斷頭結點是否為空,即條件隊列是否為空,然后會調用doSignalAll函數,doSignalAll函數源碼如下

private void doSignalAll(Node first) { // condition隊列的頭結點尾結點都設置為空 lastWaiter = firstWaiter = null; // 循環 do { // 獲取first結點的nextWaiter域結點 Node next = first.nextWaiter; // 設置first結點的nextWaiter域為空 first.nextWaiter = null; // 將first結點從condition隊列轉移到sync隊列 transferForSignal(first); // 重新設置first first = next; } while (first != null); }
說明:此函數會依次將條件隊列中的節點轉移到同步隊列中,會調用到transferForSignal函數,其源碼如下

final boolean transferForSignal(Node node) { /* * If cannot change waitStatus, the node has been cancelled. */ if (!compareAndSetWaitStatus(node, Node.CONDITION, 0)) return false; /* * Splice onto queue and try to set waitStatus of predecessor to * indicate that thread is (probably) waiting. If cancelled or * attempt to set waitStatus fails, wake up to resync (in which * case the waitStatus can be transiently and harmlessly wrong). */ Node p = enq(node); int ws = p.waitStatus; if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL)) LockSupport.unpark(node.thread); return true; }
說明:此函數的作用就是將處於條件隊列中的節點轉移到同步隊列中,並設置結點的狀態信息,其中會調用到enq函數,其源代碼如下。

private Node enq(final Node node) { for (;;) { // 無限循環,確保結點能夠成功入隊列 // 保存尾結點 Node t = tail; if (t == null) { // 尾結點為空,即還沒被初始化 if (compareAndSetHead(new Node())) // 頭結點為空,並設置頭結點為新生成的結點 tail = head; // 頭結點與尾結點都指向同一個新生結點 } else { // 尾結點不為空,即已經被初始化過 // 將node結點的prev域連接到尾結點 node.prev = t; if (compareAndSetTail(t, node)) { // 比較結點t是否為尾結點,若是則將尾結點設置為node // 設置尾結點的next域為node t.next = node; return t; // 返回尾結點 } } } }
說明:此函數完成了結點插入同步隊列的過程,也很好理解。
綜合上面的分析可知,newGeneration函數的主要方法的調用如下,之后會通過一個例子詳細講解。
3. breakBarrier函數
此函數的作用是損壞當前屏障,會喚醒所有在屏障中的線程。源代碼如下

private void breakBarrier() { // 設置狀態 generation.broken = true; // 恢復正在等待進入屏障的線程數量 count = parties; // 喚醒所有線程 trip.signalAll(); }
說明:可以看到,此函數也調用了AQS的signalAll函數,由signal函數提供支持。
四、示例
下面通過一個例子來詳解CyclicBarrier的使用和內部工作機制,源代碼如下
package com.hust.grid.leesf.cyclicbarrier; import java.util.concurrent.BrokenBarrierException; import java.util.concurrent.CyclicBarrier; /** * * @author leesf * @time 2016.4.16 */ class MyThread extends Thread { private CyclicBarrier cb; public MyThread(String name, CyclicBarrier cb) { super(name); this.cb = cb; } public void run() { System.out.println(Thread.currentThread().getName() + " going to await"); try { cb.await(); System.out.println(Thread.currentThread().getName() + " continue"); } catch (Exception e) { e.printStackTrace(); } } } public class CyclicBarrierDemo { public static void main(String[] args) throws InterruptedException, BrokenBarrierException { CyclicBarrier cb = new CyclicBarrier(3, new Thread("barrierAction") { public void run() { System.out.println(Thread.currentThread().getName() + " barrier action"); } }); MyThread t1 = new MyThread("t1", cb); MyThread t2 = new MyThread("t2", cb); t1.start(); t2.start(); System.out.println(Thread.currentThread().getName() + " going to await"); cb.await(); System.out.println(Thread.currentThread().getName() + " continue"); } }
運行結果(某一次):
t1 going to await main going to await t2 going to await t2 barrier action t2 continue t1 continue main continue
說明:根據結果可知,可能會存在如下的調用時序。
說明:由上圖可知,假設t1線程的cb.await是在main線程的cb.barrierAction動作是由最后一個進入屏障的線程執行的。根據時序圖,進一步分析出其內部工作流程。
① main(主)線程執行cb.await操作,主要調用的函數如下。
說明:由於ReentrantLock的默認采用非公平策略,所以在dowait函數中調用的是ReentrantLock.NonfairSync的lock函數,由於此時AQS的狀態是0,表示還沒有被任何線程占用,故main線程可以占用,之后在dowait中會調用trip.await函數,最終的結果是條件隊列中存放了一個包含main線程的結點,並且被禁止運行了,同時,main線程所擁有的資源也被釋放了,可以供其他線程獲取。
② t1線程執行cb.await操作,其中假設t1線程的lock.lock操作在main線程釋放了資源之后,則其主要調用的函數如下。
說明:可以看到,之后condition queue(條件隊列)里面有兩個節點,包含t1線程的結點插入在隊列的尾部,並且t1線程也被禁止了,因為執行了park操作,此時兩個線程都被禁止了。
③ t2線程執行cb.await操作,其中假設t2線程的lock.lock操作在t1線程釋放了資源之后,則其主要調用的函數如下。
說明:由上圖可知,在t2線程執行await操作后,會直接執行command.run方法,不是重新開啟一個線程,而是最后進入屏障的線程執行。同時,會將Condition queue中的所有節點都轉移到Sync queue中,並且最后main線程會被unpark,可以繼續運行。main線程獲取cpu資源,繼續運行。
④ main線程獲取cpu資源,繼續運行,下圖給出了主要的方法調用。
說明:其中,由於main線程是在AQS.CO的wait中被park的,所以恢復時,會繼續在該方法中運行。運行過后,t1線程被unpark,它獲得cpu資源可以繼續運行。
⑤ t1線程獲取cpu資源,繼續運行,下圖給出了主要的方法調用。
說明:其中,由於t1線程是在AQS.CO的wait方法中被park,所以恢復時,會繼續在該方法中運行。運行過后,Sync queue中保持着一個空節點。頭結點與尾節點均指向它。
注意:在線程await過程中中斷線程會拋出異常,所有進入屏障的線程都將被釋放。至於CyclicBarrier的其他用法,讀者可以自行查閱API,不再累贅。
五、總結
有了AQS與ReentrantLock的基礎,分析CyclicBarrier就會非常簡單,因為其底層就是由兩者支撐的,關於CycylicBarrier的源碼就分析到此,有疑問的讀者,歡迎交流,謝謝各位園友的觀看~