在多任務程序中,我們比較熟悉的是分支-合並框架的並行計算,他的目的是將一個操作(比如巨大的List計算)切分為多個子操作,充分利用CPU的多核,甚至多個機器集群,並行執行這些子操作。
而CompletableFuture的目標是並發(執行多個操作),而非並行,是利用CPU的核,使其持續忙碌,達成最大吞吐,在並發進行中避免等待遠程服務的返回值,或者數據庫的長時查詢結果等耗時較長的操作,如果解決了這些問題,就能獲得最大的並發(通過避免阻塞)。
而分支-合並框架只是並行計算,是沒有阻塞的情況。
Future接口
Future接口用於對將來某個時刻發生的結果進行建模,它建模了一種異步計算,返回一個執行結果的引用,計算完成后,這個引用被返回給調用方,
在使用Future時,將耗時操作封裝在一個Callable接口對象中,提交給ExecutorService。
public interface Future<V> { boolean cancel(boolean mayInterruptIfRunning); boolean isCancelled(); boolean isDone(); V get() throws InterruptedException, ExecutionException; V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException; }
我們在異步執行結果獲取時,設置了get過期時間2秒,否則,如果沒有正常的返回值,會阻塞線程,非常危險。
import java.util.Random; import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; public class FutureDemo { public static void main(String[] args) { ExecutorService executor = Executors.newCachedThreadPool(); Future<Integer> result = executor.submit(new Callable<Integer>() { public Integer call() throws Exception { Util.delay(); return new Random().nextInt(); } }); doSomeThingElse(); executor.shutdown(); try { try { System.out.println("result:" + result.get(2,TimeUnit.SECONDS)); } catch (TimeoutException e) { // TODO Auto-generated catch block e.printStackTrace(); } } catch (InterruptedException e) { e.printStackTrace(); } catch (ExecutionException e) { e.printStackTrace(); } } private static void doSomeThingElse() { System.out.println("Do Some Thing Else." ); } }
因為Future的局限性,如
- 難異步合並
- 等待 Future 集合中的所有任務都完成。
- 僅等待 Future 集合中最快結束的任務完成,並返回它的結果。
一是我們沒有好的方法去獲取一個完成的任務;二是 Future.get 是阻塞方法,使用不當會造成線程的浪費。解決第一個問題可以用 CompletionService 解決,CompletionService 提供了一個 take() 阻塞方法,用以依次獲取所有已完成的任務。對於第二個問題,可以用 Google Guava 庫所提供的 ListeningExecutorService 和 ListenableFuture 來解決。這些都會在后面的介紹。
CompletableFuture組合異步
單一操作異步
我們的演示程序是查詢多個在線商店,把最佳的價格返回給消費者。首先先查詢一個產品的價格(即單一操作異步)。
先建一個同步模型商店類Shop,其中delay()是一個延時阻塞。
calculatePrice(String)同步輸出一個隨機價格,
getPriceAsync(String)是異步獲取隨機價格,依賴calculatePrice同步方法,返回類型是Future<Double>
import java.util.Random; import java.util.concurrent.CompletableFuture; import java.util.concurrent.Future; public class Shop { private final String name; private final Random random; public Shop(String name) { this.name = name; random = new Random(name.charAt(0) * name.charAt(1) * name.charAt(2)); } public double getPrice(String product) { return calculatePrice(product); } private double calculatePrice(String product) { delay(); return random.nextDouble() * product.charAt(0) + product.charAt(1); } public Future<Double> getPriceAsync(String product) { CompletableFuture<Double> futurePrice = new CompletableFuture<>(); new Thread( () -> { double price = calculatePrice(product); futurePrice.complete(price); }).start(); return futurePrice; } public String getName() { return name; } }
delay()阻塞,延遲一秒,注釋的代碼是隨機延遲0.5 - 2.5秒之間
public static void delay() { int delay = 1000; //int delay = 500 + RANDOM.nextInt(2000); try { Thread.sleep(delay); } catch (InterruptedException e) { throw new RuntimeException(e); } }
調用單一查詢同步:
public class ShopSyncMain { public static void main(String[] args) { Shop shop = new Shop("BestShop"); long start = System.nanoTime(); double price = shop.getPrice("my favorite product"); System.out.printf("Price is %.2f%n", price); long invocationTime = ((System.nanoTime() - start) / 1_000_000); System.out.println("Invocation returned after " + invocationTime + " msecs"); // Do some more tasks doSomethingElse(); long retrievalTime = ((System.nanoTime() - start) / 1_000_000); System.out.println("Price returned after " + retrievalTime + " msecs"); } private static void doSomethingElse() { System.out.println("Doing something else..."); } } //結果: Price is 123.26 Invocation returned after 1069 msecs Doing something else... Price returned after 1069 msecs
嗯,一秒出頭,這符合delay()阻塞一秒的預期
調用單一查詢異步操作:
public class ShopMain { public static void main(String[] args) { Shop shop = new Shop("BestShop"); long start = System.nanoTime(); Future<Double> futurePrice = shop.getPriceAsync("my favorite product"); long invocationTime = ((System.nanoTime() - start) / 1_000_000); System.out.println("Invocation returned after " + invocationTime + " msecs"); // Do some more tasks, like querying other shops doSomethingElse(); // while the price of the product is being calculated try { double price = futurePrice.get(); System.out.printf("Price is %.2f%n", price); } catch (ExecutionException | InterruptedException e) { throw new RuntimeException(e); } long retrievalTime = ((System.nanoTime() - start) / 1_000_000); System.out.println("Price returned after " + retrievalTime + " msecs"); } private static void doSomethingElse() { System.out.println("Doing something else..."); } }
結果:
Invocation returned after 54 msecs
Doing something else...
Price is 123.26
Price returned after 1102 msecs
咦,異步反而時間多了,別忘了,大家都是執行一個方法,不存在多個並發(也沒有多個順序流操作),異步當然顯示不出優勢。
異步異常
new Thread方式新建線程可能會有個糟糕的情況:用於提示錯誤的異常被限制在當前線程內,最終會殺死線程,所以get是無法返回期望值,client調用方會被阻塞。
你當然可以重載get,要求超時過期時間,可以解決問題,但你也應該解決這個錯誤邏輯,避免觸發超時,因為超時又會觸發TimeoutException,client永遠不知道為啥產生了異常。
為了讓client獲知異常原因,需要對CompletableFuture對象使用completeExceptionally方法,將內部錯誤異常跑出。
改寫如下:
public Future<Double> getPrice(String product) { new Thread(() -> { try { double price = calculatePrice(product); futurePrice.complete(price); } catch (Exception ex) { futurePrice.completeExceptionally(ex); } }).start();
使用工廠方法創建CompletableFuture對象
前面用門通過手動建立線程,通過調用同步方法創建CompletableFuture對象,還可以通過CompletableFuture的工廠方法來方便的創建對象,重寫如下:
public Future<Double> getPrice(String product) { return CompletableFuture.supplyAsync(() -> calculatePrice(product)); }
通過原生代碼,我們可以了解supplyAsync和runAsync的區別,runAsync沒有返回值,只是運行一個Runnable對象,supplyAsync有返回值U,參數是Supplier函數接口,或者可以自定義一個指定的執行線程池。
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier) { return asyncSupplyStage(asyncPool, supplier); } public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor) { return asyncSupplyStage(screenExecutor(executor), supplier); } public static CompletableFuture<Void> runAsync(Runnable runnable) { return asyncRunStage(asyncPool, runnable); } public static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor) { return asyncRunStage(screenExecutor(executor), runnable); }
並發單一任務異步
前面是單一的操作異步,異步的內涵不能顯現,現在要查詢一大堆的商店(並發)的同一產品的價格,然后對比價格。
商店List:
private final List<Shop> shops = Arrays.asList(new Shop("BestPrice"), new Shop("LetsSaveBig"), new Shop("MyFavoriteShop"), new Shop("BuyItAll")/*, new Shop("ShopEasy")*/);
使用三種方式處理多個商店的價格查詢:
//順序同步方式 public List<String> findPricesSequential(String product) { return shops.stream() .map(shop -> shop.getName() + " price is " + shop.getPrice(product)) .collect(Collectors.toList()); } //並行流方式 public List<String> findPricesParallel(String product) { return shops.parallelStream() .map(shop -> shop.getName() + " price is " + shop.getPrice(product)) .collect(Collectors.toList()); } //工廠方法異步 public List<String> findPricesFuture(String product) { List<CompletableFuture<String>> priceFutures = shops.stream() .map(shop -> CompletableFuture.supplyAsync(() -> shop.getName() + " price is " + shop.getPrice(product), executor)) .collect(Collectors.toList()); List<String> prices = priceFutures.stream() .map(CompletableFuture::join) .collect(Collectors.toList()); return prices; } private final Executor executor = Executors.newFixedThreadPool(shops.size(), new ThreadFactory() { @Override public Thread newThread(Runnable r) { Thread t = new Thread(r); t.setDaemon(true); return t; } });
CompletableFuture::join的作用是異步的開啟任務並發,並且將結果合並聚集。
結果:
sequential done in 4130 msecs
parallel done in 1091 msecs
composed CompletableFuture done in 1010 msecs
如果將list個數變成5個,結果是
同步必然是線性增長
sequential done in 5032 msecs
並行流受到PC內核數的限制(4個),所以一組並行只能指派最多4個線程,第二輪要多費時1秒多
parallel done in 2009 msecs
CompletableFuture異步方式能保持1秒的秘密在於線程池,我們定義了shops.size()大小的線程池,並且使用了守護線程。java提供了倆類的線程:用戶線程和守護線程(user thread and Daemon thread)。用戶線程是高優先級的線程。JVM虛擬機在結束一個用戶線程之前,會先等待該用戶線程完成它的task。
在另一方面,守護線程是低優先級的線程,它的作用僅僅是為用戶線程提供服務。正是由於守護線程是為用戶線程提供服務的,僅僅在用戶線程處於運行狀態時才需要守護線程。另外,一旦所有的用戶線程都運行完畢,那么守護線程是無法阻止JVM退出的
並發多任務異步
假設所有商店開始折扣服務,用5種折扣碼代表。
public class Discount { public enum Code { NONE(0), SILVER(5), GOLD(10), PLATINUM(15), DIAMOND(20); private final int percentage; Code(int percentage) { this.percentage = percentage; } } public static String applyDiscount(Quote quote) { return quote.getShopName() + " price is " + Discount.apply(quote.getPrice(), quote.getDiscountCode()); } private static double apply(double price, Code code) { delay(); return format(price * (100 - code.percentage) / 100); } } public class Quote { private final String shopName; private final double price; private final Discount.Code discountCode; public Quote(String shopName, double price, Discount.Code discountCode) { this.shopName = shopName; this.price = price; this.discountCode = discountCode; } public static Quote parse(String s) { String[] split = s.split(":"); String shopName = split[0]; double price = Double.parseDouble(split[1]); Discount.Code discountCode = Discount.Code.valueOf(split[2]); return new Quote(shopName, price, discountCode); } public String getShopName() { return shopName; } public double getPrice() { return price; } public Discount.Code getDiscountCode() { return discountCode; } }
比如,SILVER(5)代表95折, GOLD(10)代表90折,PLATINUM(15)代表85折。、
Quote類用於包裝輸出。同步的價格計算也做了修改,使用:來分割商店名、價格、折扣碼。折扣碼是隨機產生的。
新的商店類,getPrice同步方法輸出商店名、價格、折扣碼的待分割字符串。parse用於將字符串轉化為Quote實例。
打折延遲1秒。
public class Shop { private final String name; private final Random random; public Shop(String name) { this.name = name; random = new Random(name.charAt(0) * name.charAt(1) * name.charAt(2)); } public String getPrice(String product) { double price = calculatePrice(product); Discount.Code code = Discount.Code.values()[random.nextInt(Discount.Code.values().length)]; return name + ":" + price + ":" + code; } public double calculatePrice(String product) { delay(); return format(random.nextDouble() * product.charAt(0) + product.charAt(1)); } public String getName() { return name; } }
運行三種服務
public class BestPriceFinder { private final List<Shop> shops = Arrays.asList(new Shop("BestPrice"), new Shop("LetsSaveBig"), new Shop("MyFavoriteShop"), new Shop("BuyItAll"), new Shop("ShopEasy")); private final Executor executor = Executors.newFixedThreadPool(shops.size(), new ThreadFactory() { @Override public Thread newThread(Runnable r) { Thread t = new Thread(r); t.setDaemon(true); return t; } }); //同步流 public List<String> findPricesSequential(String product) { return shops.stream() .map(shop -> shop.getPrice(product)) .map(Quote::parse) .map(Discount::applyDiscount) .collect(Collectors.toList()); } //並行流 public List<String> findPricesParallel(String product) { return shops.parallelStream() .map(shop -> shop.getPrice(product)) .map(Quote::parse) .map(Discount::applyDiscount) .collect(Collectors.toList()); } //CompletableFuture異步 public List<String> findPricesFuture(String product) { List<CompletableFuture<String>> priceFutures = findPricesStream(product) .collect(Collectors.<CompletableFuture<String>>toList()); return priceFutures.stream() .map(CompletableFuture::join) .collect(Collectors.toList()); } public Stream<CompletableFuture<String>> findPricesStream(String product) { return shops.stream() .map(shop -> CompletableFuture.supplyAsync(() -> shop.getPrice(product), executor)) .map(future -> future.thenApply(Quote::parse)) .map(future -> future.thenCompose(quote -> CompletableFuture.supplyAsync(() -> Discount.applyDiscount(quote), executor))); } public void printPricesStream(String product) { long start = System.nanoTime(); CompletableFuture[] futures = findPricesStream(product) .map(f -> f.thenAccept(s -> System.out.println(s + " (done in " + ((System.nanoTime() - start) / 1_000_000) + " msecs)"))) .toArray(size -> new CompletableFuture[size]); CompletableFuture.allOf(futures).join(); System.out.println("All shops have now responded in " + ((System.nanoTime() - start) / 1_000_000) + " msecs"); } } //結果 sequential done in 10176 msecs parallel done in 4010 msecs composed CompletableFuture done in 2016 msecs
- 同步的10多秒是在5個list元素時獲得的,每個一次詢價,一次打折延遲,共2秒。
- 並行流一輪需要2秒多(第一輪詢價4個+1個共兩個輪次,2秒,第二輪打折4個+1個)。
- 異步方式第一輪和第二輪都是1秒多,所以2秒多。
這里的三步需要特別說明,這也是並發多任務的核心所在,並發是5條記錄並發,多任務是
- 詢價任務(耗時1秒)
- 解析字符換成Quote實例
- 計算折扣
這三個子任務
- .map(shop -> CompletableFuture.supplyAsync(() -> shop.getPrice(product), executor))
- .map(future -> future.thenApply(Quote::parse))
- .map(future -> future.thenCompose(quote -> CompletableFuture.supplyAsync(() -> Discount.applyDiscount(quote), executor)));
第一行是用異步工廠執行同步方法getPrice,返回字符串,涉及線程池;
第二行thenApply是將字符串轉成Quote實例,這個是同步內存處理,不涉及線程池;
thenapply()是接受一個Function<? super T,? extends U> fn參數用來轉換CompletableFuture,相當於流的map操作,返回的是非CompletableFuture類型,它的功能相當於將CompletableFuture<T>轉換成CompletableFuture<U>.
在這里是把CompletableFuture<String>轉成CompletableFuture<Quote>
第三行也是異步處理,也涉及線程池,獲得折扣后的價格。thenCompose()在異步操作完成的時候對異步操作的結果進行一些操作,Function<? super T, ? extends CompletionStage<U>> fn參數,並且仍然返回CompletableFuture類型,相當於flatMap,用來連接兩個CompletableFuture
如果加入一個不相互依賴的Future對象進行整合,比如需要計算匯率(不需要計算折扣,他們之間無依賴),可以試用thenCombine方法,這里有4種實現,最后一種比較優。
public class ExchangeService { public enum Money { USD(1.0), EUR(1.35387), GBP(1.69715), CAD(.92106), MXN(.07683); private final double rate; Money(double rate) { this.rate = rate; } } public static double getRate(Money source, Money destination) { return getRateWithDelay(source, destination); } private static double getRateWithDelay(Money source, Money destination) { delay(); return destination.rate / source.rate; } } public List<String> findPricesInUSD(String product) { List<CompletableFuture<Double>> priceFutures = new ArrayList<>(); for (Shop shop : shops) { // Start of Listing 10.20. // Only the type of futurePriceInUSD has been changed to // CompletableFuture so that it is compatible with the // CompletableFuture::join operation below. CompletableFuture<Double> futurePriceInUSD = CompletableFuture.supplyAsync(() -> shop.getPrice(product)) .thenCombine( CompletableFuture.supplyAsync( () -> ExchangeService.getRate(Money.EUR, Money.USD)), (price, rate) -> price * rate ); priceFutures.add(futurePriceInUSD); } // Drawback: The shop is not accessible anymore outside the loop, // so the getName() call below has been commented out. List<String> prices = priceFutures .stream() .map(CompletableFuture::join) .map(price -> /*shop.getName() +*/ " price is " + price) .collect(Collectors.toList()); return prices; } public List<String> findPricesInUSDJava7(String product) { ExecutorService executor = Executors.newCachedThreadPool(); List<Future<Double>> priceFutures = new ArrayList<>(); for (Shop shop : shops) { final Future<Double> futureRate = executor.submit(new Callable<Double>() { public Double call() { return ExchangeService.getRate(Money.EUR, Money.USD); } }); Future<Double> futurePriceInUSD = executor.submit(new Callable<Double>() { public Double call() { try { double priceInEUR = shop.getPrice(product); return priceInEUR * futureRate.get(); } catch (InterruptedException | ExecutionException e) { throw new RuntimeException(e.getMessage(), e); } } }); priceFutures.add(futurePriceInUSD); } List<String> prices = new ArrayList<>(); for (Future<Double> priceFuture : priceFutures) { try { prices.add(/*shop.getName() +*/ " price is " + priceFuture.get()); } catch (ExecutionException | InterruptedException e) { e.printStackTrace(); } } return prices; } public List<String> findPricesInUSD2(String product) { List<CompletableFuture<String>> priceFutures = new ArrayList<>(); for (Shop shop : shops) { // Here, an extra operation has been added so that the shop name // is retrieved within the loop. As a result, we now deal with // CompletableFuture<String> instances. CompletableFuture<String> futurePriceInUSD = CompletableFuture.supplyAsync(() -> shop.getPrice(product)) .thenCombine( CompletableFuture.supplyAsync( () -> ExchangeService.getRate(Money.EUR, Money.USD)), (price, rate) -> price * rate ).thenApply(price -> shop.getName() + " price is " + price); priceFutures.add(futurePriceInUSD); } List<String> prices = priceFutures .stream() .map(CompletableFuture::join) .collect(Collectors.toList()); return prices; } public List<String> findPricesInUSD3(String product) { // Here, the for loop has been replaced by a mapping function... Stream<CompletableFuture<String>> priceFuturesStream = shops .stream() .map(shop -> CompletableFuture .supplyAsync(() -> shop.getPrice(product)) .thenCombine( CompletableFuture.supplyAsync(() -> ExchangeService.getRate(Money.EUR, Money.USD)), (price, rate) -> price * rate) .thenApply(price -> shop.getName() + " price is " + price)); // However, we should gather the CompletableFutures into a List so that the asynchronous // operations are triggered before being "joined." List<CompletableFuture<String>> priceFutures = priceFuturesStream.collect(Collectors.toList()); List<String> prices = priceFutures .stream() .map(CompletableFuture::join) .collect(Collectors.toList()); return prices; }