戳更多文章:
DataStream算子將一個或多個DataStream轉換為新DataStream。程序可以將多個轉換組合成復雜的數據流拓撲。
DataStreamAPI和DataSetAPI主要的區別在於Transformation部分。
DataStream Transformation
map
- DataStream→DataStream
用一個數據元生成一個數據元。一個map函數,它將輸入流的值加倍:
DataStream<Integer> dataStream = //... dataStream.map(new MapFunction<Integer, Integer>() { @Override public Integer map(Integer value) throws Exception { return 2 * value; } });
公眾號
- 全網唯一一個從0開始幫助Java開發者轉做大數據領域的公眾號~
- 大數據技術與架構或者搜索import_bigdata關注~
- 海量【java和大數據的面試題+視頻資料】整理在公眾號,關注后可以下載~

FlatMap
- DataStream→DataStream
采用一個數據元並生成零個,一個或多個數據元。將句子分割為單詞的flatmap函數:
dataStream.flatMap(new FlatMapFunction<String, String>() { @Override public void flatMap(String value, Collector<String> out) throws Exception { for(String word: value.split(" ")){ out.collect(word); } } });
Filter
- DataStream→DataStream
計算每個數據元的布爾函數,並保存函數返回true的數據元。過濾掉零值的過濾器:
dataStream.filter(new FilterFunction<Integer>() { @Override public boolean filter(Integer value) throws Exception { return value != 0; } });
KeyBy
- DataStream→KeyedStream
邏輯上將流分區為不相交的分區。具有相同Keys的所有記錄都分配給同一分區。在內部,keyBy()是使用散列分區實現的。指定鍵有不同的方法。
此轉換返回KeyedStream,其中包括使用被Keys化狀態所需的KeyedStream。
dataStream.keyBy("someKey") // Key by field "someKey" dataStream.keyBy(0) // Key by the first element of a Tuple
🌺注意:
如果出現以下情況,則類型不能成為key:
-
它是POJO類型但不覆蓋hashCode()方法並依賴於Object.hashCode()實現
-
任何類型的數組
Reduce
KeyedStream→DataStream
將當前數據元與最后一個Reduce的值組合並發出新值。
例如:reduce函數,用於創建部分和的流:
keyedStream.reduce(new ReduceFunction<Integer>() { @Override public Integer reduce(Integer value1, Integer value2) throws Exception { return value1 + value2; } });
Fold
KeyedStream→DataStream
具有初始值的被Keys化數據流上的“滾動”折疊。將當前數據元與最后折疊的值組合並發出新值。
折疊函數,當應用於序列(1,2,3,4,5)時,發出序列“start-1”,“start-1-2”,“start-1-2-3”,. ..
DataStream<String> result = keyedStream.fold("start", new FoldFunction<Integer, String>() { @Override public String fold(String current, Integer value) { return current + "-" + value; } });
聚合
- KeyedStream→DataStream
在被Keys化數據流上滾動聚合。min和minBy之間的差異是min返回最小值,而minBy返回該字段中具有最小值的數據元(max和maxBy相同)。
keyedStream.sum(0); keyedStream.sum("key"); keyedStream.min(0); keyedStream.min("key"); keyedStream.max(0); keyedStream.max("key"); keyedStream.minBy(0); keyedStream.minBy("key"); keyedStream.maxBy(0); keyedStream.maxBy("key");
Window函數
關於Flink的窗口概念,我們會在后面有詳細介紹。
- Window
KeyedStream→WindowedStream
可以在已經分區的KeyedStream上定義Windows。Windows根據某些特征(例如,在最后5秒內到達的數據)對每個Keys中的數據進行分組。
dataStream.keyBy(0) .window(TumblingEventTimeWindows .of(Time.seconds(5))); // Last 5 seconds of data
- Window Apply
WindowedStream→DataStream
AllWindowedStream→DataStream
將一般函數應用於整個窗口。下面是一個手動求和窗口數據元的函數。
注意:如果您正在使用windowAll轉換,則需要使用AllWindowFunction。
windowedStream.apply (new WindowFunction<Tuple2<String,Integer>, Integer, Tuple, Window>() { public void apply (Tuple tuple, Window window, Iterable<Tuple2<String, Integer>> values, Collector<Integer> out) throws Exception { int sum = 0; for (value t: values) { sum += t.f1; } out.collect (new Integer(sum)); } }); // applying an AllWindowFunction on non-keyed window stream allWindowedStream.apply (new AllWindowFunction<Tuple2<String,Integer>, Integer, Window>() { public void apply (Window window, Iterable<Tuple2<String, Integer>> values, Collector<Integer> out) throws Exception { int sum = 0; for (value t: values) { sum += t.f1; } out.collect (new Integer(sum)); } });
- Window Reduce
WindowedStream→DataStream
將reduce函數應用於窗口並返回reduce后的值。
windowedStream.reduce (new ReduceFunction<Tuple2<String,Integer>>() { public Tuple2<String, Integer> reduce(Tuple2<String, Integer> value1, Tuple2<String, Integer> value2) throws Exception { return new Tuple2<String,Integer>(value1.f0, value1.f1 + value2.f1); } });
- 提取時間戳
關於Time我們在后面有專門的章節進行介紹
DataStream→DataStream
從記錄中提取時間戳,以便使用使用事件時間語義的窗口。
stream.assignTimestamps (new TimeStampExtractor() {...});
Partition 分區
- 自定義分區
DataStream→DataStream
使用用戶定義的分區程序為每個數據元選擇目標任務。
dataStream.partitionCustom(partitioner, "someKey"); dataStream.partitionCustom(partitioner, 0);
- 隨機分區
DataStream→DataStream
根據均勻分布隨機分配數據元。
dataStream.shuffle();
- Rebalance (循環分區)
DataStream→DataStream
分區數據元循環,每個分區創建相等的負載。在存在數據傾斜時用於性能優化。
dataStream.rebalance();
- rescale
DataStream→DataStream
如果上游 算子操作具有並行性2並且下游算子操作具有並行性6,則一個上游 算子操作將分配元件到三個下游算子操作,而另一個上游算子操作將分配到其他三個下游 算子操作。另一方面,如果下游算子操作具有並行性2而上游 算子操作具有並行性6,則三個上游 算子操作將分配到一個下游算子操作,而其他三個上游算子操作將分配到另一個下游算子操作。
在不同並行度不是彼此的倍數的情況下,一個或多個下游 算子操作將具有來自上游 算子操作的不同數量的輸入。
請參閱此圖以獲取上例中連接模式的可視化:

dataStream.rescale();
- 廣播
DataStream→DataStream
向每個分區廣播數據元。
dataStream.broadcast();
- 全網唯一一個從0開始幫助Java開發者轉做大數據領域的公眾號~
- 大數據技術與架構或者搜索import_bigdata關注~
- 海量【java和大數據的面試題+視頻資料】整理在公眾號,關注后可以下載~
