使用Java 8中的Stream


Stream是Java 8 提供的高效操作集合類(Collection)數據的API。

1. 從Iterator到Stream

有一個字符串的list,要統計其中長度大於7的字符串的數量,用迭代來實現:

List<String> wordList = Arrays.asList("regular", "expression", "specified", "as", "a", "string", "must"); int countByIterator = 0; for (String word: wordList) { if (word.length() > 7) { countByIterator++; } } 

用Stream實現:

long countByStream= wordList.stream().filter(w -> w.length() > 7).count(); 

顯然,用stream實現更簡潔,不僅如此,stream很容易實現並發操作,比如:

long countByParallelStream = wordList.parallelStream().filter(w -> w.length() > 7).count(); 

stream遵循的原則是:告訴我做什么,不用管我怎么做。比如上例:告訴stream通過多線程統計字符串長度,至於以什么順序、在哪個線程中執行,由stream來負責;而在迭代實現中,由於計算的方式已確定,很難優化了。

Stream和Collection的區別主要有:

  • stream本身並不存儲數據,數據是存儲在對應的collection里,或者在需要的時候才生成的;
  • stream不會修改數據源,總是返回新的stream;
  • stream的操作是懶執行(lazy)的:僅當最終的結果需要的時候才會執行,比如上面的例子中,結果僅需要前3個長度大於7的字符串,那么在找到前3個長度符合要求的字符串后, filter()將停止執行;

使用stream的步驟如下:

  1. 創建stream;
  2. 通過一個或多個中間操作(intermediate operations)將初始stream轉換為另一個stream;
  3. 通過中止操作(terminal operation)獲取結果;該操作觸發之前的懶操作的執行,中止操作后,該stream關閉,不能再使用了;

在上面的例子中, wordList.stream() 和 wordList.parallelStream() 是創建stream, filter() 是中間操作,過濾后生成一個新的stream, count() 是中止操作,獲取結果。

2. 創建Stream的方式

  1. 從array或list創建stream:
Stream<Integer> integerStream = Stream.of(10, 20, 30, 40); String[] cityArr = {"Beijing", "Shanghai", "Chengdu"}; Stream<String> cityStream = Stream.of(cityArr); Stream<String> nameStream = Arrays.asList("Daniel", "Peter", "Kevin").stream(); Stream<String> cityStream2 = Arrays.stream(cityArr, 0, 1); Stream<String> emptyStream = Stream.empty(); 
  1. 通過 generate 和 iterate 創建無窮stream:
Stream<String> echos = Stream.generate(() -> "echo"); Stream<Integer> integers = Stream.iterate(0, num -> num + 1); 
  1. 通過其它API創建stream:
Stream<String> lines = Files.lines(Paths.get("test.txt")) String content = "AXDBDGXC"; Stream<String> contentStream = Pattern.compile("[ABC]{1,3}").splitAsStream(content); 

3. Stream轉換

  1. filter() 用於過濾,即使原stream中滿足條件的元素構成新的stream:
List<String> langList = Arrays.asList("Java", "Python", "Swift", "HTML"); Stream<String> filterStream = langList.stream().filter(lang -> lang.equalsIgnoreCase("java")); 
  1. map() 用於映射,遍歷原stream中的元素,轉換后構成新的stream:
List<String> langList = Arrays.asList("Java", "Python", "Swift", "HTML"); Stream<String> mapStream = langList.stream().map(String::toUpperCase); 
  1. flatMap() 用於將 [["ABC", "DEF"], ["FGH", "IJK"]] 的形式轉換為 ["ABC", "DEF", "FGH", "IJK"] :
Stream<String> cityStream = Stream.of("Beijing", "Shanghai", "Shenzhen"); // [['B', 'e', 'i', 'j', 'i', 'n', 'g'], ['S', 'h', 'a', 'n', 'g', 'h', 'a', 'i'], ...] Stream<Stream<Character>> characterStream1 = cityStream.map(city -> characterStream(city)); Stream<String> cityStreamCopy = Stream.of("Beijing", "Shanghai", "Shenzhen"); // ['B', 'e', 'i', 'j', 'i', 'n', 'g', 'S', 'h', 'a', 'n', 'g', 'h', 'a', 'i', ...] Stream<Character> characterStreamCopy = cityStreamCopy.flatMap(city -> characterStream(city)); 

其中, characterStream() 返回有參數字符串的字符構成的Stream ;

  1. limit() 表示限制stream中元素的數量, skip() 表示跳過stream中前幾個元素, concat 表示將多個stream連接起來, peek() 主要用於debug時查看stream中元素的值:
Stream<Integer> limitStream = Stream.of(18, 20, 12, 35, 89).sorted().limit(3); Stream<Integer> skipStream = Stream.of(18, 20, 12, 35, 89).sorted(Comparator.reverseOrder()).skip(1); Stream<Integer> concatStream = Stream.concat(Stream.of(1, 2, 3), Stream.of(4, 5, 6)); concatStream.peek(i -> System.out.println(i)).count(); 

peek() 是 intermediate operation ,所以后面需要一個 terminal operation ,如 count() 才能在輸出中看到結果;

  1. 有狀態的(stateful)轉換,即元素之間有依賴關系,如 distinct() 返回由唯一元素構成的stream, sorted() 返回排序后的stream:
Stream<String> distinctStream = Stream.of("Beijing", "Tianjin", "Beijing").distinct(); Stream<String> sortedStream = Stream.of("Beijing", "Shanghai", "Chengdu").sorted(Comparator.comparing (String::length).reversed()); 

4. Stream reduction

reduction 就是從stream中取出結果,是 terminal operation ,因此經過 reduction 后的stream不能再使用了。

4.1 Optional

Optional 表示或者有一個T類型的對象,或者沒有值;

  1. 創建Optional對象

直接通過Optional的類方法: of() / empty() / ofNullable() :

Optional<Integer> intOpt = Optional.of(10); Optional<String> emptyOpt = Optional.empty(); Optional<Double> doubleOpt = Optional.ofNullable(5.5); 
  1. 使用Optional對象

你當然可以這么使用:

if (intOpt.isPresent()) { intOpt.get(); } 

但是,最好這么使用:

doubleOpt.orElse(0.0); doubleOpt.orElseGet(() -> 1.0); doubleOpt.orElseThrow(RuntimeException::new); List<Double> doubleList = new ArrayList<>(); doubleOpt.ifPresent(doubleList::add);

map() 方法與 ifPresent() 用法相同,就是多個返回值, flatMap() 用於Optional的鏈式表達:

Optional<Boolean> addOk = doubleOpt.map(doubleList::add); Optional.of(4.0).flatMap(num -> Optional.ofNullable(num * 100)).flatMap(num -> Optional.ofNullable(Math.sqrt (num))); 

4.2 簡單的reduction

主要包含以下操作: findFirst() / findAny() / allMatch / anyMatch() / noneMatch ,比如:

Optional<String> firstWord = wordStream.filter(s -> s.startsWith("Y")).findFirst(); Optional<String> anyWord = wordStream.filter(s -> s.length() > 3).findAny(); wordStream.allMatch(s -> s.length() > 3); wordStream.anyMatch(s -> s.length() > 3); wordStream.noneMatch(s -> s.length() > 3); 

4.3 reduce方法

  1. reduce(accumulator) :參數是一個執行雙目運算的 Functional Interface ,假如這個參數表示的操作為op,stream中的元素為x, y, z, …,則 reduce() 執行的就是 x op y op z ...,所以要求op這個操作具有結合性(associative),即滿足: (x op y) op z = x op (y op z),滿足這個要求的操作主要有:求和、求積、求最大值、求最小值、字符串連接、集合並集和交集等。另外,該函數的返回值是Optional的:
Optional<Integer> sum1 = numStream.reduce((x, y) -> x + y); 
  1. reduce(identity, accumulator) :可以認為第一個參數為默認值,但需要滿足 identity op x = x ,所以對於求和操作, identity 的值為0,對於求積操作, identity 的值為1。返回值類型是stream元素的類型:
Integer sum2 = numStream.reduce(0, Integer::sum); 

5. collect結果

  1. collect() 方法

reduce() 和 collect() 的區別是:

  • reduce() 的結果是一個值;
  • collect() 可以對stream中的元素進行各種處理后,得到stream中元素的值;

Collectors 接口提供了很方便的創建 Collector 對象的工廠方法:

// collect to Collection Stream.of("You", "may", "assume").collect(Collectors.toList()); Stream.of("You", "may", "assume").collect(Collectors.toSet()); Stream.of("You", "may", "assume").collect(Collectors.toCollection(TreeSet::new)); // join element Stream.of("You", "may", "assume").collect(Collectors.joining()); Stream.of("You", "may", "assume").collect(Collectors.joining(", ")); // summarize element IntSummaryStatistics summary = Stream.of("You", "may", "assume").collect(Collectors.summarizingInt(String::length)); summary.getMax(); 
  1. foreach() 方法
  • foreach() 用於遍歷stream中的元素,屬於 terminal operation ;
  • forEachOrdered() 是按照stream中元素的順序遍歷,也就無法利用並發的優勢;

    Stream.of(“You”, “may”, “assume”, “you”, “can”, “fly”).parallel().forEach(w -> System.out.println(w));Stream.of(“You”, “may”, “assume”, “you”, “can”, “fly”).forEachOrdered(w -> System.out.println(w));

  1. toArray() 方法

得到由stream中的元素得到的數組,默認是Object[],可以通過參數設置需要結果的類型:

Object[] words1 = Stream.of("You", "may", "assume").toArray(); String[] words2 = Stream.of("You", "may", "assume").toArray(String[]::new); 
  1. toMap() 方法

toMap : 將stream中的元素映射為 的形式,兩個參數分別用於生成對應的key和value的值。比如有一個字符串stream,將首字母作為key,字符串值作為value,得到一個map:

Stream<String> introStream = Stream.of("Get started with UICollectionView and the photo library".split(" ")); Map<String, String> introMap = introStream.collect(Collectors.toMap(s -> s.substring(0, 1), s -> s)); 

如果一個key對應多個value,則會拋出異常,需要使用第三個參數設置如何處理沖突,比如僅使用原來的value、使用新的value,或者合並:

Stream<String> introStream = Stream.of("Get started with UICollectionView and the photo library".split(" ")); Map<Integer, String> introMap2 = introStream.collect(Collectors.toMap(s -> s.length(), s -> s, (existingValue, newValue) -> existingValue)); 

如果value是一個集合,即將key對應的所有value放到一個集合中,則需要使用第三個參數,將多個value合並:

Stream<String> introStream3 = Stream.of("Get started with UICollectionView and the photo library".split(" ")); Map<Integer, Set<String>> introMap3 = introStream3.collect(Collectors.toMap(s -> s.length(), s -> Collections.singleton(s), (existingValue, newValue) -> { HashSet<String> set = new HashSet<>(existingValue); set.addAll(newValue); return set; })); introMap3.forEach((k, v) -> System.out.println(k + ": " + v)); 

如果value是對象自身,則使用 Function.identity() ,如:

Map<Integer, Person> idToPerson = people.collect(Collectors.toMap(Person::getId, Function.identity())); 

toMap() 默認返回的是HashMap,如果需要其它類型的map,比如TreeMap,則可以在第四個參數指定構造方法:

Map<Integer, String> introMap2 = introStream.collect(Collectors.toMap(s -> s.length(), s -> s, (existingValue, newValue) -> existingValue, TreeMap::new)); 

6. Grouping和Partitioning

  1. groupingBy() 表示根據某一個字段或條件進行分組,返回一個Map,其中key為分組的字段或條件,value默認為list, groupingByConcurrent() 是其並發版本:
Map<String, List<Locale>> countryToLocaleList = Stream.of(Locale.getAvailableLocales()) .collect(Collectors.groupingBy(l -> l.getDisplayCountry())); 
  1. 如果 groupingBy() 分組的依據是一個bool條件,則key的值為true/false,此時與 partitioningBy() 等價,且 partitioningBy() 的效率更高:
// predicate Map<Boolean, List<Locale>> englishAndOtherLocales = Stream.of(Locale.getAvailableLocales()) .collect(Collectors.groupingBy(l -> l.getDisplayLanguage().equalsIgnoreCase("English"))); // partitioningBy Map<Boolean, List<Locale>> englishAndOtherLocales2 = Stream.of(Locale.getAvailableLocales()) .collect(Collectors.partitioningBy(l -> l.getDisplayLanguage().equalsIgnoreCase("English"))); 
  1. groupingBy() 提供第二個參數,表示 downstream ,即對分組后的value作進一步的處理
  • 返回set,而不是list:
Map<String, Set<Locale>> countryToLocaleSet = Stream.of(Locale.getAvailableLocales()).collect(Collectors.groupingBy(l -> l.getDisplayCountry(), Collectors.toSet())); 
  • 返回value集合中元素的數量:
Map<String, Long> countryToLocaleCounts = Stream.of(Locale.getAvailableLocales()) .collect(Collectors.groupingBy(l -> l.getDisplayCountry(), Collectors.counting())); 
  • 對value集合中的元素求和:
Map<String, Integer> cityToPopulationSum = Stream.of(cities) .collect(Collectors.groupingBy(City::getName, Collectors.summingInt(City::getPopulation))); 
  • 對value的某一個字段求最大值,注意value是Optional的:
Map<String, Optional<City>> cityToPopulationMax = Stream.of(cities) .collect(Collectors.groupingBy(City::getName, Collectors.maxBy(Comparator.comparing(City::getPopulation)))); 
  • 使用mapping對value的字段進行map處理:
Map<String, Optional<String>> stateToNameMax = Stream.of(cities) .collect(Collectors.groupingBy(City::getState, Collectors.mapping(City::getName, Collectors.maxBy (Comparator.comparing(String::length))))); Map<String, Set<String>> stateToNameSet = Stream.of(cities) .collect(Collectors.groupingBy(City::getState, Collectors.mapping(City::getName, Collectors.toSet()))); 
  • 通過 summarizingXXX 獲取統計結果:
Map<String, IntSummaryStatistics> stateToPopulationSummary = Stream.of(cities) .collect(Collectors.groupingBy(City::getState, Collectors.summarizingInt(City::getPopulation))); 
  • reducing() 可以對結果作更復雜的處理,但是 reducing() 卻並不常用:
Map<String, String> stateToNameJoining = Stream.of(cities) .collect(Collectors.groupingBy(City::getState, Collectors.reducing("", City::getName, (s, t) -> s.length() == 0 ? t : s + ", " + t))); 

比如上例可以通過mapping達到同樣的效果:

Map<String, String> stateToNameJoining2 = Stream.of(cities) .collect(Collectors.groupingBy(City::getState, Collectors.mapping(City::getName, Collectors.joining(", ") ))); 

7. Primitive Stream

Stream<Integer> 對應的Primitive Stream就是 IntStream ,類似的還有 DoubleStream 和 LongStream 。

  1. Primitive Stream的構造: of() , range() , rangeClosed() , Arrays.stream() :
IntStream intStream = IntStream.of(10, 20, 30); IntStream zeroToNintyNine = IntStream.range(0, 100); IntStream zeroToHundred = IntStream.rangeClosed(0, 100); double[] nums = {10.0, 20.0, 30.0}; DoubleStream doubleStream = Arrays.stream(nums, 0, 3); 
  1. Object Stream與Primitive Stream之間的相互轉換,通過 mapToXXX() 和 boxed() :
// map to Stream<String> cityStream = Stream.of("Beijing", "Tianjin", "Chengdu"); IntStream lengthStream = cityStream.mapToInt(String::length); // box Stream<Integer> oneToNine = IntStream.range(0, 10).boxed(); 
  1. 與Object Stream相比,Primitive Stream的特點:
  • toArray() 方法返回的是對應的Primitive類型:
int[] intArr = intStream.toArray(); 
  • 自帶統計類型的方法,如: max() , average() , summaryStatistics() :
OptionalInt maxNum = intStream.max(); IntSummaryStatistics intSummary = intStream.summaryStatistics(); 

8. Parallel Stream

  1. Stream支持並發操作,但需要滿足以下幾點:
  • 構造一個paralle stream,默認構造的stream是順序執行的,調用 paralle() 構造並行的stream:
IntStream scoreStream = IntStream.rangeClosed(10, 30).parallel(); 
  • 要執行的操作必須是可並行執行的,即並行執行的結果和順序執行的結果是一致的,而且必須保證stream中執行的操作是線程安全的:
int[] wordLength = new int[12]; Stream.of("It", "is", "your", "responsibility").parallel().forEach(s -> { if (s.length() < 12) wordLength[s.length()]++; }); 

這段程序的問題在於,多線程訪問共享數組 wordLength ,是非線程安全的。解決的思路有:1)構造AtomicInteger數組;2)使用 groupingBy() 根據length統計;

  1. 可以通過並行提高效率的常見場景
  • 使stream無序:對於 distinct() 和 limit() 等方法,如果不關心順序,則可以使用並行:
LongStream.rangeClosed(5, 10).unordered().parallel().limit(3); IntStream.of(14, 15, 15, 14, 12, 81).unordered().parallel().distinct(); 
  • 在 groupingBy() 的操作中,map的合並操作是比較重的,可以通過 groupingByConcurrent() 來並行處理,不過前提是parallel stream:
Stream.of(cities).parallel().collect(Collectors.groupingByConcurrent(City::getState)); 
  1. 在執行stream操作時不能修改stream對應的collection

stream本身是不存儲數據的,數據保存在對應的collection中,所以在執行stream操作的同時修改對應的collection,結果是未定義的:

// ok Stream<String> wordStream = wordList.stream(); wordList.add("number"); wordStream.distinct().count(); // ConcurrentModificationException Stream<String> wordStream = wordList.stream(); wordStream.forEach(s -> { if (s.length() >= 6) wordList.remove(s);}); 

9. Functional Interface

僅包含一個抽象方法的interface被成為 Functional Interface ,比如: Predicate , Function , Consumer 等。此時我們一般傳入一個lambda表達式或 Method Reference 。

常見的 Functional Interface 有:

Functional Interface     Parameter     Return Type     Description Types
Supplier<T> None T Supplies a value of type T Consumer<T> T void Consumes a value of type T BiConsumer<T, U> T,U void Consumes values of types T and U Predicate<T> T boolean A Boolean-valued function ToIntFunction<T> T int An int-, long-, or double-valued function ToLongFunction<T> T long ToDoubleFunction<T> T double IntFunction<R> int R A function with argument of type int, long, or double LongFunction<R> long DoubleFunction<R> double Function<T, R> T R A function with argument of type T BiFunction<T, U, R> T,U R A function with arguments of types T and U UnaryOperator<T> T T A unary operator on the type T BinaryOperator<T> T,T T A binary operator on the type T


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM