之前有小伙伴在群里說:滑動窗口使用觸發器讓每條數據都觸發一次計算
但是他並沒有得到預期的結果:每條數據都觸發一次計算,輸出一條結果,而是每天數據都輸出了很多條結果
為什么會這樣呢?
寫了個小案例,來解釋這種情況
為了方便使用自定義的 source 開發數據:
class StringSourceFunction extends SourceFunction[String] { var flag = true override def cancel(): Unit = { flag = false } override def run(ctx: SourceFunction.SourceContext[String]): Unit = { while (flag) { val str = StringUtil.getRandomString(1).toUpperCase ctx.collect(str + "," + StringUtil.getRandomString(1).toUpperCase) Thread.sleep(1000) } } }
就是個簡單的 souce,每秒對外發出隨機的 string 字符串(基本一分鍾 60 條)
對應的計算程序如下:
env.addSource(new StringSourceFunction) .windowAll(SlidingProcessingTimeWindows.of(Time.minutes(1), Time.seconds(10))) // 每條數據觸發一次計算 //.trigger(CountTrigger.of(1)) .process(new ProcessAllWindowFunction[String, String, TimeWindow] { override def process(context: Context, elements: Iterable[String], out: Collector[String]): Unit = { // 窗口 val window = context.window.toString // 簡單計算下窗口里面的元素個數 var count: Long = 0 elements.iterator.foreach(s => count += 1) out.collect("time : " + sdf.format(System.currentTimeMillis()) + ", window : " + window + ", element counter : " + count) } }) .print("")
定義了一個 一分鍾的窗口,滑動間隔是10秒,一條數據就應該屬於6個窗口
比如: 5 這條數據屬於:(-50,10)(-40,20)(-30,30)(-20,40)(-10,50)(0,60) 這6 個窗口
注釋 trigger 看結果:
10秒滑動間隔,就是10秒有一個滑動一次,一個窗口結束,觸發一次計算,輸出一個結果(前面6個窗口,因為剛啟動數據不夠60條)
開啟了tirgger 結果就完全不一樣了
可以看到,第一條數據進去的時候,觸發了6次計算,因為它屬於6個窗口,tirgger 會觸發6次
歡迎關注Flink菜鳥公眾號,會不定期更新Flink(開發技術)相關的推文