實際生產中,由於各種原因,導致事件創建時間與處理時間不一致,收集的規定對實時推薦有較大的影響。所以一般情況時選取創建時間,然后事先創建flink的時間窗口。但是問題來了,如何保證這個窗口的時間內所有事件都到齊了?這個時候就可以設置水位線(waterMark)。
概念:支持基於時間窗口操作,由於事件的時間來源於源頭系統,很多時候由於網絡延遲、分布式處理,以及源頭系統等各種原因導致源頭數據的事件時間可能亂序。這時可以設定一個時間閾值,或者說水位線(waterMark),其作用定義一個最大亂序時間,比如某條日志時間為2019-01-01 08:00:10,如果亂序最大允許時間為10s,那么就認為2019-01-01 08:00:00之前產生的所有事件都到齊了,可以進行計算。
時間窗口:指定一個固定時間間隔的窗口
一、滑動窗口
1、SlidingEventTimeWindows.of(Time.second(4), Time.seconds(3)):表示滑動窗口大小為4秒,滑動步長是3 秒,同時,每3秒才滑動一次;
2、每條數據存活的時間為滑動窗口的大小;
3、如果滑動窗口超過之前的窗口,那么后面來的屬於前面窗口的數據會丟失;
4、來了一條數據,邊移動邊計算滑動窗口的數據(一個窗口停留,計算一次,不移動,不計算 ),直至窗口到達指定位置。
計算某位置時間的公式:
//n:時間戳;size窗口大小;slide:滑動長度 //根據等差公式推導
an = a1 + (x-1)*s a1 = size - slide -1 x = [n - (size-slide)]/slide //除數后再乘以slide
s = slide //當來了一條時間戳為n的事件,就認為指定位置時間之前的所有事件都到齊了
指定位置 = (size-slide-1) + [(n-waterMark) - (size-slide)]/slide * slide
二、翻滾窗口
基於時間窗口,對連續數據進行迭代計算時,不會重疊。翻滾窗口是一個特殊的滑動窗口,當窗口的長度等於滑動的長度時,滑動窗口就是翻滾窗口。
計算某位置時間的公式:
指定位置 = -1 + (n-waterMark)/size * size //除數后再乘以size,size為窗口大小,n為時間戳
三、會話窗口
時間間隔達到一定時間長度時才進行統計計算。
##
測試代碼(需要集群telnet一個producer):
package com.cjs import org.apache.flink.streaming.api.TimeCharacteristic import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment import org.apache.flink.streaming.api.windowing.time.Time import org.apache.flink.api.scala._ import org.apache.flink.streaming.api.windowing.assigners.{SlidingEventTimeWindows, TumblingEventTimeWindows} object WaterMarkTest { /** *想使用WaterMark,需要3個步驟: * 1、對數據進行timestamp提取,即調用assignTimestampsAndWatermarks函數, * 實例化BoundedOutOfOrdernessTimestampExtractor,重寫extractTimestamp方法 * 2、設置使用事件時間,因為WaterMark是基於事件時間 * 3、定義時間窗口:翻滾窗口(TumblingEventWindows)、滑動窗口(timeWindow) * 任意一個沒有實現,都會報異常:Record has Long.MIN_VALUE timestamp (= no timestamp marker). Is the time characteristic set to 'ProcessingTime', or did you forget to call 'DataStream.assignTimestampsAndWatermarks(...)'? */ def main(args: Array[String]): Unit = { val senv = StreamExecutionEnvironment.getExecutionEnvironment val streamAdd = senv.socketTextStream("192.168.112.10",9999) val stream = streamAdd.assignTimestampsAndWatermarks( new BoundedOutOfOrdernessTimestampExtractor[String](Time.seconds(0)) { //WaterMark設置 //對數據流進行處理,獲取timestamp,對數據流就夠不影響
override def extractTimestamp(element: String): Long ={ //定義timestamp怎么從數據中抽取出來
val eventTime = element.split(" ")(0).toLong print(s"$eventTime \n") eventTime } }) //提取時間戳之后,該數據流是帶有時間的,用於事件窗口
.map(x=>(x.split(" ")(1),1L)).keyBy(0) //設置使用事件時間,因為WaterMark是基於事件時間
senv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) //定義翻滾窗口 // stream.window(TumblingEventTimeWindows.of(Time.seconds(3))).sum(1).print() // stream.sum(1).print() //直接輸出,沒有用到事件時間窗口,flink默認是累計統計,來一個,統計一個 //定義滑動窗口
stream.window(SlidingEventTimeWindows.of(Time.seconds(4),Time.seconds(2))).sum(1).print() senv.execute("watermark") } }
