1. Getting a window's late data
The main flow: tag the late records with an OutputTag, call sideOutputLateData(lateDataTag) on the corresponding windowed stream, then fetch the window's late data from the side output and include it in the computation. The full code follows.
WindowLateDataDemo

package cn._51doit.flink.day10;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.*;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.OutputTag;

public class WindowLateDataDemo {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<String> lines = env.socketTextStream("feng05", 8888);
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        // Input format: timestamp,word; extract the timestamp
        SingleOutputStreamOperator<String> assignTimestampsAndWatermarks = lines.assignTimestampsAndWatermarks(
                new BoundedOutOfOrdernessTimestampExtractor<String>(Time.seconds(2)) {
                    @Override
                    public long extractTimestamp(String element) {
                        String stampTime = element.split(",")[0];
                        return Long.parseLong(stampTime);
                    }
                });

        // Split the data
        SingleOutputStreamOperator<Tuple2<String, Integer>> wordAndOne = assignTimestampsAndWatermarks.map(
                new MapFunction<String, Tuple2<String, Integer>>() {
                    @Override
                    public Tuple2<String, Integer> map(String line) throws Exception {
                        String word = line.split(",")[1];
                        return Tuple2.of(word, 1);
                    }
                });

        // Group by key
        KeyedStream<Tuple2<String, Integer>, Tuple> keyed = wordAndOne.keyBy(0);

        OutputTag<Tuple2<String, Integer>> lateDataTag = new OutputTag<Tuple2<String, Integer>>("late-data") {};

        // Assign windows and compute
        WindowedStream<Tuple2<String, Integer>, Tuple, TimeWindow> window =
                keyed.window(TumblingEventTimeWindows.of(Time.seconds(5)));
        SingleOutputStreamOperator<Tuple2<String, Integer>> summed = window.sideOutputLateData(lateDataTag).sum(1);

        // Get the side stream holding the late data
        DataStream<Tuple2<String, Integer>> lateDataStream = summed.getSideOutput(lateDataTag);

        summed.print("on-time data: ");

        SingleOutputStreamOperator<Tuple2<String, Integer>> result =
                summed.union(lateDataStream).keyBy(0).sum(1);
        result.print();

        env.execute();
    }
}
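To try the demo, feed the socket lines of the form timestamp,word (for example via nc -lk 8888 on feng05). With a 5-second tumbling window and 2 seconds of allowed out-of-orderness, one possible session looks like this (a sketch, not captured output):

1000,a     // falls into window [0, 5000)
7000,a     // pushes the watermark to 5000, so window [0, 5000) fires and (a,1) is printed as on-time data
2000,a     // window [0, 5000) has already fired, so this record goes to the late-data side output and only shows up in the unioned result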
2. Dual-stream join
Supplement:
Join, CoGroup and CoFlatMap can all turn two data streams into a single stream. Join and CoGroup both pair up records according to a specified condition; the difference is that Join only emits the successfully matched pairs, while CoGroup emits output whether or not there is a match. CoFlatMap does no matching at all; it simply receives the elements of the two streams separately, as the sketch below shows.
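Since CoFlatMap is not demonstrated in the examples that follow, here is a minimal sketch for contrast (the class name, host and ports are assumptions for illustration):

package cn._51doit.flink.day10;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.CoFlatMapFunction;
import org.apache.flink.util.Collector;

public class CoFlatMapDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<String> s1 = env.socketTextStream("feng05", 8888);
        DataStream<String> s2 = env.socketTextStream("feng05", 9999);

        // connect + CoFlatMap: there is no pairing condition; the two callbacks
        // simply receive the elements of the two streams independently
        s1.connect(s2).flatMap(new CoFlatMapFunction<String, String, String>() {
            @Override
            public void flatMap1(String value, Collector<String> out) {
                out.collect("from stream 1: " + value);
            }

            @Override
            public void flatMap2(String value, Collector<String> out) {
                out.collect("from stream 2: " + value);
            }
        }).print();

        env.execute();
    }
}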
Simply put, joining two data streams is much like a join in MySQL, and likewise comes in inner join, left join, right join, full join and other flavors. Concrete examples follow.
- inner join
This example first obtains the two data streams and assigns a watermark to each, then joins them to get a JoinedStreams instance. Calling where and equalTo on that instance matches up the records whose keys are equal (note that you have to define the key selectors yourself); after that, windows are assigned and the data in each window is processed.
Note: only records that satisfy the join condition and fall into the same window are joined.
Key point:
A window operator runs in many parallel partitions, and the window fires only when the event time in every partition satisfies the trigger condition. For convenience, the tests below therefore set the parallelism to 1.
StreamDataSourceA

package cn._51doit.flink.day10;

import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;

public class StreamDataSourceA extends RichParallelSourceFunction<Tuple3<String, String, Long>> {

    private volatile boolean running = true;

    @Override
    public void run(SourceContext<Tuple3<String, String, Long>> ctx) throws InterruptedException {
        // Pre-prepared test data
        Tuple3[] elements = new Tuple3[]{
                Tuple3.of("a", "1", 1000000050000L), // [50000 - 60000)
                Tuple3.of("a", "2", 1000000054000L), // [50000 - 60000)
                Tuple3.of("a", "3", 1000000079900L), // [70000 - 80000)
                Tuple3.of("a", "4", 1000000115000L), // [110000 - 120000)  115000 - 5001 = 109999 >= 109999
                Tuple3.of("b", "5", 1000000100000L), // [100000 - 110000)
                Tuple3.of("b", "6", 1000000108000L)  // [100000 - 110000)
        };
        int count = 0;
        while (running && count < elements.length) {
            // Emit the data
            ctx.collect(new Tuple3<>((String) elements[count].f0, (String) elements[count].f1, (Long) elements[count].f2));
            count++;
            Thread.sleep(1000);
        }
        //Thread.sleep(100000000);
    }

    @Override
    public void cancel() {
        running = false;
    }
}
StreamDataSourceB

package cn._51doit.flink.day10;

import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;

public class StreamDataSourceB extends RichParallelSourceFunction<Tuple3<String, String, Long>> {

    private volatile boolean running = true;

    @Override
    public void run(SourceContext<Tuple3<String, String, Long>> ctx) throws Exception {
        // Pre-prepared test data, e.g. (a, hangzhou, ...)
        Tuple3[] elements = new Tuple3[]{
                Tuple3.of("a", "hangzhou", 1000000059000L), // [50000, 60000)
                Tuple3.of("b", "beijing", 1000000105000L)   // [100000, 110000)
        };
        int count = 0;
        while (running && count < elements.length) {
            // Emit the data
            ctx.collect(new Tuple3<>((String) elements[count].f0, (String) elements[count].f1, (Long) elements[count].f2));
            count++;
            Thread.sleep(1000);
        }
        //Thread.sleep(100000000);
    }

    @Override
    public void cancel() {
        running = false;
    }
}
FlinkTumblingWindowsInnerJoinDemo

package cn._51doit.flink.day10;

import org.apache.flink.api.common.functions.JoinFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.api.java.tuple.Tuple5;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.JoinedStreams;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

public class FlinkTumblingWindowsInnerJoinDemo {

    public static void main(String[] args) throws Exception {
        int windowSize = 10;
        long delay = 5001L;
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Use event time
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        // Setting the parallelism on the env makes every subsequent DataStream use parallelism 1
        env.setParallelism(1);

        // Sources
        // First (left) stream
        DataStream<Tuple3<String, String, Long>> leftSource = env.addSource(new StreamDataSourceA()).name("Demo SourceA");
        // Second (right) stream
        DataStream<Tuple3<String, String, Long>> rightSource = env.addSource(new StreamDataSourceB()).name("Demo SourceB");

        // Assign watermarks to leftSource, e.g. ("a", "1", 1000)
        SingleOutputStreamOperator<Tuple3<String, String, Long>> leftStream = leftSource.assignTimestampsAndWatermarks(
                new BoundedOutOfOrdernessTimestampExtractor<Tuple3<String, String, Long>>(Time.milliseconds(delay)) {
                    @Override
                    public long extractTimestamp(Tuple3<String, String, Long> element) {
                        return element.f2;
                    }
                });

        // Assign watermarks to rightSource, e.g. ("a", "hangzhou", 6000)
        SingleOutputStreamOperator<Tuple3<String, String, Long>> rightStream = rightSource.assignTimestampsAndWatermarks(
                new BoundedOutOfOrdernessTimestampExtractor<Tuple3<String, String, Long>>(Time.milliseconds(delay)) {
                    @Override
                    public long extractTimestamp(Tuple3<String, String, Long> element) {
                        return element.f2;
                    }
                });

        // The join
        JoinedStreams<Tuple3<String, String, Long>, Tuple3<String, String, Long>> joinedStream = leftStream.join(rightStream);
        joinedStream
                .where(new LeftSelectKey())
                .equalTo(new RightSelectKey())
                .window(TumblingEventTimeWindows.of(Time.seconds(windowSize)))
                .apply(new JoinFunction<Tuple3<String, String, Long>, Tuple3<String, String, Long>, Tuple5<String, String, String, Long, Long>>() {
                    // Called when the two streams' keys are equal and the records are in the same window
                    @Override
                    public Tuple5<String, String, String, Long, Long> join(Tuple3<String, String, Long> first, Tuple3<String, String, Long> second) throws Exception {
                        // e.g. (a, 1, "hangzhou", 1000001000, 1000006000)
                        return new Tuple5<>(first.f0, first.f1, second.f1, first.f2, second.f2);
                    }
                }).print();

        env.execute();
    }

    public static class LeftSelectKey implements KeySelector<Tuple3<String, String, Long>, String> {
        @Override
        public String getKey(Tuple3<String, String, Long> w) throws Exception {
            return w.f0;
        }
    }

    public static class RightSelectKey implements KeySelector<Tuple3<String, String, Long>, String> {
        @Override
        public String getKey(Tuple3<String, String, Long> w) {
            return w.f0;
        }
    }
}
Run result (the original screenshot is omitted here): the a records in window [50000, 60000) are joined with hangzhou and printed; b produces no join output.
Why wasn't b joined? The join operator's watermark is the minimum of its two inputs' watermarks. b's records sit in window [100000, 110000), which fires only when that combined watermark reaches 109999, but the right stream's largest timestamp is 105000, so its watermark gets stuck at 105000 - 5001 = 99999. As long as the sources keep running (the commented-out Thread.sleep(100000000) in the sources exists for exactly this experiment), the window holding b never fires, so b is never joined.
- left join
FlinkTumblingWindowsLeftJoinDemo

package cn._51doit.flink.day10;

import org.apache.flink.api.common.functions.CoGroupFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.api.java.tuple.Tuple5;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;

public class FlinkTumblingWindowsLeftJoinDemo {

    public static void main(String[] args) throws Exception {
        int windowSize = 10;
        long delay = 5000L;
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        env.setParallelism(1);

        // Sources
        DataStream<Tuple3<String, String, Long>> leftSource = env.addSource(new StreamDataSourceA()).name("Demo SourceA");
        DataStream<Tuple3<String, String, Long>> rightSource = env.addSource(new StreamDataSourceB()).name("Demo SourceB");

        // Assign watermarks
        DataStream<Tuple3<String, String, Long>> leftStream = leftSource.assignTimestampsAndWatermarks(
                new BoundedOutOfOrdernessTimestampExtractor<Tuple3<String, String, Long>>(Time.milliseconds(delay)) {
                    @Override
                    public long extractTimestamp(Tuple3<String, String, Long> element) {
                        return element.f2;
                    }
                }
        );
        DataStream<Tuple3<String, String, Long>> rightStream = rightSource.assignTimestampsAndWatermarks(
                new BoundedOutOfOrdernessTimestampExtractor<Tuple3<String, String, Long>>(Time.milliseconds(delay)) {
                    @Override
                    public long extractTimestamp(Tuple3<String, String, Long> element) {
                        return element.f2;
                    }
                }
        );

        // The left join, implemented with coGroup
        leftStream.coGroup(rightStream)
                .where(new LeftSelectKey())
                .equalTo(new RightSelectKey())
                .window(TumblingEventTimeWindows.of(Time.seconds(windowSize)))
                .apply(new LeftJoin())
                .print();

        env.execute("TimeWindowDemo");
    }

    public static class LeftJoin implements CoGroupFunction<Tuple3<String, String, Long>, Tuple3<String, String, Long>, Tuple5<String, String, String, Long, Long>> {
        // coGroup hands over the left stream's and the right stream's data that
        // share the same key and fall into the same window
        @Override
        public void coGroup(
                Iterable<Tuple3<String, String, Long>> leftElements,
                Iterable<Tuple3<String, String, Long>> rightElements,
                Collector<Tuple5<String, String, String, Long, Long>> out) {
            // leftElements holds the left stream's data
            for (Tuple3<String, String, Long> leftElem : leftElements) {
                boolean hadElements = false;
                // if the left stream joined the right stream, rightElements is not empty
                for (Tuple3<String, String, Long> rightElem : rightElements) {
                    // emit the joined data
                    out.collect(new Tuple5<>(leftElem.f0, leftElem.f1, rightElem.f1, leftElem.f2, rightElem.f2));
                    hadElements = true;
                }
                if (!hadElements) {
                    // no match: fill the right side with placeholder values
                    out.collect(new Tuple5<>(leftElem.f0, leftElem.f1, "null", leftElem.f2, -1L));
                }
            }
        }
    }

    public static class LeftSelectKey implements KeySelector<Tuple3<String, String, Long>, String> {
        @Override
        public String getKey(Tuple3<String, String, Long> w) {
            return w.f0;
        }
    }

    public static class RightSelectKey implements KeySelector<Tuple3<String, String, Long>, String> {
        @Override
        public String getKey(Tuple3<String, String, Long> w) {
            return w.f0;
        }
    }
}
The approach is the same as the inner join; the only difference is that the coGroup operator is used here, because its CoGroupFunction also receives the left elements that found no match, which is exactly what a left join needs.
3. Order join case
Almost every kind of system has order tables. The order data is split across two tables:

Order main table: order id, order status, order total amount, order time, user ID
Order detail table: order main table ID, product ID, product category ID, product unit price, product quantity

The goal is to compute the transaction amount of a given category, which needs:
- the order status being "completed" (from the main table)
- the product category ID (from the detail table)
- the product amount, i.e. unit price × quantity (from the detail table)

For example, an order placed on JD or Taobao:
o1000, paid, 600, 2020-06-29 19:42:00, feng
p1, 100, 1, 食品
p1, 200, 1, 食品
p1, 300, 1, 食品

The 食品 category's amount for this order is 100×1 + 200×1 + 300×1 = 600, which matches the main table's total.
3.1 Implementing the order data join
A utility class for reading data from Kafka topics:

package cn._51doit.flink.day10;

import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

import java.util.Arrays;
import java.util.List;
import java.util.Properties;

public class FlinkUtilsV2 {

    private static StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    public static <T> DataStream<T> createKafkaDataStream(ParameterTool parameters, Class<? extends DeserializationSchema<T>> clazz) throws Exception {
        String topics = parameters.getRequired("kafka.topics");
        String groupId = parameters.getRequired("group.id");
        return createKafkaDataStream(parameters, topics, groupId, clazz);
    }

    public static <T> DataStream<T> createKafkaDataStream(ParameterTool parameters, String topics, Class<? extends DeserializationSchema<T>> clazz) throws Exception {
        String groupId = parameters.getRequired("group.id");
        return createKafkaDataStream(parameters, topics, groupId, clazz);
    }

    public static <T> DataStream<T> createKafkaDataStream(ParameterTool parameters, String topics, String groupId, Class<? extends DeserializationSchema<T>> clazz) throws Exception {
        // Make the ParameterTool parameters available as global job parameters
        env.getConfig().setGlobalJobParameters(parameters);
        // Enable checkpointing
        env.enableCheckpointing(parameters.getLong("checkpoint.interval", 10000L), CheckpointingMode.EXACTLY_ONCE);
        // Restart strategy
        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(parameters.getInt("restart.times", 10), Time.seconds(5)));
        // Configure the state backend
        String path = parameters.get("state.backend.path");
        if (path != null) {
            // Ideally the state backend is configured globally in flink-conf.yaml
            env.setStateBackend(new FsStateBackend(path));
        }
        // Keep the checkpoints when the job is cancelled
        env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
        env.getCheckpointConfig().setMaxConcurrentCheckpoints(3);

        //String topics = parameters.getRequired("kafka.topics");
        List<String> topicList = Arrays.asList(topics.split(","));
        Properties properties = parameters.getProperties();
        properties.setProperty("group.id", groupId);

        // Create the FlinkKafkaConsumer
        FlinkKafkaConsumer<T> kafkaConsumer = new FlinkKafkaConsumer<T>(
                topicList,
                clazz.newInstance(),
                properties
        );
        return env.addSource(kafkaConsumer);
    }

    public static StreamExecutionEnvironment getEnv() {
        return env;
    }
}
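FlinkUtilsV2 reads everything it needs from a properties file passed in on the command line. A sketch of a matching config file (the file name and values are assumptions; bootstrap.servers is the standard Kafka client property, the other keys are the ones the utility reads):

# config.properties
bootstrap.servers=feng05:9092
kafka.topics=ordermain
group.id=g1
checkpoint.interval=30000
#state.backend.path=hdfs://feng05:9000/flink/checkpoints

And a call site:

ParameterTool parameters = ParameterTool.fromPropertiesFile(args[0]);
DataStream<String> lines = FlinkUtilsV2.createKafkaDataStream(parameters, "ordermain", "g1", SimpleStringSchema.class);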
OrderDetail

package cn._51doit.flink.day10;

import java.util.Date;

public class OrderDetail {

    private Long id;
    private Long order_id;
    private int category_id;
    private String categoryName;
    private Long sku;
    private Double money;
    private int amount;
    private Date create_time;
    private Date update_time;
    // The database operation type: INSERT or UPDATE
    private String type;

    public Long getId() { return id; }
    public void setId(Long id) { this.id = id; }
    public Long getOrder_id() { return order_id; }
    public void setOrder_id(Long order_id) { this.order_id = order_id; }
    public int getCategory_id() { return category_id; }
    public void setCategory_id(int category_id) { this.category_id = category_id; }
    public Long getSku() { return sku; }
    public void setSku(Long sku) { this.sku = sku; }
    public Double getMoney() { return money; }
    public void setMoney(Double money) { this.money = money; }
    public int getAmount() { return amount; }
    public void setAmount(int amount) { this.amount = amount; }
    public Date getCreate_time() { return create_time; }
    public void setCreate_time(Date create_time) { this.create_time = create_time; }
    public Date getUpdate_time() { return update_time; }
    public void setUpdate_time(Date update_time) { this.update_time = update_time; }
    public String getType() { return type; }
    public void setType(String type) { this.type = type; }
    public String getCategoryName() { return categoryName; }
    public void setCategoryName(String categoryName) { this.categoryName = categoryName; }

    @Override
    public String toString() {
        return "OrderDetail{" +
                "id=" + id +
                ", order_id=" + order_id +
                ", category_id=" + category_id +
                ", categoryName='" + categoryName + '\'' +
                ", sku=" + sku +
                ", money=" + money +
                ", amount=" + amount +
                ", create_time=" + create_time +
                ", update_time=" + update_time +
                ", type='" + type + '\'' +
                '}';
    }
}
OrderMain

package cn._51doit.flink.day10;

import java.util.Date;

public class OrderMain {

    private Long oid;
    private Date create_time;
    private Double total_money;
    private int status;
    private Date update_time;
    private String province;
    private String city;
    // The database operation type: INSERT or UPDATE
    private String type;

    public Long getOid() { return oid; }
    public void setOid(Long oid) { this.oid = oid; }
    public Date getCreate_time() { return create_time; }
    public void setCreate_time(Date create_time) { this.create_time = create_time; }
    public Double getTotal_money() { return total_money; }
    public void setTotal_money(Double total_money) { this.total_money = total_money; }
    public int getStatus() { return status; }
    public void setStatus(int status) { this.status = status; }
    public Date getUpdate_time() { return update_time; }
    public void setUpdate_time(Date update_time) { this.update_time = update_time; }
    public String getProvince() { return province; }
    public void setProvince(String province) { this.province = province; }
    public String getCity() { return city; }
    public void setCity(String city) { this.city = city; }
    public String getType() { return type; }
    public void setType(String type) { this.type = type; }

    @Override
    public String toString() {
        return "OrderMain{" +
                "oid=" + oid +
                ", create_time=" + create_time +
                ", total_money=" + total_money +
                ", status=" + status +
                ", update_time=" + update_time +
                ", province='" + province + '\'' +
                ", city='" + city + '\'' +
                ", type='" + type + '\'' +
                '}';
    }
}
Business code
OrderJoin

package cn._51doit.flink.day10;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import org.apache.flink.api.common.functions.CoGroupFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;

public class OrderJoin {

    public static void main(String[] args) throws Exception {
        ParameterTool parameters = ParameterTool.fromPropertiesFile(args[0]);
        // Use EventTime as the time characteristic
        FlinkUtilsV2.getEnv().setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        DataStream<String> orderMainLinesDataStream = FlinkUtilsV2.createKafkaDataStream(parameters, "ordermain", "g1", SimpleStringSchema.class);
        DataStream<String> orderDetailLinesDataStream = FlinkUtilsV2.createKafkaDataStream(parameters, "orderdetail", "g1", SimpleStringSchema.class);

        // Parse the order main data
        SingleOutputStreamOperator<OrderMain> orderMainDataStream = orderMainLinesDataStream.process(new ProcessFunction<String, OrderMain>() {
            @Override
            public void processElement(String line, Context ctx, Collector<OrderMain> out) throws Exception {
                // works like flatMap + filter
                try {
                    JSONObject jsonObject = JSON.parseObject(line);
                    String type = jsonObject.getString("type");
                    if (type.equals("INSERT") || type.equals("UPDATE")) {
                        JSONArray jsonArray = jsonObject.getJSONArray("data");
                        for (int i = 0; i < jsonArray.size(); i++) {
                            OrderMain orderMain = jsonArray.getObject(i, OrderMain.class);
                            orderMain.setType(type); // set the operation type
                            out.collect(orderMain);
                        }
                    }
                } catch (Exception e) {
                    //e.printStackTrace();
                    // record the malformed data
                }
            }
        });

        // Parse the order detail data
        SingleOutputStreamOperator<OrderDetail> orderDetailDataStream = orderDetailLinesDataStream.process(new ProcessFunction<String, OrderDetail>() {
            @Override
            public void processElement(String line, Context ctx, Collector<OrderDetail> out) throws Exception {
                // works like flatMap + filter
                try {
                    JSONObject jsonObject = JSON.parseObject(line);
                    String type = jsonObject.getString("type");
                    if (type.equals("INSERT") || type.equals("UPDATE")) {
                        JSONArray jsonArray = jsonObject.getJSONArray("data");
                        for (int i = 0; i < jsonArray.size(); i++) {
                            OrderDetail orderDetail = jsonArray.getObject(i, OrderDetail.class);
                            orderDetail.setType(type); // set the operation type
                            out.collect(orderDetail);
                        }
                    }
                } catch (Exception e) {
                    //e.printStackTrace();
                    // record the malformed data
                }
            }
        });

        int delaySeconds = 2;

        // Extract the event time and generate watermarks
        SingleOutputStreamOperator<OrderMain> orderMainStreamWithWaterMark = orderMainDataStream.assignTimestampsAndWatermarks(
                new BoundedOutOfOrdernessTimestampExtractor<OrderMain>(Time.seconds(delaySeconds)) {
                    @Override
                    public long extractTimestamp(OrderMain element) {
                        return element.getCreate_time().getTime();
                    }
                });
        SingleOutputStreamOperator<OrderDetail> orderDetailStreamWithWaterMark = orderDetailDataStream.assignTimestampsAndWatermarks(
                new BoundedOutOfOrdernessTimestampExtractor<OrderDetail>(Time.seconds(delaySeconds)) {
                    @Override
                    public long extractTimestamp(OrderDetail element) {
                        return element.getCreate_time().getTime();
                    }
                });

        // Left outer join, with the order detail table as the left table
        DataStream<Tuple2<OrderDetail, OrderMain>> joined = orderDetailStreamWithWaterMark.coGroup(orderMainStreamWithWaterMark)
                .where(new KeySelector<OrderDetail, Long>() {
                    @Override
                    public Long getKey(OrderDetail value) throws Exception {
                        return value.getOrder_id();
                    }
                })
                .equalTo(new KeySelector<OrderMain, Long>() {
                    @Override
                    public Long getKey(OrderMain value) throws Exception {
                        return value.getOid();
                    }
                })
                .window(TumblingEventTimeWindows.of(Time.seconds(5)))
                .apply(new CoGroupFunction<OrderDetail, OrderMain, Tuple2<OrderDetail, OrderMain>>() {
                    @Override
                    public void coGroup(Iterable<OrderDetail> first, Iterable<OrderMain> second, Collector<Tuple2<OrderDetail, OrderMain>> out) throws Exception {
                        for (OrderDetail orderDetail : first) {
                            boolean isJoined = false;
                            for (OrderMain orderMain : second) {
                                out.collect(Tuple2.of(orderDetail, orderMain));
                                isJoined = true;
                            }
                            if (!isJoined) {
                                out.collect(Tuple2.of(orderDetail, null));
                            }
                        }
                    }
                });

        joined.print();

        FlinkUtilsV2.getEnv().execute();
    }
}
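The two ProcessFunctions above expect canal-style change-capture JSON, in which type carries the database operation and data carries an array of row images. A hypothetical message on the ordermain topic (all field values made up) would look like:

{
  "type": "INSERT",
  "data": [
    {
      "oid": 1000,
      "status": 2,
      "total_money": 600.0,
      "create_time": "2020-06-29 19:42:00",
      "province": "beijing",
      "city": "beijing"
    }
  ]
}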
3.2 Implementing the order join including late data
OrderJoinAdv

package cn._51doit.flink.day10;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import org.apache.flink.api.common.functions.CoGroupFunction;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.functions.windowing.AllWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

import java.sql.Connection;

public class OrderJoinAdv {

    public static void main(String[] args) throws Exception {
        ParameterTool parameters = ParameterTool.fromPropertiesFile(args[0]);
        // Use EventTime as the time characteristic
        FlinkUtilsV2.getEnv().setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        DataStream<String> orderMainLinesDataStream = FlinkUtilsV2.createKafkaDataStream(parameters, "ordermain", "g1", SimpleStringSchema.class);
        DataStream<String> orderDetailLinesDataStream = FlinkUtilsV2.createKafkaDataStream(parameters, "orderdetail", "g1", SimpleStringSchema.class);

        // Parse the order main data
        SingleOutputStreamOperator<OrderMain> orderMainDataStream = orderMainLinesDataStream.process(new ProcessFunction<String, OrderMain>() {
            @Override
            public void processElement(String line, Context ctx, Collector<OrderMain> out) throws Exception {
                // works like flatMap + filter
                try {
                    JSONObject jsonObject = JSON.parseObject(line);
                    String type = jsonObject.getString("type");
                    if (type.equals("INSERT") || type.equals("UPDATE")) {
                        JSONArray jsonArray = jsonObject.getJSONArray("data");
                        for (int i = 0; i < jsonArray.size(); i++) {
                            OrderMain orderMain = jsonArray.getObject(i, OrderMain.class);
                            orderMain.setType(type); // set the operation type
                            out.collect(orderMain);
                        }
                    }
                } catch (Exception e) {
                    //e.printStackTrace();
                    // record the malformed data
                }
            }
        });

        // Parse the order detail data
        SingleOutputStreamOperator<OrderDetail> orderDetailDataStream = orderDetailLinesDataStream.process(new ProcessFunction<String, OrderDetail>() {
            @Override
            public void processElement(String line, Context ctx, Collector<OrderDetail> out) throws Exception {
                // works like flatMap + filter
                try {
                    JSONObject jsonObject = JSON.parseObject(line);
                    String type = jsonObject.getString("type");
                    if (type.equals("INSERT") || type.equals("UPDATE")) {
                        JSONArray jsonArray = jsonObject.getJSONArray("data");
                        for (int i = 0; i < jsonArray.size(); i++) {
                            OrderDetail orderDetail = jsonArray.getObject(i, OrderDetail.class);
                            orderDetail.setType(type); // set the operation type
                            out.collect(orderDetail);
                        }
                    }
                } catch (Exception e) {
                    //e.printStackTrace();
                    // record the malformed data
                }
            }
        });

        int delaySeconds = 2;
        int windowSize = 5;

        // Extract the event time and generate watermarks
        SingleOutputStreamOperator<OrderMain> orderMainStreamWithWaterMark = orderMainDataStream.assignTimestampsAndWatermarks(
                new BoundedOutOfOrdernessTimestampExtractor<OrderMain>(Time.seconds(delaySeconds)) {
                    @Override
                    public long extractTimestamp(OrderMain element) {
                        return element.getCreate_time().getTime();
                    }
                });
        SingleOutputStreamOperator<OrderDetail> orderDetailStreamWithWaterMark = orderDetailDataStream.assignTimestampsAndWatermarks(
                new BoundedOutOfOrdernessTimestampExtractor<OrderDetail>(Time.seconds(delaySeconds)) {
                    @Override
                    public long extractTimestamp(OrderDetail element) {
                        return element.getCreate_time().getTime();
                    }
                });

        // Define the OutputTag for the late side output
        OutputTag<OrderDetail> lateTag = new OutputTag<OrderDetail>("late-date") {};

        // Window the left table on its own, with the same window length as the coGroup window
        SingleOutputStreamOperator<OrderDetail> orderDetailWithWindow = orderDetailStreamWithWaterMark
                .windowAll(TumblingEventTimeWindows.of(Time.seconds(windowSize)))
                .sideOutputLateData(lateTag) // tag the late data
                .apply(new AllWindowFunction<OrderDetail, OrderDetail, TimeWindow>() {
                    @Override
                    public void apply(TimeWindow window, Iterable<OrderDetail> values, Collector<OrderDetail> out) throws Exception {
                        // forward the on-time data unchanged
                        for (OrderDetail value : values) {
                            out.collect(value);
                        }
                    }
                });

        // Get the late data
        DataStream<OrderDetail> lateOrderDetailStream = orderDetailWithWindow.getSideOutput(lateTag);

        // Because the OrderDetail stream has few late records, async IO is not
        // necessary; a plain RichMapFunction is enough
        SingleOutputStreamOperator<Tuple2<OrderDetail, OrderMain>> lateOrderDetailAndOrderMain = lateOrderDetailStream.map(new RichMapFunction<OrderDetail, Tuple2<OrderDetail, OrderMain>>() {
            @Override
            public void open(Configuration parameters) throws Exception {
                // open the database connection
            }

            @Override
            public Tuple2<OrderDetail, OrderMain> map(OrderDetail value) throws Exception {
                // look up the matching OrderMain in the database (left as a stub here)
                return null;
            }

            @Override
            public void close() throws Exception {
                // close the database connection
            }
        });

        // Left outer join, with the order detail table as the left table
        DataStream<Tuple2<OrderDetail, OrderMain>> joined = orderDetailWithWindow.coGroup(orderMainStreamWithWaterMark)
                .where(new KeySelector<OrderDetail, Long>() {
                    @Override
                    public Long getKey(OrderDetail value) throws Exception {
                        return value.getOrder_id();
                    }
                })
                .equalTo(new KeySelector<OrderMain, Long>() {
                    @Override
                    public Long getKey(OrderMain value) throws Exception {
                        return value.getOid();
                    }
                })
                .window(TumblingEventTimeWindows.of(Time.seconds(windowSize)))
                .apply(new CoGroupFunction<OrderDetail, OrderMain, Tuple2<OrderDetail, OrderMain>>() {
                    @Override
                    public void coGroup(Iterable<OrderDetail> first, Iterable<OrderMain> second, Collector<Tuple2<OrderDetail, OrderMain>> out) throws Exception {
                        for (OrderDetail orderDetail : first) {
                            boolean isJoined = false;
                            for (OrderMain orderMain : second) {
                                out.collect(Tuple2.of(orderDetail, orderMain));
                                isJoined = true;
                            }
                            if (!isJoined) {
                                out.collect(Tuple2.of(orderDetail, null));
                            }
                        }
                    }
                });

        // After the join, some OrderDetail records may not have found their OrderMain
        SingleOutputStreamOperator<Tuple2<OrderDetail, OrderMain>> punctualOrderDetailAndOrderMain = joined.map(new RichMapFunction<Tuple2<OrderDetail, OrderMain>, Tuple2<OrderDetail, OrderMain>>() {
            private transient Connection connection;

            @Override
            public void open(Configuration parameters) throws Exception {
                // open the database connection
            }

            @Override
            public Tuple2<OrderDetail, OrderMain> map(Tuple2<OrderDetail, OrderMain> tp) throws Exception {
                // for every record that did not join the main table, query the database
                if (tp.f1 == null) {
                    OrderMain orderMain = queryOrderMainFromMySQL(tp.f0.getOrder_id(), connection);
                    tp.f1 = orderMain;
                }
                return tp;
            }

            @Override
            public void close() throws Exception {
                // close the database connection
            }
        });

        // Union the on-time data with the late data
        DataStream<Tuple2<OrderDetail, OrderMain>> allOrderStream = punctualOrderDetailAndOrderMain.union(lateOrderDetailAndOrderMain);

        // Depending on the concrete scenario, write the result to Kafka, HBase, ES or ClickHouse
        allOrderStream.print();

        FlinkUtilsV2.getEnv().execute();
    }

    private static OrderMain queryOrderMainFromMySQL(Long order_id, Connection connection) {
        // left as a stub; see the JDBC sketch below
        return null;
    }
}
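queryOrderMainFromMySQL is left as a stub above. A minimal JDBC sketch of what it could do, assuming a MySQL table named ordermain whose columns match the OrderMain fields (the table and column names are assumptions, and the signature gains a throws clause):

private static OrderMain queryOrderMainFromMySQL(Long order_id, Connection connection) throws Exception {
    // Table/column names are assumed; uses only java.sql.PreparedStatement / java.sql.ResultSet
    String sql = "SELECT oid, create_time, total_money, status, province, city FROM ordermain WHERE oid = ?";
    try (PreparedStatement ps = connection.prepareStatement(sql)) {
        ps.setLong(1, order_id);
        try (ResultSet rs = ps.executeQuery()) {
            if (rs.next()) {
                OrderMain orderMain = new OrderMain();
                orderMain.setOid(rs.getLong("oid"));
                orderMain.setCreate_time(rs.getTimestamp("create_time"));
                orderMain.setTotal_money(rs.getDouble("total_money"));
                orderMain.setStatus(rs.getInt("status"));
                orderMain.setProvince(rs.getString("province"));
                orderMain.setCity(rs.getString("city"));
                return orderMain;
            }
        }
    }
    return null; // no matching main-table row yet
}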