Flink real-time project - day06 - 1. Capturing a window's late data 2. Dual-stream join (inner join and left join, the latter with a small issue) 3. Order join case (ingesting order data into Kafka, implementing the order data join, implementing the join of order data with late data)


1. Capturing a window's late data

  The overall flow: tag the late records with an OutputTag, call sideOutputLateData(lateDataTag) on the windowed stream, and afterwards fetch the late records from the side output with getSideOutput(lateDataTag) so they can be folded back into the computation. For example, with 5-second tumbling event-time windows and a 2-second out-of-orderness bound, a record with timestamp 7000 advances the watermark to 5000 and fires the window [0, 5000); a record with timestamp 3000 arriving after that is late and goes to the side output. The full code is below.

WindowLateDataDemo

package cn._51doit.flink.day10;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.*;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.OutputTag;

public class WindowLateDataDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<String> lines = env.socketTextStream("feng05", 8888);
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        // input format: "timestamp,word" -- extract the event-time timestamp
        SingleOutputStreamOperator<String> linesWithTimestamps = lines.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<String>(Time.seconds(2)) {
            @Override
            public long extractTimestamp(String element) {
                String stampTime = element.split(",")[0];
                return Long.parseLong(stampTime);
            }
        });
        // split each line into (word, 1)
        SingleOutputStreamOperator<Tuple2<String, Integer>> wordAndOne = linesWithTimestamps.map(new MapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public Tuple2<String, Integer> map(String line) throws Exception {
                String word = line.split(",")[1];
                return Tuple2.of(word, 1);
            }
        });
        // group by the key
        KeyedStream<Tuple2<String, Integer>, Tuple> keyed = wordAndOne.keyBy(0);
        OutputTag<Tuple2<String, Integer>> lateDataTag = new OutputTag<Tuple2<String, Integer>>("late-data"){};
        // assign windows and compute
        WindowedStream<Tuple2<String, Integer>, Tuple, TimeWindow> window = keyed.window(TumblingEventTimeWindows.of(Time.seconds(5)));
        SingleOutputStreamOperator<Tuple2<String, Integer>> summed = window.sideOutputLateData(lateDataTag).sum(1);
        // fetch the side output stream of late data
        DataStream<Tuple2<String, Integer>> lateDataStream = summed.getSideOutput(lateDataTag);
        summed.print("准時的數據: ");
        SingleOutputStreamOperator<Tuple2<String, Integer>> result = summed.union(lateDataStream).keyBy(0).sum(1);
        result.print();
        env.execute();
    }
}
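
To try the demo, start a socket server on the feng05 host first (for example with nc -lk 8888) and type lines in the timestamp,word format. A possible session, assuming the 2-second delay and 5-second windows above (the arrow notes are not part of the input):

1000,a      <- falls into window [0, 5000)
4000,a      <- falls into window [0, 5000)
7000,a      <- pushes the watermark to 5000, so window [0, 5000) fires and (a,2) is printed with the on-time prefix
3000,a      <- its window has already fired, so the record is late and is routed to the side output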

2. Dual-stream join

Background:

  The Join, CoGroup, and CoFlatMap operators can all turn two data streams into one. Join and CoGroup pair up records according to a specified condition; the difference is that Join only emits the pairs that matched, while CoGroup produces output whether or not a match exists. CoFlatMap performs no matching at all: it simply receives the two input streams and processes each one separately.
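
  For completeness, here is a minimal CoFlatMap sketch (assuming two already-defined String streams stream1 and stream2; CoFlatMapFunction comes from org.apache.flink.streaming.api.functions.co and Collector from org.apache.flink.util):

stream1.connect(stream2).flatMap(new CoFlatMapFunction<String, String, String>() {
    @Override
    public void flatMap1(String left, Collector<String> out) {
        out.collect("left: " + left);   // invoked for every record of the first stream
    }
    @Override
    public void flatMap2(String right, Collector<String> out) {
        out.collect("right: " + right); // invoked for every record of the second stream
    }
}).print();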

 

  Simply put, two data streams are joined much like tables in MySQL, with the same variants: inner join, left join, right join, full join, and so on. Concrete examples follow.

  • inner join

  In this example we first obtain the two data streams and assign a watermark to each, then call join to get a JoinedStreams instance. Calling where and equalTo on it (each with a user-defined key selector) pairs up the records with equal keys; finally we assign windows and process the data inside each window.

Note: only records whose keys match and which fall into the same window can be joined.

Key point:

  A window spans many parallel partitions (subtasks), and it fires only when the time in every partition satisfies the window's trigger condition; for convenience, the tests below therefore set the parallelism to 1. A sketch of what the watermark assigner does inside each subtask follows.
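
  As a reference, here is a rough sketch of what BoundedOutOfOrdernessTimestampExtractor does inside each parallel subtask (written for the Tuple3 records of the sources below; AssignerWithPeriodicWatermarks lives in org.apache.flink.streaming.api.functions and Watermark in org.apache.flink.streaming.api.watermark). A downstream operator only advances to the minimum watermark across all of its input subtasks, which is why a window cannot fire until every partition has caught up:

public class BoundedOutOfOrdernessSketch implements AssignerWithPeriodicWatermarks<Tuple3<String, String, Long>> {
    private final long maxOutOfOrderness = 5001L; // the same delay used in the demo below
    private long currentMaxTimestamp = Long.MIN_VALUE + maxOutOfOrderness;

    @Override
    public long extractTimestamp(Tuple3<String, String, Long> element, long previousElementTimestamp) {
        currentMaxTimestamp = Math.max(currentMaxTimestamp, element.f2); // track the largest event time seen so far
        return element.f2;
    }

    @Override
    public Watermark getCurrentWatermark() {
        // the watermark always lags the largest event time by the allowed delay
        return new Watermark(currentMaxTimestamp - maxOutOfOrderness);
    }
}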

 StreamDataSourceA

package cn._51doit.flink.day10;

import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;

public class StreamDataSourceA extends RichParallelSourceFunction<Tuple3<String, String, Long>> {
    private volatile boolean running = true;

    @Override
    public void run(SourceContext<Tuple3<String, String, Long>> ctx) throws InterruptedException {
        // test data prepared in advance; the trailing comments show which window each record falls into
        Tuple3[] elements = new Tuple3[]{
                Tuple3.of("a", "1", 1000000050000L), //[50000 - 60000)
                Tuple3.of("a", "2", 1000000054000L), //[50000 - 60000)
                Tuple3.of("a", "3", 1000000079900L), //[70000 - 80000)
                Tuple3.of("a", "4", 1000000115000L), //[110000 - 120000)  // 115000 - 5001 = 109999 >= 109999
                Tuple3.of("b", "5", 1000000100000L), //[100000 - 110000)
                Tuple3.of("b", "6", 1000000108000L)  //[100000 - 110000)
        };
        int count = 0;
        while (running && count < elements.length) {
            // emit the record
            ctx.collect(new Tuple3<>((String) elements[count].f0, (String) elements[count].f1, (Long) elements[count].f2));
            count++;
            Thread.sleep(1000);
        }
        //Thread.sleep(100000000); // uncommenting this keeps the source alive so that no final watermark flushes the remaining windows
    }
    @Override
    public void cancel() {
        running = false;
    }
}

 StreamDataSourceB

package cn._51doit.flink.day10;

import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;

public class StreamDataSourceB extends RichParallelSourceFunction<Tuple3<String, String, Long>> {
    private volatile boolean running = true;
    @Override
    public void run(SourceContext<Tuple3<String, String, Long>> ctx) throws Exception {
        // records: (key, city, timestamp)
        Tuple3[] elements = new Tuple3[]{
                Tuple3.of("a", "hangzhou", 1000000059000L), //[50000, 60000)
                Tuple3.of("b", "beijing",  1000000105000L), //[100000, 110000)
        };
        int count = 0;
        while (running && count < elements.length) {
            // emit the record
            ctx.collect(new Tuple3<>((String) elements[count].f0, (String) elements[count].f1, (long) elements[count].f2));
            count++;
            Thread.sleep(1000);
        }
        //Thread.sleep(100000000); // uncommenting this keeps the source alive so that no final watermark flushes the remaining windows
    }
    @Override
    public void cancel() {
        running = false;
    }
}

 FlinkTumblingWindowsInnerJoinDemo

package cn._51doit.flink.day10;

import org.apache.flink.api.common.functions.JoinFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.api.java.tuple.Tuple5;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.JoinedStreams;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

public class FlinkTumblingWindowsInnerJoinDemo {
    public static void main(String[] args) throws Exception {
        int windowSize = 10;
        long delay = 5001L;
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // use event time as the time characteristic
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        // set the parallelism to 1 on the environment, so every DataStream below also runs with parallelism 1
        env.setParallelism(1);
        // set up the sources
        // first stream (left)
        DataStream<Tuple3<String, String, Long>> leftSource = env.addSource(new StreamDataSourceA()).name("Demo SourceA");
        // second stream (right)
        DataStream<Tuple3<String, String, Long>> rightSource = env.addSource(new StreamDataSourceB()).name("Demo SourceB");
        // assign a watermark to leftSource
        // records look like ("a", "1", 1000000050000L)
        SingleOutputStreamOperator<Tuple3<String, String, Long>> leftStream = leftSource.assignTimestampsAndWatermarks(
                new BoundedOutOfOrdernessTimestampExtractor<Tuple3<String, String, Long>>(Time.milliseconds(delay)) {
            @Override
            public long extractTimestamp(Tuple3<String, String, Long> element) {
                return element.f2;
            }
        });
        // assign a watermark to rightSource
        // records look like ("a", "hangzhou", 1000000059000L)
        SingleOutputStreamOperator<Tuple3<String, String, Long>> rightStream = rightSource.assignTimestampsAndWatermarks(
                new BoundedOutOfOrdernessTimestampExtractor<Tuple3<String, String, Long>>(Time.milliseconds(delay)) {
            @Override
            public long extractTimestamp(Tuple3<String, String, Long> element) {
                return element.f2;
            }
        });

        // the join operation
        JoinedStreams<Tuple3<String, String, Long>, Tuple3<String, String, Long>> joinedStream = leftStream.join(rightStream);
        joinedStream
                .where(new LeftSelectKey())
                .equalTo(new RightSelectKey())
                .window(TumblingEventTimeWindows.of(Time.seconds(windowSize)))
                .apply(new JoinFunction<Tuple3<String, String, Long>, Tuple3<String, String, Long>, Tuple5<String, String, String, Long, Long>>() {
                    // invoked for record pairs whose keys are equal and which fall into the same window
                    @Override
                    public Tuple5<String, String, String, Long, Long> join(Tuple3<String, String, Long> first, Tuple3<String, String, Long> second) throws Exception {
                        // a, 1, "hangzhou", 1000001000, 1000006000
                        return new Tuple5<>(first.f0, first.f1, second.f1, first.f2, second.f2);
                    }
                }).print();
        env.execute();
    }

    public static class LeftSelectKey implements KeySelector<Tuple3<String, String, Long>,String>{
        @Override
        public String getKey(Tuple3<String, String, Long> w) throws Exception {
            return w.f0;
        }
    }
    public static class RightSelectKey implements KeySelector<Tuple3<String, String, Long>, String> {
        @Override
        public String getKey(Tuple3<String, String, Long> w) {
            return w.f0;
        }
    }

}

Run result

(result screenshot omitted)

Why was b not joined? The join window only fires when the operator's watermark passes the end of the window, and that watermark is the minimum of the two input streams' watermarks. The right stream's largest event time is 105000 (in the shortened notation of the source comments), so its watermark only reaches 105000 - 5001 = 99999, and the window [100000, 110000) holding b's records is never triggered while the sources are still running. (When the bounded sources finish, Flink emits a final watermark that flushes the remaining windows; the commented-out Thread.sleep in the sources can be used to keep them alive and observe the effect.)

  •  left join

FlinkTumblingWindowsLeftJoinDemo

package cn._51doit.flink.day10;

import org.apache.flink.api.common.functions.CoGroupFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.api.java.tuple.Tuple5;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;

public class FlinkTumblingWindowsLeftJoinDemo {
    public static void main(String[] args) throws Exception {
        int windowSize = 10;
        long delay = 5000L;
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        env.setParallelism(1);

        // set up the sources
        DataStream<Tuple3<String, String, Long>> leftSource = env.addSource(new StreamDataSourceA()).name("Demo SourceA");
        DataStream<Tuple3<String, String, Long>> rightSource = env.addSource(new StreamDataSourceB()).name("Demo SourceB");
        // assign watermarks
        DataStream<Tuple3<String, String, Long>> leftStream = leftSource.assignTimestampsAndWatermarks(
                new BoundedOutOfOrdernessTimestampExtractor<Tuple3<String, String, Long>>(Time.milliseconds(delay)) {
                    @Override
                    public long extractTimestamp(Tuple3<String, String, Long> element) {
                        return element.f2;
                    }
                }
        );

        DataStream<Tuple3<String, String, Long>> rightStream = rightSource.assignTimestampsAndWatermarks(
                new BoundedOutOfOrdernessTimestampExtractor<Tuple3<String, String, Long>>(Time.milliseconds(delay)) {
                    @Override
                    public long extractTimestamp(Tuple3<String, String, Long> element) {
                        return element.f2;
                    }
                }
        );
        // left outer join via coGroup
        leftStream.coGroup(rightStream)
                .where(new LeftSelectKey())
                .equalTo(new RightSelectKey())
                .window(TumblingEventTimeWindows.of(Time.seconds(windowSize)))
                .apply(new LeftJoin())
                .print();

        env.execute("TimeWindowDemo");
    }

    public static class LeftJoin implements CoGroupFunction<Tuple3<String, String, Long>, Tuple3<String, String, Long>, Tuple5<String, String, String, Long, Long>> {
        // coGroup hands over the left-stream and right-stream records that share the same key and fall into the same window
        @Override
        public void coGroup(
                Iterable<Tuple3<String, String, Long>> leftElements, Iterable<Tuple3<String, String, Long>> rightElements, Collector<Tuple5<String, String, String, Long, Long>> out) {
            // leftElements holds the left stream's records
            for (Tuple3<String, String, Long> leftElem : leftElements) {
                boolean hadElements = false;
                // if the left record joined the right stream, rightElements is not empty
                for (Tuple3<String, String, Long> rightElem : rightElements) {
                    // emit the joined pair
                    out.collect(new Tuple5<>(leftElem.f0, leftElem.f1, rightElem.f1, leftElem.f2, rightElem.f2));
                    hadElements = true;
                }
                if (!hadElements) {
                    // no match: fill the right side with placeholder values
                    out.collect(new Tuple5<>(leftElem.f0, leftElem.f1, "null", leftElem.f2, -1L));
                }
            }
        }
    }

    public static class LeftSelectKey implements KeySelector<Tuple3<String, String, Long>, String> {
        @Override
        public String getKey(Tuple3<String, String, Long> w) {
            return w.f0;
        }
    }

    public static class RightSelectKey implements KeySelector<Tuple3<String, String, Long>, String> {
        @Override
        public String getKey(Tuple3<String, String, Long> w) {
            return w.f0;
        }
    }

}

The approach is the same as the join above; the difference is that the coGroup operator is used here, which also hands over the records that found no match. The same pattern extends to the other join types, as the sketch below shows.
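
A hedged sketch of a full outer join with coGroup (the class name is mine; the tuple types match the demo above): in addition to the left outer logic, it also emits right-stream records whose key never appeared on the left:

public static class FullOuterJoin implements CoGroupFunction<Tuple3<String, String, Long>, Tuple3<String, String, Long>, Tuple5<String, String, String, Long, Long>> {
    @Override
    public void coGroup(Iterable<Tuple3<String, String, Long>> leftElements,
                        Iterable<Tuple3<String, String, Long>> rightElements,
                        Collector<Tuple5<String, String, String, Long, Long>> out) {
        boolean hadLeft = false;
        for (Tuple3<String, String, Long> left : leftElements) {
            hadLeft = true;
            boolean matched = false;
            for (Tuple3<String, String, Long> right : rightElements) {
                out.collect(new Tuple5<>(left.f0, left.f1, right.f1, left.f2, right.f2)); // matched pair
                matched = true;
            }
            if (!matched) {
                out.collect(new Tuple5<>(left.f0, left.f1, "null", left.f2, -1L)); // left record without a match
            }
        }
        if (!hadLeft) {
            // right outer part: right records whose key never appeared on the left
            for (Tuple3<String, String, Long> right : rightElements) {
                out.collect(new Tuple5<>(right.f0, "null", right.f1, -1L, right.f2));
            }
        }
    }
}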

 

3. Order join case

Business systems of all kinds keep order data in two tables:
    an order main table and an order detail table.

    Order main table:
        order id, order status, order total amount, order time, user id

    Order detail table:
        order main table id, product id, product category id, product unit price, product quantity

The goal is to compute the turnover of a given category, where
    the order status is "completed" (from the main table),
    the product category id comes from the detail table,
    and a product's amount is unit price * quantity (from the detail table).

For example, placing an order on JD or Taobao might produce:
    o1000, paid, 600, 2020-06-29 19:42:00, feng
    p1, 100, 1, food
    p1, 200, 1, food
    p1, 300, 1, food
For the "food" category this order contributes 100*1 + 200*1 + 300*1 = 600 to the turnover.

3.1 Implementing the order data join

A utility class that reads data from Kafka topics into a DataStream

package cn._51doit.flink.day10;

import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

import java.util.Arrays;
import java.util.List;
import java.util.Properties;

public class FlinkUtilsV2 {

    private static StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();


    public static <T> DataStream<T> createKafkaDataStream(ParameterTool parameters, Class<? extends DeserializationSchema<T>> clazz) throws Exception {
        String topics = parameters.getRequired("kafka.topics");
        String groupId = parameters.getRequired("group.id");
        return createKafkaDataStream(parameters, topics, groupId, clazz);
    }



    public static <T> DataStream<T> createKafkaDataStream(ParameterTool parameters, String topics, Class<? extends DeserializationSchema<T>> clazz) throws Exception {

        String groupId = parameters.getRequired("group.id");
        return createKafkaDataStream(parameters, topics, groupId, clazz);
    }


    public static <T> DataStream<T> createKafkaDataStream(ParameterTool parameters, String topics, String groupId, Class<? extends DeserializationSchema<T>> clazz) throws Exception {

        // make the ParameterTool parameters available as global job parameters
        env.getConfig().setGlobalJobParameters(parameters);

        // enable checkpointing
        env.enableCheckpointing(parameters.getLong("checkpoint.interval", 10000L), CheckpointingMode.EXACTLY_ONCE);

        // restart strategy
        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(parameters.getInt("restart.times", 10), Time.seconds(5)));

        // configure the state backend
        String path = parameters.get("state.backend.path");
        if(path != null) {
            // ideally the state backend is configured globally in flink-conf.yaml instead
            env.setStateBackend(new FsStateBackend(path));
        }

        // retain the checkpoint when the job is cancelled
        env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

        env.getCheckpointConfig().setMaxConcurrentCheckpoints(3);

        List<String> topicList = Arrays.asList(topics.split(","));

        Properties properties = parameters.getProperties();

        properties.setProperty("group.id", groupId);

        // create the FlinkKafkaConsumer
        FlinkKafkaConsumer<T> kafkaConsumer = new FlinkKafkaConsumer<T>(
                topicList,
                clazz.newInstance(),
                properties
        );

        return env.addSource(kafkaConsumer);
    }

    public static StreamExecutionEnvironment getEnv() {
        return env;
    }

}
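
FlinkUtilsV2 reads its settings from the properties file passed as args[0] (via ParameterTool.fromPropertiesFile). A sample file might look like the following; kafka.topics, group.id, checkpoint.interval, restart.times and state.backend.path are keys the class actually reads, the rest are ordinary Kafka consumer properties, and all host names are made up:

bootstrap.servers=node1:9092,node2:9092,node3:9092
auto.offset.reset=earliest
kafka.topics=ordermain,orderdetail
group.id=g1
checkpoint.interval=30000
restart.times=10
state.backend.path=hdfs://node1:9000/flink/checkpoints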

OrderDetail

package cn._51doit.flink.day10;

import java.util.Date;

public class OrderDetail {

    private Long id;
    private Long order_id;
    private int category_id;
    private String categoryName;
    private Long sku;
    private Double money;
    private int amount;
    private Date create_time;
    private Date update_time;

    // database operation type: INSERT or UPDATE
    private String type;

    public Long getId() {
        return id;
    }

    public void setId(Long id) {
        this.id = id;
    }

    public Long getOrder_id() {
        return order_id;
    }

    public void setOrder_id(Long order_id) {
        this.order_id = order_id;
    }

    public int getCategory_id() {
        return category_id;
    }

    public void setCategory_id(int category_id) {
        this.category_id = category_id;
    }

    public Long getSku() {
        return sku;
    }

    public void setSku(Long sku) {
        this.sku = sku;
    }

    public Double getMoney() {
        return money;
    }

    public void setMoney(Double money) {
        this.money = money;
    }

    public int getAmount() {
        return amount;
    }

    public void setAmount(int amount) {
        this.amount = amount;
    }

    public Date getCreate_time() {
        return create_time;
    }

    public void setCreate_time(Date create_time) {
        this.create_time = create_time;
    }

    public Date getUpdate_time() {
        return update_time;
    }

    public void setUpdate_time(Date update_time) {
        this.update_time = update_time;
    }

    public String getType() {
        return type;
    }

    public void setType(String type) {
        this.type = type;
    }

    public String getCategoryName() {
        return categoryName;
    }

    public void setCategoryName(String categoryName) {
        this.categoryName = categoryName;
    }

    @Override
    public String toString() {
        return "OrderDetail{" +
                "id=" + id +
                ", order_id=" + order_id +
                ", category_id=" + category_id +
                ", categoryName='" + categoryName + '\'' +
                ", sku=" + sku +
                ", money=" + money +
                ", amount=" + amount +
                ", create_time=" + create_time +
                ", update_time=" + update_time +
                ", type='" + type + '\'' +
                '}';
    }
}

OrderMain

package cn._51doit.flink.day10;

import java.util.Date;

public class OrderMain {

    private Long oid;
    private Date create_time;
    private Double total_money;
    private int status;
    private Date update_time;
    private String province;
    private String city;
    // database operation type: INSERT or UPDATE
    private String type;

    public Long getOid() {
        return oid;
    }

    public void setOid(Long oid) {
        this.oid = oid;
    }

    public Date getCreate_time() {
        return create_time;
    }

    public void setCreate_time(Date create_time) {
        this.create_time = create_time;
    }

    public Double getTotal_money() {
        return total_money;
    }

    public void setTotal_money(Double total_money) {
        this.total_money = total_money;
    }

    public int getStatus() {
        return status;
    }

    public void setStatus(int status) {
        this.status = status;
    }

    public Date getUpdate_time() {
        return update_time;
    }

    public void setUpdate_time(Date update_time) {
        this.update_time = update_time;
    }

    public String getProvince() {
        return province;
    }

    public void setProvince(String province) {
        this.province = province;
    }

    public String getCity() {
        return city;
    }

    public void setCity(String city) {
        this.city = city;
    }

    public String getType() {
        return type;
    }

    public void setType(String type) {
        this.type = type;
    }

    @Override
    public String toString() {
        return "OrderMain{" +
                "oid=" + oid +
                ", create_time=" + create_time +
                ", total_money=" + total_money +
                ", status=" + status +
                ", update_time=" + update_time +
                ", province='" + province + '\'' +
                ", city='" + city + '\'' +
                ", type='" + type + '\'' +
                '}';
    }
}

Business logic
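
The two Kafka topics carry binlog-style JSON with a type field (INSERT/UPDATE) and a data array of row objects, which is the format the ProcessFunctions below expect (CDC tools such as canal produce messages of this shape). A hypothetical ordermain message, with field values made up to match the OrderMain POJO:

{"type":"INSERT","data":[{"oid":1000,"create_time":"2020-06-29 19:42:00","total_money":600.0,"status":1,"province":"guangdong","city":"shenzhen"}]}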

OrderJoin

package cn._51doit.flink.day10;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import org.apache.flink.api.common.functions.CoGroupFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;

public class OrderJoin {


    public static void main(String[] args) throws Exception {

        ParameterTool parameters = ParameterTool.fromPropertiesFile(args[0]);

        // use event time as the time characteristic
        FlinkUtilsV2.getEnv().setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        DataStream<String> orderMainLinesDataStream = FlinkUtilsV2.createKafkaDataStream(parameters, "ordermain", "g1", SimpleStringSchema.class);

        DataStream<String> orderDetailLinesDataStream = FlinkUtilsV2.createKafkaDataStream(parameters, "orderdetail", "g1", SimpleStringSchema.class);

        // parse the order main records
        SingleOutputStreamOperator<OrderMain> orderMainDataStream = orderMainLinesDataStream.process(new ProcessFunction<String, OrderMain>() {

            @Override
            public void processElement(String line, Context ctx, Collector<OrderMain> out) throws Exception {
                // behaves like flatMap + filter
                try {
                    JSONObject jsonObject = JSON.parseObject(line);
                    String type = jsonObject.getString("type");
                    if (type.equals("INSERT") || type.equals("UPDATE")) {
                        JSONArray jsonArray = jsonObject.getJSONArray("data");
                        for (int i = 0; i < jsonArray.size(); i++) {
                            OrderMain orderMain = jsonArray.getObject(i, OrderMain.class);
                            orderMain.setType(type); // set the operation type
                            out.collect(orderMain);
                        }
                    }
                } catch (Exception e) {
                    //e.printStackTrace();
                    // log the malformed record here
                }
            }
        });

        // parse the order detail records
        SingleOutputStreamOperator<OrderDetail> orderDetailDataStream = orderDetailLinesDataStream.process(new ProcessFunction<String, OrderDetail>() {

            @Override
            public void processElement(String line, Context ctx, Collector<OrderDetail> out) throws Exception {
                // behaves like flatMap + filter
                try {
                    JSONObject jsonObject = JSON.parseObject(line);
                    String type = jsonObject.getString("type");
                    if (type.equals("INSERT") || type.equals("UPDATE")) {
                        JSONArray jsonArray = jsonObject.getJSONArray("data");
                        for (int i = 0; i < jsonArray.size(); i++) {
                            OrderDetail orderDetail = jsonArray.getObject(i, OrderDetail.class);
                            orderDetail.setType(type); // set the operation type
                            out.collect(orderDetail);
                        }
                    }
                } catch (Exception e) {
                    //e.printStackTrace();
                    // log the malformed record here
                }
            }
        });

        int delaySeconds = 2;

        // extract the event time and generate watermarks
        SingleOutputStreamOperator<OrderMain> orderMainStreamWithWaterMark = orderMainDataStream.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<OrderMain>(Time.seconds(delaySeconds)) {
            @Override
            public long extractTimestamp(OrderMain element) {
                return element.getCreate_time().getTime();
            }
        });

        SingleOutputStreamOperator<OrderDetail> orderDetailStreamWithWaterMark = orderDetailDataStream.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<OrderDetail>(Time.seconds(delaySeconds)) {
            @Override
            public long extractTimestamp(OrderDetail element) {
                return element.getCreate_time().getTime();
            }
        });

        // left outer join, with the order detail table as the left table
        DataStream<Tuple2<OrderDetail, OrderMain>> joined = orderDetailStreamWithWaterMark.coGroup(orderMainStreamWithWaterMark)
                .where(new KeySelector<OrderDetail, Long>() {
                    @Override
                    public Long getKey(OrderDetail value) throws Exception {
                        return value.getOrder_id();
                    }
                })
                .equalTo(new KeySelector<OrderMain, Long>() {
                    @Override
                    public Long getKey(OrderMain value) throws Exception {
                        return value.getOid();
                    }
                })
                .window(TumblingEventTimeWindows.of(Time.seconds(5)))
                .apply(new CoGroupFunction<OrderDetail, OrderMain, Tuple2<OrderDetail, OrderMain>>() {

                    @Override
                    public void coGroup(Iterable<OrderDetail> first, Iterable<OrderMain> second, Collector<Tuple2<OrderDetail, OrderMain>> out) throws Exception {

                        for (OrderDetail orderDetail : first) {
                            boolean isJoined = false;
                            for (OrderMain orderMain : second) {
                                out.collect(Tuple2.of(orderDetail, orderMain));
                                isJoined = true;
                            }
                            if (!isJoined) {
                                out.collect(Tuple2.of(orderDetail, null));
                            }
                        }
                    }
                });

        joined.print();

        FlinkUtilsV2.getEnv().execute();
    }
}
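
With the joined stream in hand, the category turnover described at the start of section 3 can be computed by appending a short sketch inside main after joined.print(). Treating status == 2 as "completed" is an assumption (the real value depends on the business system), and Types comes from org.apache.flink.api.common.typeinfo:

        // keep completed orders only, map each detail row to (category, unit price * quantity), then sum per category
        SingleOutputStreamOperator<Tuple2<String, Double>> categoryTurnover = joined
                .filter(tp -> tp.f1 != null && tp.f1.getStatus() == 2)
                .map(tp -> Tuple2.of(tp.f0.getCategoryName(), tp.f0.getMoney() * tp.f0.getAmount()))
                .returns(Types.TUPLE(Types.STRING, Types.DOUBLE))
                .keyBy(tp -> tp.f0)
                .sum(1);
        categoryTurnover.print();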

 

3.2 Implementing the join of order data with late data

 OrderJoinAdv

package cn._51doit.flink.day10;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import org.apache.flink.api.common.functions.CoGroupFunction;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.functions.windowing.AllWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

import java.sql.Connection;

public class OrderJoinAdv {


    public static void main(String[] args) throws Exception {

        ParameterTool parameters = ParameterTool.fromPropertiesFile(args[0]);

        // use event time as the time characteristic
        FlinkUtilsV2.getEnv().setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        DataStream<String> orderMainLinesDataStream = FlinkUtilsV2.createKafkaDataStream(parameters, "ordermain", "g1", SimpleStringSchema.class);

        DataStream<String> orderDetailLinesDataStream = FlinkUtilsV2.createKafkaDataStream(parameters, "orderdetail", "g1", SimpleStringSchema.class);

        // parse the order main records
        SingleOutputStreamOperator<OrderMain> orderMainDataStream = orderMainLinesDataStream.process(new ProcessFunction<String, OrderMain>() {

            @Override
            public void processElement(String line, Context ctx, Collector<OrderMain> out) throws Exception {
                // behaves like flatMap + filter
                try {
                    JSONObject jsonObject = JSON.parseObject(line);
                    String type = jsonObject.getString("type");
                    if (type.equals("INSERT") || type.equals("UPDATE")) {
                        JSONArray jsonArray = jsonObject.getJSONArray("data");
                        for (int i = 0; i < jsonArray.size(); i++) {
                            OrderMain orderMain = jsonArray.getObject(i, OrderMain.class);
                            orderMain.setType(type); // set the operation type
                            out.collect(orderMain);
                        }
                    }
                } catch (Exception e) {
                    //e.printStackTrace();
                    // log the malformed record here
                }
            }
        });

        // parse the order detail records
        SingleOutputStreamOperator<OrderDetail> orderDetailDataStream = orderDetailLinesDataStream.process(new ProcessFunction<String, OrderDetail>() {

            @Override
            public void processElement(String line, Context ctx, Collector<OrderDetail> out) throws Exception {
                // behaves like flatMap + filter
                try {
                    JSONObject jsonObject = JSON.parseObject(line);
                    String type = jsonObject.getString("type");
                    if (type.equals("INSERT") || type.equals("UPDATE")) {
                        JSONArray jsonArray = jsonObject.getJSONArray("data");
                        for (int i = 0; i < jsonArray.size(); i++) {
                            OrderDetail orderDetail = jsonArray.getObject(i, OrderDetail.class);
                            orderDetail.setType(type); // set the operation type
                            out.collect(orderDetail);
                        }
                    }
                } catch (Exception e) {
                    //e.printStackTrace();
                    // log the malformed record here
                }
            }
        });

        int delaySeconds = 2;
        int windowSize = 5;

        // extract the event time and generate watermarks
        SingleOutputStreamOperator<OrderMain> orderMainStreamWithWaterMark = orderMainDataStream.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<OrderMain>(Time.seconds(delaySeconds)) {
            @Override
            public long extractTimestamp(OrderMain element) {
                return element.getCreate_time().getTime();
            }
        });

        SingleOutputStreamOperator<OrderDetail> orderDetailStreamWithWaterMark = orderDetailDataStream.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<OrderDetail>(Time.seconds(delaySeconds)) {
            @Override
            public long extractTimestamp(OrderDetail element) {
                return element.getCreate_time().getTime();
            }
        });

        // define the OutputTag for the late-data side output
        OutputTag<OrderDetail> lateTag = new OutputTag<OrderDetail>("late-data") {};

        // window the left table separately, using the same window length as the coGroup below
        SingleOutputStreamOperator<OrderDetail> orderDetailWithWindow = orderDetailStreamWithWaterMark.windowAll(TumblingEventTimeWindows.of(Time.seconds(windowSize)))
                .sideOutputLateData(lateTag) // tag the late records
                .apply(new AllWindowFunction<OrderDetail, OrderDetail, TimeWindow>() {
                    @Override
                    public void apply(TimeWindow window, Iterable<OrderDetail> values, Collector<OrderDetail> out) throws Exception {
                        for (OrderDetail value : values) {
                            out.collect(value); // forward the on-time records, otherwise the coGroup below receives nothing
                        }
                    }
                });

        // fetch the late data from the side output
        DataStream<OrderDetail> lateOrderDetailStream = orderDetailWithWindow.getSideOutput(lateTag);

        // because the orderDetail stream has few late records, async I/O is unnecessary; a plain RichMapFunction is enough
        SingleOutputStreamOperator<Tuple2<OrderDetail, OrderMain>> lateOrderDetailAndOrderMain = lateOrderDetailStream.map(new RichMapFunction<OrderDetail, Tuple2<OrderDetail, OrderMain>>() {

            private transient Connection connection;

            @Override
            public void open(Configuration parameters) throws Exception {
                // open the database connection
            }

            @Override
            public Tuple2<OrderDetail, OrderMain> map(OrderDetail detail) throws Exception {
                // a late detail record has missed its window, so look up its OrderMain directly in the database
                OrderMain orderMain = queryOrderMainFromMySQL(detail.getOrder_id(), connection);
                return Tuple2.of(detail, orderMain);
            }

            @Override
            public void close() throws Exception {
                // close the database connection
            }
        });


        // left outer join, with the order detail table as the left table
        DataStream<Tuple2<OrderDetail, OrderMain>> joined = orderDetailWithWindow.coGroup(orderMainStreamWithWaterMark)
                .where(new KeySelector<OrderDetail, Long>() {
                    @Override
                    public Long getKey(OrderDetail value) throws Exception {
                        return value.getOrder_id();
                    }
                })
                .equalTo(new KeySelector<OrderMain, Long>() {
                    @Override
                    public Long getKey(OrderMain value) throws Exception {
                        return value.getOid();
                    }
                })
                .window(TumblingEventTimeWindows.of(Time.seconds(windowSize)))
                .apply(new CoGroupFunction<OrderDetail, OrderMain, Tuple2<OrderDetail, OrderMain>>() {

                    @Override
                    public void coGroup(Iterable<OrderDetail> first, Iterable<OrderMain> second, Collector<Tuple2<OrderDetail, OrderMain>> out) throws Exception {

                        for (OrderDetail orderDetail : first) {
                            boolean isJoined = false;
                            for (OrderMain orderMain : second) {
                                out.collect(Tuple2.of(orderDetail, orderMain));
                                isJoined = true;
                            }
                            if (!isJoined) {
                                out.collect(Tuple2.of(orderDetail, null));
                            }
                        }
                    }
                });

        // after the join, some detail records may not have matched an OrderMain
        SingleOutputStreamOperator<Tuple2<OrderDetail, OrderMain>> punctualOrderDetailAndOrderMain = joined.map(new RichMapFunction<Tuple2<OrderDetail, OrderMain>, Tuple2<OrderDetail, OrderMain>>() {

            private transient Connection connection;

            @Override
            public void open(Configuration parameters) throws Exception {
                // open the database connection
            }

            @Override
            public Tuple2<OrderDetail, OrderMain> map(Tuple2<OrderDetail, OrderMain> tp) throws Exception {
                // for every record that failed to join the order main table, query the database
                if (tp.f1 == null) {
                    OrderMain orderMain = queryOrderMainFromMySQL(tp.f0.getOrder_id(), connection);
                    tp.f1 = orderMain;
                }
                return tp;
            }

            @Override
            public void close() throws Exception {
                // close the database connection
            }
        });

        // union the on-time results with the late ones
        DataStream<Tuple2<OrderDetail, OrderMain>> allOrderStream = punctualOrderDetailAndOrderMain.union(lateOrderDetailAndOrderMain);

        // depending on the scenario, write the result to Kafka, HBase, ES or ClickHouse
        allOrderStream.print();

        FlinkUtilsV2.getEnv().execute();
    }

    private static OrderMain queryOrderMainFromMySQL(Long order_id, Connection connection) {
        // placeholder; a JDBC sketch is given after the class
        return null;
    }
}
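
The database hooks in OrderJoinAdv are left empty. A minimal JDBC sketch of the lookup is given below; the table and column names (t_order_main, oid, status, ...) are assumptions chosen to match the OrderMain POJO, and java.sql.PreparedStatement and java.sql.ResultSet need to be imported:

private static OrderMain queryOrderMainFromMySQL(Long orderId, Connection connection) throws Exception {
    // look up a single order main row by its id; returns null when the order is not found
    String sql = "SELECT oid, status, total_money, province, city, create_time, update_time FROM t_order_main WHERE oid = ?";
    try (PreparedStatement ps = connection.prepareStatement(sql)) {
        ps.setLong(1, orderId);
        try (ResultSet rs = ps.executeQuery()) {
            if (!rs.next()) {
                return null;
            }
            OrderMain orderMain = new OrderMain();
            orderMain.setOid(rs.getLong("oid"));
            orderMain.setStatus(rs.getInt("status"));
            orderMain.setTotal_money(rs.getDouble("total_money"));
            orderMain.setProvince(rs.getString("province"));
            orderMain.setCity(rs.getString("city"));
            orderMain.setCreate_time(rs.getTimestamp("create_time"));
            orderMain.setUpdate_time(rs.getTimestamp("update_time"));
            return orderMain;
        }
    }
}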


