Flink從入門到放棄(入門篇4) DataStreamAPI


戳更多文章:

1-Flink入門

2-本地環境搭建&構建第一個Flink應用

3-DataSet API

4-DataSteam API

5-集群部署

6-分布式緩存

7-重啟策略

8-Flink中的窗口

9-Flink中的Time

Flink時間戳和水印

Broadcast廣播變量

FlinkTable&SQL

Flink實戰項目實時熱銷排行

Flink寫入RedisSink

17-Flink消費Kafka寫入Mysql

DataStream算子將一個或多個DataStream轉換為新DataStream。程序可以將多個轉換組合成復雜的數據流拓撲。
DataStreamAPI和DataSetAPI主要的區別在於Transformation部分。

DataStream Transformation

map

  • DataStream→DataStream
    用一個數據元生成一個數據元。一個map函數,它將輸入流的值加倍:
DataStream<Integer> dataStream = //... dataStream.map(new MapFunction<Integer, Integer>() { @Override public Integer map(Integer value) throws Exception { return 2 * value; } }); 

公眾號

  • 全網唯一一個從0開始幫助Java開發者轉做大數據領域的公眾號~
  • 大數據技術與架構或者搜索import_bigdata關注~
  • 海量【java和大數據的面試題+視頻資料】整理在公眾號,關注后可以下載~
 
image

FlatMap

  • DataStream→DataStream

采用一個數據元並生成零個,一個或多個數據元。將句子分割為單詞的flatmap函數:

dataStream.flatMap(new FlatMapFunction<String, String>() { @Override public void flatMap(String value, Collector<String> out) throws Exception { for(String word: value.split(" ")){ out.collect(word); } } }); 

Filter

  • DataStream→DataStream
    計算每個數據元的布爾函數,並保存函數返回true的數據元。過濾掉零值的過濾器:
dataStream.filter(new FilterFunction<Integer>() { @Override public boolean filter(Integer value) throws Exception { return value != 0; } }); 

KeyBy

  • DataStream→KeyedStream

邏輯上將流分區為不相交的分區。具有相同Keys的所有記錄都分配給同一分區。在內部,keyBy()是使用散列分區實現的。指定鍵有不同的方法。

此轉換返回KeyedStream,其中包括使用被Keys化狀態所需的KeyedStream。

dataStream.keyBy("someKey") // Key by field "someKey" dataStream.keyBy(0) // Key by the first element of a Tuple 

🌺注意:

如果出現以下情況,則類型不能成為key:

  • 它是POJO類型但不覆蓋hashCode()方法並依賴於Object.hashCode()實現

  • 任何類型的數組

Reduce

KeyedStream→DataStream

將當前數據元與最后一個Reduce的值組合並發出新值。
例如:reduce函數,用於創建部分和的流:

keyedStream.reduce(new ReduceFunction<Integer>() { @Override public Integer reduce(Integer value1, Integer value2) throws Exception { return value1 + value2; } }); 

Fold

KeyedStream→DataStream

具有初始值的被Keys化數據流上的“滾動”折疊。將當前數據元與最后折疊的值組合並發出新值。

折疊函數,當應用於序列(1,2,3,4,5)時,發出序列“start-1”,“start-1-2”,“start-1-2-3”,. ..

DataStream<String> result = keyedStream.fold("start", new FoldFunction<Integer, String>() { @Override public String fold(String current, Integer value) { return current + "-" + value; } }); 

聚合

  • KeyedStream→DataStream

在被Keys化數據流上滾動聚合。min和minBy之間的差異是min返回最小值,而minBy返回該字段中具有最小值的數據元(max和maxBy相同)。

keyedStream.sum(0); keyedStream.sum("key"); keyedStream.min(0); keyedStream.min("key"); keyedStream.max(0); keyedStream.max("key"); keyedStream.minBy(0); keyedStream.minBy("key"); keyedStream.maxBy(0); keyedStream.maxBy("key"); 

Window函數

關於Flink的窗口概念,我們會在后面有詳細介紹。

  • Window
    KeyedStream→WindowedStream

可以在已經分區的KeyedStream上定義Windows。Windows根據某些特征(例如,在最后5秒內到達的數據)對每個Keys中的數據進行分組。

dataStream.keyBy(0) .window(TumblingEventTimeWindows .of(Time.seconds(5))); // Last 5 seconds of data 
  • Window Apply
    WindowedStream→DataStream
    AllWindowedStream→DataStream

將一般函數應用於整個窗口。下面是一個手動求和窗口數據元的函數。

注意:如果您正在使用windowAll轉換,則需要使用AllWindowFunction。

windowedStream.apply (new WindowFunction<Tuple2<String,Integer>, Integer, Tuple, Window>() { public void apply (Tuple tuple, Window window, Iterable<Tuple2<String, Integer>> values, Collector<Integer> out) throws Exception { int sum = 0; for (value t: values) { sum += t.f1; } out.collect (new Integer(sum)); } }); // applying an AllWindowFunction on non-keyed window stream allWindowedStream.apply (new AllWindowFunction<Tuple2<String,Integer>, Integer, Window>() { public void apply (Window window, Iterable<Tuple2<String, Integer>> values, Collector<Integer> out) throws Exception { int sum = 0; for (value t: values) { sum += t.f1; } out.collect (new Integer(sum)); } }); 
  • Window Reduce
    WindowedStream→DataStream

將reduce函數應用於窗口並返回reduce后的值。

windowedStream.reduce (new ReduceFunction<Tuple2<String,Integer>>() { public Tuple2<String, Integer> reduce(Tuple2<String, Integer> value1, Tuple2<String, Integer> value2) throws Exception { return new Tuple2<String,Integer>(value1.f0, value1.f1 + value2.f1); } }); 
  • 提取時間戳

關於Time我們在后面有專門的章節進行介紹

DataStream→DataStream

從記錄中提取時間戳,以便使用使用事件時間語義的窗口。

stream.assignTimestamps (new TimeStampExtractor() {...}); 

Partition 分區

  • 自定義分區
    DataStream→DataStream
    使用用戶定義的分區程序為每個數據元選擇目標任務。
dataStream.partitionCustom(partitioner, "someKey"); dataStream.partitionCustom(partitioner, 0); 
  • 隨機分區
    DataStream→DataStream
    根據均勻分布隨機分配數據元。
dataStream.shuffle(); 
  • Rebalance (循環分區)
    DataStream→DataStream
    分區數據元循環,每個分區創建相等的負載。在存在數據傾斜時用於性能優化。
dataStream.rebalance(); 
  • rescale
    DataStream→DataStream

如果上游 算子操作具有並行性2並且下游算子操作具有並行性6,則一個上游 算子操作將分配元件到三個下游算子操作,而另一個上游算子操作將分配到其他三個下游 算子操作。另一方面,如果下游算子操作具有並行性2而上游 算子操作具有並行性6,則三個上游 算子操作將分配到一個下游算子操作,而其他三個上游算子操作將分配到另一個下游算子操作。

在不同並行度不是彼此的倍數的情況下,一個或多個下游 算子操作將具有來自上游 算子操作的不同數量的輸入。

請參閱此圖以獲取上例中連接模式的可視化:

 
image
dataStream.rescale(); 
  • 廣播
    DataStream→DataStream
    向每個分區廣播數據元。
dataStream.broadcast(); 
  • 全網唯一一個從0開始幫助Java開發者轉做大數據領域的公眾號~
  • 大數據技術與架構或者搜索import_bigdata關注~
  • 海量【java和大數據的面試題+視頻資料】整理在公眾號,關注后可以下載~
 
image


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM