Java CompletableFuture 詳解


 

 

參考文章:https://colobu.com/2016/02/29/Java-CompletableFuture/

https://www.jdon.com/50027

https://www.jianshu.com/p/f2735065a13a


public class TestMain {
/**
* @desc : Future的用法
* 雖然Future以及相關使用方法提供了異步執行任務的能力,但是對於結果的獲取卻是很不方便,
* 只能通過阻塞或者輪詢的方式得到任務的結果。阻塞的方式顯然和我們的異步編程的初衷相違背,
* 輪詢的方式又會耗費無謂的CPU資源,而且也不能及時地得到計算結果,
* 為什么不能用觀察者設計模式當計算結果完成及時通知監聽者呢
* @author : 毛會懂
* @create: 2021/11/7 13:09:00
**/
public static void main1(String[] args) throws InterruptedException, ExecutionException {
ExecutorService cachePool = Executors.newCachedThreadPool();
Future<String> future = cachePool.submit(() -> {
try {
Thread.sleep(6000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("任務完成了");
return "異步任務計算結果!";
});

System.out.println("做其他的事情");
Thread.sleep(100);
// 用法一:使用get堵塞主調用線程,知道計算完成返回結果
// String result = future.get();
// System.out.println("異步結果:"+ result);

// 用法二:使用isDone()方法檢查計算是否完成
// long start = System.currentTimeMillis();
// while (true){
// // 使用isDone()方法檢查計算是否完成
// if(future.isDone()){
// break;
// }
// }
// String result = future.get();
// System.out.println("輪訓時間:" + (System.currentTimeMillis() - start));
// System.out.println("異步結果:"+ result);

// 用法三:使用cancel方法停止任務的執行,參數false不會立即終止任務,true會打斷sleep,拋出異常
future.cancel(true);
System.out.println("停止任務執行了");
cachePool.shutdown();
}

/**
* @desc : CompletableFuture常用的用法(同步)
* 在Java 8中, 新增加了一個包含50個方法左右的類: CompletableFuture,
* 提供了非常強大的Future的擴展功能,可以幫助我們簡化異步編程的復雜性,
* 提供了函數式編程的能力,可以通過回調的方式處理計算結果,
* 並且提供了轉換和組合CompletableFuture的方法。
* @author : 毛會懂
* @create: 2021/11/7 13:19:00
**/
public static void main2(String[] args) throws ExecutionException, InterruptedException {
CompletableFuture<String> completableFutureOne = new CompletableFuture<>();
ExecutorService cachePool = Executors.newCachedThreadPool();
cachePool.execute(() ->{
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("one=" +Thread.currentThread().getName());
completableFutureOne.complete("異步執行結果");
});

// WhenComplete 方法返回的 CompletableFuture 仍然是原來的 CompletableFuture 計算結果(類型都是String).
CompletableFuture<String> completableFutureTwo = completableFutureOne.whenComplete((s, t) -> {
System.out.println("two=" +Thread.currentThread().getName());
System.out.println("異步執行完畢后,打印異步任務的結果:" + s);
});

// ThenApply 方法返回的是一個新的 completeFuture(類型不再一致)
CompletableFuture<Integer> completableFutureThree = completableFutureTwo.thenApply(s -> {
System.out.println("three=" + Thread.currentThread().getName());
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return s.length();
});
System.out.println("堵塞獲取結果:" + completableFutureThree.get());
cachePool.shutdown();
}

/**
* @desc : CompletableFuture常用的用法(異步)
* @author : 毛會懂
* @create: 2021/11/7 13:19:00
**/
public static void main3(String[] args) throws IOException, ExecutionException, InterruptedException {
List<Integer> list = new ArrayList<>();
list.add(1);
list.add(2);
list.add(3);
List<CompletableFuture<String>> newList = list.stream().map(id -> getNum(id)).collect(Collectors.toList());
System.out.println("以上3個completableFuture執行完畢");

// 使用allOf方法封裝所有的並行任務
CompletableFuture<Void> allFutures = CompletableFuture.allOf(newList.toArray(new CompletableFuture[newList.size()]));
//獲得所有子任務的處理結果
CompletableFuture<List<String>> listCompletableFuture = allFutures.thenApply(v -> newList.stream().map(future -> future.join()).collect(Collectors.toList()));

System.out.println("等待結果");
List<String> strings = listCompletableFuture.get();
System.out.println("結果" + strings);

// 讓主線程暫停
System.in.read();
}

private static CompletableFuture<String> getNum(Integer id){
return CompletableFuture.supplyAsync(() -> {
System.out.println("start" + id);
try {
Thread.sleep(3000 * id);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("over" + id);
return "num" + id;
});
}


/**
* @desc : get 和 join拋異常的不同
* @author : 毛會懂
* @create: 2021/11/7 13:54:00
**/
public static void main4(String[] args) throws ExecutionException, InterruptedException, IOException {
CompletableFuture<Integer> completableFuture = CompletableFuture.supplyAsync(() -> {
int i = 1/ 0;
return 100;
});
//completableFuture.get();
completableFuture.join();
System.in.read();
}

/**
* @desc : 拋異常和執行正常的結果
* CompletableFuture.complete()、CompletableFuture.completeExceptionally只能被調用一次。
* @author : 毛會懂
* @create: 2021/11/7 14:07:00
**/
public static void main5(String[] args) throws InterruptedException, IOException {
final CompletableFuture<Integer> future = new CompletableFuture<>();
class Client extends Thread{
CompletableFuture<Integer> future;
Client(String threadName,CompletableFuture<Integer> future){
super(threadName);
this.future = future;
}
@Override
public void run(){
try {
System.out.println(this.getName() + ":" + future.get());
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
}
}

new Client("client1",future).start();
new Client("client2",future).start();
System.out.println("等待");
Thread.sleep(3000);
future.complete(100);
// future.completeExceptionally(new Exception());
System.in.read();
}

/**
* @desc : 異步執行長時間的任務
* @author : 毛會懂
* @create: 2021/11/7 14:30:00
*
* @return*/
public static void main6(String[] args) throws InterruptedException, IOException {

try {
System.out.println("開始執行我們的業務");
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("結束執行我們的業務");
CompletableFuture<Integer> future = new CompletableFuture<>();
// 方法一: 無關業務異步執行。 supplyAsync方法以Supplier<U>函數式接口類型為參數,CompletableFuture的計算結果類型為U。
// future.supplyAsync( () ->{
// System.out.println("開始執行其他與本請求的無關的業務");
// try {
// Thread.sleep(5000);
// } catch (InterruptedException ee ){
// ee.printStackTrace();
// }
// System.out.println("結束執行其他與本請求的無關的業務");
// return null;
// });

// 方法二:主業務成功執行完,執行future.complete(),可觸發無關業務的異步執行。
// future.whenComplete((v,e) ->{
// System.out.println("開始執行其他與本請求的無關的業務");
// try {
// Thread.sleep(5000);
// } catch (InterruptedException ee ){
// ee.printStackTrace();
// }
// System.out.println("結束執行其他與本請求的無關的業務");
// });
// // 此行與方法二配套
// future.complete(1);

// 方法三: 無關業務異步執行 runAsync方法也好理解,它以Runnable函數式接口類型為參數,所以CompletableFuture的計算結果為空。
Map<String,Integer> map = new HashMap<>();
map.put("activityId",11);
String name = "毛會懂";
// CompletableFuture.runAsync(() ->{
// System.out.println("開始執行其他與本請求的無關的業務");
// try {
// System.out.println(map);
// Thread.sleep(5000);
// System.out.println("name=" + name);
// } catch (InterruptedException ee ){
// ee.printStackTrace();
// }
// System.out.println("結束執行其他與本請求的無關的業務");
// });

// 方法四:supplyAsync方法以Supplier<U>函數式接口類型為參數,CompletableFuture的計算結果類型為U。
CompletableFuture.supplyAsync(()->{
System.out.println("開始執行其他與本請求的無關的業務");
try {
System.out.println(map);
Thread.sleep(5000);
System.out.println("name=" + name);
} catch (InterruptedException ee ){
ee.printStackTrace();
}
System.out.println("結束執行其他與本請求的無關的業務");
return null;
});
System.out.println("ok");

System.in.read();
}


/**
* @desc : 當原先的CompletableFuture的值計算完成或者拋出異常的時候,
* 會觸發這個CompletableFuture對象的計算,結果由BiFunction參數計算而得。
* 因此handle 和 handleAsync方法兼有whenComplete和轉換的兩個功能
* @author : 毛會懂
* @create: 2021/11/7 15:43:00
**/
public static void main7(String[] args) throws IOException, ExecutionException, InterruptedException {
CompletableFuture future = new CompletableFuture();
CompletableFuture handler = future.handle((s, v) -> {
System.out.println("handler");
System.out.println(Thread.currentThread().getName());
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return s;
});

// CompletableFuture handler = future.handleAsync((s, v) -> {
// System.out.println("handler");
// System.out.println(Thread.currentThread().getName());
// try {
// Thread.sleep(5000);
// } catch (InterruptedException e) {
// e.printStackTrace();
// }
// return s;
// });
System.out.println(Thread.currentThread().getName());
future.complete("ok");
System.out.println(handler.get());
System.in.read();
}

/**
* @desc : thenApply 和thenApplyAsync:原來的CompletableFuture計算完后,
* 將結果傳遞給函數fn,將fn的結果作為新的CompletableFuture計算結果。
* 因此它的功能相當於將CompletableFuture<T>轉換成CompletableFuture<U>
* 它們與handle方法的區別在於handle方法會處理正常計算值和異常,
* 因此它可以屏蔽異常,避免異常繼續拋出。而thenApply方法只是用來處理正常值,因此一旦有異常就會拋出。
* @author : 毛會懂
* @create: 2021/11/7 15:46:00
**/
public static void main8(String[] args) throws ExecutionException, InterruptedException {
// future初始化
CompletableFuture<Integer> future = CompletableFuture.supplyAsync(()->{
return 100;
});
CompletableFuture<String> completableFuture = future.thenApplyAsync(i -> i * 10).thenApply(i -> i.toString() + "ok");
System.out.println(completableFuture.get());
}

/**
* @desc : 沒有返回值的用法
* @author : 毛會懂
* @create: 2021/11/7 15:52:00
**/
public static void main9(String[] args) throws ExecutionException, InterruptedException {
CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
return 100;
});

// thenAccept 和 thenAcceptAsync沒有返回值
CompletableFuture<Void> future1 = future.thenAccept(System.out::println);
// 沒有返回值,打印出來為 null
System.out.println(future1.get());

//thenAcceptBoth以及相關方法提供了類似的功能,當兩個CompletionStage都正常完成計算的時候,就會執行提供的action,它用來組合另外一個異步的結果。
CompletableFuture<Void> future2 = future.thenAcceptBoth(CompletableFuture.supplyAsync(() -> {
return 10;
}), (x, y) -> System.out.println(x * y));
System.out.println(future2.get());
//runAfterBoth是當兩個CompletionStage都正常完成計算的時候,執行一個Runnable,這個Runnable並不使用計算的結果
CompletableFuture<Void> future3 = future.runAfterBoth(CompletableFuture.supplyAsync(() -> {
return 10;
}), () -> System.out.println("ok"));
System.out.println(future3.get());

// Runnable並不使用CompletableFuture計算的結果。
CompletableFuture<Void> future4 = future.thenRun(() -> System.out.println("ok"));
System.out.println(future4.get());
}

/**
* @desc : 組合
* @author : 毛會懂
* @create: 2021/11/7 16:10:00
**/
public static void main10(String[] args) throws ExecutionException, InterruptedException, IOException {
// CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
// return 100;
// });
// CompletableFuture<String> f = future.thenCompose( i -> {
// return CompletableFuture.supplyAsync(() -> {
// return (i * 10) + "";
// });
// });
// System.out.println(f.get()); //1000

CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
return 100;
});
CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "abc";
});
CompletableFuture<String> f = future.thenCombine(future2, (x,y) -> {String str = (y + "-" + x); return str;});
System.out.println(f.get()); //abc-100

System.in.read();
}

/**
* @desc : Java Future轉CompletableFuture
* @author : 毛會懂
* @create: 2021/11/7 16:29:00
**/
public static void main(String[] args) throws ExecutionException, InterruptedException {
ExecutorService executorService = Executors.newFixedThreadPool(5);
Future<String> future = executorService.submit(() -> {
return "ok";
});
CompletableFuture<String> completableFuture = toCompletable(future, executorService);
String s = completableFuture.get();
System.out.println(s);
}

public static <T> CompletableFuture<T> toCompletable(Future<T> future, Executor executor) {
return CompletableFuture.supplyAsync(() -> {
try {
return future.get();
} catch (InterruptedException | ExecutionException e) {
throw new RuntimeException(e);
}
}, executor);
}


public static <T> CompletableFuture<List<T>> sequence(List<CompletableFuture<T>> futures) {
CompletableFuture<Void> allDoneFuture = CompletableFuture.allOf(futures.toArray(new CompletableFuture[futures.size()]));
return allDoneFuture.thenApply(v -> futures.stream().map(CompletableFuture::join).collect(Collectors.<T>toList()));
}
public static <T> CompletableFuture<List<T>> sequence(Stream<CompletableFuture<T>> futures) {
List<CompletableFuture<T>> futureList = futures.filter(f -> f != null).collect(Collectors.toList());
return sequence(futureList);
}
}


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM