Flink 實現指定時長或消息條數的觸發器


Flink 中窗口是很重要的一個功能,而窗口又經常配合觸發器一起使用。

Flink 自帶的觸發器大概有:

CountTrigger: 指定條數觸發

ContinuousEventTimeTrigger:指定事件時間觸發
ContinuousProcessingTimeTrigger:指定處理時間觸發

ProcessingTimeTrigger: 默認觸發器,窗口結束觸發
EventTimeTrigger: 默認處理時間觸發器,窗口結束觸發

NeverTrigger:全局窗口觸發器,不觸發

但是沒有可以指定時間和條數一起作為觸發條件的觸發器,所有就自己實現了一個(參考:ProcessingTimeTrigger、CountTrigger)

看下調用觸發器的窗口代碼:

val stream = env.addSource(kafkaSource)
      .map(s => {
        s
      })
      .windowAll(TumblingProcessingTimeWindows.of(Time.seconds(60)))
      .trigger(CountAndTimeTrigger.of(10, Time.seconds(10)))
      .process(new ProcessAllWindowFunction[String, String, TimeWindow] {
      override def process(context: Context, elements: Iterable[String], out: Collector[String]): Unit = {
        var count = 0

        elements.iterator.foreach(s => {
          count += 1
        })
        logger.info("this trigger have : {} item", count)
      }
    })

很簡單的一段代碼:定義了一個60秒的窗口,觸發器是自己實現的10條數據或者 10 秒觸發一次的觸發器,窗口函數就輸出窗口數據的條數

下面看下自定義觸發器 CountAndTimeTrigger 的核心代碼如下:

/**
 * CountAndTimeTrigger : 滿足一定條數和時間觸發
 * 條數的觸發使用計數器計數
 * 時間的觸發,使用 flink 的 timerServer,注冊觸發器觸發
 *
 * @param <W>
 */
public class CountAndTimeTrigger<W extends Window> extends Trigger<Object, W> {
    private Logger logger = LoggerFactory.getLogger(this.getClass());
    // 觸發的條數
    private final long size;
    // 觸發的時長
    private final long interval;
    private static final long serialVersionUID = 1L;
    // 條數計數器
    private final ReducingStateDescriptor<Long> countStateDesc =
            new ReducingStateDescriptor<>("count", new ReduceSum(), LongSerializer.INSTANCE);
    // 時間計數器,保存下一次觸發的時間
    private final ReducingStateDescriptor<Long> timeStateDesc =
            new ReducingStateDescriptor<>("fire-interval", new ReduceMin(), LongSerializer.INSTANCE);

    public CountAndTimeTrigger(long size, long interval) {
        this.size = size;
        this.interval = interval;
    }

    @Override
    public TriggerResult onElement(Object element, long timestamp, W window, TriggerContext ctx) throws Exception {
        // 注冊窗口結束的觸發器, 不需要會自動觸發
//        ctx.registerProcessingTimeTimer(window.maxTimestamp());
        // count
        ReducingState<Long> count = ctx.getPartitionedState(countStateDesc);
        //interval
        ReducingState<Long> fireTimestamp = ctx.getPartitionedState(timeStateDesc);
        // 每條數據 counter + 1
        count.add(1L);
        if (count.get() >= size) {
            logger.info("countTrigger triggered, count : {}", count.get());
            // 滿足條數的觸發條件,先清 0 條數計數器
            count.clear();
            // 滿足條數時也需要清除時間的觸發器,如果不是創建結束的觸發器
            if (fireTimestamp.get() != window.maxTimestamp()) {
//                logger.info("delete trigger : {}, {}", sdf.format(fireTimestamp.get()), fireTimestamp.get());
                ctx.deleteProcessingTimeTimer(fireTimestamp.get());
            }
            fireTimestamp.clear();
            // fire 觸發計算
            return TriggerResult.FIRE;
        }

        // 觸發之后,下一條數據進來才設置時間計數器注冊下一次觸發的時間
        timestamp = ctx.getCurrentProcessingTime();
        if (fireTimestamp.get() == null) {
//            long start = timestamp - (timestamp % interval);
            long nextFireTimestamp = timestamp + interval;
//            logger.info("register trigger : {}, {}", sdf.format(nextFireTimestamp), nextFireTimestamp);
            ctx.registerProcessingTimeTimer(nextFireTimestamp);
            fireTimestamp.add(nextFireTimestamp);
        }
        return TriggerResult.CONTINUE;
    }

    @Override
    public TriggerResult onProcessingTime(long time, W window, TriggerContext ctx) throws Exception {

        // count
        ReducingState<Long> count = ctx.getPartitionedState(countStateDesc);
        //interval
        ReducingState<Long> fireTimestamp = ctx.getPartitionedState(timeStateDesc);

        // time trigger and window end
        if (time == window.maxTimestamp()) {
            logger.info("window close : {}", time);
            // 窗口結束,清0條數和時間的計數器
            count.clear();
            ctx.deleteProcessingTimeTimer(fireTimestamp.get());
            fireTimestamp.clear();
            return TriggerResult.FIRE_AND_PURGE;
        } else if (fireTimestamp.get() != null && fireTimestamp.get().equals(time)) {
            logger.info("timeTrigger trigger, time : {}", time);
            // 時間計數器觸發,清0條數和時間計數器
            count.clear();
            fireTimestamp.clear();
            return TriggerResult.FIRE;
        }
        return TriggerResult.CONTINUE;
    }
}

主要是在數據進來的時候,調用  onElement 做條數的計數器,滿足條件就觸發, onProcessingTime 是 flink 的 timeservice 調用的,作為定時觸發的觸發器

在時間和條數的定時器都有清除時間和條數計數器的計數,讓計數器在下一條數據到的時候,重新開始計數

特別需要注意:窗口結束的時候,會自動觸發調用 onProcessingTime ,一定要包含在觸發器邏輯里面,不然不能獲取窗口的完整數據

// time trigger and window end
        if (time == window.maxTimestamp()) {
            logger.info("window close : {}", time);
            // 窗口結束,清0條數和時間的計數器
            count.clear();
            ctx.deleteProcessingTimeTimer(fireTimestamp.get());
            fireTimestamp.clear();
            return TriggerResult.FIRE_AND_PURGE;
        } 

如在獲取到窗口觸發時間是窗口的結束時間(即窗口的結束時間減1,Java的時間精度是到毫秒,如 10秒的窗口時間是:(00000, 10000)0000-10000 ,實際上窗口結束時間就是  9999)

看執行的結果:

 

從 “14:42:00,002 INFO - window close : 1573281719999” 窗口結束

到 “14:42:10,015 INFO - countTrigger triggered, count : 10 ” , “14:42:19,063 INFO - countTrigger triggered, count : 10”  條數觸發

到 “14:42:36,499 INFO - timeTrigger trigger, time : 1573281756496” 時間觸發

最后 窗口結束 “14:43:00,002 INFO - window close : 1573281779999”

搞定

 完整代碼:https://github.com/springMoon/flink-rookie/tree/master/src/main/scala/com/venn/stream/api/trigger

歡迎關注Flink菜鳥公眾號,會不定期更新Flink(開發技術)相關的推文

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM