Stream中的Pipeline理解


使用Stream已經快3年了,但是從未真正深入研究過Stream的底層實現。
今天開始把最近學到的Stream原理記錄一下。

本篇文章簡單描述一下自己對pipeline的理解。

基於下面一段代碼:

public static void main(String[] args) {
    List<String> list = Arrays.asList("123", "123123");
    list.stream().map(item -> item+"").forEach(System.out::print);
}

1. stream()方法
顯然,這里的list對象是一個ArrayList實例,debug代碼進入stream方法,可以看見進入到Collection.java類中的stream()

這里的源碼如下:

    default Stream<E> stream() {
        return StreamSupport.stream(spliterator(), false);
    }

關於分割迭代器的內容會在另外一篇文章詳解,這里不再贅述。
進入StreamSupport.stream()方法:

StreamSupport.java
    public static <T> Stream<T> stream(Spliterator<T> spliterator, boolean parallel) {
        Objects.requireNonNull(spliterator);
        return new ReferencePipeline.Head<>(spliterator,
                                            StreamOpFlag.fromCharacteristics(spliterator),
                                            parallel);
    }

咱們可以看到Stream是一個ReferencePipeline.Head類的實例,
通過idea的類圖結構功能,我們可以看到下面這個層次結構:

所有的流基本都是來自於BaseStreamAbstractPipelineReferencePipeline這三個抽象類或接口。
ReferencePipeline的實現類一共就三種:

  1. Head
  2. StatelessOp
  3. StatefulOp

查看了源碼即可知道:AbstractPipeline其實就是一個雙向鏈表中的一個節點。【我是這么理解的】
Head:代表的是流的源頭,剛構建的時候流的一些屬性被包含在這個對象。比如這個集合的元素,畢竟流的存在還是為了對一組元素的操作。
StatelessOp:代表的是無狀態的操作,如map()
StatefulOp:代表的是有狀態的操作,如sorted()


圖中的每個節點都是一個AbstractPipeline的實現。

所以stream()方法執行之后,拿到的是一個ReferencePipeline.Head實例,並沒有構建StatelessOpStatefulOp實例。

2. map()方法
因為stream方法返回值是一個Head實例,而Head類並未重寫map方法,所以map方法的實際執行還是走的ReferencePipeline類的map方法,如下:

    public final <R> Stream<R> map(Function<? super P_OUT, ? extends R> mapper) {
        Objects.requireNonNull(mapper);
        return new StatelessOp<P_OUT, R>(this, StreamShape.REFERENCE,
                                     StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT) {
            @Override
            Sink<P_OUT> opWrapSink(int flags, Sink<R> sink) {
                return new Sink.ChainedReference<P_OUT, R>(sink) {
                    @Override
                    public void accept(P_OUT u) {
                        downstream.accept(mapper.apply(u));
                    }
                };
            }
        };
    }

這里的返回是一個繼承於StatelessOp的匿名類。
關於SinkTerminalOp的詳解后續會單獨開文章分析。
這里只需要理解這個map的返回值是一個繼承於StatelessOp的匿名類。(StatelessOp是一個ReferencePipeline的實現)

3. forEach()方法

前提:流是含有流源的對象,並且它支持0個或多個中間操作,1個終止操作的特性。

通過idea查看發現foreach的實現有2個:

第一個是Head的實現,因為流源構造出來之后,直接調用forEach,有它自己的實現,對迭代做了優化。這里可后續添加細致分析。
第二個是ReferencePipeline的實現,即調用終止操作的節點不是流源節點。

我們這里只分析ReferencePipeline中的實現:

    public void forEach(Consumer<? super P_OUT> action) {
        /**
         *  ForEachOps.makeRef(action, false) 是構建終止操作,參考3.1
         *  evaluate()是觸發終止操作的調用,參考3.2
         */
        evaluate(ForEachOps.makeRef(action, false));
    }

這里的evaluate方法可以想象成“執行”的意思。
ForEachOps.makeRef(action, false)方法可以想象成“構造一個終止操作”。--終止操作是一個名詞,這里只是一個對象而已,如果這個“操作”沒有得到觸發,那么流什么也不會干。
所以這個evaluate可以理解成fire action performed.

3.1 構建終止操作
首先來看看TerminalOp接口,這是所有終止操作的抽象,每一個終止操作都是它的子類。

查看它的實現類,可以發現它的實現類的特點:

  • FindOp in FindOps
    示例:findFirst()
  • ReduceOp in ReduceOps
    示例:reduce(BigDecimal.Zero, BigDecimal::add)
  • ForEachOp in ForEachOps
    示例:forEach()
  • MatchOp in MatchOps
    示例:anyMatch()

其中帶s的是一個工廠類,用於生產不同的“終止操作”。
不帶s的才是一個“終止操作”TerminalOp的實現類。

3.2 觸發終止操作
其實這里也不是僅僅觸發終止操作,這個方法里會把前面所有的中間操作apply到每一個元素上,並執行終止操作。
evaluate()的實現如下,暫時這里不做過多討論,后續在sink的單獨一篇文章中,分析具體流的執行過程。

    final <R> R evaluate(TerminalOp<E_OUT, R> terminalOp) {
        assert getOutputShape() == terminalOp.inputShape();
        if (linkedOrConsumed)
            throw new IllegalStateException(MSG_STREAM_LINKED);
        linkedOrConsumed = true;

        return isParallel()
               ? terminalOp.evaluateParallel(this, sourceSpliterator(terminalOp.getOpFlags()))
               : terminalOp.evaluateSequential(this, sourceSpliterator(terminalOp.getOpFlags()));
    }

總結

本文只是為了理解:流pipeline是一個什么概念,以及它有什么樣的基本特性?
1、流pipeline是一個雙向鏈表的節點,前后引用。
2、流由流源,中間操作和終止操作組成。
3、終止操作被觸發的時候,所有的操作(中間+終止)才會被一一應用到元素上。這稱為流的惰性。
4、有一些操作是具有短路的特性的,如:findFirst等。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM