使用 CompletableFuture 異步組裝數據


使用 CompletableFuture 異步組裝數據

一種快捷、優雅的異步組裝數據方式

實際項目中經常遇到這種情況: 從多個表中查找到數據然后拼裝成一個VO返回給前端。
這個過程有可能會非常耗時。因為最終每一條返回的VO數據是由多個表中的數據拼裝而成,如果項目還是微服務需要從其他服務獲取數據,那將會更加耗時,更加麻煩。簡單的幾十條、幾百條數據單個線程跑起來可能沒有什么壓力,但是當數量達到成千上萬,幾十萬,幾百萬,組裝的邏輯也變得非常復雜時,這個操作就非常耗時。

最近我在項目中就遇到這個的情況。項目中我們需要做一個相關流程數據的下載功能。
最初版本使用單線程,因為業務的復雜性,5000多條數據完全下載下來需要30min。以為是從數據庫分揀數據比較耗時,查詢日志后發現數據庫查詢並沒有耗時多久,反而是組裝數據占用了大多數時間。

因此機智的我就想起之前同組小伙伴分享的Java8一個新的類CompletableFuture。

CompletableFuture 簡介

CompletableFuture 是Java 8 新增加的Api,該類實現,Future和CompletionStage兩個接口,提供了非常強大的Future的擴展功能,可以幫助我們簡化異步編程的復雜性,提供了函數式編程的能力,可以通過回調的方式處理計算結果,並且提供了轉換和組合CompletableFuture的方法。

具體大家可以查看Java Api 文檔,或者閱讀網上一些博客。

CompletableFuture 異步組裝數據

代碼示例如下

/**
     * 功能描述: 拼裝數據
     * @author lkb
     * @date 2019/12/25
     * @param
     * @return java.util.List<com.laidian.erp.crm.vo.DeviceProcessListExportVO>
     */
    private List<DeviceProcessListExportVO> listByFlowJobIds(List<String> flowJobIds, Map<String, ProcessInfoVo> map, Map<Integer,UserInfoDTO> userInfoDTOMap, Map<Integer,HatCity> cityMap){
        //result 列表保存組裝完成的數據
        List<DeviceProcessListExportVO> result = new LinkedList<>();
        //每次組裝100條數據
        List<List<String>> partition = Lists.partition(flowJobIds,100);
        List<CompletableFuture> futures = partition.stream().map(subList -> CompletableFuture.supplyAsync(() -> {
            //packVOs 方法就是組裝數據
            return packVOs(subList,map,userInfoDTOMap,cityMap);
        },ASYNC_IO_POOL).whenCompleteAsync((r,e)->result.addAll(r))
                        .exceptionally(e->{
                            log.error(e.getMessage(),e);
                            log.error("listByFlowJobIds error.");
                            return result;
                        })).collect(Collectors.toList());

        CompletableFuture<Void> all = CompletableFuture.allOf(futures.toArray(new CompletableFuture[futures.size()]));
        log.info("任務阻塞 ");
        Instant start = Instant.now();
        //阻塞,直到所有任務結束。
        all.join();
        log.info("任務阻塞結束 耗時 = {}",ChronoUnit.MILLIS.between(start, Instant.now()));
        return result;
    }

具體步驟如下:

  1. 將原始數據按照每組100條進行拆分。(具體每組拆分多少條需要根據實際的業務情況和服務器性能,多測試一下應該就知道了)
  2. 多線程組成數據,每個線程組裝一組數據(上面拆分的100條原始數據)。packVOs 方法就是組裝數據。為了高效,我建議 在組裝數據的時候多采用批量,緩存的思想,能批量盡量批量,重復數據就盡量緩存下來。
  3. CompletableFuture.supplyAsync() 方法說明如下。第一個參數是線程需要執行的動作,第二個參數是線程執行用的Executor,可以填自定義的,也可以不填寫,不填寫程序會使用默認的執行器。

public static CompletableFuture supplyAsync(Supplier supplier, Executor executor)
返回由給定執行程序中運行的任務異步完成的新CompletableFuture,其中包含通過調用給定供應商獲得的值。

  1. whenCompleteAsync 方法含義和名字一樣,將上一步執行的結果或者異常作為參數傳給指定的參數。這里我們希望分批組裝的結果能過add進result中。
  2. exceptionally 是用來處理異常。當一個線程執行出現異常的時候應該執行怎樣的操作。
  3. all.join() 這個方法是等待所有的任務(所有的CompletableFuture)完成。組裝數據是耗時的,如果我們不等待所有組裝任務完成,直接返回result,相信result中不會有數據,或者數據是不完整的。我們期待的結果是所有的數據都正常組裝完成,添加進result。

使用了CompletableFuture方式實現多線程分批組裝,並且在組裝時采用 “批量+緩存” 的思想,原來5000條數據30min縮短為3min。當然還有優化的空間,但是能達到這個效果已經讓我非常滿意了。

下次遇到類似的情況,我會優先考慮CompletableFuture分批組裝的方式,快捷、優雅。你們有好的方法呢?


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM