簡單Stream示例
/**
 * Prints the name of every sample person whose age is greater than 24.
 * The pipeline is: source list -> filter by age -> map to name -> print each name.
 */
@Test
public void test() {
    getData().stream()
            .filter(candidate -> candidate.getAge() > 24)
            .map(Person::getName)
            .forEach(System.out::println);
}
/**
 * Builds the fixed sample data set used by the example: eight persons named
 * "1" through "8" with the ages listed below, in that order.
 *
 * @return a new mutable list holding the eight sample persons
 */
private static ArrayList<Person> getData() {
    // Names and ages are kept in parallel arrays; index i describes one person.
    final String[] names = {"1", "2", "3", "4", "5", "6", "7", "8"};
    final int[] ages = {20, 22, 24, 26, 28, 20, 18, 16};

    ArrayList<Person> people = new ArrayList<>();
    for (int i = 0; i < ages.length; i++) {
        people.add(new Person(names[i], ages[i]));
    }
    return people;
}
流程分析:
/**
 * Excerpt of {@code Collection} that shows only the default {@code stream()} method.
 */
public interface Collection<E> extends Iterable<E> {
/**
 * Creates a stream over this collection by passing {@code spliterator()} to
 * {@code StreamSupport.stream} with {@code false} as the second argument.
 *
 * @return the stream returned by {@code StreamSupport.stream}
 */
default Stream<E> stream() {
return StreamSupport.stream(spliterator(), false);
}
}
這里的spliterator()為上一節中
ArrayListSpliterator
Stream接口抽象類:AbstractPipeline
//
/**
 * Excerpt of the pipeline base class: the fields that link stages together and
 * the constructor used for the source (head) stage.
 */
abstract class AbstractPipeline<E_IN, E_OUT, S extends BaseStream<E_OUT, S>>
extends PipelineHelper<E_OUT> implements BaseStream<E_OUT, S> {
// Back-link to the head of the pipeline chain; for the source stage this is the stage itself.
@SuppressWarnings("rawtypes")
private final AbstractPipeline sourceStage;
// The upstream (previous) pipeline stage.
@SuppressWarnings("rawtypes")
private final AbstractPipeline previousStage;
// Operation flags of this stage (source flags for the head stage).
protected final int sourceOrOpFlags;
// The next (downstream) pipeline stage.
@SuppressWarnings("rawtypes")
private AbstractPipeline nextStage;
// Depth of this stage in the chain.
// NOTE(review): the original author suspects the value is handled differently when a stage is stateful or the pipeline is parallel — not shown in this excerpt.
private int depth;
// Combined source and operation flags.
private int combinedFlags;
// Source spliterator; only meaningful on the head pipeline.
// Per the original note it is set to null once the stream has been consumed (not shown in this excerpt).
private Spliterator<?> sourceSpliterator;
// Source supplier; only meaningful on the head pipeline.
// Per the original note it is set to null once the stream has been consumed (not shown in this excerpt).
private Supplier<? extends Spliterator<?>> sourceSupplier;
// True if this pipeline has been linked or consumed.
private boolean linkedOrConsumed;
// Kept on the source stage: marks whether the pipeline contains a stateful operation.
private boolean sourceAnyStateful;
private Runnable sourceCloseAction;
// Whether this is a parallel stream.
private boolean parallel;
/**
 * Constructor for the head (source) stage of a pipeline.
 *
 * @param source      the spliterator that supplies the elements
 * @param sourceFlags the source flags; masked with {@code StreamOpFlag.STREAM_MASK} before being stored
 * @param parallel    stored in the {@code parallel} field
 */
AbstractPipeline(Spliterator<?> source,
int sourceFlags, boolean parallel) {
this.previousStage = null;// the first stage has no upstream stage
this.sourceSpliterator = source;// the source Spliterator
this.sourceStage = this;
// In the article's example sourceFlags is 80.
this.sourceOrOpFlags = sourceFlags & StreamOpFlag.STREAM_MASK;
// The following is an optimization of:
// StreamOpFlag.combineOpFlags(sourceOrOpFlags, StreamOpFlag.INITIAL_OPS_VALUE);
this.combinedFlags = (~(sourceOrOpFlags << 1)) & StreamOpFlag.INITIAL_OPS_VALUE;
this.depth = 0;
this.parallel = parallel;
}
}
AbstractPipeline是所有Stream實現的父類,請參考第一張uml類圖
這里初始化可以看出,stream會組合出一條Stream pipeline linked list
Head Stream
/**
 * Head (source) stage of a reference pipeline; this excerpt shows only a static factory.
 * NOTE(review): Collection.stream() invokes this factory as StreamSupport.stream(...), so the
 * method appears to belong to StreamSupport rather than to Head — confirm against the JDK source.
 */
static class Head<E_IN, E_OUT> extends ReferencePipeline<E_IN, E_OUT> {
/**
 * Creates a new head stage over the given spliterator.
 *
 * @param spliterator the element source; must not be null
 * @param parallel    passed through to the {@code ReferencePipeline.Head} constructor
 * @return a new {@code ReferencePipeline.Head} whose flags come from
 *         {@code StreamOpFlag.fromCharacteristics(spliterator)}
 * @throws NullPointerException if {@code spliterator} is null
 */
public static <T> Stream<T> stream(Spliterator<T> spliterator, boolean parallel) {
Objects.requireNonNull(spliterator);
return new ReferencePipeline.Head<>(spliterator,
StreamOpFlag.fromCharacteristics(spliterator),
parallel);
}
}
簡單的創建一個
AbstractPipeline
:previousStage為null
sourceSpliterator為
ArrayListSpliterator
sourceStage:為當前pipeline
depth:為0
filter()
/**
 * Appends a filtering stage to this pipeline.
 *
 * @param predicate the test applied to each element; must not be null
 * @return a new {@code StatelessOp} stage whose upstream is this stage
 * @throws NullPointerException if {@code predicate} is null
 */
public final Stream<P_OUT> filter(Predicate<? super P_OUT> predicate) {
Objects.requireNonNull(predicate);
// filter is a stateless operation
return new StatelessOp<P_OUT, P_OUT>(this, StreamShape.REFERENCE,
StreamOpFlag.NOT_SIZED) {
// Called at the end from the TerminalOp (via wrapSink) to build the Sink chain that collects the result.
@Override
Sink<P_OUT> opWrapSink(int flags, Sink<P_OUT> sink) {
return new Sink.ChainedReference<P_OUT, P_OUT>(sink) {
// Forwards -1 instead of the incoming size; presumably because filtering makes the
// element count unknown (the stage is flagged NOT_SIZED) — confirm.
@Override
public void begin(long size) {
downstream.begin(-1);
}
// Only elements for which the predicate returns true are passed to the downstream sink.
@Override
public void accept(P_OUT u) {
if (predicate.test(u))
downstream.accept(u);
}
};
}
};
}
// Initialization of StatelessOp
/**
 * Excerpt of {@code ReferencePipeline} showing the constructor for an appended stage
 * and the nested {@code StatelessOp} base class.
 */
abstract class ReferencePipeline<P_IN, P_OUT> extends AbstractPipeline<P_IN, P_OUT, Stream<P_OUT>>
implements Stream<P_OUT> {
/**
 * Constructor for a stage appended to {@code upstream}; delegates to the
 * {@code AbstractPipeline(upstream, opFlags)} constructor.
 */
ReferencePipeline(AbstractPipeline<?, P_IN, ?> upstream, int opFlags) {
super(upstream, opFlags);
}
/**
 * Base class for stages whose {@code opIsStateful()} returns {@code false}.
 */
abstract static class StatelessOp<E_IN, E_OUT> extends ReferencePipeline<E_IN, E_OUT> {
/**
 * @param upstream   the stage this one is appended to
 * @param inputShape only used in the assertion against {@code upstream.getOutputShape()}
 * @param opFlags    the operation flags passed to the superclass constructor
 */
StatelessOp(AbstractPipeline<?, E_IN, ?> upstream,StreamShape inputShape,int opFlags) {
// Calls ReferencePipeline(upstream, opFlags)
super(upstream, opFlags);
assert upstream.getOutputShape() == inputShape;
}
@Override
final boolean opIsStateful() {
return false;
}
}
}
/**
 * Excerpt of {@code AbstractPipeline} showing the constructor that appends a new
 * stage to an existing one.
 */
abstract class AbstractPipeline<E_IN, E_OUT, S extends BaseStream<E_OUT, S>>
extends PipelineHelper<E_OUT> implements BaseStream<E_OUT, S> {
/**
 * Appends this stage after {@code previousStage}.
 *
 * @param previousStage the upstream stage; must not already be linked or consumed
 * @param opFlags       the operation flags of this stage
 * @throws IllegalStateException if {@code previousStage} is already linked or consumed
 */
AbstractPipeline(AbstractPipeline<?, E_IN, ?> previousStage, int opFlags) {
if (previousStage.linkedOrConsumed)
throw new IllegalStateException(MSG_STREAM_LINKED);
// The following lines link this stage to the upstream stage (both directions).
previousStage.linkedOrConsumed = true;
previousStage.nextStage = this;
this.previousStage = previousStage;
this.sourceOrOpFlags = opFlags & StreamOpFlag.OP_MASK;
this.combinedFlags = StreamOpFlag.combineOpFlags(opFlags, previousStage.combinedFlags);
// Every stage shares the same source (head) stage.
this.sourceStage = previousStage.sourceStage;
// A stateful stage records that fact on the source stage.
if (opIsStateful())
sourceStage.sourceAnyStateful = true;
this.depth = previousStage.depth + 1;
}
}
初始化
StatelessOp
,AbstractPipeline的子類.將Head Stream的
nextStage
指定為當前StatelessOp(filter類型),並將當前StatelessOp的previousStage指定為前面的Head stream,
同時sourceStage保存Head stream
map()
/**
 * Appends a mapping stage to this pipeline.
 *
 * @param mapper the function applied to each element; must not be null
 * @return a new {@code StatelessOp} stage whose upstream is this stage
 * @throws NullPointerException if {@code mapper} is null
 */
public final <R> Stream<R> map(Function<? super P_OUT, ? extends R> mapper) {
Objects.requireNonNull(mapper);
// Creates a subclass of AbstractPipeline (a StatelessOp stage).
return new StatelessOp<P_OUT, R>(this, StreamShape.REFERENCE,
StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT) {
// Called at the end from the TerminalOp (via wrapSink) to build the Sink chain that collects the result.
@Override
Sink<P_OUT> opWrapSink(int flags, Sink<R> sink) {
return new Sink.ChainedReference<P_OUT, R>(sink) {
@Override
public void accept(P_OUT u) {
// Hand the mapper's result to the downstream sink.
downstream.accept(mapper.apply(u));
}
};
}
};
}
forEach()
//java.util.stream.ReferencePipeline#forEach
/**
 * Terminal operation: builds a for-each terminal op around {@code action}
 * (with {@code ordered} set to {@code false}) and evaluates this pipeline with it.
 *
 * @param action the consumer invoked for each element
 */
public void forEach(Consumer<? super P_OUT> action) {
    final TerminalOp<P_OUT, Void> forEachOp = ForEachOps.makeRef(action, false);
    evaluate(forEachOp);
}
//java.util.stream.ForEachOps#makeRef
/**
 * Creates the for-each terminal operation for reference streams.
 *
 * @param action  the consumer invoked for each element; must not be null
 * @param ordered passed through to the {@code ForEachOp.OfRef} constructor
 * @return a new {@code ForEachOp.OfRef} wrapping {@code action}
 * @throws NullPointerException if {@code action} is null
 */
public static <T> TerminalOp<T, Void> makeRef(Consumer<? super T> action,
                                              boolean ordered) {
    final Consumer<? super T> checkedAction = Objects.requireNonNull(action);
    return new ForEachOp.OfRef<T>(checkedAction, ordered);
}
//java.util.stream.ForEachOps.ForEachOp.OfRef
/**
 * For-each operation for reference streams: every accepted element is forwarded
 * to the wrapped consumer.
 */
static final class OfRef<T> extends ForEachOp<T> {
// The user-supplied action invoked for each element.
final Consumer<? super T> consumer;
/**
 * @param consumer the action to invoke for each element
 * @param ordered  passed to the {@code ForEachOp} superclass constructor
 */
OfRef(Consumer<? super T> consumer, boolean ordered) {
super(ordered);
this.consumer = consumer;
}
/** Delegates the element to the wrapped consumer. */
@Override
public void accept(T t) {
consumer.accept(t);
}
}
/**
 * Execution entry point,
 * java.util.stream.AbstractPipeline#evaluate(java.util.stream.TerminalOp<E_OUT,R>).
 * Marks this pipeline as consumed and runs the terminal operation, in parallel or
 * sequentially depending on {@code isParallel()}.
 *
 * @param terminalOp the terminal operation to apply to this pipeline
 * @return the result produced by the terminal operation
 * @throws IllegalStateException if this pipeline has already been linked or consumed
 */
final <R> R evaluate(TerminalOp<E_OUT, R> terminalOp) {
    assert getOutputShape() == terminalOp.inputShape();
    if (linkedOrConsumed) {
        throw new IllegalStateException(MSG_STREAM_LINKED);
    }
    linkedOrConsumed = true;

    // The source spliterator is obtained only after the mode has been decided, inside each branch.
    if (isParallel()) {
        return terminalOp.evaluateParallel(this, sourceSpliterator(terminalOp.getOpFlags()));
    }
    return terminalOp.evaluateSequential(this, sourceSpliterator(terminalOp.getOpFlags()));
}
//java.util.stream.ForEachOps.ForEachOp#evaluateSequential
/**
 * Excerpt of the for-each terminal operation; the class is declared as both a
 * {@code TerminalOp} and a {@code TerminalSink}.
 */
abstract static class ForEachOp<T> implements TerminalOp<T, Void>, TerminalSink<T, Void> {
/**
 * Sequential evaluation: passes this object as the sink to
 * {@code helper.wrapAndCopyInto} together with the spliterator, then returns
 * the result of calling {@code get()} on the returned sink.
 */
@Override
public <S> Void evaluateSequential(PipelineHelper<T> helper,
Spliterator<S> spliterator) {
return helper.wrapAndCopyInto(this, spliterator).get();
}
}
//java.util.stream.AbstractPipeline#wrapAndCopyInto
/**
 * Wraps {@code sink} with the sinks of this pipeline's stages, pushes the
 * elements of {@code spliterator} through the wrapped sink, and returns the
 * original (unwrapped) sink.
 *
 * @param sink        the terminal sink; must not be null
 * @param spliterator the source of elements
 * @return the same {@code sink} instance that was passed in
 * @throws NullPointerException if {@code sink} is null
 */
final <P_IN, S extends Sink<E_OUT>> S wrapAndCopyInto(S sink, Spliterator<P_IN> spliterator) {
    final Sink<P_IN> wrapped = wrapSink(Objects.requireNonNull(sink));
    copyInto(wrapped, spliterator);
    return sink;
}
//java.util.stream.AbstractPipeline#wrapSink
/**
 * Builds the sink chain for this pipeline. Starting at this stage and walking
 * upstream while the stage depth is greater than 0, each stage wraps the sink
 * built so far via its own {@code opWrapSink}.
 *
 * @param sink the terminal sink that ends the chain; must not be null
 * @return the outermost sink, which receives the source elements
 * @throws NullPointerException if {@code sink} is null
 */
final <P_IN> Sink<P_IN> wrapSink(Sink<E_OUT> sink) {
    Objects.requireNonNull(sink);

    @SuppressWarnings("rawtypes")
    AbstractPipeline stage = AbstractPipeline.this;
    while (stage.depth > 0) {
        // Each AbstractPipeline subclass implements opWrapSink differently (see filter and map).
        sink = stage.opWrapSink(stage.previousStage.combinedFlags, sink);
        stage = stage.previousStage;
    }
    return (Sink<P_IN>) sink;
}
//java.util.stream.AbstractPipeline#copyInto
/**
 * Pushes the elements of {@code spliterator} into {@code wrappedSink}.
 * When the SHORT_CIRCUIT flag is known for this pipeline the work is delegated
 * to {@code copyIntoWithCancel}; otherwise the sink is begun, every remaining
 * element is fed to it, and the sink is ended.
 *
 * @param wrappedSink the sink chain that receives the elements; must not be null
 * @param spliterator the source of elements
 * @throws NullPointerException if {@code wrappedSink} is null
 */
final <P_IN> void copyInto(Sink<P_IN> wrappedSink, Spliterator<P_IN> spliterator) {
    Objects.requireNonNull(wrappedSink);

    final boolean shortCircuit = StreamOpFlag.SHORT_CIRCUIT.isKnown(getStreamAndOpFlags());
    if (shortCircuit) {
        copyIntoWithCancel(wrappedSink, spliterator);
        return;
    }

    wrappedSink.begin(spliterator.getExactSizeIfKnown());
    // Start traversing the source; the sink is passed as the consumer for each element.
    spliterator.forEachRemaining(wrappedSink);
    wrappedSink.end();
}
wrapSink()
方法最后會將pipeline中所有操作組合為一個Sink,並且不包括head stream,因為head stream 的depth為0。
spliterator.forEachRemaining()
遍歷集合,接受consumer接口,也就是調用consumer.accept()方法。ForEachOp沒有繼承AbstractPipeline,所以不在pipeline chain中,而是直接實現了Sink接口以及TerminalOp接口,所以在
wrapSink
時,ForEachOp本身作為Sink鏈的第一個節點(但最后被調用)。創建Sink單向鏈才是目的,AbstractPipeline只是保存了一個雙向鏈條,並根據不同的操作類型封裝不同的accept方法。
Sink
ChainedReference
/**
 * Base class for a sink that holds a reference to a downstream sink.
 * {@code begin}, {@code end} and {@code cancellationRequested} are forwarded
 * to the downstream sink; {@code accept} is not defined in this class.
 */
abstract static class ChainedReference<T, E_OUT> implements Sink<T> {
// The sink that this sink forwards to.
protected final Sink<? super E_OUT> downstream;
// On construction the previously built sink must be stored as downstream;
// this is what builds the singly linked chain of sinks.
/**
 * @param downstream the sink to forward to; must not be null
 * @throws NullPointerException if {@code downstream} is null
 */
public ChainedReference(Sink<? super E_OUT> downstream) {
this.downstream = Objects.requireNonNull(downstream);
}
/** Forwards the size unchanged to the downstream sink. */
@Override
public void begin(long size) {
downstream.begin(size);
}
/** Forwards the end signal to the downstream sink. */
@Override
public void end() {
downstream.end();
}
/** Returns whatever the downstream sink reports. */
@Override
public boolean cancellationRequested() {
return downstream.cancellationRequested();
}
}
accept()方法由子類定義,如filter()、map()中創建的匿名ChainedReference子類;而Sink鏈末端的accept()則由
java.util.stream.ForEachOps.ForEachOp.OfRef 實現