前言
最近在忙生活的第一個OKR,這個等等后面具體聊聊,今天開始恢復每周一篇原創,感謝小伙伴的不離不棄。這篇文章也是最近在Code Review的時候,看到的大家代碼,想整體推下大家異步編程的思想,由此而寫的。
為什么使用CompletableFuture
一些業務場景我們需要使用多線程異步執行任務,加快任務執行速度。 JDK5新增了Future接口,用於描述一個異步計算的結果。雖然 Future 以及相關使用方法提供了異步執行任務的能力,但是對於結果的獲取卻是很不方便,我們必須使用Future.get的方式阻塞調用線程,或者使用輪詢方式判斷 Future.isDone 任務是否結束,再獲取結果。這兩種處理方式都不是很優雅,相關代碼如下:
@Test
public void testFuture() {
ExecutorService executorService = Executors.newFixedThreadPool(5);
Future<String> future = executorService.submit(() -> {
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "hello";
});
try {
System.out.println(future.get());
System.out.println("end");
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
}
此外就是無法解決多個異步任務需要相互依賴的場景,簡單點說就是,主線程需要等待子線程任務執行完畢之后在進行執行,這個時候你可能想到了CountDownLatch,沒錯確實可以解決,代碼如下,但是Java8以后我不在認為這是一種優雅的解決方式,接下來我們來了解下CompletableFuture的使用。
@Test
public void testCountDownLatch() {
ExecutorService executorService = Executors.newFixedThreadPool(5);
CountDownLatch downLatch = new CountDownLatch(2);
Future<String> orderFuture = executorService.submit(() -> {
OrderService orderService = new OrderServiceImpl();
String result = orderService.queryOrderInfo();
downLatch.countDown();
return result;
});
Future<String> trailFuture = executorService.submit(() -> {
TrailService trailService = new TrailServiceImpl();
String result = trailService.queryTrail();
downLatch.countDown();
return result;
});
try {
downLatch.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
try {
System.out.println(orderFuture.get() + trailFuture.get());
System.out.println("end");
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
}
使用介紹
創建實例

關於CompletableFuture的創建提供了5種方式,第一個就是創建一個具有默認結果的 CompletableFuture,不經常使用,我們常用就是runAsync和supplyAsync,重點關注下這兩個靜態方法。runAsync是不具有返回值,supplyAsync具有返回值,關於這兩個方法都提供一個兩種創建形式,一種是默認的使用公共的 ForkJoinPool 線程池執行,這個線程池默認線程數是 CPU 的核數。

另外一種使用就是提供一個主動創建線程池,這樣做相比於ForkJoinPool的好處是,可以通過自定義不同場景的線程池,來進行業務划分方便發現問題,還有一個好處就是對於ForkJoinPool這種共享線程來說,一旦阻塞,會照成其他線程無法獲取執行機會。
CompletionStage

關於CompletableFuture核心能力就是通過繼承Future和CompletionStage來實現的,關於Future就是提供一些異步的能力,如果單存就這樣CompletableFuture也就不會那么強大,所以我們核心就是介紹CompletionStage內部常用一些方法。

關於CompletionStage的方法,可以分為兩種類型的接口,其中核心方法都提供異步的方式,這里關於異步的方法不進行介紹,基本上原理類似,都是新提供一個線程池去實現任務。
異步回調
關於異步回調可以分為兩類,一種是有參數的返回,另外一種是無參數的返回,有參數返回的包括thenApply和thenCompose,無參數返回的包括thenRun和thenAccept。
有參數返回
thenApply 和 thenCompose表示某個任務執行完成后執行的動作,即回調方法,會將該任務的執行結果即方法返回值作為入參傳遞到回調方法中,也可以理解為串行化的,唯一不同的是thenCompose需要返回一個新的 CompletionStage,整體的使用如下:
@Test
public void testCompletableFuture() {
long start = System.currentTimeMillis();
CompletableFuture<String> first = CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "aaaaaaaaaaaaaaaaaaaaa";
}).thenApply(x -> {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return x + "bbbbbbbbbbbbbbbbbbbbbbbb";
}).thenCompose(x -> CompletableFuture.supplyAsync(x::toUpperCase));
System.out.println(first.join());
long end = System.currentTimeMillis();
System.out.println("耗時" + (end - start) / 1000 + "");
}
無參數返回
thenAccep也是消費上一個任務的動作,將該任務的執行結果即方法返回值作為入參傳遞到回調方法中,只是無返回值,thenAccep與thenRun方法不同就是沒有入參也沒有返回值。
@Test
public void testCompletableFuture() {
long start = System.currentTimeMillis();
CompletableFuture<Void> first = CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "a";
}).thenAccept(x -> {
System.out.println(x);
});
first.join();
long end = System.currentTimeMillis();
System.out.println("耗時" + (end - start) / 1000 + "秒");
}
異常
CompletableFuture方法執行過程若產生異常,只有get或者join方法的才能獲取到異常,針對這種情況CompletableFuture提供三種處理異常的方式。
exceptionally
exceptionally的使用方式類似於 try catch中的catch代碼塊中異常處理。exceptionally當某個任務執行異常時執行的回調方法,將拋出的異常作為參數傳遞到回調方法中,如果該任務正常執行,exceptionally方法返回的CompletionStage的result就是該任務正常執行的結果。
@Test
public void testCompletableFuture() {
long start = System.currentTimeMillis();
CompletableFuture<String> first = CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
throw new RuntimeException("test");
});
CompletableFuture<String> two = first.exceptionally((x) -> {
x.printStackTrace();
return "123";
});
two.join();
long end = System.currentTimeMillis();
System.out.println("耗時" + (end - start) / 1000 + "秒");
}
whenComplete
whenComplete的使用類似於 try..catch..finanlly 中 finally 代碼塊,無論是否發生異常,都將會執行的。whenComplete當某個任務執行完成后執行的回調方法,會將執行結果或者執行期間拋出的異常傳遞給回調方法,如果是正常執行則異常為null,回調方法對應的CompletableFuture的result和該任務一致,如果該任務正常執行,則get方法返回執行結果,如果是執行異常,則get方法拋出異常。
@Test
public void testCompletableFuture() {
long start = System.currentTimeMillis();
CompletableFuture<String> first = CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "aa";
}).whenComplete((x, throwable) -> {
// 如果異常存在,打印異常,並且返回默認值
if (throwable != null) {
throwable.printStackTrace();
System.out.println("失敗");
} else {
System.out.println("成功");
}
});
System.out.println(first.join());
long end = System.currentTimeMillis();
System.out.println("耗時" + (end - start) / 1000 + "秒");
}
handle
跟whenComplete基本一致,區別在於handle的回調方法有返回值,且handle方法返回的CompletableFuture的result是回調方法的執行結果或者回調方法執行期間拋出的異常,與原始CompletableFuture的result無關。
@Test
public void testCompletableFuture() {
long start = System.currentTimeMillis();
CompletableFuture<String> first = CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
throw new RuntimeException("test");
}).handle((x, throwable) -> {
// 如果異常存在,打印異常,並且返回默認值
if (throwable != null) {
throwable.printStackTrace();
return "異常";
} else {
return x + "aa";
}
});
System.out.println(first.join());
long end = System.currentTimeMillis();
System.out.println("耗時" + (end - start) / 1000 + "秒");
}
組合關系
組合關系關系分為兩種,一種是和的關系,一種是或的關系,和關系就是當所有的任務執行完成以后再繼續執行,類似於CountDownLatch,或關系就是只要有一個任務執行完成以后就可以向下執行。
和關系
thenCombine /thenAcceptBoth / runAfterBoth/allOf
這四個方法可以將多個CompletableFuture組合起來,將多個CompletableFuture都執行完成以后,才能執行后面的操作,區別在於,thenCombine會將任務的執行結果作為方法入參傳遞到指定方法中,且該方法有返回值;thenAcceptBoth同樣將任務的執行結果作為方法入參,但是無返回值;runAfterBoth沒有入參,也沒有返回值。注意多個任務中只要有一個執行異常,則將該異常信息作為指定任務的執行結果。allOf是多個任務都執行完成后才會執行,只要有一個任務執行異常,則返回的CompletableFuture執行get方法時會拋出異常,如果都是正常執行,則get返回null。
@Test
public void testCompletableFuture() {
CompletableFuture<String> order = CompletableFuture.supplyAsync(() -> {
OrderService orderService = new OrderServiceImpl();
return orderService.queryOrderInfo();
});
CompletableFuture<String> trail = CompletableFuture.supplyAsync(() -> {
TrailService trailService = new TrailServiceImpl();
return trailService.queryTrail();
});
CompletableFuture<String> future = order.thenCombine(trail, (a, b) -> a + b);
CompletableFuture<Void> afterBoth = future.runAfterBoth(trail, () -> {
System.out.println(future.join());
});
CompletableFuture<Void> result = CompletableFuture.allOf(afterBoth);
try {
System.out.println(result.get());
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
}
applyToEither / acceptEither / runAfterEither/anyOf
這四個方法可以將多個CompletableFuture組合起來,只需要其中一個CompletableFuture執行完成以后,就能執行后面的操作,applyToEither會將已經執行完成的任務的執行結果作為方法入參,並有返回值;acceptEither同樣將已經執行完成的任務的執行結果作為方法入參,但是沒有返回值;runAfterEither沒有方法入參,也沒有返回值,注意多個任務中只要有一個執行異常,則將該異常信息作為指定任務的執行結果。anyOf多個任務只要有一個任務執行完成,后續任務就可執行。
@Test
public void testCompletableFuture() {
CompletableFuture<String> order = CompletableFuture.supplyAsync(() -> {
OrderService orderService = new OrderServiceImpl();
return orderService.queryOrderInfo();
});
CompletableFuture<String> trail = CompletableFuture.supplyAsync(() -> {
TrailService trailService = new TrailServiceImpl();
return trailService.queryTrail();
});
CompletableFuture<String> future = order.applyToEither(trail, (result) -> result);
CompletableFuture<Void> afterBoth = future.runAfterEither(trail, () -> {
System.out.println(future.join());
});
CompletableFuture<Object> result = CompletableFuture.anyOf(afterBoth,order);
try {
System.out.println(result.get());
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
}
項目中如何使用
CompletableFuture在自定義線程時,默認使用的線程池是 ForkJoinPool.commonPool(),對於我們用java常做的IO密集型任務,默認線程池是遠遠不夠使用的;在雙核及以下機器上,默認線程池又會退化為為每個任務創建一個線程,相當於沒有線程池。因此對於CompletableFuture在項目中的使用一定要自定義線程池,同時又要注意自定義線程池,線程池有個容量滿了的拒絕策略,如果采用丟棄策略的拒絕策略,並且allOf方法和get方法如果沒有設置超時則會無限期的等待下去,接下來我們通過自定義線程使用CompletableFuture。
-
自定義線程池,此處通過繼承ThreadPoolExecutor,重寫了shutdown() 、shutdownNow() 、beforeExecute() 和 afterExecute()方法來統計線程池的執行情況,此處還可以結合Spring和appllo實現自定義擴展線程池,下一篇可以聊聊擴展思路以及實現方案,不同對的業務場景使用的不同的線程池,一是方便出現問題的排查,另外就是類似於Hystrix隔離的方案;
package com.zto.lbd;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Date;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
/**
* 線程池監控類
*
* @author wangtongzhou 18635604249
* @since 2022-02-23 07:27
*/
public class ThreadPoolMonitor extends ThreadPoolExecutor {
private static final Logger LOGGER = LoggerFactory.getLogger(ThreadPoolMonitor.class);
/**
* 保存任務開始執行的時間,當任務結束時,用任務結束時間減去開始時間計算任務執行時間
*/
private ConcurrentHashMap<String, Date> startTimes;
/**
* 線程池名稱,一般以業務名稱命名,方便區分
*/
private String poolName;
/**
* 調用父類的構造方法,並初始化HashMap和線程池名稱
*
* @param corePoolSize 線程池核心線程數
* @param maximumPoolSize 線程池最大線程數
* @param keepAliveTime 線程的最大空閑時間
* @param unit 空閑時間的單位
* @param workQueue 保存被提交任務的隊列
* @param poolName 線程池名稱
*/
public ThreadPoolMonitor(int corePoolSize, int maximumPoolSize, long keepAliveTime,
TimeUnit unit, BlockingQueue<Runnable> workQueue, String poolName) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
Executors.defaultThreadFactory(), poolName);
}
/**
* 調用父類的構造方法,並初始化HashMap和線程池名稱
*
* @param corePoolSize 線程池核心線程數
* @param maximumPoolSize 線程池最大線程數
* @param keepAliveTime 線程的最大空閑時間
* @param unit 空閑時間的單位
* @param workQueue 保存被提交任務的隊列
* @param threadFactory 線程工廠
* @param poolName 線程池名稱
*/
public ThreadPoolMonitor(int corePoolSize, int maximumPoolSize, long keepAliveTime,
TimeUnit unit, BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory, String poolName) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory);
this.startTimes = new ConcurrentHashMap<>();
this.poolName = poolName;
}
/**
* 線程池延遲關閉時(等待線程池里的任務都執行完畢),統計線程池情況
*/
@Override
public void shutdown() {
// 統計已執行任務、正在執行任務、未執行任務數量
LOGGER.info("{} 關閉線程池, 已執行任務: {}, 正在執行任務: {}, 未執行任務數量: {}",
this.poolName, this.getCompletedTaskCount(), this.getActiveCount(), this.getQueue().size());
super.shutdown();
}
/**
* 線程池立即關閉時,統計線程池情況
*/
@Override
public List<Runnable> shutdownNow() {
// 統計已執行任務、正在執行任務、未執行任務數量
LOGGER.info("{} 立即關閉線程池,已執行任務: {}, 正在執行任務: {}, 未執行任務數量: {}",
this.poolName, this.getCompletedTaskCount(), this.getActiveCount(), this.getQueue().size());
return super.shutdownNow();
}
/**
* 任務執行之前,記錄任務開始時間
*/
@Override
protected void beforeExecute(Thread t, Runnable r) {
startTimes.put(String.valueOf(r.hashCode()), new Date());
}
/**
* 任務執行之后,計算任務結束時間
*/
@Override
protected void afterExecute(Runnable r, Throwable t) {
Date startDate = startTimes.remove(String.valueOf(r.hashCode()));
Date finishDate = new Date();
long diff = finishDate.getTime() - startDate.getTime();
// 統計任務耗時、初始線程數、核心線程數、正在執行的任務數量、
// 已完成任務數量、任務總數、隊列里緩存的任務數量、池中存在的最大線程數、
// 最大允許的線程數、線程空閑時間、線程池是否關閉、線程池是否終止
LOGGER.info("{}-pool-monitor: " +
"任務耗時: {} ms, 初始線程數: {}, 核心線程數: {}, 正在執行的任務數量: {}, " +
"已完成任務數量: {}, 任務總數: {}, 隊列里任務數量: {}, 池中存在的最大線程數: {}, " +
"最大線程數: {}, 線程空閑時間: {}, 線程池是否關閉: {}, 線程池是否終止: {}",
this.poolName,
diff, this.getPoolSize(), this.getCorePoolSize(), this.getActiveCount(),
this.getCompletedTaskCount(), this.getTaskCount(), this.getQueue().size(), this.getLargestPoolSize(),
this.getMaximumPoolSize(), this.getKeepAliveTime(TimeUnit.MILLISECONDS), this.isShutdown(), this.isTerminated());
}
/**
* 生成線程池所用的線程,改寫了線程池默認的線程工廠,傳入線程池名稱,便於問題追蹤
*/
static class MonitorThreadFactory implements ThreadFactory {
private static final AtomicInteger poolNumber = new AtomicInteger(1);
private final ThreadGroup group;
private final AtomicInteger threadNumber = new AtomicInteger(1);
private final String namePrefix;
/**
* 初始化線程工廠
*
* @param poolName 線程池名稱
*/
MonitorThreadFactory(String poolName) {
SecurityManager s = System.getSecurityManager();
group = Objects.nonNull(s) ? s.getThreadGroup() : Thread.currentThread().getThreadGroup();
namePrefix = poolName + "-pool-" + poolNumber.getAndIncrement() + "-thread-";
}
@Override
public Thread newThread(Runnable r) {
Thread t = new Thread(group, r, namePrefix + threadNumber.getAndIncrement(), 0);
if (t.isDaemon()) {
t.setDaemon(false);
}
if (t.getPriority() != Thread.NORM_PRIORITY) {
t.setPriority(Thread.NORM_PRIORITY);
}
return t;
}
}
}
-
使用自定義線程池的CompletableFuture;
private final static BlockingQueue<Runnable> workQueue = new ArrayBlockingQueue<>(100);
private final static ThreadPoolMonitor threadPoolMonitor = new ThreadPoolMonitor(5, 10, 100L,
TimeUnit.SECONDS, workQueue, "monitor");
@Test
public void testCompletableFuture() {
CompletableFuture<String> order = CompletableFuture.supplyAsync(() -> {
OrderService orderService = new OrderServiceImpl();
return orderService.queryOrderInfo();
},threadPoolMonitor);
String result=order.join();
assertTrue(Objects.nonNull(result));
}
結束
歡迎大家點點關注,點點贊!