Flink 窗口機制
窗口概述:
- 窗口是Flink用來處理無界流的核心,窗口將流切成有界的桶,之后就可以在bucket基礎上對數據計算。所以窗口的單位是桶。
為什么要使用窗口?
- 流式處理中數據都是源源不斷的來,不可能等到所有數據都到了之后才開始計算,而我們可以定義一個時間段這個間隔內的數據進行計算,這個時間階段就是窗口,窗口是一種切割無限數據為有限塊進行處理的手段。
窗口分類:
-
基於事件的窗口> 事件窗口就是基於事件數據個數的窗口,是Flink特有的,與時間無關。
-
滾動窗口:'countWindow(3)'只需指定窗口大小,當元素數量達到窗口大小就會觸發計算。
-
滑動窗口:'countWindow(3,2)'滑動窗口與滾動窗口同名只需多傳個參數,作為步長,每收到2個相同key的數據就會計算一次,最多3個。
說明:分組前開窗用windowAll()是非並行化的算子,所有的數據都會進入一個並行實例。分組后開窗用window()每個組一個窗口,每個組單獨輸出自己的,分組計算。
-
-
基於時間的窗口> 時間窗口就是基於處理時間、和事件時間驅動的窗口,時間窗口需要指定窗口分配器。
-
滾動窗口:滾動窗口有固定的大小,窗口與窗口之間不能重疊也不存在縫隙,每個事件只能屬於一個窗口。
說明:滾動窗口左閉右開,每個事件只能屬於一個窗口。
-
滑動窗口:與滾動窗口一樣, 滑動窗口也是有固定的長度,多了滑動步長,用來控制滑動窗口啟動的頻率,滑動步長小於窗口長度時窗口會重疊,每個數據都會被分配到多個窗口中。
說明:滑動窗口每經過一個步長都會有一個窗口關閉輸出,並且會向前多滑動一個窗口計算。
數據的最大重復次數:長度除以步長為一個數據最多的重復計算次數。 -
會話窗口:有一段時間沒有收到數據,會話窗口會自動關閉,后序的元素會被分配到一個新的會話窗口,這段沒有收到數據的時間就是會話窗口的gap。
說明:會話窗口不會有重疊,與滾動窗口和滑動窗口相比,會話窗口也沒有固定的開啟和關閉時間。
原理:在Flink內部,每到達一個新的元素都會創建一個新的會話窗口,如果這些窗口彼此相距比較定義的間隔小, 則會對他們進行合並。為了能夠合並會話窗口算子需要合並觸發器和合並窗口函數。
-
4.全局窗口:全局窗口分配器會分配相同key的所有元素進入同一個全局窗口,該窗口只有指定自定義的觸發器時才有用,否則不會觸發計算。
說明:
1.無論那種窗口都是以事件為驅動的,只有事件來了才會觸發計算,不來不計算。
2.FLink窗口的划分並不是以第一條數據為基准的。 -
窗口函數:
-
增量聚合函數:來一個數據計算一次,窗口結束時輸出一次。
- ReduceFunction:Reduce方法是對兩個數據進行聚合,第一個數據是上一次的結果,同一窗口內同一分組的數據來的時候不會進入reduce方法。
- AggregateFunction:需要定義累加器並初始化,相比reduce更加靈活,輸入類型、中間狀態、輸出結果都可以不同。
-
全窗口函數:每來一條數據就先存起來,到窗口觸發計算時一起計算。
- ProcessWindowFunction:內有一個收集器用於向下游發送數據,可以自定義觸發器變成增量聚合。
說明:由於全窗口函數需要將已經接收到的數據先存起來,等到窗口觸發時機到了在一起執行計算,當數據量很大時所有數據都存儲在內存中可能會導致內存泄漏OOM,此時就可以通過自定義觸發器修改全窗口函數的觸發時機,或者也可以使用窗口聚合函數,傳一個增量聚合函數和一個全窗口函數。
時間語義:
-
處理時間:服務器處理的時間,容易存在數據漂移。
-
事件時間:事件發生的事件,事件時間不能用來表明時間的進度,可能存在數據亂序的問題,因此需要通過waterMark水印來表示事件的進展。
-
注入時間:事件剛剛進入Flink的系統事件就是事件的注入時間,即source數據時的時間。
說明:Flink1.12 版本默認事件時間語義,之前版本默認為處理時間語義,凡是使用事件時間都需要指定Watermark以及時間字段的提取。
WaterMark:
-
概念:
1.衡量事件時間的進展。
2.是單調不減的,要么增加要么保持不變。
3.是特殊的數據,作為數據流的一部分隨數據流傳遞,並且攜帶一個時間戳t。
4.用來解決數據流的事件時間亂序問題。
5.WaterMark認為在此之前的數據都已經處理過,在此之后的數據認為是遲到數據。
6.窗口的計算和關閉的觸發、定時器的觸發都依賴 Watermark。說明:事件時間的機制即使事件是有序的必須使用watermark,因為所有的計算觸發、窗口的關閉執行等都需要watermark。
-
Watermark 分類:
間歇性的生成>來一條數據更新一次 => onEvent(),每來一條數據都生成一個watermark會導致性能降低。
周期性的生成>默認的生成方式,間隔固定周期更新一次,默認200ms => onPeriodicEmit()。 -
Watermark 生成邏輯:
-
升序>Watermark = 當前最大事件時間 - 1ms
WatermarkStrategy
.forMonotonousTimestamps()
.withTimestampAssigner() -
亂序>Watermark = 當前最大事件時間 - 亂序程度 - 1ms
WatermarkStrategy
.forBoundedOutOfOrderness(Duration)
.withTimestampAssigner()說明:
1.有序時亂序程度為0,Watermark默認是Long的最小值。
2.為了防止讀文件太快而watermark沒有更新還是Long的最小值太小了導致事件無法被觸發,所以將watermark置為Long的最大值。
-
-
Watermark 多並行度傳遞:
1.上游算子一個下游算子多個: 采用廣播的形式。
2.上游多個算子下游一個算子:以最小的watermark為准。說明:如果需要接收多個算子的Watermark,需要等待所有的Watermark都來了才可以計算,否則會一致等待。
-
其他注意:
1.如果上游是Kafka,直接在官方提供的SourceFunction實現類上,指定watermark,用來保證Kafka到Source之間的數據有序。
2.多分區的數據源,設置一個idle超時,防止watermark不更新的問題。
3.為了防止讀文件太快而watermark沒有更新還是Long的最小值太小了導致事件無法被觸發,所以將watermark置為Long的最大值。
窗口允許遲到:
-
Flink的窗口允許遲到,用來解決關窗前的遲到數據,如果窗口設置了等待時間時,當Watermark大於等於窗口的最大值時只會觸發計算而不會關閉,在允許遲到時間范圍內每來一條數據都會觸發一次計算,當Watermark大於等於窗口的最大值+等待時間時,窗口就會關閉,在此之后再來的數據窗口不處理而是放到測輸出流中,使用測輸出流來解決窗口關閉之后的遲到數據。
說明:
1.等待時間=最大亂序數(有序為0)。
2.在窗口運行遲到的范圍時間內,每來一條數據都會觸發一次計算。
側輸出流:
-
測輸出流用來接收窗口關閉之后的遲到數據,需要對窗口設置測輸出流,並且窗口中的測輸出流只能有一個參數為測輸出流標簽,如果需要匯總遲到數據需要使用Connect合並兩條流,這僅僅是將兩條流的數據放在一起進行存儲,如果要聚合還要通過Process自定義實現。
-
使用測輸出流可以將一個流分為多個流,使用測輸出流可以在主流分析數據的同時在進行指標的監測,通過運行時上下文對象獲取測輸出流,並且該對象比窗口中的測輸出流更靈活,可以有兩個參數,第一個為輸出流標簽,第二個參數可以根據需求自定義。
說明:測輸出流必須以匿名內部類的方式在代碼中指定否則會報錯,側輸出流識別的是名字而不是標簽,只要名字一樣即便標簽不一樣也會被認為是同一個測輸出流。
-
側輸出流還可以用來存儲CEP中的超時和遲到數據。
觸發器:
-
作用就是決定何時觸發窗口函數中的邏輯執行。
-
三種模式:
- 'fire'>觸發計算,默認。
- 'purge'>清除窗口中的元素。
- 'fire_and_purge'>觸發計算並清空窗口中的元素。
說明:讀文件有界流時,可能窗口的處理時間遠遠大於讀取文件的時間,窗口還沒有觸發計算文件就已經讀取完了有界流程序就會退出,這種情況可以觸發器修改窗口的觸發時間。
定時器:
-
基於事件時間或處理時間操作過每個數據后都注冊一個定時器,然后在定時器指定時間觸發並執行相應的操作。無論那種時間的定時器都可以通過運行時上下文對象注冊或刪除,待定時時間到會觸發定時器執行回調函數,OnTimer方法用來觸發定時器。
-
定時器原理:
-
注冊:每次調用注冊方法,都會new一個定時器對象放到一個去重的隊列中(僅對同分組內的定時器去重,不同分組的沒關系),即使重復注冊定時器也只會有一個定時器起作用。
-
觸發:基於事件時間的觸發器 WaterMark >= 定時器注冊時間時會觸發定時器。
注意:WaterMark = 當前最大事件時間 - 1 毫秒 ,而定時器的注冊時間就是個整數沒有減1毫秒。
-
