Stream
Stream是在Java SE 8 API添加的用於增強集合的操作接口,可以讓你以一種聲明的方式處理集合數據。將要處理的集合看作一種流的創建者,將集合內部的元素轉換為流並且在管道中傳輸, 並且可以在管道的節點上進行處理, 比如篩選,排序,聚合等。元素流在管道中經過中間操作(intermediate operation)的處理,最后由最終操作(terminal operation)得到前面處理的結果。Stream的繼承關系圖如下,且容我慢慢抽絲剝繭細細道來。
過濾,轉換,聚合,歸約
Stream.of("one", "two", "three", "four")
.filter(e -> e.length() > 3)
.peek(e -> System.out.println("Filtered value: " + e))
.map(String::toUpperCase)
.peek(e -> System.out.println("Mapped value: " + e))
.collect(Collectors.toList());
復制代碼
在沒有Stream之前,我們對集合數據的處理到多是外部遍歷,然后做數據的聚合用算,排序,merge等等。這屬於OO思想,在引入Java SE 8引入FP之后,FP的操作可以提高Java程序員的生產力,,基於類型推斷的lambda表達式可以 讓程序員寫出高效率、干凈、簡潔的代碼。可以避免冗余的代碼。根據給定的集合操作通過
stream()
方法創建初始流,配合map()
,flatMap()
,filter()
對集合數據進行過濾,轉換。api調用我這里就不多說了。直接從源碼入手,看上圖最核心的就是類為AbstractPipeline
,ReferencePipeline
和Sink
接口.AbstractPipeline
抽象類是整個Stream中流水線的高度抽象了源頭sourceStage
,上游previousStage
,下游nextStage
,定義evaluate
結束方法,而ReferencePipeline
則是抽象了過濾,轉換,聚合,歸約等功能,每一個功能的添加實際上可以理解為卷心菜,菜心就是源頭,每一次加入一個功能就相當於重新長出一片葉子包住了菜心,最后一個功能集成完畢之后整顆卷心菜就長大了。而Sink
接口呢負責把整個流水線串起來,然后在執行聚合,歸約時候調AbstractPipeline
抽象類的evaluate
結束方法,根據是否是並行執行,調用不同的結束邏輯,如果不是並行方法則執行terminalOp.evaluateSequential
否則就執行terminalOp.evaluateParallel
,非並行執行模式下則是執行的是AbstractPipeline
抽象類的wrapAndCopyInto
方法去調用copyInto
,調用前會先執行一下wrapSink
,用於剝開這個我們在流水線上產生的卷心菜。從下游向上游去遍歷AbstractPipeline
,然后包裝到Sink,然后在copyInto
方法內部迭代執行對應的方法。最后完成調用,
並行執行實際上是構建一個ForkJoinTask
並執行invoke
去提交到ForkJoinPool
線程池。
BaseStream
流的基本接口,該接口制定流可以支持無序,順序,並行的,Stream實現了BaseStream接口。
-
Iterator iterator();
外部迭代器
-
Spliterator spliterator();
用於創建一個內部迭代器
-
isParallel
用於判斷該stream是否是並行的
-
S sequential();
標識該stream創建是順序執行的
-
S parallel();
標識該stream創建是並行的,需要使用
ForkJoinPool
-
S unordered();
標識該stream創建是無序的
-
S onClose(Runnable closeHandler);
當stream關閉的時候執行一個方法回調去關閉流。
PipelineHelper
該抽象類主要定義了操作管道的核心方法,並且能收集到流管道內的所有信息。如通過
TerminalOp#evaluateParallel
用於執行並行流操作,通過TerminalOp#evaluateSequential
執行順序流的操作。
-
abstract StreamShape getSourceShape();
用於定義該流的中元素的原型,返回一個枚舉值,用於切片操作
limit
或者skip
枚舉值取值范圍 {REFERENCE:引用類型元素,INT_VALUE:int類型元素,LONG_VALUE:long類型元素,DOUBLE_VALUE:double類型元素}
-
abstract int getStreamAndOpFlags();
用於獲取流的中元素的原型和所有操作的組合,
Stream
中所有的定義流類型和操作的指令都包含在`StreamOpFlag``枚舉類中。先看下補碼 掩碼的運算位掩碼的常用CRUD操作 a&~b: 清除標志位b; a|b : 添加標志位b; a&b : 取出標志位b; a^b : 取出a與b的不同部分; 下面是對應流的標志位對應的表。 /* * Characteristics belong to certain types, see the Type enum. Bit masks for * the types are constructed as per the following table: * * DISTINCT SORTED ORDERED SIZED SHORT_CIRCUIT * SPLITERATOR 01 01 01 01 00 * STREAM 01 01 01 01 00 * OP 11 11 11 10 01 * TERMINAL_OP 00 00 10 00 01 * UPSTREAM_TERMINAL_OP 00 00 10 00 00 * * 01 = set/inject SET_BITS=0b01設置指令 * 10 = clear CLEAR_BITS=0b10清除指令 * 11 = preserve PRESERVE_BITS=0b11保存指令 */ 構造函數 private StreamOpFlag(int position, MaskBuilder maskBuilder) { this.maskTable = maskBuilder.build(); // Two bits per flag position *= 2; this.bitPosition = position; this.set = SET_BITS << position; this.clear = CLEAR_BITS << position; this.preserve = PRESERVE_BITS << position; } 復制代碼
-
StreamOpFlag.DISTINCT
DISTINCT(0,set(Type.SPLITERATOR).set(Type.STREAM).setAndClear(Type.OP))
output:StreamOpFlag.DISTINCT: StreamOpFlag(maskTable={SPLITERATOR=1, STREAM=1, OP=3, TERMINAL_OP=0, UPSTREAM_TERMINAL_OP=0}, bitPosition=0, set=1, clear=2, preserve=3)
ok,我們知道了StreamOpFlag.DISTINCT的[設置]偏移位是1,16進制表示:0x00000001。當getStreamAndOpFlags返回的包含
IS_DISTINCT
也就是0x00000001就表示對於流中遇到的X,Y元素{@code!X.equals(Y)}。對應的是包含Spliterator.DISTINCT
,標識該stream已經是distinct的了。-
StreamOpFlag.SIZED
SIZED(3, set(Type.SPLITERATOR).set(Type.STREAM).clear(Type.OP))
output:StreamOpFlag.SIZED: StreamOpFlag(maskTable={SPLITERATOR=1, STREAM=1, OP=2, TERMINAL_OP=0, UPSTREAM_TERMINAL_OP=0}, bitPosition=6, set=64, clear=128, preserve=192)【0x00000040】->[Spliterator.SIZED]
表示遍歷或拆分前從
estimateSize()
返回的值的特征值表示一個有限大小,在沒有修改源結構的情況下,該值表示完整遍歷流中元素數量的精確值,如果stream沒有SIZED|SUBSIZED屬性,則可以將estimateSize返回為Long.MAX_VALUE.這說明這個stream的estimateSize計算很復雜或本身就是一個infinite的steam流。這樣設置后,性能上會差一些,但是,不會對sorted方法產生影響。如果要對流進行並行操作,實現自定義的Spliterator
時,則需要重寫trySplit()
方法和long estimateSize()
方法。通過拆分Spliterator加入fork/join線程池中,然后實現並行的處理。-
StreamOpFlag.SORTED
SORTED(1, set(Type.SPLITERATOR).set(Type.STREAM).setAndClear(Type.OP))
output:StreamOpFlag.SORTED: StreamOpFlag(maskTable={SPLITERATOR=1, STREAM=1, OP=3, TERMINAL_OP=0, UPSTREAM_TERMINAL_OP=0}, bitPosition=2, set=4, clear=8, preserve=12) 【0x00000004】->[Spliterator.SORTED]
表示流里順序遵循定義的排序順序。如果包含該屬性,方法
getComparator()
返回關聯的比較器,或者返回null,如果設置了該屬性並且,方法getComparator()
返回null,這表明改流已經排好序了,如果方法getComparator()
返回不為null,那么在fromCharacteristics
方法處,該SORTED屬性會被取消掉。如果流里面的所有元素都是實現了Comparable,那排序順序就是按它們的自然順序,在sorted(x->{...})
方法執行可以傳一個lambda進去。如果有值傳輸進去,那么都回按照該lambda對該流進行排序-
StreamOpFlag.ORDERED
ORDERED(2, set(Type.SPLITERATOR).set(Type.STREAM).setAndClear(Type.OP).clear(Type.TERMINAL_OP) .clear(Type.UPSTREAM_TERMINAL_OP))
output:StreamOpFlag.ORDERED: StreamOpFlag(maskTable={SPLITERATOR=1, STREAM=1, OP=3, TERMINAL_OP=2, UPSTREAM_TERMINAL_OP=2}, bitPosition=4, set=16, clear=32, preserve=48)【0x00000010】->[Spliterator.ORDERED]
表示該流中的元素已經定義了順序。包含了ORDERED屬性,是拆分器保證
trySplit
拆分元素的強制前置條件,tryAdvance
方法也會按定義了的順序逐個元素進行拆分,forEachRemaining
方法也按定義了的順序執行內部迭代操作。一般集合的順序是升序。但是對於基於哈希的集合,例如HashSet,不保證順序。所以應該在不進行交換場景的並行計算中強制保證排序約束。-
StreamOpFlag.SHORT_CIRCUIT
SHORT_CIRCUIT(12, set(Type.OP).set(Type.TERMINAL_OP))
output:StreamOpFlag.SHORT_CIRCUIT: StreamOpFlag(maskTable={SPLITERATOR=0, STREAM=0, OP=1, TERMINAL_OP=1, UPSTREAM_TERMINAL_OP=0}, bitPosition=24, set=16777216, clear=33554432, preserve=50331648)【0x01000000】->[表示操作可能使流短路]
表示操作可能使流短路
-
-
abstract<P_IN> long exactOutputSizeIfKnown(Spliterator<P_IN> spliterator);
將此時間的管道內的元素應用到提供的
Spliterator
,並將結果發送到提供的接收器sink里
- abstract<P_IN, S extends Sink<P_OUT>> S wrapAndCopyInto(S sink, Spliterator<P_IN> spliterator);
用於輸出返回值的大小。
- abstract<P_IN> void copyInto(Sink<P_IN> wrappedSink, Spliterator<P_IN> spliterator);
用於將從
Spliterator
獲得的元素推入提供的接收器中Sink
。如果已知流管道中有短路階段(包含StreamOpflag#SHORT_CURRENT),則在每個元素之后執行一下Sink#cancellationRequested()
,如果返回請求true,則執行終止。這個方法被實現之后需要遵守Sink的協議即:Sink#begin->Sink#accept->Sink->end
- abstract <P_IN> void copyIntoWithCancel(Sink<P_IN> wrappedSink, Spliterator<P_IN> spliterator);
用於將從
Spliterator
獲得的元素推入提供的接收器中Sink
。在每個元素之后執行一下Sink#cancellationRequested()
,如果返回請求true,則執行終止。這個方法被實現之后需要遵守Sink的協議即:Sink#begin->Sink#accept->Sink->end
- abstract<P_IN> Sink<P_IN> wrapSink(Sink<P_OUT> sink);
該方法主要用於包裝sink,從下游向上游去遍歷
AbstractPipeline
,然后包裝到一個Sink內,用於然后在copyInto
方法內部迭代執行對應的方法。
- abstract Node.Builder<P_OUT> makeNodeBuilder(long exactSizeIfKnown,IntFunction<P_OUT[]> generator);
用於構造一個節點Builder,轉換為數組去處理數組類型和PipelineHelper定義的輸出類型一樣。
- abstract<P_IN> Node<P_OUT> evaluate(Spliterator<P_IN> spliterator,boolean flatten,IntFunction<P_OUT[]> generator);
該方法將源拆分器應用到管道內的所有元素。針對數組處理。如果管道沒有中間(
filter,map
)操作,並且源由一個節點支持(源頭),則該節點將被返回(內部遍歷然后返回)。這減少了由有狀態操作和返回數組的終端操作組成的管道的復制.例如:stream.sorted().toArray();該方法對應到AbstractPipeline
內部,代碼如下:
@Override
@SuppressWarnings("unchecked")
final <P_IN> Node<E_OUT> evaluate(Spliterator<P_IN> spliterator,
boolean flatten,
IntFunction<E_OUT[]> generator) {
if (isParallel()) {
// @@@ Optimize if op of this pipeline stage is a stateful op
return evaluateToNode(this, spliterator, flatten, generator);
}
else {
Node.Builder<E_OUT> nb = makeNodeBuilder(
exactOutputSizeIfKnown(spliterator), generator);
return wrapAndCopyInto(nb, spliterator).build();
}
}
復制代碼
AbstractPipeline
“管道”類的抽象基類,是流接口及其原始專門化的核心實現。用來表示流管道的初始部分,封裝流源和零個或多個中間操作。對於順序流和沒有狀態中間操作的並行流、並行流,管道中數據的處理是在一次“阻塞”所有操作的過程中完成的也就是最后才去處理。對於具有狀態操作的並行流,執行被分成多個段,其中每個狀態操作標記一個段的結束,每個段被單獨評估,結果被用作下一個段的輸入。上述所有情況,都是達到終端操作才開始處理源數據。
- AbstractPipeline(Supplier> source,
int sourceFlags, boolean parallel)
創建源Source stage 第一個參數指定一個Supplier接口(工廠模式,只能生成Spliterator<?>的對象,根據傳入的lambda實現,
<? extends Spliterator<?
泛型的PECS原則了解一下。)
- AbstractPipeline(Spliterator<?> source,
int sourceFlags, boolean parallel)
創建源Source stage 第一個參數制定這個拆分器,和上面的構造方式一樣,直接分析一下這個方法:
AbstractPipeline(Spliterator<?> source,
int sourceFlags, boolean parallel) {
this.previousStage = null;
this.sourceSpliterator = source;
this.sourceStage = this;
this.sourceOrOpFlags = sourceFlags & StreamOpFlag.STREAM_MASK;
// The following is an optimization of:
// StreamOpFlag.combineOpFlags(sourceOrOpFlags, StreamOpFlag.INITIAL_OPS_VALUE);
this.combinedFlags = (~(sourceOrOpFlags << 1)) & StreamOpFlag.INITIAL_OPS_VALUE;
this.depth = 0;
this.parallel = parallel;
}
復制代碼
創建Stream 源階段的時候
previousStage
為null
,this.sourceOrOpFlags = sourceFlags & StreamOpFlag.STREAM_MASK;
用於設置當前階段的標識位。this.combinedFlags = (~(sourceOrOpFlags << 1)) & StreamOpFlag.INITIAL_OPS_VALUE;
添加源階段的對流的操作標識,這個combinedFlags
是流在整個管道內部所有操作的合集,在最后的規約操作的時候去解析出來。
- AbstractPipeline(AbstractPipeline previousStage, int opFlags)
根據上游創建下游
Pipeline
。
AbstractPipeline(AbstractPipeline<?, E_IN, ?> previousStage, int opFlags) {
if (previousStage.linkedOrConsumed)
throw new IllegalStateException(MSG_STREAM_LINKED);
previousStage.linkedOrConsumed = true;
previousStage.nextStage = this;
this.previousStage = previousStage;
this.sourceOrOpFlags = opFlags & StreamOpFlag.OP_MASK;
this.combinedFlags = StreamOpFlag.combineOpFlags(opFlags, previousStage.combinedFlags);
this.sourceStage = previousStage.sourceStage;
if (opIsStateful())
sourceStage.sourceAnyStateful = true;
this.depth = previousStage.depth + 1;
}
復制代碼
this.sourceStage = previousStage.sourceStage;
,用於上游和下游關聯,this.combinedFlags = StreamOpFlag.combineOpFlags(opFlags, previousStage.combinedFlags);
將上游的操作標識位添加到本階段的操作標識位中。depth
記錄整個管道的中間操作數。
- final R evaluate(TerminalOp<E_OUT, R> terminalOp)
進行終端匯聚計算。執行最終的計算,得到結果,根據是否是並行執行,調用不同的結束邏輯,如果不是並行方法則執行
terminalOp.evaluateSequential
否則就執行terminalOp.evaluateParallel
。
- final Node<E_OUT> evaluateToArrayNode(IntFunction<E_OUT[]> generator)
處理流轉換數組。
final Node<E_OUT> evaluateToArrayNode(IntFunction<E_OUT[]> generator) {
if (linkedOrConsumed)
throw new IllegalStateException(MSG_STREAM_LINKED);
linkedOrConsumed = true;
if (isParallel() && previousStage != null && opIsStateful()) {
depth = 0;
return opEvaluateParallel(previousStage, previousStage.sourceSpliterator(0), generator);
}
else {
return evaluate(sourceSpliterator(0), true, generator);
}
}
復制代碼
轉換數組的時候,如果是並行流並且不是源階段,而且調用過
sorted
||limit
||skip
||distinct
這些有狀態的操作之后,這里是個模版方法調用。實際上是通過調用DistinctOps
||SortedOps
||SliceOps
這些實現的opEvaluateParallel
方法,提交到ForkJoin線程池來轉換數組。串行執行的時候直接執行evaluate(sourceSpliterator(0), true, generator);
- evaluate(sourceSpliterator(0), true, generator);
具體的執行方法,用於吧管道內部的輸出結果放到Node中。
@Override
@SuppressWarnings("unchecked")
final <P_IN> Node<E_OUT> evaluate(Spliterator<P_IN> spliterator,
boolean flatten,
IntFunction<E_OUT[]> generator) {
if (isParallel()) {
// @@@ Optimize if op of this pipeline stage is a stateful op
return evaluateToNode(this, spliterator, flatten, generator);
}
else {
Node.Builder<E_OUT> nb = makeNodeBuilder(
exactOutputSizeIfKnown(spliterator), generator);
return wrapAndCopyInto(nb, spliterator).build();
}
}
@Override
final <P_IN> Node<P_OUT> evaluateToNode(PipelineHelper<P_OUT> helper,
Spliterator<P_IN> spliterator,
boolean flattenTree,
IntFunction<P_OUT[]> generator) {
return Nodes.collect(helper, spliterator, flattenTree, generator);
}
// Nodes.collect方法
public static <P_IN, P_OUT> Node<P_OUT> collect(PipelineHelper<P_OUT> helper,
Spliterator<P_IN> spliterator,
boolean flattenTree,
IntFunction<P_OUT[]> generator) {
long size = helper.exactOutputSizeIfKnown(spliterator);
if (size >= 0 && spliterator.hasCharacteristics(Spliterator.SUBSIZED)) {
if (size >= MAX_ARRAY_SIZE)
throw new IllegalArgumentException(BAD_SIZE);
P_OUT[] array = generator.apply((int) size);
new SizedCollectorTask.OfRef<>(spliterator, helper, array).invoke();
return node(array);
} else {
Node<P_OUT> node = new CollectorTask.OfRef<>(helper, generator, spliterator).invoke();
return flattenTree ? flatten(node, generator) : node;
}
}
復制代碼
如果是源是並行流的情況,以
ReferencePipeline
引用管道來看主要執行的是return Nodes.collect(helper, spliterator, flattenTree, generator);
,該collect方法內部根據切割器有無Spliterator.SUBSIZED
確定了生成的Node的長度,主要工作是創建一個Task提交到線程池。然后調用invoke拿到結果。示例代碼Arrays.asList("2","22","222").parallelStream().skip(2).toArray();
整個流程如下:
串行執行示例代碼Arrays.asList("2","22","222").stream().skip(2).toArray();
整個流程如下:
- final Spliterator<E_OUT> sourceStageSpliterator()
獲取Stream源頭設置的拆分器,如果設置有則返回並且把源拆分器置空,如果有Supplier則調用get方法返回拆分器並且把源拆分器置空。
- public final S sequential()
設置為串行流 ,設置源的paraller屬性為false。終態方法不允許重寫
- public final S sequential()
設置為並行流 ,設置源的paraller屬性為true。終態方法不允許重寫
- public void close()
關閉管道的方法,在關閉的時候會把管道使用標志設置為false,拆分器設置為null,如果源的回調關閉Job存在不為null時則invoker這個回調Job。
- public S onClose(Runnable closeHandler)
用於注冊關閉的回調job,在調用close的時候用於去執行這個回調job。
- public Spliterator<E_OUT> spliterator()
和
sourceStageSpliterator
方法一樣的功能,只不過不是終態方法,可以重寫用於自定義的拓展。
- public final boolean isParallel()
用於盤帶你當前管道是否是並行流。
- final int getStreamFlags()
獲取流的標志和Stream的包含的所有操作。
- private Spliterator<?> sourceSpliterator(int terminalFlags) {
獲取源拆分器,和
sourceStageSpliterator
方法一樣的功能,針對是並行流時候,並且是創建Stream階段的話有中間狀態,會組合流標志和操作構建拆分器。如果傳入的操作碼不等於0,那么則添加到拆分器的操作碼中。
- final StreamShape getSourceShape()
輸出Stream源的類型。(引用 OR int OR Double OR Long)
- final <P_IN> long exactOutputSizeIfKnown(Spliterator<P_IN> spliterator)
獲取期望的size,如果拆分器如果有SIZE標志,調用拆分器的getExactSizeIfKnown方法,否則返回-1。
- final <P_IN, S extends Sink<E_OUT>> S wrapAndCopyInto(S sink, Spliterator<P_IN> spliterator)
封裝整個管道的階段,包裝在Sink中。把每一個階段串聯起來。包裝在Sink內部的
downstream
.
wrapAndCopyInto代碼執行流程如下:
看完三件事❤️
如果你覺得這篇內容對你還蠻有幫助,我想邀請你幫我三個小忙:
-
點贊,轉發,有你們的 『點贊和評論』,才是我創造的動力。
-
關注公眾號 『 java爛豬皮 』,不定期分享原創知識。
-
同時可以期待后續文章ing🚀