前言
下面介紹協調讓多線程步調一致的兩個工具類:CountDownLatch
和CyclicBarrier
。
CountDownLatch和CyclicBarrier的用途介紹
CountDownLatch
// API
void await(); // 使當前線程在閉鎖計數器到零之前一直等待,除非線程被中斷。
boolean await(long timeout, TimeUnit unit); // 使當前線程在閉鎖計數器至零之前一直等待,除非線程被中斷或超出了指定的等待時間。
void countDown(); // 遞減閉鎖計數器,如果計數到達零,則釋放所有等待的線程。
long getCount(); // 返回當前計數。
String toString(); // 返回標識此閉鎖及其狀態的字符串。
CountDownLatch
是一個同步工具類,在完成一組正在其他線程中執行的操作之前,它允許一個或多個線程一直等待。可以指定計數初始化CountDownLatch,當調用countDown()
方法后,在當前計數到達零之前,await()
方法會一直受阻塞。計數到達零之后,所有被阻塞的線程都會被釋放,await()
的所有后續調用都會立即返回。CountDownLatch的計數只能被使用一次,如果需要重復計數使用,則要考慮使用CyclicBarrier
。
CountDownLatch的用途有很多。將計數為1初始化的CountDownLatch可用作一個簡單的開/關或入口:在通過調用countDown()的線程打開入口前,所有調用await()的線程都一直在入口出等待。而用N初始化CountDownLatch可以使一個線程在N個線程完成某項操作之前一直等待,或者使其在某項操作完成N次之前一直等待。
COuntDownLatch的內存一致性語義:線程中調用 countDown()
之前的操作 Happens-Before緊跟在從另一個線程中對應 await()
成功返回的操作。
CyclicBarrier
// API
int await(); // 線程將一直等待直到所有參與者都在此 barrier 上調用 await 方法
int await(long timeout, TimeUnit unit); // 線程將一直等待直到所有參與者都在此 barrier 上調用 await 方法, 或者超出了指定的等待時間。
int getNumberWaiting(); // 返回當前在屏障處等待的參與者數目。
int getParties(); // 返回要求啟動此 barrier 的參與者數目。
boolean isBroken(); // 查詢此屏障是否處於損壞狀態。
void reset(); // 將屏障重置為其初始狀態。
CyclicBarrier
是一個同步輔助類,它允許一組線程互相等待,直到到達某個公共屏障點(barrier也可被翻譯為柵欄) (common barrier point)。 CyclicBarrier 適用於在涉及一組固定大小的線程的程序中,這些線程必須不時地互相等待的情況。即所有線程都必須到達屏障位置后,下面的程序才能繼續執行,適於在迭代算法中使用。因為 barrier 在釋放等待線程后可以計數器會被重置可繼續使用,所以稱它為循環 的 barrier。
CyclicBarrier支持一個可選的 Runnable
命令(也就是可以傳入一個線程執行其他操作),在一組線程中的最后一個線程到達之后(但在釋放所有線程之前),該命令將只在每個 barrier point 運行一次。這對所有參與線程繼續運行之前更新它們的共享狀態將十分有用。
CyclicBarrier的內存一致性語義:線程中調用 await()
之前的操作 Happens-Before 那些是屏障操作的一部份的操作,后者依次 Happens-Before 緊跟在從另一個線程中對應 await()
成功返回的操作。
Actions in a thread prior to calling await() happen-before actions that are part of the barrier action, which in turn happen-before actions following a successful return from the corresponding await() in other threads.
在對賬系統中使用CountDownLatch和CyclicBarrier
對賬系統流程圖如下:
目前對賬系統的處理流程是:先查詢訂單,然后查詢派送單,之后對比訂單和派送單,將差異寫入差異庫。對賬系統的代碼抽象后如下:
while(存在未對賬訂單){
// 查詢未對賬訂單
pos = getPOrders();
// 查詢派送單
dos = getDOrders();
// 執行對賬操作
diff = check(pos, dos);
// 差異寫入差異庫
save(diff);
}
利用並行優化對賬系統
目前的對賬系統,由於訂單量和派送單量巨大,所以查詢未對賬訂單getPOrder()
和查詢派送單getDOrder()
都相對比較慢。目前對賬系統是單線程執行的,示意圖如下(圖來自參考[1]):

對於串行化的系統,優化性能首先想到的就是能否利用多線程並行處理。
如果我們能將getPOrders()和getDOrders()這兩個操作並行處理,那么將會提升效率很多。因為這兩個操作並沒有先后順序的依賴,所以,我們可以並行處理這兩個耗時的操作。
並行后的示意圖如下(圖來自參考[1]):

對比單線程的執行示意圖,我們發現在同等時間里,並行執行的吞吐量近乎單線程的2倍,優化效果還是相對明顯的。
優化后的代碼如下:
while(存在未對賬訂單){
// 查詢未對賬訂單
Thread T1 = new Thread(()->{
pos = getPOrders();
});
T1.start();
// 查詢派送單
Thread T2 = new Thread(()->{
dos = getDOrders();
});
T2.start();
// 要等待線程T1和T2執行完才能執行check()和save()這兩個操作
// 通過調用T1.join()和T2.join()來實現等待
// 當T2和T2線程退出時,調用T1.jion()和T2.join()的主線程就會從阻塞態被喚醒,從而執行check()和save()
T1.join();
T2.join();
// 執行對賬操作
diff = check(pos, dos);
// 差異寫入差異庫
save(diff);
}
使用CountDownLatch實現線程等待
上面的解決方案美中不足的地方在於:每一次while循環都會創建新的線程,而線程的創建是一個耗時操作。所以,最好能使創建出來的線程能夠循環使用。一個自然而然的方案便是線程池。
// 創建 2 個線程的線程池
Executor executor =Executors.newFixedThreadPool(2);
while(存在未對賬訂單){
// 查詢未對賬訂單
executor.execute(()-> {
pos = getPOrders();
});
// 查詢派送單
executor.execute(()-> {
dos = getDOrders();
});
/* ??如何實現等待??*/
// 執行對賬操作
diff = check(pos, dos);
// 差異寫入差異庫
save(diff);
}
於是我們就創建兩個固定大小為2的線程池,之后在while循環里重復利用。
但是問題也出來了:主線程如何得知getPOrders()和getDOrders()這兩個操作什么時候執完?
前面主線程通過調用線程T1和T2的join()
方法來等待T1和T2退出,但是在線程池的方案里,線程根本就不會退出,所以,join()方法不可取。
這時我們就可以使用CountDownLatch工具類,將其初始計數值設置為2。當執行完pos = getPOrders();
后,將計數器減一,執行完dos = getDOrders();
后也將計數器減一。當計數器為0時,被阻塞的主線程就可以繼續執行了。
// 創建 2 個線程的線程池
Executor executor = Executors.newFixedThreadPool(2);
while(存在未對賬訂單){
// 計數器初始化為 2
CountDownLatch latch = new CountDownLatch(2);
// 查詢未對賬訂單
executor.execute(()-> {
pos = getPOrders();
latch.countDown(); // 實現對計數器減1
});
// 查詢派送單
executor.execute(()-> {
dos = getDOrders();
latch.countDown(); // 實現對計數器減1
});
// 等待兩個查詢操作結束
latch.await(); // 在await()返回之前,主線程會一直被阻塞
// 執行對賬操作
diff = check(pos, dos);
// 差異寫入差異庫
save(diff);
}
使用CyclicBarrier進一步優化對賬系統
除了getPOrders()和getDOrders()這兩個操作可以並行,這兩個查詢操作和check()
、save()
這兩個對賬操作之間也可以並行。

兩次查詢操作和對賬操作並行,對賬操作還依賴查詢操作的結果,有點像生產者-消費者的意思,兩次查詢操作是生產者,對賬操作是消費者。那么,我們就需要一個隊列,來保存生產者生產的數據,而消費者則從這個隊列消費數據。
不過,針對對賬系統,可以設計兩個隊列,並且這兩個隊列之間還有對應關系。訂單查詢操作將訂單查詢結果插入訂單隊列,派送單查詢操作將派送單插入派送單隊列,這兩個隊列的元素之間是有一一對應關系。這樣的好處在於:對賬操作可以每次從訂單隊列出一個元素和從派送單隊列出一個元素,然后對這兩個元素執行對賬操作,這樣數據一定不會亂掉。

如何使兩個隊列實現完全的並行?
兩個查詢操作所需時間並不相同,那么一個簡單的想法便是,一個線程T1執行訂單的查詢工程,一個線程T2執行派送單的查詢工作,僅當線程T1和T2各自都生產完1條數據的時候,通知線程T3執行對賬操作。

先查詢完的一方需要在設置的屏障點等待另一方,直到雙方都到達屏障點,才開始繼續下一步任務。
於是我們可以使用CyclicBarrier來實現這個功能。創建一個計數器初始值為2的CyclicBarrier,同時傳入一個回調函數,當計數器減為0的時候,便調用這個函數。
Vector<P> pos; // 訂單隊列
Vector<D> dos; // 派送單隊列
// 執行回調的線程池
// 固定線程數量為1是因為只有單線程取獲取兩個隊列中的數據才不會出現數據匹配不一致問題
Executor executor = Executors.newFixedThreadPool(1);
// 創建CyclicBarrier的計數器為2,傳入一個線程另外執行對賬操作
// 當計數器為0時,會運行傳入線程執行對賬操作
final CyclicBarrier barrier = new CyclicBarrier(2, ()->{
executor.execute(()->check());
});
void check(){
P p = pos.remove(0); // 從訂單隊列中獲取訂單
D d = dos.remove(0); // 從派送單隊列中獲取派送單
// 執行對賬操作
diff = check(p, d);
// 差異寫入差異庫
save(diff);
}
void checkAll(){
// 循環查詢訂單庫
Thread T1 = new Thread(()->{
while(存在未對賬訂單){
pos.add(getPOrders()); // 查詢訂單庫
barrier.await(); // 將計數器減一並等待直到計數器為0
}
});
T1.start();
// 循環查詢運單庫
Thread T2 = new Thread(()->{
while(存在未對賬訂單){
dos.add(getDOrders()); // 查詢運單庫
barrier.await(); // 將計數器減一並等待直到計數器為0
}
});
T2.start();
}
線程T1負責查詢訂單,當查出一條時,調用barrier.await()
來將計數器減1,同時等待計數器變為0;線程T2負責查詢派送訂單,當查出一條時,也調用barrier.await()
來將計數器減1,同時等待計數器變為0;當T1和T2都調用barrier.await()時,計數器就會減到0,此時T1和T2就可以執行下一條語句了,同時會調用barrier的回調函數來執行對賬操作。
CyclicBarrier的計數器有自動重置的功能,當減到0時,會自動重置你設置的初始值。於是,我們便可以重復使用CyclicBarrier。
小結
CountDownLatch
和CyclicBarrier
是Java並發包提供的兩個非常易用的線程同步工具類。它們的區別在於:CountDownLatch主要用來解決一個線程等待多個線程的場景(計數器一旦減到0,再有線程調用await(),該線程會直接通過,計數器不會被重置);CyclicBarrier是一組線程之間的相互等待(計數器可以重用,減到0會重置為設置的初始值),還可以傳入回調函數,當計數器為0時,執行回調函數。
參考:
[1] 極客時間專欄王寶令《Java並發編程實戰》
[2] Brian Goetz.Tim Peierls. et al.Java並發編程實戰[M].北京:機械工業出版社,2016
[3] Oracle Java API.https://docs.oracle.com/javase/8/docs/api/index.html?overview-summary.html