[Source Code Analysis] How Flink Handles Late Data


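If you are reading this article, you probably already know something about Flink's notions of time (event time, processing time, ingestion time) and about watermarks. If not, start with the introduction in the official documentation: https://ci.apache.org/projects/flink/flink-docs-master/dev/event_time.html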

This raises a question: how does Flink handle late data based on event time and watermarks?

Before answering it, I recommend reading the following three pieces from Google on the stream processing model. The paper:

https://www.vldb.org/pvldb/vol8/p1792-Akidau.pdf 《The Dataflow Model: A Practical Approach to Balancing Correctness, Latency, and Cost in Massive-Scale, Unbounded, Out-of-Order Data Processing》

A high-level guide to modern data processing concepts:

https://www.oreilly.com/ideas/the-world-beyond-batch-streaming-101

https://www.oreilly.com/ideas/the-world-beyond-batch-streaming-102

---------------------------Main topic--------------------------------

Now to the main topic: how does Flink handle late data based on event time and watermarks?

The question breaks down into two parts:

  1. Creating timestamps and watermarks from event time (covered in detail below)

  2. Handling late data

 

1. Creating timestamps and watermarks from event time

For easier inspection, this example uses the overload assignTimestampsAndWatermarks(assigner: AssignerWithPunctuatedWatermarks[T]): DataStream[T], which generates a watermark for every event. The code is:

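```scala
import java.text.SimpleDateFormat
import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.watermark.Watermark

// env and source are set up as in the full program in part 2;
// LateDataEvent is assumed to be: case class LateDataEvent(key: String, id: String, createTime: String, amt: String)
val input = env.addSource(source)
  .map(json => {
    // json : {"id" : 0, "createTime" : "2019-08-24 11:13:14.942", "amt" : "9.8"}
    val id = json.get("id").asText()
    val createTime = json.get("createTime").asText()
    val amt = json.get("amt").asText()
    LateDataEvent("key", id, createTime, amt)
  })
  // assign a timestamp and a watermark for every event
  .assignTimestampsAndWatermarks(new AssignerWithPunctuatedWatermarks[LateDataEvent]() {
    // parses createTime in the JVM's default time zone
    private val sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS")

    // called after extractTimestamp: use the event timestamp as the next watermark;
    // Flink emits it only if it is non-null and larger than the previous watermark
    override def checkAndGetNextWatermark(lastElement: LateDataEvent, extractedTimestamp: Long): Watermark =
      new Watermark(extractedTimestamp)

    // extract the event time (epoch milliseconds) from createTime
    override def extractTimestamp(element: LateDataEvent, previousElementTimestamp: Long): Long =
      sdf.parse(element.createTime).getTime
  })
```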

Aside: inside operators, data travels as StreamRecord objects, structured as follows:

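```java
public final class StreamRecord<T> extends StreamElement {

    /** The actual value held by this record. */
    private T value;

    /** The timestamp of the record. */
    private long timestamp;

    /** Flag whether the timestamp is actually set. */
    private boolean hasTimestamp;

    // ... constructors and methods omitted
}
```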

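StreamElement is also the parent class of Watermark and StreamStatus (as well as LatencyMarker). Put simply, it is the base class of every message Flink carries through a stream. Note that a Watermark is an abstraction at the same level as an event, while a timestamp is a member field of both watermarks and events, representing their time.

Using each event's data (the timestamp returned by extractTimestamp), assignTimestampsAndWatermarks replaces the timestamp in the StreamRecord and emits a new watermark whenever the watermark generated from the current event's timestamp is greater than the previous watermark.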
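One detail worth checking in extractTimestamp: SimpleDateFormat without an explicit time zone parses createTime in the JVM's default zone, so the same string can produce different timestamps on different machines. The epoch values quoted later in this article match UTC+8. The sketch below is a minimal illustration, not the article's code: it assumes createTime is local time in Asia/Shanghai, swaps in java.time instead of SimpleDateFormat, and CreateTimeParser is a hypothetical name.

```java
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;

// Hypothetical helper class, not part of Flink or the original program.
final class CreateTimeParser {
    // Same pattern as the createTime strings in the sample JSON.
    private static final DateTimeFormatter FORMAT =
            DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS");

    // Assumption: createTime is local time in UTC+8 (Asia/Shanghai).
    private static final ZoneId ZONE = ZoneId.of("Asia/Shanghai");

    /** Converts a createTime string into epoch milliseconds, the unit Flink timestamps use. */
    static long toEpochMillis(String createTime) {
        return LocalDateTime.parse(createTime, FORMAT)
                .atZone(ZONE)
                .toInstant()
                .toEpochMilli();
    }

    public static void main(String[] args) {
        System.out.println(toEpochMillis("2019-08-24 17:51:44.152"));
    }
}
```

Pinning the zone makes the timestamps, and therefore window assignment, independent of where the job runs.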

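Let's debug this part of the source:

First, set a breakpoint in extractTimestamp to watch timestamps and watermarks being generated:

The event's timestamp and the corresponding watermark are handled in TimestampsAndPunctuatedWatermarksOperator.processElement (which operator class is used depends on the argument passed to assignTimestampsAndWatermarks; a periodic assigner goes through TimestampsAndPeriodicWatermarksOperator instead).

The StreamRecord object is created in StreamSourceContexts.processAndCollectWithTimestamp. The timestamp it carries at that point is the record's Kafka timestamp, which KafkaFetcher.emitRecord reads from the ConsumerRecord.

KafkaFetcher.emitRecord emits the records consumed from Kafka: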

```java
protected void emitRecord(
        T record,
        KafkaTopicPartitionState<TopicPartition> partition,
        long offset,
        ConsumerRecord<?, ?> consumerRecord) throws Exception {

    // attach the Kafka record timestamp to the emitted element
    emitRecordWithTimestamp(record, partition, offset, consumerRecord.timestamp());
}
```

StreamSourceContexts.processAndCollectWithTimestamp creates the StreamRecord object:

```java
protected void processAndCollectWithTimestamp(T element, long timestamp) {
    // wrap the element and its timestamp (here the Kafka timestamp) in the reused StreamRecord
    output.collect(reuse.replace(element, timestamp));
}
```

Now let's look at the source of TimestampsAndPunctuatedWatermarksOperator.processElement:

```java
@Override
public void processElement(StreamRecord<T> element) throws Exception {
    // the record's value
    final T value = element.getValue();
    // userFunction is the anonymous AssignerWithPunctuatedWatermarks created in our code;
    // extractTimestamp returns the new timestamp. The "previous" timestamp passed in is the
    // record's existing timestamp if it has one, otherwise Long.MIN_VALUE
    final long newTimestamp = userFunction.extractTimestamp(value,
            element.hasTimestamp() ? element.getTimestamp() : Long.MIN_VALUE);
    // replace the StreamRecord's old timestamp with the new one and forward the record
    output.collect(element.replace(element.getValue(), newTimestamp));
    // get the next watermark by calling our checkAndGetNextWatermark implementation
    final Watermark nextWatermark = userFunction.checkAndGetNextWatermark(value, newTimestamp);
    // emit it only if it is non-null and greater than the previous watermark
    if (nextWatermark != null && nextWatermark.getTimestamp() > currentWatermark) {
        currentWatermark = nextWatermark.getTimestamp();
        output.emitWatermark(nextWatermark);
    }
}
```

That completes the creation (or generation) of timestamps and watermarks.
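To make the watermark rule concrete, here is a plain-Java model of the per-record logic above, assuming (as in the example) that checkAndGetNextWatermark returns new Watermark(extractedTimestamp). It is a simplified sketch with no Flink classes; PunctuatedWatermarkSim and emittedWatermarks are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical simulation class, not part of Flink.
final class PunctuatedWatermarkSim {
    /**
     * Replays event timestamps through the watermark logic of processElement, assuming
     * checkAndGetNextWatermark returns new Watermark(extractedTimestamp) as in the example.
     * Returns the watermarks that would actually be emitted, in order.
     */
    static List<Long> emittedWatermarks(long[] eventTimestamps) {
        List<Long> emitted = new ArrayList<>();
        long currentWatermark = Long.MIN_VALUE; // no watermark emitted yet
        for (long extractedTimestamp : eventTimestamps) {
            long nextWatermark = extractedTimestamp; // checkAndGetNextWatermark(value, newTimestamp)
            // same condition as the operator: only strictly larger watermarks are emitted
            if (nextWatermark > currentWatermark) {
                currentWatermark = nextWatermark;
                emitted.add(nextWatermark);
            }
        }
        return emitted;
    }

    public static void main(String[] args) {
        // The out-of-order 3_000 and the repeated 7_000 emit no watermark.
        System.out.println(emittedWatermarks(new long[] {5_000, 3_000, 7_000, 7_000, 10_000}));
    }
}
```

Because the watermark simply tracks the largest timestamp seen so far, with no allowance for out-of-order events, any event that arrives after a newer event has already pushed the watermark past the end of its window is treated as late.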

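2. How Flink handles late data

  To demonstrate this, I added a window operator to the program above, with sideOutputLateData sending late data to a side output. For convenience, here is the complete code again: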

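```scala
import java.text.SimpleDateFormat
import org.apache.flink.api.java.tuple.Tuple
import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.function.ProcessWindowFunction
import org.apache.flink.streaming.api.watermark.Watermark
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer
import org.apache.flink.util.Collector
// The imports for ObjectNode and JsonNodeDeserializationSchema depend on the Flink version and are omitted.
// Assumed to be defined elsewhere (not shown in the original):
//   env: a StreamExecutionEnvironment set to TimeCharacteristic.EventTime
//   Common.getProp: the author's helper returning the Kafka consumer Properties
//   case class LateDataEvent(key: String, id: String, createTime: String, amt: String)

val source = new FlinkKafkaConsumer[ObjectNode]("late_data", new JsonNodeDeserializationSchema(), Common.getProp)
// tag for the late-data side output
val late = new OutputTag[LateDataEvent]("late")

val input = env.addSource(source)
  .map(json => {
    // json : {"id" : 0, "createTime" : "2019-08-24 11:13:14.942", "amt" : "9.8"}
    val id = json.get("id").asText()
    val createTime = json.get("createTime").asText()
    val amt = json.get("amt").asText()
    LateDataEvent("key", id, createTime, amt)
  })
  // assign a timestamp and a watermark for every event
  .assignTimestampsAndWatermarks(new AssignerWithPunctuatedWatermarks[LateDataEvent]() {
    private val sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS")

    // use the event timestamp as the next watermark; Flink emits it only if it is
    // non-null and larger than the previous watermark
    override def checkAndGetNextWatermark(lastElement: LateDataEvent, extractedTimestamp: Long): Watermark =
      new Watermark(extractedTimestamp)

    // extract the event time (epoch milliseconds) from createTime
    override def extractTimestamp(element: LateDataEvent, previousElementTimestamp: Long): Long =
      sdf.parse(element.createTime).getTime
  })
  // after keyBy, each distinct key gets its own windows (here every event has the key "key")
  .keyBy("key")
  .window(TumblingEventTimeWindows.of(Time.minutes(1)))
  // send late records to the "late" side output instead of dropping them
  .sideOutputLateData(late)
  .process(new ProcessWindowFunction[LateDataEvent, LateDataEvent, Tuple, TimeWindow] {
    // only here so we can break into the window operator while debugging
    override def process(key: Tuple, context: Context, elements: Iterable[LateDataEvent], out: Collector[LateDataEvent]): Unit = {
      // print the window start & end timestamps and the current watermark
      println("window:" + context.window.getStart + "-" + context.window.getEnd + ", currentWatermark : " + context.currentWatermark)
      elements.foreach(e => out.collect(e))
    }
  })
// print late data
input.getSideOutput(late).print("late:")
input.print("apply:")
env.execute("LateDataProcess")
```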

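The logic is simple: the main point is to add a window operator, and the process function is only there to make it easy to debug into the window operator.

Now let's debug the source:

Set a breakpoint in the process method:

This time we start directly from where the window operator receives data sent from upstream: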

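StreamInputProcessor.processInput deserializes each incoming event (data record, Watermark, StreamStatus, or LatencyMarker) into a StreamElement (the base class of all events, as mentioned above), checks which kind of message it is, and handles each kind separately: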

```java
public boolean processInput() throws Exception {
    // ... (unrelated code omitted)
    while (true) {
        if (currentRecordDeserializer != null) {
            DeserializationResult result = currentRecordDeserializer.getNextRecord(deserializationDelegate);

            if (result.isBufferConsumed()) {
                currentRecordDeserializer.getCurrentBuffer().recycleBuffer();
                currentRecordDeserializer = null;
            }

            if (result.isFullRecord()) {
                StreamElement recordOrMark = deserializationDelegate.getInstance();

                if (recordOrMark.isWatermark()) {
                    // handle watermark
                    statusWatermarkValve.inputWatermark(recordOrMark.asWatermark(), currentChannel);
                    continue;
                } else if (recordOrMark.isStreamStatus()) {
                    // handle stream status
                    statusWatermarkValve.inputStreamStatus(recordOrMark.asStreamStatus(), currentChannel);
                    continue;
                } else if (recordOrMark.isLatencyMarker()) {
                    // handle latency marker
                    synchronized (lock) {
                        streamOperator.processLatencyMarker(recordOrMark.asLatencyMarker());
                    }
                    continue;
                } else {
                    // now we can do the actual processing
                    StreamRecord<IN> record = recordOrMark.asRecord();
                    synchronized (lock) {
                        numRecordsIn.inc();
                        streamOperator.setKeyContextElement1(record);
                        streamOperator.processElement(record);
                    }
                    return true;
                }
            }
        }
        // ... (fetching the next buffer or event omitted)
    }
}
```

Note: the method is long, so only the parts relevant to this topic are shown.
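In the Watermark branch, statusWatermarkValve merges the watermarks arriving on all input channels: the operator's event time can only advance to the minimum of the latest watermarks seen on its channels, and a new watermark is forwarded only when that minimum increases. The sketch below models just that rule. It ignores StreamStatus (idle channels are not excluded from the minimum), and MinWatermarkValve is a hypothetical class, not Flink's StatusWatermarkValve.

```java
import java.util.Arrays;
import java.util.OptionalLong;

// Hypothetical, simplified model of watermark alignment across input channels.
final class MinWatermarkValve {
    // Latest watermark seen on each input channel.
    private final long[] channelWatermarks;
    // Last combined watermark forwarded downstream.
    private long lastOutputWatermark = Long.MIN_VALUE;

    MinWatermarkValve(int numChannels) {
        channelWatermarks = new long[numChannels];
        Arrays.fill(channelWatermarks, Long.MIN_VALUE);
    }

    /** Returns the watermark to forward downstream, or empty if the combined watermark did not advance. */
    OptionalLong inputWatermark(long watermark, int channel) {
        // A channel's watermark never moves backwards; stale watermarks are ignored.
        if (watermark > channelWatermarks[channel]) {
            channelWatermarks[channel] = watermark;
        }
        long min = Arrays.stream(channelWatermarks).min().getAsLong();
        if (min > lastOutputWatermark) {
            lastOutputWatermark = min;
            return OptionalLong.of(min);
        }
        return OptionalLong.empty();
    }

    public static void main(String[] args) {
        MinWatermarkValve valve = new MinWatermarkValve(2);
        System.out.println(valve.inputWatermark(100, 0)); // channel 1 has no watermark yet
        System.out.println(valve.inputWatermark(50, 1));
        System.out.println(valve.inputWatermark(200, 1));
    }
}
```

In this model, if one channel's watermark stalls, the combined watermark stalls with it, so downstream windows do not fire.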

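In processInput, watermarks (and stream statuses) are passed to statusWatermarkValve, while data records are passed to the operator.

Here we mainly follow how data records are processed: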

```java
// now we can do the actual processing
StreamRecord<IN> record = recordOrMark.asRecord();
synchronized (lock) {
    // metric counter: number of records received
    numRecordsIn.inc();
    // set the current key (similar to a data partition: each key keeps its own state)
    streamOperator.setKeyContextElement1(record);
    // this is where the record actually enters WindowOperator
    streamOperator.processElement(record);
}
```

This brings us to WindowOperator.processElement, which contains the main decision logic.
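```java
// check whether the windowAssigner is a MergingWindowAssigner
if (windowAssigner instanceof MergingWindowAssigner) {
    // ... merging (session) window handling omitted
```

This check separates the handling of session windows from that of sliding and tumbling windows: session windows are not aligned, so each key's windows have different boundaries.

Jump straight to the else branch: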

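```java
} else {
    for (W window: elementWindows) {

        // drop if the window is already late: skip this window and move on to the next one
        // (a tumbling window is the element's only window, so the element is skipped)
        if (isWindowLate(window)) {
            continue;
        }
        // the element is handled by at least one window
        isSkippedElement = false;
        // ... add the element to the window state, evaluate the trigger, etc.
```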
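elementWindows holds every window the element was assigned to. A tumbling window yields exactly one, while a sliding window can yield several, which is why the lateness check runs per window. The sketch below reproduces the assignment arithmetic of TimeWindow.getWindowStartWithOffset and the loop in SlidingEventTimeWindows.assignWindows as I understand them, with offset 0; WindowAssignmentSketch and its methods are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper class, not part of Flink.
final class WindowAssignmentSketch {
    /** Start of the window containing timestamp (same formula as TimeWindow.getWindowStartWithOffset). */
    static long windowStart(long timestamp, long offset, long windowSize) {
        return timestamp - (timestamp - offset + windowSize) % windowSize;
    }

    /**
     * Starts of all windows [start, start + size) that contain timestamp, newest first,
     * with offset 0. Passing slide == size gives a tumbling window.
     */
    static List<Long> slidingWindowStarts(long timestamp, long size, long slide) {
        List<Long> starts = new ArrayList<>();
        long lastStart = windowStart(timestamp, 0, slide);
        for (long start = lastStart; start > timestamp - size; start -= slide) {
            starts.add(start);
        }
        return starts;
    }

    public static void main(String[] args) {
        long ts = 1566640304152L; // record 891, assuming UTC+8
        System.out.println(slidingWindowStarts(ts, 60_000, 60_000)); // tumbling: one window
        System.out.println(slidingWindowStarts(ts, 60_000, 30_000)); // sliding: two windows
    }
}
```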

PS: After writing this much, we have finally reached the late-data handling. -_-

Next, let's look at the logic of isWindowLate:

```java
/**
 * Returns {@code true} if the watermark is after the end timestamp plus the allowed lateness
 * of the given window.
 */
protected boolean isWindowLate(W window) {
    // true only in event time, when the window's max timestamp (end - 1) + allowed lateness
    // <= current watermark, i.e. the window is already late
    return (windowAssigner.isEventTime() && (cleanupTime(window) <= internalTimerService.currentWatermark()));
}

/**
 * Returns the cleanup time for a window, which is
 * {@code window.maxTimestamp + allowedLateness}. In
 * case this leads to a value greater than {@link Long#MAX_VALUE}
 * then a cleanup time of {@link Long#MAX_VALUE} is
 * returned.
 *
 * @param window the window whose cleanup time we are computing.
 */
private long cleanupTime(W window) {
    if (windowAssigner.isEventTime()) {
        // window's max timestamp + allowed lateness
        long cleanupTime = window.maxTimestamp() + allowedLateness;
        // on overflow the sum wraps below maxTimestamp, so fall back to Long.MAX_VALUE
        return cleanupTime >= window.maxTimestamp() ? cleanupTime : Long.MAX_VALUE;
    } else {
        return window.maxTimestamp();
    }
}
```
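The same two checks can be written without Flink types to see the boundary cases: a window is late as soon as cleanupTime <= watermark (note the equality), and an allowed lateness large enough to overflow saturates at Long.MAX_VALUE, so the window is never late. A minimal sketch that assumes event time and takes the window's maxTimestamp directly; WindowLatenessSketch is a hypothetical name.

```java
// Hypothetical helper class mirroring cleanupTime/isWindowLate for event time only.
final class WindowLatenessSketch {
    /** Event-time cleanupTime: maxTimestamp + allowedLateness, saturating at Long.MAX_VALUE. */
    static long cleanupTime(long windowMaxTimestamp, long allowedLateness) {
        long cleanupTime = windowMaxTimestamp + allowedLateness;
        // Overflow wraps around to a value below maxTimestamp, so fall back to Long.MAX_VALUE.
        return cleanupTime >= windowMaxTimestamp ? cleanupTime : Long.MAX_VALUE;
    }

    /** Event-time isWindowLate: the window is late once cleanupTime <= current watermark. */
    static boolean isWindowLate(long windowMaxTimestamp, long allowedLateness, long currentWatermark) {
        return cleanupTime(windowMaxTimestamp, allowedLateness) <= currentWatermark;
    }

    public static void main(String[] args) {
        long maxTs = 59_999L; // window [0, 60000)
        System.out.println(isWindowLate(maxTs, 0L, 59_999L));     // watermark == cleanupTime
        System.out.println(isWindowLate(maxTs, 5_000L, 60_000L)); // still within allowed lateness
        System.out.println(cleanupTime(maxTs, Long.MAX_VALUE));   // overflow guard
    }
}
```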

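Let's look at a record that arrives on time:

{"id" : 891, "createTime" : "2019-08-24 17:51:44.152", "amt" : "5.6"}

Record 891 has the event time 2019-08-24 17:51:44.152. With one-minute windows aligned to whole minutes, its window is [2019-08-24 17:51:00.000, 2019-08-24 17:52:00.000), i.e. the timestamps [1566640260000, 1566640320000) (these epoch values correspond to UTC+8). The current watermark is 1566640294102. The window's max timestamp, 1566640319999 (end - 1), plus the allowed lateness (0, since allowedLateness is not set) is greater than the current watermark, so the window is not late and the record is not skipped.

 Now let's look at a late record:

{"id" : 892, "createTime" : "2019-08-24 17:51:54.152", "amt" : "3.6"}

Record 892 has the event time 2019-08-24 17:51:54.152 and falls into the same window, [2019-08-24 17:51:00.000, 2019-08-24 17:52:00.000), i.e. [1566640260000, 1566640320000). The current watermark is 1566652224102. The window's max timestamp, 1566640319999, is less than the current watermark, so the window is late and the record is skipped.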
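Both cases can be reproduced with plain arithmetic. This sketch assumes a one-minute tumbling window with offset 0, an allowed lateness of 0, and event timestamps derived from createTime in UTC+8 (1566640304152 for record 891 and 1566640314152 for record 892); LateDataWorkedExample is a hypothetical name.

```java
// Hypothetical worked example using the numbers from the debugging session above.
final class LateDataWorkedExample {
    static final long WINDOW_SIZE = 60_000L;  // TumblingEventTimeWindows.of(Time.minutes(1))
    static final long ALLOWED_LATENESS = 0L;  // allowedLateness(...) is not called in the example

    /** Window start for offset 0, same formula as TimeWindow.getWindowStartWithOffset. */
    static long windowStart(long timestamp) {
        return timestamp - (timestamp + WINDOW_SIZE) % WINDOW_SIZE;
    }

    /** True if isWindowLate would skip the element's (single) tumbling window. */
    static boolean isSkipped(long eventTimestamp, long currentWatermark) {
        long maxTimestamp = windowStart(eventTimestamp) + WINDOW_SIZE - 1; // end - 1
        return maxTimestamp + ALLOWED_LATENESS <= currentWatermark;
    }

    public static void main(String[] args) {
        long record891 = 1566640304152L; // 2019-08-24 17:51:44.152, assuming UTC+8
        long record892 = 1566640314152L; // 2019-08-24 17:51:54.152, assuming UTC+8
        System.out.println(windowStart(record891));                // 1566640260000
        System.out.println(isSkipped(record891, 1566640294102L));  // false
        System.out.println(isSkipped(record892, 1566652224102L));  // true
    }
}
```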

 

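 That concludes the source-code debugging of how windows handle late data. We have now covered both parts of how Flink handles late data:

  1. Creating timestamps and watermarks from event time

  2. Windows handling late data

Note: "windows" is added here to make clear that this work is done by the window operator.

Next, let's look at the side output for late window data. The source is in the last part of WindowOperator.processElement: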

```java
// side output input event if
// element not handled by any window
// late arriving tag has been set
// windowAssigner is event time and current timestamp + allowed lateness no less than element timestamp
// isSkippedElement starts as true and becomes false as soon as the element is added to a
// window that is not late; it stays true only if every assigned window was late
if (isSkippedElement && isElementLate(element)) {
    // lateDataOutputTag is set by .sideOutputLateData(late) after the window operator
    if (lateDataOutputTag != null){
        sideOutput(element);
    } else {
        // no side output configured: the late record is dropped and only counted in a metric
        this.numLateRecordsDropped.inc();
    }
}
```

```java
/**
 * Decide if a record is currently late, based on current watermark and allowed lateness.
 * True in event time when: element timestamp + allowed lateness <= current watermark.
 *
 * @param element The element to check
 * @return The element for which should be considered when sideoutputs
 */
protected boolean isElementLate(StreamRecord<IN> element){
    return (windowAssigner.isEventTime()) &&
        (element.getTimestamp() + allowedLateness <= internalTimerService.currentWatermark());
}

/**
 * Write skipped late arriving element to SideOutput.
 *
 * @param element skipped late arriving element to side output
 */
protected void sideOutput(StreamRecord<IN> element){
    output.collect(lateDataOutputTag, element);
}
```
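Putting the pieces together, this plain-Java sketch models what happens to a single record in the non-merging branch of processElement plus the final side-output check: it is kept in some window, sent to the side output, or dropped and counted. It is a simplification assuming event time and omitting the cleanupTime overflow guard; LateRecordRouting and Route are hypothetical names. The NOT_EMITTED case covers a skipped element that isElementLate does not consider late, which the code above neither side-outputs nor counts.

```java
// Hypothetical model of WindowOperator's late-record routing, not Flink code.
final class LateRecordRouting {
    enum Route { WINDOW, SIDE_OUTPUT, DROPPED, NOT_EMITTED }

    /** windowMaxTimestamps holds maxTimestamp() of each window the element was assigned to. */
    static Route route(long elementTimestamp, long[] windowMaxTimestamps, long allowedLateness,
                       long currentWatermark, boolean hasLateDataOutputTag) {
        boolean isSkippedElement = true;
        for (long maxTimestamp : windowMaxTimestamps) {
            // isWindowLate (without the overflow guard of cleanupTime)
            if (maxTimestamp + allowedLateness <= currentWatermark) {
                continue;
            }
            isSkippedElement = false; // the element is added to this window
        }
        if (!isSkippedElement) {
            return Route.WINDOW;
        }
        // isElementLate
        if (elementTimestamp + allowedLateness <= currentWatermark) {
            return hasLateDataOutputTag ? Route.SIDE_OUTPUT : Route.DROPPED;
        }
        return Route.NOT_EMITTED;
    }

    public static void main(String[] args) {
        long window = 1566640319999L; // maxTimestamp of [1566640260000, 1566640320000)
        System.out.println(route(1566640304152L, new long[] {window}, 0L, 1566640294102L, true));
        System.out.println(route(1566640314152L, new long[] {window}, 0L, 1566652224102L, true));
        System.out.println(route(1566640314152L, new long[] {window}, 0L, 1566652224102L, false));
    }
}
```

With allowedLateness 0 and the late tag set, record 892 routes to SIDE_OUTPUT; without .sideOutputLateData it would only increment numLateRecordsDropped.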

Done.

Follow the Flink菜鳥 WeChat official account for occasional posts on Flink development.

 

