Java Basics - Stream [4] - A Simple Stream Example and How It Works


A simple Stream example

@Test
public void test() {
    getData().stream()
            .filter(person -> person.getAge() > 24)
            .map(Person::getName)
            .forEach(System.out::println);
}
private static ArrayList<Person> getData() {
    ArrayList<Person> list=new ArrayList<>();
    list.add(new Person("1",20));
    list.add(new Person("2",22));
    list.add(new Person("3",24));
    list.add(new Person("4",26));
    list.add(new Person("5",28));
    list.add(new Person("6",20));
    list.add(new Person("7",18));
    list.add(new Person("8",16));
    return list;
}
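
The snippet above relies on a Person class that the article does not show. Here is a minimal runnable sketch, assuming Person is a plain value class with a name and an age. The wrapper class PersonExample and the helper namesOlderThan are made up for illustration, and the result is collected into a list instead of printed from a test:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;

class PersonExample {
    // Assumed shape of Person: the article only calls getName() and getAge().
    static final class Person {
        private final String name;
        private final int age;

        Person(String name, int age) {
            this.name = name;
            this.age = age;
        }

        String getName() { return name; }

        int getAge() { return age; }
    }

    // Same eight entries as getData() in the article.
    static List<Person> getData() {
        int[] ages = {20, 22, 24, 26, 28, 20, 18, 16};
        List<Person> list = new ArrayList<>();
        for (int i = 0; i < ages.length; i++) {
            list.add(new Person(String.valueOf(i + 1), ages[i]));
        }
        return list;
    }

    // Hypothetical helper: the same filter/map pipeline, collected into a list.
    static List<String> namesOlderThan(int age) {
        return getData().stream()
                .filter(person -> person.getAge() > age)
                .map(Person::getName)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // Only the persons aged 26 and 28 pass the filter, so this prints 4 and 5.
        namesOlderThan(24).forEach(System.out::println);
    }
}
```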

Flow analysis:

public interface Collection<E> extends Iterable<E> {
    default Stream<E> stream() {
        return StreamSupport.stream(spliterator(), false);
    }
}

The spliterator() here returns the ArrayListSpliterator described in the previous section.
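
To make that default method concrete, the sketch below builds a stream by hand the same way Collection.stream() does and inspects the characteristics the ArrayList spliterator reports. The class and method names are invented for this example. The ORDERED and SIZED bits add up to 80, which appears to be where the sourceFlags value quoted in the constructor comment further down comes from:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Spliterator;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;

class SpliteratorSourceExample {
    // Builds the stream exactly like the default Collection.stream() method.
    static List<Integer> viaStreamSupport(List<Integer> source) {
        return StreamSupport.stream(source.spliterator(), false)
                .collect(Collectors.toList());
    }

    // Keeps only the ORDERED (0x10) and SIZED (0x40) bits of the characteristics.
    static int orderedAndSizedBits(List<?> source) {
        return source.spliterator().characteristics()
                & (Spliterator.ORDERED | Spliterator.SIZED);
    }

    public static void main(String[] args) {
        List<Integer> list = new ArrayList<>(Arrays.asList(1, 2, 3));
        System.out.println(viaStreamSupport(list));
        System.out.println(orderedAndSizedBits(list));
        System.out.println(list.spliterator().hasCharacteristics(Spliterator.SUBSIZED));
    }
}
```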

The abstract class behind the Stream interface: AbstractPipeline

//java.util.stream.AbstractPipeline (fields and head constructor)
abstract class AbstractPipeline<E_IN, E_OUT, S extends BaseStream<E_OUT, S>>
        extends PipelineHelper<E_OUT> implements BaseStream<E_OUT, S> {
    // Back-link to the head of the pipeline chain (this stage itself if it is the source)
    @SuppressWarnings("rawtypes")
    private final AbstractPipeline sourceStage;

    // The upstream pipeline stage
    @SuppressWarnings("rawtypes")
    private final AbstractPipeline previousStage;

    // Operation flags
    protected final int sourceOrOpFlags;

    // The next (downstream) pipeline stage
    @SuppressWarnings("rawtypes")
    private AbstractPipeline nextStage;

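    // Depth: the number of intermediate operations between this stage and the source
    // if sequential, or between this stage and the previous stateful operation if parallel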
    private int depth;

    //combined source and operation flags 
    private int combinedFlags;

    // Source spliterator; only valid for the head pipeline
    // Set to null once the stream has been consumed
    private Spliterator<?> sourceSpliterator;

    // Source supplier; only valid for the head pipeline
    // Set to null once the stream has been consumed
    private Supplier<? extends Spliterator<?>> sourceSupplier;

    //True if this pipeline has been linked or consumed
    private boolean linkedOrConsumed;

    // On the source stage: true if any operation in the pipeline is stateful
    private boolean sourceAnyStateful;

    private Runnable sourceCloseAction;

    // Whether this is a parallel stream
    private boolean parallel;
    
    
    AbstractPipeline(Spliterator<?> source,
                     int sourceFlags, boolean parallel) {
        this.previousStage = null; // the head stage has no upstream stage
        this.sourceSpliterator = source; // the source Spliterator
        this.sourceStage = this;
        // In this example sourceFlags is 80 (0x50 = ORDERED | SIZED)
        this.sourceOrOpFlags = sourceFlags & StreamOpFlag.STREAM_MASK;
        // The following is an optimization of:
        // StreamOpFlag.combineOpFlags(sourceOrOpFlags, StreamOpFlag.INITIAL_OPS_VALUE);
        this.combinedFlags = (~(sourceOrOpFlags << 1)) & StreamOpFlag.INITIAL_OPS_VALUE;
        this.depth = 0;
        this.parallel = parallel;
    }
}

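AbstractPipeline is the parent class of every Stream implementation; see the first UML class diagram.

The initialization above shows that a stream is assembled into a linked list of pipeline stages.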
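
The linked-list structure can be observed from the public API. The sketch below, with invented class and method names, checks two things on an OpenJDK-style implementation: an intermediate operation returns a new stage object instead of modifying the stream it was called on, and a stage that already has a downstream stage refuses a second one. The Stream documentation only says an implementation may throw IllegalStateException in that case, so treat the second check as implementation behavior.

```java
import java.util.Arrays;
import java.util.stream.Stream;

class StageLinkingExample {
    // True when filter() yields a different object than the stream it was called on.
    static boolean filterCreatesNewStage() {
        Stream<Integer> head = Arrays.asList(1, 2, 3).stream();
        Stream<Integer> filtered = head.filter(i -> i > 1);
        return head != filtered;
    }

    // True when attaching a second operation to an already linked stage is rejected.
    static boolean secondLinkIsRejected() {
        Stream<Integer> head = Arrays.asList(1, 2, 3).stream();
        head.filter(i -> i > 1);      // first link succeeds
        try {
            head.map(i -> i * 2);     // second link on the same stage
            return false;
        } catch (IllegalStateException expected) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println(filterCreatesNewStage());
        System.out.println(secondLinkIsRejected());
    }
}
```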

Head Stream

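//java.util.stream.StreamSupport#stream
public static <T> Stream<T> stream(Spliterator<T> spliterator, boolean parallel) {
    Objects.requireNonNull(spliterator);
    return new ReferencePipeline.Head<>(spliterator,
                                        StreamOpFlag.fromCharacteristics(spliterator),
                                        parallel);
}
//java.util.stream.ReferencePipeline.Head
static class Head<E_IN, E_OUT> extends ReferencePipeline<E_IN, E_OUT> {
    Head(Spliterator<?> source, int sourceFlags, boolean parallel) {
        super(source, sourceFlags, parallel);
    }
}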

Creating the Head simply initializes an AbstractPipeline with:

previousStage: null

sourceSpliterator: the ArrayListSpliterator

sourceStage: the current pipeline (the Head itself)

depth: 0

filter()

public final Stream<P_OUT> filter(Predicate<? super P_OUT> predicate) {
    Objects.requireNonNull(predicate);
    // filter is a stateless operation
    return new StatelessOp<P_OUT, P_OUT>(this, StreamShape.REFERENCE,
                                         StreamOpFlag.NOT_SIZED) {
        // The terminal operation eventually calls this method to assemble the Sink chain that consumes the results
        @Override
        Sink<P_OUT> opWrapSink(int flags, Sink<P_OUT> sink) {
            return new Sink.ChainedReference<P_OUT, P_OUT>(sink) {
                @Override
                public void begin(long size) {
                    downstream.begin(-1);
                }
                // The element is passed downstream only if the Predicate returns true
                @Override
                public void accept(P_OUT u) {
                    if (predicate.test(u))
                        downstream.accept(u);
                }
            };
        }
    };
}
//StatelessOp initialization
abstract class ReferencePipeline<P_IN, P_OUT> extends AbstractPipeline<P_IN, P_OUT, Stream<P_OUT>>
        implements Stream<P_OUT>  {
   ReferencePipeline(AbstractPipeline<?, P_IN, ?> upstream, int opFlags) {
        super(upstream, opFlags);
    }
    abstract static class StatelessOp<E_IN, E_OUT> extends ReferencePipeline<E_IN, E_OUT> {
        StatelessOp(AbstractPipeline<?, E_IN, ?> upstream,StreamShape inputShape,int opFlags) {
            // Calls ReferencePipeline(upstream, opFlags)
            super(upstream, opFlags);
            assert upstream.getOutputShape() == inputShape;
        }

        @Override
        final boolean opIsStateful() {
            return false;
        }
    }
    
}
abstract class AbstractPipeline<E_IN, E_OUT, S extends BaseStream<E_OUT, S>>
        extends PipelineHelper<E_OUT> implements BaseStream<E_OUT, S> {
    AbstractPipeline(AbstractPipeline<?, E_IN, ?> previousStage, int opFlags) {
        if (previousStage.linkedOrConsumed)
            throw new IllegalStateException(MSG_STREAM_LINKED);
        // The next three lines link this stage to the upstream stage of the pipeline chain
        previousStage.linkedOrConsumed = true;
        previousStage.nextStage = this;
        this.previousStage = previousStage;
        
        this.sourceOrOpFlags = opFlags & StreamOpFlag.OP_MASK;
        this.combinedFlags = StreamOpFlag.combineOpFlags(opFlags, previousStage.combinedFlags);
        this.sourceStage = previousStage.sourceStage;
        if (opIsStateful())
            sourceStage.sourceAnyStateful = true;
        this.depth = previousStage.depth + 1;
    }
}

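This initializes a StatelessOp, which is a subclass of AbstractPipeline (through ReferencePipeline).

The Head stream's nextStage is set to the current StatelessOp (the filter stage).

The current StatelessOp's previousStage is set to the Head stream.

sourceStage also keeps a reference to the Head stream.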
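
The bookkeeping done by the two constructors can be reduced to a toy model. The ToyStage class below is not JDK code. It is a simplified sketch that keeps only the link fields and depth discussed above, and adds a name field and a describe() helper (both invented) to make the resulting chain visible:

```java
class ToyStage {
    final ToyStage sourceStage;   // head of the chain; the head points at itself
    final ToyStage previousStage; // upstream stage, null for the head
    ToyStage nextStage;           // downstream stage, set when one is linked
    final int depth;              // 0 for the head, +1 per intermediate stage
    final String name;            // illustration only, not a JDK field
    private boolean linkedOrConsumed;

    // Counterpart of the head constructor.
    ToyStage(String name) {
        this.name = name;
        this.previousStage = null;
        this.sourceStage = this;
        this.depth = 0;
    }

    // Counterpart of the intermediate-stage constructor.
    ToyStage(ToyStage previousStage, String name) {
        if (previousStage.linkedOrConsumed) {
            throw new IllegalStateException("stage already linked");
        }
        previousStage.linkedOrConsumed = true;
        previousStage.nextStage = this;
        this.name = name;
        this.previousStage = previousStage;
        this.sourceStage = previousStage.sourceStage;
        this.depth = previousStage.depth + 1;
    }

    // Walks the chain from the head through nextStage and lists name@depth.
    String describe() {
        StringBuilder sb = new StringBuilder();
        for (ToyStage s = sourceStage; s != null; s = s.nextStage) {
            if (sb.length() > 0) {
                sb.append(" -> ");
            }
            sb.append(s.name).append('@').append(s.depth);
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        ToyStage head = new ToyStage("head");
        ToyStage map = new ToyStage(new ToyStage(head, "filter"), "map");
        System.out.println(map.describe()); // head@0 -> filter@1 -> map@2
    }
}
```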

map()

public final <R> Stream<R> map(Function<? super P_OUT, ? extends R> mapper) {
    Objects.requireNonNull(mapper);
    // Creates another AbstractPipeline subclass instance (a StatelessOp)
    return new StatelessOp<P_OUT, R>(this, StreamShape.REFERENCE,
                                     StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT) {
        // The terminal operation eventually calls this method to assemble the Sink chain that consumes the results
        @Override
        Sink<P_OUT> opWrapSink(int flags, Sink<R> sink) {
            return new Sink.ChainedReference<P_OUT, R>(sink) {
                @Override
                public void accept(P_OUT u) {
                    // Pass the mapper's result to the downstream sink
                    downstream.accept(mapper.apply(u));
                }
            };
        }
    };
}
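
Both filter() and map() only construct a new stage; neither touches an element. A small sketch with invented names that counts predicate calls before and after the terminal operation shows this:

```java
import java.util.Arrays;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Stream;

class LazyStagesExample {
    // Returns {predicate calls before the terminal operation, calls after it}.
    static int[] predicateCalls() {
        AtomicInteger calls = new AtomicInteger();
        Stream<String> pipeline = Arrays.asList(20, 26, 28).stream()
                .filter(age -> {
                    calls.incrementAndGet();
                    return age > 24;
                })
                .map(age -> "age " + age);
        int before = calls.get();      // stages are linked, nothing has run yet
        pipeline.forEach(s -> { });    // the terminal operation drives the elements
        int after = calls.get();
        return new int[] {before, after};
    }

    public static void main(String[] args) {
        int[] result = predicateCalls();
        System.out.println(result[0] + " calls before, " + result[1] + " calls after");
    }
}
```

The predicate runs zero times while the pipeline is being built and once per source element (three times here) during forEach.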

forEach()

//java.util.stream.ReferencePipeline#forEach
public void forEach(Consumer<? super P_OUT> action) {
    evaluate(ForEachOps.makeRef(action, false));
}
//java.util.stream.ForEachOps#makeRef
public static <T> TerminalOp<T, Void> makeRef(Consumer<? super T> action,
                                              boolean ordered) {
    Objects.requireNonNull(action);
    return new ForEachOp.OfRef<>(action, ordered);
}
//java.util.stream.ForEachOps.ForEachOp.OfRef
static final class OfRef<T> extends ForEachOp<T> {
    final Consumer<? super T> consumer;

    OfRef(Consumer<? super T> consumer, boolean ordered) {
        super(ordered);
        this.consumer = consumer;
    }

    @Override
    public void accept(T t) {
        consumer.accept(t);
    }
}
//Evaluation: java.util.stream.AbstractPipeline#evaluate(java.util.stream.TerminalOp<E_OUT,R>)
final <R> R evaluate(TerminalOp<E_OUT, R> terminalOp) {
    assert getOutputShape() == terminalOp.inputShape();
    if (linkedOrConsumed)
        throw new IllegalStateException(MSG_STREAM_LINKED);
    linkedOrConsumed = true;

    return isParallel()
        ? terminalOp.evaluateParallel(this, sourceSpliterator(terminalOp.getOpFlags()))
        : terminalOp.evaluateSequential(this, sourceSpliterator(terminalOp.getOpFlags()));
}
//java.util.stream.ForEachOps.ForEachOp#evaluateSequential
abstract static class ForEachOp<T> implements TerminalOp<T, Void>, TerminalSink<T, Void> {
    @Override
    public <S> Void evaluateSequential(PipelineHelper<T> helper,
                                       Spliterator<S> spliterator) {
        return helper.wrapAndCopyInto(this, spliterator).get();
    }
}
//java.util.stream.AbstractPipeline#wrapAndCopyInto
final <P_IN, S extends Sink<E_OUT>> S wrapAndCopyInto(S sink, Spliterator<P_IN> spliterator) {
    copyInto(wrapSink(Objects.requireNonNull(sink)), spliterator);
    return sink;
}
//java.util.stream.AbstractPipeline#wrapSink
final <P_IN> Sink<P_IN> wrapSink(Sink<E_OUT> sink) {
    Objects.requireNonNull(sink);

    for ( @SuppressWarnings("rawtypes") AbstractPipeline p=AbstractPipeline.this; p.depth > 0; p=p.previousStage) {
        // Each AbstractPipeline subclass implements opWrapSink differently, as the filter and map examples above show
        sink = p.opWrapSink(p.previousStage.combinedFlags, sink);
    }
    return (Sink<P_IN>) sink;
}
//java.util.stream.AbstractPipeline#copyInto
final <P_IN> void copyInto(Sink<P_IN> wrappedSink, Spliterator<P_IN> spliterator) {
    Objects.requireNonNull(wrappedSink);

    if (!StreamOpFlag.SHORT_CIRCUIT.isKnown(getStreamAndOpFlags())) {
        wrappedSink.begin(spliterator.getExactSizeIfKnown());
        // Start traversing the collection; forEachRemaining takes a Consumer, and the wrapped Sink is one
        spliterator.forEachRemaining(wrappedSink);
        wrappedSink.end();
    }
    else {
        copyIntoWithCancel(wrappedSink, spliterator);
    }
}

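wrapSink() ultimately combines all operations in the pipeline into a single Sink. The head stream is excluded, because its depth is 0 and the loop stops there.

spliterator.forEachRemaining() traverses the collection. It takes a Consumer, and Sink extends Consumer, so accept() is called on the wrapped sink for every element.

ForEachOp does not extend AbstractPipeline, so it is not part of the pipeline chain. It directly implements the Sink interface (through TerminalSink) and the TerminalOp interface, so in wrapSink() the ForEachOp itself is the first sink placed in the chain, but the last one to be invoked.

Building the singly linked Sink chain is the real goal. AbstractPipeline only keeps the doubly linked list of stages and, depending on the operation type, supplies a different accept() implementation.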
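
The wrapping order is easier to follow in a stripped-down model. The sketch below is not the JDK implementation. It replaces Sink with a plain Consumer<Object> and each stage with a function that wraps the downstream consumer, which is an assumption made to keep the example short. All names are invented.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.function.UnaryOperator;

class ToySinkChain {
    // Counterpart of filter()'s opWrapSink: forward only matching elements.
    static UnaryOperator<Consumer<Object>> filterStage(Predicate<Object> predicate) {
        return downstream -> element -> {
            if (predicate.test(element)) {
                downstream.accept(element);
            }
        };
    }

    // Counterpart of map()'s opWrapSink: forward the mapped element.
    static UnaryOperator<Consumer<Object>> mapStage(Function<Object, Object> mapper) {
        return downstream -> element -> downstream.accept(mapper.apply(element));
    }

    // Counterpart of wrapSink(): start with the terminal sink and wrap it
    // stage by stage, walking from the last stage back to the first.
    static Consumer<Object> wrapSink(List<UnaryOperator<Consumer<Object>>> stages,
                                     Consumer<Object> terminalSink) {
        Consumer<Object> sink = terminalSink;
        for (int i = stages.size() - 1; i >= 0; i--) {
            sink = stages.get(i).apply(sink);
        }
        return sink;
    }

    static List<Object> run(List<Integer> source) {
        List<Object> result = new ArrayList<>();
        List<UnaryOperator<Consumer<Object>>> stages = new ArrayList<>();
        stages.add(filterStage(age -> (Integer) age > 24));
        stages.add(mapStage(age -> "age " + age));
        Consumer<Object> wrapped = wrapSink(stages, result::add);
        source.forEach(wrapped); // counterpart of spliterator.forEachRemaining(wrappedSink)
        return result;
    }

    public static void main(String[] args) {
        System.out.println(run(Arrays.asList(20, 26, 28, 16))); // [age 26, age 28]
    }
}
```

The terminal sink is wrapped first and therefore ends up innermost: the filter wrapper sees each element first, then the map wrapper, and the terminal sink last.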

Sink

ChainedReference

abstract static class ChainedReference<T, E_OUT> implements Sink<T> {
    protected final Sink<? super E_OUT> downstream;
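    // downstream must be supplied at construction time: it is the sink that was built
    // before this one, i.e. the next sink in processing order
    // Together the sinks form a singly linked chain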
    public ChainedReference(Sink<? super E_OUT> downstream) {
        this.downstream = Objects.requireNonNull(downstream);
    }

    @Override
    public void begin(long size) {
        downstream.begin(size);
    }

    @Override
    public void end() {
        downstream.end();
    }

    @Override
    public boolean cancellationRequested() {
        return downstream.cancellationRequested();
    }
}

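ChainedReference leaves accept() to its subclasses, such as the anonymous classes created in filter() and map(). The terminal sink java.util.stream.ForEachOps.ForEachOp.OfRef implements accept() directly.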
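
Because every accept() call pushes its element straight to the downstream sink, a sequential stream processes one element through the whole chain before it touches the next. The sketch below uses invented names and records the order of the calls to make that visible:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class ElementOrderExample {
    // Records every callback in the order in which the sinks invoke it.
    static List<String> trace() {
        List<String> log = new ArrayList<>();
        Arrays.asList(26, 20, 28).stream()
                .filter(age -> {
                    log.add("filter " + age);
                    return age > 24;
                })
                .map(age -> {
                    log.add("map " + age);
                    return "age " + age;
                })
                .forEach(s -> log.add("forEach " + s));
        return log;
    }

    public static void main(String[] args) {
        trace().forEach(System.out::println);
    }
}
```

The log reads filter 26, map 26, forEach age 26, filter 20, filter 28, map 28, forEach age 28: element 20 stops at the filter sink, and 28 is not looked at until 26 has reached the terminal sink.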

Diagram

[Figure: stream]

