一、背景說明
在Flink中可以使用Window join或者Interval Join實現雙流join,不過使用join只能實現內連接,如果要實現左右連接或者外連接,則可以通過connect算子來實現。現有訂單數據及支付數據如下方說明,基於數據時間實現訂單及支付數據的關聯,超時或者缺失則由側輸出流輸出
//OrderLog.csv 訂單數據,首列為訂單id,付款成功則類型為pay(第二列),且生成支付id(第三列),最后列為時間
34729,create,,1558430842
34730,create,,1558430843
34729,pay,sd76f87d6,1558430844
34730,pay,3hu3k2432,1558430845
34731,pay,35jue34we,1558430849
...
//ReceiptLog.csv 支付數據,付款成功后生成,對於支付id/支付渠道/時間
sd76f87d6,wechat,1558430847
3hu3k2432,alipay,1558430848
...
//輸出結果,限定時間內到達數據實現關聯以Tuple2輸出,否則側輸出流輸出
(OrderEvent(orderId=34729, eventType=pay, txId=sd76f87d6, eventTime=1558430844),TxEvent(txId=sd76f87d6, payChannel=wechat, eventTime=1558430847))
(OrderEvent(orderId=34730, eventType=pay, txId=3hu3k2432, eventTime=1558430845),TxEvent(txId=3hu3k2432, payChannel=alipay, eventTime=1558430848))
No Receipt> 35jue34we 只有下單沒有到賬數據
...
二、實現過程
- connect算子簡單說明
作用:兩個不同來源的數據流進行連接,實現數據匹配。可以連接兩個保持他們類型的數據流,兩個數據流被connect之后,只是被放在了一個同一個流中,內部依然保持各自的數據和形式不發生任何變化,兩個流相互獨立。
返回:DataStream[A], DataStream[B] -> ConnectedStreams[A,B]
//示例:
DataStreamSource<Integer> intStream = env.fromElements(1, 2, 3, 4, 5);
DataStreamSource<String> stringStream = env.fromElements("a", "b", "c");
// 把兩個流連接在一起: 貌合神離
ConnectedStreams<Integer, String> cs = intStream.connect(stringStream);
cs.getFirstInput().print("first");
cs.getSecondInput().print("second");
env.execute();
- 實現思路說明
①建立執行環境。
②讀取數據映射到JavaBean,並指定WaterMark時間語義。
由於后續需要用到定時器來實現側輸出流,因此需要指定數據時間語義
③分組並使用connect關聯兩數據流。(關鍵步驟)
1.按支付id進行keyby,使相同id數據進入相同並行度處理。
2.由於使用到定時器,故使用process算子(支持定時器)分別對兩條流進行處理。
3.process中使用狀態編程(ValueState),對先到數據存入狀態,並建立定時器,另外一條流在限定時間內到達則輸出並刪除定時器,否則觸發定時器走側輸出流輸出。在定時器中,對未到數據均輸出到側輸出流則實現全外連接,對A到了B未到輸出及A未到B到了不做輸出則實現左連接,反之則是右連接效果。
ps:在connect中,無非對兩條流進行單獨的處理,在A流中處理B流未到時如何輸出,在B流中處理A流未到時如何輸入,建立定時器,能實現另一條流延遲到達的處理,如左連接、右連接或全外連接,如使用join算子,則只能實現內連接。
④打印並執行。
三、完整代碼
package com.test.ordermatch;
import bean.OrderEvent;
import bean.TxEvent;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.KeyedCoProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;
/**
* @author: Rango
* @create: 2021-06-08 11:03
* @description: 訂單支付實時監控,兩個數據源的關聯
**/
public class TestOrderMatch {
public static void main(String[] args) throws Exception {
//1.建立環境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment().setParallelism(1);
//2.讀取數據,映射數據源到javabean,指定watermark時間語義
WatermarkStrategy<OrderEvent> orderWMS = WatermarkStrategy.<OrderEvent>forMonotonousTimestamps()
.withTimestampAssigner(new SerializableTimestampAssigner<OrderEvent>() {
@Override
public long extractTimestamp(OrderEvent element, long recordTimestamp) {
return element.getEventTime() * 1000L;
}});
WatermarkStrategy<TxEvent> receiptWMS = WatermarkStrategy.<TxEvent>forMonotonousTimestamps()
.withTimestampAssigner(new SerializableTimestampAssigner<TxEvent>() {
@Override
public long extractTimestamp(TxEvent element, long recordTimestamp) {
return element.getEventTime() * 1000L;
}});
//使用flatmap能實現篩選,map+filter代碼量更多,實現類型為pay的數據
SingleOutputStreamOperator<OrderEvent> OrderDS = env.readTextFile("input/OrderLog.csv")
.flatMap(new FlatMapFunction<String, OrderEvent>() {
@Override
public void flatMap(String value, Collector<OrderEvent> out) throws Exception {
String[] split = value.split(",");
OrderEvent orderEvent = new OrderEvent(Long.parseLong(split[0]), split[1], split[2], Long.parseLong(split[3]));
if (orderEvent.getEventType().equals("pay")){
out.collect(orderEvent);
}}})
.assignTimestampsAndWatermarks(orderWMS);
SingleOutputStreamOperator<TxEvent> ReceiptDS = env.readTextFile("input/ReceiptLog.csv")
.map(new MapFunction<String, TxEvent>() {
@Override
public TxEvent map(String value) throws Exception {
String[] split = value.split(",");
return new TxEvent(split[0], split[1], Long.parseLong(split[2]));
}
}).assignTimestampsAndWatermarks(receiptWMS);
//3.數據處理
//使用keyby,使同一id數據進入同一並行度,以Tuple2輸出
SingleOutputStreamOperator<Tuple2<OrderEvent, TxEvent>> processDS = OrderDS.connect(ReceiptDS)
.keyBy("txId", "txId")
.process(new orderReceiptKeyProcessFunc());
//4.打印執行
processDS.print();
processDS.getSideOutput(new OutputTag<String>("PayWithoutReceipt"){}).print("No Receipt");
processDS.getSideOutput(new OutputTag<String>("ReceiptWithoutPay"){}).print("No Order");
env.execute();
}
public static class orderReceiptKeyProcessFunc extends KeyedCoProcessFunction<String,OrderEvent,TxEvent, Tuple2<OrderEvent,TxEvent>>{
private ValueState<OrderEvent> orderState;
private ValueState<TxEvent> receiptState;
private ValueState<Long> timeState;
@Override
public void open(Configuration parameters) throws Exception {
orderState = getRuntimeContext().getState(new ValueStateDescriptor<OrderEvent>("order-state",OrderEvent.class));
receiptState = getRuntimeContext().getState(new ValueStateDescriptor<TxEvent>("receipt-state",TxEvent.class));
timeState = getRuntimeContext().getState(new ValueStateDescriptor<Long>("time-state",Long.class));
}
@Override
public void processElement1(OrderEvent value, Context ctx, Collector<Tuple2<OrderEvent, TxEvent>> out) throws Exception {
//在訂單流中,判斷支付流是否有數據,沒有則啟用定時器10秒后觸發輸出到側輸出流
if (receiptState.value() == null){
//支付數據未到,先把訂單數據放入狀態
orderState.update(value);
//建立定時器,10秒后觸發
Long ts = (value.getEventTime() + 10) * 1000L;
ctx.timerService().registerEventTimeTimer(ts);
timeState.update(ts);
}else{
//支付數據已到,直接輸出到主流
out.collect(new Tuple2<>(value,receiptState.value()));
//刪除定時器,如支付流先到,支付流建立了的定時器,在這里刪除
ctx.timerService().deleteEventTimeTimer(timeState.value());
//清空狀態,注意清空的是支付狀態
receiptState.clear();
timeState.clear();
}}
@Override
public void processElement2(TxEvent value, Context ctx, Collector<Tuple2<OrderEvent, TxEvent>> out) throws Exception {
//在支付流中,判斷訂單流是否有數據,沒有則啟用定時器5秒后觸發輸出到側輸出流
if (orderState.value() == null){
//訂單數據未到,先把支付數據放入狀態
receiptState.update(value);
//建立定時器,5秒后再關聯
Long ts = (value.getEventTime() + 5) * 1000L;
ctx.timerService().registerEventTimeTimer(ts);
timeState.update(ts);
}else{
//訂單數據已到,直接輸出到主流
out.collect(new Tuple2<>(orderState.value(),value));
//刪除定時器,如支付流先到,支付流建立了的定時器,在這里刪除
ctx.timerService().deleteEventTimeTimer(timeState.value());
//清空狀態,注意清空的是訂單狀態
orderState.clear();
timeState.clear();
}}
@Override
public void onTimer(long timestamp, OnTimerContext ctx, Collector<Tuple2<OrderEvent, TxEvent>> out) throws Exception {
//這樣為全外連接輸出,刪除else則是實現左連接,右連接則只輸出else部分即可
if (orderState.value() != null){
ctx.output(new OutputTag<String>("PayWithoutReceipt") {}
, orderState.value().getTxId() + " 只有下單沒有到賬數據");
}else{
ctx.output(new OutputTag<String>("ReceiptWithoutPay") {}
, receiptState.value().getTxId() + " 只有到賬無下單數據");
}
orderState.clear();
receiptState.clear();
timeState.clear();
}
}
}
學習交流,有任何問題還請隨時評論指出交流。