流(Stream)僅僅代表着數據流,並沒有數據結構,所以他遍歷完一次之后便再也無法遍歷(這點在編程時候需要注意,不像Collection,遍歷多少次里面都還有數據),它的來源可以是Collection、array、io等等。
3.1中間與終點方法
流作用是提供了一種操作大數據接口,讓數據操作更容易和更快。它具有過濾、映射以及減少遍歷數等方法,這些方法分兩種:中間方法和終端方法,“流”抽象天生就該是持續的,中間方法永遠返回的是Stream,因此如果我們要獲取最終結果的話,必須使用終點操作才能收集流產生的最終結果。區分這兩個方法是看他的返回值,如果是Stream則是中間方法,否則是終點方法。具體請參照Stream的api。
簡單介紹下幾個中間方法(filter、map)以及終點方法(collect、sum)
3.1.1Filter
在數據流中實現過濾功能是首先我們可以想到的最自然的操作了。Stream接口暴露了一個filter方法,它可以接受表示操作的Predicate實現來使用定義了過濾條件的lambda表達式。
1 List persons = …
2 Stream personsOver18 = persons.stream().filter(p -> p.getAge() > 18);//過濾18歲以上的人
3.1.2Map
假使我們現在過濾了一些數據,比如轉換對象的時候。Map操作允許我們執行一個Function的實現(Function<T,R>的泛型T,R分別表示執行輸入和執行結果),它接受入參並返回。首先,讓我們來看看怎樣以匿名內部類的方式來描述它:
1
2
3
4
5
6
7
8
9
|
1 Stream adult= persons 2 .stream() 3 .filter(p -> p.getAge() > 18) 4 .map(new Function() { 5 @Override 6 public Adult apply(Person person) { 7 return new Adult(person);//將大於18歲的人轉為成年人 8 } 9 });
|
現在,把上述例子轉換成使用lambda表達式的寫法:
1
2
3
|
Stream map = persons.stream()
.filter(p -> p.getAge() >
18
)
.map(person ->
new
Adult(person));
|
3.1.3Count
count方法是一個流的終點方法,可使流的結果最終統計,返回int,比如我們計算一下滿足18歲的總人數
1
2
3
4
|
int
countOfAdult=persons.stream()
.filter(p -> p.getAge() >
18
)
.map(person ->
new
Adult(person))
.count();
|
3.1.4Collect
collect方法也是一個流的終點方法,可收集最終的結果
1
2
3
4
|
List adultList= persons.stream()
.filter(p -> p.getAge() >
18
)
.map(person ->
new
Adult(person))
.collect(Collectors.toList());
|
或者,如果我們想使用特定的實現類來收集結果:
1
2
3
4
5
|
List adultList = persons
.stream()
.filter(p -> p.getAge() >
18
)
.map(person ->
new
Adult(person))
.collect(Collectors.toCollection(ArrayList::
new
));
|
篇幅有限,其他的中間方法和終點方法就不一一介紹了,看了上面幾個例子,大家明白這兩種方法的區別即可,后面可根據需求來決定使用。
3.2順序流與並行流
每個Stream都有兩種模式:順序執行和並行執行。
順序流:
1
|
List <Person> people = list.getStream.collect(Collectors.toList());
|
並行流:
1
|
List <Person> people = list.getStream.parallel().collect(Collectors.toList());
|
顧名思義,當使用順序方式去遍歷時,每個item讀完后再讀下一個item。而使用並行去遍歷時,數組會被分成多個段,其中每一個都在不同的線程中處理,然后將結果一起輸出。
3.2.1並行流原理:
1
2
3
4
5
6
|
List originalList = someData;
split1 = originalList(
0
, mid);
//將數據分小部分
split2 = originalList(mid,end);
new
Runnable(split1.process());
//小部分執行操作
new
Runnable(split2.process());
List revisedList = split1 + split2;
//將結果合並
|
大家對hadoop有稍微了解就知道,里面的 MapReduce 本身就是用於並行處理大數據集的軟件框架,其 處理大數據的核心思想就是大而化小,分配到不同機器去運行map,最終通過reduce將所有機器的結果結合起來得到一個最終結果,與MapReduce不同,Stream則是利用多核技術可將大數據通過多核並行處理,而MapReduce則可以分布式的。
3.2.2順序與並行性能測試對比
如果是多核機器,理論上並行流則會比順序流快上一倍,下面是測試代碼
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
|
long
t0 = System.nanoTime();
//初始化一個范圍100萬整數流,求能被2整除的數字,toArray()是終點方法
int
a[]=IntStream.range(
0
, 1_000_000).filter(p -> p %
2
==
0
).toArray();
long
t1 = System.nanoTime();
//和上面功能一樣,這里是用並行流來計算
int
b[]=IntStream.range(
0
, 1_000_000).parallel().filter(p -> p %
2
==
0
).toArray();
long
t2 = System.nanoTime();
//我本機的結果是serial: 0.06s, parallel 0.02s,證明並行流確實比順序流快
System.out.printf(
"serial: %.2fs, parallel %.2fs%n"
, (t1 - t0) * 1e-
9
, (t2 - t1) * 1e-
9
);
|