隨着多核處理器的出現,提升應用程序的處理速度最有效的方式就是可以編寫出發揮多核能力的軟件,我們已經可以通過切分大型的任務,讓每個子任務並行運行,使用線程的方式,分支/合並框架(java 7) 和並行流(java 8)來實現。
現在很多大型的互聯網公司都對外提供了API服務,比如百度的地圖,微博的新聞,天氣預報等等。很少有網站或網絡應用匯以完全隔離的方式工作,而是采用混聚的方式:它會使用來自多個源的內容,將這些內容聚合在一起,方便用戶使用。
比如實現一個功能,你需要在微博中搜索某個新聞,然后根據當前坐標獲取天氣預報。這些調用第三方信息的時候,不想因為等待搜索新聞時,放棄對獲取天氣預報的處理,於是我們可以使用 分支/合並框架 及並行流 來並行處理,將他們切分為多個子操作,在多個不同的核、CPU甚至是機器上並行的執行這些子操作。
相反,如果你想實現並發,而不是並行,或者你的主要目標是在同一個CPU上執行幾個松耦合的任務,充分利用CPU的核,讓其足夠忙碌,從而最大化程序的吞吐量,那么你其實真正想做的是避免因為等待遠程服務的返回,或者對數據庫的查詢,而阻塞線程的執行,浪費寶貴的計算資源,因為這種等待時間可能會很長。Future接口,尤其是它的新版實現CompletableFuture是處理這種情況的利器。
Future接口
Future接口在java 5中被引入,設計初衷是對將來某個時刻會發生的結果進行建模。它建模了一種異步計算,返回一個執行運算結果的引用,當運算結束后,這個引用被返回給調用方。在Future中觸發那些可能會耗時的操作把調用線程解放出來,讓它能繼續執行其他工作,不用一直等待耗時的操作完成,比如:你拿了一袋子衣服到洗衣店去洗衣服,洗衣店會給你張發票,告訴你什么時候會洗好,然后你就可以去做其他的事了。Future的另一個優點是它比更底層的Thread更容易使用。使用Future只需要講耗時的操作封裝在一個Callable對象中,再將它提交給ExecutorService就可以了。 Java 8之前使用Future的例子:
public static void main(String[] args) { //創建Executor-Service,通過他可以向線程池提交任務 ExecutorService executor = Executors.newCachedThreadPool(); //向executor-Service提交 Callable對象 Future<Double> future = executor.submit(new Callable<Double>() { @Override public Double call() throws Exception { //異步的方式執行耗時的操作 return doSomeLongComputation(); } }); //異步時,做其他的事情 doSomethingElse(); try{ //獲取異步操作的結果,如果被阻塞,無法得到結果,那么最多等待1秒鍾之后退出 Double result = future.get(1, TimeUnit.SECONDS); System.out.print(result); } catch (InterruptedException e) { System.out.print("計算拋出一個異常"); } catch (ExecutionException e) { System.out.print("當前線程在等待過程中被中斷"); } catch (TimeoutException e) { System.out.print("future對象完成之前已過期"); } } public static Double doSomeLongComputation() throws InterruptedException { Thread.sleep(1000); return 3 + 4.5; } public static void doSomethingElse(){ System.out.print("else"); }
這種方式可以再ExecutorService以並發的方式調用另外一個線程執行耗時的操作的同時,去執行一些其他任務。接着到已經沒有任務運行時,調用它的get方法來獲取操作的結果,如果操作完成,就會返回結果,否則會阻塞你的線程,一直到操作完成,返回響應的結果。
CompletableFuture
在java 8 中引入了CompletableFuture類,它實現了Future接口,使用了Lambda表達式以及流水線的思想,通過下面這個例子進行學習,比如:我們要做一個商品查詢,根據折扣來獲取價格。
public class Shop { public double getPrice(String product) throws InterruptedException { //查詢商品的數據庫,或鏈接其他外部服務獲取折扣 Thread.sleep(1000); return new Random().nextDouble() * product.charAt(0) + product.charAt(1); } }
當調用這個方法時,它會阻塞進程,等待事件完成。
將同步方法轉換成異步方法
public Future<Double> getPriceAsync(String product){ //創建CompletableFuture對象 CompletableFuture<Double> futurePrice = new CompletableFuture<>(); new Thread (()->{ try { //在另一個線程中執行計算 double price = getPrice(product); //需要長時間計算的任務結束並得出結果時,設置future的返回值 futurePrice.complete(price); } catch (InterruptedException e) { e.printStackTrace(); } }).start(); return futurePrice; }
然后可以這樣調用:
System.out.println("begin"); Future<Double> futurePrice = shop.getPriceAsync("ss"); System.out.println("doSomething"); System.out.println(futurePrice.get());
System.out.println("end");
begin
doSomething
171.47509091822835
end
這個例子中,首先會調用接口 立即返回一個Future對象,在這種方式下,在查詢價格的同時,還可以處理其他任務。最后所有的工作都已經完成,然后再調用future的get方法。獲得Future中封裝的值,要么發生阻塞,直到該任務異步任務完成,期望的值能夠返回。
錯誤處理
如果沒有意外,這個代碼工作的會非常正常。但是如果計算價格的過程中發生了錯誤,那么get會永久的被阻塞。這時可以使用重載的get方法,讓它超過一個時間后就強制返回。應該盡量在代碼中使用這種方式來防止程序永久的等待下去。超時會引發TimeoutException。但是這樣會導致你無法知道具體什么原因導致Future無法返回,這時需要使用CompletableFUture的completeExceptionally方法將導致CompletableFuture內發生的問題拋出。
public Future<Double> getPriceAsync(String product){ //創建CompletableFuture對象 CompletableFuture<Double> futurePrice = new CompletableFuture<>(); new Thread (()->{ try { double price = getPrice(product); futurePrice.complete(price); } catch (Exception ex) { //拋出異常 futurePrice.completeExceptionally(ex); } }).start(); return futurePrice; }
調用時:
System.out.println("begin"); Future<Double> futurePrice = shop.getPriceAsync("ss"); System.out.println("doSomething"); try { System.out.println(futurePrice.get(1, TimeUnit.SECONDS)); } catch (TimeoutException e) { System.out.print(e); } System.out.println("end");
設置超時時間,然后會將錯誤信息打印出來。
工廠方法supplyAsync創建CompletableFuture
使用工廠方法可以一句話來創建getPriceAsync方法
public Future<Double> getPriceAsync(String product) { return CompletableFuture.supplyAsync(() -> getPrice(product)); }
supplyAsync方法接受一個生產者(Supplier)作為參數,返回一個CompletableFuture對象,該對象完成異步執行后悔讀取調用生產者方法的返回值。生產者方法會交由ForkJoinPool池中的某個執行線程(Executor)運行,也可以調用supplyAsync方法的重載版本,傳入第二個參數指定不同的線程執行生產者方法。 工廠方法返回的CompletableFuture對象也提供了同樣的錯誤處理機制。
阻塞優化
例如現在有一個商品列表,然后輸出一個字符串 商品名,價格 。
List<Shop> shops = Arrays.asList( new Shop("one"), new Shop("two"), new Shop("three"), new Shop("four")); long start = System.nanoTime(); List<String> str = shops.stream().map(shop -> String.format("%s price: %.2f", shop.getName(), shop.getPrice(shop.getName()))).collect(toList()); System.out.print(str); long end = System.nanoTime(); System.out.print((end - start) / 1000000);
[one price: 161.83, two price: 126.04, three price: 153.20, four price: 166.06]
4110
每次調用getPrice方法都會阻塞1秒鍾,對付這種我們可以使用並行流來進行優化:
List<String> str = shops.parallelStream().map(shop -> String.format("%s price: %.2f", shop.getName(), shop.getPrice(shop.getName()))).collect(toList());
1137
明顯速度提升了,現在對四個商品查詢 實現了並行,所以只耗時1秒多點,下面我們嘗試CompletableFuture:
List<CompletableFuture<String>> str2 = shops.stream().map(shop->
CompletableFuture.supplyAsync(
()->String.format("%s price: %.2f", shop.getName(), shop.getPrice(shop.getName())))).collect(toList());
我們使用工廠方法supplyAsync創建CompletableFuture對象,使用這種方式我們會得到一個List<CompletableFuture<String>>,列表中的每一個ComplatableFuture對象在計算完成后都會包含商品的名稱。但是我們要求返回的是List<String>,所以需要等待所有的future執行完畢,再將里面的值提取出來,填充到列表中才能返回。
List<String> str3 =str2.stream().map(CompletableFuture::join).collect(toList());
為了返回List<String> 需要對str2添加第二個map操作,對List中的所有future對象執行join操作,一個接一個的等待他們的運行結束。CompletableFuture類中的join和Future接口中的get方法有相同的含義,並且聲明在Future接口中,唯一的不同是join不會拋出任何檢測到的異常。
1149
現在使用了兩個不同的Stream流水線,而不是在同一個處理流的流水線上一個接一個的防治兩個map操作。考慮流操作之間的延遲特性,如果你在單一流水線中處理流,發向不同商家的請求只能以同步、順序執行的方式才會成功。因此每個創建CompletableFuture對象只能在前一個操作結束之后,再join返回計算結果。
更好的解決方式
並行流的版本工作的非常好,那是因為他可以並行處理8個任務,獲取操作系統線程數量:
System.out.print(Runtime.getRuntime().availableProcessors());
但是如果列表是9個呢?那么執行結果就會2秒。因為他最多只能讓8個線程處於繁忙狀態。 但是使用CompletableFuture允許你對執行器Executor進行配置,尤其是線程池的大小,這是並行流API無法實現的。
定制執行器
//創建一個線程池,線程池的數目為100何商店數目二者中較小的一個值 final Executor executor = Executors.newFixedThreadPool(Math.min(shops.size(), 100), new ThreadFactory() { @Override public Thread newThread(Runnable r) { Thread t = new Thread(r); t.setDaemon(true); //使用守護線程 ---這種方式不會阻止程序的關停 return t; } });
這個線程池是一個由守護線程構成的線程池,Java程序無法終止或退出正在運行中的線程,所以最后剩下的那個線程會由於一直等待無法發生的事件而引發問題。與此相反,如果將線程標記為守護進程,意味着程序退出時它也會被回收。這二者之間沒有性能上的差異。現在可以將執行器作為第二個參數傳遞給supplyAsync方法了。
CompletableFuture.supplyAsync( ()->String.format("%s price: %.2f", shop.getName(), shop.getPrice(shop.getName())) ,executor)
這時,執行9個商品時,執行速度只有1秒。 執行18個商品時也是1秒。這種狀態會一直持續,直到商店的數目達到我們之前計算的閥值。 處理需要大量使用異步操作的情況時,這幾乎是最有效的策略。
對多個異步任務進行流水線操作
我們在商品中增加一個枚舉Discount.Code 來代表每個商品對應不同的折扣率,創建枚舉如下:
public class Discount { public enum Code{ NONE(0), SILVER(5), GOLD(10), PLATINUM(15), DIAMOND(20); private final int value; Code(int value){ this.value = value; } } }
現在我們修改 getPrice方法的返回格式為:ShopName:price:DiscountCode 使用 : 進行分割的返回值。
public String getPrice(String product){ try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } Double price = new Random().nextDouble() * product.charAt(0) + product.charAt(1); Discount.Code code = Discount.Code.values()[new Random().nextInt(Discount.Code.values().length)]; return String.format("%s:%.2f:%s",name,price,code); }
返回值: one:120.10:GOLDD
將返回結果封裝到 Quote 類中:
public class Quote { private final String shopName; private final double price; private final Discount.Code discountCode; public Quote(String shopName, double price, Discount.Code code) { this.shopName = shopName; this.price = price; this.discountCode = code; } public static Quote parse(String s) { String[] split = s.split(":"); String shopName = split[0]; double price = Double.parseDouble(split[1]); Discount.Code discountCode = Discount.Code.valueOf(split[2]); return new Quote(shopName, price, discountCode); } public String getShopName() { return shopName; } public double getPrice() { return price; } public Discount.Code getDiscountCode() { return discountCode; } }
parse方法 通過getPrice的方法 返回的字符串 會返回Quote對象,此外 Discount服務還提供了一個applyDiscount方法,它接收一個Quote對象,返回一個字符串,表示該Quote的shop中的折扣價格:
public class Discount { public enum Code{.. } public static String applyDiscount(Quote quote){ return quote.getShopName() + "price :" + Discount.apply(quote.getPrice() ,quote.getDiscountCode()); } public static double apply(double price,Code code){ try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } return price * (100 - code.value) / 100; } }
Discount中 也模擬了遠程操作 睡了1秒鍾,首先我們嘗試最直接的方式:
List<String> str = shops.stream() .map(shop->shop.getPrice("hhhhh")) //獲取 one:120.10:GOLDD 格式字符串 .map(Quote::parse) //轉換為 Quote 對象 .map(Discount::applyDiscount) //返回 Quote的shop中的折扣價格 .collect(toList()); System.out.print(str);
8146
首先,我們調用getPrice遠程方法將shop對象轉換成了一個字符串。每個1秒
然后,我們將字符串轉換為Quote對象。
最后,我們將Quote對象 調用 遠程 Discount服務獲取折扣,返回折扣價格。每個1秒
順序執行4個商品是4秒,然后又調用了Discount服務又4秒 所以是8秒。 雖然我們現在把流轉換為並行流 性能會很好 但是數量大於8時也很慢。相反,使用自定義CompletableFuture執行器能夠更充分的利用CPU資源。
List<CompletableFuture<String>> priceFutures = shops.stream() //異步獲取每個shop中的價格 .map(shop -> CompletableFuture.supplyAsync( () -> shop.getPrice("hhhhh", executor) )) //Quote對象存在時,對其返回值進行轉換 .map(future -> future.thenApply(Quote::parse)) //使用另一個異步任務構造期望的future,申請折扣 .map(future -> future.thenCompose(quote -> CompletableFuture.supplyAsync( () -> Discount.applyDiscount(quote), executor) )) .collect(toList()); //等待流中的所有Future執行完畢,提取各自的返回值 List<String> str = priceFutures.stream().map(CompletableFuture::join).collect(toList()); System.out.print(str);
2126
使用的這三個map跟同步沒有太大的區別,但是使用了CompletableFuture類提供的特性,在需要的地方把他們變成了異步操作。
thenApply方法:當第一個Future運行結束,返回CompletableFuture<String>對象轉換為CompleTableFuture<Quote>對象。
thenCompose方法:將兩個異步操作進行流水線,當第一個操作完成時,將其結果作為參數傳遞給第二個操作。換句話說,你可以創建兩個CompletableFuture對象,對第一個對象調用thenCompose,並向其傳遞一個函數。
這個方法也有Async版本:thenComposeAsync,通常帶后綴的版本是講任務移交到一個新線程,不帶后綴的在當前線程執行。對於這個例子我們沒有加上后綴,因為對於最終結果,或者大致的時間而言都沒有多少差別,少了很多線程切換的開銷。
合並兩個CompletableFuture,無論是否依賴
與上面不同,第二個CompletableFuture無需等待第一個CompletableFuture運行結束。而是,將兩個完全不相干的CompletableFuture對象整合起來,不希望等到第一個任務完全結束才開始第二個任務。
這種情況應該使用thenCombine方法,它接受名為BiFunction的第二個參數,這個參數定義了當兩個CompletableFuture對象完成計算后,結果如何合並。同thenCompose方法一樣,thenCombine方法也提供了一個Async的版本。使用thenCombineAsync會導致BiFunction中定義的合並操作被提交到線程池中,由另一個任務以異步的方式執行。
回到這個例子,比如說我們現在需要第三個CompletableFuture來獲取匯率,展示美元。當前兩個CompletableFuture計算出結果,並由BiFunction方法完全合並后,由它來最終將誒書這一任務:
Future<Double> futurePriceUSD = CompletableFuture.supplyAsync(()->shops.get(0).getPrice("gg")) .thenCombine( CompletableFuture.supplyAsync( ()-> 0.66 //遠程服務獲取 匯率 ),(price,rate) -> price * rate );
這里 第一個參數price 是 getPrice的返回值 double , 第二個參數 rate 是第二個工廠方法返回的0.66 偷了個懶, 最后是他們的結果進行乘法操作 返回最終結果。
響應CompletableFuture的completion事件
在本章中,所有的延遲例子都是延遲1秒鍾,但是在現實世界中,有時可能更糟。到目前為止,你所實現的方法必須等待所有的商品返回時才能現實商品的價格。而你希望的效果是,只要有商品返回商品價格就在第一時間顯示出來,不用等待那些還沒有返回的商品。
CompletableFuture[] futures = shops.stream() .map(shop -> CompletableFuture.supplyAsync( () -> shop.getPrice("hhhhh", executor) )) .map(future -> future.thenApply(Quote::parse)) .map(future -> future.thenCompose(quote -> CompletableFuture.supplyAsync( () -> Discount.applyDiscount(quote), executor) )) //在每個CompletableFuture上注冊一個操作,該操作會在CompletableFuture完成后使用它的返回值。 //使用thenAccept將結果輸出,它的參數就是 CompletableFuture的返回值。 .map(f -> f.thenAccept(System.out::println)) //你可以把構成的Stream的所有CompletableFuture<void>對象放到一個數組中,等待所有的任務執行完成 .toArray(size -> new CompletableFuture[size]); //allOf方法接受一個CompletableFuture構成的數組,數組中所有的COmpletableFuture對象執行完成后, //它返回一個COmpletableFuture<Void>對象。所以你需要哦等待最初Stream中的所有CompletableFuture對象執行完畢, //對allOf方法返回的CompletableFuture執行join操作 CompletableFuture.allOf(futures).join();
Connected to the target VM, address: '127.0.0.1:62278', transport: 'socket'
8twoprice :113.31
threeprice :108.15
oneprice :137.844
Disconnected from the target VM, address: '127.0.0.1:62278', transport: 'socket'
fourprice :119.2725
3768
還有一個方法anyOf,對於CompletableFuture對象數組中有任何一個執行完畢就不在等待時使用。
小結:
1.執行比較耗時的操作時,尤其是那些依賴一個或多個遠程服務的操作,使用異步任務可以改善程序的性能,加快程序的響應速度。
2.你應該盡可能的為客戶提供異步API。使用CompletableFuture類提供的特性,能夠輕松的實現這一目標。
3.CompletableFuture類還提供了異常管理的機制,然給你有機會拋出/管理異步任務執行中發生的異常。
4.將同步API的調用封裝到一個CompletableFuture中,你能夠以異步的方式使用其結果。
5.如果異步任務之間互相獨立,或者他們之間某一些的結果是另一些的輸入,你可以講這些異步任務合並成一個。
6.你可以為CompletableFuture注冊一個回調函數,在Future執行完畢或者他們計算的結果可用時,針對性的執行一些程序。
7.你可以決定在什么時候將誒書程序的運行,是等待由CompletableFuture對象構成的列表中所有的對象都執行完畢,還是只要其中任何一個首先完成就終止程序的運行。