一、CompletableFuture用法入門介紹
入門介紹的一個例子:
1 package com.cy.java8; 2 3 import java.util.Random; 4 import java.util.concurrent.CompletableFuture; 5 6 public class CompletableFutureInAction { 7 private final static Random RANDOM = new Random(System.currentTimeMillis()); 8 9 public static void main(String[] args){ 10 CompletableFuture<Double> completableFuture = new CompletableFuture<>(); 11 new Thread(() -> { 12 double value = get(); 13 completableFuture.complete(value); 14 }).start(); 15 16 System.out.println("do other things..."); 17 18 completableFuture.whenComplete((t, e) -> { 19 System.out.println("complete. value = "+ t); 20 if(e != null){ 21 e.printStackTrace(); 22 } 23 }); 24 } 25 26 private static double get(){ 27 try { 28 Thread.sleep(RANDOM.nextInt(3000)); 29 } catch (InterruptedException e) { 30 e.printStackTrace(); 31 } 32 return RANDOM.nextDouble(); 33 } 34 }
console打印:
do other things... complete. value = 0.8244376567363494
二、CompletableFuture.supplyAsync
CompletableFuture很少有直接new出來的方式去用的,一般都是通過提供的靜態方法來使用。
1.使用CompletableFuture.supplyAsync來構造CompletableFuture:
1 package com.cy.java8; 2 3 import java.util.concurrent.*; 4 5 import static com.cy.java8.CompletableFutureInAction.get; 6 7 public class CompletableFutureInAction2 { 8 9 public static void main(String[] args) { 10 /** 11 * 可以發現value=..沒有被打印,為什么呢? 12 * 因為此方法構造出來的線程是demon的,守護進程,main執行結束之后就消失了,所以 13 * 根本沒來得及執行whenComplete中的語句 14 */ 15 CompletableFuture.supplyAsync(() -> get()) 16 .whenComplete((v, e) -> { 17 System.out.println("value = " + v); 18 if (e != null) { 19 e.printStackTrace(); 20 } 21 }); 22 23 System.out.println("do other things..."); 24 } 25 26 27 }
2.要將上面whenComplete中的語句執行,進行改造:
1 package com.cy.java8; 2 3 import java.util.concurrent.*; 4 import java.util.concurrent.atomic.AtomicBoolean; 5 import static com.cy.java8.CompletableFutureInAction.get; 6 7 public class CompletableFutureInAction2 { 8 9 public static void main(String[] args) throws InterruptedException { 10 AtomicBoolean finished = new AtomicBoolean(false); 11 12 CompletableFuture.supplyAsync(() -> get()) 13 .whenComplete((v, e) -> { 14 System.out.println("value = " + v); 15 if (e != null) { 16 e.printStackTrace(); 17 } 18 finished.set(true); 19 }); 20 21 System.out.println("do other things..."); 22 23 while(!finished.get()){ 24 Thread.sleep(1); 25 } 26 } 27 28 29 }
改寫之后, main線程發現如果finished沒有變為true就會一直等1ms,直到whenComplete執行將finished變為true。
3.上面的改寫很low,其實只要將守護線程變為前台進程,main結束后不會消失就行了。
1 package com.cy.java8; 2 3 import java.util.concurrent.*; 4 import static com.cy.java8.CompletableFutureInAction.get; 5 6 public class CompletableFutureInAction2 { 7 8 public static void main(String[] args){ 9 ExecutorService executorService = Executors.newFixedThreadPool(2, r -> { 10 Thread t = new Thread(r); 11 t.setDaemon(false); //非守護線程 12 return t; 13 }); 14 15 CompletableFuture.supplyAsync(() -> get(), executorService) 16 .whenComplete((v, e) -> { 17 System.out.println("value = " + v); 18 if (e != null) { 19 e.printStackTrace(); 20 } 21 }); 22 23 System.out.println("do other things..."); 24 25 //main執行結束之后,executorService線程不會結束,需要手動shutdown 26 executorService.shutdown(); 27 } 28 29 30 }
三、thenApply:
1 package com.cy.java8; 2 3 import java.util.concurrent.CompletableFuture; 4 import java.util.concurrent.ExecutorService; 5 import java.util.concurrent.Executors; 6 7 public class CompletableFutureInAction3 { 8 9 public static void main(String[] args) { 10 ExecutorService executor = Executors.newFixedThreadPool(2, r -> { 11 Thread t = new Thread(r); 12 t.setDaemon(false); 13 return t; 14 }); 15 16 /** 17 * 將執行完的結果再*100 18 */ 19 CompletableFuture.supplyAsync(CompletableFutureInAction::get, executor) 20 .thenApply(v -> multiply(v)) 21 .whenComplete((v, e) -> System.out.println(v)); 22 } 23 24 private static double multiply(double value){ 25 try { 26 Thread.sleep(1000); 27 } catch (InterruptedException e) { 28 e.printStackTrace(); 29 } 30 return value * 100; 31 } 32 33 }
console打印:
43.15351824222534
四、CompletableFuture.join()
1 package com.cy.java8; 2 3 import java.util.Arrays; 4 import java.util.List; 5 import java.util.concurrent.CompletableFuture; 6 import java.util.concurrent.ExecutorService; 7 import java.util.concurrent.Executors; 8 import java.util.stream.Collectors; 9 10 public class CompletableFutureInAction3 { 11 12 public static void main(String[] args) { 13 ExecutorService executor = Executors.newFixedThreadPool(2, r -> { 14 Thread t = new Thread(r); 15 t.setDaemon(false); 16 return t; 17 }); 18 19 /** 20 * 需求:將一組商品列表里面的每個商品對應的價格查詢出來,並將這個價格*100. 21 * 5個商品同時並發去做這件事 22 * 23 * CompletableFuture.join():等到所有的結果都執行結束,會返回CompletableFuture自己本身 24 * 執行完的結果,等於get()返回的結果。 25 */ 26 List<Integer> productionIDs = Arrays.asList(1, 2, 3, 4, 5); //待查的一組商品列表的ID 27 List<Double> priceList = productionIDs.stream().map(id -> CompletableFuture.supplyAsync(() -> queryProduction(id), executor)) 28 .map(future -> future.thenApply(price -> multiply(price))) 29 .map(multiplyFuture -> multiplyFuture.join()) 30 .collect(Collectors.toList()); 31 System.out.println(priceList); 32 33 /** 34 * 按照以前,要5個分別for循環去查詢 35 * 或者分多個線程去查詢,再將每個線程查詢的結果匯總,等到全部線程都執行完了,結果也就出來了 36 */ 37 } 38 39 private static double multiply(double value) { 40 try { 41 Thread.sleep(1000); 42 } catch (InterruptedException e) { 43 e.printStackTrace(); 44 } 45 return value * 100; 46 } 47 48 /** 49 * 模擬 根據商品id查詢對應的價格 50 * @param id 51 * @return 52 */ 53 private static double queryProduction(int id){ 54 return CompletableFutureInAction.get(); 55 } 56 }
console打印:
[90.93730009374265, 23.65282935900653, 17.415066430776815, 16.605197824452343, 60.143109082288206]
說明:這里多個任務同時執行,最終把結果匯總到一起 ,這種都是並行去執行的,編寫代碼也比較簡潔,不需要考慮多線程之間的一些交互、鎖、多線程之間的通信、控制,都不需要去關心。
五、CompletableFuture的常用API介紹
supplyAsync
thenApply
whenComplete
handle
thenRun
thenAccept
thenCompose
thenCombine
thenAcceptBoth
runAfterBoth
applyToEither
acceptEither
runAfterEither
anyOf
allOf
1)supplyAsync、thenApply、whenComplete前面的代碼已經介紹了。
1 package com.cy.java8; 2 3 import java.util.concurrent.CompletableFuture; 4 5 public class CompletableFutureInAction4 { 6 public static void main(String[] args) throws InterruptedException { 7 CompletableFuture.supplyAsync(() -> 1) 8 .thenApply(v -> Integer.sum(v,10)) 9 .whenComplete((v, e) -> System.out.println(v)); 10 11 Thread.sleep(1000); 12 } 13 }
2)whenCompleteAsync: whenComplete是同步的方式,如果對於結果的處理是比較占時間的,不想通過這種同步的方式去做,可以用whenCompleteAsync進行異步操作。
3)handle:和thenApply差不多,只是多了一個對於異常的考慮。
1 package com.cy.java8; 2 3 import java.util.concurrent.CompletableFuture; 4 5 public class CompletableFutureInAction4 { 6 public static void main(String[] args) throws InterruptedException { 7 CompletableFuture.supplyAsync(() -> 1) 8 .handle((v, e) -> Integer.sum(v, 10)) 9 .whenComplete((v, e) -> System.out.println(v)); 10 11 Thread.sleep(1000); 12 } 13 }
4)thenRun:如果想在completableFuture整個執行結束之后,還想進行一個操作,可以thenRun(Runnable r)
1 package com.cy.java8; 2 3 import java.util.concurrent.CompletableFuture; 4 5 public class CompletableFutureInAction4 { 6 public static void main(String[] args) throws InterruptedException { 7 CompletableFuture.supplyAsync(() -> 1) 8 .handle((v, e) -> Integer.sum(v, 10)) 9 .whenComplete((v, e) -> System.out.println(v)) 10 .thenRun(()-> System.out.println("thenRunning...")); 11 12 Thread.sleep(1000); 13 } 14 }
11 thenRunning...
5)thenAccept: thenAccept(Consumer c)里面傳的是consumer,對執行結果進行消費,不會對執行結果進行任何操作。
1 package com.cy.java8; 2 3 import java.util.concurrent.CompletableFuture; 4 5 public class CompletableFutureInAction4 { 6 public static void main(String[] args) throws InterruptedException { 7 CompletableFuture.supplyAsync(() -> 1) 8 .thenApply(v -> Integer.sum(v, 10)) 9 .thenAccept(System.out::println); 10 11 Thread.sleep(1000); 12 } 13 }
11
6)thenCompose: 對執行結果再交給另外一個CompletableFuture,它再去對這個執行結果進行另外的計算。compose:組合,組合設計模式。
1 package com.cy.java8; 2 3 import java.util.concurrent.CompletableFuture; 4 5 public class CompletableFutureInAction4 { 6 public static void main(String[] args) throws InterruptedException { 7 CompletableFuture.supplyAsync(() -> 1) 8 .thenCompose(v -> CompletableFuture.supplyAsync(() -> v * 10)) 9 .thenAccept(System.out::println); 10 11 Thread.sleep(1000); 12 } 13 }
10
7)thenCombine: thenCombine(CompletableFuture extends CompletionStage, BiFuntion)
CompletableFuture的計算結果v1作為BiFunction的第1個入參,thenCombine中的第一個參數CompletableFuture的計算結果v2作為BiFunction的第2個入參,biFunction進行操作然后返回結果。
1 package com.cy.java8; 2 3 import java.util.concurrent.CompletableFuture; 4 5 public class CompletableFutureInAction4 { 6 public static void main(String[] args) throws InterruptedException { 7 CompletableFuture.supplyAsync(() -> 1) 8 .thenCombine(CompletableFuture.supplyAsync(() -> 2.0), (v1, v2) -> v1 + v2) 9 .thenAccept(System.out::println); 10 11 Thread.sleep(1000); 12 } 13 }
3.0
8)thenAcceptBoth: 和thenCombine差不多,只不過它的第二個參數是BiConsumer
1 package com.cy.java8; 2 3 import java.util.concurrent.CompletableFuture; 4 5 public class CompletableFutureInAction4 { 6 public static void main(String[] args) throws InterruptedException { 7 CompletableFuture.supplyAsync(() -> 1) 8 .thenAcceptBoth(CompletableFuture.supplyAsync(() -> 2.0), (v1, v2) -> { 9 System.out.println("value=" + (v1 + v2)); 10 }); 11 12 Thread.sleep(1000); 13 } 14 }
value=3.0
9)runAfterBoth:兩個CompletableFuture都執行結束之后,run
1 package com.cy.java8; 2 3 import java.util.concurrent.CompletableFuture; 4 5 public class CompletableFutureInAction5 { 6 public static void main(String[] args) throws InterruptedException { 7 8 CompletableFuture.supplyAsync(() -> { 9 System.out.println(Thread.currentThread().getName() + " is running"); 10 return 1; 11 }).runAfterBoth(CompletableFuture.supplyAsync(() -> { 12 System.out.println(Thread.currentThread().getName() + " is running too"); 13 return 2; 14 }), () -> System.out.println("both done")); 15 16 Thread.sleep(1000); 17 } 18 }
ForkJoinPool.commonPool-worker-9 is running ForkJoinPool.commonPool-worker-9 is running too both done
10)applyToEither
applyToEither:兩個CompletableFuture只要有1個執行完了,就將這個CompletableFuture交給Function。誰先執行完就將誰交給Function去執行
1 package com.cy.java8; 2 3 import java.util.concurrent.CompletableFuture; 4 5 public class CompletableFutureInAction5 { 6 public static void main(String[] args) throws InterruptedException { 7 8 CompletableFuture.supplyAsync(() -> { 9 try { 10 Thread.sleep(900); 11 } catch (InterruptedException e) { 12 e.printStackTrace(); 13 } 14 System.out.println("I am future 1"); 15 return 1; 16 }).applyToEither(CompletableFuture.supplyAsync(() -> { 17 try { 18 Thread.sleep(50); 19 } catch (InterruptedException e) { 20 e.printStackTrace(); 21 } 22 System.out.println("I am future 2"); 23 return 2; 24 }), v -> { 25 System.out.println("value = " + v); 26 return v * 10; 27 }).thenAccept(System.out::println); 28 29 30 Thread.currentThread().join(); 31 } 32 }
I am future 2 value = 2 20 I am future 1
11)acceptEither
acceptEither:acceptEither(CompletableFuture extends CompletionStage, Consumer), 兩個CompletableFuture誰先執行完成,就將誰的結果交給consumer執行。
1 package com.cy.java8; 2 3 import java.util.concurrent.CompletableFuture; 4 5 public class CompletableFutureInAction5 { 6 public static void main(String[] args) throws InterruptedException { 7 8 CompletableFuture.supplyAsync(() -> { 9 try { 10 Thread.sleep(900); 11 } catch (InterruptedException e) { 12 e.printStackTrace(); 13 } 14 System.out.println("I am future 1"); 15 return 1; 16 }).acceptEither(CompletableFuture.supplyAsync(() -> { 17 try { 18 Thread.sleep(50); 19 } catch (InterruptedException e) { 20 e.printStackTrace(); 21 } 22 System.out.println("I am future 2"); 23 return 2; 24 }), v -> System.out.println("value = " + v)); 25 26 Thread.currentThread().join(); 27 } 28 }
I am future 2 value = 2 I am future 1
12)runAfterEither
runAfterEither: runAfterEither(CompletionStage, Runnable),只要有一個CompletableFuture執行完了,就執行run
1 package com.cy.java8; 2 3 import java.util.concurrent.CompletableFuture; 4 5 public class CompletableFutureInAction5 { 6 public static void main(String[] args) throws InterruptedException { 7 8 CompletableFuture.supplyAsync(() -> { 9 try { 10 Thread.sleep(900); 11 } catch (InterruptedException e) { 12 e.printStackTrace(); 13 } 14 System.out.println("I am future 1"); 15 return 1; 16 }).runAfterEither(CompletableFuture.supplyAsync(() -> { 17 try { 18 Thread.sleep(50); 19 } catch (InterruptedException e) { 20 e.printStackTrace(); 21 } 22 System.out.println("I am future 2"); 23 return 2; 24 }), () -> System.out.println("done.")); 25 26 Thread.currentThread().join(); 27 } 28 }
I am future 2 done. I am future 1
13)allOf
allOf(CompletableFuture<?>... cfs),返回值是CompletableFuture<Void>。要等所有的CompletableFuture都執行完成,才能執行下一步動作。
1 package com.cy.java8; 2 3 import java.util.Arrays; 4 import java.util.List; 5 import java.util.Random; 6 import java.util.concurrent.CompletableFuture; 7 import java.util.stream.Collectors; 8 9 public class CompletableFutureInAction5 { 10 private final static Random RANDOM = new Random(System.currentTimeMillis()); 11 12 public static void main(String[] args) throws InterruptedException { 13 List<CompletableFuture<Double>> list = Arrays.asList(1, 2, 3, 4).stream() 14 .map(i -> CompletableFuture.supplyAsync(CompletableFutureInAction5::get)) 15 .collect(Collectors.toList()); 16 17 //要等所有的CompletableFuture這些task執行完了,才會打印done. 18 CompletableFuture.allOf(list.toArray(new CompletableFuture[list.size()])) 19 .thenRun(() -> System.out.println("done.")); 20 21 Thread.currentThread().join(); 22 } 23 24 static double get() { 25 try { 26 Thread.sleep(RANDOM.nextInt(3000)); 27 } catch (InterruptedException e) { 28 e.printStackTrace(); 29 } 30 double result = RANDOM.nextDouble(); 31 System.out.println(result); 32 return result; 33 } 34 }
0.6446554001163166 0.24435437709196395 0.18251850071600362 0.5261702037394511 done.
14)anyOf
和allOf相反,只要有一個CompletableFuture執行完成,就會執行下一步動作
1 package com.cy.java8; 2 3 import java.util.Arrays; 4 import java.util.List; 5 import java.util.Random; 6 import java.util.concurrent.CompletableFuture; 7 import java.util.stream.Collectors; 8 9 public class CompletableFutureInAction5 { 10 private final static Random RANDOM = new Random(System.currentTimeMillis()); 11 12 public static void main(String[] args) throws InterruptedException { 13 List<CompletableFuture<Double>> list = Arrays.asList(1, 2, 3, 4).stream() 14 .map(i -> CompletableFuture.supplyAsync(CompletableFutureInAction5::get)) 15 .collect(Collectors.toList()); 16 17 //只要有一個CompletableFuture執行完了,就會打印done. 18 CompletableFuture.anyOf(list.toArray(new CompletableFuture[list.size()])) 19 .thenRun(() -> System.out.println("done.")); 20 21 Thread.currentThread().join(); 22 } 23 24 static double get() { 25 try { 26 Thread.sleep(RANDOM.nextInt(3000)); 27 } catch (InterruptedException e) { 28 e.printStackTrace(); 29 } 30 double result = RANDOM.nextDouble(); 31 System.out.println(result); 32 return result; 33 } 34 }
0.1334361442807943 done. 0.6715112881360222 0.12945359790698785 0.1307762755130788
----