本文為《Flink大數據項目實戰》學習筆記,想通過視頻系統學習Flink這個最火爆的大數據計算框架的同學,推薦學習課程:
Flink大數據項目實戰:http://t.cn/EJtKhaz
1. Time三兄弟
1.1 DataStream支持的三種time
DataStream有大量基於time的operator,windows操作只是其中一種。
Flink支持三種time:
1.EventTime
2.IngestTime
3.ProcessingTime
1.2三個時間的比較
EventTime
1.事件生成時的時間,在進入Flink之前就已經存在,可以從event的字段中抽取。
2.必須指定watermarks(水位線)的生成方式。
3.優勢:確定性,亂序、延時、或者數據重放等情況,都能給出正確的結果
4.弱點:處理無序事件時性能和延遲受到影響
IngestTime
1.事件進入flink的時間,即在source里獲取的當前系統的時間,后續操作統一使用該時間。
2.不需要指定watermarks的生成方式(自動生成)
3.弱點:不能處理無序事件和延遲數據
ProcessingTime
1.執行操作的機器的當前系統時間(每個算子都不一樣)
2.不需要流和機器之間的協調
3.優勢:最佳的性能和最低的延遲
4.弱點:不確定性 ,容易受到各種因素影像(event產生的速度、到達flink的速度、在算子之間傳輸速度等),壓根就不管順序和延遲
比較
性能: ProcessingTime> IngestTime> EventTime
延遲: ProcessingTime< IngestTime< EventTime
確定性: EventTime> IngestTime> ProcessingTime
1.3根據業務選擇最合適的時間
Hadoop的日志進入Flink的時間為2018-12-23 17:43:46,666(Ingest Time),在進入window操作時那台機器的系統時間是2018-12-23 17:43:47,120(Processing Time),日志的具體內容是:
(Event Time)2018-12-23 16:37:15,624 INFO org.apache.hadoop.yarn.client.ConfiguredRMFailoverProxyProvider - Failing over to rm2
要統計每個5min內的日志error個數,哪個時間是最有意義的? 最佳選擇就是【event time】。一般都需要使用event time,除非由於特殊情況只能用另外兩種時間來代替。
1.4設置time類型
設置時間特性
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
不設置Time 類型,默認是processingTime。
如果使用EventTime則需要在source之后明確指定Timestamp Assigner & Watermark Generator(見后面小節)。
2. 時間戳和水位線背后的機制
2.1 Watermarks是干啥的
out-of-order/late element
實時系統中,由於各種原因造成的延時,造成某些消息發到flink的時間延時於事件產生的時間。如果基於event time構建window,但是對於late element,我們又不能無限期的等下去,必須要有個機制來保證一個特定的時間后,必須觸發window去進行計算了。這個特別的機制,就是watermark。
Watermarks(水位線)就是來處理這種問題的機制
1.參考google的DataFlow。
2.是event time處理進度的標志。
3.表示比watermark更早(更老)的事件都已經到達(沒有比水位線更低的數據 )。
4.基於watermark來進行窗口觸發計算的判斷。
2.2有序流中Watermarks
在某些情況下,基於Event Time的數據流是有續的(相對event time)。在有序流中,watermark就是一個簡單的周期性標記。
2.3亂序流中Watermarks
在更多場景下,基於Event Time的數據流是無續的(相對event time)。
在無序流中,watermark至關重要,她告訴operator比watermark更早(更老/時間戳更小)的事件已經到達, operator可以將內部事件時間提前到watermark的時間戳(可以觸發window計算啦)
上圖可以類比銀行或者醫院的排號來理解。
2.4並行流中的Watermarks
通常情況下, watermark在source函數中生成,但是也可以在source后任何階段,如果指定多次 watermark,后面指定的 watermark會覆蓋前面的值。 source的每個sub task獨立生成水印。
watermark通過operator時會推進operators處的當前event time,同時operators會為下游生成一個新的watermark。
多輸入operator(union、 keyBy、 partition)的當前event time是其輸入流event time的最小值。
3.生成Timestamp和Watermark
3.1 Timestamp /Watermark兩種生成方式
3.2 Timestamp /Watermark兩種生成方式
只有基於EventTime的流處理程序需要指定Timestamp和Watermarks的生成方式。
指定時間特性為Event Time(前面講過)。
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
分配timestamp和生成Watermarks兩種方式:
聲明時間特性為Event Time后,Flink需要知道每個event的timestamp(一般從event的某個字段去抽取),Flink還需要知道目前event time的進度也就是Watermarks(一般伴隨着Event Time一起指定生成方式,二者息息相關)
方式1:直接在source function中生成
方式2:timestamp assigner / watermark generator
注意:timestamp和watermark都是采用毫秒(從java的1970-01-01T00:00:00Z時間作為起始)。
聲明:event、element、record都是一個意思。
3.3方式一、直接在source function中生成
自定義source實現SourceFunction接口或者繼承RichParallelSourceFunction。
3.4方式二、 timestamp assigner / watermark generator
通過assignTimestampsAndWatermarks方法指定timestamp assigner / watermark generator
一般在datasource后調用assignTimestampsAndWatermarks,也可以在第一個基於event time的operator之前指定(例如window operator)。
特例:使用Kafka Connector作為source時,在source內部assignTimestampsAndWatermarks。
3.5兩種Watermark
Periodic(周期性) Watermarks
1.基於Timer
2.ExecutionConfig.setAutoWatermarkInterval(msec) (默認是 200ms, 設置watermarker 發送的周期)。
3.實現AssignerWithPeriodicWatermarks 接口。
Puncuated(間斷的) WaterMarks
1.基於某些事件觸發watermark 的生成和發送(由用戶代碼實現,例如遇到特殊元素) 。
2.實現AssignerWithPeriodicWatermarks 接口。
3.6 Periodic Watermark
周期性調用getCurrentWatermark,如果獲取的Watermark不等於null且比上一個最新的Watermark大就向下游發射。
3.7 Puncuated Watermark
間斷性調用getCurrentWatermark,它會根據一個條件發送watermark,這個條件可以自己去定義。
4. 預定義Timestamp Extractors / Watermark Emitters
4.1Assigners with ascending timestamps
適用於event時間戳單調遞增的場景,數據沒有太多延時。
4.2允許固定延遲的Assigner
適用於預先知道最大延遲的場景(例如最多比之前的元素延遲3000ms)。
4.3延遲數據處理
延時數據處理一般有兩種處理方式:
方式一:allowedLateness(),設定最大延遲時間,觸發被延遲,不宜設置太長。
方式二: sideOutputTag ,提供了延遲數據獲取的一種方式,這樣就不會丟棄數據了。