談談集合.Stream Api


1. 什么是stream API

Java8提供的stream API可以讓程序員像操作數據庫一樣操作集合。Stream API可以極大提高Java程序員的生產力,讓程序員寫出高效率、干凈、簡潔的代碼。同時它提供串行和並行兩種模式進行匯聚操作,並發模式能夠充分利用多核處理器的優勢,使用 fork/join 並行方式來拆分任務和加速處理過程。通常編寫並行代碼很難而且容易出錯, 但使用 Stream API 無需編寫一行多線程的代碼,就可以很方便地寫出高性能的並發程序。常用的stream API有如下;

+--------------------+       +------+   +------+   +---+   +-------+
| stream of elements +-----> |filter+-> |sorted+-> |map+-> |collect|
+--------------------+       +------+   +------+   +---+   +-------+

簡而言之,Stream API是一個非常高效的數據處理框架。

2. stream的幾個特點

  • 元素是特定類型的對象,形成一個隊列。 Java中的Stream並不會存儲元素,而是按需計算。
  • 數據源的來源。 可以是集合,數組,I/O channel, 產生器generator 和IntStream等
  • 聚合操作 類似SQL語句一樣的操作, 比如filter, map, reduce, find, match, sorted等。

3. Stream API使用列子

3.1 Stream分類

可以從不同的數據源創建stream。java collection包中的Collections,Lists,Sets這些類中新增stream()和parallelStream()方法,通過這些方法可以創建一個順序stream(sequential streams)或者一個並發的stream(Parallel streams)。並發stream(Parallel streams)更適合在多線程中使用,本文先介紹順序流(sequential streams)在結尾會描述並發stream(Parallel streams),

Arrays.asList("a1", "a2", "a3")
    .stream()
    .findFirst()
    .ifPresent(System.out::println);  // a1
1234

List對象上調用stream()方法可以返回一個常規的對象流。在下面的例子中我們不需要創建一個collection對象也可以使用stream:

Stream.of("a1", "a2", "a3")
    .findFirst()
    .ifPresent(System.out::println);  // a1
123

直接使用Stream.of()方法就能從一組對象創建一個stream對象,

除了常規的對象流,JAVA 8中的IntStream,LongStream,DoubleStream這些流能夠處理基本數據類型如:int,long,double。比如:IntStream可以使用range()方法能夠替換掉傳統的for循環

IntStream.range(1, 4)
    .forEach(System.out::println);
12

基本類型流(primitive streams)使用方式與常規對象流類型(regular object streams)大部分相同,但是基本類型流(primitive streams)能使用一些特殊的lambda表達式,比如:用IntFunction代替Function,用IntPredicate代替Predicate,同時基本類型流(primitive streams)中可以支持一些聚合方法,如:sum(),average()等。

Arrays.stream(new int[] {1, 2, 3})
    .map(n -> 2 * n + 1)
    .average()
    .ifPresent(System.out::println);  // 5.0
1234

可以通過常規對象流(regular object stream)的mapToInt(), mapToLong(),mapToDouble(),基本類型對象流(primitive streams)中的mapToObj()等方法完成常規對象流和基本類型流之間的相互轉換

IntStream.range(1, 4)
    .mapToObj(i -> "a" + i)
    .forEach(System.out::println);
123

下面這個例子中doubles stream先被映射成int stream,然后又被映射成String類型的對象流:

Stream.of(1.0, 2.0, 3.0)
    .mapToInt(Double::intValue)
    .mapToObj(i -> "a" + i)
    .forEach(System.out::println);

// a1
// a2
// a

3.2 Stream API的處理順序

我們用下面的一個列子來引入Stream的處理順序:

  Stream.of("d2", "a2", "b1", "b3", "c")
                .filter(s -> {
                    System.out.println("filter: " + s);
                    return true;
                });

想象中,上面的列子會輸出下面的內容:

filter: d2
filter: a2
filter: b1
filter: b3
filter: c

但是當我們執行這段代碼的時候,控制台沒有輸出任何內容。下面會來講下出現這個現象的原因。在講這個原因之前我們先來引入兩個Stream相關的概念,能幫助我們更好的理解Stream API:

中間操作和最終操作

stream包含中間(intermediate operations)最終(terminal operation)兩種形式的操作。中間操作(intermediate operations)的返回值還是一個stream,因此可以通過鏈式調用將中間操作(intermediate operations)串聯起來。最終操作(terminal operation)只能返回void或者一個非stream的結果。在上述例子中:filter, map ,sorted是中間操作,而forEach是一個最終操作。更多關於stream的中可用的操作可以查看java doc。上面例子中的鏈式調用也被稱為操作管道流。

大多stream操作接受某種形式的lambda表達式作為參數,通過方法接口的形式指定操作的具體行為,這些方法接口的行為基本上都是無干擾(non-interfering)和無狀態(stateless)。無干擾(non-interfering)的方法的定義是:該方法不修改stream的底層數據源,比如上述例子中:沒有lambda表達式添加或者刪除myList中的元素。無狀態(stateless)方法的定義:操作的執行是獨立的,比如上述例子中,沒有lambda表達式在執行中依賴可能發生變化的外部變量或狀態。

簡單粗淺的總結下上面那段話:返回值是還是Stream類型的操作是中間操作,返回值是void或者是非Stream類型的操作的最終操作。Stream的API不會改變原始數據。

下面是Stream的接口,我們通過返回值就可以清楚的判斷哪些是中間操作哪些是最終操作。像我們平時常用的操作filter、map、distinct、sort和limit等都是中間操作。

public interface Stream<T> extends BaseStream<T, Stream<T>> {
    Stream<T> filter(Predicate<? super T> predicate);
    <R> Stream<R> map(Function<? super T, ? extends R> mapper);
    IntStream mapToInt(ToIntFunction<? super T> mapper);
    LongStream mapToLong(ToLongFunction<? super T> mapper);
    DoubleStream mapToDouble(ToDoubleFunction<? super T> mapper);
    <R> Stream<R> flatMap(Function<? super T, ? extends Stream<? extends R>> mapper);
    IntStream flatMapToInt(Function<? super T, ? extends IntStream> mapper);
    LongStream flatMapToLong(Function<? super T, ? extends LongStream> mapper);
    DoubleStream flatMapToDouble(Function<? super T, ? extends DoubleStream> mapper);
    Stream<T> distinct();
    Stream<T> sorted();
    Stream<T> sorted(Comparator<? super T> comparator);
    Stream<T> peek(Consumer<? super T> action);
    Stream<T> limit(long maxSize);
    Stream<T> skip(long n);
    void forEach(Consumer<? super T> action);
    void forEachOrdered(Consumer<? super T> action);
    Object[] toArray();
    <A> A[] toArray(IntFunction<A[]> generator);
    T reduce(T identity, BinaryOperator<T> accumulator);
    Optional<T> reduce(BinaryOperator<T> accumulator);
    <U> U reduce(U identity,
                 BiFunction<U, ? super T, U> accumulator,
                 BinaryOperator<U> combiner);
    <R> R collect(Supplier<R> supplier,
                  BiConsumer<R, ? super T> accumulator,
                  BiConsumer<R, R> combiner);
    <R, A> R collect(Collector<? super T, A, R> collector);
    Optional<T> min(Comparator<? super T> comparator);
    Optional<T> max(Comparator<? super T> comparator);
    long count();
    boolean anyMatch(Predicate<? super T> predicate);
    boolean allMatch(Predicate<? super T> predicate);
    boolean noneMatch(Predicate<? super T> predicate);
    Optional<T> findFirst();
    Optional<T> findAny();
    // Static factories
    public static<T> Builder<T> builder() {
        return new Streams.StreamBuilderImpl<>();
    }
    public static<T> Stream<T> empty() {
        return StreamSupport.stream(Spliterators.<T>emptySpliterator(), false);
    }
    public static<T> Stream<T> of(T t) {
        return StreamSupport.stream(new Streams.StreamBuilderImpl<>(t), false);
    }
    @SafeVarargs
    @SuppressWarnings("varargs") // Creating a stream from an array is safe
    public static<T> Stream<T> of(T... values) {
        return Arrays.stream(values);
    }
    public static<T> Stream<T> iterate(final T seed, final UnaryOperator<T> f) {
        Objects.requireNonNull(f);
        final Iterator<T> iterator = new Iterator<T>() {
            @SuppressWarnings("unchecked")
            T t = (T) Streams.NONE;
            @Override
            public boolean hasNext() {
                return true;
            }
            @Override
            public T next() {
                return t = (t == Streams.NONE) ? seed : f.apply(t);
            }
        };
        return StreamSupport.stream(Spliterators.spliteratorUnknownSize(
                iterator,
                Spliterator.ORDERED | Spliterator.IMMUTABLE), false);
    }
    public static<T> Stream<T> generate(Supplier<T> s) {
        Objects.requireNonNull(s);
        return StreamSupport.stream(
                new StreamSpliterators.InfiniteSupplyingSpliterator.OfRef<>(Long.MAX_VALUE, s), false);
    }
    public static <T> Stream<T> concat(Stream<? extends T> a, Stream<? extends T> b) {
        Objects.requireNonNull(a);
        Objects.requireNonNull(b);
        @SuppressWarnings("unchecked")
        Spliterator<T> split = new Streams.ConcatSpliterator.OfRef<>(
                (Spliterator<T>) a.spliterator(), (Spliterator<T>) b.spliterator());
        Stream<T> stream = StreamSupport.stream(split, a.isParallel() || b.isParallel());
        return stream.onClose(Streams.composedClose(a, b));
    }
    public interface Builder<T> extends Consumer<T> {
        @Override
        void accept(T t);
        default Builder<T> add(T t) {
            accept(t);
            return this;
        }
        Stream<T> build();
    }
}

有了上面中間操作和最終操作的基礎,我們再來看看上面的列子會發現列子中的操作只有中間操作而沒有最終操作。說到這里大家可能已經知道答案了:Stream的中操作只有執行到最終操作時才會被觸發。

下面就加上一個最終操作看看效果:

Stream.of("d2", "a2", "b1", "b3", "c")
    .filter(s -> {
        System.out.println("filter: " + s);
        return true;
    })
    .forEach(s -> System.out.println("forEach: " + s));

上面代碼輸出下面的結果:

filter:  d2
forEach: d2
filter:  a2
forEach: a2
filter:  b1
forEach: b1
filter:  b3
forEach: b3
filter:  c
forEach: c

我們會發現filter操作在過濾出一個元素后會立馬進入下一步執行,而不是等待將整個集合過濾完再操作。(map操作也是類似的行為)

3.3 執行效率與steream執行鏈順序的關系

Stream.of("d2", "a2", "b1", "b3", "c")
    .map(s -> {
        System.out.println("map: " + s);
        return s.toUpperCase();
    })
    .filter(s -> {
        System.out.println("filter: " + s);
        return s.startsWith("A");
    })
    .forEach(s -> System.out.println("forEach: " + s));

調整上面的filter和map的順序能大量減少map的執行次數,提升執行效率;

Stream.of("d2", "a2", "b1", "b3", "c")
    .sorted((s1, s2) -> {
        System.out.printf("sort: %s; %s\n", s1, s2);
        return s1.compareTo(s2);
    })
    .filter(s -> {
        System.out.println("filter: " + s);
        return s.startsWith("a");
    })
    .map(s -> {
        System.out.println("map: " + s);
        return s.toUpperCase();
    })
    .forEach(s -> System.out.println("forEach: " + s));

Sorting 是一種特殊的中間操作(intermediate operation),在對集合中元素進行排序過程中需要保存元素的狀態,因此Sorting 是一種有狀態的操作(stateful operation)。

首先,在整個輸入集上執行排序操作(即先對集合進行水平操作),由於輸入集合中的元素間存在多種組合,因此上面的例子中sorted操作被執行了8次。

可以通過對執行鏈重排序的方式,提升stream的執行效率。修改執行鏈順序之后由於filter操作的過濾,導致sorted操作的輸入集只有一個元素,在大數據量的情況下能夠大幅度提高執行效率。

3.4 流復用

Stream有一個特性,就是當你執行完任何一個最終操作(terminal operation)的時候流就被關閉了

Stream<String> stream =
    Stream.of("d2", "a2", "b1", "b3", "c")
        .filter(s -> s.startsWith("a"));        

stream.anyMatch(s -> true);    // ok
stream.noneMatch(s -> true);   // exception

在同一個stream中執行完anyMatch后再執行noneMatch就會拋出如下異常:

java.lang.IllegalStateException: stream has already been operated upon or closed
    at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:229)
    at java.util.stream.ReferencePipeline.noneMatch(ReferencePipeline.java:459)
    at com.winterbe.java8.Streams5.test7(Streams5.java:38)
    at com.winterbe.java8.Streams5.main(Streams5.java:28)
12345

可以通過為每個最終操作(terminal operation)創建一個新的stream鏈的方式來解決上面的重用問題,Stream api中已經提供了一個stream supplier類來在已經存在的中間操作(intermediate operations )的stream基礎上構建一個新的stream。

Supplier<Stream<String>> streamSupplier =
    () -> Stream.of("d2", "a2", "b1", "b3", "c")
            .filter(s -> s.startsWith("a"));

streamSupplier.get().anyMatch(s -> true);   // ok
streamSupplier.get().noneMatch(s -> true);  // ok
streamSupplier的每個get()方法會構造一個新的stream,我們可以在這個stream上執行期望的最終操作(terminal operation)。

3.5 一些高級操作

3.5.1 Collect(收集)操作

Collect(收集)是一種是十分有用的最終操作,它可以把stream中的元素轉換成另外一種形式,比如;list,set,map。Collect使用Collector作為參數,Collector包含四種不同的操作:supplier(初始構造器), accumulator(累加器), combiner(組合器), finisher(終結者)。這聽起來很復雜,但是一個好消息是java 8通過Collectors類內置了各種復雜的收集操作,因此對於大部分常用的操作來說,你不需要自己去實現collector類。

使用Collectors可很輕松的生成List、Set和Map對象。

List<Person> filtered =
    persons
        .stream()
        .filter(p -> p.name.startsWith("P"))
        .collect(Collectors.toList());

通過上面的demo可以看出,將stream轉換為List十分簡單,如果想轉換為Set的話,只需使用Collectors.toSet()就可以了。

3.5.2 FlatMap操作

我們已經了解:通過map方法可以將stream中的一種對象轉換成另外一種對象。但是map方法還是有使用場景限制,只能將一種對象映射為另外一種特定的已經存在的對象。是否能夠將一個對象映射為多種對象,或者映射成一個根本不存在的對象呢。這就是flatMap方法出現的目的。

FlatMap方法可以將一個stream中的每一個元素對象轉換為另一個stream中的另一種元素對象,因此可以將stream中的每個對象改造成零,一個或多個。flatMap操作的返回流包含這些改造后的對象。‘

下面給出一個列子:

 String[] words = new String[]{"Hello","World"};
        List<String> a = Arrays.stream(words)
                .map(word -> word.split(""))
                .flatMap(Arrays::stream)
                .distinct()
                .collect(toList());
        a.forEach(System.out::print);

flatMap方法接收一個Stream類型的參數,從它的名字可以看出來,這個方法的作用是將多個Stream合並成一個Stream(然不是我們想當然的生成一個Map)。

3.5.3 Reduce操作

reduce操作可以將stream中所有元素組合起來得到一個元素,JAVA8支持三中不同的reduce方法。

  1. 通過一個比較規則進行reduce
persons
    .stream()
    .reduce((p1, p2) -> p1.age > p2.age ? p1 : p2)
    .ifPresent(System.out::println);
  1. 第二種reduce操作接收一個標識值和一個二元操作累加器作為參數,這個reduce方法可以把stream中所有用戶的名字和年齡匯總得到一個新用戶。
Person result =
persons
.stream()
.reduce(new Person("", 0), (p1, p2) -> {
p1.age += p2.age;
p1.name += p2.name;
return p1;
});

System.out.format("name=%s; age=%s", result.name, result.age);
// name=MaxPeterPamelaDavid; age=76
1234567891011
  1. 第三種reduce方法,接收三個參數:一個標示值(identity value),一個二元操作累加器(BiFunction accumulator),一個二元組合方法。由於標識符參數未被嚴格限制為person類型,因此我們可以用這個reduce方法來獲取用戶的總年齡。
   Integer ageSum = persons
       .stream()
       .reduce(0, (sum, p) -> sum += p.age, (sum1, sum2) -> sum1 + sum2);

   System.out.println(ageSum);  // 76
   12345

計算的結果是76,通過添加調試輸出,我們可以詳細地了解執行引擎中發生了什么。

   Integer ageSum = persons
       .stream()
       .reduce(0,
           (sum, p) -> {
               System.out.format("accumulator: sum=%s; person=%s\n", sum, p);
               return sum += p.age;
           },
           (sum1, sum2) -> {
               System.out.format("combiner: sum1=%s; sum2=%s\n", sum1, sum2);
               return sum1 + sum2;
           });

   // accumulator: sum=0; person=Max
   // accumulator: sum=18; person=Peter
   // accumulator: sum=41; person=Pamela
   // accumulator: sum=64; person=David
   12345678910111213141516

從調試輸出中可以看到,累加器做了所有的工作,它首先獲取值為0的標示值和第一個用戶Max,接下來的三步中持續sum值由於累加不斷變大,在最后一步匯總的年紀增長到76。

注意,上面的調試輸出中combiner沒有執行,通過parallel執行上面相同stream。

   Integer ageSum = persons
       .parallelStream()
       .reduce(0,
           (sum, p) -> {
               System.out.format("accumulator: sum=%s; person=%s\n", sum, p);
               return sum += p.age;
           },
           (sum1, sum2) -> {
               System.out.format("combiner: sum1=%s; sum2=%s\n", sum1, sum2);
               return sum1 + sum2;
           });

   // accumulator: sum=0; person=Pamela
   // accumulator: sum=0; person=David
   // accumulator: sum=0; person=Max
   // accumulator: sum=0; person=Peter
   // combiner: sum1=18; sum2=23
   // combiner: sum1=23; sum2=12
   // combiner: sum1=41; sum2=35
   12345678910111213141516171819

通過並行的方式執行上面的stream操作,得到的是另外一種完全不相同的執行動作。在並行stream中combiner方法會被調用。這是由於累加器是被並行調用的,因此組合器需要對分開的累加操作進行求和。

4. 並行Stream

為了提高大量輸入時的執行效率,stream可以采用並行的放行執行。並行流(Parallel Streams)通過ForkJoinPool.commonPool() 方法獲取一個可用的ForkJoinPool。這個ForkJoinPool使用5個線程(實際上是由底層可用的物理cpu核數決定的)

ForkJoinPool commonPool = ForkJoinPool.commonPool();
System.out.println(commonPool.getParallelism());    // 3
On my machine the common pool is initialized with a parallelism of 3 per default. This value can be decreased or increased by setting the following JVM parameter:
123

在我的機器上公共池初始化為每個默認3並行,這個值可以通過調整jvm參數來修改:

-Djava.util.concurrent.ForkJoinPool.common.parallelism=5
1

Collections中包含parallelStream()方法,通過這個方法能夠為Collections中的元素創建並行流。另外也可以調用stream的parallel()方法將一個順序流轉變為一個並行流的拷貝。

為了了解並行流的執行動作,下面的例子會打印當前線程的執行信息。

Arrays.asList("a1", "a2", "b1", "c2", "c1")
    .parallelStream()
    .filter(s -> {
        System.out.format("filter: %s [%s]\n",
            s, Thread.currentThread().getName());
        return true;
    })
    .map(s -> {
        System.out.format("map: %s [%s]\n",
            s, Thread.currentThread().getName());
        return s.toUpperCase();
    })
    .forEach(s -> System.out.format("forEach: %s [%s]\n",
        s, Thread.currentThread().getName()));
1234567891011121314

執行的結果如下:

filter:  b1 [main]
filter:  a2 [ForkJoinPool.commonPool-worker-1]
map:     a2 [ForkJoinPool.commonPool-worker-1]
filter:  c2 [ForkJoinPool.commonPool-worker-3]
map:     c2 [ForkJoinPool.commonPool-worker-3]
filter:  c1 [ForkJoinPool.commonPool-worker-2]
map:     c1 [ForkJoinPool.commonPool-worker-2]
forEach: C2 [ForkJoinPool.commonPool-worker-3]
forEach: A2 [ForkJoinPool.commonPool-worker-1]
map:     b1 [main]
forEach: B1 [main]
filter:  a1 [ForkJoinPool.commonPool-worker-3]
map:     a1 [ForkJoinPool.commonPool-worker-3]
forEach: A1 [ForkJoinPool.commonPool-worker-3]
forEach: C1 [ForkJoinPool.commonPool-worker-2]
123456789101112131415

通過分析調試輸出,我們可以更好地了解哪一個線程執行了哪些stream操作。從上面的輸出中我們可以看到parallel stream使用了ForkJoinPool提供的所有可用的線程來執行流的各種操作。由於不能確定哪個線程會執行並行流的哪個操作,因此反復執行上面的代碼,打印的結果會不同。

擴充上面的例子,添加sort操作

Arrays.asList("a1", "a2", "b1", "c2", "c1")
    .parallelStream()
    .filter(s -> {
        System.out.format("filter: %s [%s]\n",
            s, Thread.currentThread().getName());
        return true;
    })
    .map(s -> {
        System.out.format("map: %s [%s]\n",
            s, Thread.currentThread().getName());
        return s.toUpperCase();
    })
    .sorted((s1, s2) -> {
        System.out.format("sort: %s <> %s [%s]\n",
            s1, s2, Thread.currentThread().getName());
        return s1.compareTo(s2);
    })
    .forEach(s -> System.out.format("forEach: %s [%s]\n",
        s, Thread.currentThread().getName()));
12345678910111213141516171819

執行結果如下:

filter:  c2 [ForkJoinPool.commonPool-worker-3]
filter:  c1 [ForkJoinPool.commonPool-worker-2]
map:     c1 [ForkJoinPool.commonPool-worker-2]
filter:  a2 [ForkJoinPool.commonPool-worker-1]
map:     a2 [ForkJoinPool.commonPool-worker-1]
filter:  b1 [main]
map:     b1 [main]
filter:  a1 [ForkJoinPool.commonPool-worker-2]
map:     a1 [ForkJoinPool.commonPool-worker-2]
map:     c2 [ForkJoinPool.commonPool-worker-3]
sort:    A2 <> A1 [main]
sort:    B1 <> A2 [main]
sort:    C2 <> B1 [main]
sort:    C1 <> C2 [main]
sort:    C1 <> B1 [main]
sort:    C1 <> C2 [main]
forEach: A1 [ForkJoinPool.commonPool-worker-1]
forEach: C2 [ForkJoinPool.commonPool-worker-3]
forEach: B1 [main]
forEach: A2 [ForkJoinPool.commonPool-worker-2]
forEach: C1 [ForkJoinPool.commonPool-worker-1]
123456789101112131415161718192021

這個執行結果看起來比較奇怪,看起來sort操作只是在main線程中順序執行的。實際上,parallel stream中的sort操作使用了JAVA 8的一個新方法:Arrays.parallelSort()。JAVA doc中是這樣描述Arrays.parallelSort()的:待排序數組的長度決定了排序操作是順序執行還是並行執行。java doc 描述如下:

If the length of the specified array is less than the minimum granularity, then it is sorted using the appropriate Arrays.sort method.
1

回到上一章的例子,我們已經了解combiner方法只能在parallel streams中調用,讓我們來看下那些線程被實際調用:

List<Person> persons = Arrays.asList(
    new Person("Max", 18),
    new Person("Peter", 23),
    new Person("Pamela", 23),
    new Person("David", 12));

persons
    .parallelStream()
    .reduce(0,
        (sum, p) -> {
            System.out.format("accumulator: sum=%s; person=%s [%s]\n",
                sum, p, Thread.currentThread().getName());
            return sum += p.age;
        },
        (sum1, sum2) -> {
            System.out.format("combiner: sum1=%s; sum2=%s [%s]\n",
                sum1, sum2, Thread.currentThread().getName());
            return sum1 + sum2;
        });
12345678910111213141516171819

執行結果如下:

accumulator: sum=0; person=Pamela; [main]
accumulator: sum=0; person=Max;    [ForkJoinPool.commonPool-worker-3]
accumulator: sum=0; person=David;  [ForkJoinPool.commonPool-worker-2]
accumulator: sum=0; person=Peter;  [ForkJoinPool.commonPool-worker-1]
combiner:    sum1=18; sum2=23;     [ForkJoinPool.commonPool-worker-1]
combiner:    sum1=23; sum2=12;     [ForkJoinPool.commonPool-worker-2]
combiner:    sum1=41; sum2=35;     [ForkJoinPool.commonPool-worker-2]
1234567

從控制台輸出可以看到accumulator和combiner操作都被可用的線程並行執行了。

總結起來:在大數據量輸入的時候,parallel streams可以帶來比較大的性能提升。但是應該記住,一些並行操作,比如:reduce,collect需要額外的計算(組合操作),但是在順序流中,這些組合操作是不需要的。

另外,我們知道所有的parallel stream操作共享一個jvm范圍內的ForkJoinPool,所以你應該注意避免在parallel stream上執行慢阻塞流操作,因為這些操作可能導致你應用中依賴parallel streams操作的其他部分也會響應變慢。

5. 使用詳細列子

package com.csx.demo.spring.boot.lambda;

import java.util.*;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ForkJoinPool;
import java.util.function.Consumer;
import java.util.function.Predicate;
import java.util.stream.Collectors;

import static java.util.stream.Collectors.*;

public class LambdaStreamDemo {

    public static void main(String[] args) {

        List<Person> javaProgrammers = new ArrayList<Person>() {
            {
                add(new Person("Elsdon", "Jaycob", "Java programmer", "male", 43, 2000));
                add(new Person("Tamsen", "Brittany", "Java programmer", "female", 23, 1500));
                add(new Person("Floyd", "Donny", "Java programmer", "male", 33, 1800));
                add(new Person("Sindy", "Jonie", "Java programmer", "female", 32, 1600));
                add(new Person("Vere", "Hervey", "Java programmer", "male", 22, 1200));
                add(new Person("Maude", "Jaimie", "Java programmer", "female", 27, 1900));
                add(new Person("Shawn", "Randall", "Java programmer", "male", 30, 2300));
                add(new Person("Jayden", "Corrina", "Java programmer", "female", 35, 1700));
                add(new Person("Palmer", "Dene", "Java programmer", "male", 33, 2000));
                add(new Person("Addison", "Pam", "Java programmer", "female", 34, 1300));
            }
        };

        List<Person> phpProgrammers = new ArrayList<Person>() {
            {
                add(new Person("Jarrod", "Pace", "PHP programmer", "male", 34, 1550));
                add(new Person("Clarette", "Cicely", "PHP programmer", "female", 23, 1200));
                add(new Person("Victor", "Channing", "PHP programmer", "male", 32, 1600));
                add(new Person("Tori", "Sheryl", "PHP programmer", "female", 21, 1000));
                add(new Person("Osborne", "Shad", "PHP programmer", "male", 32, 1100));
                add(new Person("Rosalind", "Layla", "PHP programmer", "female", 25, 1300));
                add(new Person("Fraser", "Hewie", "PHP programmer", "male", 36, 1100));
                add(new Person("Quinn", "Tamara", "PHP programmer", "female", 21, 1000));
                add(new Person("Alvin", "Lance", "PHP programmer", "male", 38, 1600));
                add(new Person("Evonne", "Shari", "PHP programmer", "female", 40, 1800));
            }
        };

        //----------------------forEach使用----------------------
        //所有程序員的姓名
        //forEach方法接收一個Consumer參數,來遍歷處理每個對象
        System.out.println("---------------------forEach使用--------------------------");
        System.out.println("java programmer:");
        javaProgrammers.forEach((person) -> {
            System.out.println(person.getFirstName() + "." + person.getLastName());
        });
        System.out.println("php programmer:");
        phpProgrammers.forEach((person) -> {
            System.out.println(person.getFirstName() + "." + person.getLastName());
        });
        //給所有程序員的薪水上漲5%
        System.out.println("給程序員加薪 5% :");
        Consumer<Person> giveRaise = e -> e.setSalary(e.getSalary() / 100 * 5 + e.getSalary());
        javaProgrammers.forEach(giveRaise);
        phpProgrammers.forEach(giveRaise);

        //-----------------------filter使用---------------------
        //過濾器的使用
        //顯示月薪超所1400美元的php程序員
        System.out.println("---------------------filter使用--------------------------");
        System.out.println("顯示月薪超過1400的php程序員:");
        phpProgrammers.stream()
                .filter(p -> p.getSalary() > 1400)
                .forEach(person -> System.out.println(person.getFirstName() + "." + person.getLastName()));
        //定義filters,這些定義的Filter可以重用
        Predicate<Person> ageFilter = (p) -> (p.getAge() > 25);
        Predicate<Person> salaryFilter = (p) -> (p.getSalary() > 1400);
        Predicate<Person> genderFilter = (p) -> ("female".equals(p.getGender()));
        System.out.println("下面是年齡大於24歲且月薪在$1,400以上的女PHP程序員:");
        phpProgrammers.stream()
                .filter(ageFilter)
                .filter(salaryFilter)
                .filter(genderFilter)
                .forEach((p) -> System.out.printf("%s %s; ", p.getFirstName(), p.getLastName()));
        //重用filters
        System.out.println("下面是年齡大於24歲的女性 Java programmers:");
        javaProgrammers.stream()
                .filter(ageFilter)
                .filter(genderFilter)
                .forEach((p) -> System.out.printf("%s %s; ", p.getFirstName(), p.getLastName()));
        //-----------------------------map的使用---------------------------------
        System.out.println("---------------------map的使用--------------------------");
        System.out.println("將 PHP programmers 的 first name 拼接成字符串:");
        String phpDevelopers = phpProgrammers
                .stream()
                .map(Person::getFirstName)
                .collect(joining(" ; "));

        System.out.println("將 Java programmers 的 first name 存放到 Set:");
        Set<String> javaDevFirstName = javaProgrammers
                .stream()
                .map(Person::getFirstName)
                .collect(toSet());

        System.out.println("將 Java programmers 的 first name 存放到 TreeSet:");
        TreeSet<String> javaDevLastName = javaProgrammers
                .stream()
                .map(Person::getLastName)
                .collect(toCollection(TreeSet::new));
        //------------------------------limit使用--------------------------------------
        System.out.println("---------------------limit使用使用--------------------------");
        System.out.println("最前面的3個Java programmers:");
        javaProgrammers.stream()
                .limit(3)
                .forEach((p) -> System.out.printf("%s %s; ", p.getFirstName(), p.getLastName()));
        System.out.println("最前面的3個女性 Java programmers:");
        javaProgrammers.stream()
                .filter(genderFilter)
                .limit(3)
                .forEach((p) -> System.out.printf("%s %s; ", p.getFirstName(), p.getLastName()));
        //------------------------------sort排序使用---------------------------------------
        System.out.println("---------------------sort排序使用--------------------------");
        System.out.println("根據name排序,並顯示前5個 Java programmers:");
        List<Person> sortedJavaProgrammers = javaProgrammers
                .stream()
                .sorted(Comparator.comparing(Person::getFirstName))
                .limit(5)
                .collect(toList());
        sortedJavaProgrammers.forEach((p) -> System.out.printf("%s %s; ", p.getFirstName(), p.getLastName()));
        //------------------------------max和min方法的使用-------------------------------
        System.out.println("---------------------max和min方法使用--------------------------");
        System.out.println("工資最低的 Java programmer:");
        Person pears = javaProgrammers
                .stream()
                .min(Comparator.comparingInt(Person::getSalary))
                .get();
        System.out.printf("Name: %s %s; Salary: $%,d.", pears.getFirstName(), pears.getLastName(), pears.getSalary());

        System.out.println("工資最高的 Java programmer:");
        Person person = javaProgrammers
                .stream()
                .max(Comparator.comparingInt(Person::getSalary))
                .get();
        System.out.printf("Name: %s %s; Salary: $%,d.", person.getFirstName(), person.getLastName(), person.getSalary());

        //-------------------------Stream的一些高級用法------------------------
        System.out.println("---------------------Stream的一些高級用法--------------------");
        System.out.println("---------------------Collect的用法--------------------");

        System.out.println("---------------------將Stream轉為List--------------------");
        List<Person> list = javaProgrammers.stream()
                .filter(p -> p.getSalary() > 1000)
                .collect(Collectors.toList());

        System.out.println("---------------------將Stream轉為Set--------------------");
        Set<Person> set = javaProgrammers.stream()
                .filter(p -> p.getSalary() > 1000)
                .collect(Collectors.toSet());

        System.out.println("---------------------將Stream轉為(線程非安全)Map--------------------");
        System.out.println("---------------------將Java程序員按照性別不同分組--------------------");
        Map<String, List<Person>> listMap = javaProgrammers.stream()
                .filter(p -> p.getSalary() > 1000)
                .collect(groupingBy(Person::getGender));
        System.out.println("---------------------將Stream轉為(線程安全)Map--------------------");
        System.out.println("---------------------將Java程序員按照性別不同分組--------------------");
        ConcurrentMap<String, List<Person>> listConcurrentMap = javaProgrammers.stream()
                .filter(p -> p.getSalary() > 1000)
                .collect(groupingByConcurrent(Person::getGender));
        //將一個stream轉換為map,我們必須指定map的key和value如何映射。要注意的是key的值必須是唯一性的,
        // 否則會拋出IllegalStateException,但是可以通過使用合並函數(可選)繞過這個IllegalStateException異常:

        //將age作為key,firstName作為value,如果遇到age相同,則將兩個age相同的Person的FirstName相加作為Value
        Map<Integer, String> integerStringMap = javaProgrammers.stream()
                .collect(toMap(
                        p -> p.getAge(),
                        p -> p.getFirstName(),
                        (value1, value2) -> value1 + value2
                ));

        System.out.println("--------------------計算平均值--------------------");
        Double avAge = javaProgrammers.stream()
                .collect(averagingInt(p -> p.getAge()));
        System.out.println("java程序員平均年齡:"+avAge);

        System.out.println("--------------------獲取Java程序員age的統計信息--------------------");
        System.out.println("-------------包括年齡最大值,最小值和平均值等----------------");
        IntSummaryStatistics collect = javaProgrammers.stream()
                .collect(summarizingInt(p -> p.getAge()));

        System.out.println("--------------------獲取Java程序員的FirstName進行拼接--------------------");
        String s = javaProgrammers.stream()
                .map(p -> p.getFirstName())
                .collect(joining("and", "preFix", "endFix"));

        //也可以通過Collector.of()方法創建了一個自定義的collector,我們必須給這個collector提供四種功能:
        // supplier, accumulator, combiner,finisher.
        System.out.println("-------------------可以自定義collector--------------------");

        //-------------------flatMap的使用----------------------
        System.out.println("-------------------flatMap的使用----------------------");
        String[] words = new String[]{"Hello","World"};
        List<String> a = Arrays.stream(words)
                .map(word -> word.split(""))
                .flatMap(Arrays::stream)
                .distinct()
                .collect(toList());
        a.forEach(System.out::print);

        //------------------reduce的使用-----------------
        System.out.println("-------------------reduce的使用----------------------");
        javaProgrammers.stream()
                .reduce((p1,p2)->p1.getAge()>p2.getAge()?p1:p2)
                .ifPresent(System.out::println);
        //這個方法直接返回的是Person,將所有Java程序員的年齡相加起來,在set到傳進去的Person中
        Person reduce = javaProgrammers.stream()
                .reduce(new Person("", "", "", "", 10, 0), (p1, p2) -> {
                    p1.setAge(p1.getAge() + p2.getAge());
                    p1.setFirstName(p1.getFirstName() + p2.getFirstName());
                    return p1;
                });
        System.out.println(reduce);
        //這個方法返回一個標量值,注意,這種方式中第三個參數是不會被執行的,只有當並行模式下,第三個參數才會被執行
        Integer ageSum = javaProgrammers
                .stream()
                .reduce(0,
                        (sum, p) -> {
                            System.out.format("accumulator: sum=%s; person=%s\n", sum, p.getFirstName());
                            return sum += p.getAge();
                        },
                        (sum1, sum2) -> {
                            System.out.format("combiner: sum1=%s; sum2=%s\n", sum1, sum2);
                            return sum1 + sum2;
                        });
        System.out.println(ageSum);
        //--------------------------------Streams 還可以是並行的(parallel)-------------
        ForkJoinPool commonPool = ForkJoinPool.commonPool();
        System.out.println(commonPool.getParallelism());

    }
}

6. 簡單總結

  • 可以通過集合對象(List、Set和Map等)的stream方法會的Stream,可以通過Stream.of方法獲得Stream,也可以通過IntStream.range方法獲得Stream;
  • Stream的操作分為中間操作和最終操作,中間操作會返回一個Stream,最終操作會關閉Stream,只有執行到最終操作才會觸發中間操作;Stream的API不會改變原始數據
  • 適當調整執行順序可以提升執行效率
  • Supplier可以對Stream進行復用。

7. 參考

公眾號推薦

歡迎大家關注我的微信公眾號「程序員自由之路」


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM