簡介
Stream 流處理,首先要澄清的是 java8 中的 Stream 與 I/O 流 InputStream 和 OutputStream 是完全不同的概念。
Stream 機制是針對集合迭代器的增強。流允許你用聲明式的方式處理數據集合(通過查詢語句來表達,而不是臨時編寫一個實現)。
本文后半部分將拿 Stream 中查詢語句與我們熟悉的 SQL 查詢語句做一些類別,方便大家的理解和記憶。
創建對象流
創建對象流的三種方式:
- 由集合對象創建流。對支持流處理的對象調用 stream()。支持流處理的對象包括
Collection
集合及其子類
List<Integer> list = Arrays.asList(1,2,3);
Stream<Integer> stream = list.stream();
- 由數組創建流。通過靜態方法 Arrays.stream() 將數組轉化為流(Stream)
IntStream stream = Arrays.stream(new int[]{3, 2, 1});
通過靜態方法 Stream.of() ,但是底層其實還是調用 Arrays.stream()
Stream<Integer> stream = Stream.of(1, 2, 3);
注意:
還有兩種比較特殊的流
- 空流:Stream.empty()
- 無限流:Stream.generate() 和 Stream.iterate()。可以配合 limit() 使用可以限制一下數量
// 接受一個 Supplier 作為參數
Stream.generate(Math::random).limit(10).forEach(System.out::println);
// 初始值是 0,新值是前一個元素值 + 2
Stream.iterate(0, n -> n + 2).limit(10).forEach(System.out::println);
流處理的特性
- 不存儲數據
- 不會改變數據源
- 不可以重復使用
為了體現流的特性,我准備了一組對應的測試用例:
public class StreamFeaturesTest {
/**
* 流的簡單例子
*/
@Test
public void test1() {
List<Integer> list = Stream.of(1, 2, 5, 9, 7, 3).filter(val-> val> 2).sorted().collect(Collectors.toList());
for (Integer item : list) {
System.out.println(item);
}
}
/**
* 流不會改變數據源
*/
@Test
public void test2() {
List<Integer> list = new ArrayList<>();
list.add(1);
list.add(2);
list.add(3);
list.add(1);
Assert.assertEquals(3, list.stream().distinct().count());
Assert.assertEquals(4, list.size());
}
/**
* 流不可以重復使用
*/
@Test(expected = IllegalStateException.class)
public void test3() {
Stream<Integer> integerStream = Stream.of(1, 2, 3);
Stream<Integer> newStream = integerStream.filter(val -> val > 2);
integerStream.skip(1);
}
}
首先,test1() 向我們展示了流的一般用法,由下圖可見,源數據流經管道,最后輸出結果數據。
然后,我們先看 test3(),源數組產生的流對象 integerStream 在調用 filter() 之后,數據立即流向了 newStream。
正因為流“不保存數據”的特性,所以重復利用 integerStream 再次調用 skip(1) 方法,會拋出一個 IllegalStateException 的異常:
java.lang.IllegalStateException: stream has already been operated upon or closed
所以說流不存儲數據,且流不可以重復使用。最后,我們來看 test2(),盡管我們對 list 對象生成的流 list.stream() 做了去重操作 distinct() ,但是並不影響源數據對象 list。
流處理的操作類型
Stream 的所有操作連起來組合成了管道,管道有兩種操作:
第一種,中間操作(intermediate)。調用中間操作方法返回的是一個新的流對象。
第二種,終值操作(terminal)。在調用該方法后,將執行之前所有的中間操作,並返回結果。
流處理的執行順序
為了更好地演示效果,我們首先要了解一下 Stream.peek() 方法, 這個方法和 Stream.forEach() 使用方法類似,都接受 Consumer
作為參數。
流操作方法 | 流操作類型 |
---|---|
peek() | 中間操作 |
forEach() | 終值操作 |
所以,我們可以用 peek 來證明流的執行順序。
我們定義一個 Apple 對象:
public class Apple {
private int id; // 編號
private String color; // 顏色
private int weight; // 重量
private String birthplace; // 產地
public Apple(int id, String color, int weight, String birthplace) {
this.id = id;
this.color = color;
this.weight = weight;
this.birthplace = birthplace;
}
// getter/setter 省略
}
然后創建多個蘋果放到 appleStore 中
public class StreamTest {
private static final List<Apple> appleStore = Arrays.asList(
new Apple(1, "red", 500, "湖南"),
new Apple(2, "red", 100, "天津"),
new Apple(3, "green", 300, "湖南"),
new Apple(4, "green", 200, "天津"),
new Apple(5, "green", 100, "湖南")
);
public static void main(String[] args) {
appleStore.stream().filter(apple -> apple.getWeight() > 100)
.peek(apple -> System.out.println("通過第1層篩選 " + apple))
.filter(apple -> "green".equals(apple.getColor()))
.peek(apple -> System.out.println("通過第2層篩選 " + apple))
.filter(apple -> "湖南".equals(apple.getBirthplace()))
.peek(apple -> System.out.println("通過第3層篩選 " + apple))
.collect(Collectors.toList());
}
}
測試結果如下:
以上測試例子的執行順序示意圖:
總之,執行順序會走一個“之”字形。
注意:
如果我們注釋掉 .collect(Collectors.toList()), 我們會發現一行語句也不會打印出來。
這剛好證明了:
通過連續執行多個操作倒便就組成了 Stream 中的執行管道(pipeline)。需要注意的是這些管道被添加后並不會真正執行,只有等到調用終值操作之后才會執行。
用流收集數據與 SQL 統計函數
Collector 被指定和四個函數一起工作,並實現累加 entries 到一個可變的結果容器,並可選擇執行該結果的最終變換。 這四個函數就是:
接口函數 | 作用 | 返回值 |
---|---|---|
supplier() | 創建並返回一個新的可變結果容器 | Supplier |
accumulator() | 把輸入值加入到可變結果容器 | BiConsumer |
combiner() | 將兩個結果容器組合成一個 | BinaryOperator |
finisher() | 轉換中間結果為終值結果 | Function |
Collectors 則是重要的工具類,提供給我一些 Collector 實現。
Stream 接口中 collect() 就是使用 Collector 做參數的。
其中,collect(Supplier<R> supplier, BiConsumer<R, ? super T> accumulator, BiConsumer<R, R> combiner)
無非就是比 Collector 少一個 finisher,本質上是一樣的!
遍歷在傳統的 javaEE 項目中數據源比較單一而且集中,像這類的需求都我們可能通過關系數據庫中進行獲取計算。
現在的互聯網項目數據源成多樣化有:關系數據庫、NoSQL、Redis、mongodb、ElasticSearch、Cloud Server 等。這時就需我們從各數據源中匯聚數據並進行統計。
Stream + Lambda的組合就是為了讓 Java 語句更像查詢語句,取代繁雜的 for 循環。
我們設計一下建表語句
CREATE TABLE `applestore` (
`id` INT NOT NULL AUTO_INCREMENT COMMENT '編號',
`color` VARCHAR (50) COMMENT '顏色',
`weight` INT COMMENT '重量',
`birthplace` VARCHAR (50) COMMENT '產地',
PRIMARY KEY (`id`)
) COMMENT = '水果商店';
另外還有數據初始化語句
INSERT INTO applestore VALUES (1, "red", 500,"湖南");
INSERT INTO applestore VALUES (2, "red", 100,"湖南");
INSERT INTO applestore VALUES (3, "green", 300, "湖南");
INSERT INTO applestore VALUES (4, "green", 200, "天津");
INSERT INTO applestore VALUES (5, "green", 100, "湖南");
測試用例:
public class StreamStatisticsTest {
List<Apple> appleStore;
@Before
public void initData() {
appleStore = Arrays.asList(
new Apple(1, "red", 500, "湖南"),
new Apple(2, "red", 100, "天津"),
new Apple(3, "green", 300, "湖南"),
new Apple(4, "green", 200, "天津"),
new Apple(5, "green", 100, "湖南")
);
}
@Test
public void test1() {
Integer weight1 = appleStore.stream().collect(Collectors.summingInt(apple -> apple.getWeight()));
System.out.println(weight1);
Integer weight2 = appleStore.stream().collect(Collectors.summingInt(Apple::getWeight));
System.out.println(weight2);
}
}
求和
- Collectors.summingInt()
- Collectors.summingLong()
- Collectors.summingDouble()
通過引用 import static java.util.stream.Collectors.summingInt;
就可以直接調用 summingInt()
Apple::getWeight() 可以寫為 apple -> apple.getWeight(),求和函數的參數是結果轉換函數 Function
求平均值
- Collectors.averagingInt()
- Collectors.averagingKLong()
- Collectors.averagingDouble()
歸約
- Collectors.reducing()
@Test
public void reduce() {
Integer sum = appleStore.stream().collect(reducing(0, Apple::getWeight, (a, b) -> a + b));
System.out.println(sum);
}
- 歸約就是為了遍歷數據容器,將每個元素對象轉換為特定的值,通過累積函數,得到一個最終值。
- 轉換函數,函數輸入參數的對象類型是跟 Stream<T> 中的 T 一樣的對象類型,輸出的對象類型的是和初始值一樣的對象類型
- 累積函數,就是把轉換函數的結果與上一次累積的結果進行一次合並,如果是第一次累積,那么取初始值來計算
累積函數還可以作用於兩個 Stream<T> 合並時的累積,這個可以結合 groupingBy 來理解 - 初始值的對象類型,和每一次累積函數輸出值的對象類型是相同的,這樣才能一直進行累積函數的運算。
- 歸約不僅僅可以支持加法,還可以支持比如乘法以及其他更高級的累積公式。
計數只是歸約的一種特殊形式
- Collectors.counting(): 初始值為 0,轉換函數 f(x)=1(x 就是 Stream<T> 的 T 類型),累積函數就是“做加法”
分組
- Collectors.groupingBy()
分組就和 SQL 中的 GROUP BY 十分類似,所以 groupingBy() 的所有參數中有一個參數是 Collector接口,這樣就能夠和 求和/求平均值/歸約 一起使用。
- 傳入參數的接口是 Function 接口,實現這個接口可以是實現從 A 類型到 B 類型的轉換
- 其中有一個方法可以傳入參數
Supplier mapFactory
,這個可以通過自定義 Map工廠,來創建自定義的分組 Map
分區只是分組的一種特殊形式
- Collectors.partitioningBy() 傳入參數的是 Predicate 接口,
- 分區相當於把流中的數據,分組分成了“正反兩個陣營”
數值流
我們之前在求和時用到的例子,appleStore.stream().collect(summingInt(Apple::getWeight))
,我就被 IDEA 提醒:
appleStore.stream().collect(summingInt(Apple::getWeight))
The 'collect(summingInt())' can be replaced with 'mapToInt().sum()'
這就告訴我們可以先轉化為數值流,然后再用 IntStream 做求和。Java8引入了三個原始類型特化流接口:IntStream,LongStream,DoubleStream,分別將流中的元素特化為 int,long,double。
普通對象流和原始類型特化流之間可以相互轉化
- 其中 IntStream 和 LongStream 可以調用 asDoubleStream 變為 DoubleStream,但是這是單向的轉化方法。
- IntStream#boxed() 可以得到 Stream<Integer> ,這個也是一個單向方法,支持數值流轉換回對象流,LongStream 和 DoubleStream 也有類似的方法。
生成一個數值流
- IntStream.range(int startInclusive, int endExclusive)
- IntStream.rangeClosed(int startInclusive, int endInclusive)
- range 和 rangeClosed 的區別在於數值流是否包含 end 這個值。range 代表的區間是 [start, end) , rangeClosed 代表的區間是 [start, end]
- LongStream 也有 range 和 rangeClosed 方法,但是 DoubleStream 沒有!
flatMap
- Stream.flatMap 就是流中的每個對象,轉換產生一個對象流。
- Stream.flatMapToInt 指定流中的每個對象,轉換產生一個 IntStream 數值流;類似的,還有 flatMapToLong,flatMapToDouble
- IntStream.flatMap 數值流中的每個對象,轉換產生一個數值流
flatMap 可以代替一些嵌套循環來開展業務:
比如我們要求勾股數(即 aa+bb=c*c 的一組數中的 a,b,c),且我們要求 a 和 b 的范圍是 [1,100],我們在 Java8之前會這樣寫:
@Test
public void testJava() {
List<int[]> resultList = new ArrayList<>();
for (int a = 1; a <= 100; a++) {
for (int b = a; b <= 100; b++) {
double c = Math.sqrt(a * a + b * b);
if (c % 1 == 0) {
resultList.add(new int[]{a, b, (int) c});
}
}
}
int size = resultList.size();
for (int i = 0; i < size && i < 5; i++) {
int[] a = resultList.get(i);
System.out.println(a[0] + " " + a[1] + " " + a[2]);
}
}
Java8之后,我們可以用上 flatMap:
@Test
public void flatMap() {
Stream<int[]> stream = IntStream.rangeClosed(1, 100)
.boxed()
.flatMap(a -> IntStream.rangeClosed(a, 100)
.filter(b -> Math.sqrt(a * a + b * b) % 1 == 0)
.mapToObj(b -> new int[]{a, b, (int) Math.sqrt(a * a + b * b)})
);
stream.limit(5).forEach(a -> System.out.println(a[0] + " " + a[1] + " " + a[2]));
}
創建一個從 1 到 100 的數值范圍來創建 a 的值。對每個給定的 a 值,創建一個三元數流。
flatMap 方法在做映射的同時,還會把所有生成的三元數流扁平化成一個流。
總結
- Stream 主要包括對象流和數值流兩大類
- Stream.of() , Arrays.stream() , Collection#stream() , Stream.generate() , Stream.iterate() 方法創建對象流
- IntStream.range() 和 IntStream.rangeClosed() 可以創建數值流,對象流和數值流可以相互轉換
- Collector 收集器接口,可以實現歸約,統計函數(求和,求平均值,最大值,最小值),分組等功能
- 流的執行,需要調用終值操作。流中每個元素執行到不能繼續執行下去,才會轉到另一個元素執行。而不是分階段迭代數據容器中的所有元素!
- flatMap 可以給流中的每個元素生成一個對應的流,並且扁平化為一個流