之前分享了一個 Flink 的面試題,這里簡單回顧下內容:
有兩個輸入源,一個是命令流,一個是數據流
需要將命令流進行廣播,然后和數據流進行connect,根據命令流指定的命令進行統計
實現一個輸出到終端的 sink,將統計結果打印出來,每一條記錄包括 taskId, targetAttr, periodStartTime(周期開始時間), value (統計后的值,double類型)
面試題原文鏈接: https://mp.weixin.qq.com/s/iKx0EE-xvnOyncCIhN6MeA
實現流程
1、命令流使用從 kafka 輸入,方便手動發送命令,map 解析成對象,廣播
2、數據流實現 SourceFunction 自動生成數據,map 解析成對象
3、使用數據流關聯 命令流,輸出數據與命令組合的 tuple
4、生成 timestamp 和 周期性的 watermark(flink 自帶)
5、數據通過 DynamicTumblingEventTimeWindows.assignWindows 指定動態窗口
6、使用窗口函數根據命令的 methed 計算對應的結果
具體實現請移步 github: https://github.com/springMoon/flink-rookie/tree/master/src/main/scala/com/venn/questing/dynamicWindow
動態窗口實現
這個問題的難點就在: 根據命令流的規則進行窗口統計(而命令的規則中,指定了統計的目標,也指定了 窗口的長度和開始時間)
Flink 原生的翻滾、滑動、session 窗口都是固定時間長度的窗口(session 窗口特殊的是,指定的長度不是窗口的長度而是 session timeout 的時間)
看下 翻滾窗口(TumblingEventTimeWindows)的源碼
public class TumblingEventTimeWindows extends WindowAssigner<Object, TimeWindow> { private static final long serialVersionUID = 1L; // 指定窗口的長度 private final long size; // 指定窗口的開始時間的偏移長度,如: Flink 的窗口都是整點的,按天的窗口都是從 0 點開始(UTC0),指定 offset = 8 小時,就稱為北京時間的 0 點了 private final long offset; protected TumblingEventTimeWindows(long size, long offset) { if (Math.abs(offset) >= size) { throw new IllegalArgumentException("TumblingEventTimeWindows parameters must satisfy abs(offset) < size"); } this.size = size; this.offset = offset; } @Override public Collection<TimeWindow> assignWindows(Object element, long timestamp, WindowAssignerContext context) { if (timestamp > Long.MIN_VALUE) { // Long.MIN_VALUE is currently assigned when no timestamp is present // 每條數據進來會根據 當前的 timestam 使用 offset 和 size 計算窗口對於的開始時間,結束時間就是 開始時間 + size long start = TimeWindow.getWindowStartWithOffset(timestamp, offset, size); return Collections.singletonList(new TimeWindow(start, start + size)); } else { throw new RuntimeException("Record has Long.MIN_VALUE timestamp (= no timestamp marker). " + "Is the time characteristic set to 'ProcessingTime', or did you forget to call " + "'DataStream.assignTimestampsAndWatermarks(...)'?"); } } } TimeWindow.java public static long getWindowStartWithOffset(long timestamp, long offset, long windowSize) { return timestamp - (timestamp - offset + windowSize) % windowSize; }
Flink 在使用 TumblingEventTimeWindows 功能的時候,每條數據都會進入 TumblingEventTimeWindows.assignWindows 方法,計算數據屬於的窗口(知道窗口的長度,基於 0 的偏移值,任何一個 正常的 timestam 都可以通過上面的 getWindowStartWithOffset 函數計算出該 timestamp 對應窗口的 開始時間和結束時間)。
動態窗口的實現也是基於 TumblingEventTimeWindows 修改的,主要是"根據每條輸入數據的命令,修改 窗口的 size 和 offset" 使窗口稱為動態窗口
核心代碼如下:
/** * flink dynamic tumbling event window */ @PublicEvolving public class DynamicTumblingEventTimeWindows extends WindowAssigner<Object, TimeWindow> { private static final long serialVersionUID = 1L; // not final, dynamic modify private long size; private long offset; protected DynamicTumblingEventTimeWindows() { } @Override public Collection<TimeWindow> assignWindows(Object element, long timestamp, WindowAssignerContext context) { if (timestamp > Long.MIN_VALUE) { Tuple2<DataEntity, Command> element1 = (Tuple2<DataEntity, Command>) element; Command command = element1._2; // cal new window size // 大於當前時間的情況又怎么處理呢: 窗口開始時間大於 timestamp,下一窗口命令還未開始,數據屬於上一窗口命令,所以不修改 size 與 offset if (command.startTime() < timestamp) { long millis = command.startTime() % 999; if ("minute".equalsIgnoreCase(command.periodUnit())) { this.size = command.periodLength() * 60 * 1000; // offset 等於 命令開始時間的 秒值 + 毫秒值 long second = command.startTime() / 1000 % 60; offset = second * 1000 + millis; } else { this.size = command.periodLength() * 1000; // offset 等於 命令開始時間的 毫秒值 offset = millis; } } // todo 窗口開始時間大於或者小於 當前 timestamp 的時候,需要處理 // 小於當前時間,可以計算出當前timestamp 對應的窗口 long start = getWindowStartWithOffset(timestamp, offset, size); return Collections.singletonList(new TimeWindow(start, start + size)); } else { throw new RuntimeException("Record has Long.MIN_VALUE timestamp (= no timestamp marker). " + "Is the time characteristic set to 'ProcessingTime', or did you forget to call " + "'DataStream.assignTimestampsAndWatermarks(...)'?"); } } /** * cal window start time */ public long getWindowStartWithOffset(long timestamp, long offset, long windowSize) { return timestamp - (timestamp + offset - windowSize) % windowSize; } public static DynamicTumblingEventTimeWindows of() { return new DynamicTumblingEventTimeWindows(); } }
DynamicTumblingEventTimeWindows.of 生成窗口的時候,不再指定固定的 size 和 offset(動態窗口的規則中,有指定 對於的 屬於進行統計,所以就不指定默認窗口 size 和 offset)
根據輸入數據的命令部分,所以命令大於當前 timestamp 的數據(小於 timestamp 的命令說明該命令尚未開始,數據還是屬於上一窗口,前面 數據流與命令流 關聯的時候,已經做了處理,這里只是多加一層判斷),根據命令中的 startTime 計算命令對應窗口基於 0 毫秒的偏移值(如果是分鍾的窗口還有加上 秒 的偏移值),窗口的長度是 periodLength 屬性對應的值,這里就得到了命令對應的窗口的 size 和 offset,后面的流程就和 Flink 原生窗口(TumblingEventTimeWindows)一樣了,計算下 窗口的開始時間,結束時間
命令開始時間處理
對於命令的開始時間,其實也是一個處理的難點
命令的開始時間可能是小於、等於、大於當前時間的,其中小於和等於的命令,意味着窗口馬上就要開始,使用對應的屬性計算窗口的 size 和 offset 即可
對於命令的開始大於當前時間的命令,需要做下特殊處理,大於當前時間,意味着命令還不能生效,不能替換當前命令,當前的數據,是屬於上一個正在執行的命令
在實現的時候,我使用了兩個 map 的對應,一個存儲當前正在執行命令,一個存儲最新的命令(為了簡單,假設基於每個屬性的命令一次只會有一個在執行)
核心代碼如下:
new BroadcastProcessFunction[DataEntity, Command, (DataEntity, Command)]() { // 存放當前命令的 map(非 keyBy 的不能使用 keyState,用 hashmap 將就了) var currentCommand: util.HashMap[String, Command] = _ // 存放新命令的 map var commandState: MapStateDescriptor[String, Command] = _ override def open(parameters: Configuration): Unit = { currentCommand = new util.HashMap[String, Command]() commandState = new MapStateDescriptor[String, Command]("commandState", BasicTypeInfo.STRING_TYPE_INFO, TypeInformation.of(new TypeHint[Command]() {})) } override def processElement(element: DataEntity, ctx: BroadcastProcessFunction[DataEntity, Command, (DataEntity, Command)]#ReadOnlyContext, out: Collector[(DataEntity, Command)]): Unit = { // 命令可以是大於/小於當前時間 // 小於當前時間的,直接添加即可,之前命令的窗口不會收到新數據,新數據直接進新命令的窗口 // 大於當前時間的命令,不能直接與流一起往下游輸出,等時間小於當前的 processTime 時間后,才會開始新窗口 val command = ctx.getBroadcastState(commandState).get(element.attr) val current = currentCommand.get(element.attr) if (command != null && command.startTime <= ctx.currentProcessingTime()) { // 當新命令的時間小於當前的處理時間,替換舊命令 currentCommand.put(element.attr, command) } // 如果當前命令為空,數據就不往下發送了 if (current != null) { out.collect((element, current)) } // command not exists, ignore it } override def processBroadcastElement(element: Command, ctx: BroadcastProcessFunction[DataEntity, Command, (DataEntity, Command)]#Context, out: Collector[(DataEntity, Command)]): Unit = { // only one command are new accepted, cover old command logger.info("receive command : " + element) ctx.getBroadcastState(commandState).put(element.targetAttr, element) } }
代碼都有注釋,不再贅述
全部代碼,請移步 github: https://github.com/springMoon/flink-rookie/tree/master/src/main/scala/com/venn/questing/dynamicWindow
測試
從 kafka 數據 readme.md 中對應的命令,查看 輸出結果
2020-09-09 20:12:08,812 INFO - receive command : Command(task1,attr2,sum,SECOND,20,1598596980000) 2020-09-09 20:12:08,812 INFO - receive command : Command(task2,attr1,sum,MINUTE,1,1598596980000) 2020-09-09 20:12:08,812 INFO - receive command : Command(task3,attr2,max,SECOND,30,1598596980000) 2020-09-09 20:12:09,816 INFO - receive command : Command(task4,attr3,min,MINUTE,1,1599640669628) sum> {"method":"min","periodStartTime":"20:11:10","targetAttr":"attr3","periodEndTime":"20:12:10","value":"18.0","taskId":"task4"} sum> {"method":"max","periodStartTime":"20:11:59","targetAttr":"attr2","periodEndTime":"20:12:29","value":"981.0","taskId":"task3"} sum> {"method":"max","periodStartTime":"20:12:29","targetAttr":"attr2","periodEndTime":"20:12:59","value":"937.0","taskId":"task3"} sum> {"method":"sum","periodStartTime":"20:11:59","targetAttr":"attr1","periodEndTime":"20:12:59","value":"26876.0","taskId":"task2"} sum> {"method":"min","periodStartTime":"20:12:10","targetAttr":"attr3","periodEndTime":"20:13:10","value":"32.0","taskId":"task4"} sum> {"method":"max","periodStartTime":"20:12:59","targetAttr":"attr2","periodEndTime":"20:13:29","value":"998.0","taskId":"task3"} 2020-09-09 20:13:43,712 INFO - receive command : Command(task5,attr2,sum,SECOND,20,1598596980000) 2020-09-09 20:13:43,712 INFO - receive command : Command(task6,attr1,sum,MINUTE,1,1598596980000) 2020-09-09 20:13:43,712 INFO - receive command : Command(task7,attr2,max,SECOND,30,1598596980000) 2020-09-09 20:13:43,712 INFO - receive command : Command(task8,attr3,min,MINUTE,1,1599640669628) sum> {"method":"max","periodStartTime":"20:13:29","targetAttr":"attr2","periodEndTime":"20:13:59","value":"995.0","taskId":"task3"} sum> {"method":"sum","periodStartTime":"20:12:59","targetAttr":"attr1","periodEndTime":"20:13:59","value":"31627.0","taskId":"task2"} sum> {"method":"min","periodStartTime":"20:13:10","targetAttr":"attr3","periodEndTime":"20:14:10","value":"90.0","taskId":"task4"} sum> {"method":"max","periodStartTime":"20:13:59","targetAttr":"attr2","periodEndTime":"20:14:29","value":"945.0","taskId":"task7"}
