概述
Stream流是Java8新引入的一個特性, 它允許你以聲明性方式處理數據集合, 而不是像以前的指令式編程那樣需要編寫具體怎么實現。
比如炒菜, 用指令式編程需要編寫具體的實現
配菜();
熱鍋();
放油();
翻炒();
放調料();
出鍋();
而如果是Stream流這種聲明式方式, 只需要一步操作 炒菜(); 就可以完成上面的炒菜功能。它關注的是我要做什么, 而不是我要怎么做。
與Collection集合使用外部迭代不同, Stream 流使用內部迭代, 它幫你把迭代做了, 還把得到的流值存在了某個地方, 你只要給出一個函數說要做什么就可以了。
同一個流只能被消費一次, 下面這段代碼運行會拋異常 java.lang.IllegalStateException
@Test public void test16() { List<Dish> menu = Arrays.asList( new Dish("pork", false, 800, Dish.Type.MEAT), new Dish("beef", false, 700, Dish.Type.MEAT)); Stream<Dish> stream = menu.stream(); stream.forEach(System.out::println); stream.forEach(System.out::println); //java.lang.IllegalStateException: stream has already been operated upon or closed }
諸如filter、map、limit、sorted、distinct等中間操作會返回另一個流, 多個中間操作連接起來就形成了一條流水線。除非流水線上觸發一個終端操作, 如forEach、count、collect, 否則中間操作不會執行任何處理。
因為Stream流的一個特性就是惰性求值, 只有在觸發了終端操作時, 才會把前面所有的中間操作合並起來一次性全部處理完。
Stream API
在正式介紹Stream API之前, 先引入一些實體類和幾組數據集合, 后面的代碼示例會經常用到它們。
這里插入一個小技巧,使用IDEA插件 lombok 可以讓你不用重復的編寫實體類的Getter/Setter、構造方法等等,你只需要在實體類上添加一個 @Data 注解即可,lombok插件會在編譯期間自動幫你生成Getter/Setter方法、toString方法。

@Data public class Dish { private String name; private boolean vegetarian; private int calories; private Type type; public Dish(String name, boolean vegetarian, int calories, Type type) { this.name = name; this.vegetarian = vegetarian; this.calories = calories; this.type = type; } public enum Type { MEAT, FISH, OTHER } public enum CaloricLevel { DIET, NORMAL, FAT } @Override public String toString() { return name; } } @Data public class Transaction { private Trader trader; private int year; private int value; private String currency; public Transaction(Trader trader, int year, int value) { this.trader = trader; this.year = year; this.value = value; } } @Data public class Trader { private String name; private String city; public Trader(String name, String city) { this.name = name; this.city = city; } }

static List<Dish> menu; static List<Integer> nums; static List<Transaction> transactions; static { menu = Arrays.asList( new Dish("pork", false, 800, Dish.Type.MEAT), new Dish("beef", false, 700, Dish.Type.MEAT), new Dish("chicken", false, 400, Dish.Type.MEAT), new Dish("french fries", true, 530, Dish.Type.OTHER), new Dish("rice", true, 350, Dish.Type.OTHER), new Dish("season fruit", true, 120, Dish.Type.OTHER), new Dish("pizza", true, 550, Dish.Type.OTHER), new Dish("prawns", false, 300, Dish.Type.FISH), new Dish("salmon", false, 450, Dish.Type.FISH)); nums = Arrays.asList(1, 3, 5, 7, 9, 11, 13); Trader raoul = new Trader("Raoul", "Cambridge"); Trader mario = new Trader("Mario", "Milan"); Trader alan = new Trader("Alan", "Cambridge"); Trader brian = new Trader("Brian", "Cambridge"); transactions = Arrays.asList( new Transaction(brian, 2011, 300), new Transaction(raoul, 2012, 1000), new Transaction(raoul, 2011, 400), new Transaction(mario, 2012, 710), new Transaction(mario, 2012, 700), new Transaction(alan, 2012, 950) ); }
map()方法用於將流中的元素映射成一個新的元素。
@Test public void test17() { //獲取每道菜的名稱的長度 List<Integer> list = menu.stream() .map(Dish::getName) .map(String::length) .collect(Collectors.toList()); }
flatMap()方法會把一個流中的每個值轉換成另一個流,然后把所有的流扁平化,連接起來形成一個新的流。
@Test public void test18() { List<String> words = Arrays.asList("hello", "world"); List<String> list = words.stream() .map(i -> i.split("")) .flatMap(Arrays::stream)//流扁平化,形成一個新的流 .distinct()//過濾重復的元素 .collect(Collectors.toList()); System.out.println(list);//result: [h, e, l, o, w, r, d] }
findFirst()用於返回流中的第一個元素,findAny() 返回流中任意一個元素。因為流可能是空的,所以findFirst()和findAny()的返回類型都是Optional<T>, 當流沒有元素時,就返回一個空的Optional。
對於findFirst()和findAny(),如果不關心返回的元素是哪個,使用findAny()在並行流時限制更少。
@Test public void test19() { menu.stream() .filter(Dish::isVegetarian) .findAny() .ifPresent(i -> System.out.println(i.getName()));//會在Optional包含值的時候執行給定的代碼塊 }
你可以用 allMatch() 、noneMatch()和anyMatch()方法讓流匹配給定的謂詞Predicate<T>, 方法名就可見名知意, 分別對應 所有元素都要匹配、所有元素都不匹配、任意一個元素匹配。
通過reduce()方法可以對流進行歸約操作。
所謂規約操作就是將流中所有元素反復結合起來, 最終得到一個值.
@Test public void test20() { Integer sum1 = nums.stream().reduce(0, Integer::sum); System.out.println(sum1); Optional<Integer> o1 = nums.stream().reduce(Integer::sum);//求和 System.out.println(o1.get()); Optional<Integer> o2 = nums.stream().reduce(Integer::max);//最大值 System.out.println(o2.get()); Integer count = menu.stream().map(d -> 1).reduce(0, Integer::sum);//計算流中元素的個數 menu.stream().count(); }
下面通過一段對交易員數據集合transactions進行處理的示例, 總結下常用的幾種Stream API。
@Test public void test21() { //(1) 找出2011年發生的所有交易,並按交易額排序(從低到高)。 List<Transaction> list = transactions.stream().filter(i -> 2011 == i.getYear()).sorted(Comparator.comparing(Transaction::getValue)).collect(Collectors.toList()); //(2) 交易員都在哪些不同的城市工作過? Set<String> cities = transactions.stream().map(Transaction::getTrader).map(Trader::getCity).collect(Collectors.toSet()); //(3) 查找所有來自於劍橋的交易員,並按姓名排序。 List<Trader> trades = transactions.stream().map(Transaction::getTrader).filter(i -> "Cambridge".equals(i.getCity())).distinct().sorted(Comparator.comparing(Trader::getName)).collect(Collectors.toList()); //(4) 返回所有交易員的姓名字符串,按字母順序排序。 String names = transactions.stream().map(Transaction::getTrader).distinct().map(Trader::getName).sorted().reduce("", (a, b) -> a + b); //(5) 有沒有交易員是在米蘭工作的? boolean flag = transactions.stream().map(Transaction::getTrader).anyMatch(trader -> "Milan".equals(trader.getCity())); //(6) 打印生活在劍橋的交易員的所有交易的總額。 Integer sum = transactions.stream().filter(i -> "Cambridge".equals(i.getTrader().getCity())).map(Transaction::getValue).reduce(0, Integer::sum); //(7) 所有交易中,最高的交易額是多少? Integer max = transactions.stream().map(Transaction::getValue).reduce(0, Integer::max); //(8) 找到交易額最小的交易。 Optional<Transaction> first = transactions.stream().min(Comparator.comparingInt(Transaction::getValue)); System.out.println(first.get()); }
原始類型流特化: IntStream, LongStream, DoubleStream的簡單使用以及和Stream流之間的相互轉換。
@Test public void test22() { int calories = menu.stream().mapToInt(Dish::getCalories).sum(); //映射到數值流 mapToXxx IntStream intStream = menu.stream().mapToInt(Dish::getCalories); //轉換回基本類型對應的對象流 Stream<Integer> stream = intStream.boxed(); //intStream.mapToObj(Integer::valueOf); //默認值OptionalInt List<Dish> list = new ArrayList<>(); OptionalInt optionalInt = list.stream().mapToInt(Dish::getCalories).max(); System.out.println(optionalInt.orElse(88)); //result: 88 // 數值范圍 long count = IntStream.rangeClosed(1, 102).filter(i -> i % 3 == 0).count(); System.out.println(count);//result: 34 }
構建流的幾種方式
由集合創建流, 根據數值范圍創建數值流, 由值創建流, 由數組創建流, 由文件生成流, 由函數生成無限流。
@Test public void test24() { IntStream.rangeClosed(1, 100);//根據數值范圍創建數值流 Stream<String> stream = Stream.of("java8", "蓋聶", "少司命");//由值創建流 int sum = Arrays.stream(new int[]{1, 2, 3, 4}).sum();//由數組創建流 //由文件生成流 ===>下面示例Files.lines得到一個流,流中的每個元素對應文件中的一行 try (Stream<String> lines = Files.lines(Paths.get("1.txt"), Charset.defaultCharset())) { long count = lines.flatMap(line -> Arrays.stream(line.split(" "))) .distinct() .count(); } catch (IOException ex) { } //由函數生成流: 創建無限流 Stream.iterate(0, n -> n + 1) .limit(10) .forEach(System.out::println); Stream.iterate(new int[]{0, 1}, arr -> new int[]{arr[1], arr[0] + arr[1]}) //創建一個斐波納契元祖序列 .limit(10) .forEach(arr -> System.out.println("(" + arr[0] + ", " + arr[1] + ")")); Stream.generate(Math::random) .limit(5) .forEach(System.out::println); }
Collectors類中提供了一些靜態工廠方法, 用於流的歸約和匯總操作。
常見的有counting() 計算流中元素的個數,maxBy()和minBy() 取出流中某個屬性值最大或最小的元素,joining() 將對流中每一個對象應用 toString() 方法得到的所有字符串連接成一個字符串,reducing() 對流中的元素進行歸約操作等等。
下面是簡單的示例, 類中已經導入了Collectors類中的所有靜態方法。
@Test public void test1() { Long count = menu.stream().collect(counting());//菜單里有多少種菜 Optional<Dish> optionalDish = menu.stream().collect(maxBy(comparingInt(Dish::getCalories)));//菜單里熱量最高的菜 Integer totalCalories1 = menu.stream().collect(summingInt(Dish::getCalories));//菜單列表的總熱量 Double averageCalories = menu.stream().collect(averagingInt(Dish::getCalories));//菜單列表的熱量平均值 IntSummaryStatistics intSummaryStatistics = menu.stream().collect(summarizingInt(Dish::getCalories));//一次迭代,統計出菜單列表元素個數, 菜餚熱量最大值、最小值、平均值、總和 System.out.println(intSummaryStatistics.toString()); //result: IntSummaryStatistics{count=9, sum=4200, min=120, average=466.666667, max=800} String names = menu.stream().map(Dish::getName).collect(joining(","));//連接字符串 Integer totalCalories2 = menu.stream().collect(reducing(0, Dish::getCalories, Integer::sum));//菜單列表的總熱量 }
流的分組和分區操作 groupingBy(), partitioningBy()
所謂分組,就是將流中的元素按某個屬性根據一定的規則分為不同的小塊。比如常見的考試評定班級學生成績情況,分數<60 為不及格,60<=分數<80為良好,80<=分數為優秀,這個就是分組。
分區則比較特殊,它是根據一個謂詞Predicate<T>作為分類函數,也就是分出來的只會有兩種類型,對應的Map鍵就是布爾類型。
@Test public void test2() { //單級分組 Map<Type, List<Dish>> map1 = menu.stream().collect(groupingBy(Dish::getType)); //多級分組 result: {FISH={NORMAL=[salmon], DIET=[prawns]}, OTHER={NORMAL=[french fries, pizza], DIET=[rice, season fruit]}, MEAT={NORMAL=[chicken], FAT=[pork, beef]}} Map<Type, Map<CaloricLevel, List<Dish>>> map2 = menu.stream().collect(groupingBy(Dish::getType, groupingBy(dish -> { if (dish.getCalories() < 400) return DIET; else if (dish.getCalories() < 700) return NORMAL; else return FAT; }))); //菜單中每種類型的菜餚的數量 Map<Type, Long> map3 = menu.stream().collect(groupingBy(Dish::getType, counting()));//result: {FISH=2, OTHER=4, MEAT=3} //菜單中每種類型熱量最高的菜餚 Map<Type, Optional<Dish>> map4 = menu.stream().collect(groupingBy(Dish::getType, maxBy(comparingInt(Dish::getCalories))));//result:{FISH=Optional[salmon], OTHER=Optional[pizza], MEAT=Optional[pork]} //上面分組操作后的Optional<Dish>是一定有值的,所以這個Optional包裝沒什么意義,可以通過collectingAndThen()方法把Dish直接提取出來 Map<Type, Dish> map5 = menu.stream().collect(groupingBy(Dish::getType, collectingAndThen(maxBy(comparingInt(Dish::getCalories)), Optional::get)));//result:{FISH=Optional[salmon], OTHER=Optional[pizza], MEAT=Optional[pork]} //根據菜餚類型分組,獲取所有的菜餚名稱 result: {MEAT=[chicken, beef, pork], OTHER=[season fruit, pizza, rice, french fries], FISH=[salmon, prawns]} LinkedHashMap<Type, Set<String>> map6 = menu.stream().collect(groupingBy(Dish::getType, LinkedHashMap::new, mapping(Dish::getName, toSet()))); //在上面的例子中, toSet()方法生成的收集器我們是無法指定Set類型的, 可以使用toCollection()工廠方法來指定集合類型, 比如LInkedHashSet LinkedHashMap<Type, LinkedHashSet<String>> menu7 = menu.stream().collect(groupingBy(Dish::getType, LinkedHashMap::new, mapping(Dish::getName, toCollection(LinkedHashSet::new)))); //按菜餚是否素食進行分區 result: {false=[chicken, salmon, prawns, beef, pork], true=[rice, french fries, pizza, season fruit]} Map<Boolean, HashSet<Dish>> map9 = menu.stream().collect(partitioningBy(Dish::isVegetarian, toCollection(HashSet::new))); //獲取素食和非素食中熱量最高的菜餚 result: {false=pork, true=pizza} Map<Boolean, Dish> map10 = menu.stream().collect(partitioningBy(Dish::isVegetarian, collectingAndThen(maxBy(comparingInt(Dish::getCalories)), Optional::get))); //將前20個自然數按質數和非質數分區 Map<Boolean, List<Integer>> map11 = IntStream.rangeClosed(2, 20).boxed().collect(partitioningBy(this::isPrime)); } private boolean isPrime(int candidate) { int sqrt = (int) Math.sqrt(candidate); return IntStream.rangeClosed(2, sqrt).noneMatch(i -> candidate % i == 0); }
自定義收集器的兩種方式
- 實現Collector接口
- 使用Stream類的重載方法collect(),這種方式只有
IDENTITY_FINISH
特征(即對結果容器做最終類型轉換的finisher()方法返回的是一個恆等函數)的收集器才能使用。

@Test public void test3() { //粗糙的自定義收集器 List<Dish> list = menu.stream().collect(new ToListCollector<Dish>()); //對於IDENTITY_FINISH這種最終函數是恆等函數的收集操作,可以用Stream中的重載方法collect()實現同樣的效果 HashSet<Object> hashset = menu.stream().collect(HashSet::new, HashSet::add, HashSet::addAll); } public class ToListCollector<T> implements Collector<T, List<T>, List<T>> { /** * 創建一個空的結果容器,供數據收集使用 */ @Override public Supplier<List<T>> supplier() { return ArrayList::new; } /** * 將元素添加到結果容器 */ @Override public BiConsumer<List<T>, T> accumulator() { return List::add; } /** * 此方法定義了在使用並行流時,從各個子流進行歸約所得的結果容器要如何合並在一起 */ @Override public BinaryOperator<List<T>> combiner() { return (left, right) -> { left.addAll(right); return left; }; } /** * 對結果容器做最終類型轉換 */ @Override public Function<List<T>, List<T>> finisher() { return Function.identity(); } /** * 定義收集器的一些行為特征,比如無序歸約、並行歸約、最終類型轉換finisher()返回的函數是一個恆等函數 */ @Override public Set<Characteristics> characteristics() { return Collections.unmodifiableSet(EnumSet.of(Characteristics.IDENTITY_FINISH, Characteristics.CONCURRENT)); } }
調用流的 sequential() 或 parallel() 方法可以指定流順序/並行執行,其底層原理就是改變一個記錄是否並行執行的標志的布爾變量的值來實現的。
並行流內部使用了默認的 ForkJoinPool 分支/合並框架,它默認的線程數就是當前機器的處理器數量,這個值是由 Runtime.getRuntime().availableProcessors() 得到的,可以通過下面的方式改變線程池的大小,但不建議,因為一旦線程數超過了處理器的數量,就可能會引發並發訪問的共享資源競爭問題。
System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism", "128");//全局設置
下面這段代碼對原始迭代、並行流、順序流的幾種方式進行了測試,它們使用不同的實現方式對 1~10000000 之間的自然數求和,你會看到,在某些場景下如果不恰當的使用了並行流,反而會大大降低性能,比如Stream類的iterate()方法生成的流使用並行反而會增加額外開銷。
因為每次應用iterate()方法時都要依賴前一次應用的結果,因此無法有效的把流划分為多個小塊來並行處理,這里把流標記成並行,實則給原本的順序處理增加了額外的開銷

@Test public void test1() { long sec1 = this.measureSumPerf(ParallelStream::iterativeSum, 1000_0000); System.out.println(sec1);//4毫秒 long sec2 = this.measureSumPerf(ParallelStream::sequentialSum, 1000_0000); System.out.println(sec2);//16毫秒 //每次應用iterate()方法時都要依賴前一次應用的結果,因此無法有效的把流划分為多個小塊來並行處理,這里把流標記成並行,實則給原本的順序處理增加了額外的開銷 long sec3 = this.measureSumPerf(ParallelStream::parallelSum, 1000_0000); System.out.println(sec3);//241毫秒 } public long measureSumPerf(Function<Long, Long> adder, long n) { long fastest = Long.MAX_VALUE; for (int i = 0; i < 10; i++) { long start = System.nanoTime(); long sum = adder.apply(n); long duration = (System.nanoTime() - start) / 1_000_000; System.out.println("Result: " + sum); if (duration < fastest) fastest = duration; } return fastest; } public class ParallelStream { public static long sequentialSum(long n) { return LongStream.iterate(1, i -> i + 1) .limit(n) .sum(); // return LongStream.rangeClosed(1, n).reduce(0, Long::sum);//4毫秒 } public static long iterativeSum(long n) { long sum = 0; for (long i = 1; i < n + 1; i++) { sum += i; } return sum; } public static long parallelSum(long n) { return LongStream.iterate(1, i -> i + 1) .limit(n) .parallel() .sum(); // return LongStream.rangeClosed(1, n).parallel().reduce(0, Long::sum);//2毫秒 } }
同理, 類似limit和findFirst這種依賴於元素順序的操作,在並行流上的性能一般會比順序流差。
如何調試Stream流
通過Stream類提供的peek()方法可以查看Stream流水線每一步中間操作的輸出結果
@Test public void test9() { List<Integer> numbers = Arrays.asList(2, 3, 4, 5, 6, 7, 8, 9); List<Integer> result = numbers.stream() .peek(x -> System.out.println("from stream: " + x)) .map(x -> x + 17) .peek(x -> System.out.println("after map: " + x)) .filter(x -> x % 2 == 0) .peek(x -> System.out.println("after filter: " + x)) .limit(3) .peek(x -> System.out.println("after limit: " + x + "\n")) .collect(toList()); }
輸出結果如下
from stream: 2 after map: 19 from stream: 3 after map: 20 after filter: 20 after limit: 20 from stream: 4 after map: 21 from stream: 5 after map: 22 after filter: 22 after limit: 22 from stream: 6 after map: 23 from stream: 7 after map: 24 after filter: 24 after limit: 24
參考資料
作者:張小凡
出處:https://www.cnblogs.com/qingshanli/
本文版權歸作者和博客園共有,歡迎轉載,但未經作者同意必須保留此段聲明,且在文章頁面明顯位置給出原文連接,否則保留追究法律責任的權利。如果覺得還有幫助的話,可以點一下右下角的【推薦】。