前言
隨着多核處理器的出現,如何輕松高效的進行異步編程變得愈發重要,我們看看在java8之前,使用java語言完成異步編程有哪些方案。
JAVA8之前的異步編程
- 繼承Thead類,重寫run方法
- 實現runable接口,實現run方法
- 匿名內部類編寫thread或者實現runable的類,當然在java8中可以用lambda表達式簡化
- 使用futureTask進行附帶返回值的異步編程
- 使用線程池和Future來實現異步編程
- spring框架下的
@async
獲得異步編程支持
使用線程池與future來實現異步編程
實現方式可謂是多種多樣,這里我們使用線程池和future來實現異步編程,借着這個例子來講述java8的組合式異步編程有着怎樣的優勢
//構造線程池
ExecutorService pool = Executors.newCachedThreadPool();
try {
//構造future結果,doSomethingA十分耗時,因此采用異步
Future<Integer> future = pool.submit(() -> doSomethingA());
//做一些別的事情
doSomethingB();
//從future中獲得結果,設置超時時間,超過了就拋異常
Integer result = future.get(10, TimeUnit.SECONDS);
//打印結果
System.out.printf("the async result is : %d", result);
//異常捕獲
} catch (InterruptedException e) {
System.out.println("任務計算拋出了一個異常!");
} catch (ExecutionException e) {
System.out.println("線程在等待的過程中被中斷了!");
} catch (TimeoutException e) {
System.out.println("future對象等待時間超時了!");
}
}
然而這樣的異步編程方式僅僅能滿足基本的需要,稍微復雜的一些異步處理Future接口似乎就有點束手無策了,例如
- 將兩個異步計算合並為一個——這兩個異步計算之間相互獨立,同時第二個又依賴於第
一個的結果。 - 等待 Future 集合中的所有任務都完成。
- 僅等待 Future 集合中最快結束的任務完成(有可能因為它們試圖通過不同的方式計算同
一個值) ,並返回它的結果。 - 通過編程方式完成一個 Future 任務的執行(即以手工設定異步操作結果的方式) 。
- 應對 Future 的完成事件(即當 Future 的完成事件發生時會收到通知,並能使用 Future計算的結果進行下一步的操作,不只是簡單地阻塞等待操作的結果)
這種感覺其實就很像沒有stream之前的collections的操作感覺一樣,同樣的,對於future,java8提供了它的函數式升級版本CompletableFuture,從名字就可以看出來這絕對是future的升級版。
JAVA8中的組合式異步編程
使用CompletableFuture進行異步編程
事物的發展往往都是由簡單->復雜->簡單,這里我們同樣遵循這樣的規律,循序漸進。
下面的例子摘取《java8實戰》
的異步編程章節,並做了簡化。
我們假設現在我們有一項查詢商品價格的服務十分耗時,所以毫無例外的我們想讓查詢最佳價格的服務以異步的形式執行。
最直接的方式是直接構建一個異步查詢商品價格的api,並且返回,為了演示需要,編寫一個線程等待一秒的方法來模擬長時間的請求。
public double getPrice(String product) {
return calculatePrice(product);
}
private double calculatePrice(String product) {
delay();
return random.nextDouble() * product.charAt(0) + product.charAt(1);
}
public static void delay() {
try {
Thread.sleep(1000L);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
現在getPrice這方法是一個同步的方法,該方法在經過1秒的延遲之后會返回給我們一個商品的價格(這里只是簡單的根據名字構造了一個隨機數)
我們使用completFuture將getPrice
轉化為異步方法,如下
public Future<Double> getAsyncPrice(String product) {
CompletableFuture<Double> futurePrice = new CompletableFuture<>();
new Thread(() -> {
double price = calculatePrice(product);
futurePrice.complete(price);
}).start();;
return futurePrice;
}
這里構造一個completableFuture對象,並另起一個異步線程,將異步計算的結果使用futurePrice.complete來接受,無需等待直接返回future結果
調用類使用Integer result = future.get(10, TimeUnit.SECONDS)
來接受返回的結果,如果等待超時則拋出異常。
另外,如果異步線程發生異常,並且在排查問題的時候想要知道具體是什么原因導致的,
可以在getAsyncPrice
方法中使用completeExcepitonally來得到異常信息並且結束這次異步任務,代碼如下
public Future<Double> getPriceAsync(String product) {
public Future<Double> getPriceAsync(String product) {
CompletableFuture<Double> futurePrice = new CompletableFuture<>();
new Thread(() -> {
try {
double price = calculatePrice(product);
futurePrice.complete(price);
} catch (Exception ex) {
futurePrice.completeExceptionally(ex);
}
}).start();
return futurePrice;
}
這樣,基本的功能就實現了。
使用工廠類簡化異步操作
也許你看到上面的代碼,會說:"我暈,你這寫法比原來還復雜哦,而且我也沒看出啥區別啊。",是的,上文的寫法可以算是原生態的寫法了,目的為為下面的知識做一個簡單的鋪墊。
事實上,CompleteFuture本身提供了大量的工廠方法來供我們十分方便的實現一個異步編程,他封裝了前篇一律的異常與結果接收,你只需要編寫真正的異步邏輯部分就可以了,同時借住於lambda表達式,可以更進一步。
supplyAsync 方法接受一個生產者(Supplier)作為參數,返回一個 CompletableFuture
對象, 該對象完成異步執行后會讀取調用生產者方法的返回值。 生產者方法會交由 ForkJoinPool
池中的某個執行線程(Executor)運行,但是你也可以使用 supplyAsync 方法的重載版本,傳
遞第二個參數指定不同的執行線程執行生產者方法。
於是上文的例子可以改寫如下
public Future<Double> getPriceAsync(String product) {
return CompletableFuture.supplyAsync(() -> calculatePrice(product));
}
是不是簡潔了許多呢?
可現在還有問題,這里我們成功的編寫了一個十分簡潔的異步方法,可實際的情況中,我們所能調用的API大部分都是同步的,因此下面將介紹如何使用異步的方法去操作這些同步API。
使用流異步操作同步API
我們現在有這么一個需求,它接受產品名作為參數,返回一個字符串列表,
這個字符串列表中包括商店的名稱、該商店中指定商品的價格,商店集合以及接口設計如下。
List<Shop> shops = Arrays.asList(new Shop("BestPrice"),
new Shop("LetsSaveBig"),
new Shop("MyFavoriteShop"),
new Shop("BuyItAll"));
public List<String> findPrices(String product);
使用同步的方法實現
這樣的集合變換使用stream流來操作十分容易,代碼如下
public List<String> findPrices(String product) {
return shops.stream()
.map(shop -> String.format("%s price is %.2f", shop.getName(), shop.getPrice(product)))
.collect(toList());
}
stream流將 shop映射為了shop的名稱以及該shop中商品的價格的字符串,並使用收集器進行收集。
使用異步的方法實現
事實上,我們完全可以使用流將shop映射成CompletableFuture對象,就好像在操作集合一樣,代碼如下
List<CompletableFuture<String>> priceFutures = shops.stream()
.map(shop -> CompletableFuture
.supplyAsync(() -> String.format("%s price is %.2f", shop.getName(), shop.getPrice(product))))
.collect(toList());
使用這種方式,你會得到一個,List<CompletableFuture
為了實現這個效果,你可以向最初的 List<CompletableFuture
public List<String> findPrices(String product) {
List<CompletableFuture<String>> priceFutures = shops.stream()
.map(
shop -> CompletableFuture.supplyAsync(() -> shop.getName() + " price is " + shop.getPrice(product)))
.collect(Collectors.toList());
return priceFutures.stream()
.map(CompletableFuture::join)
.collect(toList());
}
以上的代碼你可能會疑惑,為什么不直接按照shop->completableFuture->join->collect
的方式進行流處理呢?那是因為join這一步本身是阻塞的,對於流操作來說,前一個shop沒有處理完之前,是不會處理下一個shop的,所以對於每一個shop,處理到join這一步的時候就會阻塞住等待1秒,這樣的話,這個流水線本身就會變回阻塞的了。
而上文的編寫方法可以看出 shop->completableFuture->collect
這個操作本身是非阻塞的,順利的將所有的請求都發出去了,隨后再使用join來完成結果的收集。
使用線程池來管控異步方法
前面在介紹工廠方法時提到,可以選擇第二參數放入一個線程池來進行管控。
private final Executor executor = Executors.newFixedThreadPool(Math.min(shops.size(), 100), new ThreadFactory() {
public Thread newThread(Runnable r) {
Thread t = new Thread(r);
t.setDaemon(true);
return t;
}
});
接着在supplyAsync中使用該線程池即可,代碼如下
CompletableFuture.supplyAsync(() -> shop.getName() + " price is " +
shop.getPrice(product), executor);
進階的異步流操作
既然我們已經將異步操作與流相結合了,因此很容易的就會想到對於異步流來說,應該有會有類似於集合流的一些非常好用的API吧?事實上,JAVA8的確為我們提供了這些API。
構造同步和異步操作
如同集合流操作一樣,異步流也可以提前安排一系列的任務,然后讓異步任務有條不紊的按照這個順序去執行。
- 同步任務
使用future.thenApply(Function)
來實現,該方法接受一個Function對象
你可以規划這樣的任務 任務A(異步)->任務B(同步),語法可能是這樣的
stream()
.map(xxx->supplayAsync(()->任務A)) //這一步已經異步的映射成了任務A
.map(future->future.thenApply(任務B)//執行同步的任務B
.collect
- 異步任務
與同步幾乎一樣,方法變為future.thenCompose(Function)
你可以規划這樣的任務 任務A(異步)->任務B(同步)->任務C(異步),語法可能是這樣的
stream()
.map(xxx->supplayAsync(()->任務A)) //這一步已經異步的映射成了任務A
.map(future->future.thenApply(任務B)//執行同步的任務B
.map(future->future.thenCompose(任務C))//再異步執行任務C
.collect
將兩個 CompletableFuture 對象整合起來,無論它們是否存在依賴
使用thenCombine來完成,類似任務A與任務B,A是查詢價格,B是查詢匯率,這兩個任務之間本身沒有關聯關系,所以可以同時發起,但你最后需要計算價格乘以匯率,因此在這兩個任務完成之后需要對他們的結果進行合並,代碼如下
Future<Double> futurePriceInUSD = CompletableFuture.supplyAsync(() -> shop.getPrice(product))//任務A
.thenCombine(
CompletableFuture.supplyAsync(() -> exchangeService.getRate(Money.EUR, Money.USD)), //任務B
(price, rate) -> price * rate); //任務A與任務B的合並操作
注意這里的任務A與任務B是異步的,但他們的合並操作是同步的,如果想要合並操作也是異步的,使用future.thenCombineAsync的異步方法版本。
對結果進行處理
使用thenAccept(Consumer)
以上都是對結果進行一些映射,你現在要對結果只進行處理,說白了就是前面的都是Function,現在要換成consumer,並且參數不再是異步任務,而是任務的結果值,舉個例子,上文的任務A(異步)->任務B(同步)->任務C(異步) 的操作現在想到對他們的操作結果進行打印
就可以使用thenAccept(Consumer)
stream()
.map(xxx->supplayAsync(()->任務A)) //這一步已經異步的映射成了任務A
.map(future->future.thenApply(任務B)//執行同步的任務B
.map(future->future.thenCompose(任務C))//再異步執行任務C
.map(future->future.thenAccept(System.out::println))//將結果打印
.collect
使用allOf與anyOf對結果進行處理
需要注意的是在執行了Accpet方法之后,你得到的是一個 CompletableFuture
你可以對這些流對象進行類似及早求值的操作,例如這條查詢4個商家的價格服務只要有一個給出了返回結果就結束這次異步流。
CompletableFuture[] futures = findPricesStream("myPhone")
.map(f -> f.thenAccept(System.out::println))
.toArray(size -> new CompletableFuture[size]);
CompletableFuture.anyOf(futures).join();
allOf 工廠方法接收一個由 CompletableFuture 構成的數組,數組中的所有 Completable-Future 對象執行完成之后,它返回一個 CompletableFuture