ReferencePipeline


用於描述中間管道階段或者管道源階段的抽象基類。

繼承 AbstractPipeline,實現 Stream

AbstractPipeline

繼承 PipelineHelper,實現 BaseStream

文檔說明

  • 管道類的抽象父類,是 Stream 接口及其原生特化的核心實現。它會管理流管道的構建及評估
  • AbstractPipeline 代表一個流管道的的初始部分,它封裝了流的源,以及一個或多個中間操作
    • 每個 AbstractPipeline 對象通常被稱為 stage(階段),每個階段描述的,要么是流的源,要么是一個中間操作
  • 一個具體的中間階段通常通過一個 AbstractPipeline 構建,一個 “類型特化(shape-specific)管道” 繼承它(如:IntPipeline)后仍舊抽象,再由一個“操作特化(operation-specific)”的具體類去繼承前者
  • AbstractPipeline 包含了大多數評價管道的機制,並且實現方法,這些實現的方法都會被操作使用到;“類型特化”的類會添加一些輔助方法,用於將結果集裝到合適的“類型特化”的容器當中(也就是避免裝箱拆箱的操作)
  • 在鏈接一個新的中間操作,或者執行一個終止操作后,這個流就會被標記為已消費,並且不會再有更多的中間或終止操作添加到這個流實例當中
  • implNote
  • 對於串行流,以及所有中間操作都無狀態的的並行流兩種情況,管道的計算是在單個的過程中完成的,所謂的單個的過程就是,將所有的操作都放到一起完成
  • 對於有狀態操作的並行流,執行會被分為多個“段”,當中每個有狀態的操作都會在段的結尾打上標識,然后每個段都會單獨地進行計算,並且每段的輸出結果都會作為下一段的輸入
  • 在所有的情況中,在一個終止操作開始之前,源數據都不會被消費

屬性

  • previousStage
    • 上游的 pipeline,源階段置 null
  • sourceStage
    • 指向管道頭的反指向鏈,源階段為自身
  • sourceOrOpFlags
  • combinedFlags
  • depth
    • 當前管道對象與流的源(串行)或前狀態(並行)之間,中間操作的個數
    • 在管道准備進行計算的時間點有效
  • parallel
    • 是否並行,只在流的源階段有效
  • sourceSpliterator
    • 源的 spliteraror,只在管道頭有效
    • 在管道被消費前,若為“非空”,則 sourceSupplier 必須為 null
    • 在管道被消費后,若為“非空”,則將其置 null
  • sourceSupplier
    • 源的 supplier,只在管道頭有效
    • 在管道被消費前,若為“非空”,則 sourceSpliterator 必須為 null
    • 在管道被消費后,若為“非空”,則將其置 null

構造方法

構造方法一:構造流管道的頭

上游 pipeline 置 null,將帶有數據源引用的 spliterator 賦給當前 pipeline,

將當前階段標記為源階段,配置操作屬性,當前深度初始化為0,設置 串行/並行 標記

AbstractPipeline(Supplier<? extends Spliterator<?>> source,
                     int sourceFlags, boolean parallel) {
    this.previousStage = null;
    this.sourceSupplier = source;
    this.sourceStage = this;
    this.sourceOrOpFlags = sourceFlags & StreamOpFlag.STREAM_MASK;
    // The following is an optimization of:
    // StreamOpFlag.combineOpFlags(sourceOrOpFlags, StreamOpFlag.INITIAL_OPS_VALUE);
    this.combinedFlags = (~(sourceOrOpFlags << 1)) & StreamOpFlag.INITIAL_OPS_VALUE;
    this.depth = 0;
    this.parallel = parallel;
}

 

構造方法二:構造用於追加一個中間操作到一個既有的 pipeline 上

斷言,若傳入的上游 pipeline 已被鏈接過消費,則拋出異常;

將上游 pipeline 標記為已消費;將上游 pipeline 的下游指向當前 pipeline;將當前 pipeline 的上游指向上游 pipeline;

配置操作屬性;深度 +1

AbstractPipeline(Spliterator<?> source,
                     int sourceFlags, boolean parallel) {
    this.previousStage = null;
    this.sourceSpliterator = source;
    this.sourceStage = this;
    this.sourceOrOpFlags = sourceFlags & StreamOpFlag.STREAM_MASK;
    // The following is an optimization of:
    // StreamOpFlag.combineOpFlags(sourceOrOpFlags, StreamOpFlag.INITIAL_OPS_VALUE);
    this.combinedFlags = (~(sourceOrOpFlags << 1)) & StreamOpFlag.INITIAL_OPS_VALUE;
    this.depth = 0;
    this.parallel = parallel;
}

 

抽象方法

abstract Sink<E_IN> opWrapSink(int flags, Sink<E_OUT> sink)

說明文檔

  • 接收一個接收操作結果的 Sink,並返回一個接收當前操作的輸入類型的元素並執行操作的 Sink,將結果傳遞到提供的 Sink 中
  • apiNote
  • 實現類可能使用 flag 參數去優化 sink 的包裝
  • 例如,如果輸入已經是 DISTINCT ,那么 distinct() 方法的實現只需要返回 sink

 

實現方法

wrapAndCopyInto

@Override
final <P_IN, S extends Sink<E_OUT>> S wrapAndCopyInto(S sink, Spliterator<P_IN> spliterator) {
    copyInto(wrapSink(Objects.requireNonNull(sink)), spliterator);
    return sink;
}
  • 通過 wrapSink 方法獲取最初的 sink
  • 通過 copyInto 方法遍歷元素
  • 循例將執行到的 sink 返回,即使沒什么意義

wrapSink

@Override
@SuppressWarnings("unchecked")
final <P_IN> Sink<P_IN> wrapSink(Sink<E_OUT> sink) {
    Objects.requireNonNull(sink);

    for ( @SuppressWarnings("rawtypes") AbstractPipeline p=AbstractPipeline.this; p.depth > 0; p=p.previousStage) {
        sink = p.opWrapSink(p.previousStage.combinedFlags, sink);
    }
    return (Sink<P_IN>) sink;
}
  • 傳入下一階段的 sink,獲取當前階段的 sink,循環獲取最初階段的 sink

copyInto

@Override
final <P_IN> void copyInto(Sink<P_IN> wrappedSink, Spliterator<P_IN> spliterator) {
    Objects.requireNonNull(wrappedSink);

    if (!StreamOpFlag.SHORT_CIRCUIT.isKnown(getStreamAndOpFlags())) {
        wrappedSink.begin(spliterator.getExactSizeIfKnown());
        spliterator.forEachRemaining(wrappedSink);
        wrappedSink.end();
    }
    else {
        copyIntoWithCancel(wrappedSink, spliterator);
    }
}
  • 判斷是否有短路操作(filter 之類的中間操作)
  • 遵循 Sink 中定義的操作順序調用遍歷方法,最終調用數據源的 tryAdvance方法

 

ReferencePipeline.Head

表示 ReferencePipeline 的源階段

繼承了 ReferencePipeline(二者在大部分屬性的設定上是類似的,但存在一些屬性是不同的,比如說 Head 的 previousStage 是空的,而 ReferencePipeline 則存在 previousStage,等等)

構造方法

本質上是調用的是 AbstractPipeline 的構造方法一,

ReferencePipeline(Supplier<? extends Spliterator<?>> source, int sourceFlags, boolean parallel) {
    super(source, sourceFlags, parallel);
}
ReferencePipeline(Spliterator<?> source, int sourceFlags, boolean parallel) {
    super(source, sourceFlags, parallel);
}

ReferencePipeline.StatelessOp

一個基類,針對一個流的無狀態的中間階段,同樣集成了 ReferencePipeline

構造方法

本質上調用的是 AbstractPipeline 的構造方法二

StatelessOp(AbstractPipeline<?, E_IN, ?> upstream, int opFlags) {
    super(upstream, opFlags);
    assert upstream.getOutputShape() == inputShape;
}

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



猜您在找
 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM