【源碼解析】Flink 是如何基於事件時間生成Timestamp和Watermark


生成Timestamp和Watermark 的三個重載方法介紹可參見上一篇博客: Flink assignAscendingTimestamps 生成水印的三個重載方法

 之前想研究下Flink是怎么處理亂序的數據,看了相關的源碼,加上測試,發現得到了與預期完全不相同的結果。

預期是:亂序到達的數據,flink可以基於數據的事件時間,自動整理數據,依次計算輸出

結果是:在assignTimestampsAndWatermarks(assigner: AssignerWithPeriodicWatermarks[T]): DataStream[T]指派timestamp和watermark的情況下,亂序到達的數據:遲到的數據直接從側邊輸出了超前的數據直接結束當前的窗口,開啟超前數據對應的窗口,后面到達的正常數據,直接作為遲到數據處理了

 在得到上面的結果的過程中,仔細的研究了一下生產Timestamp和Watermark相關的源碼。

Flink DataStream API 目前只能通過 assignTimestampsAndWatermarks方法創建時間戳和水印有兩種生成模式:

  1、基於事件時間創建每個事件的Timestamp 和 基於事件時間周期性的創建Watermark(默認周期為200ms)

  2、基於事件時間創建每個事件的Timestamp 和 基於事件時間每個事件都創建一個Watermark(如果新的Watermark大於當前的Watermark,才會發出)

 事件時間下,事件的Timestamp的創建都是直接依賴於事件攜帶的事件時間,而Watermark則是基於事件時間生成Watermark,所以有周期性創建Watermark和標記的Watermark(With Punctuated Watermarks)的區分(官網中基於Kafka 的分區時間作為Watermark 也是周期性的生成Watermark,只不過傳入的事件時間改為事件在kafka中的timestamp了) 

1、周期性的創建Watermark

周期性的創建Watermark的有兩種方法(kafka的分區時間的忽略):  

assignTimestamps(extractor: TimestampExtractor[T]): DataStream[T]
assignTimestampsAndWatermarks(assigner: AssignerWithPeriodicWatermarks[T]): DataStream[T] 

1.1  assignTimestamps(extractor: TimestampExtractor[T]): DataStream[T] 對應源碼 

調用方法如下:

.assignAscendingTimestamps(element => {
      // 方便打斷點debug 
      println("xxxxxx : " + element.createTime)
      sdf.parse(element.createTime).getTime
    })

 周期性的創建Watermark 是在 TimestampsAndPeriodicWatermarksOperator 中生成、發出,對應的時間來源是調用不同的生成timestamp 和 Watermark 的實現類

TimestampsAndPeriodicWatermarksOperator  相應代碼如下:

  /*    
        處理事件元素: 獲取對應的事件時間的時間戳,替換事件默認的時間戳(如果數據源是kafka,時間戳就是數據在kafka中的時間戳)
     */
    @Override
    public void processElement(StreamRecord<T> element) throws Exception {
        final long newTimestamp = userFunction.extractTimestamp(element.getValue(),
                element.hasTimestamp() ? element.getTimestamp() : Long.MIN_VALUE);

        output.collect(element.replace(element.getValue(), newTimestamp));
    }
    /*
        處理時間(Watermark) : 獲取當前時間對應的上一次的事件時間,生成新的watermark,新的watermark的時間戳大於當前的watermark,就發出新的watermark
     */
    @Override
    public void onProcessingTime(long timestamp) throws Exception {
        // 從這里可以看到,每200ms 打印一次
        System.out.println("timestamp : " + timestamp + ", system.current : " + System.currentTimeMillis());
        // register next timer
        Watermark newWatermark = userFunction.getCurrentWatermark();
        if (newWatermark != null && newWatermark.getTimestamp() > currentWatermark) {
            currentWatermark = newWatermark.getTimestamp();
            // emit watermark
 output.emitWatermark(newWatermark);
        }

        long now = getProcessingTimeService().getCurrentProcessingTime();
// 注冊timer ,周期性的調用,下面會展開 getProcessingTimeService().registerTimer(now
+ watermarkInterval, this); }

在這種生成timestamp 和 Watermark 的情況下,userFunction  對應的類是:AscendingTimestampExtractor

對應源碼如下:

@Override
    public final long extractTimestamp(T element, long elementPrevTimestamp) {
                // 調用 assignAscendingTimestamps 的參數函數
        final long newTimestamp = extractAscendingTimestamp(element);
        if (newTimestamp >= this.currentTimestamp) {
// 這是為了下面生成Watermark的方法,總能得到 大於等於 當前Watermark的 時間戳
this.currentTimestamp = newTimestamp; return newTimestamp; } else { violationHandler.handleViolation(newTimestamp, this.currentTimestamp); return newTimestamp; } } @Override public final Watermark getCurrentWatermark() { return new Watermark(currentTimestamp == Long.MIN_VALUE ? Long.MIN_VALUE : currentTimestamp - 1); }

timestamp的生成: TimestampsAndPeriodicWatermarksOperator#processElement  方法,調用AscendingTimestampExtractor#extractTimestamp 再調用 用戶代碼中具體生成timestamp 的方法,最終生成事件對應的timestamp,替換原有的timestamp

Watermark的生成:TimestampsAndPeriodicWatermarksOperator#onProcessingTime 方法,調用 AscendingTimestampExtractor#getCurrentWatermark, 返回生成timestamp 時的 currentTimestamp -1 ,生成  Watermark,如果生成的Watermark的timestamp 大於當前的  Watermark的timestamp 就發出新的Watermark

1.2 assignTimestampsAndWatermarks(assigner: AssignerWithPeriodicWatermarks[T]): DataStream[T] 

調用方法如下:

.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[LateDataEvent](Time.milliseconds(50)) {
      override def extractTimestamp(element: LateDataEvent): Long = {
        println("current timestamp : " + sdf.parse(element.createTime).getTime)
        sdf.parse(element.createTime).getTime
      }
    })

在這種生成timestamp 和 Watermark 的情況下,userFunction  對應的類是:BoundedOutOfOrdernessTimestampExtractor

對應源碼

    @Override
    public final Watermark getCurrentWatermark() {
        // this guarantees that the watermark never goes backwards.
        long potentialWM = currentMaxTimestamp - maxOutOfOrderness;
        if (potentialWM >= lastEmittedWatermark) {
            lastEmittedWatermark = potentialWM;
        }
        return new Watermark(lastEmittedWatermark);
    }

    @Override
    public final long extractTimestamp(T element, long previousElementTimestamp) {
        long timestamp = extractTimestamp(element);
        if (timestamp > currentMaxTimestamp) {
// 這是為了上面上次Watermark的方法總能獲取到 大於等於 當前Watermark的時間戳 currentMaxTimestamp
= timestamp; } return timestamp; }

 基本上與上面相同,只是這種情況下,生成Watermark會 減去相應的 maxOutOfOrderness (允許延遲時間,就是代碼中BoundedOutOfOrdernessTimestampExtractor對應的參數)

之所以說是周期性的,是因為生成Watermark的方法是周期性調用的:

// 注冊timer 定期執行
getProcessingTimeService().registerTimer(now + watermarkInterval, this); // 對應 watermarkInterval 來自與系統配置 watermarkInterval = getExecutionConfig().getAutoWatermarkInterval(); // 對應配置, 默認 200ms env.getConfig.setAutoWatermarkInterval(400)

看代碼可知,生成timestamp和Watermark是兩條線,timestamp 是每個事件消息都會生成,而Watermark 是周期的

2、標記的Watermark(With Punctuated Watermarks)

這種Watermark的生成只有一種,對應代碼如下:

.assignTimestampsAndWatermarks(new AssignerWithPunctuatedWatermarks[LateDataEvent]() {
      // check extractTimestamp emitted watermark is non-null and large than previously 生成當前事件的Watermark
      override def checkAndGetNextWatermark(lastElement: LateDataEvent, extractedTimestamp: Long): Watermark = {
        new Watermark(extractedTimestamp)
      }

      // generate next watermark 生成當前事件的timestamp
      override def extractTimestamp(element: LateDataEvent, previousElementTimestamp: Long): Long = {
        val eventTime = sdf.parse(element.createTime).getTime
        eventTime
      }
    })

對應上面生成添加時間戳到事件中和發出Watermark  在 TimestampsAndPunctuatedWatermarksOperator中具體如下:

@Override
    public void processElement(StreamRecord<T> element) throws Exception {
        final T value = element.getValue();
                // extractTimestamp 方法就是assignTimestampsAndWatermarks 中的 extractTimestamp 生成事件的時間戳
        final long newTimestamp = userFunction.extractTimestamp(value,
                element.hasTimestamp() ? element.getTimestamp() : Long.MIN_VALUE);

        output.collect(element.replace(element.getValue(), newTimestamp));
                // checkAndGetNextWatermark 方法就是assignTimestampsAndWatermarks 中的 checkAndGetNextWatermark,檢查Watermark
        final Watermark nextWatermark = userFunction.checkAndGetNextWatermark(value, newTimestamp);
// 新的Watermark大於當前的Watermark才會發出
if (nextWatermark != null && nextWatermark.getTimestamp() > currentWatermark) { currentWatermark = nextWatermark.getTimestamp(); output.emitWatermark(nextWatermark); } }

這里可以看出,每條數據都會生成 tiemstamp 和 Watermark(不一定會發出,如果數據都是正常的,Watermark的消息會和事件的消息一樣多,所以會影響性能)

搞定。

 歡迎關注Flink菜鳥公眾號,會不定期更新Flink(開發技術)相關的推文

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM