Flink中的Time與Window


一、Time

在Flink的流式處理中,會涉及到時間的不同概念

Event Time(事件時間):是事件創建的時間。它通常由事件中的時間戳描述,例如采集的日志數據中,每一條日志都會記錄自己的生成時間,Flink通過時間戳分配器訪問事件時間戳

Ingestion Time(采集時間):是數據進入Flink的時間

Processing Time(處理時間):是每一個執行基於時間操作的算子的本地系統時間,與機器相關,默認的時間屬性就是Processing Time。

例如一條日志進入Flink的時間為2017-11-12 10:00:00.123 到達window的系統時間為 2017-11-12 10:00:01.234,日志內容如下:

2017-11-02 18:37:15.624 INFO Fair over to rm2

對於業務來說,要統計1min內的故障日志個數,哪個時間是最有意義的?----- eventTime,因為我們要根據日志的生成時間進行統計。

  

如果要想聚合,不可能對無解數據流進行聚合。

 

二、Window

1、streaming流式計算是一種被設計用於處理處理無限數據集的數據處理引擎,而無限數據集是指一種不斷增長的本質上無限的數據集,而window是一種切割無限數據為有限塊進行處理的手段。

Window是無限數據流處理的核心,Window將一個無限的stream拆分成有限大小的"buckets"桶,我們可以在這些桶上做計算操作。

共有兩類,五種時間窗口。

2、Window類型(兩類)

2.1、CountWindow:按照指定的數據條數生成一個window,與時間無關

2.2、TimeWindow:按照時間生成window。(按照Processing Time來划分Window)

對於TimeWindow和CountWindow,可以根據窗口實現原理的不同分成三類:滾動窗口(Tumbling Window)、滑動窗口(Sliding Window)和會話窗口(Session Window)。

(1)滾動窗口(Tumbling Windows)

將數據依據固定的窗口長度對數據進行切分。

特點:時間對齊,窗口長度固定,沒有重疊。

滾動窗口分配器將每個元素分配到一個指定窗口大小的窗口中,滾動窗口有一個固定的大小,並且不會出現重疊。

(2)滑動窗口(Sliding Windows)

滑動窗口是固定窗口的更廣義的一種形式,滑動窗口由固定的窗口長度和滑動間隔組成。

特點:時間對齊,窗口長度固定,有重疊。

滑動窗口分配器將元素分配到固定長度的窗口中,與滾動窗口類似,窗口的大小由窗口大小參數來配置,另一個窗口滑動參數控制滑動窗口開始的頻率。

因此,滑動窗口如果滑動參數小於窗口大小的話,窗口是可以重疊的,在這種情況下元素會被分配到多個窗口中。

使用場景:對最近一個時間段內的統計(求某接口最近5min的失敗率來決定是否要報警。)

(3)會話窗口(Session Windows)

由一系列事件組合一個指定時間長度的timeout間隙組成。類似於web應用的session,也就是一段時間沒有接收到新數據就會生成新的窗口。

特點:時間無對齊。

session 窗口分配器通過session活動來對元素進行分組,session窗口跟滾動窗口和滑動窗口相比,不會有重疊和固定的開始時間和結束時間的情況,相反,當它在一個固定的

時間周期內不再收到元素,即非活動間隔產生,那這個窗口就會關閉。一個Session窗口通過一個session間隔來配置,這個session間隔定義了非活躍周期的長度,當這個非活躍

周期產生,那么當前的session將關閉並且后續的元素將被分配到新的session窗口中去。

 

三、Window API

3.1、CountWindow

CountWindow根據窗口中相同key元素的數量來觸發執行,執行時只計算元素數量達到窗口大小的key對應的結果。

注意:CountWindow的window_size 指的是相同key的元素的個數,不是輸入的所有元素的總數。

import org.apache.flink.api.java.tuple.Tuple
import org.apache.flink.streaming.api.scala.{DataStream, KeyedStream, StreamExecutionEnvironment}


/**
  * CountWindow 中的滾動窗口(Tumbling Windows)
  * 將數據依據固定的窗口長度對數據進行切分。
  */
object TimeAndWindow {
  def main(args: Array[String]): Unit = {

    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val stream: DataStream[String] = env.socketTextStream("localhost",11111)
    val streamKeyBy: KeyedStream[(String, Long), Tuple] = stream.map(item => (item,1L)).keyBy(0)
    //注意:CountWindow的window_size 指的是相同key的元素的個數,不是輸入的所有元素的總數。
    val streamWindow: DataStream[(String, Long)] = streamKeyBy.countWindow(5)
                .reduce((item1, item2)=>(item1._1,item1._2+item2._2))

    streamWindow.print()
    env.execute("TimeAndWindow")

  }
}

3.2

import org.apache.flink.api.java.tuple.Tuple
import org.apache.flink.streaming.api.scala.{DataStream, KeyedStream, StreamExecutionEnvironment}


/**
  * CountWindow 中的滑動窗口(Sliding Windows)
  * 將數據依據固定的窗口長度對數據進行切分。
  */
object TimeAndWindow {
  def main(args: Array[String]): Unit = {

    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val stream: DataStream[String] = env.socketTextStream("localhost",11111)
    val streamKeyBy: KeyedStream[(String, Long), Tuple] = stream.map(item => (item,1L)).keyBy(0)
    //注意:CountWindow的window_size 指的是相同key的元素的個數,不是輸入的所有元素的總數。
    //滿足步長,就執行一次,按第一個參數的長度
    val streamWindow: DataStream[(String, Long)] = streamKeyBy.countWindow(5,2)
                .reduce((item1, item2)=>(item1._1,item1._2+item2._2))

    streamWindow.print()
    env.execute("TimeAndWindow")

  }
}

四、EventTime與Window

1、EventTime的引入

在Flink的流式處理中,絕大部分的業務都會使用eventTime,一般只在eventTime無法使用時,才會被迫使用ProcessingTime或者IngestionTime。

如果要使用EventTime,那么需要引入EventTime的時間戳,引入方式如下所示:

2、Watermark

  概念:我們知道,流處理從事件產生,到流經source,再到operator,中間是有一個過程和時間的,雖然大部分情況下,流到operator的數據都是按照事件產生的

事件戳順序來的,但是也不排除由於網絡、背壓等原因,導致亂序的產生,所謂亂序,就是指Flink接收到的事件的先后順序不是嚴格按照事件的EventTime順序排列的。

  Watermark是一種衡量Event Time進展的機制,它是數據本身的一個隱藏屬性,數據本身攜帶着對應的Watermark。

  Watermark是用於處理亂序事件的,而正確的處理亂序事件,通常用Watermark機制結合window來實現。

  數據流中的Watermark用於表示eventTime小於Watermark的數量,都已經到達了,因此,window的執行也是由Watermark觸發的。

  Watermark可以理解成一個延遲觸發機制。我們可以設置Watermark的延時時長t,每次系統會校驗已經到達的數據中最大的maxEventTime,然后認定eventTime 小於

maxEventTime-t 的所有數據都已經到達。如果有窗口的停止時間等於maxEventTime-t,那么這個窗口被觸發執行。

滾動窗口/滑動窗口/會話窗口

 
         
import org.apache.flink.api.java.tuple.Tuple
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.assigners.{EventTimeSessionWindows, SlidingEventTimeWindows, TumblingEventTimeWindows}
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow

/**
* TimeWindow
*/
object EventTimeAndWindow {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
//開啟watermark
//從調用時刻開始給env創建的每一個stream追加時間特征。
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

val stream: KeyedStream[(String, Long), Tuple] = env.socketTextStream("192.168.218.130", 1111).assignTimestampsAndWatermarks(
new BoundedOutOfOrdernessTimestampExtractor[String](Time.milliseconds(3000)) {
override def extractTimestamp(element: String): Long = {
// event word eventTime是日志生成時間,我們從日志中解析EventTime
val eventTime = element.split(" ")(0).toLong
println(eventTime)
eventTime
}
}
).map(item => (item.split(" ")(1),1L)).keyBy(0)
//加上滾動窗口,窗口大小是5s,調用window的api
// val streamWindow: WindowedStream[(String, Long), Tuple, TimeWindow] = stream.window(TumblingEventTimeWindows.of(Time.seconds(5)))
//滑動窗口
// val streamWindow: WindowedStream[(String, Long), Tuple, TimeWindow] = stream.window(SlidingEventTimeWindows.of(Time.seconds(10),Time.seconds(5)))
//會話窗口
val streamWindow: WindowedStream[(String, Long), Tuple, TimeWindow] = stream.window(EventTimeSessionWindows.withGap(Time.seconds(5)))
val streamReduce = streamWindow.reduce((item1,item2)=>(item1._1,item1._2+item2._2))
streamReduce.print()

env.execute("EventTimeAndWindow")
}
}

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM