來源於 :https://blog.csdn.net/zg_hover/article/details/87592060
概述
flink中支持多種窗口,包括:時間窗口,session窗口,count窗口等,本文簡單介紹這些窗口的原理,並通過例子說明如何使用這些窗口。
時間窗口(Time Windows)
最簡單常用的窗口形式是基於時間的窗口,flink支持兩種時間窗口:
一種是翻滾時間窗口(tumbling time window)
一種是滑動時間窗口(sliding time window)
翻滾時間窗口(tumbling time window)
翻滾時間窗口的窗口是固定的,比如設定一個1分鍾的時間窗口,該時間窗口將只計算當前1分鍾內的數據,而不會管前1分鍾或后1分鍾的數據。
如上圖所示,編寫了一個1分鍾的翻滾窗口,用來收集最后一分鍾的值,並在1分鍾結束時輸出它們的總和。
從上圖可見,該窗口只會計算從當前計時開始的1分鍾內的數據,當1分鍾完成時輸出結果。然后,從完成這一刻起開始計算1分鍾內的數據,依次類推。
一個翻滾窗口的定義如下:
val env = StreamExecutionEnvironment.getExecutionEnvironment
val text = env.socketTextStream("localhost", 9999)
val counts = text.flatMap { _.toLowerCase.split("\\W+") filter { _.nonEmpty } }
.map { (_, 1) }
.keyBy(0)
.timeWindow(Time.seconds(5)) //定義一個5秒的翻滾窗口
.sum(1)
滑動時間窗口(sliding time window)
滑動窗口,顧名思義,該時間窗口是滑動的。所以,從概念上講,這里有兩個方面的概念需要理解:
窗口:需要定義窗口的大小
滑動:需要定義在窗口中滑動的大小,但理論上講滑動的大小不能超過窗口大小
下面我們來看一個圖例:
如上圖所示,定義了一個1分鍾的滑動窗口。在第一個滑動窗口中,將值9,6,8和4相加,得到結果27。接着,窗口滑動半分鍾(例如,在我們的示例中為2個值),此時窗口中的值為8,4和7,3,產生結果22,以此類推。
可以在Flink中定義1分鍾的滑動窗口,每30秒滑動一次,定義如下:
stream.timeWindow(Time.minutes(1), Time.seconds(30))
定義中,1分鍾為窗口時間,30秒為滑動時間。
滑動窗口使用例子
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time
object WindowWordCount {
def main(args: Array[String]) {
println("start word count")
val env = StreamExecutionEnvironment.getExecutionEnvironment
val text = env.socketTextStream("localhost", 9999)
val counts = text.flatMap { _.toLowerCase.split("\\W+") filter { _.nonEmpty } }
.map { (_, 1) }
.keyBy(0)
.timeWindow(Time.seconds(5), Time.seconds(3)) // 定義了一個滑動窗口,窗口大小為5秒,每3秒滑動一次
.sum(1)
counts.print()
println("end word count")
env.execute("Window Stream WordCount")
println("exit now!")
}
}
可以在另一個終端通過命令nc -lk 9999輸入一些數據,查看一下效果,並理解一下。
this is a test, time windows.
1
為了,省去編譯和打包的麻煩,可以直接在flink的scala-shell中,修改一下上面的程序,把evn改成senv。
val text = senv.socketTextStream("localhost", 9999)
val counts = text.flatMap { _.toLowerCase.split("\\W+") filter { _.nonEmpty } } .map { (_, 1) } .keyBy(0) .timeWindow(Time.seconds(5), Time.seconds(3)) .sum(1)
counts.print()
senv.execute("Window Stream WordCount")
總結
本文介紹了flink中事件窗口的基本概念。時間窗口在流式計算中是一個非常核心的概念,需要很好的理解。
原文鏈接:https://blog.csdn.net/zg_hover/java/article/details/87592060