業務場景
需要同時從多個副本數據庫中查詢數據,並對查詢結果進行合並去重處理后返回前端。
實現過程涉及多數據源切換,這里不作過多討論。
編碼實現
實現過程:
1、定義異步查詢數據方法;
2、通過CompletableFuture的allOf方法對多個異步執行結果進行處理;
public class CompletableFutureTests { @Autowired private UserDao userDao; @Test public void testSomeTaskAndJoin() throws Exception { // DynamicDataSourceContextHolder.dataSourceIds根據動態數據源數量 // 異步執行每個數據源查詢方法 // 返回一個Future集合 List<CompletableFuture<List<User>>> futures = DynamicDataSourceContextHolder.dataSourceIds.stream() .map(this::queryUsers).collect(Collectors.toList()); // 多個異步執行結果合並到該集合 List<User> futureUsers = new ArrayList<>(); // 通過allOf對多個異步執行結果進行處理 CompletableFuture allFuture = CompletableFuture.allOf(futures.toArray(new CompletableFuture[futures.size()])) .whenComplete((v, t) -> { // 所有CompletableFuture執行完成后進行遍歷 futures.forEach(future -> { synchronized (this) { // 查詢結果合並 futureUsers.addAll(future.getNow(null)); } }); }); // 阻塞等待所有CompletableFuture執行完成 allFuture.get(); // 對合並后的結果集進行去重處理 List<User> result = futureUsers.stream().distinct().collect(Collectors.toList()); log.info(result.toString()); } /** * 用戶異步查詢方法 * @param datasourceKey 動態數據源Key * @return */ public CompletableFuture<List<User>> queryUsers(String datasourceKey) { // 定義異步查詢Future對象 CompletableFuture<List<User>> queryFuture = CompletableFuture.supplyAsync(() -> { // 切換數據源 DynamicDataSourceContextHolder.setDataSourceRouterKey(datasourceKey); // 執行ORM查詢方法 return userDao.selectAll(); }); // 異步完成執行方法 queryFuture.whenCompleteAsync(new BiConsumer<List<User>, Throwable>() { @Override public void accept(List<User> users, Throwable throwable) { // 這里主要記錄異步執行結果 log.info("數據源[{}]查詢完成,查詢記錄[{}]條", datasourceKey, users.size()); } }); // 返回future對象 return queryFuture; } }