先看上一節的代碼程序
package com.wyh.windowsApi import org.apache.flink.streaming.api.TimeCharacteristic import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor import org.apache.flink.streaming.api.functions.{AssignerWithPeriodicWatermarks, AssignerWithPunctuatedWatermarks} import org.apache.flink.streaming.api.scala._ import org.apache.flink.streaming.api.watermark.Watermark import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows import org.apache.flink.streaming.api.windowing.time.Time object WindowTest { def main(args: Array[String]): Unit = { val env = StreamExecutionEnvironment.getExecutionEnvironment env.setParallelism(1) env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) //周期性生成watermark 默認是200毫秒 env.getConfig.setAutoWatermarkInterval(100L) /** * 從文件中讀取數據 * * */ //val stream = env.readTextFile("F:\\flink-study\\wyhFlinkSD\\data\\sensor.txt") val stream = env.socketTextStream("localhost", 7777) //Transform操作 val dataStream: DataStream[SensorReading] = stream.map(data => { val dataArray = data.split(",") SensorReading(dataArray(0).trim, dataArray(1).trim.toLong, dataArray(2).trim.toDouble) }) //===到來的數據是升序的,准時發車,用assignAscendingTimestamps //指定哪個字段是時間戳 需要的是毫秒 * 1000 // .assignAscendingTimestamps(_.timestamp * 1000) //===處理亂序數據 // .assignTimestampsAndWatermarks(new MyAssignerPeriodic()) //==底層也是周期性生成的一個方法 處理亂序數據 延遲1秒種生成水位 同時分配水位和時間戳 括號里傳的是等待延遲的時間 .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[SensorReading](Time.seconds(1)) { override def extractTimestamp(t: SensorReading): Long = { t.timestamp * 1000 } }) //統計10秒內的最小溫度 val minTemPerWindowStream = dataStream .map(data => (data.id, data.temperature)) .keyBy(0) // .timeWindow(Time.seconds(10)) //開時間窗口 滾動窗口 沒有數據的窗口不會觸發 //左閉右開 包含開始 不包含結束 延遲1秒觸發的那個時間的數據不包含 //可以直接調用底層方法,第三個參數傳offset代表時區 //.window(SlidingEventTimeWindows.of(Time.seconds(15),Time.seconds(5),Time.hours(-8))) .timeWindow(Time.seconds(15), Time.seconds(5)) //滑動窗口,每隔5秒輸出一次 .reduce((data1, data2) => (data1._1, data1._2.min(data2._2))) //用reduce做增量聚合 minTemPerWindowStream.print("min temp") dataStream.print("input data") env.execute("window Test") } } //設置水位線(水印) 這里有兩種方式實現 //一種是周期性生成 一種是以數據的某種特性進行生成水位線(水印) /** * 周期性生成watermark 默認200毫秒 */ class MyAssignerPeriodic() extends AssignerWithPeriodicWatermarks[SensorReading] { val bound: Long = 60 * 1000 var maxTs: Long = Long.MaxValue override def getCurrentWatermark: Watermark = { //定義一個規則進行生成 new Watermark(maxTs - bound) } //用什么抽取這個時間戳 override def extractTimestamp(t: SensorReading, l: Long): Long = { //保存當前最大的時間戳 maxTs = maxTs.max(t.timestamp) t.timestamp * 1000 } } /** * 亂序生成watermark * 每來一條數據就生成一個watermark */ class MyAssignerPunctuated() extends AssignerWithPunctuatedWatermarks[SensorReading] { override def checkAndGetNextWatermark(t: SensorReading, l: Long): Watermark = { new Watermark(l) } override def extractTimestamp(t: SensorReading, l: Long): Long = { t.timestamp * 1000 } }
開始點源碼 Ctrl + 鼠標左鍵
點進去發現是KededStream里面的其中一個方法,繼續點
我們發現實際上是封裝了一層java代碼,代碼中TimeWindow本身就是一個簡寫,這里發現底層還是.window() 方法 傳入窗口類型參數
我們發現,如果窗口的時間是處理時間就調用滑動處理時間窗口,我們在代碼中設置了事件時間,
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
所以這里是滑動事件處理時間窗口。
繼續點
點進去我們就看到實現的方法了
先來看最開始的時間是如何生成的,繼續點
我們就看到這樣的一個計算公式
來解釋一下,我們可以看到這樣一個參數,offset,它如果我們沒有設置就默認為0。它本身是用來指定時間的時區的。注意:這里有個參數其實叫windowSize 其實傳進來的是一個滑動步長!!!但是不影響結果
如何在代碼中添加這個offset呢:.window() 方法中傳入
SlidingEventTimeWindows.of() 第三個參數就是offset
.window(SlidingEventTimeWindows.of(Time.seconds(15),Time.seconds(5),Time.hours(-8)))
在這里,就計算出第一個窗口開始時間。
繼續看調用的
我們可以看到,一個for循環追加了好多窗口window.
判斷如果開始時間大於時間戳減去窗口的大小,那么就把當前這個窗口加上一個創建口大小,然后再減去一個滑動步長,再判斷是否大於時間戳減去窗口的大小,以此類推,知道小於,就結束創建,就可以得出最早創建的窗口。
如果是滾動窗口,傳進來的就是最早結束的時間,直接加上窗口大小