響應式編程2----CompletableFuture詳解


completableFuture是java8之后引入的特性,也就是我們可以異步執行相關操作,並根據這個未來會到來的結果去進行操作。感覺跟js中的promise差不多。

public class CompletableFuture<T> implements Future<T>, CompletionStage<T>

可以看出這個接口實現了future以及completionStage接口

future

當看到工作線程結果並非立馬就要的話,可以交給Future,然后主線程可以在這期間做一些其他事情,當需要工作線程結果時,使用get()方法進行獲取。get方法是阻塞的,也就是當你的工作線程並沒有完成時,主線程會等待結果。future無法表達任務之間的依賴關系。

 

 

 CompletableStage

CompletableStage用來表示一步過程中的一個階段,可以在另一個CompletableStage完成時做一些操作或者計算,此接口中定義了一些基本的行為,通過這些行為可以簡潔的描述非常復雜的任務

 常用方法:

  • thenApply    將上一個stage的結果轉化成新的類型或值
  • thenAccept     將上一個stage的結果進行消耗無返回值
  • thenRun       有上一個stage結果后,執行一段新的操作
  • thenCombine    結合兩個stage的結果,轉化成新的類型或值
  • thenCompose   返回一個新的completableStage,並將上一個stage的結果作為新的stage的supplier
  • exceptionally     當處理過程中遇到異常時進行的補償處理
  • handle       統一對正常結果和異常結果的處理                  

大部分方法都有一aysnc結果的,表示異步,如果不傳遞就是用默認線程池;

completableFuture

completableFuture大致可以分為三種情況:

不帶Aysnc方法:同步方法

帶Async方法,只有一個參數:異步方法,使用默認的ForkJoinPool.commonPool獲取線程池

帶Aysnc方法,有兩個參數:異步方法,切使用第二個參數指定的ExecutorService線程池

 

 

簡單使用 

        // just get the result
        CompletableFuture<String> future = CompletableFuture.completedFuture("completed");
        System.out.println(future.get());  //completed

        // init complete and get
        CompletableFuture<Object> objectCompletableFuture = new CompletableFuture<>();
        System.out.println("start the thread to complete ie");
        System.out.println(Thread.currentThread());
        new Thread(() -> {
            System.out.println("will finish in 1s");
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println(Thread.currentThread());
            System.out.println("finished");
            objectCompletableFuture.complete("finish");
        }).start();
        System.out.println("start to get the result");
        System.out.println(objectCompletableFuture.get());
        /*
        result
            completed
            start the thread to complete ie
            Thread[main,5,main]
            start to get the result
            will finish in 1s
            Thread[Thread-0,5,main]
            finished
            finish
         */

        // exception
        CompletableFuture<Object> base = new CompletableFuture<>();
        base.completeExceptionally(new RuntimeException("error"));
        System.out.println(base.get());
        /*
            Exception in thread "main" java.util.concurrent.ExecutionException: java.lang.RuntimeException: error
                at java.base/java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:395)
                at java.base/java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1999)
                at com.yang.completableFuture.CompletableFutureDemo.main(CompletableFutureDemo.java:51)
            Caused by: java.lang.RuntimeException: error
                at com.yang.completableFuture.CompletableFutureDemo.main(CompletableFutureDemo.java:50)
         */

常用創建CompletableFuture方法

// 比較特殊,入參就是返回值,也就是說可以用來執行需要其他返回值的異步任務。
public static <U> CompletableFuture<U> completedFuture(U value)

// 無返回值,使用默認線程池
public static CompletableFuture<Void> runAsync(Runnable runnable)

// 無返回值,使用自定義線程池
public static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor)

// 有返回值,使用默認線程池
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier)

// 有返回值,使用自定義線程池
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor)

// 無返回值,傳入的所有對象執行完畢后,會返回一個新的CompletableFuture,如果有異常,會返回空
public static CompletableFuture<Void> allOf(CompletableFuture<?>... cfs)

執行任務完成之后,可以對結果進行額外操作

whenComplete

BiConsumer<T,U> 函數接口有兩個參數,無返回值。
Function<T,R> 函數接口有一個輸入參數,返回一個結果。

// Async,同步處理正常計算結果或異常,使用執行任務的線程來執行該方法
public CompletableFuture<T> whenComplete(BiConsumer<? super T,? super Throwable> action)
// Async,異步處理正常計算結果或異常,使用執行任務的那個線程池中的線程來執行該方法!
public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T,? super Throwable> action)
// Async,異步處理正常計算結果或異常,使用自定義線程池來執行該方法
public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T,? superThrowable> action, Executor executor)
// 處理異常。
public CompletableFuture<T> exceptionally(Function<Throwable,? extends T> fn)

示例:

public static int mockTask(){
        System.out.println("start task");
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("the mock task thread:" + Thread.currentThread().getId());
        System.out.println("task is end");
        return new Random().nextInt();
    }

    public static void sync() throws ExecutionException, InterruptedException {
        ForkJoinPool forkJoinPool = new ForkJoinPool(10);
        CompletableFuture<Integer> future = CompletableFuture.supplyAsync(WhenComplete::mockTask, forkJoinPool);
        CompletableFuture<Integer> completableFuture = future.whenComplete((result, exception) -> {
            System.out.println("the when complete use thread" + Thread.currentThread().getId());
            System.out.println("the result is :" + result);
            System.out.println("teh exception is : {}" + (exception == null ? "no error" : exception));
        });
        System.out.println("the main use thread" + Thread.currentThread().getId());

        System.out.println("the future result is:" + future.get());
        System.out.println("the completableFuture result is:" + completableFuture.get());
        /*
            start task
            the main use thread1
            the mock task thread:13
            task is end
            the when complete use thread13
            the result is :87831413
            the future result is:87831413
            teh exception is : {}no error
            the completableFuture result is:87831413
         */
    }

    public static void async() throws ExecutionException, InterruptedException {
        ForkJoinPool forkJoinPool = new ForkJoinPool(10);

        CompletableFuture<Integer> future = CompletableFuture.supplyAsync(WhenComplete::mockTask, forkJoinPool);
        CompletableFuture<Integer> completableFuture = future.whenCompleteAsync((result, exception) -> {
            System.out.println("the when complete use thread" + Thread.currentThread().getId());
            System.out.println("the result is :" + result);
            System.out.println("teh exception is : {}" + (exception == null ? "no error" : exception));
        });
        System.out.println("the main use thread" + Thread.currentThread().getId());

        System.out.println("the future result is:" + future.get());
        System.out.println("the completableFuture result is:" + completableFuture.get());
        /*
            start task
            the main use thread1
            the mock task thread:13
            task is end
            the when complete use thread14
            the result is :696707836
            the future result is:696707836
            teh exception is : {}no error
            the completableFuture result is:696707836
         */
    }

從上述輸出結果可以看出,whenComplete是使用處理任務的工線程繼續處理,也就是同步的,whenCompleteAsync是另外其一個線程進行處理的,在進行任務是,需要自己注入一個線程池,否則使用默認線程池,上述邏輯只會有兩個激活的線程,無法看出區別。

exceptionally

    public static void exception() throws ExecutionException, InterruptedException {
        CompletableFuture<Integer> future = CompletableFuture.supplyAsync(WhenComplete::throwException);
        CompletableFuture<Integer> completableFuture = future.whenCompleteAsync((result, exception) -> {
            System.out.println("the result is :" + result);
            System.out.println("teh exception is : {}" + (exception == null ? "no error" : exception));
        });
        completableFuture.exceptionally(exception -> {
            System.out.println("cache the exception " + exception);
            return 0;
        });
        System.out.println("the future result is:" + future.get());
        System.out.println("the completableFuture result is:" + completableFuture.get());
        /*
            start to throw the exception
         */
    }

    public static void exception2() throws ExecutionException, InterruptedException {
        CompletableFuture<Integer> future = CompletableFuture.supplyAsync(WhenComplete::throwException);
        CompletableFuture<Integer> completableFuture = future.whenCompleteAsync((result, exception) -> {
            System.out.println("the result is :" + result);
            System.out.println("teh exception is : {}" + (exception == null ? "no error" : exception));
        });
        completableFuture.exceptionally(exception -> {
            System.out.println("cache the exception " + exception);
            return 0;
        });
        System.out.println("the completableFuture result is:" + completableFuture.get());
        /*
            start to throw the exception
            the result is :null
            teh exception is : {}java.util.concurrent.CompletionException: java.lang.RuntimeException: error
            cache the exception java.util.concurrent.CompletionException: java.lang.RuntimeException: error
         */
    }

    public static void exception3() throws ExecutionException, InterruptedException {
        CompletableFuture<Integer> future = CompletableFuture.supplyAsync(WhenComplete::throwException);
        CompletableFuture<Integer> completableFuture = future.whenCompleteAsync((result, exception) -> {
            System.out.println("the result is :" + result);
            System.out.println("teh exception is : {}" + (exception == null ? "no error" : exception));
        });
        CompletableFuture<Integer> exceptionally = completableFuture.exceptionally(exception -> {
            System.out.println("cache the exception " + exception);
            return 0;
        });
        System.out.println("the completableFuture result is:" + completableFuture.get());
        System.out.println("the exceptionally result is:" + exceptionally.get());
        /*
            start to throw the exception
            the result is :null
            teh exception is : {}java.util.concurrent.CompletionException: java.lang.RuntimeException: error
            cache the exception java.util.concurrent.CompletionException: java.lang.RuntimeException: error
         */
    }

    public static void exception4() throws ExecutionException, InterruptedException {
        CompletableFuture<Integer> future = CompletableFuture.supplyAsync(WhenComplete::throwException);
        CompletableFuture<Integer> completableFuture = future.whenCompleteAsync((result, exception) -> {
            System.out.println("the result is :" + result);
            System.out.println("teh exception is : {}" + (exception == null ? "no error" : exception));
        });
        CompletableFuture<Integer> exceptionally = completableFuture.exceptionally(exception -> {
            System.out.println("cache the exception " + exception);
            return 0;
        });
        System.out.println("the exceptionally result is:" + exceptionally.get());
        /*
            start to throw the exception
            the result is :null
            teh exception is : {}java.util.concurrent.CompletionException: java.lang.RuntimeException: error
            cache the exception java.util.concurrent.CompletionException: java.lang.RuntimeException: error
            the exceptionally result is:0
         */
    }

從上述四個實例可以看出,exceptionally是捕捉異常,他會在whenComplete方法之后執行,如果去獲取已經發生異常的CompletableFuture對象,會直接拋出錯誤,但是經過exceptionally捕捉之后並返回值,會構建一個新的CompletableFuture對象,這個對象的值就是返回值。

handle

方法

//同步
public <U> CompletableFuture<U> handle(BiFunction<? super T,Throwable,? extends U> fn)
//異步,使用原始CompletableFuture的線程
public <U> CompletableFuture<U> handleAsync(BiFunction<? super T,Throwable,? extends U> fn)
//異步,使用自定義線程池的線程
public <U> CompletableFuture<U> handleAsync(BiFunction<? super T,Throwable,? extends U> fn, Executor executor)

示例:

public static int mockTask(){
        System.out.println("start task");
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("task is end");
        return new Random().nextInt();
    }

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        CompletableFuture<Integer> future = CompletableFuture.supplyAsync(Handle::mockTask);
        CompletableFuture<Integer> future1 = future.handleAsync((result, exception) -> {
            System.out.println("the result: " + result);
            System.out.println("the error: " + (exception == null ? "no error" : exception));
            return result * 10;
        });

        System.out.println("the future result: " + future.get() );
        System.out.println("the future1 result: " + future1.get() );
        /*
            start task
            task is end
            the future result: 1544126383
            the result: 1544126383
            the error: no error
            the future1 result: -1738605354
         */
    }

handle方法就是在任務執行完畢之后,執行該方法邏輯,這個對象內部方式是需要有返回值,並且返回的對象的值就是這個方法的返回值。

thenRun

方法

// 同步
public CompletableFuture<Void> thenRun(Runnable action)
// 異步
public CompletableFuture<Void> thenRunAsync(Runnable action)
// 異步
public CompletableFuture<Void> thenRunAsync(Runnable action, Executor executor)

示例

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> 100);
        CompletableFuture<Void> future1 = future.thenRun(() -> {
            System.out.println("run able");
        });
        System.out.println("future: " + future.get());
        System.out.println("future1: " + future1.get());
        /*
            run able
            future: 100
            future1: null
         */
    }

 

這個方法不消費CompletableFuture的結果,而是執行下一個任務

thenApply

方法

// 同步
public <U> CompletableFuture<U> thenApply(Function<? super T,? extends U> fn)
//異步
public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn)
// 異步
public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn, Executor executor)

示例

CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> 1).thenApply(i -> i * 2).thenApply(i -> i * 2);
System.out.println(future.get()); // 4

thenApply方法跟handle有點類似,都可以改變對象的返回結果,但不同的是handle可以捕捉異常,thenApply相當於把CompleteableFuture一個一個的連接起來,並把上一個對象的結果傳給下一個對象

thenAccept

方法

// 同步
public CompletableFuture<Void> thenAccept(Consumer<? super T> action)
// 異步
public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action)
// 異步
public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action, Executor executor)

示例:

        CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> 1);
        CompletableFuture<Void> future1 = future.thenAcceptAsync(i -> {
            System.out.println("result: " + i);
        });
        System.out.println("future: " + future.get());
        System.out.println("future1: " + future1.get());
        /*
            result: 1
            future: 1
            future1: null
         */

這個方法就是一個消費消息,無返回值,我們可以看到接受的參數就是Consumer

thenAcceptBoth

// 同步
public <U> CompletableFuture<Void> thenAcceptBoth(CompletionStage<? extends U> other, BiConsumer<? super T,? super U> action)
// 異步
public <U> CompletableFuture<Void> thenAcceptBothAsync(CompletionStage<? extends U> other, BiConsumer<? super T,? super U> action)
// 異步
public <U> CompletableFuture<Void> thenAcceptBothAsync(CompletionStage<? extends U> other, BiConsumer<? super T,? super U> action, Executor executor)

示例

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> 100);
        CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> 200);
        CompletableFuture<Void> future2 = future.thenAcceptBoth(future1, (x, y) -> {
            System.out.println("the x : " + x);
            System.out.println("the y : " + y);
        });
        System.out.println("future: " + future.get());
        System.out.println("future1: " + future1.get());
        System.out.println("future2: " + future2.get());
        /*
            the x : 100
            the y : 200
            future: 100
            future1: 200
            future2: null
         */
    }

action就是bigConsumer,純消費的,這個方法會把兩個CompletableFuture對象的值結合起來,並接受,無返回值

runAfterBoth

方法

//runAfterBoth和上面三個的區別就是它不消費原始的CompletableFuture結果
public CompletableFuture<Void> runAfterBoth(CompletionStage<?> other,  Runnable action)。

示例

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> 100);
        CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> {
            System.out.println("start sleep");
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("end sleep");
            return 200;
        });
        future.runAfterBoth(future1, () ->{
            System.out.println("over, but no args");
        });
        System.out.println("future: " + future.get());
        System.out.println("future1: " + future1.get());
        /*
            start sleep
            future: 100
            end sleep
            over, but no args
            future1: 200        
         */
    }

從上述結果可以看出,這個方法會等待兩個對象都執行完畢,會執行,但是並不接收參數

acceptEither

方法

public CompletableFuture<Void> acceptEither(CompletionStage<? extends T> other, Consumer<? super T> action)
public CompletableFuture<Void> acceptEitherAsync(CompletionStage<? extends T> other, Consumer<? super T> action)
public CompletableFuture<Void> acceptEitherAsync(CompletionStage<? extends T> other, Consumer<? super T> action, Executor executor)

示例

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> 100);
        CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> {
            System.out.println("start sleep");
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("end sleep");
            return 200;
        });
        future.acceptEither(future1, (x) ->{
            System.out.println("the result: " + x);
        });
        System.out.println("future: " + future.get());
        System.out.println("future1: " + future1.get());
        /*
            run able
            future: 100
            future1: null
         */
    }

從結果可以看出,當任意一個CompletableFuture執行完畢就會執行,並且沒有返回值

applyToEither

方法

public <U> CompletableFuture<U> applyToEither(CompletionStage<? extends T> other, Function<? super T,U> fn)
public <U> CompletableFuture<U> applyToEitherAsync(CompletionStage<? extends T> other, Function<? super T,U> fn)
public <U> CompletableFuture<U> applyToEitherAsync(CompletionStage<? extends T> other, Function<? super T,U> fn, Executor executor)

示例

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> 100);
        CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> {
            System.out.println("start sleep");
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("end sleep");
            return 200;
        });
        CompletableFuture<Integer> future2 = future.applyToEither(future1, (x) -> {
            System.out.println("the result: " + x);
            return 1000;
        });
        System.out.println("future: " + future.get());
        System.out.println("future2: " + future2.get());
        System.out.println("future1: " + future1.get());
        /*
            start sleep
            the result: 100
            future: 100
            future2: 1000
            end sleep
            future1: 200
         */
    }

從上述代碼可以看出只要任意一個CompletalbeFuture對象完成,就會執行方法,並且有返回值,注意打印順序

anyof/allof

方法

public static CompletableFuture<Void> allOf(CompletableFuture<?>... cfs)
public static CompletableFuture<Object> anyOf(CompletableFuture<?>... cfs)

示例

public static void all(){
        Random random = new Random();

        ForkJoinPool forkJoinPool = new ForkJoinPool(10);

        long start = System.currentTimeMillis();
        CompletableFuture<String> futureA = CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(1000 + random.nextInt(1000));
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("all product detail");
            return "all product detail\n";
        }, forkJoinPool);

        CompletableFuture<String> futureB = CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(1000 + random.nextInt(1000));
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("all seller info");
            return "all seller info\n";
        }, forkJoinPool);

        CompletableFuture<String> futureC = CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(1000 + random.nextInt(1000));
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("all stock");
            return "all stock\n";
        }, forkJoinPool);

        CompletableFuture<String> futureD = CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(1000 + random.nextInt(1000));
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("all order");
            return "all order\n";
        }, forkJoinPool);

        CompletableFuture<Void> allFuture = CompletableFuture.allOf(futureA, futureB, futureC, futureD);
        allFuture.join();
        System.out.println("all use time:" + (System.currentTimeMillis() - start));
    }

    public static void main(String[] args) {
        all();
        any();
        /*
            all seller info
            all product detail
            all order
            all stock
            all use time:1790
            any order
            any: any order
            any use time:1194
         */
    }

    public static void any(){
        Random random = new Random();

        ForkJoinPool forkJoinPool = new ForkJoinPool(10);

        long start = System.currentTimeMillis();
        CompletableFuture<String> futureA = CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(1000 + random.nextInt(1000));
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("any product detail");
            return "any product detail\n";
        }, forkJoinPool);

        CompletableFuture<String> futureB = CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(1000 + random.nextInt(1000));
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("any seller info");
            return "any seller info\n";
        }, forkJoinPool);

        CompletableFuture<String> futureC = CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(1000 + random.nextInt(1000));
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("any stock");
            return "any stock\n";
        }, forkJoinPool);

        CompletableFuture<String> futureD = CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(1000 + random.nextInt(1000));
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("any order");
            return "any order\n";
        }, forkJoinPool);
        CompletableFuture<Object> anyFuture = CompletableFuture.anyOf(futureA, futureB, futureC, futureD);
        System.out.println("any: " + anyFuture.join());
        System.out.println("any use time:" + (System.currentTimeMillis() - start));
    }

從上述代碼運行結果可以知道,allof會等待所有方法執行完畢,並且無返回值,anyof會等到最快執行的那個方法執行完畢,並且接受最快執行的的那個方法的返回值

任務的依賴關系

    static void executeBase() throws ExecutionException, InterruptedException {
        CompletableFuture<Object> base = new CompletableFuture<>();
        CompletableFuture<String> future = base.thenApply(s -> s + "2").thenApply(s -> s + "3");
        base.complete("1");
        System.out.println(future.get());  // 123
    }

    static void executeFuture() throws ExecutionException, InterruptedException {
        CompletableFuture<Object> base = new CompletableFuture<>();
        CompletableFuture<String> future = base.thenApply(s -> s + "2").thenApply(s -> s + "3");
        future.complete("1");
        System.out.println(future.get());  // 1
    }

    static void printBase() throws ExecutionException, InterruptedException {
        CompletableFuture<Object> base = new CompletableFuture<>();
        CompletableFuture<String> future = base.thenApply(s -> s + "2").thenApply(s -> s + "3");
        future.complete("1");
        System.out.println(base.get());  // the thread will be block
    }


    public static void main(String[] args) throws ExecutionException, InterruptedException {
        executeBase();
        executeFuture();
        printBase();
    }

從上述可以簡單得出結論,completableFuture是依靠complete來進行驅動,如果沒有complete,那么不會執行后續代碼。

另外執行base的complete與future的complete的返回結果不同,分析一下源碼:

 

首先這里面有兩個比較重要的屬性,一個是Completion對象stack,這是一個CAS((Compare-and-Swap),即比較並替換,是一種實現並發算法時常用到的技術在並發程序中,如果多個線程共享一個變量,通過CAS可以在不加鎖的情況下實現對共享變量的安全的訪問)實現的無鎖並發棧,每個鏈式調用的任務都會被壓入這個棧,stack會永遠指向棧頂,另外一個就是object對象,這個就是當前CompletableFuture的結果

接下來看一下Completion

 

 其中屬性next保存了棧中下一個元素的引用

 接下來看一下thenAply的引用,可以看到這里面調用uniApplyStage方法,如果同步調用,就不傳遞線程池,異步調用就傳遞默認線程是或者調用方傳遞的線程池

 

接下來進入uniApplyStage,從截圖可以看出,如果當前CompletableFuture有結果會進入uniApplyNow

 

 接下來先進入uniApplyNow

 

 另外一種情況就是這個result為空就會進入unipush, 將這個completion放入棧,NEXT.set就是之前說的CAS壓棧

 

tryFire是一個abstract方法,接下來看一下uniApply中的實現,其實跟之前uniApplyNow類似,先判斷異常,然后進入執行,completeValue也是考略線程安全放入結果

 

 當然如果值不為空,那會進入下述方法,如果棧為空,name返回空

 

 

 

 進入postComplete

 

 調用內部執行步驟

 

一個教程:

CompletableFuture<String> base = new CompletableFuture<>();
CompletableFuture<String> future =
    base.thenApply(
        s -> {
            log.info("2");
            return s + " 2";
        });
base.thenAccept(s -> log.info(s+"a")).thenAccept(aVoid -> log.info("b"));
base.thenAccept(s -> log.info(s+"c")).thenAccept(aVoid -> log.info("d"));
base.complete("1");
log.info("base result: {}", base.get());
log.info("future result: {}", future.get());

 

 

 

第八行

 

 

第九行后

 

 

 

至此,整個對象關系如同一個執行計划,等待着base的complete那一刻。

我們再來分解下第10行的執行步驟:

  1. base.complete("1")后base里的result屬性會變成1
  2. 取base中stack(對象1)執行,出棧
  3. 取對象1中dep屬性的stack(對象2)執行,出棧
  4. 取base中stack(對象3)執行,出棧
  5. 取對象3中dep屬性的stack(對象4)執行,出棧
  6. 取base中stack(對象5)執行,出棧

base的stack(對象2、1、0)和它下面那些dep中的stack執行上順序正好是相反的,暫且稱base的stack為主stack吧,我們來畫一張更通用的關系來重點看下stack:

先執行base的棧頂Completion 2,成功后出棧。然后會檢查Completion 2中dep的stack,只要沒到棧底,則會取出棧頂壓入base的stack中,該圖則把Completion 8、7分別壓到base的stack中,然后執行棧底的Completion 6

重復這個過程,執行base的棧頂Completion 7,由於Completion 7的dep的stack為空,則直接出棧即可。接着Completion 8會被執行。

接下來處理Completion 1的過程和之前類似。

最終的執行順序是base,2,6,7,8,1,3,4,5,0


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM