Flink Window(窗口)詳解之分類


Windows 計算是流式計算中非常常用的數據計算方式之一,通過按照固定時間或長度將數據流切分成不同的窗口,然后對數據進行相應的聚合運算,從而得到一定時間范圍內的統計結果。例如統計最近 5 分鍾內某基站的呼叫數,此時基站的數據在不斷地產生,但是通過5 分鍾的窗口將數據限定在固定時間范圍內,就可以對該范圍內的有界數據執行聚合處理,得出最近 5 分鍾的基站的呼叫數量。把無線流的數據變為有界流

1) Global Window 和 Keyed Window

在運用窗口計算時,Flink 根據上游數據集是否為 KeyedStream 類型,對應的Windows 也會有所不同。

  •  Keyed Window:上游數據集如果是 KeyedStream 類型,則調用 DataStream API 的 window()方法,數據會根據 Key 在不同的 Task 實例中並行分別計算,最后得出針對每個 Key 統計的結果。
  •  Global Window:如果是 Non-Keyed 類型,則調用 WindowsAll()方法,所有的數據都會71在窗口算子中由到一個 Task 中計算,並得到全局統計結果。
//讀取文件數據
val data = streamEnv.readTextFile(getClass.getResource("/station.log").getPath)
.map(line=>{
var arr =line.split(",")
new
StationLog(arr(0).trim,arr(1).trim,arr(2).trim,arr(3).trim,arr(4).trim.toLong,arr(5).trim.to
Long)
})
//Global Window
data.windowAll(自定義的WindowAssigner)
//Keyed Window
data.keyBy(_.sid)
.window(自定義的WindowAssigner)

2) Time Window 和 Count Window

基於業務數據的方面考慮,Flink 又支持兩種類型的窗口,一種是基於時間的窗口叫Time Window。還有一種基於輸入數據數量的窗口叫 Count Window

3) Time Window(時間窗口)

根據不同的業務場景,Time Window 也可以分為三種類型,分別是滾動窗口(Tumbling Window)、滑動窗口(Sliding Window)和會話窗口(Session Window)
1. 滾動窗口(Tumbling Window)
滾動窗口是根據固定時間進行切分,且窗口和窗口之間的元素互不重疊。這種類型的窗口的最大特點是比較簡單。只需要指定一個窗口長度(window size)。

 

 

//每隔5秒統計每個基站的日志數量
data.map(stationLog=>((stationLog.sid,1)))
.keyBy(_._1)
.timeWindow(Time.seconds(5))
//.window(TumblingEventTimeWindows.of(Time.seconds(5)))
.sum(1) //聚合

其中時間間隔可以是 Time.milliseconds(x)、Time.seconds(x)或 Time.minutes(x)。

2. 滑動窗口(Sliding Window)

滑動窗口也是一種比較常見的窗口類型,其特點是在滾動窗口基礎之上增加了窗口滑動時間(Slide Time),且允許窗口數據發生重疊。當 Windows size 固定之后,窗口並不像滾動窗口按照 Windows Size 向前移動,而是根據設定的 Slide Time 向前滑動。窗口之間的數據重疊大小根據 Windows size 和 Slide time 決定,當 Slide time 小於 Windows size便會發生窗口重疊,Slide size 大於 Windows size 就會出現窗口不連續,數據可能不能在任何一個窗口內計算,Slide size 和 Windows size 相等時,Sliding Windows 其實就是Tumbling Windows。

 

//每隔3秒計算最近5秒內,每個基站的日志數量
data.map(stationLog=>((stationLog.sid,1)))
.keyBy(_._1)
.timeWindow(Time.seconds(5),Time.seconds(3))
//.window(SlidingEventTimeWindows.of(Time.seconds(5),Time.seconds(3)))
.sum(1)

3. 會話窗口(Session Window)

會話窗口(Session Windows)主要是將某段時間內活躍度較高的數據聚合成一個窗口進行計算,窗口的觸發的條件是 Session Gap,是指在規定的時間內如果沒有數據活躍接入,則認為窗口結束,然后觸發窗口計算結果。需要注意的是如果數據一直不間斷地進入窗口,也會導致窗口始終不觸發的情況。與滑動窗口、滾動窗口不同的是,Session Windows 不需要有固定 windows size 和 slide time,只需要定義 session gap,來規定不活躍數據的時間上限即可。

 

//3秒內如果沒有數據進入,則計算每個基站的日志數量
data.map(stationLog=>((stationLog.sid,1)))
.keyBy(_._1)
.window(EventTimeSessionWindows.withGap(Time.seconds(3)))
.sum(1)

4) Count Window(數量窗口)

 Count Window 也有滾動窗口、滑動窗口等。由於使用比較少,不再贅述。

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM