FLINK-函數-CoProcessFunction


CoProcessFunction

  對於兩條輸入流,DataStream API提供了CoProcessFunction這樣的low-level操作。CoProcessFunction提供了操作每一個輸入流的方法: processElement1()和processElement2()。

  類似於ProcessFunction,這兩種方法都通過Context對象來調用。這個Context對象可以訪問事件數據,定時器時間戳,TimerService,以及side outputs。CoProcessFunction也提供了onTimer()回調函數。下面的例子展示了如何使用CoProcessFunction來合並兩條流。

實現低階join通常遵循此套路:

  1.為一個(或兩個)輸入創建一個狀態對象。

  2.當從輸入源收到元素時,更新狀態。

  3.從另一個輸入接收元素后,檢索狀態並生成連接的結果。

實例

根據id將兩個流中的數據匹配在一起組合成新的流數據,默認兩個流的最大延遲時間為60s。超過60s還未匹配成功,意味着當前只有一個流來臨,則任務流信息異常,需要將數據側流輸出。

// 流1 要先按照id分組
DataStreamSource<String> sourceStream1 = env.addSource(consumer);
KeyedStream<String, Tuple> stream1 = sourceStream1.keyBy(1);
// 流2 要先按照id分組
DataStreamSource<String> sourceStream2 = env.addSource(consumer);
KeyedStream<String, Tuple> stream2 = sourceStream1.keyBy(1);

// 定義兩個側切流的outputTag
OutputTag<String> outputTag1 = new OutputTag<>("stream1");
OutputTag<String> outputTag2 = new OutputTag<>("stream2");
stream1.connect(stream2).process(new CoProcessFunction<String, String, Tuple2<String, String>>() {

    // 流1的狀態
    ValueState<String> state1;
    // 流2的狀態
    ValueState<String> state2;
    
    // 定義一個用於刪除定時器的狀態
    ValueState<Long> timeState;
    
    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        // 初始化狀態
        state1 = getRuntimeContext().getState(new ValueStateDescriptor<>("state1", String.class));
        state2 = getRuntimeContext().getState(new ValueStateDescriptor<>("state2", String.class));
        timeState = getRuntimeContext().getState(new ValueStateDescriptor<>("timeState", Long.class));
    }
    
    // 流1的處理邏輯
    @Override
    public void processElement1(String value, Context ctx, Collector<Tuple2<String, String>> out) throws Exception {
        String value2 = state2.value();
        // 流2不為空表示流2先來了,直接將兩個流拼接發到下游
        if (value2 != null) {
            out.collect(Tuple2.of(value, value2));
            // 清空流2對用的state信息
            state2.clear();
            // 流2來了就可以刪除定時器了,並把定時器的狀態清除
            ctx.timerService().deleteEventTimeTimer(timeState.value());
            timeState.clear();
        } else {
            // 流2還沒來,將流1放入state1中,
            state1.update(value);
            // 並注冊一個1分鍾的定時器,流1中的 eventTime + 60s
            long time = 1111L + 60000;
            timeState.update(time);
            ctx.timerService().registerEventTimeTimer(time);
        }
    }
    
    // 流2的處理邏輯與流1的處理邏輯類似
    @Override
    public void processElement2(String value, Context ctx, Collector<Tuple2<String, String>> out) throws Exception {
        String value1 = state1.value();
        if (value1 != null) {
            out.collect(Tuple2.of(value1, value));
            state1.clear();
            ctx.timerService().deleteEventTimeTimer(timeState.value());
            timeState.clear();
        } else {
            state2.update(value);
            long time = 1111L + 60000;
            timeState.update(time);
            ctx.timerService().registerEventTimeTimer(time);
        }
    }
    
    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<Tuple2<String, String>> out) throws Exception {
        super.onTimer(timestamp, ctx, out);
        // 定時器觸發了,即1分鍾內沒有收到兩個流
        // 流1不為空,則將流1側切輸出
        if (state1.value() != null) {
        ctx.output(outputTag1, state1.value());
        }
    
        // 流2不為空,則將流2側切輸出
        if (state2.value() != null) {
        ctx.output(outputTag2, state2.value());
        }
    
        state1.clear();
        state2.clear();
    }
});

注意:整體的邏輯思路是:
  流1先來,先把流1保存進流1的狀態;
  流2先來,先把流2保存進流2的狀態;
  再注冊一個60s的定時器,如果60s內流2來了,則把兩個流連接發送下游;如果60內流2沒有來,則把流1數據測流輸出
  流2的處理邏輯也是這樣。
  另外再加一個定時器的狀態,用於清除定時器,因為60s內如果另一個流數據來的話,此時已經不需要定時器了,及時刪除定時器。所以這里用了一個狀態標志定時器。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM