Stream API
- Java 8集合中的Stream相當於高級版的Iterator
- Stream API通過Lambda表達式對集合進行各種非常便利高效的聚合操作,或者大批量數據操作
- Stream的聚合操作與數據庫SQL的聚合操作sorted、filter、map等非常類似
- 在數據操作方面,Stream不僅可以通過串行的方式實現數據操作,還可以通過並行的方式處理大批量數據,提高處理效率
// java.util.Collection
default Stream<E> stream() {
return StreamSupport.stream(spliterator(), false);
}
default Stream<E> parallelStream() {
return StreamSupport.stream(spliterator(), true);
}
@Data
class Student {
private Integer height;
private String sex;
}
Map<String, List<Student>> map = Maps.newHashMap();
List<Student> list = Lists.newArrayList();
// 傳統的迭代方式
for (Student student : list) {
if (student.getHeight() > 160) {
String sex = student.getSex();
if (!map.containsKey(sex)) {
map.put(sex, Lists.newArrayList());
}
map.get(sex).add(student);
}
}
// Stream API,串行實現
map = list.stream().filter((Student s) -> s.getHeight() > 160).collect(Collectors.groupingBy(Student::getSex));
// Stream API,並行實現
map = list.parallelStream().filter((Student s) -> s.getHeight() > 160).collect(Collectors.groupingBy(Student::getSex));
優化遍歷
Stream操作分類
- 分為兩大類:中間操作(Intermediate operations)和終結操作(Terminal operations)
- 中間操作只對操作進行了記錄,即只會返回一個流,不會進行計算操作,而終結操作是實現了計算操作
- 中間操作又分為無狀態(Stateless)操作和有狀態(Stateful)操作
- 無狀態操作:元素的處理不受之前元素的影響
- 有狀態操作:該操作只有拿到所有元素之后才能繼續下去
- 終結操作又分為短路(Short-circuiting)操作與非短路(UnShort-circuiting)操作
- 短路操作:遇到某些符合條件的元素就可以得到最終結果
- 非短路操作:必須處理完所有元素才能得到最終結果
- 通常會將中間操作稱為懶操作,正是因為懶操作結合終結操作,數據源構成的處理管道(Pipeline),實現了Stream的高效

Stream源碼實現

- BaseStream和Stream為最頂端的接口類
- BaseStream定義了流的基本接口方法,如spliterator、isParallel等
- Stream定義了流的常用操作方法,如map、filter等
- ReferencePipeline是一個結構類,通過定義內部類組裝各種操作流
- 內部定義了Head、StatelessOp和StatefulOp三個內部類,實現了BaseStream和Stream的接口方法
- Sink接口定義每個Stream操作之間關系的協議,包含了begin、end、cancellationRequested、accept方法
- ReferencePipeline最終會將整個Stream流操作組裝成一個調用鏈
- 而調用鏈上的每個Stream操作的上下文關系就是通過Sink接口來定義實現的
Stream操作疊加

- 一個Stream的各個操作是由處理管道組裝的,並統一完成數據處理
- 在JDK中,每次的中斷操作都會以使用階段(Stage)命名
- 管道結構通常是由ReferencePipeline類實現的,ReferencePipeline包含Head、StatelessOp、StatefulOp三個內部類
- Head類主要用來定義數據源操作,初次調用.stream()時,會初次加載Head對象
- 接着加載中間操作,分為StatelessOp對象和StatefulOp對象
- 此時的Stage並沒有執行,而是通過AbstractPipeline生成了中間操作的Stage鏈表
- 當調用終結操作時,會生成一個最終的Stage
- 通過這個Stage觸發之前的中間操作,從最后一個Stage開始,遞歸產生一個Sink鏈
樣例
List<String> names = Arrays.asList("張三", "李四", "王老五", "李三", "劉老四", "王小二", "張四", "張五六七");
String maxLenStartWithZ = names.stream()
.filter(name -> name.startsWith("張"))
.mapToInt(String::length)
.max()
.toString();
names是ArrayList集合,names.stream會調用集合類基礎接口Collection的stream方法
default Stream<E> stream() {
return StreamSupport.stream(spliterator(), false);
}
Collection.stream方法會調用StreamSupport.stream方法,方法中初始化了一個ReferencePipeline的Head內部類對象
public static <T> Stream<T> stream(Spliterator<T> spliterator, boolean parallel) {
Objects.requireNonNull(spliterator);
return new ReferencePipeline.Head<>(spliterator,
StreamOpFlag.fromCharacteristics(spliterator),
parallel);
}
調用filter和map,兩者都是無狀態的中間操作,因此並沒有執行任何操作,只是分別創建了一個Stage來標識用戶的每一次操作
通常情況下,Stream的操作需要一個回調函數,所以一個完整的Stage是由數據來源、操作、回調函數組成的三元組表示
@Override
public final Stream<P_OUT> filter(Predicate<? super P_OUT> predicate) {
Objects.requireNonNull(predicate);
return new StatelessOp<P_OUT, P_OUT>(this, StreamShape.REFERENCE,
StreamOpFlag.NOT_SIZED) {
@Override
Sink<P_OUT> opWrapSink(int flags, Sink<P_OUT> sink) {
return new Sink.ChainedReference<P_OUT, P_OUT>(sink) {
@Override
public void begin(long size) {
downstream.begin(-1);
}
@Override
public void accept(P_OUT u) {
if (predicate.test(u))
downstream.accept(u);
}
};
}
};
}
@Override
@SuppressWarnings("unchecked")
public final <R> Stream<R> map(Function<? super P_OUT, ? extends R> mapper) {
Objects.requireNonNull(mapper);
return new StatelessOp<P_OUT, R>(this, StreamShape.REFERENCE,
StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT) {
@Override
Sink<P_OUT> opWrapSink(int flags, Sink<R> sink) {
return new Sink.ChainedReference<P_OUT, R>(sink) {
@Override
public void accept(P_OUT u) {
downstream.accept(mapper.apply(u));
}
};
}
};
}
new StatelessOp會調用父類AbstractPipeline的構造函數,該構造函數會將前后的Stage聯系起來,生成一個Stage鏈表
AbstractPipeline(AbstractPipeline<?, E_IN, ?> previousStage, int opFlags) {
if (previousStage.linkedOrConsumed)
throw new IllegalStateException(MSG_STREAM_LINKED);
previousStage.linkedOrConsumed = true;
previousStage.nextStage = this; // 將當前的Stage的next指針指向之前的Stage
this.previousStage = previousStage; // 賦值當前Stage當全局變量previousStage
this.sourceOrOpFlags = opFlags & StreamOpFlag.OP_MASK;
this.combinedFlags = StreamOpFlag.combineOpFlags(opFlags, previousStage.combinedFlags);
this.sourceStage = previousStage.sourceStage;
if (opIsStateful())
sourceStage.sourceAnyStateful = true;
this.depth = previousStage.depth + 1;
}
創建Stage時,會包含opWrapSink方法,該方法把一個操作的具體實現封裝在Sink類中,Sink采用處理->轉發的模式來疊加操作
調用max,會調用ReferencePipeline的max方法
由於max是終結操作,會創建一個TerminalOp操作,同時創建一個ReducingSink,並且將操作封裝在Sink類中
@Override
public final Optional<P_OUT> max(Comparator<? super P_OUT> comparator) {
return reduce(BinaryOperator.maxBy(comparator));
}
最后調用AbstractPipeline的wrapSink方法,生成一個Sink鏈表,Sink鏈表中的每一個Sink都封裝了一個操作的具體實現
final <P_IN> Sink<P_IN> wrapSink(Sink<E_OUT> sink) {
Objects.requireNonNull(sink);
for ( @SuppressWarnings("rawtypes") AbstractPipeline p=AbstractPipeline.this; p.depth > 0; p=p.previousStage) {
sink = p.opWrapSink(p.previousStage.combinedFlags, sink);
}
return (Sink<P_IN>) sink;
}
當Sink鏈表生成完成后,Stream開始執行,通過Spliterator迭代集合,執行Sink鏈表中的具體操作
@Override
final <P_IN> void copyInto(Sink<P_IN> wrappedSink, Spliterator<P_IN> spliterator) {
Objects.requireNonNull(wrappedSink);
if (!StreamOpFlag.SHORT_CIRCUIT.isKnown(getStreamAndOpFlags())) {
wrappedSink.begin(spliterator.getExactSizeIfKnown());
spliterator.forEachRemaining(wrappedSink);
wrappedSink.end();
}
else {
copyIntoWithCancel(wrappedSink, spliterator);
}
}
- java 8中的forEachRemaining會迭代集合
- 每迭代一次,都會執行一次filter操作,通過后就會觸發map操作,然后將結果放入到臨時數組object中,再進行下一次迭代
- 完成中間操作后,最后觸發終結操作max
Stream並行處理
List<String> names = Arrays.asList("張三", "李四", "王老五", "李三", "劉老四", "王小二", "張四", "張五六七");
String maxLenStartWithZ = names.stream()
.parallel()
.filter(name -> name.startsWith("張"))
.mapToInt(String::length)
.max()
.toString();
Stream的並行處理在執行終結操作之前,跟串行處理的實現是一樣的,在調用終結方法之后,會調用TerminalOp.evaluateParallel
final <R> R evaluate(TerminalOp<E_OUT, R> terminalOp) {
assert getOutputShape() == terminalOp.inputShape();
if (linkedOrConsumed)
throw new IllegalStateException(MSG_STREAM_LINKED);
linkedOrConsumed = true;
return isParallel()
? terminalOp.evaluateParallel(this, sourceSpliterator(terminalOp.getOpFlags()))
: terminalOp.evaluateSequential(this, sourceSpliterator(terminalOp.getOpFlags()));
}
- 並行處理指的是Stream結合了ForkJoin框架,對Stream處理進行了分片,Spliterator.estimateSize會估算出分片的數據量
- 通過預估的數據量獲取最小處理單元的閾值,如果當前分片大小大於最小處理單元的閾值,就繼續切分集合
- 每個分片都將會生成一個Sink鏈表,當所有分片操作完成后,ForkJoin框架將會合並分片任何結果集
合理使用Stream
- 在循環迭代次數較少的情況下,常規的迭代方式性能反而更好
- 在單核CPU服務器配置環境中,也是常規迭代方式更有優勢
- 在大數據循環迭代中,如果服務器是多核CPU的情況,采用Stream的並行迭代優勢明顯
小結
- Stream將整個操作分解成了鏈式結構,不僅簡化了遍歷操作,還為實現並行計算奠定了基礎
- Stream將遍歷元素的操作和對元素的計算分為中間操作和終結操作
- 中間操作又根據元素之間狀態有無干擾分為有狀態操作和無狀態操作,實現了鏈式結構中的不同階段
- 串行處理
- Stream在執行中間操作時,並不會做實際的數據操作處理,而是將這些中間操作串聯起來,最終由終結操作觸發
- 生成一個數據處理鏈表,通過Java 8的Spliterator迭代器進行數據處理
- 並行處理
- 對中間操作的處理跟串行處理的方式是一樣的,但在終結操作中,Stream將結合ForkJoin框架對集合進行切片處理
我是小架,我們下篇文章再見!