CoProcessFunction
對於兩條輸入流,DataStream API提供了CoProcessFunction這樣的low-level操作。CoProcessFunction提供了操作每一個輸入流的方法: processElement1()和processElement2()。
類似於ProcessFunction,這兩種方法都通過Context對象來調用。這個Context對象可以訪問事件數據,定時器時間戳,TimerService,以及side outputs。CoProcessFunction也提供了onTimer()回調函數。下面的例子展示了如何使用CoProcessFunction來合並兩條流。
實現低階join通常遵循此套路:
1.為一個(或兩個)輸入創建一個狀態對象。
2.當從輸入源收到元素時,更新狀態。
3.從另一個輸入接收元素后,檢索狀態並生成連接的結果。
實例
根據id將兩個流中的數據匹配在一起組合成新的流數據,默認兩個流的最大延遲時間為60s。超過60s還未匹配成功,意味着當前只有一個流來臨,則任務流信息異常,需要將數據側流輸出。
// 流1 要先按照id分組 DataStreamSource<String> sourceStream1 = env.addSource(consumer); KeyedStream<String, Tuple> stream1 = sourceStream1.keyBy(1); // 流2 要先按照id分組 DataStreamSource<String> sourceStream2 = env.addSource(consumer); KeyedStream<String, Tuple> stream2 = sourceStream1.keyBy(1); // 定義兩個側切流的outputTag OutputTag<String> outputTag1 = new OutputTag<>("stream1"); OutputTag<String> outputTag2 = new OutputTag<>("stream2");
stream1.connect(stream2).process(new CoProcessFunction<String, String, Tuple2<String, String>>() { // 流1的狀態 ValueState<String> state1; // 流2的狀態 ValueState<String> state2; // 定義一個用於刪除定時器的狀態 ValueState<Long> timeState; @Override public void open(Configuration parameters) throws Exception { super.open(parameters); // 初始化狀態 state1 = getRuntimeContext().getState(new ValueStateDescriptor<>("state1", String.class)); state2 = getRuntimeContext().getState(new ValueStateDescriptor<>("state2", String.class)); timeState = getRuntimeContext().getState(new ValueStateDescriptor<>("timeState", Long.class)); } // 流1的處理邏輯 @Override public void processElement1(String value, Context ctx, Collector<Tuple2<String, String>> out) throws Exception { String value2 = state2.value(); // 流2不為空表示流2先來了,直接將兩個流拼接發到下游 if (value2 != null) { out.collect(Tuple2.of(value, value2)); // 清空流2對用的state信息 state2.clear(); // 流2來了就可以刪除定時器了,並把定時器的狀態清除 ctx.timerService().deleteEventTimeTimer(timeState.value()); timeState.clear(); } else { // 流2還沒來,將流1放入state1中, state1.update(value); // 並注冊一個1分鍾的定時器,流1中的 eventTime + 60s long time = 1111L + 60000; timeState.update(time); ctx.timerService().registerEventTimeTimer(time); } } // 流2的處理邏輯與流1的處理邏輯類似 @Override public void processElement2(String value, Context ctx, Collector<Tuple2<String, String>> out) throws Exception { String value1 = state1.value(); if (value1 != null) { out.collect(Tuple2.of(value1, value)); state1.clear(); ctx.timerService().deleteEventTimeTimer(timeState.value()); timeState.clear(); } else { state2.update(value); long time = 1111L + 60000; timeState.update(time); ctx.timerService().registerEventTimeTimer(time); } } @Override public void onTimer(long timestamp, OnTimerContext ctx, Collector<Tuple2<String, String>> out) throws Exception { super.onTimer(timestamp, ctx, out); // 定時器觸發了,即1分鍾內沒有收到兩個流 // 流1不為空,則將流1側切輸出 if (state1.value() != null) { ctx.output(outputTag1, state1.value()); } // 流2不為空,則將流2側切輸出 if (state2.value() != null) { ctx.output(outputTag2, state2.value()); } state1.clear(); state2.clear(); } });
注意:整體的邏輯思路是:
流1先來,先把流1保存進流1的狀態;
流2先來,先把流2保存進流2的狀態;
再注冊一個60s的定時器,如果60s內流2來了,則把兩個流連接發送下游;如果60內流2沒有來,則把流1數據測流輸出
流2的處理邏輯也是這樣。
另外再加一個定時器的狀態,用於清除定時器,因為60s內如果另一個流數據來的話,此時已經不需要定時器了,及時刪除定時器。所以這里用了一個狀態標志定時器。
