Stream is the API introduced in Java 8 for operating efficiently on the data held in collections (Collection).
1. From Iterator to Stream
Suppose we have a list of strings and want to count how many of them are longer than 7 characters. With iteration:
```java
List<String> wordList = Arrays.asList("regular", "expression", "specified", "as", "a", "string", "must");
int countByIterator = 0;
for (String word : wordList) {
    if (word.length() > 7) {
        countByIterator++;
    }
}
```
With a Stream:
```java
long countByStream = wordList.stream().filter(w -> w.length() > 7).count();
```
The stream version is clearly more concise. Beyond that, a stream makes it easy to run the same computation in parallel:
```java
long countByParallelStream = wordList.parallelStream().filter(w -> w.length() > 7).count();
```
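A stream follows this principle: tell it what to do, not how to do it. In the example above, we tell the stream to count the strings longer than 7 characters, possibly using multiple threads. The order in which elements are processed and the threads that process them are the stream's responsibility. In the iterative version, the computation is spelled out step by step (one element at a time, in list order), which leaves little room for optimization.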
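The main differences between a Stream and a Collection are:
- A stream does not store data itself. The data lives in the underlying collection, or is generated on demand.
- A stream does not modify its data source. Each intermediate operation returns a new stream.
- Stream operations are lazy: they run only when the final result is needed. Suppose that in the example above we only needed the first 3 strings longer than 7 characters (for instance by adding limit(3)). In that case filter() would stop being applied as soon as those 3 strings had been found.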
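The sketch below shows this laziness in action. It adds limit(3) and uses peek() to record which elements are actually pulled through filter(). The class LazyFilterDemo is hypothetical. The extra words "characters", "afterwards", and "end" are made-up sample data appended to the author's list.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical demo class
class LazyFilterDemo {
    // Returns the elements that were pulled from the source before limit(3) was satisfied
    static List<String> examinedWords(List<String> words) {
        List<String> examined = new ArrayList<>();
        List<String> firstThree = words.stream()
                .peek(examined::add)            // record every element that reaches filter()
                .filter(w -> w.length() > 7)
                .limit(3)                        // short-circuits after the third match
                .collect(Collectors.toList());
        System.out.println("result:   " + firstThree);
        return examined;
    }

    public static void main(String[] args) {
        // Made-up sample data: the author's list plus three extra words
        List<String> words = Arrays.asList("regular", "expression", "specified", "as", "a",
                "string", "must", "characters", "afterwards", "end");
        // result:   [expression, specified, characters]
        // examined: [regular, expression, specified, as, a, string, must, characters]
        System.out.println("examined: " + examinedWords(words));
    }
}
```

Once the third match ("characters") passes limit(3), the pipeline stops pulling elements. As a result, "afterwards" and "end" are never examined.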
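Using a stream involves the following steps:
- Create a stream.
- Transform the initial stream into another stream with one or more intermediate operations.
- Obtain the result with a terminal operation. The terminal operation triggers the execution of the lazy operations before it. Once it has run, the stream is consumed and cannot be used again.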
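In the examples above, each part of the pipeline plays one role:
- wordList.stream() and wordList.parallelStream() create the stream.
- filter() is an intermediate operation that produces a new, filtered stream.
- count() is the terminal operation that produces the result.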
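The sketch below runs these three steps and checks the rule that a consumed stream cannot be reused. StreamReuseDemo is a hypothetical class name. The second count() is expected to throw IllegalStateException.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Stream;

// Hypothetical demo class
class StreamReuseDemo {
    // Runs a terminal operation, then tries a second one on the same Stream object
    static boolean secondTerminalOpFails(List<String> words) {
        Stream<String> longWords = words.stream()          // 1. create
                .filter(w -> w.length() > 7);              // 2. intermediate: lazy, nothing runs yet
        System.out.println("count = " + longWords.count()); // 3. terminal: runs the pipeline
        try {
            longWords.count();                             // the stream has already been consumed
            return false;
        } catch (IllegalStateException e) {
            System.out.println("second use rejected: " + e.getMessage());
            return true;
        }
    }

    public static void main(String[] args) {
        List<String> wordList = Arrays.asList("regular", "expression", "specified", "as", "a", "string", "must");
        // prints "count = 2", then the exception message, then true
        System.out.println(secondTerminalOpFails(wordList));
    }
}
```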
2. Ways to create a Stream
- From an array, a list, or individual values:
```java
Stream<Integer> integerStream = Stream.of(10, 20, 30, 40);
String[] cityArr = {"Beijing", "Shanghai", "Chengdu"};
Stream<String> cityStream = Stream.of(cityArr);
Stream<String> nameStream = Arrays.asList("Daniel", "Peter", "Kevin").stream();
Stream<String> cityStream2 = Arrays.stream(cityArr, 0, 1); // elements in [0, 1): only "Beijing"
Stream<String> emptyStream = Stream.empty();
```
- Infinite streams, created with generate() and iterate():
```java
Stream<String> echos = Stream.generate(() -> "echo");          // "echo", "echo", ...
Stream<Integer> integers = Stream.iterate(0, num -> num + 1);  // 0, 1, 2, ...
```
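- From other APIs:
```java
// Files.lines() throws IOException and holds the file open, so close it with try-with-resources
try (Stream<String> lines = Files.lines(Paths.get("test.txt"))) {
    lines.forEach(System.out::println);
}
String content = "AXDBDGXC";
// splits content around each run of 1 to 3 of the characters A, B, C
Stream<String> contentStream = Pattern.compile("[ABC]{1,3}").splitAsStream(content);
```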
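The infinite streams created with generate() and iterate() never end on their own. They are therefore normally cut short with an operation such as limit() before a terminal operation runs. The following is a minimal sketch; InfiniteStreamDemo is a made-up class name.

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Hypothetical demo class
class InfiniteStreamDemo {
    // generate(): calls the supplier for every element; limit() makes the stream finite
    static List<String> echoes(int n) {
        return Stream.generate(() -> "echo").limit(n).collect(Collectors.toList());
    }

    // iterate(): seed, f(seed), f(f(seed)), ...
    static List<Integer> firstIntegers(int n) {
        return Stream.iterate(0, num -> num + 1).limit(n).collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(echoes(3));        // [echo, echo, echo]
        System.out.println(firstIntegers(5)); // [0, 1, 2, 3, 4]
    }
}
```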
3. Stream transformations
filter() selects elements: the elements of the original stream that satisfy the predicate form the new stream:
```java
List<String> langList = Arrays.asList("Java", "Python", "Swift", "HTML");
Stream<String> filterStream = langList.stream().filter(lang -> lang.equalsIgnoreCase("java"));
```
map() transforms elements: it applies a function to each element of the original stream, and the results form the new stream:
```java
List<String> langList = Arrays.asList("Java", "Python", "Swift", "HTML");
Stream<String> mapStream = langList.stream().map(String::toUpperCase);
```
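The sketch below combines the two transformations on the same langList data. The class name and the length threshold of 4 are illustrative assumptions. The source list is left unchanged.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical demo class
class FilterMapDemo {
    static final List<String> LANGS = Arrays.asList("Java", "Python", "Swift", "HTML");

    // filter() alone: keep only the matching elements
    static List<String> onlyJava() {
        return LANGS.stream()
                .filter(lang -> lang.equalsIgnoreCase("java"))
                .collect(Collectors.toList());
    }

    // filter() then map(): keep names longer than 4 characters, then upper-case them
    static List<String> longNamesUpperCase() {
        return LANGS.stream()
                .filter(lang -> lang.length() > 4)
                .map(String::toUpperCase)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(onlyJava());           // [Java]
        System.out.println(longNamesUpperCase()); // [PYTHON, SWIFT]
        System.out.println(LANGS);                // [Java, Python, Swift, HTML] (source unchanged)
    }
}
```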
flatMap() flattens a nested structure, turning something shaped like [["ABC", "DEF"], ["FGH", "IJK"]] into ["ABC", "DEF", "FGH", "IJK"]:
```java
Stream<String> cityStream = Stream.of("Beijing", "Shanghai", "Shenzhen");
// map(): [['B', 'e', 'i', 'j', 'i', 'n', 'g'], ['S', 'h', 'a', 'n', 'g', 'h', 'a', 'i'], ...]
Stream<Stream<Character>> characterStream1 = cityStream.map(city -> characterStream(city));
Stream<String> cityStreamCopy = Stream.of("Beijing", "Shanghai", "Shenzhen");
// flatMap(): ['B', 'e', 'i', 'j', 'i', 'n', 'g', 'S', 'h', 'a', 'n', 'g', 'h', 'a', 'i', ...]
Stream<Character> characterStreamCopy = cityStreamCopy.flatMap(city -> characterStream(city));
```
Here characterStream() returns a Stream made up of the characters of its argument string.
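The text does not show characterStream() itself. The sketch below supplies one hypothetical version built on String.chars(), then compares the result shapes of map() and flatMap().

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Hypothetical demo class
class FlatMapDemo {
    // Hypothetical helper: one possible characterStream(), returning the characters of s
    static Stream<Character> characterStream(String s) {
        return s.chars().mapToObj(c -> (char) c);
    }

    // map(): each city becomes its own inner stream, giving a stream of streams
    static long innerStreamCount() {
        Stream<Stream<Character>> nested = Stream.of("Beijing", "Shanghai", "Shenzhen")
                .map(FlatMapDemo::characterStream);
        return nested.count();
    }

    // flatMap(): the inner streams are concatenated into one flat stream of characters
    static List<Character> flattened() {
        return Stream.of("Beijing", "Shanghai", "Shenzhen")
                .flatMap(FlatMapDemo::characterStream)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(innerStreamCount());        // 3
        System.out.println(flattened().size());        // 23 (7 + 8 + 8 characters)
        System.out.println(flattened().subList(0, 9)); // [B, e, i, j, i, n, g, S, h]
    }
}
```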
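The following operations reshape a stream:
- limit() caps the number of elements.
- skip() drops the first n elements.
- Stream.concat() joins two streams into one.
- peek() is mainly used for debugging, to look at the elements as they flow through the pipeline.
```java
Stream<Integer> limitStream = Stream.of(18, 20, 12, 35, 89).sorted().limit(3);
Stream<Integer> skipStream = Stream.of(18, 20, 12, 35, 89).sorted(Comparator.reverseOrder()).skip(1);
Stream<Integer> concatStream = Stream.concat(Stream.of(1, 2, 3), Stream.of(4, 5, 6));
concatStream.peek(i -> System.out.println(i)).collect(Collectors.toList());
```
peek() is an intermediate operation, so nothing is printed until a terminal operation runs the pipeline. Use a terminal operation that visits every element, such as collect(). Since Java 9, count() may compute the size of a sized source directly without traversing it, in which case peek() is never invoked.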
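To see what limit(), skip(), and concat() produce, the sketch below collects each stream into a list. SliceDemo is a hypothetical name. Here peek() records elements into a list instead of printing them.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Hypothetical demo class
class SliceDemo {
    // sorted() gives 12, 18, 20, 35, 89; limit(3) keeps the first three
    static List<Integer> limited() {
        return Stream.of(18, 20, 12, 35, 89).sorted().limit(3).collect(Collectors.toList());
    }

    // reverse order gives 89, 35, 20, 18, 12; skip(1) drops the first one
    static List<Integer> skipped() {
        return Stream.of(18, 20, 12, 35, 89).sorted(Comparator.reverseOrder()).skip(1)
                .collect(Collectors.toList());
    }

    // peek() records each element as collect() pulls it through the concatenated stream
    static List<Integer> peekedDuringConcat() {
        List<Integer> seen = new ArrayList<>();
        Stream.concat(Stream.of(1, 2, 3), Stream.of(4, 5, 6))
                .peek(seen::add)
                .collect(Collectors.toList());
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(limited());            // [12, 18, 20]
        System.out.println(skipped());            // [35, 20, 18, 12]
        System.out.println(peekedDuringConcat()); // [1, 2, 3, 4, 5, 6]
    }
}
```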
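- Stateful transformations, in which the result for one element depends on the other elements. Examples are distinct(), which returns a stream of the unique elements, and sorted(), which returns a sorted stream:
```java
Stream<String> distinctStream = Stream.of("Beijing", "Tianjin", "Beijing").distinct();
// sort by length, longest first
Stream<String> sortedStream = Stream.of("Beijing", "Shanghai", "Chengdu")
        .sorted(Comparator.comparing(String::length).reversed());
```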
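The sketch below collects the two stateful examples into lists; the class name is hypothetical. For ordered streams, distinct() keeps the first occurrence of each element. sorted() is stable, so strings of equal length keep their original relative order.

```java
import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Hypothetical demo class
class StatefulOpsDemo {
    // distinct(): keeps the first occurrence of each element
    static List<String> distinctCities() {
        return Stream.of("Beijing", "Tianjin", "Beijing").distinct().collect(Collectors.toList());
    }

    // sorted(): longest first; "Beijing" and "Chengdu" both have 7 letters and keep their order
    static List<String> byLengthDescending() {
        return Stream.of("Beijing", "Shanghai", "Chengdu")
                .sorted(Comparator.comparing(String::length).reversed())
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(distinctCities());     // [Beijing, Tianjin]
        System.out.println(byLengthDescending()); // [Shanghai, Beijing, Chengdu]
    }
}
```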
4. Stream reduction
A reduction extracts the final result from a stream. Reductions are terminal operations, so a stream cannot be used again after a reduction.
4.1 Optional
An Optional<T> either holds an object of type T or holds no value.
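- Creating an Optional
Use the static factory methods of Optional: of(), empty(), and ofNullable():
```java
Optional<Integer> intOpt = Optional.of(10);             // of(null) throws NullPointerException
Optional<String> emptyOpt = Optional.empty();
Optional<Double> doubleOpt = Optional.ofNullable(5.5);  // ofNullable(null) returns an empty Optional
```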
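- Using an Optional
You can, of course, use it like this:
```java
if (intOpt.isPresent()) {
    intOpt.get();
}
```
It is better, however, to use it like this:
```java
doubleOpt.orElse(0.0);                          // the value, or 0.0 if empty
doubleOpt.orElseGet(() -> 1.0);                 // the value, or the supplier's result if empty
doubleOpt.orElseThrow(RuntimeException::new);   // the value, or throw if empty
List<Double> doubleList = new ArrayList<>();
doubleOpt.ifPresent(doubleList::add);           // run the action only if a value is present
```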
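The following minimal sketch contrasts a present and an empty Optional using the methods above. OptionalDemo and its method names are illustrative.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Optional;

// Hypothetical demo class
class OptionalDemo {
    // orElse(): the value if present, otherwise the given default
    static double withDefault(Optional<Double> opt) {
        return opt.orElse(0.0);
    }

    // orElseGet(): the supplier is only called when the Optional is empty
    static double withLazyDefault(Optional<Double> opt) {
        return opt.orElseGet(() -> 1.0);
    }

    // ifPresent(): the action runs only for non-empty Optionals
    static List<Double> collectIfPresent(List<Optional<Double>> opts) {
        List<Double> values = new ArrayList<>();
        for (Optional<Double> opt : opts) {
            opt.ifPresent(values::add);
        }
        return values;
    }

    public static void main(String[] args) {
        Optional<Double> present = Optional.ofNullable(5.5);
        Optional<Double> absent = Optional.ofNullable(null);          // same as Optional.empty()
        System.out.println(withDefault(present) + " " + withDefault(absent));         // 5.5 0.0
        System.out.println(withLazyDefault(present) + " " + withLazyDefault(absent)); // 5.5 1.0
        System.out.println(collectIfPresent(Arrays.asList(present, absent)));         // [5.5]
        try {
            absent.orElseThrow(RuntimeException::new);
        } catch (RuntimeException e) {
            System.out.println("orElseThrow() threw " + e.getClass().getSimpleName());
        }
    }
}
```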
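map() works like ifPresent(), except that it returns a result. If a value is present, map() applies the function and wraps the function's return value in an Optional. flatMap() chains functions that themselves return an Optional:
```java
Optional<Boolean> addOk = doubleOpt.map(doubleList::add);
Optional<Double> root = Optional.of(4.0)
        .flatMap(num -> Optional.ofNullable(num * 100))
        .flatMap(num -> Optional.ofNullable(Math.sqrt(num)));  // Optional[20.0]
```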
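flatMap() is most useful with functions that may fail. The sketch below assumes two hypothetical helpers, inverse() and squareRoot(), that return an empty Optional when the result is undefined. An empty result at any step makes the whole chain empty.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

// Hypothetical demo class
class OptionalChainDemo {
    // Hypothetical helper: empty when the inverse is undefined
    static Optional<Double> inverse(Double x) {
        return x == 0 ? Optional.empty() : Optional.of(1 / x);
    }

    // Hypothetical helper: empty for negative input
    static Optional<Double> squareRoot(Double x) {
        return x < 0 ? Optional.empty() : Optional.of(Math.sqrt(x));
    }

    // flatMap() feeds the value into the next Optional-returning function;
    // once a step is empty, the remaining steps are skipped
    static Optional<Double> inverseSquareRoot(double x) {
        return Optional.of(x)
                .flatMap(OptionalChainDemo::inverse)
                .flatMap(OptionalChainDemo::squareRoot);
    }

    // map() applies an ordinary function and wraps its return value (here the boolean from add)
    static Optional<Boolean> addIfPresent(Optional<Double> opt, List<Double> target) {
        return opt.map(target::add);
    }

    public static void main(String[] args) {
        System.out.println(inverseSquareRoot(4.0));  // Optional[0.5]
        System.out.println(inverseSquareRoot(-4.0)); // Optional.empty
        System.out.println(inverseSquareRoot(0.0));  // Optional.empty
        List<Double> doubleList = new ArrayList<>();
        System.out.println(addIfPresent(Optional.of(5.5), doubleList) + " " + doubleList); // Optional[true] [5.5]
    }
}
```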
4.2 Simple reductions
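The main simple reductions are findFirst(), findAny(), allMatch(), anyMatch(), and noneMatch(). findAny() may return any matching element, which makes it cheaper than findFirst() on a parallel stream. A stream cannot be reused after a terminal operation, so each call below starts from a new stream:
```java
Optional<String> firstWord = wordList.stream().filter(s -> s.startsWith("Y")).findFirst();
Optional<String> anyWord = wordList.stream().filter(s -> s.length() > 3).findAny();
boolean allLong = wordList.stream().allMatch(s -> s.length() > 3);
boolean anyLong = wordList.stream().anyMatch(s -> s.length() > 3);
boolean noneLong = wordList.stream().noneMatch(s -> s.length() > 3);
```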
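When the same data has to be queried several times, one option is to keep a Supplier<Stream<String>> and call get() for each new stream. This is an assumption, not the only approach. The following sketch uses made-up data.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.function.Supplier;
import java.util.stream.Stream;

// Hypothetical demo class with made-up data
class SimpleReductionDemo {
    static final List<String> WORDS = Arrays.asList("You", "may", "assume", "you", "can", "fly");
    // Every get() call returns a brand-new stream over WORDS
    static final Supplier<Stream<String>> WORD_STREAM = WORDS::stream;

    static Optional<String> firstStartingWithY() {
        return WORD_STREAM.get().filter(s -> s.startsWith("Y")).findFirst();
    }

    static Optional<String> anyLongerThan3() {
        return WORD_STREAM.get().filter(s -> s.length() > 3).findAny();
    }

    static boolean allLongerThan3()  { return WORD_STREAM.get().allMatch(s -> s.length() > 3); }
    static boolean someLongerThan3() { return WORD_STREAM.get().anyMatch(s -> s.length() > 3); }
    static boolean noneLongerThan3() { return WORD_STREAM.get().noneMatch(s -> s.length() > 3); }

    public static void main(String[] args) {
        System.out.println(firstStartingWithY()); // Optional[You]
        System.out.println(anyLongerThan3());     // Optional[assume]
        System.out.println(allLongerThan3() + " " + someLongerThan3() + " " + noneLongerThan3()); // false true false
    }
}
```

"assume" is the only word longer than 3 characters, so findAny() has only one possible answer here. In general, it may return any match.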
4.3 The reduce() method
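reduce(accumulator): the argument is a functional interface that performs a binary operation. If that operation is op and the elements of the stream are x, y, z, ..., then reduce() computes x op y op z ...

For this to work, op must be associative: (x op y) op z = x op (y op z). Associative operations include sum, product, maximum, minimum, string concatenation, and set union and intersection. Subtraction, for example, is not associative. Because the stream may be empty, this form returns an Optional:
```java
Optional<Integer> sum1 = numStream.reduce((x, y) -> x + y);
```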
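reduce(identity, accumulator): the first argument can be thought of as a default value, but it must satisfy identity op x = x. For summation the identity is therefore 0, and for multiplication it is 1. This form returns a value of the stream's element type, which is the identity itself if the stream is empty:
```java
Integer sum2 = numStream.reduce(0, Integer::sum);
```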
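The sketch below applies both forms to small made-up inputs. It includes an empty stream to show where Optional and the identity matter. ReduceDemo is a hypothetical class.

```java
import java.util.Optional;
import java.util.stream.Stream;

// Hypothetical demo class
class ReduceDemo {
    // reduce(accumulator): no identity, so the result is an Optional (empty for an empty stream)
    static Optional<Integer> sum(Stream<Integer> nums) {
        return nums.reduce((x, y) -> x + y);
    }

    // reduce(identity, accumulator): the identity is returned for an empty stream
    static int sumWithIdentity(Stream<Integer> nums) {
        return nums.reduce(0, Integer::sum);
    }

    // 1 is the identity for multiplication
    static int product(Stream<Integer> nums) {
        return nums.reduce(1, (x, y) -> x * y);
    }

    // max is associative as well
    static Optional<Integer> max(Stream<Integer> nums) {
        return nums.reduce(Integer::max);
    }

    public static void main(String[] args) {
        System.out.println(sum(Stream.of(1, 2, 3, 4, 5)));             // Optional[15]
        System.out.println(sum(Stream.<Integer>empty()));              // Optional.empty
        System.out.println(sumWithIdentity(Stream.of(1, 2, 3, 4, 5))); // 15
        System.out.println(sumWithIdentity(Stream.<Integer>empty()));  // 0
        System.out.println(product(Stream.of(1, 2, 3, 4, 5)));         // 120
        System.out.println(max(Stream.of(3, 9, 4)));                  // Optional[9]
    }
}
```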
5. Collecting results
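The collect() method
reduce() combines the elements into a single value. collect(), by contrast, accumulates the elements into a result container, such as a collection, a string, or a statistics object.
The Collectors class provides convenient factory methods for creating Collector objects: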
```java
// collect into a collection
Stream.of("You", "may", "assume").collect(Collectors.toList());
Stream.of("You", "may", "assume").collect(Collectors.toSet());
Stream.of("You", "may", "assume").collect(Collectors.toCollection(TreeSet::new));
// join the elements into a string
Stream.of("You", "may", "assume").collect(Collectors.joining());      // "Youmayassume"
Stream.of("You", "may", "assume").collect(Collectors.joining(", "));  // "You, may, assume"
// summarize the elements
IntSummaryStatistics summary = Stream.of("You", "may", "assume").collect(Collectors.summarizingInt(String::length));
summary.getMax();  // 6
```
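The following runnable sketch exercises the collectors above; CollectorsDemo is a hypothetical name. Each method builds a fresh stream, since a stream can be collected only once.

```java
import java.util.IntSummaryStatistics;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Hypothetical demo class
class CollectorsDemo {
    // A fresh stream for every call: a stream can be collected only once
    static Stream<String> words() {
        return Stream.of("You", "may", "assume");
    }

    static List<String> asList()       { return words().collect(Collectors.toList()); }
    static Set<String> asSet()         { return words().collect(Collectors.toSet()); }
    static TreeSet<String> asTreeSet() { return words().collect(Collectors.toCollection(TreeSet::new)); }
    static String joinedWithComma()    { return words().collect(Collectors.joining(", ")); }

    static IntSummaryStatistics lengthStats() {
        return words().collect(Collectors.summarizingInt(String::length));
    }

    public static void main(String[] args) {
        System.out.println(asList());          // [You, may, assume]
        System.out.println(asSet().size());    // 3
        System.out.println(asTreeSet());       // [You, assume, may]
        System.out.println(joinedWithComma()); // You, may, assume
        IntSummaryStatistics stats = lengthStats();
        System.out.println(stats.getMin() + " " + stats.getMax() + " " + stats.getAverage()); // 3 6 4.0
    }
}
```

In String's natural ordering, uppercase letters sort before lowercase ones. That is why "You" comes first in the TreeSet.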
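The forEach() method
forEach() iterates over the elements of a stream and is a terminal operation. On a parallel stream it may visit the elements in any order. forEachOrdered() instead visits them in the stream's encounter order, which gives up part of the benefit of parallelism:
```java
// on a parallel stream, the words may be printed in any order
Stream.of("You", "may", "assume", "you", "can", "fly").parallel().forEach(w -> System.out.println(w));
// printed in the original order, even though the stream is parallel
Stream.of("You", "may", "assume", "you", "can", "fly").parallel().forEachOrdered(w -> System.out.println(w));
```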
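The toArray() method
toArray() returns an array of the stream's elements. By default the result is an Object[]; pass an array constructor to get an array of the type you need:
```java
Object[] words1 = Stream.of("You", "may", "assume").toArray();
String[] words2 = Stream.of("You", "may", "assume").toArray(String[]::new);
```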
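The toMap() method
Collectors.toMap() maps each element of the stream to a key-value pair. Its two arguments produce the key and the value, respectively. For example, given a stream of strings, the following uses the first letter as the key and the string itself as the value:
```java
Stream<String> introStream = Stream.of("Get started with UICollectionView and the photo library".split(" "));
Map<String, String> introMap = introStream.collect(Collectors.toMap(s -> s.substring(0, 1), s -> s));
```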
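If two elements map to the same key, toMap() throws an IllegalStateException. A third argument, a merge function, decides how to resolve such conflicts. For example, it can keep the existing value, use the new value, or combine the two:
```java
Stream<String> introStream = Stream.of("Get started with UICollectionView and the photo library".split(" "));
// keep the first word seen for each length
Map<Integer, String> introMap2 = introStream.collect(
        Collectors.toMap(s -> s.length(), s -> s, (existingValue, newValue) -> existingValue));
```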
Sometimes the value should be a collection that gathers all the values for a key. In that case, use the third argument to merge the collections:
```java
Stream<String> introStream3 = Stream.of("Get started with UICollectionView and the photo library".split(" "));
Map<Integer, Set<String>> introMap3 = introStream3.collect(Collectors.toMap(
        s -> s.length(),
        s -> Collections.singleton(s),
        (existingValue, newValue) -> {
            // union of the two sets; singleton sets are immutable, so copy into a new HashSet first
            HashSet<String> set = new HashSet<>(existingValue);
            set.addAll(newValue);
            return set;
        }));
introMap3.forEach((k, v) -> System.out.println(k + ": " + v));
```
If the value is the element itself, use Function.identity():
```java
Map<Integer, Person> idToPerson = people.collect(Collectors.toMap(Person::getId, Function.identity()));
```
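In the current JDK implementation toMap() returns a HashMap, although the specification does not guarantee the map type. If you need a specific kind of map, such as a TreeMap, pass its constructor as the fourth argument:
```java
Stream<String> introStream4 = Stream.of("Get started with UICollectionView and the photo library".split(" "));
Map<Integer, String> introMap4 = introStream4.collect(Collectors.toMap(
        s -> s.length(), s -> s, (existingValue, newValue) -> existingValue, TreeMap::new));
```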
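The sketch below checks two behaviors on the same sentence: the duplicate-key error and the merge-plus-TreeMap variant. ToMapDemo is a hypothetical name.

```java
import java.util.Arrays;
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Hypothetical demo class
class ToMapDemo {
    // A fresh stream each time
    static Stream<String> words() {
        return Arrays.stream("Get started with UICollectionView and the photo library".split(" "));
    }

    // Without a merge function, duplicate keys ("Get", "and", "the" all have length 3) are an error
    static boolean duplicateKeysFail() {
        try {
            words().collect(Collectors.toMap(String::length, s -> s));
            return false;
        } catch (IllegalStateException e) {
            return true;
        }
    }

    // The merge function keeps the first value; TreeMap::new gives a map sorted by key
    static Map<Integer, String> firstWordByLength() {
        return words().collect(Collectors.toMap(String::length, s -> s,
                (existingValue, newValue) -> existingValue, TreeMap::new));
    }

    public static void main(String[] args) {
        System.out.println(duplicateKeysFail()); // true
        System.out.println(firstWordByLength()); // {3=Get, 4=with, 5=photo, 7=started, 16=UICollectionView}
    }
}
```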
6. Grouping and Partitioning
groupingBy() groups the elements by a field or condition. It returns a Map whose keys are the group values and whose values are, by default, lists. groupingByConcurrent() is its concurrent counterpart:
```java
Map<String, List<Locale>> countryToLocaleList = Stream.of(Locale.getAvailableLocales())
        .collect(Collectors.groupingBy(l -> l.getDisplayCountry()));
```
- If groupingBy() classifies by a boolean condition, the keys are true and false, which is essentially what partitioningBy() does. In that case partitioningBy() is the more direct and more efficient choice:
```java
// groupingBy() with a predicate
Map<Boolean, List<Locale>> englishAndOtherLocales = Stream.of(Locale.getAvailableLocales())
        .collect(Collectors.groupingBy(l -> l.getDisplayLanguage().equalsIgnoreCase("English")));
// partitioningBy()
Map<Boolean, List<Locale>> englishAndOtherLocales2 = Stream.of(Locale.getAvailableLocales())
        .collect(Collectors.partitioningBy(l -> l.getDisplayLanguage().equalsIgnoreCase("English")));
```
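One practical difference remains between the two. partitioningBy() always produces both the true and the false key, while groupingBy() only produces the keys that actually occur. The following sketch uses made-up integer data; PartitionDemo is a hypothetical name.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

// Hypothetical demo class
class PartitionDemo {
    static Map<Boolean, List<Integer>> groupedByEven(List<Integer> nums) {
        return nums.stream().collect(Collectors.groupingBy(n -> n % 2 == 0));
    }

    static Map<Boolean, List<Integer>> partitionedByEven(List<Integer> nums) {
        return nums.stream().collect(Collectors.partitioningBy(n -> n % 2 == 0));
    }

    public static void main(String[] args) {
        System.out.println(partitionedByEven(Arrays.asList(1, 2, 3, 4, 5))); // {false=[1, 3, 5], true=[2, 4]}
        List<Integer> allEven = Arrays.asList(2, 4, 6);
        System.out.println(groupedByEven(allEven));     // {true=[2, 4, 6]}: only keys that occur
        System.out.println(partitionedByEven(allEven)); // {false=[], true=[2, 4, 6]}: both keys present
    }
}
```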
groupingBy() takes an optional second argument, a downstream collector, which further processes the values in each group:
- Return a Set instead of a List:
```java
Map<String, Set<Locale>> countryToLocaleSet = Stream.of(Locale.getAvailableLocales())
        .collect(Collectors.groupingBy(l -> l.getDisplayCountry(), Collectors.toSet()));
```
- Count the elements in each group:
```java
Map<String, Long> countryToLocaleCounts = Stream.of(Locale.getAvailableLocales())
        .collect(Collectors.groupingBy(l -> l.getDisplayCountry(), Collectors.counting()));
```
- Sum a field of the elements in each group:
```java
Map<String, Integer> cityToPopulationSum = Stream.of(cities)
        .collect(Collectors.groupingBy(City::getName, Collectors.summingInt(City::getPopulation)));
```
- Find the element with the largest value of a field. Note that the value is an Optional:
```java
Map<String, Optional<City>> cityToPopulationMax = Stream.of(cities)
        .collect(Collectors.groupingBy(City::getName,
                Collectors.maxBy(Comparator.comparing(City::getPopulation))));
```
- Use mapping() to transform the elements before passing them to another downstream collector:
```java
Map<String, Optional<String>> stateToNameMax = Stream.of(cities)
        .collect(Collectors.groupingBy(City::getState,
                Collectors.mapping(City::getName, Collectors.maxBy(Comparator.comparing(String::length)))));
Map<String, Set<String>> stateToNameSet = Stream.of(cities)
        .collect(Collectors.groupingBy(City::getState,
                Collectors.mapping(City::getName, Collectors.toSet())));
```
- Get summary statistics with summarizingInt(), summarizingLong(), or summarizingDouble():
```java
Map<String, IntSummaryStatistics> stateToPopulationSummary = Stream.of(cities)
        .collect(Collectors.groupingBy(City::getState, Collectors.summarizingInt(City::getPopulation)));
```
reducing() can apply more elaborate processing to the values, but it is rarely needed:
```java
Map<String, String> stateToNameJoining = Stream.of(cities)
        .collect(Collectors.groupingBy(City::getState,
                Collectors.reducing("", City::getName, (s, t) -> s.length() == 0 ? t : s + ", " + t)));
```
For instance, mapping() achieves the same result as the example above:
```java
Map<String, String> stateToNameJoining2 = Stream.of(cities)
        .collect(Collectors.groupingBy(City::getState,
                Collectors.mapping(City::getName, Collectors.joining(", "))));
```
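The City class used above is not defined in the text. The sketch below assumes a minimal hypothetical City with name, state, and population fields, filled with made-up data. It runs several of the downstream collectors and checks that the reducing() and mapping() versions agree.

```java
import java.util.Arrays;
import java.util.Comparator;
import java.util.IntSummaryStatistics;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;

// Hypothetical City class; names, states, and populations are made up
class City {
    private final String name;
    private final String state;
    private final int population;

    City(String name, String state, int population) {
        this.name = name;
        this.state = state;
        this.population = population;
    }

    String getName() { return name; }
    String getState() { return state; }
    int getPopulation() { return population; }
}

// Hypothetical demo class
class GroupingDemo {
    static final List<City> CITIES = Arrays.asList(
            new City("Alpha", "North", 100), new City("Beta", "North", 250),
            new City("Gamma", "South", 80), new City("Delta", "South", 40),
            new City("Epsilon", "South", 60));

    static Map<String, Long> countByState() {
        return CITIES.stream().collect(Collectors.groupingBy(City::getState, Collectors.counting()));
    }

    static Map<String, Integer> populationByState() {
        return CITIES.stream().collect(
                Collectors.groupingBy(City::getState, Collectors.summingInt(City::getPopulation)));
    }

    static Map<String, Optional<City>> largestByState() {
        return CITIES.stream().collect(Collectors.groupingBy(City::getState,
                Collectors.maxBy(Comparator.comparing(City::getPopulation))));
    }

    static Map<String, IntSummaryStatistics> statsByState() {
        return CITIES.stream().collect(
                Collectors.groupingBy(City::getState, Collectors.summarizingInt(City::getPopulation)));
    }

    // mapping() + joining(): names in encounter order, separated by ", "
    static Map<String, String> namesByState() {
        return CITIES.stream().collect(Collectors.groupingBy(City::getState,
                Collectors.mapping(City::getName, Collectors.joining(", "))));
    }

    // The reducing() version from the text; expected to give the same map as namesByState()
    static Map<String, String> namesByStateWithReducing() {
        return CITIES.stream().collect(Collectors.groupingBy(City::getState,
                Collectors.reducing("", City::getName, (s, t) -> s.length() == 0 ? t : s + ", " + t)));
    }

    public static void main(String[] args) {
        System.out.println(countByState().get("South"));                       // 3
        System.out.println(populationByState().get("North"));                  // 350
        System.out.println(largestByState().get("South").get().getName());     // Gamma
        System.out.println(namesByState().get("South"));                       // Gamma, Delta, Epsilon
        System.out.println(namesByState().equals(namesByStateWithReducing())); // true
        System.out.println(statsByState().get("South").getAverage());          // 60.0
    }
}
```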
7. Primitive Stream
The primitive stream corresponding to Stream<Integer> is IntStream. Similarly, there are DoubleStream and LongStream.
- Constructing primitive streams: of(), range(), rangeClosed(), and Arrays.stream():
```java
IntStream intStream = IntStream.of(10, 20, 30);
IntStream zeroToNinetyNine = IntStream.range(0, 100);     // upper bound excluded
IntStream zeroToHundred = IntStream.rangeClosed(0, 100);  // upper bound included
double[] nums = {10.0, 20.0, 30.0};
DoubleStream doubleStream = Arrays.stream(nums, 0, 3);
```
- Converting between object streams and primitive streams, with mapToXXX() and boxed():
```java
// object stream -> primitive stream
Stream<String> cityStream = Stream.of("Beijing", "Tianjin", "Chengdu");
IntStream lengthStream = cityStream.mapToInt(String::length);
// primitive stream -> object stream (boxing); range(0, 10) covers 0..9
Stream<Integer> zeroToNine = IntStream.range(0, 10).boxed();
```
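- Compared with object streams, primitive streams have the following features:
- toArray() returns an array of the corresponding primitive type:
```java
int[] intArr = intStream.toArray();
```
- They come with built-in statistical methods such as max(), average(), and summaryStatistics(). Each call below uses a new stream, because a stream can be consumed only once:
```java
OptionalInt maxNum = IntStream.of(10, 20, 30).max();
IntSummaryStatistics intSummary = IntStream.of(10, 20, 30).summaryStatistics();
```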
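The sketch below exercises the primitive-stream operations above. PrimitiveStreamDemo is a hypothetical class name.

```java
import java.util.IntSummaryStatistics;
import java.util.List;
import java.util.OptionalInt;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;

// Hypothetical demo class
class PrimitiveStreamDemo {
    static int sumBelow100()   { return IntStream.range(0, 100).sum(); }         // 0..99
    static long countUpTo100() { return IntStream.rangeClosed(0, 100).count(); } // 0..100

    // mapToInt(): object stream -> IntStream; toArray() then yields an int[], not an Object[]
    static int[] cityNameLengths() {
        return Stream.of("Beijing", "Tianjin", "Chengdu").mapToInt(String::length).toArray();
    }

    // boxed(): IntStream -> Stream<Integer>, needed e.g. for Collectors.toList()
    static List<Integer> zeroToNine() {
        return IntStream.range(0, 10).boxed().collect(Collectors.toList());
    }

    // empty OptionalInt when no values are given
    static OptionalInt maxOf(int... values) {
        return IntStream.of(values).max();
    }

    // one pass computes count, sum, min, max, and average
    static IntSummaryStatistics stats(int... values) {
        return IntStream.of(values).summaryStatistics();
    }

    public static void main(String[] args) {
        System.out.println(sumBelow100());                  // 4950
        System.out.println(countUpTo100());                 // 101
        System.out.println(cityNameLengths().length);       // 3
        System.out.println(zeroToNine());                   // [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
        System.out.println(maxOf(10, 20, 30));              // OptionalInt[30]
        System.out.println(stats(10, 20, 30).getAverage()); // 20.0
    }
}
```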
8. Parallel Stream
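- Streams support parallel execution, provided the following conditions are met:
- The stream must be a parallel stream. Streams are sequential by default; call parallel() to get a parallel stream:
```java
IntStream scoreStream = IntStream.rangeClosed(10, 30).parallel();
```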
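- The operations must be safe to run in parallel: the parallel result must equal the sequential result, and the operations must be thread-safe:
```java
int[] wordLength = new int[12];
Stream.of("It", "is", "your", "responsibility").parallel().forEach(s -> {
    if (s.length() < 12) wordLength[s.length()]++;  // data race: unsynchronized updates from several threads
});
```
The problem with this code is that multiple threads update the shared array wordLength without synchronization, so increments can be lost. There are two possible fixes:
1) use an array of atomic counters, such as an AtomicInteger array;
2) count by length with groupingBy().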
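- Common situations where parallelism improves performance:
- Dropping the ordering constraint. For operations such as distinct() and limit(), preserving encounter order is expensive in parallel, so if you do not care about order, call unordered():
```java
LongStream.rangeClosed(5, 10).unordered().parallel().limit(3);
IntStream.of(14, 15, 15, 14, 12, 81).unordered().parallel().distinct();
```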
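- In groupingBy(), merging the per-thread maps is expensive. groupingByConcurrent() avoids the merge by having all threads insert into a single concurrent map, at the cost of not preserving encounter order. It pays off only on a parallel stream:
```java
Stream.of(cities).parallel().collect(Collectors.groupingByConcurrent(City::getState));
```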
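- Do not modify the collection backing a stream while a stream operation is running.
A stream does not store data itself; the data lives in the underlying collection. Modifying that collection while a stream operation is in progress therefore gives undefined results:
```java
// assumes wordList is a mutable list such as an ArrayList (an Arrays.asList list cannot grow or shrink)
// OK: the source is modified before the terminal operation starts
Stream<String> wordStream = wordList.stream();
wordList.add("number");
wordStream.distinct().count();
// Not OK: the source is modified during the terminal operation; the behavior is undefined
// (typically a ConcurrentModificationException or another runtime exception)
Stream<String> wordStream2 = wordList.stream();
wordStream2.forEach(s -> { if (s.length() >= 6) wordList.remove(s); });
```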
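Both fixes suggested for the wordLength example can be sketched as follows. ThreadSafeCountDemo is a hypothetical class. AtomicIntegerArray is one way to get an array of atomic counters.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicIntegerArray;
import java.util.stream.Collectors;

// Hypothetical demo class
class ThreadSafeCountDemo {
    static final List<String> WORDS = Arrays.asList("It", "is", "your", "responsibility");

    // Fix 1: each incrementAndGet() is atomic, so concurrent updates are not lost
    static AtomicIntegerArray countWithAtomicArray() {
        AtomicIntegerArray wordLength = new AtomicIntegerArray(12);
        WORDS.parallelStream().forEach(s -> {
            if (s.length() < 12) wordLength.incrementAndGet(s.length());
        });
        return wordLength;
    }

    // Fix 2: let the stream do the counting, with no shared mutable state
    static Map<Integer, Long> countWithGroupingBy() {
        return WORDS.parallelStream()
                .collect(Collectors.groupingBy(String::length, Collectors.counting()));
    }

    public static void main(String[] args) {
        System.out.println(countWithAtomicArray()); // [0, 0, 2, 0, 1, 0, 0, 0, 0, 0, 0, 0]
        System.out.println(countWithGroupingBy());  // {2=2, 4=1, 14=1}
    }
}
```

The groupingBy() version also counts "responsibility" (length 14), which the fixed-size array has to skip.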
9. Functional Interface
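An interface that contains exactly one abstract method is called a functional interface. Examples include Predicate, Function, and Consumer. Wherever a functional interface is expected, we usually pass a lambda expression or a method reference.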
Common functional interfaces:

| Functional Interface | Parameter Types | Return Type | Description |
|---|---|---|---|
| Supplier<T> | None | T | Supplies a value of type T |
| Consumer<T> | T | void | Consumes a value of type T |
| BiConsumer<T, U> | T, U | void | Consumes values of types T and U |
| Predicate<T> | T | boolean | A Boolean-valued function |
| ToIntFunction<T> | T | int | An int-, long-, or double-valued function |
| ToLongFunction<T> | T | long | An int-, long-, or double-valued function |
| ToDoubleFunction<T> | T | double | An int-, long-, or double-valued function |
| IntFunction<R> | int | R | A function with argument of type int, long, or double |
| LongFunction<R> | long | R | A function with argument of type int, long, or double |
| DoubleFunction<R> | double | R | A function with argument of type int, long, or double |
| Function<T, R> | T | R | A function with argument of type T |
| BiFunction<T, U, R> | T, U | R | A function with arguments of types T and U |
| UnaryOperator<T> | T | T | A unary operator on the type T |
| BinaryOperator<T> | T, T | T | A binary operator on the type T |
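The sketch below assigns lambdas and method references to several interfaces from the table. It also defines a hypothetical custom interface, LengthChecker, which qualifies as a functional interface because it has exactly one abstract method. The class and field names are illustrative.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.BinaryOperator;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.function.Supplier;
import java.util.function.ToIntFunction;
import java.util.function.UnaryOperator;

// Hypothetical custom interface: one abstract method, so a lambda can implement it
@FunctionalInterface
interface LengthChecker {
    boolean check(String s);
}

// Hypothetical demo class
class FunctionalInterfaceDemo {
    static final Supplier<String> GREETING = () -> "hello";          // () -> T
    static final Predicate<String> IS_LONG = s -> s.length() > 7;    // T -> boolean
    static final Function<String, Integer> LENGTH = String::length;  // T -> R (boxed Integer)
    static final ToIntFunction<String> RAW_LENGTH = String::length;  // T -> int (no boxing)
    static final UnaryOperator<String> SHOUT = String::toUpperCase;  // T -> T
    static final BinaryOperator<Integer> ADD = Integer::sum;         // (T, T) -> T
    static final LengthChecker SHORT = s -> s.length() <= 3;         // custom functional interface

    // Consumer<T>: T -> void; here each element is added to a list
    static List<String> consumeAll(List<String> words) {
        List<String> sink = new ArrayList<>();
        Consumer<String> addToSink = sink::add;
        words.forEach(addToSink);
        return sink;
    }

    public static void main(String[] args) {
        System.out.println(GREETING.get());                                        // hello
        System.out.println(IS_LONG.test("expression"));                            // true
        System.out.println(LENGTH.apply("Java") + RAW_LENGTH.applyAsInt("Swift")); // 9
        System.out.println(SHOUT.apply("java"));                                   // JAVA
        System.out.println(ADD.apply(2, 3));                                       // 5
        System.out.println(SHORT.check("as"));                                     // true
        System.out.println(consumeAll(Arrays.asList("a", "b")));                   // [a, b]
    }
}
```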