Future和CompletableFuture


Future

從JDK1.5開始,提供了Future來表示異步計算的結果,一般它需要結合ExecutorService(執行者)和Callable(任務)來使用。

示例

import java.util.*;
import java.util.concurrent.*;

public class Main {

    public static void main(String[] args) throws Exception {
        ThreadPoolExecutor executor = new ThreadPoolExecutor(3, 6, 60L, TimeUnit.SECONDS, new ArrayBlockingQueue<>(10));
        Future<Integer> future = executor.submit(() -> {
            // 故意耗時
            Thread.sleep(5000);
            return new Random().nextInt(100);
        });

        System.out.println(future.get());
        System.out.println("如果get是阻塞的,則此消息在數據之后輸出");
        executor.shutdown();
    }

}

輸出

即使異步任務等待了5秒,也依然先於消息輸出,由此證明get方法是阻塞的。

Future只是個接口,實際上返回的類是FutureTask:

/**
 * 表示此任務的運行狀態。最初是NEW == 0。運行狀態僅在set、setException和cancel方法中轉換為終端狀態。
 *
 * 可能的狀態轉換:
 * NEW -> COMPLETING -> NORMAL
 * NEW -> COMPLETING -> EXCEPTIONAL
 * NEW -> CANCELLED
 * NEW -> INTERRUPTING -> INTERRUPTED
 */
private volatile int state;
private static final int NEW          = 0;
private static final int COMPLETING   = 1;
private static final int NORMAL       = 2;
private static final int EXCEPTIONAL  = 3;
private static final int CANCELLED    = 4;
private static final int INTERRUPTING = 5;
private static final int INTERRUPTED  = 6;

public V get() throws InterruptedException, ExecutionException {
    int s = state;
    // 如果當前狀態是COMPLETING及其之下的狀態,則需要進入awaitDone方法阻塞等待任務完成。
    if (s <= COMPLETING)
        s = awaitDone(false, 0L);
    return report(s);
}

CompletableFuture

JDk1.8引入了CompletableFuture,它實際上也是Future的實現類。這里可以得出:

1. 面試問Future和CompletableFuture的區別實際上是不嚴謹的,因為一個是接口一個是其實現類。

2. 問區別實際上是問FutureTask和CompletableFuture的區別,或者說CompletableFuture有哪些新特性,能完成Future不能完成的工作。

首先看類定義,可以看到,實現了CompletionStage接口,這個接口是所有的新特性了

public class CompletableFuture<T> implements Future<T>, CompletionStage<T>

對於CompletableFuture有四個執行異步任務的方法:

public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier)
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor)
public static CompletableFuture<Void> runAsync(Runnable runnable)
public static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor)

1. 如果我們指定線程池,則會使用我么指定的線程池;如果沒有指定線程池,默認使用ForkJoinPool.commonPool()作為線程池。

2. supply開頭的帶有返回值,run開頭的無返回值。

1. 執行異步任務(supplyAsync / runAsync)

import java.util.Random;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class Main {

    public static void main(String[] args) throws Exception {

        ThreadPoolExecutor executor = new ThreadPoolExecutor(3, 6, 60L, TimeUnit.SECONDS, new ArrayBlockingQueue<>(10));
        CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
            return new Random().nextInt(100);
        }, executor);

        System.out.println(future.get());
        executor.shutdown();
    }

}

以上僅僅返回個隨機數,如果我們要利用計算結果進一步處理呢?

2. 結果轉換(thenApply / thenApplyAsync)

// 同步轉換
public <U> CompletableFuture<U> thenApply(Function<? super T,? extends U> fn)
// 異步轉換,使用默認線程池
public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn)
// 異步轉換,使用指定線程池
public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn, Executor executor)

 

import java.util.Random;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class Main {

    public static void main(String[] args) throws Exception {

        ThreadPoolExecutor executor = new ThreadPoolExecutor(3, 6, 60L, TimeUnit.SECONDS, new ArrayBlockingQueue<>(10));

        CompletableFuture<Integer> future = CompletableFuture
                // 執行異步任務
                .supplyAsync(() -> {
                    return new Random().nextInt(100);
                }, executor)
                // 對上一步的結果進行處理
                .thenApply(n -> {
                    try {
                        Thread.sleep(2000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    int res = new Random().nextInt(100);
                    System.out.println(String.format("如果是同步的,這條消息應該先輸出。上一步結果:%s,新加:%s", n, res));
                    return n + res;
                });

        System.out.println("我等了你2秒");
        System.out.println(future.get());

        executor.shutdown();
    }

}

輸出:

 

如果把thenApply換成thenApplyAsync,則會輸出:

 

處理完任務以及結果,該去消費了

3. 消費而不影響最終結果(thenAccept / thenRun / thenAcceptBoth)

public CompletableFuture<Void> thenAccept(Consumer<? super T> action)
public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action)
public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action, Executor executor)

public CompletableFuture<Void> thenRun(Runnable action)
public CompletableFuture<Void> thenRunAsync(Runnable action)
public CompletableFuture<Void> thenRunAsync(Runnable action, Executor executor)

public <U> CompletableFuture<Void> thenAcceptBoth(CompletionStage<? extends U> other, BiConsumer<? super T, ? super U> action)
public <U> CompletableFuture<Void> thenAcceptBothAsync(CompletionStage<? extends U> other, BiConsumer<? super T, ? super U> action)
public <U> CompletableFuture<Void> thenAcceptBothAsync(CompletionStage<? extends U> other, BiConsumer<? super T, ? super U> action, Executor executor)

這三種的區別是:

thenAccept:能夠拿到並利用執行結果

thenRun:不能夠拿到並利用執行結果,只是單純的執行其它任務

thenAcceptBoth:能傳入另一個stage,然后把另一個stage的結果和當前stage的結果作為參數去消費。

import java.util.Random;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class Main {

    public static void main(String[] args) throws Exception {

        ThreadPoolExecutor executor = new ThreadPoolExecutor(3, 6, 60L, TimeUnit.SECONDS, new ArrayBlockingQueue<>(10));

        CompletableFuture<Integer> future = CompletableFuture
                // 執行異步任務
                .supplyAsync(() -> {
                    return new Random().nextInt(100);
                }, executor)
                // 對上一步的結果進行處理
                .thenApplyAsync(n -> {
                    try {
                        Thread.sleep(2000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    int res = new Random().nextInt(100);
                    System.out.println(String.format("如果是同步的,這條消息應該先輸出。上一步結果:%s,新加:%s", n, res));
                    return n + res;
                });
        // 單純的消費執行結果,注意這個方法是不會返回計算結果的——CompletableFuture<Void>
        CompletableFuture<Void> voidCompletableFuture = future.thenAcceptAsync(n -> {
            System.out.println("單純消費任務執行結果:" + n);
        });
        // 這個無法消費執行結果,沒有傳入的入口,只是在當前任務執行完畢后執行其它不相干的任務
        future.thenRunAsync(() -> {
            System.out.println("我只能執行其它工作,我得不到任務執行結果");
        }, executor);

        // 這個方法會接受其它CompletableFuture返回值和當前返回值
        future.thenAcceptBothAsync(CompletableFuture.supplyAsync(() -> {
            return "I'm Other Result";
        }), (current, other) -> {
            System.out.println(String.format("Current:%s,Other:%s", current, other));
        });

        System.out.println("我等了你2秒");
        System.out.println(future.get());

        executor.shutdown();
    }

}

結果:

如果我要組合兩個任務呢?

4. 組合任務(thenCombine / thenCompose)

public <U,V> CompletionStage<V> thenCombine(CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn)
public <U,V> CompletionStage<V> thenCombineAsync(CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn)
public <U,V> CompletionStage<V> thenCombineAsync(CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn, Executor executor)

public <U> CompletionStage<U> thenCompose(Function<? super T, ? extends CompletionStage<U>> fn)
public <U> CompletionStage<U> thenComposeAsync(Function<? super T, ? extends CompletionStage<U>> fn)
public <U> CompletionStage<U> thenComposeAsync(Function<? super T, ? extends CompletionStage<U>> fn, Executor executor)

這兩種區別:主要是返回類型不一樣。

thenCombine:至少兩個方法參數,一個為其它stage,一個為用戶自定義的處理函數,函數返回值為結果類型。

thenCompose:至少一個方法參數即處理函數,函數返回值為stage類型。

先看thenCombine

import java.util.Random;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class Main {

    public static void main(String[] args) throws Exception {

        ThreadPoolExecutor executor = new ThreadPoolExecutor(3, 6, 60L, TimeUnit.SECONDS, new ArrayBlockingQueue<>(10));

        CompletableFuture<Integer> otherFuture = CompletableFuture
                // 執行異步任務
                .supplyAsync(() -> {
                    int result = new Random().nextInt(100);
                    System.out.println("任務A:" + result);
                    return result;
                }, executor);

        CompletableFuture<Integer> future = CompletableFuture
                // 執行異步任務
                .supplyAsync(() -> {
                    int result = new Random().nextInt(100);
                    System.out.println("任務B:" + result);
                    return result;
                }, executor)
                .thenCombineAsync(otherFuture, (current, other) -> {
                    int result = other + current;
                    System.out.println("組合兩個任務的結果:" + result);
                    return result;
                });

        System.out.println(future.get());

        executor.shutdown();
    }

}

執行結果:

再來看thenCompose

import java.util.Random;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class Main {

    public static void main(String[] args) throws Exception {

        ThreadPoolExecutor executor = new ThreadPoolExecutor(3, 6, 60L, TimeUnit.SECONDS, new ArrayBlockingQueue<>(10));

        CompletableFuture<Integer> future = CompletableFuture
                // 執行異步任務
                .supplyAsync(() -> {
                    int result = new Random().nextInt(100);
                    System.out.println("任務A:" + result);
                    return result;
                }, executor)
                .thenComposeAsync((current) -> {
                    return CompletableFuture.supplyAsync(() -> {
                        int b = new Random().nextInt(100);
                        System.out.println("任務B:" + b);
                        int result = b + current;
                        System.out.println("組合兩個任務的結果:" + result);
                        return result;
                    }, executor);
                });

        System.out.println(future.get());

        executor.shutdown();
    }

}

輸出:

注意這兩個輸出雖然一樣,但是用法不一樣。

5. 快者優先(applyToEither / acceptEither)

有個場景,如果我們有多條渠道去完成同一種任務,那么我們肯定選擇最快的那個。

public <U> CompletionStage<U> applyToEither(CompletionStage<? extends T> other, Function<? super T, U> fn)
public <U> CompletionStage<U> applyToEitherAsync(CompletionStage<? extends T> other, Function<? super T, U> fn)
public <U> CompletionStage<U> applyToEitherAsync(CompletionStage<? extends T> other, Function<? super T, U> fn, Executor executor)

public CompletionStage<Void> acceptEither(CompletionStage<? extends T> other, Consumer<? super T> action)
public CompletionStage<Void> acceptEitherAsync(CompletionStage<? extends T> other, Consumer<? super T> action)
public CompletionStage<Void> acceptEitherAsync(CompletionStage<? extends T> other, Consumer<? super T> action, Executor executor)

這兩種區別:僅僅是一個有返回值,一個沒有(Void)

先看applyToEither

import java.util.Random;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class Main {

    public static void main(String[] args) throws Exception {

        ThreadPoolExecutor executor = new ThreadPoolExecutor(3, 6, 60L, TimeUnit.SECONDS, new ArrayBlockingQueue<>(10));

        CompletableFuture<String> otherFuture = CompletableFuture
                .supplyAsync(() -> {
                    int result = new Random().nextInt(100);
                    System.out.println("執行者A:" + result);
                    try {
                        // 故意A慢了一些
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    return "執行者A【" + result + "】";
                }, executor);

        CompletableFuture<String> future = CompletableFuture
                .supplyAsync(() -> {
                    int result = new Random().nextInt(100);
                    System.out.println("執行者B:" + result);
                    return "執行者B【" + result + "】";
                }, executor)
                .applyToEither(otherFuture, (faster) -> {
                    System.out.println("誰最快:" + faster);
                    return faster;
                });

        System.out.println(future.get());

        executor.shutdown();
    }

}

輸出:

再看acceptEither

import java.util.Random;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class Main {

    public static void main(String[] args) throws Exception {

        ThreadPoolExecutor executor = new ThreadPoolExecutor(3, 6, 60L, TimeUnit.SECONDS, new ArrayBlockingQueue<>(10));

        CompletableFuture<String> otherFuture = CompletableFuture
                .supplyAsync(() -> {
                    int result = new Random().nextInt(100);
                    System.out.println("執行者A:" + result);
                    try {
                        // 故意A慢了一些
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    return "執行者A【" + result + "】";
                }, executor);

        CompletableFuture<Void> future = CompletableFuture
                .supplyAsync(() -> {
                    int result = new Random().nextInt(100);
                    System.out.println("執行者B:" + result);
                    return "執行者B【" + result + "】";
                }, executor)
                .acceptEither(otherFuture, (faster) -> {
                    System.out.println("誰最快:" + faster);
                });

        System.out.println(future.get());

        executor.shutdown();
    }

}

輸出:

6. 異常處理(exceptionally / whenComplete / handle)

public CompletionStage<T> exceptionally(Function<Throwable, ? extends T> fn);

public CompletionStage<T> whenComplete(BiConsumer<? super T, ? super Throwable> action);
public CompletionStage<T> whenCompleteAsync(BiConsumer<? super T, ? super Throwable> action);
public CompletionStage<T> whenCompleteAsync(BiConsumer<? super T, ? super Throwable> action, Executor executor);

public <U> CompletionStage<U> handle(BiFunction<? super T, Throwable, ? extends U> fn);
public <U> CompletionStage<U> handleAsync(BiFunction<? super T, Throwable, ? extends U> fn);
public <U> CompletionStage<U> handleAsync(BiFunction<? super T, Throwable, ? extends U> fn,Executor executor);

exceptionally

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class Main {

    public static void main(String[] args) throws Exception {

        ThreadPoolExecutor executor = new ThreadPoolExecutor(3, 6, 60L, TimeUnit.SECONDS, new ArrayBlockingQueue<>(10));

        CompletableFuture<String> future = CompletableFuture
                .supplyAsync(() -> {
                    if (true){
                        throw new RuntimeException("Error!!!");
                    }
                    return "Hello";
                }, executor)
                // 處理上一步發生的異常
                .exceptionally(e -> {
                    System.out.println("處理異常:" + e.getMessage());
                    return "處理完畢!";
                });

        System.out.println(future.get());

        executor.shutdown();
    }

}

輸出:

whenComplete

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class Main {

    public static void main(String[] args) throws Exception {

        ThreadPoolExecutor executor = new ThreadPoolExecutor(3, 6, 60L, TimeUnit.SECONDS, new ArrayBlockingQueue<>(10));

        CompletableFuture<String> future = CompletableFuture
                .supplyAsync(() -> {
                    if (true){
                        throw new RuntimeException("Error!!!");
                    }
                    return "Hello";
                }, executor)
                // 處理上一步發生的異常
                .whenComplete((result,ex) -> {
                    // 這里等待為了上一步的異常輸出完畢
                    try {
                        Thread.sleep(100);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    System.out.println("上一步結果:" + result);
                    System.out.println("處理異常:" + ex.getMessage());
                });

        System.out.println(future.get());

        executor.shutdown();
    }

}

輸出結果:

可以看見,用whenComplete對異常情況不是特別友好。

handle

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class Main {

    public static void main(String[] args) throws Exception {

        ThreadPoolExecutor executor = new ThreadPoolExecutor(3, 6, 60L, TimeUnit.SECONDS, new ArrayBlockingQueue<>(10));

        CompletableFuture<String> future = CompletableFuture
                .supplyAsync(() -> {
                    if (true){
                        throw new RuntimeException("Error!!!");
                    }
                    return "Hello";
                }, executor)
                // 處理上一步發生的異常
                .handle((result,ex) -> {
                    System.out.println("上一步結果:" + result);
                    System.out.println("處理異常:" + ex.getMessage());
                    return "Value When Exception Occurs";
                });

        System.out.println(future.get());

        executor.shutdown();
    }

}

輸出:

 

綜上,如果單純要處理異常,那就用exceptionally;如果還想處理結果(沒有異常的情況),那就用handle,比whenComplete友好一些,handle不僅能處理異常還能返回一個異常情況的默認值。

對比

Future:我們的目的都是獲取異步任務的結果,但是對於Future來說,只能通過get方法或者死循環判斷isDone來獲取。異常情況就更是難辦。

CompletableFuture:只要我們設置好回調函數即可實現:

1. 只要任務完成,即執行我們設置的函數(不用再去考慮什么時候任務完成)

2. 如果發生異常,同樣會執行我們處理異常的函數,甚至連默認返回值都有(異常情況處理更加省力)

3. 如果有復雜任務,比如依賴問題,組合問題等,同樣可以寫好處理函數來處理(能應付復雜任務的處理)

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM