引子
為了讓程序更加高效,讓CPU最大效率的工作,我們會采用異步編程。首先想到的是開啟一個新的線程去做某項工作。再進一步,為了讓新線程可以返回一個值,告訴主線程事情做完了,於是乎Future粉墨登場。然而Future提供的方式是主線程主動問詢新線程,要是有個回調函數就爽了。所以,為了滿足Future的某些遺憾,強大的CompletableFuture隨着Java8一起來了。
Future
傳統多線程的卻讓程序更加高效,畢竟是異步,可以讓CPU充分工作,但這僅限於新開的線程無需你的主線程再費心了。比如你開啟的新線程僅僅是為了計算1+...+n再打印結果。有時候你需要子線程返回計算結果,在主線程中進行進一步計算,就需要Future了。
看下面這個例子,主線程計算2+4+6+8+10;子線程計算1+3+5+7+9;最后需要在主線程中將兩部分結果再相加。
public class OddNumber implements Callable<Integer> {
@Override
public Integer call() throws Exception {
Thread.sleep(3000);
int result = 1 + 3 + 5 + 7 + 9;
return result;
}
}
public class FutureTest {
public static void main(String[] args) {
ExecutorService executor = Executors.newCachedThreadPool();
OddNumber oddNumber = new OddNumber();
Future<Integer> future = executor.submit(oddNumber);
long startTime = System.currentTimeMillis();
int evenNumber = 2 + 4 + 6 + 8 + 10;
try {
Thread.sleep(1000);
System.out.println("0.開始了:"+ (System.currentTimeMillis()-startTime) +"秒");
int oddNumberResult = future.get();//這時間會被阻塞
System.out.println("1+2+...+9+10="+(evenNumber+oddNumberResult));
System.out.println("1.開始了:"+ (System.currentTimeMillis()-startTime) +"秒");
} catch (Exception e) {
System.out.println(e);
}
}
}
輸出結果:
0.開始了:1001秒
1+2+...+9+10=55
1.開始了:3002秒
看一下Future接口,只有五個方法比較簡單
//取消任務,如果已經完成或者已經取消,就返回失敗
boolean cancel(boolean mayInterruptIfRunning);
//查看任務是否取消
boolean isCancelled();
//查看任務是否完成
boolean isDone();
//剛才用到了,查看結果,任務未完成就一直阻塞
V get() throws InterruptedException, ExecutionException;
//同上,但是加了一個過期時間,防止長時間阻塞,主線程也做不了事情
V get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException;
CompletableFuture
上面的看到Future的五個方法,不是很豐富,既然我們的主線程叫做main,就應該以我為主,我更希望子線程做完了事情主動通知我。為此,Java8帶來了CompletableFuture,一個Future的實現類。其實CompletableFuture最迷人的地方並不是極大豐富了Future的功能,而是完美結合了Java8流的新特性。
實現回調,自動后續操作
提前說一下CompletableFuture實現回調的方法(之一):thenAccept()
public CompletableFuture<Void> thenAccept(Consumer<? super T> action) {
return uniAcceptStage(null, action);
}
參數有個Consumer,用到了Java8新特性,行為參數化,就是參數不一定是基本類型或者類,也可以是函數(行為),或者說一個方法(接口)。
public class OddNumberPlus implements Supplier<Integer> {
@Override
public Integer get() {
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return 1+3+5+7+9;
}
}
public class CompletableFutureTest {
public static void main(String[] args) {
long startTime = System.currentTimeMillis();
final int evenNumber = 2 + 4 + 6 + 8 + 10;
CompletableFuture<Integer> oddNumber = CompletableFuture.supplyAsync(new OddNumberPlus());
try {
Thread.sleep(1000);
System.out.println("0.開始了:"+ (System.currentTimeMillis()-startTime) +"秒");
//看這里,實現回調
oddNumber.thenAccept(oddNumberResult->
{
System.out.println("1.開始了:"+ (System.currentTimeMillis()-startTime) +"秒");
System.out.println("此時計算結果為:"+(evenNumber+oddNumberResult));
});
oddNumber.get();
} catch (Exception e) {
System.out.println(e);
}
}
}
輸出結果:
0.開始了:1006秒
1.開始了:3006秒
此時計算結果為:55
值得一提的是,本例中並沒有顯示的創建任務連接池,程序會默認選擇一個任務連接池ForkJoinPool.commonPool()
private static final Executor asyncPool = useCommonPool ?
ForkJoinPool.commonPool() : new ThreadPerTaskExecutor();
ForkJoinPool始自JDK7,叫做分支/合並框架。可以通過將一個任務遞歸分成很多分子任務,形成不同的流,進行並行執行,同時還伴隨着強大的工作竊取算法。極大的提高效率。當然,你也可以自己指定連接池。
CompletableFuture合並
Java8的確豐富了Future實現,CompletableFuture有很多方法可供大家使用,但是但從上面的例子來看,其實CompletableFuture能做的功能,貌似Future。畢竟你CompletableFuture用get()這個方法的時候還不是阻塞了,我Future蠻可以自己拿到返回值,再手動執行一些操作嘛(雖說這樣main方法一定很不爽)。那么接下來的事情,Future做起來就十分麻煩了。假設我們main方法只做奇數合集加上偶數合集這一個操作,提前算這兩個合集的操作異步交給兩個子線程,我們需要怎么做呢?沒錯,開啟兩個線程,等到兩個線程都計算結束的時候,我們進行最后的相加,問題在於,你怎么知道那個子線程最后結束的呢?(貌似可以做個輪詢,不定的調用isDone()這個方法...)豐富的CompletableFuture功能為我們提供了一個方法,用於等待兩個子線程都結束了,再進行相加操作:
//asyncPool就是上面提到的默認線程池ForkJoinPool
public <U,V> CompletableFuture<V> thenCombineAsync(
CompletionStage<? extends U> other,
BiFunction<? super T,? super U,? extends V> fn) {
return biApplyStage(asyncPool, other, fn);
}
看個例子:
public class OddCombine implements Supplier<Integer> {
@Override
public Integer get() {
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return 1+3+5+7+9;
}
}
public class EvenCombine implements Supplier<Integer> {
@Override
public Integer get() {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return 2+4+6+8+10;
}
}
public class CompletableCombineTest {
public static void main(String[] args) throws Exception{
CompletableFuture<Integer> oddNumber = CompletableFuture.supplyAsync(new OddCombine());
CompletableFuture<Integer> evenNumber = CompletableFuture.supplyAsync(new EvenCombine());
long startTime = System.currentTimeMillis();
CompletableFuture<Integer> resultFuturn = oddNumber.thenCombine(evenNumber,(odd,even)->{
return odd + even;
});
System.out.println(resultFuturn.get());
System.out.println("0.開始了:"+ (System.currentTimeMillis()-startTime) +"秒");
}
}
輸出結果:
55
0.開始了:3000秒
這邊模擬一個睡1秒,一個睡3秒,但是真正的網絡請求時間是不定的。是不是很爽,最爽的還不是現象,而是以上操作已經利用了Java8流的概念。
兩個子線程還不夠,那么還有**anyOff()**函數,可以承受多個CompletableFuture,會等待所有任務都完成。
public static CompletableFuture<Void> allOf(CompletableFuture<?>... cfs) {
return andTree(cfs, 0, cfs.length - 1);
}
與它長的很像的,有個方法,是當第一個執行結束的時候,就結束,后面任務不再等了,可以看作充分條件。
public static CompletableFuture<Object> anyOf(CompletableFuture<?>... cfs) {
return orTree(cfs, 0, cfs.length - 1);
}
在上面那個例子的基礎上,把OddNumberPlus類時間調長一點:
public class OddNumberPlus implements Supplier<Integer> {
@Override
public Integer get() {
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return 1+3+5+7+9;
}
}
public class CompletableCombineTest {
public static void main(String[] args) throws Exception{
CompletableFuture<Integer> oddNumber = CompletableFuture.supplyAsync(new OddCombine());
CompletableFuture<Integer> evenNumber = CompletableFuture.supplyAsync(new EvenCombine());
CompletableFuture<Integer> testNumber = CompletableFuture.supplyAsync(new OddNumberPlus());
long startTime = System.currentTimeMillis();
CompletableFuture<Object> resultFuturn = CompletableFuture.anyOf(oddNumber,evenNumber,testNumber);
System.out.println(resultFuturn.get());
System.out.println("0.開始了:"+ (System.currentTimeMillis()-startTime) +"秒");
}
}
輸出結果:
30
0.開始了:1000秒
小結
CompletableFuture的方法其實還有很多,常用的比如說runAsync(),類似於supplyAsync(),只是沒有返回值;除了thenApply()可以加回調函數以外,還有thenApply();還有注入runAfterBoth()、runAfterEither(),這些見名知意。還有很多,可以點開CompletableFuture這個類的源碼仔細看一看。見微知著,透過CompletableFuture,更加感覺到Java8的強大,強大的流概念、行為參數化、高效的並行理念等等,不僅讓Java寫起來更爽,還不斷豐富Java整個生態。Java一直在進步,所以沒有被時代淘汰,我們Javaer也可以繼續職業生涯,感謝Java,一起進步。
BLOG地址:www.liangsonghua.me
關注微信公眾號:松花皮蛋的黑板報,獲取更多精彩!
公眾號介紹:分享在京東工作的技術感悟,還有JAVA技術和業內最佳實踐,大部分都是務實的、能看懂的、可復現的