Flink 動態窗口統計面試題-實現


之前分享了一個 Flink 的面試題,這里簡單回顧下內容:

有兩個輸入源,一個是命令流,一個是數據流
需要將命令流進行廣播,然后和數據流進行connect,根據命令流指定的命令進行統計
實現一個輸出到終端的 sink,將統計結果打印出來,每一條記錄包括 taskId, targetAttr, periodStartTime(周期開始時間), value (統計后的值,double類型)

面試題原文鏈接: https://mp.weixin.qq.com/s/iKx0EE-xvnOyncCIhN6MeA

實現流程

1、命令流使用從 kafka 輸入,方便手動發送命令,map 解析成對象,廣播
2、數據流實現 SourceFunction 自動生成數據,map 解析成對象
3、使用數據流關聯 命令流,輸出數據與命令組合的 tuple
4、生成 timestamp 和 周期性的 watermark(flink 自帶)
5、數據通過 DynamicTumblingEventTimeWindows.assignWindows 指定動態窗口
6、使用窗口函數根據命令的 methed 計算對應的結果

具體實現請移步 github: https://github.com/springMoon/flink-rookie/tree/master/src/main/scala/com/venn/questing/dynamicWindow

動態窗口實現

這個問題的難點就在: 根據命令流的規則進行窗口統計(而命令的規則中,指定了統計的目標,也指定了 窗口的長度和開始時間)

Flink 原生的翻滾、滑動、session 窗口都是固定時間長度的窗口(session 窗口特殊的是,指定的長度不是窗口的長度而是 session timeout 的時間)

看下 翻滾窗口(TumblingEventTimeWindows)的源碼

public class TumblingEventTimeWindows extends WindowAssigner<Object, TimeWindow> {
    private static final long serialVersionUID = 1L;
    // 指定窗口的長度
    private final long size;
    // 指定窗口的開始時間的偏移長度,如: Flink 的窗口都是整點的,按天的窗口都是從 0 點開始(UTC0),指定 offset = 8 小時,就稱為北京時間的 0 點了
    private final long offset;

    protected TumblingEventTimeWindows(long size, long offset) {
        if (Math.abs(offset) >= size) {
            throw new IllegalArgumentException("TumblingEventTimeWindows parameters must satisfy abs(offset) < size");
        }

        this.size = size;
        this.offset = offset;
    }

    @Override
    public Collection<TimeWindow> assignWindows(Object element, long timestamp, WindowAssignerContext context) {
        if (timestamp > Long.MIN_VALUE) {
            // Long.MIN_VALUE is currently assigned when no timestamp is present
            // 每條數據進來會根據 當前的 timestam 使用 offset 和 size 計算窗口對於的開始時間,結束時間就是 開始時間 + size 
            long start = TimeWindow.getWindowStartWithOffset(timestamp, offset, size);
            return Collections.singletonList(new TimeWindow(start, start + size));
        } else {
            throw new RuntimeException("Record has Long.MIN_VALUE timestamp (= no timestamp marker). " +
                    "Is the time characteristic set to 'ProcessingTime', or did you forget to call " +
                    "'DataStream.assignTimestampsAndWatermarks(...)'?");
        }
    }

}

TimeWindow.java
public static long getWindowStartWithOffset(long timestamp, long offset, long windowSize) {
        return timestamp - (timestamp - offset + windowSize) % windowSize;
}

Flink 在使用 TumblingEventTimeWindows 功能的時候,每條數據都會進入 TumblingEventTimeWindows.assignWindows 方法,計算數據屬於的窗口(知道窗口的長度,基於 0 的偏移值,任何一個 正常的 timestam 都可以通過上面的 getWindowStartWithOffset 函數計算出該 timestamp 對應窗口的 開始時間和結束時間)。

動態窗口的實現也是基於 TumblingEventTimeWindows 修改的,主要是"根據每條輸入數據的命令,修改 窗口的 size 和 offset" 使窗口稱為動態窗口

核心代碼如下:

/**
 * flink dynamic tumbling event window
 */
@PublicEvolving
public class DynamicTumblingEventTimeWindows extends WindowAssigner<Object, TimeWindow> {
    private static final long serialVersionUID = 1L;

    // not final, dynamic modify
    private long size;
    private long offset;

    protected DynamicTumblingEventTimeWindows() {
    }

    @Override
    public Collection<TimeWindow> assignWindows(Object element, long timestamp, WindowAssignerContext context) {
        if (timestamp > Long.MIN_VALUE) {
            Tuple2<DataEntity, Command> element1 = (Tuple2<DataEntity, Command>) element;
            Command command = element1._2;
            // cal new window size
            // 大於當前時間的情況又怎么處理呢: 窗口開始時間大於 timestamp,下一窗口命令還未開始,數據屬於上一窗口命令,所以不修改 size 與 offset
            if (command.startTime() < timestamp) {
                long millis = command.startTime() % 999;
                if ("minute".equalsIgnoreCase(command.periodUnit())) {
                    this.size = command.periodLength() * 60 * 1000;
                    // offset 等於 命令開始時間的 秒值 + 毫秒值
                    long second = command.startTime() / 1000 % 60;
                    offset = second * 1000 + millis;
                } else {
                    this.size = command.periodLength() * 1000;
                    // offset 等於 命令開始時間的 毫秒值
                    offset = millis;
                }
            }
            // todo 窗口開始時間大於或者小於 當前 timestamp 的時候,需要處理
            // 小於當前時間,可以計算出當前timestamp 對應的窗口
            long start = getWindowStartWithOffset(timestamp, offset, size);
            return Collections.singletonList(new TimeWindow(start, start + size));
        } else {
            throw new RuntimeException("Record has Long.MIN_VALUE timestamp (= no timestamp marker). " +
                    "Is the time characteristic set to 'ProcessingTime', or did you forget to call " +
                    "'DataStream.assignTimestampsAndWatermarks(...)'?");
        }
    }

    /**
     * cal window start time
     */
    public long getWindowStartWithOffset(long timestamp, long offset, long windowSize) {
        return timestamp - (timestamp + offset - windowSize) % windowSize;
    }

    public static DynamicTumblingEventTimeWindows of() {
        return new DynamicTumblingEventTimeWindows();
    }

}

DynamicTumblingEventTimeWindows.of 生成窗口的時候,不再指定固定的 size 和 offset(動態窗口的規則中,有指定 對於的 屬於進行統計,所以就不指定默認窗口 size 和 offset)

根據輸入數據的命令部分,所以命令大於當前 timestamp 的數據(小於 timestamp 的命令說明該命令尚未開始,數據還是屬於上一窗口,前面 數據流與命令流 關聯的時候,已經做了處理,這里只是多加一層判斷),根據命令中的 startTime 計算命令對應窗口基於 0 毫秒的偏移值(如果是分鍾的窗口還有加上 秒 的偏移值),窗口的長度是 periodLength 屬性對應的值,這里就得到了命令對應的窗口的 size 和 offset,后面的流程就和 Flink 原生窗口(TumblingEventTimeWindows)一樣了,計算下 窗口的開始時間,結束時間

命令開始時間處理

對於命令的開始時間,其實也是一個處理的難點

命令的開始時間可能是小於、等於、大於當前時間的,其中小於和等於的命令,意味着窗口馬上就要開始,使用對應的屬性計算窗口的 size 和 offset 即可

對於命令的開始大於當前時間的命令,需要做下特殊處理,大於當前時間,意味着命令還不能生效,不能替換當前命令,當前的數據,是屬於上一個正在執行的命令

在實現的時候,我使用了兩個 map 的對應,一個存儲當前正在執行命令,一個存儲最新的命令(為了簡單,假設基於每個屬性的命令一次只會有一個在執行)

核心代碼如下:

new BroadcastProcessFunction[DataEntity, Command, (DataEntity, Command)]() {

    // 存放當前命令的 map(非 keyBy 的不能使用 keyState,用 hashmap 將就了)
    var currentCommand: util.HashMap[String, Command] = _
    // 存放新命令的 map
    var commandState: MapStateDescriptor[String, Command] = _

    override def open(parameters: Configuration): Unit = {

      currentCommand = new util.HashMap[String, Command]()
      commandState = new MapStateDescriptor[String, Command]("commandState", BasicTypeInfo.STRING_TYPE_INFO, TypeInformation.of(new TypeHint[Command]() {}))
    }

    override def processElement(element: DataEntity, ctx: BroadcastProcessFunction[DataEntity, Command, (DataEntity, Command)]#ReadOnlyContext, out: Collector[(DataEntity, Command)]): Unit = {
      // 命令可以是大於/小於當前時間
      // 小於當前時間的,直接添加即可,之前命令的窗口不會收到新數據,新數據直接進新命令的窗口
      // 大於當前時間的命令,不能直接與流一起往下游輸出,等時間小於當前的 processTime 時間后,才會開始新窗口
      val command = ctx.getBroadcastState(commandState).get(element.attr)
      val current = currentCommand.get(element.attr)
      if (command != null && command.startTime <= ctx.currentProcessingTime()) {
        // 當新命令的時間小於當前的處理時間,替換舊命令
        currentCommand.put(element.attr, command)
      }
      // 如果當前命令為空,數據就不往下發送了
      if (current != null) {
        out.collect((element, current))
      }
      // command not exists, ignore it
    }

    override def processBroadcastElement(element: Command, ctx: BroadcastProcessFunction[DataEntity, Command, (DataEntity, Command)]#Context, out: Collector[(DataEntity, Command)]): Unit = {
      // only one command are new accepted, cover old command
      logger.info("receive command : " + element)
      ctx.getBroadcastState(commandState).put(element.targetAttr, element)
    }
  }

代碼都有注釋,不再贅述

全部代碼,請移步 github: https://github.com/springMoon/flink-rookie/tree/master/src/main/scala/com/venn/questing/dynamicWindow

測試

從 kafka 數據 readme.md 中對應的命令,查看 輸出結果

2020-09-09 20:12:08,812 INFO  - receive command : Command(task1,attr2,sum,SECOND,20,1598596980000)
2020-09-09 20:12:08,812 INFO  - receive command : Command(task2,attr1,sum,MINUTE,1,1598596980000)
2020-09-09 20:12:08,812 INFO  - receive command : Command(task3,attr2,max,SECOND,30,1598596980000)
2020-09-09 20:12:09,816 INFO  - receive command : Command(task4,attr3,min,MINUTE,1,1599640669628)
sum> {"method":"min","periodStartTime":"20:11:10","targetAttr":"attr3","periodEndTime":"20:12:10","value":"18.0","taskId":"task4"}
sum> {"method":"max","periodStartTime":"20:11:59","targetAttr":"attr2","periodEndTime":"20:12:29","value":"981.0","taskId":"task3"}
sum> {"method":"max","periodStartTime":"20:12:29","targetAttr":"attr2","periodEndTime":"20:12:59","value":"937.0","taskId":"task3"}
sum> {"method":"sum","periodStartTime":"20:11:59","targetAttr":"attr1","periodEndTime":"20:12:59","value":"26876.0","taskId":"task2"}
sum> {"method":"min","periodStartTime":"20:12:10","targetAttr":"attr3","periodEndTime":"20:13:10","value":"32.0","taskId":"task4"}
sum> {"method":"max","periodStartTime":"20:12:59","targetAttr":"attr2","periodEndTime":"20:13:29","value":"998.0","taskId":"task3"}
2020-09-09 20:13:43,712 INFO  - receive command : Command(task5,attr2,sum,SECOND,20,1598596980000)
2020-09-09 20:13:43,712 INFO  - receive command : Command(task6,attr1,sum,MINUTE,1,1598596980000)
2020-09-09 20:13:43,712 INFO  - receive command : Command(task7,attr2,max,SECOND,30,1598596980000)
2020-09-09 20:13:43,712 INFO  - receive command : Command(task8,attr3,min,MINUTE,1,1599640669628)
sum> {"method":"max","periodStartTime":"20:13:29","targetAttr":"attr2","periodEndTime":"20:13:59","value":"995.0","taskId":"task3"}
sum> {"method":"sum","periodStartTime":"20:12:59","targetAttr":"attr1","periodEndTime":"20:13:59","value":"31627.0","taskId":"task2"}
sum> {"method":"min","periodStartTime":"20:13:10","targetAttr":"attr3","periodEndTime":"20:14:10","value":"90.0","taskId":"task4"}
sum> {"method":"max","periodStartTime":"20:13:59","targetAttr":"attr2","periodEndTime":"20:14:29","value":"945.0","taskId":"task7"}
歡迎關注Flink菜鳥公眾號,會不定期更新Flink(開發技術)相關的推文

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM