[Java 8] (10) 使用Lambda完成函數組合,Map-Reduce以及並行化


好文推薦!!!!!

原文見:http://blog.csdn.net/dm_vincent/article/details/40856569

Java 8中同時存在面向對象編程(OOP)和函數式編程(FP, Functional Programming)這兩種編程范式。實際上,這兩種范式並不矛盾,只是着重點不同。在OOP中,着重於通過豐富的類型系統對需要解決的問題進行建模;而FP中則着重於通過高階函數和Lambda表達式來完成計算。所以我們完全可以將這兩者融合在一起,對問題提出更加優雅的解決方案。

在這篇文章中,會介紹如何通過函數組合(Function Composition)來將若干個函數單元組合成一個Map-Reduce模式的應用。同時,還會介紹如何將整個計算過程並行化。

使用函數組合

在使用函數式編程的時候,函數是組成程序的單元。通過將函數以高階函數的形式組織,可以有效地提高不變性(Immutability),從而減少程序的狀態變化,最終讓並行化更加容易。

下面這張圖反映了,純粹的面向對象設計和混合式設計(面向對象和函數式)的風格。

 

在OOP中,對象的狀態會隨着程序的進行而不斷發生變化,但是對象始終只有一個。 而在FP中,對象每次被一個函數處理之后,都會得到一個新的對象,而原來的對象並不會發生變化。

下面是一個小例子,讓你對這種混合式的編程范式有一個初步的了解。假設我們有一些股票的代碼,需要得到股票價格大於100美元的股票並對它們進行排序:

public class Tickers { public static final List<String> symbols = Arrays.asList( "AMD", "HPQ", "IBM", "TXN", "VMW", "XRX", "AAPL", "ADBE", "AMZN", "CRAY", "CSCO", "DELL", "GOOG", "INTC", "INTU", "MSFT", "ORCL", "TIBX", "VRSN", "YHOO"); }

對於每只股票代碼,可以通過調用下面這段程序借助Yahoo提供的Web Service來得到對應的股價:

public class YahooFinance { public static BigDecimal getPrice(final String ticker) { try { final URL url = new URL("http://ichart.finance.yahoo.com/table.csv?s=" + ticker); final BufferedReader reader = new BufferedReader(new InputStreamReader(url.openStream())); final String data = reader.lines().skip(1).findFirst().get(); final String[] dataItems = data.split(","); return new BigDecimal(dataItems[dataItems.length - 1]); } catch(Exception ex) { throw new RuntimeException(ex); } } }

最后,通過一串操作來得到我們需要的答案:

final BigDecimal HUNDRED = new BigDecimal("100"); System.out.println("Stocks priced over $100 are " + Tickers.symbols .stream() .filter(symbol -> YahooFinance.getPrice(symbol).compareTo(HUNDRED) > 0) .sorted() .collect(joining(", ")));

這就是一個混合范式的應用,將主要的計算邏輯通過方法進行封裝,然后將這些函數根據其所屬的類型進行面向對象建模,比如getPrice方法屬於類型YahooFinance。最后使用Stream類型和Lambda表達式完成需要執行的計算邏輯,得到最終結果。

將計算邏輯封裝成一個函數調用鏈的好處在於:

  • 更簡潔,代碼量會少很多,從而代碼也更容易被理解
  • 提高了對象的不變性(Immutability),從而更加容易並行化
  • 調用鏈中的每一環都很容易被復用,如filter,sorted等

使用Map-Reduce

顧名思義,Map-Reduce實際上分為了兩個步驟:

  1. Map階段:對集合中的元素進行操作
  2. Reduce階段:將上一步得到的結果進行合並得到最終的結果

正是因為這個模式十分簡單,同時它也能夠最大限度的利用多核處理器的能力,所以它得到了廣泛關注。

比如,當我們需要得到股票價格小於500美元的最高價格的股票時,應該如何做呢? 首先我們還是從最熟悉的命令式代碼開始。

准備工作

首先,我們需要對這個問題進行一個基礎的建模,這個步驟就是面向對象設計的過程。很容易地,可以得到下面的實體類型:

public class StockInfo { public final String ticker; public final BigDecimal price; public StockInfo(final String symbol, final BigDecimal thePrice) { ticker = symbol; price = thePrice; } public String toString() { return String.format("ticker: %s price: %g", ticker, price); } }

同時,也需要一些工具方法來幫助我們解決這個問題:

  1. 通過股票代碼得到對應的實體信息。我們可以使用前面介紹的YahooFinance中定義的getPrice方法來完成這一任務。
  2. 判斷股票的價格是否小於某個值,可以通過Predicate函數接口實現,它是一個高階函數,會將傳入的price信息作為閾值來生成一個Lambda表達式並返回。
  3. 用來比較取得兩個股價實體對象中股價較高的對象的方法。

分別實現如下:

public class StockUtil { public static StockInfo getPrice(final String ticker) { return new StockInfo(ticker, YahooFinance.getPrice(ticker)); } public static Predicate<StockInfo> isPriceLessThan(final int price) { return stockInfo -> stockInfo.price.compareTo(BigDecimal.valueOf(price)) < 0; } public static StockInfo pickHigh( final StockInfo stockInfo1, final StockInfo stockInfo2) { return stockInfo1.price.compareTo(stockInfo2.price) > 0 ? stockInfo1 : stockInfo2; } }

命令式風格

有了以上的准備工作,我們就可以着手實現了。首先是命令式風格的代碼,這也是最熟悉的方式:

final List<StockInfo> stocks = new ArrayList<>(); for(String symbol : Tickers.symbols) { stocks.add(StockUtil.getPrice(symbol)); } final List<StockInfo> stocksPricedUnder500 = new ArrayList<>(); final Predicate<StockInfo> isPriceLessThan500 = StockUtil.isPriceLessThan(500); for(StockInfo stock : stocks) { if(isPriceLessThan500.test(stock)) stocksPricedUnder500.add(stock); } StockInfo highPriced = new StockInfo("", BigDecimal.ZERO); for(StockInfo stock : stocksPricedUnder500) { highPriced = StockUtil.pickHigh(highPriced, stock); } System.out.println("High priced under $500 is " + highPriced);

上述代碼完成了以下幾個工作:

  1. 首先是根據股票代碼得到股價信息,然后將股價實體放到一個列表對象中。
  2. 然后對集合進行一次遍歷,得到所有價格低於500美元的股價實體。
  3. 對步驟2中的結果進行遍歷,得到其中擁有最高股價的實體。

當然,如果覺得循環的次數太多了,我們也可以將它們合並到一個循環中:

StockInfo highPriced = new StockInfo("", BigDecimal.ZERO); final Predicate<StockInfo> isPriceLessThan500 = StockUtil.isPriceLessThan(500); for(String symbol : Tickers.symbols) { StockInfo stockInfo = StockUtil.getPrice(symbol); if(isPriceLessThan500.test(stockInfo)) highPriced = StockUtil.pickHigh(highPriced, stockInfo); } System.out.println("High priced under $500 is " + highPriced);

可以發現,只是使用了一個Predicate類型的Lambda表達式就可以將代碼的篇幅大大的較少。 只不過,以上的代碼仍然是命令式風格,仍然會通過對變量進行修改來實現計算邏輯。更重要的是,以上的代碼復用性比較差,當我們需要更改過濾條件的時候,就需要對它進行修改。

更好的辦法是將所有會發生變化的代碼封裝成一個個單獨的小模塊,然后使用函數式風格的代碼將它們聯系起來。

函數式風格

使用函數式風格后,代碼中看不到for循環的蹤影了:

public static void findHighPriced(final Stream<String> symbols) { final StockInfo highPriced = symbols .map(StockUtil::getPrice) .filter(StockUtil.isPriceLessThan(500)) .reduce(StockUtil::pickHigh) .get(); System.out.println("High priced under $500 is " + highPriced); }

map,filter和reduce方法分別替代了三個for循環,而且代碼也變的異常簡潔。除了簡潔之外,更重要的是這段代碼隨時可以被並行化。

以上的計算邏輯可以使用下圖進行表達:

 

並行化

 

在實施並行化之前,讓我們看看上面的幾個操作:map,filter和reduce。

顯然,map方法的速度是最慢的,因為它依賴於外部的Web Service。但是同時也可以注意到,對於每個股票代碼,獲取它們對應的股價信息是完全獨立的,故而可以考慮將這部分並行化。

當需要讓一段代碼以並行的方式運行時,需要考慮兩個方面:

  • 如何完成?
  • 如何以合適的方式完成?

對於第一個方面,我們可以使用JDK中提供的各種並發相關的庫來完成。 對於第二個方面,就需要我們根據這段代碼的特點進行考慮了。對於並發程序,首先需要避免的是競態條件(Race Condition),當多個線程試圖去更新一個對象或者一個變量時,就有可能發生。所以對於這類更新,我們需要小心翼翼地維護其線程安全性。反過來,如果對象的狀態是不可變的(狀態變量被修飾為final),那么滋生競態條件的土壤也就不復存在了,而這一點正是函數式編程所一再強調和標榜的。

因此,在嚴格遵守函數式編程的最佳實踐后,並行化只不過是臨門一腳的功夫而已:

// 串行執行的調用方式
findHighPriced(Tickers.symbols.stream()); // 並行執行的調用方式 findHighPriced(Tickers.symbols.parallelStream());

只不過是把stream方法替換成了parallelStream方法,就給代碼插上了並行的翅膀。不需要考慮如何完成,也不需要考慮如競態條件那樣的各種風險。

關於這兩個方法的定義,可以在Collection接口中找到,這也意味着不僅僅對於List類型可以很方便的實現並行,對其它實現了Collection接口的類型也非常方便:

default Stream<E> stream() { return StreamSupport.stream(spliterator(), false); } default Stream<E> parallelStream() { return StreamSupport.stream(spliterator(), true); }

這里不打算對深層的實現原理進行剖析,但是當使用parallelStream時,意味着像map,filter這樣的方法都會以並行的方式被運行,而工作線程則是來自於底層的一個線程池。這些細節都已經被封裝的相當好,作為開發人員只需要保證你的代碼確實遵守了游戲規則。

串行方式和並行方式的性能比較如下:

串行 並行
24.325s 5.621s

通過簡單地改變一個方法,就將性能提高了接近5倍!這也許就是函數式編程的魅力之一吧。

那么,在從stream和parallelStream方法中進行選擇時,需要考慮以下幾個問題:

  1. 是否需要並行?
  2. 任務之間是否是獨立的?是否會引起任何競態條件?
  3. 結果是否取決於任務的調用順序?

對於問題2,如果任務之間是獨立的,並且代碼中不涉及到對同一個對象的某個狀態或者某個變量的更新操作,那么就表明代碼是可以被並行化的。

對於問題3,由於在並行環境中任務的執行順序是不確定的,因此對於依賴於順序的任務而言,並行化也許不能給出正確的結果。

對於問題1,在回答這個問題之前,你需要弄清楚你要解決的問題是什么,數據量有多大,計算的特點是什么?並不是所有的問題都適合使用並發程序來求解,比如當數據量不大時,順序執行往往比並行執行更快。畢竟,准備線程池和其它相關資源也是需要時間的。

但是,當任務涉及到I/O操作並且任務之間不互相依賴時,那么並行化就是一個不錯的選擇。通常而言,將這類程序並行化之后,執行速度會提升好幾個等級,正如上面的例子那樣。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM