Flink WaterMark原理與實現


一、WaterMark作用

在使用 EventTime 處理 Stream 數據的時候會遇到數據亂序的問題,流處理從 Event(事 件)產生,流經 Source,再到 Operator,這中間需要一定的時間。雖然大部分情況下,傳輸到 Operator 的數據都是按照事件產生的時間順序來的,但是也不排除由於網絡延遲等原因而導致亂序的產生,特別是使用 Kafka 的時候,多個分區之間的數據無法保證有序。因此, 在進行 Window 計算的時候,不能無限期地等下去,必須要有個機制來保證在特定的時間后, 必須觸發 Window 進行計算,這個特別的機制就是 Watermark(水位線)。Watermark 是用於 處理亂序事件的。

 

二、原理

在 Flink 的窗口處理過程中,如果確定全部數據到達,就可以對 Window 的所有數據做 窗口計算操作(如匯總、分組等),如果數據沒有全部到達,則繼續等待該窗口中的數據全 部到達才開始處理。這種情況下就需要用到水位線(WaterMarks)機制,它能夠衡量數據處 理進度(表達數據到達的完整性),保證事件數據(全部)到達 Flink 系統,或者在亂序及 延遲到達時,也能夠像預期一樣計算出正確並且連續的結果。當任何 Event 進入到 Flink 系統時,會根據當前最大事件時間產生 Watermarks 時間戳。

如何計算WaterMark的值?

Watermark = 進入 Flink 的最大的事件時間(mxtEventTime)— 指定的延遲時間(t)

有Watermark 的 Window 是怎么觸發窗口函數?

如果有窗口的停止時間等於或者小於maxEventTime – t(當時的 warkmark),那么 這個窗口被觸發執行

 

Watermark 的使用存在三種情況:

1. 本來有序的 Stream 中的 Watermark

如果數據元素的事件時間是有序的,Watermark 時間戳會隨着數據元素的事件時間按順 序生成,此時水位線的變化和事件時間保持一直(因為既然是有序的時間,就不需要設置延 遲了,那么 t 就是 0。所以 watermark=maxtime-0 = maxtime),也就是理想狀態下的水位 線。當 Watermark 時間大於 Windows 結束時間就會觸發對 Windows 的數據計算,以此類推,下一個 Window 也是一樣。

 

2.亂序事件中的 Watermark

現實情況下數據元素往往並不是按照其產生順序接入到 Flink 系統中進行處理,而頻繁出現亂序或遲到的情況,這種情況就需要使用 Watermarks 來應對。比如下圖,設置延遲時 間t為2

 

 3.並行數據流中的 Watermark

 在多並行度的情況下,Watermark 會有一個對齊機制,這個對齊機制會取所有 Channel中最小的 Watermark。

 

 

三、Watermark 運用

1.有序的Watermark

object WaterMark1 {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // 使用eventTime
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    val stream= env.socketTextStream("flink101", 8888)
      .map(line => {
        var arr = line.split(",")
        Log(arr(0).trim,arr(1).trim, arr(2).trim, arr(3).trim, arr(4).trim.toLong, arr(5).trim.toLong)
      }).assignAscendingTimestamps(_.callTime) // 數據有序的升序watermark
      .filter(_.callType.equals("success"))
        .keyBy(_.sid)
        .timeWindow(Time.seconds(10), Time.seconds(5))
        .reduce(new MyReduceFunction(), new ReturnMaxTimeWindowFunction)

    env.execute("assignAscendingTimestampsDemo")
  }

2.無序的Watermark

object Watermark2 {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // 使用eventTime
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    // 周期引入watermart的設置,默認是100毫秒
    env.getConfig.setAutoWatermarkInterval(100L)

    val stream= env.socketTextStream("flink101", 8888)
      .map(line => {
        var arr = line.split(",")
        Log(arr(0).trim,arr(1).trim, arr(2).trim, arr(3).trim, arr(4).trim.toLong, arr(5).trim.toLong)
      })
    // 數據是亂序的,延遲時間為3秒,周期性watermark

    /**
      * 第一種實現
      */
    val ds = stream.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[Log](Time.seconds(3)){
      override def extractTimestamp(element: Log): Long = {
        element.callTime  // EventTime
      }
    })

    /**
      * 第二種實現
      */
    val ds2 = stream.assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks[Log] {

      var maxEventTime: Long =_

      override def getCurrentWatermark: Watermark = {
        // 周期性生成watermark
        new Watermark(maxEventTime - 3000L)
      }

      // 設定EventTime是哪個屬性
      override def extractTimestamp(element: Log, previousElementTimestamp: Long): Long = {
        maxEventTime = maxEventTime.max(element.callTime)
        element.callTime
      }
    })

    env.execute("assignTimestampsAndWatermarksDemo")
  }

 

3.With Punctuated(間斷性的) Watermark

val env = StreamExecutionEnvironment.getExecutionEnvironment
    // 使用eventTime
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

//讀取文件數據
val data = env.socketTextStream("flink101",8888)
.map(line=>{
var arr =line.split(",") new
StationLog(arr(0).trim,arr(1).trim,arr(2).trim,arr(3).trim,arr(4).trim.toLong,arr(5).trim.to Long)
})

// 生成watermark
data.assignTimestampsAndWatermarks(
new MyCustomerPunctuatedWatermarks(3000L)) //自定義延遲
}
class MyCustomerPunctuatedWatermarks(delay:Long) extends AssignerWithPunctuatedWatermarks[StationLog]{
var maxTime :Long=0
override def checkAndGetNextWatermark(element: StationLog, extractedTimestamp: Long): Watermark = {
if(element.sid.equals("station_1")){//當ID為:station_1 才生成水位線 maxTime =maxTime.max(extractedTimestamp)
new Watermark(maxTime-delay)
}else{
return null //其他情況下不返回水位線
} }
override def extractTimestamp(element: StationLog, previousElementTimestamp: Long): Long = {
element.callTime //抽取EventTime的值 }
}

 

以上三種Watermark的實現,根據數據的事件時間是否有延遲和業務需求選擇相應的生成WaterMark的方法。

 

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM