Flink學習筆記:Time的故事


本文為《Flink大數據項目實戰》學習筆記,想通過視頻系統學習Flink這個最火爆的大數據計算框架的同學,推薦學習課程:

 

Flink大數據項目實戰:http://t.cn/EJtKhaz

1. Time三兄弟

1.1 DataStream支持的三種time

DataStream有大量基於time的operator,windows操作只是其中一種。

 

Flink支持三種time:

1.EventTime

2.IngestTime

3.ProcessingTime

 

1.2三個時間的比較

EventTime

1.事件生成時的時間,在進入Flink之前就已經存在,可以從event的字段中抽取。

2.必須指定watermarks(水位線)的生成方式。

3.優勢:確定性,亂序、延時、或者數據重放等情況,都能給出正確的結果

4.弱點:處理無序事件時性能和延遲受到影響

 

IngestTime

1.事件進入flink的時間,即在source里獲取的當前系統的時間,后續操作統一使用該時間。

2.不需要指定watermarks的生成方式(自動生成)

3.弱點:不能處理無序事件和延遲數據

 

ProcessingTime

1.執行操作的機器的當前系統時間(每個算子都不一樣)

2.不需要流和機器之間的協調

3.優勢:最佳的性能和最低的延遲

4.弱點:不確定性 ,容易受到各種因素影像(event產生的速度、到達flink的速度、在算子之間傳輸速度等),壓根就不管順序和延遲

 

比較

性能: ProcessingTime> IngestTime> EventTime

延遲: ProcessingTime< IngestTime< EventTime

確定性: EventTime> IngestTime> ProcessingTime

1.3根據業務選擇最合適的時間

Hadoop的日志進入Flink的時間為2018-12-23 17:43:46,666(Ingest Time),在進入window操作時那台機器的系統時間是2018-12-23 17:43:47,120(Processing Time),日志的具體內容是:

 

      (Event Time)2018-12-23 16:37:15,624 INFO org.apache.hadoop.yarn.client.ConfiguredRMFailoverProxyProvider - Failing over to rm2

 

要統計每個5min內的日志error個數,哪個時間是最有意義的? 最佳選擇就是【event time】。一般都需要使用event time,除非由於特殊情況只能用另外兩種時間來代替。

1.4設置time類型

設置時間特性

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);

 

不設置Time 類型,默認是processingTime。

 

如果使用EventTime則需要在source之后明確指定Timestamp Assigner & Watermark Generator(見后面小節)。

2. 時間戳和水位線背后的機制

2.1 Watermarks是干啥的

out-of-order/late element

實時系統中,由於各種原因造成的延時,造成某些消息發到flink的時間延時於事件產生的時間。如果基於event time構建window,但是對於late element,我們又不能無限期的等下去,必須要有個機制來保證一個特定的時間后,必須觸發window去進行計算了。這個特別的機制,就是watermark。

 

Watermarks(水位線)就是來處理這種問題的機制

1.參考google的DataFlow。

2.是event time處理進度的標志。

3.表示比watermark更早(更老)的事件都已經到達(沒有比水位線更低的數據 )。

4.基於watermark來進行窗口觸發計算的判斷。

2.2有序流中Watermarks

在某些情況下,基於Event Time的數據流是有續的(相對event time)。在有序流中,watermark就是一個簡單的周期性標記。

 

2.3亂序流中Watermarks

在更多場景下,基於Event Time的數據流是無續的(相對event time)。

在無序流中,watermark至關重要,她告訴operator比watermark更早(更老/時間戳更小)的事件已經到達, operator可以將內部事件時間提前到watermark的時間戳(可以觸發window計算啦)

 

上圖可以類比銀行或者醫院的排號來理解。

2.4並行流中的Watermarks

 

通常情況下, watermark在source函數中生成,但是也可以在source后任何階段,如果指定多次 watermark,后面指定的 watermark會覆蓋前面的值。 source的每個sub task獨立生成水印。

watermark通過operator時會推進operators處的當前event time,同時operators會為下游生成一個新的watermark。

多輸入operator(union、 keyBy、 partition)的當前event time是其輸入流event time的最小值。

3.生成Timestamp和Watermark

 

3.1 Timestamp /Watermark兩種生成方式

3.2 Timestamp /Watermark兩種生成方式

只有基於EventTime的流處理程序需要指定Timestamp和Watermarks的生成方式。

指定時間特性為Event Time(前面講過)。

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

分配timestamp和生成Watermarks兩種方式:

聲明時間特性為Event Time后,Flink需要知道每個event的timestamp(一般從event的某個字段去抽取),Flink還需要知道目前event time的進度也就是Watermarks(一般伴隨着Event Time一起指定生成方式,二者息息相關)

方式1:直接在source function中生成

方式2:timestamp assigner / watermark generator

注意:timestamp和watermark都是采用毫秒(從java的1970-01-01T00:00:00Z時間作為起始)。

聲明:event、element、record都是一個意思。

3.3方式一、直接在source function中生成

自定義source實現SourceFunction接口或者繼承RichParallelSourceFunction。

 

3.4方式二、 timestamp assigner / watermark generator

通過assignTimestampsAndWatermarks方法指定timestamp assigner / watermark generator

一般在datasource后調用assignTimestampsAndWatermarks,也可以在第一個基於event time的operator之前指定(例如window operator)。

特例:使用Kafka Connector作為source時,在source內部assignTimestampsAndWatermarks。

 

3.5兩種Watermark

Periodic(周期性) Watermarks

1.基於Timer

2.ExecutionConfig.setAutoWatermarkInterval(msec) (默認是 200ms, 設置watermarker 發送的周期)。

3.實現AssignerWithPeriodicWatermarks 接口。

Puncuated(間斷的) WaterMarks

1.基於某些事件觸發watermark 的生成和發送(由用戶代碼實現,例如遇到特殊元素) 。

2.實現AssignerWithPeriodicWatermarks 接口。

3.6 Periodic Watermark

周期性調用getCurrentWatermark,如果獲取的Watermark不等於null且比上一個最新的Watermark大就向下游發射。

 

3.7 Puncuated Watermark

間斷性調用getCurrentWatermark,它會根據一個條件發送watermark,這個條件可以自己去定義。

 

4. 預定義Timestamp Extractors / Watermark Emitters

4.1Assigners with ascending timestamps

適用於event時間戳單調遞增的場景,數據沒有太多延時。

 

4.2允許固定延遲的Assigner

適用於預先知道最大延遲的場景(例如最多比之前的元素延遲3000ms)。

 

4.3延遲數據處理

延時數據處理一般有兩種處理方式:

方式一:allowedLateness(),設定最大延遲時間,觸發被延遲,不宜設置太長。

方式二: sideOutputTag ,提供了延遲數據獲取的一種方式,這樣就不會丟棄數據了。

 

 

 

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM