流式計算(一)-Java8Stream


大約各位看官君多少也聽說了Storm/Spark/Flink,這些都是大數據流式處理框架。如果一條手機組裝流水線上不同的人做不同的事,有的裝電池,有的裝屏幕,直到最后完成,這就是典型的流式處理。如果手機組裝是先全部裝完電池,再交給裝屏幕的組,直到完成,這就是舊式的集合式處理。今天,就來先說說JDK8中的流,雖然不是很個特新鮮的話題,但是一個很好的開始,因為——思想往往比細節重要!

 

作者原創文章,謝絕一切轉載!

 本文只發表在"公眾號"和"博客園",其他均屬復制粘貼!如果覺得排版不清晰,請查看公眾號文章。 

 

准備

Idea2019.03/Gradle5.6.2/JDK11.0.4/Lambda

難度:新手--戰士--老兵--大師

目標

1.Lambda表達式使用

2.流的篩選/切片/查找/匹配/映射/歸約操作

步驟

為了遇見各種問題,同時保持時效性,我盡量使用最新的軟件版本。代碼地址:其中的day22,https://github.com/xiexiaobiao/dubbo-project.git

1.先來兩個概念:

  • 流(Stream):一個元素序列。位於包java.util.stream.Stream,注意這個序列是可以串行或並行處理的。有多種方式建立流,最常見的是從集合(Collection)對象獲取,有序集合如List的流有序,Set的流則無序。
  • Lambda表達式:流式處理的絕佳搭檔!什么是Lambda表達式?略。哪里可以用Lambda表達式?需要實現一個函數式接口(只定義了一個抽象函數的接口)的地方就可以使用Lambda表達式,代替匿名類方式。源代碼中com.biao.lambda包里,我寫了一個簡單的Lambda實例,供參考。

2.流式處理特點:

  • 流水線:流操作可返回一個流,多個操作從而可形成一個鏈,
  • 內部迭代:使用Iterator/forEach顯式迭代器為外部迭代,流的迭代是流內部完成的,只需聲明,是內部迭代,
  • 一次使用:每個流只能消費一次,不能結束后又從頭開始!

3.流的一般使用:

  • 建立流:創建一個Stream對象,如從一個數據源來執行一個查詢;

  • 操作流:一個包含了各種操作的操作鏈;

  • 結束流:一個終端操作,形成一個結果集或值

 

4.來個例子,假設這里有個com.biao.Fruit類:

@Data
public class Fruit {
    private String name;
    private String origin;
    private Integer price;
}

我們要從一堆水果里,找前4種產自中國的名稱是字母A開頭的水果。這還不是小菜?幾次使用Iterator或者forEach循環就實現了!

如果使用流處理,大致處理流程圖示則如下,代碼后面再聊:

 

5.流的主要操作:篩選/切片/查找/匹配/映射/歸約

下面將一一道來:

篩選/切片:使用filter/skip/limit/distinct方法過濾。filter接收一個Predicate函數表達式,方法簽名是T --> boolean,我們來實現上面的圖示邏輯,至於JDK7的實現,看官君可以想一想,對比一下,com.biao.Application1代碼實現片段:

// 創建流
        Stream<Fruit> fruitStream = fruitList.stream();
        // 過濾
        Stream<Fruit> filteredStream = fruitStream.filter(d -> "China".equals(d.getOrigin()));
        // 去掉重復元素
        Stream<Fruit> distinctStream = filteredStream.distinct();
        // 打印流中元素,forEach是終端操作,如果這里使用了,則collect方法無法使用,即一個流只能消費一次
        // distinctStream.forEach(System.out::println);
        // 跳過1個元素,
        Stream<Fruit> skippedStream = distinctStream.skip(1);
        // 切片,參數為maxSize
        Stream<Fruit> limitStream = skippedStream.limit(4);
        // 結束,collect方法是收集器,如果這里使用了,則forEach無法使用,即一個流只能有一個終端操作
        List<Fruit> newList = limitStream.collect(Collectors.toList());
        // 打印結果,lambda方式
        newList.forEach(System.out::println);

        // 鏈式操作,和上面效果一樣,一氣呵成,真爽!
        List<Fruit> newList2 = fruitList.stream()
                .filter(d -> "China".equals(d.getOrigin()))
                .distinct()
                .skip(1)
                .limit(4)
                .collect(Collectors.toList());
        // 打印結果集
        newList2.forEach(System.out::println);

以上代碼核心點:

  • 盡量使用鏈式語法配合Lambda,簡潔至極!
  • 一個流只能有一個終端操作!即一個流只能被消費一次!
  • filter方法的參數表達式可以進行邏輯復合運算,如and/not/or,

映射:對流中的每個元素應用映射函數,變換成新的對象。使用map方法,接受一個Function類型,函數簽名是 T—> R,比如對以上Fruit流提取水果的名稱,並過濾字母A開頭的水果,com.biao.Application2代碼實現片段:

// 創建流
Stream<Fruit> fruitStream = fruitList.stream();
//轉換,變為String流
Stream<String> stringStream = fruitStream.map(Fruit::getName);
//過濾,名稱以A開頭
Stream<String> filteredStream = stringStream.filter(str -> "A".equals(String.valueOf(str.charAt(0))));
//終端操作,set自動去重復
Set<String> stringSet = filteredStream.collect(Collectors.toSet());
//打印結果集
stringSet.forEach(System.out::println);

//鏈式語法實現,請君想象下JDK7的實現,
fruitList.stream()
      .map(Fruit::getName)
      .filter(str -> "A".equals(str.substring(0,1)))
      .collect(Collectors.toSet())
      .forEach(System.out::println);

 

我還寫了個map映射+flatMap扁平化流例子,com.biao.Application3代碼片段:

 /**映射示例2:map映射+flatMap扁平化流*/
String[] arraysOfWords = {"Apple","Banana","Nuts","Olive"};
// 使用Arrays的靜態方法創建流
Stream<String> stringStream = Arrays.stream(arraysOfWords);
// 對每個word映射為String[]
stringStream.map(word -> word.split(""))
      // flatMap扁平化流,將生成的流組合成一個流
      // 如果使用map(Arrays::stream),則生成由流元素組成的流
      .flatMap(Arrays::stream)
      // 去掉重復
      .distinct()
      // 終端操作,collect方法是收集器
      .collect(Collectors.toList())
      .forEach(System.out::println);

 

流的扁平化,一言以蔽之,flatmap方法讓你把一個流中的每個值都換成一個的流(即流中的元素也是流),然后把所有的流連接起來成為一個流。

查找/匹配:StreamAPI通過allMatch,anyMatch,noneMatch,findFirst,findAny方法找到符合的元素,com.biao.Application4代碼實現片段:

   // 注意這里每個都要重建一個流
        // 是否全部價格大於50
        boolean almach = fruitList.stream().allMatch(fruit -> fruit.getPrice() > 50);
        System.out.println(almach);
        // 是否至少有一種產自America
        boolean anyMatch = fruitList.stream().anyMatch(fruit -> "America".equals(fruit.getOrigin()));
        System.out.println(anyMatch);
        // 找出流中第3個,
        Optional<Fruit> thirdOne = fruitList.stream().skip(2).findFirst();
        // 存在則打印,防止NPE
        thirdOne.ifPresent(System.out::println);
        // 找出流中任意一個,,
        Optional<Fruit> anyOne = fruitList.stream().findAny();
        // ifPresent,值存在則執行操作,否則 do nothing!
        anyOne.ifPresent(System.out::println);

 

以上代碼核心點:

  • 這里每個查找/匹配都要重建一個流,
  • 找可能的第3個元素,skip(2).findFirst(),返回Optional T 類,解決返回null的NPE問題,這樣即使不存在第3個元素,返回對象仍然可以繼續做計算,
  • findAny時,流水線將在后台進行優化使其只需走一遍,並在利用短路找到結果時立即結束

歸約:使用reduce對流中元素累積計算,最后得到一個值。比如找到上面水果中價格最高的,計算出產自Japan的水果的總價格,com.biao.Application5代碼實現片段:

// 注意這里每個都要重建一個流
        int totalPrice = fruitList.stream()
                .filter(fruit -> "Japan".equals(fruit.getOrigin()))
                //映射轉換為Integer流
                .map(Fruit::getPrice)
                //reduce歸約計算
                // 也可使用reduce(0,(a,b) -> a+b);
                .reduce(0,Integer::sum);
        System.out.println(totalPrice);

        /** reduce無初始值的歸約計算 */
        Optional<Integer> totalPrice2 = fruitList.stream()
                .map(Fruit::getPrice)
                .reduce((a,b) -> a+b);
        // ifPresent,值存在則執行操作,否則 do nothing!
        totalPrice2.ifPresent(System.out::println);

        /** reduce計算最大*/
        Optional<Integer> maxPrice = fruitList.stream()
                .map(Fruit::getPrice)
                // 歸約計算最大值:
                // 這里也可以使用reduce((x,y) -> x>y?x:y)
                .reduce(Integer::max);
        // ifPresent,值存在則執行操作,否則 do nothing!
        maxPrice.ifPresent(System.out::println);

        /** reduce計算最小值*/
        Optional<Integer> minPrice = fruitList.stream()
                .map(Fruit::getPrice)
                // 歸約計算最小值:也可以使用reduce((x,y) -> x<y?x:y)
                .reduce(Integer::min);
        // ifPresent,值存在則執行操作,否則 do nothing!
        minPrice.ifPresent(System.out::println);

 

以上代碼核心點:

  • reduce的函數參數有幾種重載,返回的值不一樣,無初始值的返回Optional對象,
  • Optional. ifPresent方法,Optional對象值存在則執行操作,否則 do nothing,
  • map和reduce的連接通常稱為map-reduce模式,源於google的搜索模式,

6.看完了上面的各種流操作,看官君也許會說,似乎也沒啥大不了的啊,頂多是少寫了幾行代碼,那請用集合式實現下相同邏輯!另請君回憶下,是否有過多層嵌套或者N多分支的if-elseif-else/switch-case場景?那現在就請試試流式寫法!事實上我這里只是舉了幾個常規的應用例子而已,抓住下看官君的興趣,StreamAPI還有其他強大的功能:

  • 無限流、范圍流:能直接創建無限流,再局部處理,
  • Collect收集器的分區分組:將最終結果集按條件分區分組,類比SQL的groupBy,
  • 流的parallel/sequential並行計算和順序計算:聲明式並行計算,無需為鎖煩惱,
  • 分支/合並ForkJoin框架的遞歸計算:多線程方式處理,還可自定義線程池參數,
  • 同步/異步執行:使用CompletableFuture類實現更高效的異步處理,

總結:

1.流不僅僅是將外部迭代變為內部迭代,更是一種編程思想的轉變,結合函數抽象,行為參數化,將函數作為參數,提升為與值一樣的地位,威力巨大,這就是生產力。

2.流計算能大大簡化編程,使用聲明式語法,配合Lambda,寫起代碼根本停不下來!

3.后期會在看看其他Storm/Spark/Flink流式計算框架,發現點新鮮貨。

全文結束!


推薦閱讀:


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM