Flink 滑動窗口使用觸發器會觸發多個窗口的計算


之前有小伙伴在群里說:滑動窗口使用觸發器讓每條數據都觸發一次計算

但是他並沒有得到預期的結果:每條數據都觸發一次計算,輸出一條結果,而是每天數據都輸出了很多條結果

為什么會這樣呢?

寫了個小案例,來解釋這種情況

為了方便使用自定義的 source 開發數據:

class StringSourceFunction extends SourceFunction[String] {

  var flag = true

  override def cancel(): Unit = {

    flag = false
  }

  override def run(ctx: SourceFunction.SourceContext[String]): Unit = {

    while (flag) {
      val str = StringUtil.getRandomString(1).toUpperCase
      ctx.collect(str + "," + StringUtil.getRandomString(1).toUpperCase)
      Thread.sleep(1000)
    }
  }

}

就是個簡單的 souce,每秒對外發出隨機的 string 字符串(基本一分鍾 60 條)

對應的計算程序如下:

env.addSource(new StringSourceFunction)
      .windowAll(SlidingProcessingTimeWindows.of(Time.minutes(1), Time.seconds(10)))
      // 每條數據觸發一次計算
      //.trigger(CountTrigger.of(1))
      .process(new ProcessAllWindowFunction[String, String, TimeWindow] {
      override def process(context: Context, elements: Iterable[String], out: Collector[String]): Unit = {
        // 窗口
        val window = context.window.toString
        // 簡單計算下窗口里面的元素個數
        var count: Long = 0
        elements.iterator.foreach(s => count += 1)


        out.collect("time : " + sdf.format(System.currentTimeMillis()) + ", window : " + window + ", element counter : " + count)
      }
    })
      .print("")

定義了一個 一分鍾的窗口,滑動間隔是10秒,一條數據就應該屬於6個窗口

比如: 5 這條數據屬於:(-50,10)(-40,20)(-30,30)(-20,40)(-10,50)(0,60) 這6 個窗口

注釋 trigger 看結果:

10秒滑動間隔,就是10秒有一個滑動一次,一個窗口結束,觸發一次計算,輸出一個結果(前面6個窗口,因為剛啟動數據不夠60條)

開啟了tirgger 結果就完全不一樣了

可以看到,第一條數據進去的時候,觸發了6次計算,因為它屬於6個窗口,tirgger 會觸發6次

 歡迎關注Flink菜鳥公眾號,會不定期更新Flink(開發技術)相關的推文

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM