countdownlatch是java多線程包concurrent里的一個常見工具類,通過使用它可以借助線程能力極大提升處理響應速度,且實現方式非常優雅。今天我們用一個實際案例和大家來講解一下如何使用以及需要特別注意的點。
由於線程類的東西都比較抽象,我們換一種講解思路,先講解決問題的案例,然后再解釋下原理。
假設在微服務架構中,A服務會調用B服務處理一些事情,且每處理一次業務,A可能要調用B多次處理邏輯相同但數據不同的事情。為了提升整個鏈路的處理速度,我們自然會想到是否可以把A調用B的各個請求組成一個批次,這樣A服務只需要調用B服務一次,等B服務處理完一起返回即可,省了多次網絡傳輸的時間。代碼如下:
/** * 批次請求處理服務 * @param batchRequests 批次請求對象列表 * @return */ public List<DealResult> deal(List<DealRequest> batchRequests){ List<DealResult> resultList = new ArrayList<>(); if(batchRequests != null){ for(DealRequest request : batchRequests){ //遍歷順序處理單個請求 resultList.add(process(request)); } } return resultList; }
但是B服務順序處理批次里每一個請求的時間並沒有節省,假設批次里有3個請求,一個請求平均耗時100MS,則B服務還是要花費300MS來處理完。有什么辦法能立刻簡單提升3倍處理速度,令總花費時間只需要100MS?到我們的大將countdownlatch出場了!代碼如下:
/** * 使用countdownlatch的批次請求處理服務 * @param batchRequests 批次請求對象列表 * @return */ public List<DealResult> countDownDeal(List<DealRequest> batchRequests){ //定義線程安全的處理結果列表 List<DealResult> countDownResultList = Collections.synchronizedList(new ArrayList<DealResult>()); if(batchRequests != null){ //定義countdownlatch線程數,有多少個請求,我們就定義多少個 CountDownLatch runningThreadNum = new CountDownLatch(batchRequests.size()); for(DealRequest request : batchRequests){ //循環遍歷請求,並實例化線程(構造函數傳入CountDownLatch類型的runningThreadNum),立刻啟動 DealWorker dealWorker = new DealWorker(request, runningThreadNum, countDownResultList); new Thread(dealWorker).start(); } try { //調用CountDownLatch的await方法則當前主線程會等待,直到CountDownLatch類型的runningThreadNum清0 //每個DealWorker處理完成會對runningThreadNum減1 //如果等待1分鍾后當前主線程都等不到runningThreadNum清0,則認為超時,直接中斷,拋出中斷異常InterruptedException runningThreadNum.await(1, TimeUnit.MINUTES); } catch (InterruptedException e) { //此處簡化處理,非正常中斷應該拋出異常或返回錯誤結果 return null; } } return countDownResultList; } /** * 線程請求處理類 * */ private class DealWorker implements Runnable { /** 正在運行的線程數 */ private CountDownLatch runningThreadNum; /**待處理請求*/ private DealRequest request; /**待返回結果列表*/ private List<DealResult> countDownResultList; /** * 構造函數 * @param request 待處理請求 * @param runningThreadNum 正在運行的線程數 * @param countDownResultList 待返回結果列表 */ private DealWorker(DealRequest request, CountDownLatch runningThreadNum, List<DealResult> countDownResultList) { this.request = request; this.runningThreadNum = runningThreadNum; this.countDownResultList = countDownResultList; } @Override public void run() { try{ this.countDownResultList.add(process(this.request)); }finally{ //當前線程處理完成,runningThreadNum線程數減1,此操作必須在finally中完成,避免處理異常后造成runningThreadNum線程數無法清0 this.runningThreadNum.countDown(); } } }
是不是很簡單?下圖和上面的代碼又做了一個對應,假設有3個請求,則啟動3個子線程DealWorker,並實例化值數等於3的CountDownLatch。每當一個子線程處理完成后,則調用countDown操作減1。主線程處於awaiting狀態,直到CountDownLatch的值數減到0,則主線程繼續resume執行。

在API中是這樣描述的:
用給定的計數 初始化 CountDownLatch。由於調用了 countDown() 方法,所以在當前計數到達零之前,await 方法會一直受阻塞。之后,會釋放所有等待的線程,await 的所有后續調用都將立即返回。這種現象只出現一次——計數無法被重置。如果需要重置計數,請考慮使用 CyclicBarrier。
經典的java並發編程實戰一書中做了更深入的定義:CountDownLatch屬於閉鎖的范疇,閉鎖是一種同步工具類,可以延遲線程的進度直到其到達終止狀態。閉鎖的作用相當於一扇門:在閉鎖到達結束狀態之前(上面代碼中的runningThreadNumq清0),這扇門一直是關閉的,並且沒有任何線程能通過(上面代碼中的主線程一直await),當到達結束狀態時,這扇門會打開並允許所有線程通過(上面代碼中的主線程可以繼續執行)。當閉鎖到達結束狀態后,將不會再改變狀態,因此這扇門將永遠保持打開狀態。
像FutureTask,Semaphore這類在concurrent包里的類也屬於閉鎖,不過它們和CountDownLatch的應用場景還是有差別的,這個我們在后面的文章里再細說。
使用CountDownLatch有哪些需要注意的點
- 批次請求之間不能有執行順序要求,否則多個線程並發處理無法保證請求執行順序
- 各線程都要操作的結果列表必須是線程安全的,比如上面代碼范例的countDownResultList
- 各子線程的countDown操作要在finally中執行,確保一定可以執行
- 主線程的await操作需要設置超時時間,避免因子線程處理異常而長時間一直等待,如果中斷需要拋出異常或返回錯誤結果
使用CountDownLatch提高批次處理速度的問題
- 如果一個批次請求數很多,會瞬間占用服務器大量線程。此時必須使用線程池,並限定最大可處理線程數量,否則服務器不穩定性會大福提升。
- 主線程和子線程間的數據傳輸變得困難,稍不注意會造成線程不安全的問題,且代碼可讀性有一定下降
