直接上代碼:
import com.google.common.collect.Lists; import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; import java.util.List; import java.util.concurrent.*; import java.util.function.BiConsumer; /** * CompletableFuture的AllOf功能測試,等待所有任務執行完 * */ public class CompletableFutureAllOfTest { public static void main(String[] args) throws Exception { ThreadPoolExecutor executor = new ThreadPoolExecutor(4, 4, 100L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(10)); method1(executor); method2(executor); method3(executor); } /** * 拆解寫法 * @param executor */ public static void method1 (ExecutorService executor) { long start = System.currentTimeMillis(); // 定義第一個任務 CompletableFuture<String> cf1 = CompletableFuture.supplyAsync(() -> { try { Thread.sleep(5000); } catch (Exception e) { e.printStackTrace(); } return "cf1"; }, executor); cf1.whenComplete(new BiConsumer<String, Throwable>() { @Override public void accept(String t, Throwable u) { System.out.println("hello " + t); } }); // 定義第二個任務 CompletableFuture<String> cf2 = CompletableFuture.supplyAsync(() -> { try { Thread.sleep(5000); } catch (Exception e) { e.printStackTrace(); } return "cf2"; }, executor); cf2.whenComplete(new BiConsumer<String, Throwable>() { @Override public void accept(String t, Throwable u) { System.out.println("hello " + t); } }); // 開始等待所有任務執行完成 CompletableFuture<Void> all = CompletableFuture.allOf(cf1, cf2); System.out.println("start block"); all.join(); System.out.println("block finish, consume time:" + (System.currentTimeMillis() - start)); } /** * 合並寫法 * @param executor */ public static void method2 (ExecutorService executor) { List<String> testList = Lists.newArrayList(); testList.add("cf1"); testList.add("cf2"); long start = System.currentTimeMillis(); CompletableFuture<Void> all = null; for (String str : testList) { // 定義任務 CompletableFuture<String> cf = CompletableFuture.supplyAsync(() -> { try { Thread.sleep(5000); } catch (Exception e) { e.printStackTrace(); } return str; }, executor); cf.whenComplete(new BiConsumer<String, Throwable>() { @Override public void accept(String t, Throwable u) { System.out.println("hello " + t); } }); all = CompletableFuture.allOf(cf); } System.out.println("start block"); // 開始等待所有任務執行完成 all.join(); System.out.println("block finish, consume time:" + (System.currentTimeMillis() - start)); } /** * 通過Java8的stream實現,非常簡潔 * @param executor */ @SuppressWarnings("rawtypes") public static void method3 (ExecutorService executor) { List<String> testList = Lists.newArrayList(); testList.add("cf1"); testList.add("cf2"); long start = System.currentTimeMillis(); CompletableFuture[] cfArr = testList.stream(). map(t -> CompletableFuture .supplyAsync(() -> pause(t), executor) .whenComplete((result, th) -> { System.out.println("hello" + result); })).toArray(CompletableFuture[]::new); // 開始等待所有任務執行完成 System.out.println("start block"); CompletableFuture.allOf(cfArr).join(); System.out.println("block finish, consume time:" + (System.currentTimeMillis() - start)); } public static String pause (String name) { try { Thread.sleep(5000); } catch (Exception e) { e.printStackTrace(); } return name; } }
參考: CompletableFuture實現異步獲取結果並且等待所有異步任務完成