Flink使用connect實現雙流join全外連接


一、背景說明

在Flink中可以使用Window join或者Interval Join實現雙流join,不過使用join只能實現內連接,如果要實現左右連接或者外連接,則可以通過connect算子來實現。現有訂單數據及支付數據如下方說明,基於數據時間實現訂單及支付數據的關聯,超時或者缺失則由側輸出流輸出

//OrderLog.csv 訂單數據,首列為訂單id,付款成功則類型為pay(第二列),且生成支付id(第三列),最后列為時間
34729,create,,1558430842
34730,create,,1558430843
34729,pay,sd76f87d6,1558430844
34730,pay,3hu3k2432,1558430845
34731,pay,35jue34we,1558430849
...

//ReceiptLog.csv 支付數據,付款成功后生成,對於支付id/支付渠道/時間
sd76f87d6,wechat,1558430847
3hu3k2432,alipay,1558430848
...

//輸出結果,限定時間內到達數據實現關聯以Tuple2輸出,否則側輸出流輸出
(OrderEvent(orderId=34729, eventType=pay, txId=sd76f87d6, eventTime=1558430844),TxEvent(txId=sd76f87d6, payChannel=wechat, eventTime=1558430847))
(OrderEvent(orderId=34730, eventType=pay, txId=3hu3k2432, eventTime=1558430845),TxEvent(txId=3hu3k2432, payChannel=alipay, eventTime=1558430848))
No Receipt> 35jue34we 只有下單沒有到賬數據
...

二、實現過程

  1. connect算子簡單說明
    在這里插入圖片描述
    作用:兩個不同來源的數據流進行連接,實現數據匹配。可以連接兩個保持他們類型的數據流,兩個數據流被connect之后,只是被放在了一個同一個流中,內部依然保持各自的數據和形式不發生任何變化,兩個流相互獨立。

返回:DataStream[A], DataStream[B] -> ConnectedStreams[A,B]

//示例:
DataStreamSource<Integer> intStream = env.fromElements(1, 2, 3, 4, 5);
DataStreamSource<String> stringStream = env.fromElements("a", "b", "c");
// 把兩個流連接在一起: 貌合神離
ConnectedStreams<Integer, String> cs = intStream.connect(stringStream);
cs.getFirstInput().print("first");
cs.getSecondInput().print("second");
env.execute();
  1. 實現思路說明

①建立執行環境。

②讀取數據映射到JavaBean,並指定WaterMark時間語義。
由於后續需要用到定時器來實現側輸出流,因此需要指定數據時間語義

③分組並使用connect關聯兩數據流。(關鍵步驟)

1.按支付id進行keyby,使相同id數據進入相同並行度處理。
2.由於使用到定時器,故使用process算子(支持定時器)分別對兩條流進行處理。
3.process中使用狀態編程(ValueState),對先到數據存入狀態,並建立定時器,另外一條流在限定時間內到達則輸出並刪除定時器,否則觸發定時器走側輸出流輸出。在定時器中,對未到數據均輸出到側輸出流則實現全外連接,對A到了B未到輸出及A未到B到了不做輸出則實現左連接,反之則是右連接效果。
ps:在connect中,無非對兩條流進行單獨的處理,在A流中處理B流未到時如何輸出,在B流中處理A流未到時如何輸入,建立定時器,能實現另一條流延遲到達的處理,如左連接、右連接或全外連接,如使用join算子,則只能實現內連接。

④打印並執行。

三、完整代碼

package com.test.ordermatch;

import bean.OrderEvent;
import bean.TxEvent;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.KeyedCoProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

/**
 * @author: Rango
 * @create: 2021-06-08 11:03
 * @description: 訂單支付實時監控,兩個數據源的關聯
 **/
public class TestOrderMatch {
    public static void main(String[] args) throws Exception {

        //1.建立環境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment().setParallelism(1);

        //2.讀取數據,映射數據源到javabean,指定watermark時間語義
        WatermarkStrategy<OrderEvent> orderWMS = WatermarkStrategy.<OrderEvent>forMonotonousTimestamps()
                .withTimestampAssigner(new SerializableTimestampAssigner<OrderEvent>() {
            @Override
            public long extractTimestamp(OrderEvent element, long recordTimestamp) {
                return element.getEventTime() * 1000L;
            }});
        WatermarkStrategy<TxEvent> receiptWMS = WatermarkStrategy.<TxEvent>forMonotonousTimestamps()
                .withTimestampAssigner(new SerializableTimestampAssigner<TxEvent>() {
                    @Override
                    public long extractTimestamp(TxEvent element, long recordTimestamp) {
                        return element.getEventTime() * 1000L;
                    }});

        //使用flatmap能實現篩選,map+filter代碼量更多,實現類型為pay的數據
        SingleOutputStreamOperator<OrderEvent> OrderDS = env.readTextFile("input/OrderLog.csv")
                .flatMap(new FlatMapFunction<String, OrderEvent>() {
                    @Override
                    public void flatMap(String value, Collector<OrderEvent> out) throws Exception {
                        String[] split = value.split(",");
                        OrderEvent orderEvent = new OrderEvent(Long.parseLong(split[0]), split[1], split[2], Long.parseLong(split[3]));
                        if (orderEvent.getEventType().equals("pay")){
                            out.collect(orderEvent);
                        }}})
                .assignTimestampsAndWatermarks(orderWMS);

        SingleOutputStreamOperator<TxEvent> ReceiptDS = env.readTextFile("input/ReceiptLog.csv")
                .map(new MapFunction<String, TxEvent>() {
                    @Override
                    public TxEvent map(String value) throws Exception {
                        String[] split = value.split(",");
                        return new TxEvent(split[0], split[1], Long.parseLong(split[2]));
                    }
        }).assignTimestampsAndWatermarks(receiptWMS);

        //3.數據處理
        //使用keyby,使同一id數據進入同一並行度,以Tuple2輸出
        SingleOutputStreamOperator<Tuple2<OrderEvent, TxEvent>> processDS = OrderDS.connect(ReceiptDS)
                .keyBy("txId", "txId")
                .process(new orderReceiptKeyProcessFunc());

        //4.打印執行
        processDS.print();
        processDS.getSideOutput(new OutputTag<String>("PayWithoutReceipt"){}).print("No Receipt");
        processDS.getSideOutput(new OutputTag<String>("ReceiptWithoutPay"){}).print("No Order");
        env.execute();
    }

    public static class orderReceiptKeyProcessFunc extends KeyedCoProcessFunction<String,OrderEvent,TxEvent, Tuple2<OrderEvent,TxEvent>>{

        private ValueState<OrderEvent> orderState;
        private ValueState<TxEvent> receiptState;
        private ValueState<Long> timeState;

        @Override
        public void open(Configuration parameters) throws Exception {
            orderState = getRuntimeContext().getState(new ValueStateDescriptor<OrderEvent>("order-state",OrderEvent.class));
            receiptState = getRuntimeContext().getState(new ValueStateDescriptor<TxEvent>("receipt-state",TxEvent.class));
            timeState = getRuntimeContext().getState(new ValueStateDescriptor<Long>("time-state",Long.class));
        }

        @Override
        public void processElement1(OrderEvent value, Context ctx, Collector<Tuple2<OrderEvent, TxEvent>> out) throws Exception {
            //在訂單流中,判斷支付流是否有數據,沒有則啟用定時器10秒后觸發輸出到側輸出流
            if (receiptState.value() == null){
                //支付數據未到,先把訂單數據放入狀態
                orderState.update(value);
                //建立定時器,10秒后觸發
                Long ts = (value.getEventTime() + 10) * 1000L;
                ctx.timerService().registerEventTimeTimer(ts);
                timeState.update(ts);
            }else{
                //支付數據已到,直接輸出到主流
                out.collect(new Tuple2<>(value,receiptState.value()));
                //刪除定時器,如支付流先到,支付流建立了的定時器,在這里刪除
                ctx.timerService().deleteEventTimeTimer(timeState.value());
                //清空狀態,注意清空的是支付狀態
                receiptState.clear();
                timeState.clear();
            }}
        @Override
        public void processElement2(TxEvent value, Context ctx, Collector<Tuple2<OrderEvent, TxEvent>> out) throws Exception {
            //在支付流中,判斷訂單流是否有數據,沒有則啟用定時器5秒后觸發輸出到側輸出流
            if (orderState.value() == null){
                //訂單數據未到,先把支付數據放入狀態
                receiptState.update(value);
                //建立定時器,5秒后再關聯
                Long ts = (value.getEventTime() + 5) * 1000L;
                ctx.timerService().registerEventTimeTimer(ts);
                timeState.update(ts);
            }else{
                //訂單數據已到,直接輸出到主流
                out.collect(new Tuple2<>(orderState.value(),value));
                //刪除定時器,如支付流先到,支付流建立了的定時器,在這里刪除
                ctx.timerService().deleteEventTimeTimer(timeState.value());
                //清空狀態,注意清空的是訂單狀態
                orderState.clear();
                timeState.clear();
            }}
        @Override
        public void onTimer(long timestamp, OnTimerContext ctx, Collector<Tuple2<OrderEvent, TxEvent>> out) throws Exception {
            //這樣為全外連接輸出,刪除else則是實現左連接,右連接則只輸出else部分即可
            if (orderState.value() != null){
                ctx.output(new OutputTag<String>("PayWithoutReceipt") {}
                        , orderState.value().getTxId() + " 只有下單沒有到賬數據");
            }else{
                ctx.output(new OutputTag<String>("ReceiptWithoutPay") {}
                        , receiptState.value().getTxId() + " 只有到賬無下單數據");
            }
            orderState.clear();
            receiptState.clear();
            timeState.clear();
        }
    }
}

學習交流,有任何問題還請隨時評論指出交流。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM