CountdownLatch和CyclicBarrier都屬於線程同步的工具,不過具體的實現以及使用的情況有所不同,我們先來看看不同的使用情況
CountdownLatch 使用場景
顧名思義CountdownLatch可以當做一個計數器來使用,比如某線程需要等待其他幾個線程都執行過某個時間節點后才能繼續執行 我們來模擬一個場景,某公司一共有十個人,門衛要等十個人都來上班以后,才可以休息,代碼實現如下
public static void main(String[] args) { final CountDownLatch latch = new CountDownLatch(10); for (int i = 0; i < 10; i++) { //lambda中只能只用final的變量 final int times = i; new Thread(() -> { try { System.out.println("子線程" + Thread.currentThread().getName() + "正在趕路"); Thread.sleep(1000 * times); System.out.println("子線程" + Thread.currentThread().getName() + "到公司了"); //調用latch的countDown方法使計數器-1 latch.countDown(); System.out.println("子線程" + Thread.currentThread().getName() + "開始工作"); } catch (InterruptedException e) { e.printStackTrace(); } }).start(); } try { System.out.println("門衛等待員工上班中..."); //主線程阻塞等待計數器歸零 latch.await(); System.out.println("員工都來了,門衛去休息了"); } catch (InterruptedException e) { e.printStackTrace(); } }
運行后結果如下
子線程Thread-0正在趕路 子線程Thread-2正在趕路 子線程Thread-0到公司了 子線程Thread-0開始工作 子線程Thread-1正在趕路 門衛等待員工上班中... 子線程Thread-4正在趕路 子線程Thread-9正在趕路 子線程Thread-5正在趕路 子線程Thread-6正在趕路 子線程Thread-7正在趕路 子線程Thread-8正在趕路 子線程Thread-3正在趕路 子線程Thread-1到公司了 子線程Thread-1開始工作 子線程Thread-2到公司了 子線程Thread-2開始工作 子線程Thread-3到公司了 子線程Thread-3開始工作 子線程Thread-4到公司了 子線程Thread-4開始工作 子線程Thread-5到公司了 子線程Thread-5開始工作 子線程Thread-6到公司了 子線程Thread-6開始工作 子線程Thread-7到公司了 子線程Thread-7開始工作 子線程Thread-8到公司了 子線程Thread-8開始工作 子線程Thread-9到公司了 子線程Thread-9開始工作 員工都來了,門衛去休息了
可以看到子線程並沒有因為調用latch.countDown而阻塞,會繼續進行該做的工作,只是通知計數器-1,即完成了我們如上說的場景,只需要在所有進程都進行到某一節點后才會執行被阻塞的進程.如果我們想要多個線程在同一時間進行就要用到CyclicBarrier了
CyclicBarrier 使用場景
我們重新模擬一個新的場景,就用已經被說爛的跑步場景吧,十名運動員各自准備比賽,需要等待所有運動員都准備好以后,裁判才能說開始然后所有運動員一起跑,代碼實現如下
public static void main(String[] args) { final CyclicBarrier cyclicBarrier = new CyclicBarrier(10,()->{ System.out.println("所有人都准備好了裁判開始了"); }); for (int i = 0; i < 10; i++) { //lambda中只能只用final的變量 final int times = i; new Thread(() -> { try { System.out.println("子線程" + Thread.currentThread().getName() + "正在准備"); Thread.sleep(1000 * times); System.out.println("子線程" + Thread.currentThread().getName() + "准備好了"); cyclicBarrier.await(); System.out.println("子線程" + Thread.currentThread().getName() + "開始跑了"); } catch (InterruptedException e) { e.printStackTrace(); } catch (BrokenBarrierException e) { e.printStackTrace(); } }).start(); } }
執行結果如下
子線程Thread-0正在准備 子線程Thread-2正在准備 子線程Thread-1正在准備 子線程Thread-3正在准備 子線程Thread-4正在准備 子線程Thread-0准備好了 子線程Thread-5正在准備 子線程Thread-6正在准備 子線程Thread-7正在准備 子線程Thread-8正在准備 子線程Thread-9正在准備 子線程Thread-1准備好了 子線程Thread-2准備好了 子線程Thread-3准備好了 子線程Thread-4准備好了 子線程Thread-5准備好了 子線程Thread-6准備好了 子線程Thread-7准備好了 子線程Thread-8准備好了 子線程Thread-9准備好了 所有人都准備好了裁判開始了 子線程Thread-9開始跑了 子線程Thread-0開始跑了 子線程Thread-2開始跑了 子線程Thread-1開始跑了 子線程Thread-7開始跑了 子線程Thread-6開始跑了 子線程Thread-5開始跑了 子線程Thread-4開始跑了 子線程Thread-3開始跑了 子線程Thread-8開始跑了
可以看到所有線程在其他線程沒有准備好之前都在被阻塞中,等到所有線程都准備好了才繼續執行 我們在創建CyclicBarrier對象時傳入了一個方法,當調用CyclicBarrier的await方法后,當前線程會被阻塞等到所有線程都調用了await方法后 調用傳入CyclicBarrier的方法,然后讓所有的被阻塞的線程一起運行
應用場景我們說完了,接下來看看兩個工具的具體實現
CountdownLatch 底層實現
我們先來看看CountdownLatch的構造方法
public CountDownLatch(int count) { if (count < 0) throw new IllegalArgumentException("count < 0"); this.sync = new Sync(count); }
首先保證了count一定要大於零,然后初始化了一個Sync對象,在看看這個Sync對象是個什么
private static final class Sync extends AbstractQueuedSynchronizer { private static final long serialVersionUID = 4982264981922014374L; Sync(int count) { setState(count); } int getCount() { return getState(); } protected int tryAcquireShared(int acquires) { return (getState() == 0) ? 1 : -1; } protected boolean tryReleaseShared(int releases) { // Decrement count; signal when transition to zero for (;;) { int c = getState(); if (c == 0) return false; int nextc = c-1; if (compareAndSetState(c, nextc)) return nextc == 0; } } }
Sync是CountdownLatch的靜態內部類,繼承了AbstractQueuedSynchronizer(即AQS,提供了一種實現阻塞鎖和一系列依賴FIFO等待隊列的同步器的工具,回頭單講)抽象類, 在Sync的構造方法中,調用了setState方法,可以視作初始化了一個標記來記錄當前計數器的數量
我們來看CountdownLatch的兩個核心方法,await和countdown,先來看await
public void await() throws InterruptedException { //可以視作將線程阻塞 sync.acquireSharedInterruptibly(1); }
await調用的是AQS的方法,可以視作阻塞線程,具體實現在分析AQS的章節中展開 再來看看countdown方法
public void countDown() { sync.releaseShared(1); }
調用了sync的一個方法,再來看看這個方法的實現
public final boolean releaseShared(int arg) { if (tryReleaseShared(arg)) { doReleaseShared(); return true; } return false; }
再來看這個tryReleaseShared方法
protected boolean tryReleaseShared(int releases) { for (;;) { //獲取標記位 int c = getState(); if (c == 0) return false; int nextc = c-1; //用cas的方式更新標記位 if (compareAndSetState(c, nextc)) return nextc == 0; } }
可以看到在調用tryReleaseShared實際上是將標記位-1並且返回標記位是否為0,如果標記位為0 那么調用的doReleaseShared可以視作將阻塞的線程放行,這樣整個的流程就通了
CyclicBarrier 底層實現
老規矩先看構造方法
public CyclicBarrier(int parties, Runnable barrierAction) { if (parties <= 0) throw new IllegalArgumentException(); this.parties = parties; this.count = parties; this.barrierCommand = barrierAction; }
這邊傳入了兩個對象簡單的記錄了一下存值,我們直接查看一下關鍵的await方法
public int await() throws InterruptedException, BrokenBarrierException { try { return dowait(false, 0L); } catch (TimeoutException toe) { throw new Error(toe); // cannot happen } }
再來看dowait的實現
/** The lock for guarding barrier entry */ private final ReentrantLock lock = new ReentrantLock(); /** Condition to wait on until tripped */ private final Condition trip = lock.newCondition(); /** 省略部分代碼 **/ private int dowait(boolean timed, long nanos) throws InterruptedException, BrokenBarrierException, TimeoutException { final ReentrantLock lock = this.lock; lock.lock(); try { final Generation g = generation; //判斷是否被打斷 if (g.broken) throw new BrokenBarrierException(); if (Thread.interrupted()) { breakBarrier(); throw new InterruptedException(); } //將計數器-1 即在構造方法中賦值的count int index = --count; if (index == 0) { // tripped //如果所有的線程都執行完畢即count=0時 boolean ranAction = false; try { //執行傳入的方法 final Runnable command = barrierCommand; if (command != null) command.run(); ranAction = true; //喚醒所有線程 nextGeneration(); return 0; } finally { if (!ranAction) breakBarrier(); } } //如果count沒有到0那么阻塞當前線程 for (;;) { try { if (!timed) trip.await(); else if (nanos > 0L) nanos = trip.awaitNanos(nanos); } catch (InterruptedException ie) { if (g == generation && ! g.broken) { breakBarrier(); throw ie; } else { // We're about to finish waiting even if we had not // been interrupted, so this interrupt is deemed to // "belong" to subsequent execution. Thread.currentThread().interrupt(); } } if (g.broken) throw new BrokenBarrierException(); if (g != generation) return index; if (timed && nanos <= 0L) { breakBarrier(); throw new TimeoutException(); } } } finally { lock.unlock(); } }
從代碼中可以看到,CyclicBarrier是利用Lock的condition方法來進行線程的阻塞和喚醒,類似Object.wait()和notifyAll()在count不為0時阻塞,在count=0時喚醒所有線程
總結
1,CountdownLatch適用於所有線程通過某一點后通知方法,而CyclicBarrier則適合讓所有線程在同一點同時執行 2,CountdownLatch利用繼承AQS的共享鎖來進行線程的通知,利用CAS來進行--,而CyclicBarrier則利用ReentrantLock的Condition來阻塞和通知線程