Stream和Collection的區別是什么
流和集合的區別是什么?
粗略地說, 集合和流之間的差異就在於什么時候進行計算。集合是一個內存中的數據結構,它包含數據結構中目前所有的值--集合中的每個元素都得先計算出來才能添加到內存里。(你可以往集合里加東西或者刪東西,但是不管什么時候,集合中的每個元素都是放在內存里的,元素都得計算出來才能成為集合的一部分。)
相比之下,流則是在概念上固定的數據結構(你不能添加或者刪除元素),其元素則是按需計算的。這對編程有很大的好處。用戶僅僅從流中提取需要的值,而這些值--在用戶看不見的地方--只會按需生成。這是一種生產者 - 消費者的關系。從另一個角度來說,流就像一個延遲創建的集合:只有在消費者要求的時候才會計算值。
Stream是內部迭代
一個明顯的區別是迭代方式不同。Collection需要手動for-each或者使用Iterator在外部迭代。而Stream則開啟后可以直接對單個元素進行操作,內部幫你做好了迭代工作。
內部迭代的好處是可一個更好的並行。自己手寫迭代需要處理好每次迭代的內容。為了提高執行效率,也許會把多個處理邏輯寫到同一個遍歷里。比如,有同事看到從scala轉過來的同事的代碼,說他寫的代碼經常重復好多次。scala是函數式語言,和流天然集成。而我們慣性的做法,還是把一堆操作邏輯寫到同一個循環體中,來滿足自己對所謂的性能要求的潔癖。這常常會使得可讀性變差。很厭煩閱讀超過100行的代碼,尤其代碼還有首尾同步處理的邏輯(for, try-catch),很容易出錯。多寫一次循環來做這些事情,心理又過不去。
Stream開啟流之后,系統內部會分析對元素的操作是否可以並行,然后合並執行。也就是說,看起來,自己filter-map-filter-map-group很多次,但真實執行的時候並不是遍歷了很多次。至於到底遍歷了多少次。這是一個好問題,后面會說明這個問題。
使用流Stream的注意事項
流只能消費一次。比如,foreach只能遍歷一次stream。再次則會拋異常。
流操作
針對流的操作方式兩種:
中間操作
可以連接起來的流操作叫做中間操作。諸如filter或map等中間操作會返回另一個流。這讓多個操作可以連接起來形成一個查詢。但是,除非調用一個終端操作,比如collect,foreach, 否則中間操作不會執行----它們很懶。這是因為中間操作一般可以合並起來,在終端操作時一次性全部處理。
終端操作
關閉流的操作叫做終端操作。終端操作會從流的流水線生成結果。
使用流
本文demo源碼: https://github.com/Ryan-Miao/someTest/tree/master/src/main/java/com/test/java8/streams
新建一個Entity作為基本元素。
package com.test.java8.streams.entity;
/**
* Created by Ryan Miao on 12/11/17.
*/
public class Dish {
private final String name;
private final boolean vegetarian;
private final int calories;
private final Type type;
public Dish(String name, boolean vegetarian, int calories, Type type) {
this.name = name;
this.vegetarian = vegetarian;
this.calories = calories;
this.type = type;
}
public String getName() {
return name;
}
public boolean isVegetarian() {
return vegetarian;
}
public int getCalories() {
return calories;
}
public Type getType() {
return type;
}
public enum Type{
MEAT, FISH, OTHER
}
}
最常用,最簡單的用法
Stream API支持許多操作,這些操作能讓你快速完成復雜的數據查詢,比如篩選、切片、映射、查找、匹配和歸約。
package com.test.java8.streams;
import com.google.common.collect.Lists;
import com.test.java8.streams.entity.Dish;
import org.junit.Before;
import org.junit.Test;
import java.util.List;
import static java.util.stream.Collectors.toList;
/**
* Created by Ryan Miao on 12/11/17.
*/
public class StreamExample {
private List<Dish> menu;
@Before
public void setUp(){
menu = Lists.newArrayList(
new Dish("pork", false, 800, Dish.Type.MEAT),
new Dish("beef", false, 700, Dish.Type.MEAT),
new Dish("chicken", false, 400, Dish.Type.MEAT),
new Dish("french fries", true, 530, Dish.Type.OTHER),
new Dish("rice", true, 350, Dish.Type.OTHER),
new Dish("season fruit", true, 120, Dish.Type.OTHER),
new Dish("pizza", true, 550, Dish.Type.OTHER),
new Dish("prawns", false, 300, Dish.Type.FISH),
new Dish("salmon", false, 450, Dish.Type.FISH)
);
}
@Test
public void demo(){
List<String> threeHighCaloricDishNames = menu.stream()
.filter(dish -> dish.getCalories() > 300)
.map(Dish::getName)
.limit(3)
.collect(toList());
System.out.println(threeHighCaloricDishNames);
}
}
stream()將一個集合轉換成一個流,流和list一樣,都是單元素的集合體。filter()接受一個布爾值lambda,即一個謂詞。當表達式的value是true的時候,該元素通過篩選。map()接受一個轉換lambda,將一個元素class映射成另一個class。collect收集器,匯總結果,觸發流,終端操作。

謂詞篩選filter
謂詞是一個返回boolean的函數,也就是條件,通過這個條件進行篩選。
@Test
public void testFilterMapLimit(){
List<Entity> entities = Lists.newArrayList(new Entity(100), new Entity(12), new Entity(33), new Entity(41));
List<Integer> collect = entities.stream()
.filter(entity -> entity.getId() < 100)
.map(Entity::getId)
.collect(Collectors.toList());
System.out.println(collect);
}
這里,filter的參數就是一個謂詞,配合filter,可以篩選結果,只有返回值是true的item會通過。
去重復distinct
distinct()
截短流limit
limit(n)
跳過元素skip
skip(n)。 通過limit(n)形成互補關系。
映射map
map, stream的核心操作。接收一個參數,用來把一個對象轉換為另一個。demo同上。下面看具體需求。
/**
* Returns a stream consisting of the results of applying the given
* function to the elements of this stream.
*
* <p>This is an <a href="package-summary.html#StreamOps">intermediate
* operation</a>.
*
* @param <R> The element type of the new stream
* @param mapper a <a href="package-summary.html#NonInterference">non-interfering</a>,
* <a href="package-summary.html#Statelessness">stateless</a>
* function to apply to each element
* @return the new stream
*/
<R> Stream<R> map(Function<? super T, ? extends R> mapper);
接收一個Function函數,然后返回Stream. 而Function在前面已經介紹過了。我們看核心的方法。
/**
* Represents a function that accepts one argument and produces a result.
*
* <p>This is a <a href="package-summary.html">functional interface</a>
* whose functional method is {@link #apply(Object)}.
*
* @param <T> the type of the input to the function
* @param <R> the type of the result of the function
*
* @since 1.8
*/
@FunctionalInterface
public interface Function<T, R> {
/**
* Applies this function to the given argument.
*
* @param t the function argument
* @return the function result
*/
R apply(T t);
}
Function函數的功能就是把參數轉換成另一個類型的對象,返回。也就是a -> {return b;}。
瞥一眼Peek
上面map的需求特別多,但有時候我並不想返回另一個對象,我只是想要把原來的對象加工一個下,還是返回原來的對象。用map也是可以的,只要返回同一個對象就行。但IDEA會推薦用peek()。
比如,我想把list的user全部取出來,把updateDate更新為當前時間。
@Test
public void testPeek(){
final List<Integer> list = Lists.newArrayList(1,2,3,4);
List<Entity> collect = list.stream()
.map(Entity::new)
.peek(e -> e.setUpdateTime(new Date()))
.collect(Collectors.toList());
System.out.println(collect);
}
源碼里是這樣寫的
/**
* Returns a stream consisting of the elements of this stream, additionally
* performing the provided action on each element as elements are consumed
* from the resulting stream.
*
* <p>This is an <a href="package-summary.html#StreamOps">intermediate
* operation</a>.
*
* <p>For parallel stream pipelines, the action may be called at
* whatever time and in whatever thread the element is made available by the
* upstream operation. If the action modifies shared state,
* it is responsible for providing the required synchronization.
*
* @apiNote This method exists mainly to support debugging, where you want
* to see the elements as they flow past a certain point in a pipeline:
* <pre>{@code
* Stream.of("one", "two", "three", "four")
* .filter(e -> e.length() > 3)
* .peek(e -> System.out.println("Filtered value: " + e))
* .map(String::toUpperCase)
* .peek(e -> System.out.println("Mapped value: " + e))
* .collect(Collectors.toList());
* }</pre>
*
* @param action a <a href="package-summary.html#NonInterference">
* non-interfering</a> action to perform on the elements as
* they are consumed from the stream
* @return the new stream
*/
Stream<T> peek(Consumer<? super T> action);
而Consumer同樣也在之前出現過
@FunctionalInterface
public interface Consumer<T> {
/**
* Performs this operation on the given argument.
*
* @param t the input argument
*/
void accept(T t);
}
也就是說,peek()的本意是將對象取出來,消一遍,並不是像我的說的那樣返回原對象,因為參數並不是Function, 而是Consumer。我之所以這么說是因為Function也可以做到這個功能,只要將返回值變為當前對象即可。而peek里,我們可以修改當前對象的屬性,也是會生效的。
流的扁平化faltMap
我們前面講的函數都是處理一個序列,一個list,一個Stream里的數據。如果一個Stream的元素也是另一個stream呢?我還想把這個Stream的元素的stream打散,最終輸出一個stream。比如下面這個例子。統計單詞列表中出現的字母。
final List<String> words = Lists.newArrayList( "Hello", "worlds");
List<String[]> rs = words.stream()
.map(w -> w.split(""))
.distinct()
.collect(Collectors.toList());
rs.forEach(e -> {
for (String i : e) {
System.out.print(i + ",");
}
System.out.println();
});
打印的結果為:
H,e,l,l,o,
w,o,r,l,d,s,
顯然,目標沒達到。map之后的stream已經變成Stream<Stream<String>>。應該如何把里面的Stream打開,最后拼接起來呢。最直觀的想法就是用一個新的list,將我們剛才foreach打印的步驟中的操作變成插入list。但這顯然不是函數式編程。
flatMap可以接收一個參數,返回一個流,這個流可以拼接到最外層的流。說的太啰嗦,看具體用法。
@Test
public void flatMap() {
final List<String> words = Lists.newArrayList( "Hello", "worlds");
List<String> collect = words.stream()
.map(w -> w.split(""))
.flatMap(a -> Arrays.stream(a))
.distinct()
.collect(Collectors.toList());
System.out.println(collect);
}
- 第一步,用map將一個String對象映射成String[]數組。
- 第二步,將這個返回的對象映射成Stream,這里的數組轉Stream即
Arrays::stream. - 第三步,用flatMap
以上可以合並為一步: .flatMap(w -> Arrays.stream(w.split("")))
最終打印結果:
[H, e, l, o, w, r, d, s]
查找和匹配
另一個常見的數據處理套路是看看數據集中的某些元素是否匹配一個給定的屬性。Stream API通過allMatch, anyMatch,noneMatch,findFirst,findAny方法提供了這樣的工具。
比如,找到任何一個匹配條件的。
@Test
public void anyMatchTest() {
final List<Entity> entities = Lists.newArrayList(new Entity(101),
new Entity(12), new Entity(33), new Entity(42));
boolean b = entities.stream().anyMatch(e -> {
System.out.println(e.getId());
return e.getId() % 2 == 0;
});
if (b) {
System.out.println("有偶數");
}
}
101
12
有偶數
上述只是確定下是不是存在,在很多情況下這就夠了。至於FindAny和FindFirst則是找到后返回,目前還沒遇到使用場景。
歸約Reduce
Google搜索提出的Map Reduce模型,Hadoop提供了經典的開源實現。在Java中,我們也可以手動實現這個。

reduce的操作在函數式編程中很常見,作用是將一個歷史值與當前值做處理。比如求和,求最大值。
求和的時候,我們會將每個元素累加給sum。用reduce即可實現:
/**
* 沒有初始值,返回Optional
*/
@Test
public void demo(){
OptionalInt rs = IntStream.rangeClosed(1, 100)
.reduce((left, right) -> {
System.out.println(left + "\t" + right);
return left + right;
});
if (rs.isPresent()){
System.out.println("===========");
System.out.println(rs.getAsInt());
}
}
打印結果為:
1 2
3 3
6 4
...
...
4851 99
4950 100
===========
5050
給一個初始值
int rs = IntStream.rangeClosed(1, 100)
.reduce(10, (a, b) -> a + b);
同樣,可以用來求最大值。
List<Integer> nums = Lists.newArrayList(3, 1, 4, 0, 8, 5);
Optional<Integer> max = nums.stream().reduce((a, b) -> b > a ? b : a);
這里的比較函數恰好是Integer的一個方法,為增強可讀性,可以替換為:
nums.stream().reduce(Integer::max).ifPresent(System.out::println);
接下來,回歸我們最初的目標,實現偉大的Map-Reduce模型。比如,想要知道有多少個菜(一個dish list)。
@Test
public void mapReduce() {
final ArrayList<Dish> dishes = Lists.newArrayList(
new Dish("pork", false, 800, Type.MEAT),
new Dish("beef", false, 700, Type.MEAT),
new Dish("chicken", false, 400, Type.MEAT),
new Dish("french fries", true, 530, Type.OTHER),
new Dish("rice", true, 350, Type.OTHER),
new Dish("season fruit", true, 120, Type.OTHER),
new Dish("pizza", true, 550, Type.OTHER),
new Dish("prawns", false, 300, Type.FISH),
new Dish("salmon", false, 450, Type.FISH)
);
Integer sum = dishes.stream()
.map(d -> 1)
.reduce(0, (a, b) -> a + b);
}
歸約的優勢和並行化
相比於用foreach逐步迭代求和,使用reduce的好處在於,這里的迭代被內部迭代抽象掉了,這讓內部實現得以選擇並行執行reduce操作。而迭代式求和例子要更新共享變量sum,這不是那么容易並行化的。如果你加入了同步,很可能會發現線程競爭抵消了並行本應帶來的性能提升!這種計算的並行化需要另一種方法:將輸入分塊,分塊求和,最后再合並起來。但這樣的話代碼看起來就完全不一樣了。后面會用分支/合並框架來做這件事。但現在重要的是要認識到,可變的累加模式對於並行化來說是死路一條。你需要一種新的模式,這正是reduce所提供的。傳遞給reduce的lambda不能更改狀態(如實例變量),而且操作必須滿足結合律才可以按任意順序執行。
流操作的狀態:無狀態和有狀態
你已經看到了很多的流操作,乍一看流操作簡直是靈丹妙葯,而且只要在從集合生成流的時候把Stream換成parallelStream就可以實現並行。但這些操作的特性並不相同。他們需要操作的內部狀態還是有些問題的。
諸如map和filter等操作會從輸入流中獲取每一個元素,並在輸出流中得到0或1個結果。這些操作一般是無狀態的:他們沒有內部狀態(假設用戶提供的lambda或者方法引用沒有內部可變狀態)。
但諸如reduce、sum、max等操作需要內部狀態來累積結果。在前面的情況下,內部狀態很小。在我們的例子里就是一個int或者double。不管流中有多少元素要處理,內部狀態都是有界的。
相反,諸如sort或distinct等操作一開始都和filter和map差不多--都是接受一個流,再生成一個流(中間操作), 但有一個關鍵的區別。從流中排序和刪除重復項都需要知道先前的歷史。例如,排序要求所有元素都放入緩沖區后才能給輸出流加入一個項目,這一操作的存儲要求是無界的。要是流比較大或是無限的,就可能會有問題(把質數流倒序會做什么呢?它應當返回最大的質數,但數學告訴我們他不存在)。我們把這些操作叫做有狀態操作。
注
以上內容均來自《Java8 In Action》。
