Java 8 Stream 教程


Java 8 Stream Tutorial#


本文采用實例驅動的方式,對JAVA8的stream API進行一個深入的介紹。雖然JAVA8中的stream API與JAVA I/O中的InputStream和OutputStream在名字上比較類似,但是其實是另外一個東西,Stream API是JAVA函數式編程中的一個重要組成部分。

本文描述如何使用JAVA8的Stream API。通過本文,你可以了解Stream API的執行順序,不同的執行順序會對stream api的執行效率有較大的影響。本文會詳細描述Stream API中的reducecollectflatMap等操作,結尾部分會深入講解parallel streams

如果你對JAVA8中新增的概念:lambda表達式,函數式接口,方法引用不熟悉。可以從:Java 8 Tutorial一文中獲取相關的知識。


Streams如何工作?###

stream是一個可以對個序列中的元素執行各種計算操作的一個元素序列。

List<String> myList =
    Arrays.asList("a1", "a2", "b1", "c2", "c1");

myList
    .stream()
    .filter(s -> s.startsWith("c"))
    .map(String::toUpperCase)
    .sorted()
    .forEach(System.out::println);

// C1
// C2

stream包含中間(intermediate operations)和最終(terminal operation)兩種形式的操作。中間操作(intermediate operations)的返回值還是一個stream,因此可以通過鏈式調用將中間操作(intermediate operations)串聯起來。最終操作(terminal operation)只能返回void或者一個非stream的結果。在上述例子中:filter, map ,sorted是中間操作,而forEach是一個最終操作。更多關於stream的中可用的操作可以查看java doc。上面例子中的鏈式調用也被稱為操作管道流。

大多stream操作接受某種形式的lambda表達式作為參數,通過方法接口的形式指定操作的具體行為,這些方法接口的行為基本上都是無干擾(non-interfering)和無狀態(stateless)。無干擾(non-interfering)的方法的定義是:該方法不修改stream的底層數據源,比如上述例子中:沒有lambda表達式添加或者刪除myList中的元素。無狀態(stateless)方法的定義:操作的執行是獨立的,比如上述例子中,沒有lambda表達式在執行中依賴可能發生變化的外部變量或狀態。


streams分類###

可以從不同的數據源創建stream。java collection包中的Collections,Lists,Sets這些類中新增stream()和parallelStream()方法,通過這些方法可以創建一個順序stream(sequential streams)或者一個並發的stream(Parallel streams)。並發stream(Parallel streams)更適合在多線程中使用,本文先介紹順序流(sequential streams)在結尾會描述並發stream(Parallel streams),

Arrays.asList("a1", "a2", "a3")
    .stream()
    .findFirst()
    .ifPresent(System.out::println);  // a1

List對象上調用stream()方法可以返回一個常規的對象流。在下面的例子中我們不需要創建一個collection對象也可以使用stream:

Stream.of("a1", "a2", "a3")
    .findFirst()
    .ifPresent(System.out::println);  // a1

直接使用Stream.of()方法就能從一組對象創建一個stream對象,

除了常規的對象流,JAVA 8中的IntStream,LongStream,DoubleStream這些流能夠處理基本數據類型如:int,long,double。比如:IntStream可以使用range()方法能夠替換掉傳統的for循環

IntStream.range(1, 4)
    .forEach(System.out::println);

// 1
// 2
// 3

基本類型流(primitive streams)使用方式與常規對象流類型(regular object streams)大部分相同,但是基本類型流(primitive streams)能使用一些特殊的lambda表達式,比如:用IntFunction代替Function,用IntPredicate代替Predicate,同時基本類型流(primitive streams)中可以支持一些聚合方法,如:sum(),average()等。

Arrays.stream(new int[] {1, 2, 3})
    .map(n -> 2 * n + 1)
    .average()
    .ifPresent(System.out::println);  // 5.0

可以通過常規對象流(regular object stream)的mapToInt(), mapToLong(),mapToDouble(),基本類型對象流(primitive streams)中的mapToObj()等方法完成常規對象流和基本類型流之間的相互轉換

IntStream.range(1, 4)
    .mapToObj(i -> "a" + i)
    .forEach(System.out::println);

下面這個例子中doubles stream先被映射成int stream,然后又被映射成String類型的對象流:

Stream.of(1.0, 2.0, 3.0)
    .mapToInt(Double::intValue)
    .mapToObj(i -> "a" + i)
    .forEach(System.out::println);

// a1
// a2
// a3

處理順序###

前面描述了如何創建和使用各種stream,現在開始深入了解stream執行引擎的工作原理。

Laziness(延遲加載)是中間操作(intermediate operations)的一個重要特性。如下面這個例子:中間操作(terminal operation)缺失,當執行這個代碼片段的時候,並不會在控制台打印相應的內容,這是因為只有最終操作(terminal operation)存在的時候,中間操作(intermediate operations)才會執行。

Stream.of("d2", "a2", "b1", "b3", "c")
    .filter(s -> {
        System.out.println("filter: " + s);
        return true;
    });

給上面的例子添加最終操作(terminal operation)forEach:

Stream.of("d2", "a2", "b1", "b3", "c")
    .filter(s -> {
        System.out.println("filter: " + s);
        return true;
    })
    .forEach(s -> System.out.println("forEach: " + s));

執行結果如下:

filter:  d2
forEach: d2
filter:  a2
forEach: a2
filter:  b1
forEach: b1
filter:  b3
forEach: b3
filter:  c
forEach: c

執行結果比較讓人驚奇,想當然的做法是水平執行此流上的所有元素。但是實際上是每一個元素沿着鏈垂直移動,第一個字符串"d2"執行完filterforEach后第二個元素"a2"才開始執行。

這種沿着鏈垂直移動的行為可以降低每一個元素上進行操作的數量,如我們在下面的例子中所示:

Stream.of("d2", "a2", "b1", "b3", "c")
    .map(s -> {
        System.out.println("map: " + s);
        return s.toUpperCase();
    })
    .anyMatch(s -> {最終操作
        System.out.println("anyMatch: " + s);
        return s.startsWith("A");
    });

// map:      d2
// anyMatch: D2
// map:      a2
// anyMatch: A2

當對給定元素執行判斷為真時anyMatch操作會立刻返回true,在上面例子中執行到元素“A2”的時候,元素判斷為真anyMatch立刻返回true,由於流是沿着鏈垂直移動的,因此上面的map操作只會執行兩次。

注:stream的執行流程類似shell中管道:ps xxx | grep "sss" | grep "ccc",是按照輸入行的形式進行處理。


執行效率與steream執行鏈順序的關系###

下面的例子由兩個中間操作(intermediate operations)map和filter以及一個最終操作(terminal operation)forEach構成,我們觀察這些動作是如何執行的。

Stream.of("d2", "a2", "b1", "b3", "c")
    .map(s -> {
        System.out.println("map: " + s);
        return s.toUpperCase();
    })
    .filter(s -> {
        System.out.println("filter: " + s);
        return s.startsWith("A");
    })
    .forEach(s -> System.out.println("forEach: " + s));

// map:     d2
// filter:  D2
// map:     a2
// filter:  A2
// forEach: A2
// map:     b1
// filter:  B1
// map:     b3
// filter:  B3
// map:     c
// filter:  C

你可能已經猜想到:mapfilter操作被執行了5次,但是forEach操作只被執行了1次。我們可以通過修改操作的執行順序(如:將filter操作移到操作鏈的頭部),大幅度降低執行次數

Stream.of("d2", "a2", "b1", "b3", "c")
    .filter(s -> {
        System.out.println("filter: " + s);
        return s.startsWith("a");
    })
    .map(s -> {
        System.out.println("map: " + s);
        return s.toUpperCase();
    })
    .forEach(s -> System.out.println("forEach: " + s));

// filter:  d2
// filter:  a2
// map:     a2
// forEach: A2
// filter:  b1中間操作
// filter:  b3
// filter:  c

修改后map只被執行了1次,如果此時數據量比較大則操作管道的執行效率會有較大的提升,在處理復雜方法鏈的時候需要注意執行順序對執行效率的影響。

給上面的例子添加sort操作。

Stream.of("d2", "a2", "b1", "b3", "c")
    .sorted((s1, s2) -> {
        System.out.printf("sort: %s; %s\n", s1, s2);
        return s1.compareTo(s2);
    })
    .filter(s -> {
        System.out.println("filter: " + s);
        return s.startsWith("a");
    })
    .map(s -> {
        System.out.println("map: " + s);
        return s.toUpperCase();
    })
    .forEach(s -> System.out.println("forEach: " + s));

執行結果如下:

sort:    a2; d2
sort:    b1; a2
sort:    b1; d2
sort:    b1; a2
sort:    b3; b1
sort:    b3; d2
sort:    c; b3
sort:    c; d2
filter:  a2
map:     a2
forEach: A2
filter:  b1
filter:  b3
filter:  c
filter:  d2

Sorting 是一種特殊的中間操作(intermediate operation),在對集合中元素進行排序過程中需要保存元素的狀態,因此Sorting 是一種有狀態的操作(stateful operation)。

首先,在整個輸入集上執行排序操作(即先對集合進行水平操作),由於輸入集合中的元素間存在多種組合,因此上面的例子中sorted操作被執行了8次。

可以通過對執行鏈重排序的方式,提升stream的執行效率。修改執行鏈順序之后由於filter操作的過濾,導致sorted操作的輸入集只有一個元素,在大數據量的情況下能夠大幅度提高執行效率。

Stream.of("d2", "a2", "b1", "b3", "c")
    .filter(s -> {
        System.out.println("filter: " + s);
        return s.startsWith("a");
    })
    .sorted((s1, s2) -> {
        System.out.printf("sort: %s; %s\n", s1, s2);
        return s1.compareTo(s2);
    })
    .map(s -> {
        System.out.println("map: " + s);
        return s.toUpperCase();
    })
    .forEach(s -> System.out.println("forEach: " + s));

// filter:  d2
// filter:  a2
// filter:  b1
// filter:  b3
// filter:  c
// map:     a2
// forEach: A2

流復用###

Java 8 streams不能被復用,當你執行完任何一個最終操作(terminal operation)的時候流就被關閉了。

Stream<String> stream =
    Stream.of("d2", "a2", "b1", "b3", "c")
        .filter(s -> s.startsWith("a"));

stream.anyMatch(s -> true);    // ok
stream.noneMatch(s -> true);   // exception

在同一個stream中執行完anyMatch后再執行noneMatch就會拋出如下異常:

java.lang.IllegalStateException: stream has already been operated upon or closed
    at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:229)
    at java.util.stream.ReferencePipeline.noneMatch(ReferencePipeline.java:459)
    at com.winterbe.java8.Streams5.test7(Streams5.java:38)
    at com.winterbe.java8.Streams5.main(Streams5.java:28)

可以通過為每個最終操作(terminal operation)創建一個新的stream鏈的方式來解決上面的重用問題,Stream api中已經提供了一個stream supplier類來在已經存在的中間操作(intermediate operations )的stream基礎上構建一個新的stream。

Supplier<Stream<String>> streamSupplier =
    () -> Stream.of("d2", "a2", "b1", "b3", "c")
            .filter(s -> s.startsWith("a"));

streamSupplier.get().anyMatch(s -> true);   // ok
streamSupplier.get().noneMatch(s -> true);  // ok

streamSupplier的每個get()方法會構造一個新的stream,我們可以在這個stream上執行期望的最終操作(terminal operation)。


高級操作###

Streams支持多種不同的操作(operations),我們已經了解過filter,map等比較重要的操作。你可以通過Stream Javadoc進一步了解更多的操作。現在我們開始深入探討更復雜的操作:collect flatMap reduce

假設存在如下的用戶列表:

class Person {
    String name;
    int age;

    Person(String name, int age) {
        this.name = name;
        this.age = age;
    }

    @Override
    public String toString() {
        return name;
    }
}

List<Person> persons =
    Arrays.asList(
        new Person("Max", 18),
        new Person("Peter", 23),
        new Person("Pamela", 23),
        new Person("David", 12));

Collect(收集)###

Collect(收集)是一種是十分有用的最終操作,它可以把stream中的元素轉換成另外一種形式,比如;list,set,map。Collect使用Collector作為參數,Collector包含四種不同的操作:supplier(初始構造器), accumulator(累加器), combiner(組合器), finisher(終結者)。這聽起來很復雜,但是一個好消息是java 8通過Collectors類內置了各種復雜的收集操作,因此對於大部分常用的操作來說,你不需要自己去實現collector類。

從一個十分常見的用類開始:

List<Person> filtered =
    persons
        .stream()
        .filter(p -> p.name.startsWith("P"))
        .collect(Collectors.toList());

System.out.println(filtered);    // [Peter, Pamela]

通過上面的demo可以看出,將stream轉換為List十分簡單,如果想轉換為Set的話,只需使用Collectors.toSet()就可以了。

下面的例子暫時將用戶按年齡分組:

Map<Integer, List<Person>> personsByAge = persons
    .stream()
    .collect(Collectors.groupingBy(p -> p.age));

personsByAge
    .forEach((age, p) -> System.out.format("age %s: %s\n", age, p));

// age 18: [Max]
// age 23: [Peter, Pamela]
// age 12: [David]

Collectors類功能繁多,你可以通過Collectors對stream中的元素進行匯聚,比如:計算所有用戶的年紀。

Double averageAge = persons
    .stream()
    .collect(Collectors.averagingInt(p -> p.age));

System.out.println(averageAge);     // 19.0

可以通過summarizing collectors能返回一個內置的統計對象,通過這個對象能夠獲取更加全面的統計信息,比如用戶年紀中的最大值,最小值,平均年紀等結果。

IntSummaryStatistics ageSummary =
    persons
        .stream()
        .collect(Collectors.summarizingInt(p -> p.age));

System.out.println(ageSummary);
// IntSummaryStatistics{count=4, sum=76, min=12, average=19.000000, max=23}

下面的例子展示如何將所有用戶連接成一個字符串:

String phrase = persons
    .stream()
    .filter(p -> p.age >= 18)
    .map(p -> p.name)
    .collect(Collectors.joining(" and ", "In Germany ", " are of legal age."));

System.out.println(phrase);
// In Germany Max and Peter and Pamela are of legal age.

join collector的三個參數分別表示:連接符,字符串前綴,字符串后綴(可選)。

將一個stream轉換為map,我們必須指定map的key和value如何映射。要注意的是key的值必須是唯一性的,否則會拋出IllegalStateException,但是可以通過使用合並函數(可選)繞過這個IllegalStateException異常:

Map<Integer, String> map = persons
    .stream()
    .collect(Collectors.toMap(
        p -> p.age,
        p -> p.name,
        (name1, name2) -> name1 + ";" + name2));

System.out.println(map);
// {18=Max, 23=Peter;Pamela, 12=David}

前文已經介紹了jdk內置的一些很有用的collectors,接下來開始介紹如何構造我們自己所需的collector,我們的目標是將stream中所有用戶的用戶名變成大寫並用"|"符號連接成一個字符串。為了達成這個目標我們通過Collector.of()方法創建了一個新的collector,我們必須給這個collector提供四種功能:supplier, accumulator, combiner,finisher.

Collector<Person, StringJoiner, String> personNameCollector =
    Collector.of(
        () -> new StringJoiner(" | "),          // supplier
        (j, p) -> j.add(p.name.toUpperCase()),  // accumulator
        (j1, j2) -> j1.merge(j2),               // combiner
        StringJoiner::toString);                // finisher

String names = persons
    .stream()
    .collect(personNameCollector);

System.out.println(names);  // MAX | PETER | PAMELA | DAVID

由於JAVA中String是一個不可變對象,因此我們需要一個輔助類(比如StringJoiner)來幫助collect構造我們的字符串。supplier創建了一個包含適當分隔符的StringJoiner對象,accumulator用來將每個用戶名轉為大寫並添加到supplier創建的StringJoiner中,combiner將兩個StringJoiners對象連接成一個,最后一步的finisher從StringJoiner中構建出所希望的得到的string對象。


FlatMap###

我們已經了解:通過map方法可以將stream中的一種對象轉換成另外一種對象。但是map方法還是有使用場景限制,只能將一種對象映射為另外一種特定的已經存在的對象。是否能夠將一個對象映射為多種對象,或者映射成一個根本不存在的對象呢。這就是flatMap方法出現的目的。

FlatMap方法可以將一個stream中的每一個元素對象轉換為另一個stream中的另一種元素對象,因此可以將stream中的每個對象改造成零,一個或多個。flatMap操作的返回流包含這些改造后的對象。

為了演示flatMap,定義一個繼承關系如下:

class Foo {
    String name;
    List<Bar> bars = new ArrayList<>();

    Foo(String name) {
        this.name = name;
    }
}

class Bar {
    String name;

    Bar(String name) {
        this.name = name;
    }
}

通過流實例化一隊對象:

List<Foo> foos = new ArrayList<>();

// create foos
IntStream
    .range(1, 4)
    .forEach(i -> foos.add(new Foo("Foo" + i)));

// create bars
foos.forEach(f ->
    IntStream
        .range(1, 4)
        .forEach(i -> f.bars.add(new Bar("Bar" + i + " <- " + f.name))));

完成上述操作之后我們得到三個foos,每個foos包含三個bars。

FlatMap接收一個返回值為stream的函數做參數,通過傳遞合適的函數,就可以解析每一個foo下對應的bar對象

foos.stream()
    .flatMap(f -> f.bars.stream())
    .forEach(b -> System.out.println(b.name));

// Bar1 <- Foo1
// Bar2 <- Foo1
// Bar3 <- Foo1
// Bar1 <- Foo2
// Bar2 <- Foo2
// Bar3 <- Foo2
// Bar1 <- Foo3
// Bar2 <- Foo3
// Bar3 <- Foo3

正如所見,我們成功地將三個對象的stream轉換成一個包含九個對象的stream

最后,上面的示例代碼可以簡化為一個單一管道流:

IntStream.range(1, 4)
    .mapToObj(i -> new Foo("Foo" + i))
    .peek(f -> IntStream.range(1, 4)
        .mapToObj(i -> new Bar("Bar" + i + " <- " f.name))
        .forEach(f.bars::add))
    .flatMap(f -> f.bars.stream())
    .forEach(b -> System.out.println(b.name));

FlatMap也支持JAVA8中新引入的Optional類,Optionals flatMap能返回一個另外的類的optional包裝類,可以用來減少對null的檢查。

假設有如下這種多層級結構:

class Outer {
    Nested nested;
}

class Nested {
    Inner inner;
}

class Inner {
    String foo;
}

為了獲取內部outer實例的內部foo對象,需要添加一系列空指針判斷

Outer outer = new Outer();
if (outer != null && outer.nested != null && outer.nested.inner != null) {
    System.out.println(outer.nested.inner.foo);
}

可以采用optionals flatMap 操作獲得相同的結果:

Optional.of(new Outer())
    .flatMap(o -> Optional.ofNullable(o.nested))
    .flatMap(n -> Optional.ofNullable(n.inner))
    .flatMap(i -> Optional.ofNullable(i.foo))
    .ifPresent(System.out::println);

上面的例子中flatMap的每次調用都會返回一個用Optional對象,如果有返回值則這個Optional對象是這個返回值的包裝類,如果返回值不存在則返回null。


Reduce(減少)###

reduce操作可以將stream中所有元素組合起來得到一個元素,JAVA8支持三中不同的reduce方法。

第一種能從stream元素序列中提取一個特定的元素。比如下面的從用戶列表中選擇年紀最大的用戶操作:

persons
    .stream()
    .reduce((p1, p2) -> p1.age > p2.age ? p1 : p2)
    .ifPresent(System.out::println);    // Pamela

上面的實例中reduce方法接收一個二元累加計算函數(BinaryOperator accumulator function)作為參數,二元操作(BinaryOperator)實際就是上在兩個操作數共享同一類型。示例中函數比較兩人年齡,返回的最大年齡的人。

第二種reduce操作接收一個標識值和一個二元操作累加器作為參數,這個reduce方法可以把stream中所有用戶的名字和年齡匯總得到一個新用戶。

Person result =
    persons
        .stream()
        .reduce(new Person("", 0), (p1, p2) -> {
            p1.age += p2.age;
            p1.name += p2.name;
            return p1;
        });

System.out.format("name=%s; age=%s", result.name, result.age);
// name=MaxPeterPamelaDavid; age=76

第三種reduce方法,接收三個參數:一個標示值(identity value),一個二元操作累加器(BiFunction accumulator),一個二元組合方法。由於標識符參數未被嚴格限制為person類型,因此我們可以用這個reduce方法來獲取用戶的總年齡。

Integer ageSum = persons
    .stream()
    .reduce(0, (sum, p) -> sum += p.age, (sum1, sum2) -> sum1 + sum2);

System.out.println(ageSum);  // 76

計算的結果是76,通過添加調試輸出,我們可以詳細地了解執行引擎中發生了什么。

Integer ageSum = persons
    .stream()
    .reduce(0,
        (sum, p) -> {
            System.out.format("accumulator: sum=%s; person=%s\n", sum, p);
            return sum += p.age;
        },
        (sum1, sum2) -> {
            System.out.format("combiner: sum1=%s; sum2=%s\n", sum1, sum2);
            return sum1 + sum2;
        });

// accumulator: sum=0; person=Max
// accumulator: sum=18; person=Peter
// accumulator: sum=41; person=Pamela
// accumulator: sum=64; person=David

從調試輸出中可以看到,累加器做了所有的工作,它首先獲取值為0的標示值和第一個用戶Max,接下來的三步中持續sum值由於累加不斷變大,在最后一步匯總的年紀增長到76。

注意,上面的調試輸出中combiner沒有執行,通過parallel執行上面相同stream。

Integer ageSum = persons
    .parallelStream()
    .reduce(0,
        (sum, p) -> {
            System.out.format("accumulator: sum=%s; person=%s\n", sum, p);
            return sum += p.age;
        },
        (sum1, sum2) -> {
            System.out.format("combiner: sum1=%s; sum2=%s\n", sum1, sum2);
            return sum1 + sum2;
        });

// accumulator: sum=0; person=Pamela
// accumulator: sum=0; person=David
// accumulator: sum=0; person=Max
// accumulator: sum=0; person=Peter
// combiner: sum1=18; sum2=23
// combiner: sum1=23; sum2=12
// combiner: sum1=41; sum2=35

Executing this stream in parallel results in an entirely different execution behavior. Now the combiner is actually called. Since the accumulator is called in parallel, the combiner is needed to sum up the separate accumulated values.

通過並行的方式執行上面的stream操作,得到的是另外一種完全不相同的執行動作。在並行stream中combiner方法會被調用。這是由於累加器是被並行調用的,因此組合器需要對分開的累加操作進行求和。

下一章會詳細描述並行stream。


Parallel Streams(並行流)###

為了提高大量輸入時的執行效率,stream可以采用並行的放行執行。並行流(Parallel Streams)通過ForkJoinPool.commonPool() 方法獲取一個可用的ForkJoinPool。這個ForkJoinPool使用5個線程(實際上是由底層可用的物理cpu核數決定的)。

ForkJoinPool commonPool = ForkJoinPool.commonPool();
System.out.println(commonPool.getParallelism());    // 3

On my machine the common pool is initialized with a parallelism of 3 per default. This value can be decreased or increased by setting the following JVM parameter:
在我的機器上公共池初始化為每個默認3並行,這個值可以通過調整jvm參數來修改:

-Djava.util.concurrent.ForkJoinPool.common.parallelism=5

Collections中包含parallelStream()方法,通過這個方法能夠為Collections中的元素創建並行流。另外也可以調用stream的parallel()方法將一個順序流轉變為一個並行流的拷貝。

為了了解並行流的執行動作,下面的例子會打印當前線程的執行信息。

Arrays.asList("a1", "a2", "b1", "c2", "c1")
    .parallelStream()
    .filter(s -> {
        System.out.format("filter: %s [%s]\n",
            s, Thread.currentThread().getName());
        return true;
    })
    .map(s -> {
        System.out.format("map: %s [%s]\n",
            s, Thread.currentThread().getName());
        return s.toUpperCase();
    })
    .forEach(s -> System.out.format("forEach: %s [%s]\n",
        s, Thread.currentThread().getName()));

執行的結果如下:

filter:  b1 [main]
filter:  a2 [ForkJoinPool.commonPool-worker-1]
map:     a2 [ForkJoinPool.commonPool-worker-1]
filter:  c2 [ForkJoinPool.commonPool-worker-3]
map:     c2 [ForkJoinPool.commonPool-worker-3]
filter:  c1 [ForkJoinPool.commonPool-worker-2]
map:     c1 [ForkJoinPool.commonPool-worker-2]
forEach: C2 [ForkJoinPool.commonPool-worker-3]
forEach: A2 [ForkJoinPool.commonPool-worker-1]
map:     b1 [main]
forEach: B1 [main]
filter:  a1 [ForkJoinPool.commonPool-worker-3]
map:     a1 [ForkJoinPool.commonPool-worker-3]
forEach: A1 [ForkJoinPool.commonPool-worker-3]
forEach: C1 [ForkJoinPool.commonPool-worker-2]

通過分析調試輸出,我們可以更好地了解哪一個線程執行了哪些stream操作。從上面的輸出中我們可以看到parallel stream使用了ForkJoinPool提供的所有可用的線程來執行流的各種操作。由於不能確定哪個線程會執行並行流的哪個操作,因此反復執行上面的代碼,打印的結果會不同。

擴充上面的例子,添加sort操作

Arrays.asList("a1", "a2", "b1", "c2", "c1")
    .parallelStream()
    .filter(s -> {
        System.out.format("filter: %s [%s]\n",
            s, Thread.currentThread().getName());
        return true;
    })
    .map(s -> {
        System.out.format("map: %s [%s]\n",
            s, Thread.currentThread().getName());
        return s.toUpperCase();
    })
    .sorted((s1, s2) -> {
        System.out.format("sort: %s <> %s [%s]\n",
            s1, s2, Thread.currentThread().getName());
        return s1.compareTo(s2);
    })
    .forEach(s -> System.out.format("forEach: %s [%s]\n",
        s, Thread.currentThread().getName()));

執行結果如下:

filter:  c2 [ForkJoinPool.commonPool-worker-3]
filter:  c1 [ForkJoinPool.commonPool-worker-2]
map:     c1 [ForkJoinPool.commonPool-worker-2]
filter:  a2 [ForkJoinPool.commonPool-worker-1]
map:     a2 [ForkJoinPool.commonPool-worker-1]
filter:  b1 [main]
map:     b1 [main]
filter:  a1 [ForkJoinPool.commonPool-worker-2]
map:     a1 [ForkJoinPool.commonPool-worker-2]
map:     c2 [ForkJoinPool.commonPool-worker-3]
sort:    A2 <> A1 [main]
sort:    B1 <> A2 [main]
sort:    C2 <> B1 [main]
sort:    C1 <> C2 [main]
sort:    C1 <> B1 [main]
sort:    C1 <> C2 [main]
forEach: A1 [ForkJoinPool.commonPool-worker-1]
forEach: C2 [ForkJoinPool.commonPool-worker-3]
forEach: B1 [main]
forEach: A2 [ForkJoinPool.commonPool-worker-2]
forEach: C1 [ForkJoinPool.commonPool-worker-1]

這個執行結果看起來比較奇怪,看起來sort操作只是在main線程中順序執行的。實際上,parallel stream中的sort操作使用了JAVA 8的一個新方法:Arrays.parallelSort()。JAVA doc中是這樣描述Arrays.parallelSort()的:待排序數組的長度決定了排序操作是順序執行還是並行執行。java doc 描述如下:

If the length of the specified array is less than the minimum granularity, then it is sorted using the appropriate Arrays.sort method.

回到上一章的例子,我們已經了解combiner方法只能在parallel streams中調用,讓我們來看下那些線程被實際調用:

List<Person> persons = Arrays.asList(
    new Person("Max", 18),
    new Person("Peter", 23),
    new Person("Pamela", 23),
    new Person("David", 12));

persons
    .parallelStream()
    .reduce(0,
        (sum, p) -> {
            System.out.format("accumulator: sum=%s; person=%s [%s]\n",
                sum, p, Thread.currentThread().getName());
            return sum += p.age;
        },
        (sum1, sum2) -> {
            System.out.format("combiner: sum1=%s; sum2=%s [%s]\n",
                sum1, sum2, Thread.currentThread().getName());
            return sum1 + sum2;
        });

執行結果如下:

accumulator: sum=0; person=Pamela; [main]
accumulator: sum=0; person=Max;    [ForkJoinPool.commonPool-worker-3]
accumulator: sum=0; person=David;  [ForkJoinPool.commonPool-worker-2]
accumulator: sum=0; person=Peter;  [ForkJoinPool.commonPool-worker-1]
combiner:    sum1=18; sum2=23;     [ForkJoinPool.commonPool-worker-1]
combiner:    sum1=23; sum2=12;     [ForkJoinPool.commonPool-worker-2]
combiner:    sum1=41; sum2=35;     [ForkJoinPool.commonPool-worker-2]

從控制台輸出可以看到accumulator和combiner操作都被可用的線程並行執行了。

總結起來:在大數據量輸入的時候,parallel streams可以帶來比較大的性能提升。但是應該記住,一些並行操作,比如:reduce,collect需要額外的計算(組合操作),但是在順序流中,這些組合操作是不需要的。

另外,我們知道所有的parallel stream操作共享一個jvm范圍內的ForkJoinPool,所以你應該注意避免在parallel stream上執行慢阻塞流操作,因為這些操作可能導致你應用中依賴parallel streams操作的其他部分也會響應變慢。


結尾###

如果你想更多了解JAVA 8 的stream,你可以閱讀stream的JAVA doc,如果你想更深入了解stream的底層機制,你可以閱讀Martin Fowlers的文章Collection Pipelines

如果你對js也感興趣,你可以查看Stream.js(一個用js實現的java 8 stream api),你也可以查看我寫的java8教程。

希望這個教程對你有幫助,你也喜歡閱讀這個教程。這個教程的源碼和例子在github上,你可以免費fork或者在twitter上給我反饋。



免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM