jdk8之CompletableFuture與CompletionService


  JDK 8的CompletionService相對於之前版本的Future而言,其優勢是能夠盡可能快的得到執行完成的任務。例如有4個並發任務要執行,正常情況下通過Future.get()獲取,通常只能按照提交的順序獲得結果,如果最后提交的最先完成的話,總執行時間會長很多。而通過CompletionService能夠降低總執行時間,如下所示:

package com.hundsun.ta.base.service;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.*;

/**
 * @author zjhua
 * @description
 * @date 2020/1/28 21:07
 */
public class CompletionServiceTest {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        testFuture();
        testCompletionService();
    }

    //結果的輸出和線程的放入順序 有關(如果前面的沒完成,就算后面的哪個完成了也得等到你的牌號才能輸出!),so阻塞耗時
    public static void testFuture() throws InterruptedException, ExecutionException {
        long beg = System.currentTimeMillis();
        System.out.println("testFuture()開始執行:" + beg);
        ExecutorService executor = Executors.newCachedThreadPool();
        List<Future<String>> result = new ArrayList<Future<String>>();
        for (int i = 5; i > 0; i--) {
            Future<String> submit = executor.submit(new Task(i));
            result.add(submit);
        }
        executor.shutdown();
        for (int i = 0; i < 5; i++) {//一個一個等待返回結果
            Thread.sleep(500);
            System.out.println("線程" + i + "執行完成:" + result.get(i).get());
        }
        System.out.println("testFuture()執行完成:" + System.currentTimeMillis() + "," + (System.currentTimeMillis()-beg));
    }

    //結果的輸出和線程的放入順序 無關(誰完成了誰就先輸出!主線程總是能夠拿到最先完成的任務的返回值,而不管它們加入線程池的順序),so很大大縮短等待時間
    private static void testCompletionService() throws InterruptedException, ExecutionException {
        long beg = System.currentTimeMillis();
        System.out.println("testFuture()開始執行:" + beg);
        ExecutorService executor = Executors.newCachedThreadPool();
        ExecutorCompletionService<String> completionService = new ExecutorCompletionService<>(executor);
        for (int i = 5; i > 0; i--) {
            completionService.submit(new Task(i));
        }
        executor.shutdown();
        for (int i = 0; i < 5; i++) {
            // 檢索並移除表示下一個已完成任務的 Future,如果目前不存在這樣的任務,則等待。
            Future<String> future = completionService.take(); //這一行沒有完成的任務就阻塞
            Thread.sleep(500);
            System.out.println("線程" + i + "執行完成:" + future.get());   // 這一行在這里不會阻塞,引入放入隊列中的都是已經完成的任務
        }
        System.out.println("testFuture()執行完成:" + System.currentTimeMillis() + "," + (System.currentTimeMillis() - beg));
    }

    private static class Task implements Callable<String> {

        private volatile int i;

        public Task(int i) {
            this.i = i;
        }

        @Override
        public String call() throws Exception {
            Thread.sleep(i*500);
            return "任務 : " + i;
        }

    }
}
// 執行結果
testFuture()開始執行:1580217876088
線程0執行完成:任務 : 5
線程1執行完成:任務 : 4
線程2執行完成:任務 : 3
線程3執行完成:任務 : 2
線程4執行完成:任務 : 1
testFuture()執行完成:1580217880596,4508
testFuture()開始執行:1580217880596
線程0執行完成:任務 : 1
線程1執行完成:任務 : 2
線程2執行完成:任務 : 3
線程3執行完成:任務 : 4
線程4執行完成:任務 : 5
testFuture()執行完成:1580217883605,3009

使用傳統的Future,需要執行4.5秒,使用CompleteService,則只需要3秒。但是如果子線程執行完成后不需要執行其他任務,則意義不是很大。

除了上述場景外,CompleteService還適合於N選1的場景,例如同時從兩個渠道查詢數據,返回任何一個可用的即可,從Future就實現不了。

CompletionService的定義如下:

其實現也比較簡單,利用了ThreadPoolExecutor。

看完CompleteService,再來看CompleteFuture。它實現了Future接口和CompletionStage接口(他代表某個異步或同步計算的階段,也就是計算流水線的一個節點,這樣多個CompletionStage可以作為和過濾器一樣鏈式執行,一個計算單元完成后出發下一個計算單元),和CompleteService的區別在於CompleteFuture知道當前完成的是誰,並采用編程式回調提高代碼可讀性,CompleteService只知道哪個最快完成了,具體是誰需要應用自己去關聯上下文。同時在編程模式上,很大程度上利用了JDK 8的Lambda表達式,這樣一個完整服務的多個步驟就能夠和同步的的寫法一樣自然,不用為了實現異步處理而將邏輯合並為一個超大的方法。在並行處理中,如果每個分片的處理時間相差比較大,例如有些1分鍾,有些3分鍾,有些10秒鍾,這樣將每個服務的粒度細分為很多個子步驟,每個服務的子步驟通過CompleteFuture串聯起來,整體的完成時間就能夠下降,每個分片的處理完成時間也將趨於接近。同時在異常的處理上,CompleteFuture也要友好的多。

 

 下面來看一個例子:

static ExecutorService executor = Executors.newFixedThreadPool(3, new ThreadFactory() {
    int count = 1;
    @Override
    public Thread newThread(Runnable runnable) {
        return new Thread(runnable, "custom-executor-" + count++);
    }
});
static void thenApplyAsyncWithExecutorExample() {
// 簡單的異步執行 CompletableFuture
<String> cf = CompletableFuture.completedFuture("message").thenApplyAsync(s -> { assertTrue(Thread.currentThread().getName().startsWith("custom-executor-")); assertFalse(Thread.currentThread().isDaemon()); randomSleep(); return s.toUpperCase(); }, executor); assertNull(cf.getNow(null)); assertEquals("MESSAGE", cf.join()); }

異常處理:

static void completeExceptionallyExample() {
    CompletableFuture<String> cf = CompletableFuture.completedFuture("message").thenApplyAsync(String::toUpperCase,
            CompletableFuture.delayedExecutor(1, TimeUnit.SECONDS));
    CompletableFuture<String> exceptionHandler = cf.handle((s, th) -> { return (th != null) ? "message upon cancel" : ""; });
    cf.completeExceptionally(new RuntimeException("completed exceptionally"));  // 模擬拋出異常
assertTrue("Was not completed exceptionally", cf.isCompletedExceptionally());
    try {
        cf.join();
        fail("Should have thrown an exception");
    } catch(CompletionException ex) { // just for testing
        assertEquals("completed exceptionally", ex.getCause().getMessage());
    }
    assertEquals("message upon cancel", exceptionHandler.join());
}

鏈式調用:

public void completableFutureApplyAsync() {
 ExecutorService newFixedThreadPool = Executors.newFixedThreadPool(2);
 ScheduledExecutorService newSingleThreadScheduledExecutor = Executors.newSingleThreadScheduledExecutor();
 CompletableFuture<Integer> completableFuture = 
 CompletableFuture
      .supplyAsync(this::findAccountNumber,newFixedThreadPool)//will run on thread obtain from newFixedThreadPool
      .thenApplyAsync(this::calculateBalance,newSingleThreadScheduledExecutor) //will run on thread obtain from newSingleThreadScheduledExecutor
      .thenApplyAsync(this::notifyBalance);//will run on thread obtain from common pool
   Integer balance = completableFuture.join();
    assertEquals(Integer.valueOf(balance), Integer.valueOf(100));
    }

  就實際應用而言,CompletableFuture的作用更加有價值的地方在於其他的一些方法,比如allOf、anyOf、xxxToEither等需要多對一的場景,他們可以大大簡化代碼。

參考:

https://dzone.com/articles/20-examples-of-using-javas-completablefuture


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM