Flink 中窗口是很重要的一個功能,而窗口又經常配合觸發器一起使用。
Flink 自帶的觸發器大概有:
CountTrigger: 指定條數觸發
ContinuousEventTimeTrigger:指定事件時間觸發
ContinuousProcessingTimeTrigger:指定處理時間觸發
ProcessingTimeTrigger: 默認觸發器,窗口結束觸發
EventTimeTrigger: 默認處理時間觸發器,窗口結束觸發
NeverTrigger:全局窗口觸發器,不觸發
但是沒有可以指定時間和條數一起作為觸發條件的觸發器,所有就自己實現了一個(參考:ProcessingTimeTrigger、CountTrigger)
看下調用觸發器的窗口代碼:
val stream = env.addSource(kafkaSource) .map(s => { s }) .windowAll(TumblingProcessingTimeWindows.of(Time.seconds(60))) .trigger(CountAndTimeTrigger.of(10, Time.seconds(10))) .process(new ProcessAllWindowFunction[String, String, TimeWindow] { override def process(context: Context, elements: Iterable[String], out: Collector[String]): Unit = { var count = 0 elements.iterator.foreach(s => { count += 1 }) logger.info("this trigger have : {} item", count) } })
很簡單的一段代碼:定義了一個60秒的窗口,觸發器是自己實現的10條數據或者 10 秒觸發一次的觸發器,窗口函數就輸出窗口數據的條數
下面看下自定義觸發器 CountAndTimeTrigger 的核心代碼如下:
/** * CountAndTimeTrigger : 滿足一定條數和時間觸發 * 條數的觸發使用計數器計數 * 時間的觸發,使用 flink 的 timerServer,注冊觸發器觸發 * * @param <W> */ public class CountAndTimeTrigger<W extends Window> extends Trigger<Object, W> { private Logger logger = LoggerFactory.getLogger(this.getClass()); // 觸發的條數 private final long size; // 觸發的時長 private final long interval; private static final long serialVersionUID = 1L; // 條數計數器 private final ReducingStateDescriptor<Long> countStateDesc = new ReducingStateDescriptor<>("count", new ReduceSum(), LongSerializer.INSTANCE); // 時間計數器,保存下一次觸發的時間 private final ReducingStateDescriptor<Long> timeStateDesc = new ReducingStateDescriptor<>("fire-interval", new ReduceMin(), LongSerializer.INSTANCE); public CountAndTimeTrigger(long size, long interval) { this.size = size; this.interval = interval; } @Override public TriggerResult onElement(Object element, long timestamp, W window, TriggerContext ctx) throws Exception { // 注冊窗口結束的觸發器, 不需要會自動觸發 // ctx.registerProcessingTimeTimer(window.maxTimestamp()); // count ReducingState<Long> count = ctx.getPartitionedState(countStateDesc); //interval ReducingState<Long> fireTimestamp = ctx.getPartitionedState(timeStateDesc); // 每條數據 counter + 1 count.add(1L); if (count.get() >= size) { logger.info("countTrigger triggered, count : {}", count.get()); // 滿足條數的觸發條件,先清 0 條數計數器 count.clear(); // 滿足條數時也需要清除時間的觸發器,如果不是創建結束的觸發器 if (fireTimestamp.get() != window.maxTimestamp()) { // logger.info("delete trigger : {}, {}", sdf.format(fireTimestamp.get()), fireTimestamp.get()); ctx.deleteProcessingTimeTimer(fireTimestamp.get()); } fireTimestamp.clear(); // fire 觸發計算 return TriggerResult.FIRE; } // 觸發之后,下一條數據進來才設置時間計數器注冊下一次觸發的時間 timestamp = ctx.getCurrentProcessingTime(); if (fireTimestamp.get() == null) { // long start = timestamp - (timestamp % interval); long nextFireTimestamp = timestamp + interval; // logger.info("register trigger : {}, {}", sdf.format(nextFireTimestamp), nextFireTimestamp); ctx.registerProcessingTimeTimer(nextFireTimestamp); fireTimestamp.add(nextFireTimestamp); } return TriggerResult.CONTINUE; } @Override public TriggerResult onProcessingTime(long time, W window, TriggerContext ctx) throws Exception { // count ReducingState<Long> count = ctx.getPartitionedState(countStateDesc); //interval ReducingState<Long> fireTimestamp = ctx.getPartitionedState(timeStateDesc); // time trigger and window end if (time == window.maxTimestamp()) { logger.info("window close : {}", time); // 窗口結束,清0條數和時間的計數器 count.clear(); ctx.deleteProcessingTimeTimer(fireTimestamp.get()); fireTimestamp.clear(); return TriggerResult.FIRE_AND_PURGE; } else if (fireTimestamp.get() != null && fireTimestamp.get().equals(time)) { logger.info("timeTrigger trigger, time : {}", time); // 時間計數器觸發,清0條數和時間計數器 count.clear(); fireTimestamp.clear(); return TriggerResult.FIRE; } return TriggerResult.CONTINUE; } }
主要是在數據進來的時候,調用 onElement 做條數的計數器,滿足條件就觸發, onProcessingTime 是 flink 的 timeservice 調用的,作為定時觸發的觸發器
在時間和條數的定時器都有清除時間和條數計數器的計數,讓計數器在下一條數據到的時候,重新開始計數
特別需要注意:窗口結束的時候,會自動觸發調用 onProcessingTime ,一定要包含在觸發器邏輯里面,不然不能獲取窗口的完整數據
// time trigger and window end if (time == window.maxTimestamp()) { logger.info("window close : {}", time); // 窗口結束,清0條數和時間的計數器 count.clear(); ctx.deleteProcessingTimeTimer(fireTimestamp.get()); fireTimestamp.clear(); return TriggerResult.FIRE_AND_PURGE; }
如在獲取到窗口觸發時間是窗口的結束時間(即窗口的結束時間減1,Java的時間精度是到毫秒,如 10秒的窗口時間是:(00000, 10000)0000-10000 ,實際上窗口結束時間就是 9999)
看執行的結果:
從 “14:42:00,002 INFO - window close : 1573281719999” 窗口結束
到 “14:42:10,015 INFO - countTrigger triggered, count : 10 ” , “14:42:19,063 INFO - countTrigger triggered, count : 10” 條數觸發
到 “14:42:36,499 INFO - timeTrigger trigger, time : 1573281756496” 時間觸發
最后 窗口結束 “14:43:00,002 INFO - window close : 1573281779999”
搞定
完整代碼:https://github.com/springMoon/flink-rookie/tree/master/src/main/scala/com/venn/stream/api/trigger
歡迎關注Flink菜鳥公眾號,會不定期更新Flink(開發技術)相關的推文