使用 CompletableFuture 異步組裝數據
一種快捷、優雅的異步組裝數據方式
實際項目中經常遇到這種情況: 從多個表中查找到數據然后拼裝成一個VO返回給前端。
這個過程有可能會非常耗時。因為最終每一條返回的VO數據是由多個表中的數據拼裝而成,如果項目還是微服務需要從其他服務獲取數據,那將會更加耗時,更加麻煩。簡單的幾十條、幾百條數據單個線程跑起來可能沒有什么壓力,但是當數量達到成千上萬,幾十萬,幾百萬,組裝的邏輯也變得非常復雜時,這個操作就非常耗時。
最近我在項目中就遇到這個的情況。項目中我們需要做一個相關流程數據的下載功能。
最初版本使用單線程,因為業務的復雜性,5000多條數據完全下載下來需要30min。以為是從數據庫分揀數據比較耗時,查詢日志后發現數據庫查詢並沒有耗時多久,反而是組裝數據占用了大多數時間。
因此機智的我就想起之前同組小伙伴分享的Java8一個新的類CompletableFuture。
CompletableFuture 簡介
CompletableFuture 是Java 8 新增加的Api,該類實現,Future和CompletionStage兩個接口,提供了非常強大的Future的擴展功能,可以幫助我們簡化異步編程的復雜性,提供了函數式編程的能力,可以通過回調的方式處理計算結果,並且提供了轉換和組合CompletableFuture的方法。
具體大家可以查看Java Api 文檔,或者閱讀網上一些博客。
CompletableFuture 異步組裝數據
代碼示例如下
/**
* 功能描述: 拼裝數據
* @author lkb
* @date 2019/12/25
* @param
* @return java.util.List<com.laidian.erp.crm.vo.DeviceProcessListExportVO>
*/
private List<DeviceProcessListExportVO> listByFlowJobIds(List<String> flowJobIds, Map<String, ProcessInfoVo> map, Map<Integer,UserInfoDTO> userInfoDTOMap, Map<Integer,HatCity> cityMap){
//result 列表保存組裝完成的數據
List<DeviceProcessListExportVO> result = new LinkedList<>();
//每次組裝100條數據
List<List<String>> partition = Lists.partition(flowJobIds,100);
List<CompletableFuture> futures = partition.stream().map(subList -> CompletableFuture.supplyAsync(() -> {
//packVOs 方法就是組裝數據
return packVOs(subList,map,userInfoDTOMap,cityMap);
},ASYNC_IO_POOL).whenCompleteAsync((r,e)->result.addAll(r))
.exceptionally(e->{
log.error(e.getMessage(),e);
log.error("listByFlowJobIds error.");
return result;
})).collect(Collectors.toList());
CompletableFuture<Void> all = CompletableFuture.allOf(futures.toArray(new CompletableFuture[futures.size()]));
log.info("任務阻塞 ");
Instant start = Instant.now();
//阻塞,直到所有任務結束。
all.join();
log.info("任務阻塞結束 耗時 = {}",ChronoUnit.MILLIS.between(start, Instant.now()));
return result;
}
具體步驟如下:
- 將原始數據按照每組100條進行拆分。(具體每組拆分多少條需要根據實際的業務情況和服務器性能,多測試一下應該就知道了)
- 多線程組成數據,每個線程組裝一組數據(上面拆分的100條原始數據)。packVOs 方法就是組裝數據。為了高效,我建議 在組裝數據的時候多采用批量,緩存的思想,能批量盡量批量,重復數據就盡量緩存下來。
- CompletableFuture.supplyAsync() 方法說明如下。第一個參數是線程需要執行的動作,第二個參數是線程執行用的Executor,可以填自定義的,也可以不填寫,不填寫程序會使用默認的執行器。
public static CompletableFuture supplyAsync(Supplier supplier, Executor executor)
返回由給定執行程序中運行的任務異步完成的新CompletableFuture,其中包含通過調用給定供應商獲得的值。
- whenCompleteAsync 方法含義和名字一樣,將上一步執行的結果或者異常作為參數傳給指定的參數。這里我們希望分批組裝的結果能過add進result中。
- exceptionally 是用來處理異常。當一個線程執行出現異常的時候應該執行怎樣的操作。
- all.join() 這個方法是等待所有的任務(所有的CompletableFuture)完成。組裝數據是耗時的,如果我們不等待所有組裝任務完成,直接返回result,相信result中不會有數據,或者數據是不完整的。我們期待的結果是所有的數據都正常組裝完成,添加進result。
使用了CompletableFuture方式實現多線程分批組裝,並且在組裝時采用 “批量+緩存” 的思想,原來5000條數據30min縮短為3min。當然還有優化的空間,但是能達到這個效果已經讓我非常滿意了。
下次遇到類似的情況,我會優先考慮CompletableFuture分批組裝的方式,快捷、優雅。你們有好的方法呢?