什么是Stream?
Stream它並不是一個容器,它只是對容器的功能進行了增強,添加了很多便利的操作,例如查找、過濾、分組、排序等一系列的操作。並且有串行、並行兩種執行模式,並行模式充分的利用了多核處理器的優勢,使用fork/join框架進行了任務拆分,同時提高了執行速度。簡而言之,Stream就是提供了一種高效且易於使用的處理數據的方式。
- 特點:
- Stream自己不會存儲元素。
- Stream的操作不會改變源對象。相反,他們會返回一個持有結果的新Stream。
- Stream 操作是延遲執行的。它會等到需要結果的時候才執行。也就是執行終端操作的時候。
- 圖解:
一個Stream的操作就如上圖,在一個管道內,分為三個步驟,第一步是創建Stream,從集合、數組中獲取一個流,第二步是中間操作鏈,對數據進行處理。第三步是終端操作,用來執行中間操作鏈,返回結果。
怎么創建Stream?
- 由集合創建:
Java8 中的 Collection 接口被擴展,提供了兩個獲取流的方法,這兩個方法是default方法,也就是說所有實現Collection接口的接口都不需要實現就可以直接使用:
- default Stream<E> stream() : 返回一個順序流。
-
default Stream<E> parallelStream() : 返回一個並行流。
例如: List<Integer> integerList = new ArrayList<>(); integerList.add(1); integerList.add(2); Stream<Integer> stream = integerList.stream(); Stream<Integer> stream1 = integerList.parallelStream();
- 由數組創建:
Java8 中的 Arrays 的靜態方法 stream() 可以獲取數組流:
- static <T> Stream<T> stream(T[] array): 返回一個流
- 重載形式,能夠處理對應基本類型的數組:
public static IntStream stream(int[] array)
public static LongStream stream(long[] array)
public static DoubleStream stream(double[] array)
例如: int[] intArray = {1,2,3}; IntStream stream = Arrays.stream(intArray);
- 由值創建:
可以使用靜態方法 Stream.of(), 通過顯示值 創建一個流。它可以接收任意數量的參數。
-
public static<T> Stream<T> of(T... values) : 返回一個流。
例如:
Stream<Integer> integerStream = Stream.of(1, 2, 3, 4, 5, 6, 7, 8);
- 由函數創建:創建無限流
可以使用靜態方法 Stream.iterate() 和 Stream.generate()創建無限流。
- 迭代
public static<T> Stream<T> iterate(final T seed, final UnaryOperator<T> f) -
生成
public static<T> Stream<T> generate(Supplier<T> s)
例如: Stream.generate(Math::random).limit(5).forEach(System.out::print); List<Integer> collect = Stream.iterate(0,i -> i + 1).limit(5).collect(Collectors.toList());
-
注意:使用無限流一定要配合limit截斷,不然會無限制創建下去。
Stream的中間操作
如果Stream只有中間操作是不會執行的,當執行終端操作的時候才會執行中間操作,這種方式稱為延遲加載或惰性求值。多個中間操作組成一個中間操作鏈,只有當執行終端操作的時候才會執行一遍中間操作鏈,具體是因為什么我們在后面再說明。下面看下Stream有哪些中間操作。
- Stream
<T>
distinct():
去重,通過流所生成元素的 hashCode() 和 equals() 去除重復元素。
- Stream
<T>
filter(Predicate<? super T> predicate):
Predicate函數在上一篇當中我們已經講過,它是斷言型接口,所以filter方法中是接收一個和Predicate函數對應Lambda表達式,返回一個布爾值,從流中過濾某些元素。
- Stream
<T>
sorted(Comparator<? super T> comparator):
指定比較規則進行排序。
- Stream
<T>
limit(long maxSize):
截斷流,使其元素不超過給定數量。如果元素的個數小於maxSize,那就獲取所有元素。
- Stream
<T>
skip(long n):
跳過元素,返回一個扔掉了前 n 個元素的流。若流中元素不足 n 個,則返回一個空流。與 limit(n) 互補。
- Stream
<R>
map(Function<? super T, ? extends R> mapper):
接收一個Function函數作為參數,該函數會被應用到每個元素上,並將其映射成一個新的元素。也就是轉換操作,map還有三個應用於具體類型方法,分別是:mapToInt,mapToLong和mapToDouble。這三個方法也比較好理解,比如mapToInt就是把原始Stream轉換成一個新的Stream,這個新生成的Stream中的元素都是int類型。這三個方法可以免除自動裝箱/拆箱的額外消耗。
- Stream
<R>
flatMap(Function<? super T, ? extends Stream<? extends R>> mapper):
接收一個Function函數作為參數,將流中的每個值都轉換成另一個流,然后把所有流連接成一個流。flatMap也有三個應用於具體類型的方法,分別是:flatMapToInt、flatMapToLong、flatMapToDouble,其作用於map的三個衍生方法相同。
Stream的終端操作
終端操作執行中間操作鏈,並返回結果。終端操作我們就不一一介紹了,只介紹一下常用的操作。詳細可看java.util.stream.Stream接口中的方法。
- void forEach(Consumer<? super T> action):
內部迭代(需要用戶去做迭代,稱為外部迭代。相反,Stream API使用內部迭代幫你把迭代做了)
users.stream().forEach(user -> System.out.println(user.getName()));
- <R, A> R collect(Collector<? super T, A, R> collector):
收集、將流轉換為其他形式,比如轉換成List、Set、Map。collect方法是用Collector作為參數,Collector接口中方法的實現決定了如何對流執行收集操作(如收集到 List、Set、Map)。但是 Collectors 實用類提供了很多靜態方法,可以方便地創建常見收集器實例。例舉一些常用的:
List<User> users = Lists.newArrayList(); users.add(new User(15, "A", ImmutableList.of("1元", "5元"))); users.add(new User(25, "B", ImmutableList.of("10元", "50元"))); users.add(new User(21, "C", ImmutableList.of("100元"))); //收集名稱到List List<String> nameList = users.stream().map(User::getName).collect(Collectors.toList()); //收集名稱到List Set<String> nameSet = users.stream().map(User::getName).collect(Collectors.toSet()); //收集到map,名字作為key,user對象作為value Map<String, User> userMap = users.stream() .collect(Collectors.toMap(User::getName, Function.identity(), (k1, k2) -> k2));
-
其他終端操作:
- boolean allMatch(Predicate<? super T> predicate); 檢查是否匹配所有元素。
- boolean anyMatch(Predicate<? super T> predicate); 檢查是否至少匹配一個元素。
- boolean noneMatch(Predicate<? super T> predicate); 檢查是否沒有匹配所有元素。
- Optional
<T>
findFirst(); 返回當前流中的第一個元素。 - Optional
<T>
findAny(); 返回當前流中的任意元素。 - long count(); 返回流中元素總數。
- Optional
<T>
max(Comparator<? super T> comparator); 返回流中最大值。 - Optional
<T>
min(Comparator<? super T> comparator); 返回流中最小值。 - T reduce(T identity, BinaryOperator
<T>
accumulator); 可以將流中元素反復結合起來,得到一個值。 返回 T。這是一個歸約操作。
Fork/Join框架
上面我們提到過,說Stream的並行模式使用了Fork/Join框架,這里簡單說下Fork/Join框架是什么?Fork/Join框架是java7中加入的一個並行任務框架,可以將任務拆分為多個小任務,每個小任務執行完的結果在合並成為一個結果。在任務的執行過程中使用工作竊取(work-stealing)算法,減少線程之間的競爭。
- Fork/Join圖解
- 工作竊取圖解
Stream是怎么實現的
先看下整體類圖:藍色箭頭代表繼承,綠色箭頭代表實現,紅色箭頭代表內部類。
實際上Stream只有兩種操作,中間操作、終端操作,中間操作只是一種標記,只有終端操作才會實際觸發執行。所以Stream流水線式的操作大致應該是用某種方式記錄中間操作,只有調用終端操作才會將所有的中間操作疊加在一起在一次迭代中全部執行。這里只做簡單的介紹,想詳細了解的可以參考下面的參考資料中的鏈接。
- 操作怎么記錄?
Stream的操作記錄是通過ReferencePipeline記錄的,ReferencePipeline有三個內部類Head、StatelessOp、StatefulOp,Stream中使用Stage的概念來描述一個完整的操作,並用某種實例化后的ReferencePipeline來代表Stage,Head用於表示第一個Stage,即調用諸如Collection.stream()方法產生的Stage,很顯然這個Stage里不包含任何操作,StatelessOp和StatefulOp分別表示無狀態和有狀態的Stage,對應於無狀態和有狀態的中間操作。
- 操作怎么疊加?
操作是記錄完了,但是前面的Stage並不知道后面Stage到底執行了哪種操作,以及回調函數是哪種形式。這就需要有某種協議來協調相鄰Stage之間的調用關系。
這種協議由Sink接口完成,Sink接口包含的方法如下表所示:
- void begin(long size),開始遍歷元素之前調用該方法,通知Sink做好准備。
- void end(),所有元素遍歷完成之后調用,通知Sink沒有更多的元素了。
- boolean cancellationRequested(),是否可以結束操作,可以讓短路操作盡早結束。
- void accept(T t),遍歷元素時調用,接受一個待處理元素,並對元素進行處理。Stage把自己包含的操作和回調方法封裝到該方法里,前一個Stage只需要調用當前Stage.accept(T t)方法就行了。
每個Stage都會將自己的操作封裝到一個Sink里,前一個Stage只需調用后一個Stage的accept()方法即可,並不需要知道其內部是如何處理的。有了Sink對操作的包裝,Stage之間的調用問題就解決了,執行時只需要從流水線的head開始對數據源依次調用每個Stage對應的Sink.{begin(), accept(), cancellationRequested(), end()}方法就可以了。
- 操作怎么執行?
Sink完美封裝了Stream每一步操作,並給出了[處理->轉發]的模式來疊加操作。這一連串的齒輪已經咬合,就差最后一步撥動齒輪啟動執行。是什么啟動這一連串的操作呢?也許你已經想到了啟動的原始動力就是結束操作(Terminal Operation),一旦調用某個結束操作,就會觸發整個流水線的執行。