CountDownLatch
CountDownLatch適用於在多線程的場景需要等待所有子線程全部執行完畢之后再做操作的場景。
舉個例子,早上部門開會,有人在上廁所,這時候需要等待所有人從廁所回來之后才能開始會議。
public class CountDownLatchTest { private static int num = 3; private static CountDownLatch countDownLatch = new CountDownLatch(num); private static ExecutorService executorService = Executors.newFixedThreadPool(num); public static void main(String[] args) throws Exception{ executorService.submit(() -> { System.out.println("A在上廁所"); try { Thread.sleep(4000); } catch (InterruptedException e) { e.printStackTrace(); }finally { countDownLatch.countDown(); System.out.println("A上完了"); } }); executorService.submit(()->{ System.out.println("B在上廁所"); try { Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); }finally { countDownLatch.countDown(); System.out.println("B上完了"); } }); executorService.submit(()->{ System.out.println("C在上廁所"); try { Thread.sleep(3000); } catch (InterruptedException e) { e.printStackTrace(); }finally { countDownLatch.countDown(); System.out.println("C上完了"); } }); System.out.println("等待所有人從廁所回來開會..."); countDownLatch.await(); System.out.println("所有人都好了,開始開會..."); executorService.shutdown(); } }
代碼執行結果:
A在上廁所
B在上廁所
等待所有人從廁所回來開會...
C在上廁所
B上完了
C上完了
A上完了
所有人都好了,開始開會...
初始化一個CountDownLatch實例傳參3,因為我們有3個子線程,每次子線程執行完畢之后調用countDown()方法給計數器-1,主線程調用await()方法后會被阻塞,直到最后計數器變為0,await()方法返回,執行完畢。他和join()方法的區別就是join會阻塞子線程直到運行結束,而CountDownLatch可以在任何時候讓await()返回,而且用ExecutorService沒法用join了,相比起來,CountDownLatch更靈活。
CountDownLatch基於AQS實現,volatile變量state維持倒數狀態,多線程共享變量可見。
- CountDownLatch通過構造函數初始化傳入參數實際為AQS的state變量賦值,維持計數器倒數狀態
- 當主線程調用await()方法時,當前線程會被阻塞,當state不為0時進入AQS阻塞隊列等待。
- 其他線程調用countDown()時,state值原子性遞減,當state值為0的時候,喚醒所有調用await()方法阻塞的線程
CyclicBarrier
CyclicBarrier叫做回環屏障,它的作用是讓一組線程全部達到一個狀態之后再全部同時執行,而且他有一個特點就是所有線程執行完畢之后是可以重用的。
public class CyclicBarrierTest { private static int num = 3; private static CyclicBarrier cyclicBarrier = new CyclicBarrier(num, () -> { System.out.println("所有人都好了,開始開會..."); System.out.println("-------------------"); }); private static ExecutorService executorService = Executors.newFixedThreadPool(num); public static void main(String[] args) throws Exception{ executorService.submit(() -> { System.out.println("A在上廁所"); try { Thread.sleep(4000); System.out.println("A上完了"); cyclicBarrier.await(); System.out.println("會議結束,A退出"); } catch (Exception e) { e.printStackTrace(); }finally { } }); executorService.submit(()->{ System.out.println("B在上廁所"); try { Thread.sleep(2000); System.out.println("B上完了"); cyclicBarrier.await(); System.out.println("會議結束,B退出"); } catch (Exception e) { e.printStackTrace(); }finally { } }); executorService.submit(()->{ System.out.println("C在上廁所"); try { Thread.sleep(3000); System.out.println("C上完了"); cyclicBarrier.await(); System.out.println("會議結束,C退出"); } catch (Exception e) { e.printStackTrace(); }finally { } }); executorService.shutdown(); } }
輸出結果為:
A在上廁所 B在上廁所 C在上廁所 B上完了 C上完了 A上完了 所有人都好了,開始開會... ------------------- 會議結束,A退出 會議結束,B退出 會議結束,C退出
從結果來看和CountDownLatch非常相似,初始化傳入3個線程和一個任務,線程調用await()之后進入阻塞,計數器-1,當計數器為0時,就去執行CyclicBarrier中構造函數的任務,當任務執行完畢后,喚醒所有阻塞中的線程。這驗證了CyclicBarrier讓一組線程全部達到一個狀態之后再全部同時執行的效果。
再舉個例子來驗證CyclicBarrier可重用的效果。
public class CyclicBarrierTest2 { private static int num = 3; private static CyclicBarrier cyclicBarrier = new CyclicBarrier(num, () -> { System.out.println("-------------------"); }); private static ExecutorService executorService = Executors.newFixedThreadPool(num); public static void main(String[] args) throws Exception { executorService.submit(() -> { System.out.println("A在上廁所"); try { Thread.sleep(4000); System.out.println("A上完了"); cyclicBarrier.await(); System.out.println("會議結束,A退出,開始擼代碼"); cyclicBarrier.await(); System.out.println("C工作結束,下班回家"); cyclicBarrier.await(); } catch (Exception e) { e.printStackTrace(); } finally { } }); executorService.submit(() -> { System.out.println("B在上廁所"); try { Thread.sleep(2000); System.out.println("B上完了"); cyclicBarrier.await(); System.out.println("會議結束,B退出,開始摸魚"); cyclicBarrier.await(); System.out.println("B摸魚結束,下班回家"); cyclicBarrier.await(); } catch (Exception e) { e.printStackTrace(); } finally { } }); executorService.submit(() -> { System.out.println("C在上廁所"); try { Thread.sleep(3000); System.out.println("C上完了"); cyclicBarrier.await(); System.out.println("會議結束,C退出,開始摸魚"); cyclicBarrier.await(); System.out.println("C摸魚結束,下班回家"); cyclicBarrier.await(); } catch (Exception e) { e.printStackTrace(); } finally { } }); executorService.shutdown(); } }
輸出結果:
A在上廁所 B在上廁所 C在上廁所 B上完了 C上完了 A上完了 ------------------- 會議結束,A退出,開始擼代碼 會議結束,B退出,開始摸魚 會議結束,C退出,開始摸魚 ------------------- C摸魚結束,下班回家 C工作結束,下班回家 B摸魚結束,下班回家 -------------------
從結果來看,每個子線程調用await()計數器減為0之后才開始繼續一起往下執行,會議結束之后一起進入摸魚狀態,最后一天結束一起下班,這就是可重用。
CyclicBarrier還是基於AQS實現的,內部維護parties記錄總線程數,count用於計數,最開始count=parties,調用await()之后count原子遞減,當count為0之后,再次將parties賦值給count,這就是復用的原理。
- 當子線程調用await()方法時,獲取獨占鎖,同時對count遞減,進入阻塞隊列,然后釋放鎖
- 當第一個線程被阻塞同時釋放鎖之后,其他子線程競爭獲取鎖,操作同1
- 直到最后count為0,執行CyclicBarrier構造函數中的任務,執行完畢之后子線程繼續向下執行
Semaphore
Semaphore叫做信號量,和前面兩個不同的是,他的計數器是遞增的。
public class SemaphoreTest { private static int num = 3; private static int initNum = 0; private static Semaphore semaphore = new Semaphore(initNum); private static ExecutorService executorService = Executors.newFixedThreadPool(num); public static void main(String[] args) throws Exception{ executorService.submit(() -> { System.out.println("A在上廁所"); try { Thread.sleep(4000); semaphore.release(); System.out.println("A上完了"); } catch (Exception e) { e.printStackTrace(); }finally { } }); executorService.submit(()->{ System.out.println("B在上廁所"); try { Thread.sleep(2000); semaphore.release(); System.out.println("B上完了"); } catch (Exception e) { e.printStackTrace(); }finally { } }); executorService.submit(()->{ System.out.println("C在上廁所"); try { Thread.sleep(3000); semaphore.release(); System.out.println("C上完了"); } catch (Exception e) { e.printStackTrace(); }finally { } }); System.out.println("等待所有人從廁所回來開會..."); semaphore.acquire(num); System.out.println("所有人都好了,開始開會..."); executorService.shutdown(); } }
輸出結果為:
A在上廁所
B在上廁所
等待所有人從廁所回來開會...
C在上廁所
B上完了
C上完了
A上完了
所有人都好了,開始開會...
稍微和前兩個有點區別,構造函數傳入的初始值為0,當子線程調用release()方法時,計數器遞增,主線程acquire()傳參為3則說明主線程一直阻塞,直到計數器為3才會返回。
Semaphore還還還是基於AQS實現的,同時獲取信號量有公平和非公平兩種策略
- 主線程調用acquire()方法時,用當前信號量值-需要獲取的值,如果小於0,則進入同步阻塞隊列,大於0則通過CAS設置當前信號量為剩余值,同時返回剩余值
- 子線程調用release()給當前信號量值計數器+1(增加的值數量由傳參決定),同時不停的嘗試因為調用acquire()進入阻塞的線程
總結
CountDownLatch通過計數器提供了比join更靈活的多線程控制方式,CyclicBarrier也可以達到CountDownLatch的效果,而且有可復用的特點,Semaphore則是采用信號量遞增的方式,開始的時候並不需要關注需要同步的線程個數,並且提供獲取信號的公平和非公平策略。
- END -