大約各位看官君多少也聽說了Storm/Spark/Flink,這些都是大數據流式處理框架。如果一條手機組裝流水線上不同的人做不同的事,有的裝電池,有的裝屏幕,直到最后完成,這就是典型的流式處理。如果手機組裝是先全部裝完電池,再交給裝屏幕的組,直到完成,這就是舊式的集合式處理。今天,就來先說說JDK8中的流,雖然不是很個特新鮮的話題,但是一個很好的開始,因為——思想往往比細節重要!
作者原創文章,謝絕一切轉載!
本文只發表在"公眾號"和"博客園",其他均屬復制粘貼!如果覺得排版不清晰,請查看公眾號文章。
准備:
Idea2019.03/Gradle5.6.2/JDK11.0.4/Lambda
難度:新手--戰士--老兵--大師
目標:
1.Lambda表達式使用
2.流的篩選/切片/查找/匹配/映射/歸約操作
步驟:
為了遇見各種問題,同時保持時效性,我盡量使用最新的軟件版本。代碼地址:其中的day22,https://github.com/xiexiaobiao/dubbo-project.git
1.先來兩個概念:
- 流(Stream):一個元素序列。位於包java.util.stream.Stream,注意這個序列是可以串行或並行處理的。有多種方式建立流,最常見的是從集合(Collection)對象獲取,有序集合如List的流有序,Set的流則無序。
- Lambda表達式:流式處理的絕佳搭檔!什么是Lambda表達式?略。哪里可以用Lambda表達式?需要實現一個函數式接口(只定義了一個抽象函數的接口)的地方就可以使用Lambda表達式,代替匿名類方式。源代碼中com.biao.lambda包里,我寫了一個簡單的Lambda實例,供參考。
2.流式處理特點:
- 流水線:流操作可返回一個流,多個操作從而可形成一個鏈,
- 內部迭代:使用Iterator/forEach顯式迭代器為外部迭代,流的迭代是流內部完成的,只需聲明,是內部迭代,
- 一次使用:每個流只能消費一次,不能結束后又從頭開始!
3.流的一般使用:
-
建立流:創建一個Stream對象,如從一個數據源來執行一個查詢;
-
操作流:一個包含了各種操作的操作鏈;
-
結束流:一個終端操作,形成一個結果集或值
4.來個例子,假設這里有個com.biao.Fruit類:
@Data public class Fruit { private String name; private String origin; private Integer price; }
我們要從一堆水果里,找前4種產自中國的名稱是字母A開頭的水果。這還不是小菜?幾次使用Iterator或者forEach循環就實現了!
如果使用流處理,大致處理流程圖示則如下,代碼后面再聊:
5.流的主要操作:篩選/切片/查找/匹配/映射/歸約
下面將一一道來:
篩選/切片:使用filter/skip/limit/distinct方法過濾。filter接收一個Predicate函數表達式,方法簽名是T --> boolean,我們來實現上面的圖示邏輯,至於JDK7的實現,看官君可以想一想,對比一下,com.biao.Application1代碼實現片段:
// 創建流 Stream<Fruit> fruitStream = fruitList.stream(); // 過濾 Stream<Fruit> filteredStream = fruitStream.filter(d -> "China".equals(d.getOrigin())); // 去掉重復元素 Stream<Fruit> distinctStream = filteredStream.distinct(); // 打印流中元素,forEach是終端操作,如果這里使用了,則collect方法無法使用,即一個流只能消費一次 // distinctStream.forEach(System.out::println); // 跳過1個元素, Stream<Fruit> skippedStream = distinctStream.skip(1); // 切片,參數為maxSize Stream<Fruit> limitStream = skippedStream.limit(4); // 結束,collect方法是收集器,如果這里使用了,則forEach無法使用,即一個流只能有一個終端操作 List<Fruit> newList = limitStream.collect(Collectors.toList()); // 打印結果,lambda方式 newList.forEach(System.out::println); // 鏈式操作,和上面效果一樣,一氣呵成,真爽! List<Fruit> newList2 = fruitList.stream() .filter(d -> "China".equals(d.getOrigin())) .distinct() .skip(1) .limit(4) .collect(Collectors.toList()); // 打印結果集 newList2.forEach(System.out::println);
以上代碼核心點:
- 盡量使用鏈式語法配合Lambda,簡潔至極!
- 一個流只能有一個終端操作!即一個流只能被消費一次!
- filter方法的參數表達式可以進行邏輯復合運算,如and/not/or,
映射:對流中的每個元素應用映射函數,變換成新的對象。使用map方法,接受一個Function類型,函數簽名是 T—> R,比如對以上Fruit流提取水果的名稱,並過濾字母A開頭的水果,com.biao.Application2代碼實現片段:
// 創建流 Stream<Fruit> fruitStream = fruitList.stream(); //轉換,變為String流 Stream<String> stringStream = fruitStream.map(Fruit::getName); //過濾,名稱以A開頭 Stream<String> filteredStream = stringStream.filter(str -> "A".equals(String.valueOf(str.charAt(0)))); //終端操作,set自動去重復 Set<String> stringSet = filteredStream.collect(Collectors.toSet()); //打印結果集 stringSet.forEach(System.out::println); //鏈式語法實現,請君想象下JDK7的實現, fruitList.stream() .map(Fruit::getName) .filter(str -> "A".equals(str.substring(0,1))) .collect(Collectors.toSet()) .forEach(System.out::println);
我還寫了個map映射+flatMap扁平化流例子,com.biao.Application3代碼片段:
/**映射示例2:map映射+flatMap扁平化流*/ String[] arraysOfWords = {"Apple","Banana","Nuts","Olive"}; // 使用Arrays的靜態方法創建流 Stream<String> stringStream = Arrays.stream(arraysOfWords); // 對每個word映射為String[] stringStream.map(word -> word.split("")) // flatMap扁平化流,將生成的流組合成一個流 // 如果使用map(Arrays::stream),則生成由流元素組成的流 .flatMap(Arrays::stream) // 去掉重復 .distinct() // 終端操作,collect方法是收集器 .collect(Collectors.toList()) .forEach(System.out::println);
流的扁平化,一言以蔽之,flatmap方法讓你把一個流中的每個值都換成一個的流(即流中的元素也是流),然后把所有的流連接起來成為一個流。
查找/匹配:StreamAPI通過allMatch,anyMatch,noneMatch,findFirst,findAny方法找到符合的元素,com.biao.Application4代碼實現片段:
// 注意這里每個都要重建一個流 // 是否全部價格大於50 boolean almach = fruitList.stream().allMatch(fruit -> fruit.getPrice() > 50); System.out.println(almach); // 是否至少有一種產自America boolean anyMatch = fruitList.stream().anyMatch(fruit -> "America".equals(fruit.getOrigin())); System.out.println(anyMatch); // 找出流中第3個, Optional<Fruit> thirdOne = fruitList.stream().skip(2).findFirst(); // 存在則打印,防止NPE thirdOne.ifPresent(System.out::println); // 找出流中任意一個,, Optional<Fruit> anyOne = fruitList.stream().findAny(); // ifPresent,值存在則執行操作,否則 do nothing! anyOne.ifPresent(System.out::println);
以上代碼核心點:
- 這里每個查找/匹配都要重建一個流,
- 找可能的第3個元素,skip(2).findFirst(),返回Optional T 類,解決返回null的NPE問題,這樣即使不存在第3個元素,返回對象仍然可以繼續做計算,
- findAny時,流水線將在后台進行優化使其只需走一遍,並在利用短路找到結果時立即結束
歸約:使用reduce對流中元素累積計算,最后得到一個值。比如找到上面水果中價格最高的,計算出產自Japan的水果的總價格,com.biao.Application5代碼實現片段:
// 注意這里每個都要重建一個流 int totalPrice = fruitList.stream() .filter(fruit -> "Japan".equals(fruit.getOrigin())) //映射轉換為Integer流 .map(Fruit::getPrice) //reduce歸約計算 // 也可使用reduce(0,(a,b) -> a+b); .reduce(0,Integer::sum); System.out.println(totalPrice); /** reduce無初始值的歸約計算 */ Optional<Integer> totalPrice2 = fruitList.stream() .map(Fruit::getPrice) .reduce((a,b) -> a+b); // ifPresent,值存在則執行操作,否則 do nothing! totalPrice2.ifPresent(System.out::println); /** reduce計算最大*/ Optional<Integer> maxPrice = fruitList.stream() .map(Fruit::getPrice) // 歸約計算最大值: // 這里也可以使用reduce((x,y) -> x>y?x:y) .reduce(Integer::max); // ifPresent,值存在則執行操作,否則 do nothing! maxPrice.ifPresent(System.out::println); /** reduce計算最小值*/ Optional<Integer> minPrice = fruitList.stream() .map(Fruit::getPrice) // 歸約計算最小值:也可以使用reduce((x,y) -> x<y?x:y) .reduce(Integer::min); // ifPresent,值存在則執行操作,否則 do nothing! minPrice.ifPresent(System.out::println);
以上代碼核心點:
- reduce的函數參數有幾種重載,返回的值不一樣,無初始值的返回Optional對象,
- Optional. ifPresent方法,Optional對象值存在則執行操作,否則 do nothing,
- map和reduce的連接通常稱為map-reduce模式,源於google的搜索模式,
6.看完了上面的各種流操作,看官君也許會說,似乎也沒啥大不了的啊,頂多是少寫了幾行代碼,那請用集合式實現下相同邏輯!另請君回憶下,是否有過多層嵌套或者N多分支的if-elseif-else/switch-case場景?那現在就請試試流式寫法!事實上我這里只是舉了幾個常規的應用例子而已,抓住下看官君的興趣,StreamAPI還有其他強大的功能:
- 無限流、范圍流:能直接創建無限流,再局部處理,
- Collect收集器的分區分組:將最終結果集按條件分區分組,類比SQL的groupBy,
- 流的parallel/sequential並行計算和順序計算:聲明式並行計算,無需為鎖煩惱,
- 分支/合並ForkJoin框架的遞歸計算:多線程方式處理,還可自定義線程池參數,
- 同步/異步執行:使用CompletableFuture類實現更高效的異步處理,
總結:
1.流不僅僅是將外部迭代變為內部迭代,更是一種編程思想的轉變,結合函數抽象,行為參數化,將函數作為參數,提升為與值一樣的地位,威力巨大,這就是生產力。
2.流計算能大大簡化編程,使用聲明式語法,配合Lambda,寫起代碼根本停不下來!
3.后期會在看看其他Storm/Spark/Flink流式計算框架,發現點新鮮貨。
全文結束!
推薦閱讀: