Future和CompletableFuture的區別


1.Future

在執行多個任務的時候,使用Java標准庫提供的線程池是非常方便的。我們提交的任務只需要實現Runnable接口,就可以讓線程池去執行:

class Task implements Runnable {
    public String result;

    public void run() {
        this.result = longTimeCalculation(); 
    }
}

 

Runnable接口有個問題,它的方法沒有返回值。如果任務需要一個返回結果,那么只能保存到變量,還要提供額外的方法讀取,非常不便。所以,Java標准庫還提供了一個Callable接口,和Runnable接口比,它多了一個返回值:

class Task implements Callable<String> {
    public String call() throws Exception {
        return longTimeCalculation(); 
    }
}

 

並且Callable接口是一個泛型接口,可以返回指定類型的結果。

現在的問題是,如何獲得異步執行的結果?

如果仔細看ExecutorService.submit()方法,可以看到,它返回了一個Future類型,一個Future類型的實例代表一個未來能獲取結果的對象:

ExecutorService executor = Executors.newFixedThreadPool(4); 
// 定義任務:
Callable<String> task = new Task();
// 提交任務並獲得Future:
Future<String> future = executor.submit(task);
// 從Future獲取異步執行返回的結果:
String result = future.get(); // 可能阻塞

 

當我們提交一個Callable任務后,我們會同時獲得一個Future對象,然后,我們在主線程某個時刻調用Future對象的get()方法,就可以獲得異步執行的結果。在調用get()時,如果異步任務已經完成,我們就直接獲得結果。如果異步任務還沒有完成,那么get()會阻塞,直到任務完成后才返回結果。

一個Future<V>接口表示一個未來可能會返回的結果,它定義的方法有:

  • get():獲取結果(可能會等待)
  • get(long timeout, TimeUnit unit):獲取結果,但只等待指定的時間;
  • cancel(boolean mayInterruptIfRunning):取消當前任務;
  • isDone():判斷任務是否已完成。

小結

對線程池提交一個Callable任務,可以獲得一個Future對象;

可以用Future在將來某個時刻獲取結果。

2.CompletableFuture

使用Future獲得異步執行結果時,要么調用阻塞方法get(),要么輪詢看isDone()是否為true,這兩種方法都不是很好,因為主線程也會被迫等待。

從Java 8開始引入了CompletableFuture它針對Future做了改進,可以傳入回調對象,當異步任務完成或者發生異常時,自動調用回調對象的回調方法。

我們以獲取股票價格為例,看看如何使用CompletableFuture

 
         
import java.util.concurrent.CompletableFuture;

public
class Main { public static void main(String[] args) throws Exception { // 創建異步執行任務: CompletableFuture<Double> cf = CompletableFuture.supplyAsync(Main::fetchPrice); // 如果執行成功: cf.thenAccept((result) -> { System.out.println("price: " + result); }); // 如果執行異常: cf.exceptionally((e) -> { e.printStackTrace(); return null; }); // 主線程不要立刻結束,否則CompletableFuture默認使用的線程池會立刻關閉: Thread.sleep(200); } static Double fetchPrice() { try { Thread.sleep(100); } catch (InterruptedException e) { } if (Math.random() < 0.3) { throw new RuntimeException("fetch price failed!"); } return 5 + Math.random() * 20; } }
price: 7.468336731107743

 

創建一個CompletableFuture是通過CompletableFuture.supplyAsync()實現的它需要一個實現了Supplier接口的對象

public interface Supplier<T> {
    T get();
}

 

這里我們用lambda語法簡化了一下,直接傳入Main::fetchPrice,因為Main.fetchPrice()靜態方法的簽名符合Supplier接口的定義(除了方法名外)。

緊接着,CompletableFuture已經被提交給默認的線程池執行了,我們需要定義的是CompletableFuture完成時和異常時需要回調的實例。完成時,CompletableFuture會調用Consumer對象

public interface Consumer<T> {
    void accept(T t);
}

 

異常時,CompletableFuture會調用Function對象:

public interface Function<T, R> {
    R apply(T t);
}

 

這里我們都用lambda語法簡化了代碼。

可見CompletableFuture的優點是:

  • 異步任務結束時,會自動回調某個對象的方法;
  • 異步任務出錯時,會自動回調某個對象的方法;
  • 主線程設置好回調后,不再關心異步任務的執行。

如果只是實現了異步回調機制,我們還看不出CompletableFuture相比Future的優勢。CompletableFuture更強大的功能是,多個CompletableFuture可以串行執行,例如,定義兩個CompletableFuture,第一個CompletableFuture根據證券名稱查詢證券代碼,第二個CompletableFuture根據證券代碼查詢證券價格,這兩個CompletableFuture實現串行操作如下:

 
         
import java.util.concurrent.CompletableFuture;

public
class Main { public static void main(String[] args) throws Exception { // 第一個任務: CompletableFuture<String> cfQuery = CompletableFuture.supplyAsync(() -> { return queryCode("中國石油"); }); // cfQuery成功后繼續執行下一個任務: CompletableFuture<Double> cfFetch = cfQuery.thenApplyAsync((code) -> { return fetchPrice(code); }); // cfFetch成功后打印結果: cfFetch.thenAccept((result) -> { System.out.println("price: " + result); }); // 主線程不要立刻結束,否則CompletableFuture默認使用的線程池會立刻關閉: Thread.sleep(2000); } static String queryCode(String name) { try { Thread.sleep(100); } catch (InterruptedException e) { } return "601857"; } static Double fetchPrice(String code) { try { Thread.sleep(100); } catch (InterruptedException e) { } return 5 + Math.random() * 20; } }
price: 21.019102834733275

 

除了串行執行外,多個CompletableFuture還可以並行執行。例如,我們考慮這樣的場景:

同時從新浪和網易查詢證券代碼,只要任意一個返回結果,就進行下一步查詢價格,查詢價格也同時從新浪和網易查詢,只要任意一個返回結果,就完成操作:

public class Main {
    public static void main(String[] args) throws Exception {
        // 兩個CompletableFuture執行異步查詢:
        CompletableFuture<String> cfQueryFromSina = CompletableFuture.supplyAsync(() -> {
            return queryCode("中國石油", "https://finance.sina.com.cn/code/");
        });
        CompletableFuture<String> cfQueryFrom163 = CompletableFuture.supplyAsync(() -> {
            return queryCode("中國石油", "https://money.163.com/code/");
        });

        // 用anyOf合並為一個新的CompletableFuture:
        CompletableFuture<Object> cfQuery = CompletableFuture.anyOf(cfQueryFromSina, cfQueryFrom163);

        // 兩個CompletableFuture執行異步查詢:
        CompletableFuture<Double> cfFetchFromSina = cfQuery.thenApplyAsync((code) -> {
            return fetchPrice((String) code, "https://finance.sina.com.cn/price/");
        });
        CompletableFuture<Double> cfFetchFrom163 = cfQuery.thenApplyAsync((code) -> {
            return fetchPrice((String) code, "https://money.163.com/price/");
        });

        // 用anyOf合並為一個新的CompletableFuture:
        CompletableFuture<Object> cfFetch = CompletableFuture.anyOf(cfFetchFromSina, cfFetchFrom163);

        // 最終結果:
        cfFetch.thenAccept((result) -> {
            System.out.println("price: " + result);
        });
        // 主線程不要立刻結束,否則CompletableFuture默認使用的線程池會立刻關閉:
        Thread.sleep(200);
    }

    static String queryCode(String name, String url) {
        System.out.println("query code from " + url + "...");
        try {
            Thread.sleep((long) (Math.random() * 100));
        } catch (InterruptedException e) {
        }
        return "601857";
    }

    static Double fetchPrice(String code, String url) {
        System.out.println("query price from " + url + "...");
        try {
            Thread.sleep((long) (Math.random() * 100));
        } catch (InterruptedException e) {
        }
        return 5 + Math.random() * 20;
    }
}
query code from https://money.163.com/code/...
query code from https://finance.sina.com.cn/code/...
query price from https://finance.sina.com.cn/price/...
query price from https://money.163.com/price/...
price: 6.214906451395034

上述邏輯實現的異步查詢規則實際上是:

┌─────────────┐ ┌─────────────┐
│ Query Code  │ │ Query Code  │
│  from sina  │ │  from 163   │
└─────────────┘ └─────────────┘
       │               │
       └───────┬───────┘
               ▼
        ┌─────────────┐
        │    anyOf    │
        └─────────────┘
               │
       ┌───────┴────────┐
       ▼                ▼
┌─────────────┐  ┌─────────────┐
│ Query Price │  │ Query Price │
│  from sina  │  │  from 163   │
└─────────────┘  └─────────────┘
       │                │
       └────────┬───────┘
                ▼
         ┌─────────────┐
         │    anyOf    │
         └─────────────┘
                │
                ▼
         ┌─────────────┐
         │Display Price│
         └─────────────┘

除了anyOf()可以實現“任意個CompletableFuture只要一個成功”,allOf()可以實現“所有CompletableFuture都必須成功”,這些組合操作可以實現非常復雜的異步流程控制。

最后我們注意CompletableFuture的命名規則:

  • xxx():表示該方法將繼續在已有的線程中執行;
  • xxxAsync():表示將異步在線程池中執行。

小結

CompletableFuture可以指定異步處理流程:

  • thenAccept()處理正常結果;
  • exceptional()處理異常結果;
  • thenApplyAsync()用於串行化另一個CompletableFuture
  • anyOf()allOf()用於並行化多個CompletableFuture


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM