Flink Feedback Stream Demo


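Sometimes we need to build an execution graph that contains a cycle, for example to send records that still fail some condition after processing back to the start to be processed again.

Previously, I handled this by writing the records that still did not meet the condition to a separate Topic and consuming them from there for reprocessing.

Today I came across Flink's Iterate operator, which also covers this requirement.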

Official documentation: https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/stream/operators/

Creates a "feedback" loop in the flow, by redirecting the output of one operator to some previous operator. This is especially useful for defining algorithms that continuously update a model. The following code starts with a stream and applies the iteration body continuously. Elements that are greater than 0 are sent back to the feedback channel, and the rest of the elements are forwarded downstream.

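Put simply, iterate() routes part of an operator's output back to an earlier operator. In the official example, elements greater than 0 return through the feedback channel and every other element continues downstream.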
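The routing rule can be tried out without a Flink cluster. The following is a minimal plain-Java sketch of the same semantics, not Flink code: the class `FeedbackLoopSketch`, the `body` step (subtract 2) and the input values are all assumptions made for illustration. A real Flink iteration runs on an unbounded stream and feeds elements back asynchronously, whereas this loop is sequential and stops once the finite input is drained.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Deque;
import java.util.List;

final class FeedbackLoopSketch {

    // Hypothetical iteration body: stands in for the map(/*do something*/) step.
    static long body(long value) {
        return value - 2;
    }

    // Sends each element through the body; results > 0 go back to the head,
    // all other results are collected as output.
    static List<Long> run(List<Long> initial) {
        Deque<Long> head = new ArrayDeque<>(initial); // iteration head: inputs plus fed-back values
        List<Long> output = new ArrayList<>();
        while (!head.isEmpty()) {
            long result = body(head.poll());
            if (result > 0) {
                head.add(result);   // feedback branch
            } else {
                output.add(result); // output branch
            }
        }
        return output;
    }

    public static void main(String[] args) {
        // prints [0, -1, -1]
        System.out.println(run(Arrays.asList(5L, 2L, 3L)));
    }
}
```

Every element leaves the loop only after the body has pushed it to 0 or below, which is the behaviour the two filters in the official example split into a feedback branch and an output branch.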

Official demo

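import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.IterativeStream;

// initialStream is assumed to be an existing DataStream<Long>
// Create the IterativeStream
IterativeStream<Long> iteration = initialStream.iterate();
// Iteration body
DataStream<Long> iterationBody = iteration.map(/*do something*/);
// Filter out the elements that should be fed back
DataStream<Long> feedback = iterationBody.filter(new FilterFunction<Long>() {
    @Override
    public boolean filter(Long value) throws Exception {
        // Elements that match are fed back
        return value > 0;
    }
});
// Feed the feedback stream back into the iteration stream
iteration.closeWith(feedback);
// Output branch
DataStream<Long> output = iterationBody.filter(new FilterFunction<Long>() {
    @Override
    public boolean filter(Long value) throws Exception {
        // Elements that match are emitted downstream
        return value <= 0;
    }
});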

Scala Demo

Business scenario: a keyed window sum. If a window's result does not meet the condition, it re-enters the window and is summed again.

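import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.function.ProcessWindowFunction
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector

object FeedbackStreamDemo {

  def main(args: Array[String]): Unit = {
    // environment
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    // SimpleStringSource is the author's custom source (not shown in this post);
    // the map below expects it to emit strings of the form "key,number"
    val source = env.addSource(new SimpleStringSource)

    val mapStream = source.map(str => {
      val arr = str.split(",")
      println("map : " + str)
      (arr(0), arr(1).toLong)
    })
      .disableChaining()

    val itStream = mapStream.iterate(ds => {
      // Iteration body: add 1 to every value, then sum per key in 10-second windows
      val dsMap = ds.map(str => {
        (str._1, str._2 + 1)
      })
        .keyBy(_._1)
        .window(TumblingProcessingTimeWindows.of(Time.seconds(10)))
        .process(new ProcessWindowFunction[(String, Long), (String, Long), String, TimeWindow] {
          override def process(key: String, context: Context, elements: Iterable[(String, Long)], out: Collector[(String, Long)]): Unit = {
            // Simple window sum
            val sum = elements.map(_._2).sum
            out.collect((key, sum))
          }
        })

      // Feedback branch: window results below 500 return to the head of the
      // iteration and are summed again in a later window
      (dsMap.filter(s => {
        s._2 < 500
      })
        ,
        // Output branch: results of 500 or more are finished and emitted directly
        dsMap.filter(s => {
          s._2 >= 500
        })
      )
    })
      .disableChaining()

    itStream.print("result:")
    env.execute("FeedbackStreamDemo")
  }
}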
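To get a feel for how long a key can stay inside this loop, here is a rough plain-Java simulation of the demo's logic for a single key. It is a sketch under stated assumptions, not Flink code: the class `WindowFeedbackSketch` and the input values are invented for illustration, all initial records are assumed to land in the first window, and no further source records are assumed to arrive for that key afterwards. In the real job the source keeps emitting, so sums would normally grow faster.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

final class WindowFeedbackSketch {

    static final long THRESHOLD = 500L; // same cut-off as the Scala demo

    // Returns {number of windows fired, final sum} for one key.
    static long[] simulate(List<Long> firstWindow) {
        List<Long> pending = new ArrayList<>(firstWindow);
        long windows = 0;
        while (true) {
            long sum = 0L;
            for (long v : pending) {
                sum += v + 1; // the map step adds 1, the window then sums
            }
            windows++;
            if (sum >= THRESHOLD) {
                return new long[] {windows, sum}; // output branch
            }
            // feedback branch: the window result re-enters as a single record
            pending = new ArrayList<>();
            pending.add(sum);
        }
    }

    public static void main(String[] args) {
        long[] r = simulate(Arrays.asList(100L, 200L, 150L));
        // prints windows=48, sum=500
        System.out.println("windows=" + r[0] + ", sum=" + r[1]);
    }
}
```

Under these assumptions a fed-back result is the only record for its key in the next window, so it grows by just 1 per window. With 10-second windows, a first sum of 453 needs 48 windows, roughly eight minutes, before it is emitted.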

 

Follow the Flink菜鳥 WeChat official account for occasional posts on Flink development.

