Flink 自定義觸發器


import org.apache.flink.api.common.state.ReducingState;
import org.apache.flink.api.common.state.ReducingStateDescriptor;
import org.apache.flink.api.common.typeutils.base.LongSerializer;
import org.apache.flink.api.common.typeutils.base.IntSerializer;
import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
import org.apache.flink.streaming.api.windowing.windows.Window;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * @Auther WeiJiQian
 * @描述 CountAndTimeTrigger : 滿足一定條數和時間觸發
 *  * 條數的觸發使用計數器計數
 *  * 時間的觸發,使用 flink 的 timerServer,注冊觸發器觸發
 */
public class CountAndTimeTrigger<W extends Window> extends Trigger<Object, W> {
    private Logger logger = LoggerFactory.getLogger(this.getClass());
    // 觸發的條數
    private final long size;
    // 觸發的時長
    private final long interval;
    private static final long serialVersionUID = 1L;
    // 條數計數器
    private final ReducingStateDescriptor<Integer> countStateDesc =
            new ReducingStateDescriptor<>("count", new ReduceSum(), IntSerializer.INSTANCE);
    // 時間計數器,保存下一次觸發的時間
    private final ReducingStateDescriptor<Long> timeStateDesc =
            new ReducingStateDescriptor<>("fire-interval", new ReduceMin(), LongSerializer.INSTANCE);

    public CountAndTimeTrigger(long size, long interval) {
        this.size = size;
        this.interval = interval;
    }

    // 每條元素到來時.
    @Override
    public TriggerResult onElement(Object element, long timestamp, W window, TriggerContext ctx) throws Exception {
        // 注冊窗口結束的觸發器, 不需要會自動觸發
//        ctx.registerProcessingTimeTimer(window.maxTimestamp());
        // count
        ReducingState<Integer> count = ctx.getPartitionedState(countStateDesc);
        //interval
        ReducingState<Long> fireTimestamp = ctx.getPartitionedState(timeStateDesc);
        // 每條數據 counter + 1
        count.add(1);
        if (count.get() >= size) {
            System.out.println("窗口結束: 計數器觸發 count : {}"+ count.get());
            // 滿足條數的觸發條件,先清 0 條數計數器
            count.clear();
            // 滿足條數時也需要清除時間的觸發器,如果不是創建結束的觸發器
            if (fireTimestamp.get() != window.maxTimestamp()) {
//                logger.info("delete trigger : {}, {}", sdf.format(fireTimestamp.get()), fireTimestamp.get());
                ctx.deleteProcessingTimeTimer(fireTimestamp.get());
            }
            fireTimestamp.clear();
            // fire 觸發計算
            return TriggerResult.FIRE;
        }

        // 觸發之后,下一條數據進來才設置時間計數器注冊下一次觸發的時間

        timestamp = ctx.getCurrentProcessingTime();
//        timestamp = System.currentTimeMillis();
        if (fireTimestamp.get() == null) {
//            long start = timestamp - (timestamp % interval);
            long nextFireTimestamp = timestamp + interval;
//            logger.info("register trigger : {}, {}", sdf.format(nextFireTimestamp), nextFireTimestamp);
            ctx.registerProcessingTimeTimer(nextFireTimestamp);
            fireTimestamp.add(nextFireTimestamp);
        }
        return TriggerResult.CONTINUE;
    }

    // 處理時間到的時候,開始處理
    @Override
    public TriggerResult onProcessingTime(long time, W window, TriggerContext ctx) throws Exception {

        // count
        ReducingState<Integer> count = ctx.getPartitionedState(countStateDesc);
        //interval
        ReducingState<Long> fireTimestamp = ctx.getPartitionedState(timeStateDesc);

        // time trigger and window end
        if (fireTimestamp.get() != null && time == window.maxTimestamp()) {
            System.out.println("窗口結束: 正常結束 {}" + time);
            // 窗口結束,清0條數和時間的計數器
            count.clear();
            ctx.deleteProcessingTimeTimer(fireTimestamp.get());
            fireTimestamp.clear();
            return TriggerResult.FIRE_AND_PURGE;
        } else if (fireTimestamp.get() != null && fireTimestamp.get().equals(time)) {
            System.out.println("窗口結束:時間計數器觸發, time : {}" + time);
            // 時間計數器觸發,清0條數和時間計數器
            count.clear();
            fireTimestamp.clear();
            return TriggerResult.FIRE;
        }
        return TriggerResult.CONTINUE;
    }

    @Override
    public TriggerResult onEventTime(long time, W window, TriggerContext ctx) throws Exception {
        // count
        ReducingState<Integer> count = ctx.getPartitionedState(countStateDesc);
        //interval
        ReducingState<Long> fireTimestamp = ctx.getPartitionedState(timeStateDesc);

        // time trigger and window end
        if (time == window.maxTimestamp()) {
            System.out.println("窗口結束 : {}"+ time);
            // 窗口結束,清0條數和時間的計數器
            count.clear();
            ctx.deleteProcessingTimeTimer(fireTimestamp.get());
            fireTimestamp.clear();
            return TriggerResult.FIRE_AND_PURGE;
        } else if (fireTimestamp.get() != null && fireTimestamp.get().equals(time)) {
            System.out.println("時間計數器觸發, time : {}"+ time);
            // 時間計數器觸發,清0條數和時間計數器
            count.clear();
            fireTimestamp.clear();
            return TriggerResult.FIRE;
        }
        return TriggerResult.CONTINUE;
    }

    @Override
    public void clear(W window, TriggerContext ctx) throws Exception {
        ctx.getPartitionedState(countStateDesc).clear();
        ctx.getPartitionedState(timeStateDesc).clear();
    }


    // 多個slot 中的 數據合並.
    @Override
    public void onMerge(W window, OnMergeContext ctx) throws Exception {
        super.onMerge(window, ctx);
        ctx.mergePartitionedState(timeStateDesc);
        ctx.mergePartitionedState(countStateDesc);
    }
}


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM