使用CompletableFuture實現多個異步任務並行完成后合並結果


業務場景

需要同時從多個副本數據庫中查詢數據,並對查詢結果進行合並去重處理后返回前端。

實現過程涉及多數據源切換,這里不作過多討論。

 

編碼實現

實現過程:

1、定義異步查詢數據方法;

2、通過CompletableFuture的allOf方法對多個異步執行結果進行處理;

public class CompletableFutureTests {

    @Autowired
    private UserDao userDao;

    @Test
    public void testSomeTaskAndJoin() throws Exception {

        // DynamicDataSourceContextHolder.dataSourceIds根據動態數據源數量
        // 異步執行每個數據源查詢方法
        // 返回一個Future集合
        List<CompletableFuture<List<User>>> futures = DynamicDataSourceContextHolder.dataSourceIds.stream()
                .map(this::queryUsers).collect(Collectors.toList());

        // 多個異步執行結果合並到該集合
        List<User> futureUsers = new ArrayList<>();

        // 通過allOf對多個異步執行結果進行處理
        CompletableFuture allFuture = CompletableFuture.allOf(futures.toArray(new CompletableFuture[futures.size()]))
                                .whenComplete((v, t) -> {
                                    // 所有CompletableFuture執行完成后進行遍歷
                                    futures.forEach(future -> {
                                        synchronized (this) {
                                            // 查詢結果合並
                                            futureUsers.addAll(future.getNow(null));
                                        }
                                    });
                                });


        // 阻塞等待所有CompletableFuture執行完成
        allFuture.get();
        // 對合並后的結果集進行去重處理
        List<User> result = futureUsers.stream().distinct().collect(Collectors.toList());

        log.info(result.toString());

    }

    /**
     * 用戶異步查詢方法
     * @param datasourceKey 動態數據源Key
     * @return
     */
    public CompletableFuture<List<User>> queryUsers(String datasourceKey) {

        // 定義異步查詢Future對象
        CompletableFuture<List<User>> queryFuture = CompletableFuture.supplyAsync(() -> {
            // 切換數據源
            DynamicDataSourceContextHolder.setDataSourceRouterKey(datasourceKey);
            // 執行ORM查詢方法
            return userDao.selectAll();
        });

        // 異步完成執行方法
        queryFuture.whenCompleteAsync(new BiConsumer<List<User>, Throwable>() {
            @Override
            public void accept(List<User> users, Throwable throwable) {
                // 這里主要記錄異步執行結果
                log.info("數據源[{}]查詢完成,查詢記錄[{}]條", datasourceKey, users.size());
            }
        });

        // 返回future對象
        return queryFuture;
    }

}

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM