JDK 8的CompletionService相對於之前版本的Future而言,其優勢是能夠盡可能快的得到執行完成的任務。例如有4個並發任務要執行,正常情況下通過Future.get()獲取,通常只能按照提交的順序獲得結果,如果最后提交的最先完成的話,總執行時間會長很多。而通過CompletionService能夠降低總執行時間,如下所示:
package com.hundsun.ta.base.service; import java.util.ArrayList; import java.util.List; import java.util.concurrent.*; /** * @author zjhua * @description * @date 2020/1/28 21:07 */ public class CompletionServiceTest { public static void main(String[] args) throws ExecutionException, InterruptedException { testFuture(); testCompletionService(); } //結果的輸出和線程的放入順序 有關(如果前面的沒完成,就算后面的哪個完成了也得等到你的牌號才能輸出!),so阻塞耗時 public static void testFuture() throws InterruptedException, ExecutionException { long beg = System.currentTimeMillis(); System.out.println("testFuture()開始執行:" + beg); ExecutorService executor = Executors.newCachedThreadPool(); List<Future<String>> result = new ArrayList<Future<String>>(); for (int i = 5; i > 0; i--) { Future<String> submit = executor.submit(new Task(i)); result.add(submit); } executor.shutdown(); for (int i = 0; i < 5; i++) {//一個一個等待返回結果 Thread.sleep(500); System.out.println("線程" + i + "執行完成:" + result.get(i).get()); } System.out.println("testFuture()執行完成:" + System.currentTimeMillis() + "," + (System.currentTimeMillis()-beg)); } //結果的輸出和線程的放入順序 無關(誰完成了誰就先輸出!主線程總是能夠拿到最先完成的任務的返回值,而不管它們加入線程池的順序),so很大大縮短等待時間 private static void testCompletionService() throws InterruptedException, ExecutionException { long beg = System.currentTimeMillis(); System.out.println("testFuture()開始執行:" + beg); ExecutorService executor = Executors.newCachedThreadPool(); ExecutorCompletionService<String> completionService = new ExecutorCompletionService<>(executor); for (int i = 5; i > 0; i--) { completionService.submit(new Task(i)); } executor.shutdown(); for (int i = 0; i < 5; i++) { // 檢索並移除表示下一個已完成任務的 Future,如果目前不存在這樣的任務,則等待。 Future<String> future = completionService.take(); //這一行沒有完成的任務就阻塞 Thread.sleep(500); System.out.println("線程" + i + "執行完成:" + future.get()); // 這一行在這里不會阻塞,引入放入隊列中的都是已經完成的任務 } System.out.println("testFuture()執行完成:" + System.currentTimeMillis() + "," + (System.currentTimeMillis() - beg)); } private static class Task implements Callable<String> { private volatile int i; public Task(int i) { this.i = i; } @Override public String call() throws Exception { Thread.sleep(i*500); return "任務 : " + i; } } }
// 執行結果 testFuture()開始執行:1580217876088 線程0執行完成:任務 : 5 線程1執行完成:任務 : 4 線程2執行完成:任務 : 3 線程3執行完成:任務 : 2 線程4執行完成:任務 : 1 testFuture()執行完成:1580217880596,4508 testFuture()開始執行:1580217880596 線程0執行完成:任務 : 1 線程1執行完成:任務 : 2 線程2執行完成:任務 : 3 線程3執行完成:任務 : 4 線程4執行完成:任務 : 5 testFuture()執行完成:1580217883605,3009
使用傳統的Future,需要執行4.5秒,使用CompleteService,則只需要3秒。但是如果子線程執行完成后不需要執行其他任務,則意義不是很大。
除了上述場景外,CompleteService還適合於N選1的場景,例如同時從兩個渠道查詢數據,返回任何一個可用的即可,從Future就實現不了。
CompletionService的定義如下:
其實現也比較簡單,利用了ThreadPoolExecutor。
看完CompleteService,再來看CompleteFuture。它實現了Future接口和CompletionStage接口(他代表某個異步或同步計算的階段,也就是計算流水線的一個節點,這樣多個CompletionStage可以作為和過濾器一樣鏈式執行,一個計算單元完成后出發下一個計算單元),和CompleteService的區別在於CompleteFuture知道當前完成的是誰,並采用編程式回調提高代碼可讀性,CompleteService只知道哪個最快完成了,具體是誰需要應用自己去關聯上下文。同時在編程模式上,很大程度上利用了JDK 8的Lambda表達式,這樣一個完整服務的多個步驟就能夠和同步的的寫法一樣自然,不用為了實現異步處理而將邏輯合並為一個超大的方法。在並行處理中,如果每個分片的處理時間相差比較大,例如有些1分鍾,有些3分鍾,有些10秒鍾,這樣將每個服務的粒度細分為很多個子步驟,每個服務的子步驟通過CompleteFuture串聯起來,整體的完成時間就能夠下降,每個分片的處理完成時間也將趨於接近。同時在異常的處理上,CompleteFuture也要友好的多。
下面來看一個例子:
static ExecutorService executor = Executors.newFixedThreadPool(3, new ThreadFactory() { int count = 1; @Override public Thread newThread(Runnable runnable) { return new Thread(runnable, "custom-executor-" + count++); } }); static void thenApplyAsyncWithExecutorExample() {
// 簡單的異步執行 CompletableFuture<String> cf = CompletableFuture.completedFuture("message").thenApplyAsync(s -> { assertTrue(Thread.currentThread().getName().startsWith("custom-executor-")); assertFalse(Thread.currentThread().isDaemon()); randomSleep(); return s.toUpperCase(); }, executor); assertNull(cf.getNow(null)); assertEquals("MESSAGE", cf.join()); }
異常處理:
static void completeExceptionallyExample() { CompletableFuture<String> cf = CompletableFuture.completedFuture("message").thenApplyAsync(String::toUpperCase, CompletableFuture.delayedExecutor(1, TimeUnit.SECONDS)); CompletableFuture<String> exceptionHandler = cf.handle((s, th) -> { return (th != null) ? "message upon cancel" : ""; }); cf.completeExceptionally(new RuntimeException("completed exceptionally")); // 模擬拋出異常 assertTrue("Was not completed exceptionally", cf.isCompletedExceptionally()); try { cf.join(); fail("Should have thrown an exception"); } catch(CompletionException ex) { // just for testing assertEquals("completed exceptionally", ex.getCause().getMessage()); } assertEquals("message upon cancel", exceptionHandler.join()); }
鏈式調用:
public void completableFutureApplyAsync() { ExecutorService newFixedThreadPool = Executors.newFixedThreadPool(2); ScheduledExecutorService newSingleThreadScheduledExecutor = Executors.newSingleThreadScheduledExecutor(); CompletableFuture<Integer> completableFuture = CompletableFuture .supplyAsync(this::findAccountNumber,newFixedThreadPool)//will run on thread obtain from newFixedThreadPool .thenApplyAsync(this::calculateBalance,newSingleThreadScheduledExecutor) //will run on thread obtain from newSingleThreadScheduledExecutor .thenApplyAsync(this::notifyBalance);//will run on thread obtain from common pool Integer balance = completableFuture.join(); assertEquals(Integer.valueOf(balance), Integer.valueOf(100)); }
就實際應用而言,CompletableFuture的作用更加有價值的地方在於其他的一些方法,比如allOf、anyOf、xxxToEither等需要多對一的場景,他們可以大大簡化代碼。
參考:
https://dzone.com/articles/20-examples-of-using-javas-completablefuture