* 實現異步API public double getPrice(String product) { return calculatePrice(product); } /** * 同步計算商品價格的方法 * * @param product 商品名稱 * @return 價格 */ private double calculatePrice(String product) { delay(); return random.nextDouble() * product.charAt(0) + product.charAt(1); } /** * 模擬計算,查詢數據庫等耗時 */ public static void delay() { try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } } * 將同步方法裝換為異步方法 /** * 異步計算商品的價格. * * @param product 商品名稱 * @return 價格 */ public Future<Double> getPriceAsync(String product) { CompletableFuture<Double> futurePrice = new CompletableFuture<>(); new Thread(() -> { double price = calculatePrice(product); futurePrice.complete(price); }).start(); return futurePrice; } 使用異步API 模擬客戶端 Shop shop = new Shop("BestShop"); long start = System.nanoTime(); Future<Double> futurePrice = shop.getPriceAsync("my favorite product"); long incocationTime = (System.nanoTime() - start) / 1_000_000; System.out.println("執行時間:" + incocationTime + " msecs"); try { Double price = futurePrice.get(); System.out.printf("Price is %.2f%n", price); } catch (InterruptedException | ExecutionException e) { e.printStackTrace(); } long retrievalTime = (System.nanoTime() - start) / 1_000_000; System.out.println("retrievalTime:" + retrievalTime + " msecs");
* 錯誤處理 上述代碼,如果沒有意外,可以正常工作,但是如果價格計算過程中生產了錯誤會怎樣呢?非常不幸,這種情況下你會得到一個相當糟糕的結果:用於提示錯誤的異常會限制在視圖計算商品的價格的當前線程的范圍內,最終會殺死該線程,而這會導致等待get方法放回結果的客戶端永久的被阻塞, 客戶端可以使用重載的get方法,它給定了一個超時時間,這是一種推薦做法! 為了讓客戶端能了解商店無法提供請求商品價格的原因.我們對代碼優化,! /** * 異步計算商品的價格. * * @param product 商品名稱 * @return 價格 */ public Future<Double> getPriceAsync(String product) { CompletableFuture<Double> futurePrice = new CompletableFuture<>(); new Thread(() -> { try { double price = calculatePrice(product); futurePrice.complete(price); } catch (Exception e) { //否則就拋出異常,完成這次future操作 futurePrice.completeExceptionally(e); } }).start(); return futurePrice; } * 使用工廠方法supplyAsync創建CompletableFuture /** * 異步計算商品的價格. * * @param product 商品名稱 * @return 價格 */ public Future<Double> getPriceAsync(String product) { /* CompletableFuture<Double> futurePrice = new CompletableFuture<>(); new Thread(() -> { try { double price = calculatePrice(product); futurePrice.complete(price); } catch (Exception e) { //否則就拋出異常,完成這次future操作 futurePrice.completeExceptionally(e); } }).start(); return futurePrice;*/ return CompletableFuture.supplyAsync(() -> calculatePrice(product)); } * 讓代碼免受阻塞之苦 案例:最佳價格查詢器 private static List<Shop> shops = Arrays.asList(new Shop("BestPrice"), new Shop(":LetsSaveBig"), new Shop("MyFavoriteShop"), new Shop("BuyItAll")); /** * 最佳價格查詢器 * * @param product 商品 * @return */ public static List<String> findprices(String product) { return shops .stream() .map(shop -> String.format("%s price is %.2f", shop.getName(), shop.getPrice(product))) .collect(Collectors.toList()); } 驗證findprices的正確性和執行性能 long start = System.nanoTime(); System.out.println(findprices("myPhones27s")); long duration = (System.nanoTime() - start) / 1_000_000; System.out.println("Done in " + duration+" msecs"); 執行結果:
* 使用平行流對請求進行並行操作 /** * 最佳價格查詢器(並行流) * * @param product 商品 * @return */ public static List<String> parallelFindprices(String product) { return shops .parallelStream() .map(shop -> String.format("%s price is %.2f", shop.getName(), shop.getPrice(product))) .collect(Collectors.toList()); } 同樣的測試代碼:
相當不錯,看起來這是個簡單有效的主意,對4個不同商店的查詢實現了並行.所有完成操作的總耗時只有1秒多一點,讓我們嘗試使用CompletableFuture,將findprices方法中對不同商店的同步調用替換為異步調用. * 使用CompletableFuture發起異步請求 /** * 最佳價格查詢器(異步調用實現) * @param product 商品 * @return */ public static List<String> asyncFindprices(String product) { //使用這種方式,你會得到一個List<CompletableFuture<String>>,列表中的每一個CompletableFuture對象在計算完成后都包含商店的String類型的名稱. //但是,由於你用CompletableFuture實現了asyncFindprices方法要求返回一個List<String>.你需要等待所有的future執行完畢,將其包含的值抽取出來,填充到列表中才能返回 List<CompletableFuture<String>> priceFuture = shops .stream() .map(shop -> CompletableFuture.supplyAsync(() -> String.format("%s price is %.2f", shop.getName(), shop.getPrice(product)))) .collect(Collectors.toList()); //為了實現這個效果,我門可以向最初的List<CompletableFuture<String>>施加第二個map操作,對list中的每一個future對象執行join操作,一個接一個地等待他們允許結束,join和get方法 //有相同的含義,不同的在於join不會拋出任何檢測到的異常 return priceFuture .stream() .map(CompletableFuture::join) .collect(Collectors.toList()); } 相同的測試代碼:
結果讓我們失望了.我們采用異步調用新版方法,要比並行流慢了一倍. * 尋找更好的方案 經過我增加商店數量,然后使用三種方式反復的測試,發現了一個問題,並行流和異步調用的性能不分伯仲,究其原因都一樣,它們內部采用的是同樣的通用線程池,默認都使用固定數目的線程,具體線程數取決於Runtime.getRuntime.availableProcessors()放回值,然而,.CompletableFuture具有一定的優勢,因為它允許你對執行器進行配置,尤其是線程池的大小,讓它以適合應用需求的方式進行配置,滿足程序的要求,而這是並行流API無法提供的. * 使用定制的執行器 private static final Executor executor = Executors.newFixedThreadPool(Math.min(shops.size(), 100)); /** * 最佳價格查詢器(異步調用實現,自定義執行器) * * @param product 商品 * @return */ public static List<String> asyncFindpricesThread(String product) { List<CompletableFuture<String>> priceFuture = shops .stream() .map(shop -> CompletableFuture.supplyAsync(() -> shop.getName() + " price is " + shop.getPrice(product), executor)) .collect(Collectors.toList()); return priceFuture .stream() .map(CompletableFuture::join) .collect(Collectors.toList()); } 經過測試處理5個商店 是1秒多,處理9個商店也是1秒多 並行--使用流還是CompletableFutures? 目前為止,我們已經知道對集合進行並行計算有兩種方式,要么將其轉化為並行流,利用map這樣的操作開展工作,要么枚舉出集合中的每一個元素,創建新的線程,在CompletableFuture內對其進行操作,后者提供了更多的靈活性,你可以調整線程池大小,二者能幫助你確保整體計算機不會因為線程都在等待I/O而發生阻塞 我們使用這些API的建議如下: 1. 如果你進行的是計算密集型的操作,並且沒有I/O,那么推薦使用Stream接口,因為實現簡單,同時效率也可能是最高的 2. 反之,如果你並行的工作單元還涉及等待I/O的操作(包括網絡連接等待).那么使用CompletableFuture是靈活性更好,你可以像前面討論的那樣,依據等待/計算,或者W/C的比率設定需要使用的線程數,