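Sometimes we need to build an execution graph that contains a cycle, for example to send data that still fails a condition after processing back to the start to be processed again.
Previously, I handled this by writing the records that still did not qualify to a separate Topic and consuming them again.
Today I came across Flink's Iterate operator and found that it meets this need as well.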
Official documentation: https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/stream/operators/
Creates a "feedback" loop in the flow, by redirecting the output of one operator to some previous operator. This is especially useful for defining algorithms that continuously update a model. The following code starts with a stream and applies the iteration body continuously. Elements that are greater than 0 are sent back to the feedback channel, and the rest of the elements are forwarded downstream.
Official demo
// Create the IterativeStream
IterativeStream<Long> iteration = initialStream.iterate();

// Iteration body
DataStream<Long> iterationBody = iteration.map(/*do something*/);

// Filter the elements that should be fed back
DataStream<Long> feedback = iterationBody.filter(new FilterFunction<Long>(){
    @Override
    public boolean filter(Long value) throws Exception {
        // Elements that satisfy this condition are fed back
        return value > 0;
    }
});

// Feed the feedback stream back into the iteration stream
iteration.closeWith(feedback);

// Output branch
DataStream<Long> output = iterationBody.filter(new FilterFunction<Long>(){
    @Override
    public boolean filter(Long value) throws Exception {
        // Elements that satisfy this condition are emitted
        return value <= 0;
    }
});
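To make the routing in the official demo concrete, here is a minimal plain-Java sketch that imitates the loop with an in-memory queue. It does not use Flink at all. The class name `FeedbackLoopSketch`, the `step` function (subtract 1) and the sample input are assumptions made only for illustration, since the demo leaves the iteration body as `/*do something*/`.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

public class FeedbackLoopSketch {

    // Hypothetical iteration body: subtract 1 from each element.
    static long step(long value) {
        return value - 1;
    }

    // Processes elements until nothing is left to feed back,
    // and returns the elements that were forwarded downstream.
    static List<Long> run(List<Long> initial) {
        // The queue plays the role of the iteration head:
        // it receives both the initial elements and the fed-back ones.
        Deque<Long> pending = new ArrayDeque<>(initial);
        List<Long> output = new ArrayList<>();
        while (!pending.isEmpty()) {
            long result = step(pending.poll());
            if (result > 0) {
                pending.add(result);   // feedback branch: value > 0
            } else {
                output.add(result);    // output branch: value <= 0
            }
        }
        return output;
    }

    public static void main(String[] args) {
        System.out.println(run(List.of(3L, 1L, 2L)));
    }
}
```

With the input 3, 1, 2, every element keeps going around the loop until it drops to 0, so three zeros reach the output. A real Flink job differs in one important way: the stream is unbounded, so the loop never "drains" the way this queue does.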
Scala Demo
Business scenario: a keyed window sum. If a window's result does not meet the condition, it goes back into the window and is summed again.
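// Imports assume the Flink 1.11 Scala DataStream API
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.function.ProcessWindowFunction
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector

object FeedbackStreamDemo {

  def main(args: Array[String]): Unit = {
    // environment
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    // SimpleStringSource is a custom source that is not shown here;
    // the map below expects it to emit "key,value" strings
    val source = env.addSource(new SimpleStringSource)
    val mapStream = source.map(str => {
      val arr = str.split(",")
      println("map : " + str)
      (arr(0), arr(1).toLong)
    })
      .disableChaining()

    val itStream = mapStream.iterate(ds => {
      // Iteration body: add 1 to each value, then sum per key in a 10-second window
      val dsMap = ds.map(str => {
        (str._1, str._2 + 1)
      })
        .keyBy(_._1)
        .window(TumblingProcessingTimeWindows.of(Time.seconds(10)))
        .process(new ProcessWindowFunction[(String, Long), (String, Long), String, TimeWindow] {
          override def process(key: String, context: Context, elements: Iterable[(String, Long)], out: Collector[(String, Long)]): Unit = {
            // Simple window sum
            val sum = elements.map(_._2).sum
            out.collect((key, sum))
          }
        })

      // Feedback branch: window results below 500 go back to mapStream and are summed again
      (dsMap.filter(s => {
        s._2 < 500
      }),
        // Output branch: results of 500 or more are done and are emitted directly
        dsMap.filter(s => {
          s._2 >= 500
        })
      )
    })
      .disableChaining()

    itStream.print("result:")
    env.execute("FeedbackStreamDemo")
  }
}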
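The behaviour of the Scala demo can be approximated without a cluster. The following plain-Java sketch simulates it under a strong assumption: every pending record lands in the same window, so each loop iteration stands for one window firing. In the real job the windows are processing-time based, so which records share a window depends on timing. The class name `WindowFeedbackSketch` and the sample records are hypothetical.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class WindowFeedbackSketch {

    static final long THRESHOLD = 500;

    // Simulates one window firing: add 1 to every value (the map step),
    // then sum the values per key (the window step).
    static Map<String, Long> fireWindow(List<Map.Entry<String, Long>> records) {
        Map<String, Long> sums = new LinkedHashMap<>();
        for (Map.Entry<String, Long> r : records) {
            sums.merge(r.getKey(), r.getValue() + 1, Long::sum);
        }
        return sums;
    }

    // Fires windows until no result is fed back, and returns the emitted results.
    static List<Map.Entry<String, Long>> run(List<Map.Entry<String, Long>> input) {
        List<Map.Entry<String, Long>> pending = new ArrayList<>(input);
        List<Map.Entry<String, Long>> output = new ArrayList<>();
        while (!pending.isEmpty()) {
            List<Map.Entry<String, Long>> feedback = new ArrayList<>();
            for (Map.Entry<String, Long> sum : fireWindow(pending).entrySet()) {
                Map.Entry<String, Long> result = Map.entry(sum.getKey(), sum.getValue());
                if (result.getValue() < THRESHOLD) {
                    feedback.add(result);  // feedback branch: sum < 500
                } else {
                    output.add(result);    // output branch: sum >= 500
                }
            }
            pending = feedback;
        }
        return output;
    }

    public static void main(String[] args) {
        System.out.println(run(List.of(
                Map.entry("a", 100L), Map.entry("a", 200L), Map.entry("b", 600L))));
    }
}
```

In this run, key `b` is emitted after the first window with 601. Key `a` starts at 302 and, because the map step adds 1 on every pass, it goes around the loop until it reaches exactly 500. The same effect applies to the Scala job: a fed-back result grows by at least 1 per pass even when no new records arrive for that key.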


