JDK1.8新特性CompletableFuture總結


https://blog.csdn.net/finalheart/article/details/87615546 

 

CompletableFuture
這個completableFuture是JDK1.8版本新引入的類。下面是這個類。實現了倆接口。本身是個class。這個是Future的實現類。

使用completionStage接口去支持完成時觸發的函數和操作。

一個completetableFuture就代表了一個任務。他能用Future的方法。還能做一些之前說的executorService配合futures做不了的。

之前future需要等待isDone為true才能知道任務跑完了。或者就是用get方法調用的時候會出現阻塞。而使用completableFuture的使用就可以用then,when等等操作來防止以上的阻塞和輪詢isDone的現象出現。

 

1. 創建CompletableFuture直接new對象也成。一個completableFuture對象代表着一個任務。這個對象能跟這個任務產生聯系。下面用的complete方法意思就是這個任務完成了需要返回的結果。然后用get();方法可以獲取到。

 

2.JDK1.8使用的接口類。在本文的CompletableFuture中大量的使用了這些函數式接口。

注:這些聲明大量應用於方法的入參中。像thenApply和thenAccept這倆就是一個用Function一個用Consumer

而lambda函數正好是可以作為這些接口的實現。例如 s->{return 1;} 這個就相當於一個Function。因為有入參和返回結果。

 

(1)Function

 

(2)Consumer

 

對於前面有Bi的就是這樣的。BiConsumer就是兩個參數的。

 

(3)Predicate這個接口聲明是一個入參,返回一個boolean。

 

(4)supplier

 

3. 下面是這個類的靜態方法。帶有Async就是異步執行的意思、也是一個completableFuture對象代表着一個任務這個原則。

這種異步方法都可以指定一個線程池作為任務的運行環境。如果沒有指定就會使用ForkJoinPool線程池來執行、

 

(1)supplyAsync&runAsync的使用例子。

public static void main(String[] args) throws ExecutionException, InterruptedException {
ExecutorService executorService = Executors.newCachedThreadPool();
executorService.submit(new Callable<Object>() {
@Override
public Object call() throws Exception {
System.out.println("executorService 是否為守護線程 :" + Thread.currentThread().isDaemon());
return null;
}
});
final CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> {
System.out.println("this is lambda supplyAsync");
System.out.println("supplyAsync 是否為守護線程 " + Thread.currentThread().isDaemon());
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("this lambda is executed by forkJoinPool");
return "result1";
});
final CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
System.out.println("this is task with executor");
System.out.println("supplyAsync 使用executorService 時是否為守護線程 : " + Thread.currentThread().isDaemon());
return "result2";
}, executorService);
System.out.println(completableFuture.get());
System.out.println(future.get());
executorService.shutdown();
}


注意:這些任務中。帶有supply是持有返回值的,run是void返回值的。在玩supply時發現一個問題如果使用supplyAsync任務時不使用任務的返回值。即不用get方法阻塞主線程會導致任務執行中斷。(注:跟get方法無關,后面有答案)

 

 

然后我開始探索是否是只有supplyAsync是這樣。我測試了runAsync發現也是這樣。

 

下圖為與supplyAsync任務執行不全面一樣的問題。我甚至測試了將lambda換成runnable發現無濟於事。

 

答案:
造成這個原因是因為Daemon。因為completableFuture這套使用異步任務的操作都是創建成了守護線程。那么我們沒有調用get方法不阻塞這個主線程的時候。主線程執行完畢。所有線程執行完畢就會導致一個問題,就是守護線程退出。那么我們沒有執行的代碼就是因為主線程不再跑任務而關閉導致的。可能這個不叫問題,因為在開發中我們主線程常常是一直開着的。但是這個小問題同樣讓我想了好久。下面我們開一個非守護線程,可以看到程序執行順利。

 

下面證實守護線程在其他非守護線程全部退出的情況下不繼續執行。

final CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> {
System.out.println("this is lambda supplyAsync");
System.out.println("supplyAsync 是否為守護線程 " + Thread.currentThread().isDaemon());
try {
TimeUnit.SECONDS.sleep(1);
try(BufferedWriter writer = new BufferedWriter
(new OutputStreamWriter(new FileOutputStream(new File("/Users/zhangyong/Desktop/temp/out.txt"))))){
writer.write("this is completableFuture daemon test");
}catch (Exception e){
System.out.println("exception find");
}
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("this lambda is executed by forkJoinPool");
return "result1";
});
這個代碼就是操作本地文件。並且sleep了一秒。其他線程就一句控制台輸出的代碼。最終的結果是文件沒有任何變化。

當我把主線程sleep 5秒時。本地文件會寫入一句 this is completableFuture daemon test 驗證成功。

(2)allOf&anyOf

這兩個方法的入參是一個completableFuture組、allOf就是所有任務都完成時返回。但是是個Void的返回值。

anyOf是當入參的completableFuture組中有一個任務執行完畢就返回。返回結果是第一個完成的任務的結果。

public static void otherStaticMethod() throws ExecutionException, InterruptedException {
final CompletableFuture<String> futureOne = CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
System.out.println("futureOne InterruptedException");
}
return "futureOneResult";
});
final CompletableFuture<String> futureTwo = CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(6000);
} catch (InterruptedException e) {
System.out.println("futureTwo InterruptedException");
}
return "futureTwoResult";
});
CompletableFuture future = CompletableFuture.allOf(futureOne, futureTwo);
System.out.println(future.get());
// CompletableFuture completableFuture = CompletableFuture.anyOf(futureOne, futureTwo);
// System.out.println(completableFuture.get());
}

 


(3)completedFuture這個方法我沒懂他是干啥的。源碼就是返回一個值。感覺沒啥意義。

 

(4)取值方法,除了get還有一個getNow(); 這個就比較特殊了。 這個方法是執行這個方法的時候任務執行完了就返回任務的結果,如果任務沒有執行完就返回你的入參。

 

(5)join方法跟線程的join用法差不多。

 

(6)whenXXX,在一個任務執行完成之后調用的方法。

這個有三個名差不多的方法。whenComplete、whenCompleteAsync、還有一個是whenCompleteAsync用自定義Executor

 

首先看一下這個whenComplete實例方法。這個就是任務執行完畢調用的,傳入一個action。這個方法的執行線程是當前線程,意味着會阻塞當前線程。下面圖中test的輸出跟whenComplete方法運行的線程有關。運行到main線程就會阻塞test的輸出。運行的是completableFuture線程則不會阻塞住test的輸出。

 

下面是任務執行的線程的探索。

 

 

根據測試得出的結論是:如果調用whenComplete的中途,還發生了其他事情,圖中的主線程的sleep(400);導致completableFuture這個任務執行完畢了,那么就使用主線程調用。如果調用的中途沒有發生其他任務且在觸碰到whenComplete方法時completableFuture這個任務還沒有徹底執行完畢那么就會用completableFuture這個任務所使用的線程。

 

下面是whenCompleteAsync方法。這個方法就是新創建一個異步線程執行。所以不會阻塞。

 

(7) then。方法瞅着挺多的。實際上就是異不異步和加不加自定義Executor.

 

注:whenComplete中出現的問題在then中測試不存在、使用的就是上一個任務的線程。這個thenCompose就是一個任務執行完之后可以用它的返回結果接着執行的方法。方法返回的是另一個你期盼泛型的結果、

compose理解就是上一個任務結果是then的一部分。

 

下面介紹一下thenCombine。這個combine的理解就是結合兩個任務的結果。

 

綜上:這個線程的問題並不是大問題,只要你不用線程來做判斷條件。他並不會影響你的效率。試想pool線程都執行完了就用主線程跑唄。沒跑完,而使你等了那你就用pool線程唄。

thenRun就是這個任務運行完,再運行下一個任務。感覺像是join了一下。

 

其余不再介紹。大同小異。

像thenApply(Function);這樣的就是有入參有返回值類型的。

像thenAccept(Consumer);這樣的就是有入參,但是沒有返回值的。詳情在上文中有過關於函數式接口的敘述。
————————————————
版權聲明:本文為CSDN博主「finalheart」的原創文章,遵循CC 4.0 BY-SA版權協議,轉載請附上原文出處鏈接及本聲明。
原文鏈接:https://blog.csdn.net/finalheart/article/details/87615546


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM