轉載 https://blog.csdn.net/zg_hover/article/details/87592060
概述
flink中支持多種窗口,包括:時間窗口,session窗口,count窗口等,本文簡單介紹這些窗口的原理,並通過例子說明如何使用這些窗口。
時間窗口(Time Windows)
最簡單常用的窗口形式是基於時間的窗口,flink支持兩種時間窗口:
一種是翻滾時間窗口(tumbling time window)
一種是滑動時間窗口(sliding time window)
翻滾時間窗口(tumbling time window)
翻滾時間窗口的窗口是固定的,比如設定一個1分鍾的時間窗口,該時間窗口將只計算當前1分鍾內的數據,而不會管前1分鍾或后1分鍾的數據。
如上圖所示,編寫了一個1分鍾的翻滾窗口,用來收集最后一分鍾的值,並在1分鍾結束時輸出它們的總和。
從上圖可見,該窗口只會計算從當前計時開始的1分鍾內的數據,當1分鍾完成時輸出結果。然后,從完成這一刻起開始計算1分鍾內的數據,依次類推。
一個翻滾窗口的定義如下:
val env = StreamExecutionEnvironment.getExecutionEnvironment
val text = env.socketTextStream("localhost", 9999)
val counts = text.flatMap { _.toLowerCase.split("\\W+") filter { _.nonEmpty } }
.map { (_, 1) }
.keyBy(0)
.timeWindow(Time.seconds(5)) //定義一個5秒的翻滾窗口
.sum(1)
滑動時間窗口(sliding time window)
滑動窗口,顧名思義,該時間窗口是滑動的。所以,從概念上講,這里有兩個方面的概念需要理解:
窗口:需要定義窗口的大小
滑動:需要定義在窗口中滑動的大小,但理論上講滑動的大小不能超過窗口大小
下面我們來看一個圖例:
如上圖所示,定義了一個1分鍾的滑動窗口。在第一個滑動窗口中,將值9,6,8和4相加,得到結果27。接着,窗口滑動半分鍾(例如,在我們的示例中為2個值),此時窗口中的值為8,4和7,3,產生結果22,以此類推。
可以在Flink中定義1分鍾的滑動窗口,每30秒滑動一次,定義如下:
stream.timeWindow(Time.minutes(1), Time.seconds(30))
定義中,1分鍾為窗口時間,30秒為滑動時間。