使用Stream已經快3年了,但是從未真正深入研究過Stream的底層實現。
今天開始把最近學到的Stream原理記錄一下。
本篇文章簡單描述一下自己對pipeline的理解。
基於下面一段代碼:
public static void main(String[] args) {
List<String> list = Arrays.asList("123", "123123");
list.stream().map(item -> item+"").forEach(System.out::print);
}
1. stream()方法
顯然,這里的list對象是一個ArrayList實例,debug代碼進入stream方法,可以看見進入到Collection.java
類中的stream()
中
這里的源碼如下:
default Stream<E> stream() {
return StreamSupport.stream(spliterator(), false);
}
關於分割迭代器的內容會在另外一篇文章詳解,這里不再贅述。
進入StreamSupport.stream()
方法:
StreamSupport.java
public static <T> Stream<T> stream(Spliterator<T> spliterator, boolean parallel) {
Objects.requireNonNull(spliterator);
return new ReferencePipeline.Head<>(spliterator,
StreamOpFlag.fromCharacteristics(spliterator),
parallel);
}
咱們可以看到Stream是一個ReferencePipeline.Head
類的實例,
通過idea的類圖結構功能,我們可以看到下面這個層次結構:
所有的流基本都是來自於BaseStream
,AbstractPipeline
,ReferencePipeline
這三個抽象類或接口。
ReferencePipeline的實現類一共就三種:
- Head
- StatelessOp
- StatefulOp
查看了源碼即可知道:AbstractPipeline其實就是一個雙向鏈表中的一個節點。【我是這么理解的】
Head:代表的是流的源頭,剛構建的時候流的一些屬性被包含在這個對象。比如這個集合的元素,畢竟流的存在還是為了對一組元素的操作。
StatelessOp:代表的是無狀態的操作,如map()
StatefulOp:代表的是有狀態的操作,如sorted()
圖中的每個節點都是一個AbstractPipeline的實現。
所以stream()方法執行之后,拿到的是一個ReferencePipeline.Head
實例,並沒有構建StatelessOp
,StatefulOp
實例。
2. map()方法
因為stream方法返回值是一個Head實例,而Head類並未重寫map方法,所以map方法的實際執行還是走的ReferencePipeline類的map方法,如下:
public final <R> Stream<R> map(Function<? super P_OUT, ? extends R> mapper) {
Objects.requireNonNull(mapper);
return new StatelessOp<P_OUT, R>(this, StreamShape.REFERENCE,
StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT) {
@Override
Sink<P_OUT> opWrapSink(int flags, Sink<R> sink) {
return new Sink.ChainedReference<P_OUT, R>(sink) {
@Override
public void accept(P_OUT u) {
downstream.accept(mapper.apply(u));
}
};
}
};
}
這里的返回是一個繼承於StatelessOp的匿名類。
關於Sink
和TerminalOp
的詳解后續會單獨開文章分析。
這里只需要理解這個map的返回值是一個繼承於StatelessOp的匿名類。(StatelessOp是一個ReferencePipeline的實現)
3. forEach()方法
前提:流是含有流源的對象,並且它支持0個或多個中間操作,1個終止操作的特性。
通過idea查看發現foreach的實現有2個:
第一個是Head的實現,因為流源構造出來之后,直接調用forEach,有它自己的實現,對迭代做了優化。這里可后續添加細致分析。
第二個是ReferencePipeline的實現,即調用終止操作的節點不是流源節點。
我們這里只分析ReferencePipeline
中的實現:
public void forEach(Consumer<? super P_OUT> action) {
/**
* ForEachOps.makeRef(action, false) 是構建終止操作,參考3.1
* evaluate()是觸發終止操作的調用,參考3.2
*/
evaluate(ForEachOps.makeRef(action, false));
}
這里的evaluate
方法可以想象成“執行”的意思。
ForEachOps.makeRef(action, false)
方法可以想象成“構造一個終止操作”。--終止操作是一個名詞,這里只是一個對象而已,如果這個“操作”沒有得到觸發,那么流什么也不會干。
所以這個evaluate
可以理解成fire action performed.
3.1 構建終止操作
首先來看看TerminalOp
接口,這是所有終止操作的抽象,每一個終止操作都是它的子類。
查看它的實現類,可以發現它的實現類的特點:
- FindOp in FindOps
示例:findFirst() - ReduceOp in ReduceOps
示例:reduce(BigDecimal.Zero, BigDecimal::add) - ForEachOp in ForEachOps
示例:forEach() - MatchOp in MatchOps
示例:anyMatch()
其中帶s的是一個工廠類,用於生產不同的“終止操作”。
不帶s的才是一個“終止操作”TerminalOp的實現類。
3.2 觸發終止操作
其實這里也不是僅僅觸發終止操作,這個方法里會把前面所有的中間操作apply到每一個元素上,並執行終止操作。
evaluate()
的實現如下,暫時這里不做過多討論,后續在sink的單獨一篇文章中,分析具體流的執行過程。
final <R> R evaluate(TerminalOp<E_OUT, R> terminalOp) {
assert getOutputShape() == terminalOp.inputShape();
if (linkedOrConsumed)
throw new IllegalStateException(MSG_STREAM_LINKED);
linkedOrConsumed = true;
return isParallel()
? terminalOp.evaluateParallel(this, sourceSpliterator(terminalOp.getOpFlags()))
: terminalOp.evaluateSequential(this, sourceSpliterator(terminalOp.getOpFlags()));
}
總結
本文只是為了理解:流pipeline是一個什么概念,以及它有什么樣的基本特性?
1、流pipeline是一個雙向鏈表的節點,前后引用。
2、流由流源,中間操作和終止操作組成。
3、終止操作被觸發的時候,所有的操作(中間+終止)才會被一一應用到元素上。這稱為流的惰性。
4、有一些操作是具有短路的特性的,如:findFirst等。