Flink| time| watermark| Windows窗口


 

1. Window概述

一般真實的流都是無界的,怎么處理無界的數據?可以把無限的數據流進行切分,得到有限的數據集進行處理----也就是得到有界流。

streaming流式計算是一種被設計用於處理無限數據集的數據處理引擎,而無限數據集是指一種不斷增長的本質上無限的數據集;

而window是把無限數據流為有限流的一種方式,Window將一個無限的stream拆分成有限大小的”buckets”桶,我們可以在這些桶上做計算操作。

 Window API

窗口分配器window( )方法     (一般的DataStream可使用windowall( )這種方式開窗,但所有的數據都在一個分區上,這種方式不推薦使用

     我們可以用.window( )來定義一個窗口,然后基於這個window去做一些聚合或者其他處理操作。注意window( )方法必須在KeyBy(按照不同並行度做分區)之后才能用。

Flink提供了更加簡單的.timeWindow和.countWindow方法,用於定義時間窗口和計數窗口。

 val minTempPerWindow = dataStream.map(r => (r.id, r.temperature))
.keyBy(_._1)
.timeWindow(Time.seconds(15))
.reduce((r1, r2) => (r1._1, r1._2.min(r2._2)))

 窗口分配器(window assigner)

     window()方法接收的輸入參數是一個WindowAssigner,WindowAssigner負責將每條輸入的數據分發到正確的window中(是滾動窗口還是滑動窗口呢?窗口大小,步長)

Flink提供了通用的WindowAssigner:滾動窗口(tumbling window)、滑動窗口(sliding window)、會話窗口(session window)、全局窗口(global window)--所有的數據都放這里邊,就變成無界流了沒有結束時間,一般用來自定義窗口。

 窗口操作至少分成2步: ①是.timewindow或者.countwindow  ②是聚合操作 如.reduce( ) ;

創建不同類型的窗口:

滾動時間窗口(tumbling time window) .timeWindow(Time.seconds(15))
滑動時間窗口(sliding time window)  .timeWindow(Time.seconds(15), Time.seconds(5))
會話窗口(session window)          .window(EventTimeSessionWindows.withGap(Time.minutes(10)) //間隔10分鍾
滾動計數窗口(tumbling count window).countWindow(5)
滑動計數窗口(sliding count window) .countWindow(10, 2) //10個一個窗口,間隔2個滑下。

窗口函數(window function)

window function定義了要對窗口中收集的數據做的計算操作,可以分為兩類:

增量聚合函數(incremental aggregation functions):每條數據到來就進行計算,保持一個簡單的狀態;ReduceFunction, AggregateFunction

全窗口函數(full window functions):先把窗口所有數據都收集起來,等到計算的時候會遍歷所有數據ProcessFunctionWindow

               

其他可選API

.trigger()----觸發器:定義window什么時候關閉,觸發計算輸出結果
.evitor()----移除器:定義移除某些數據的邏輯;
.allowedLateness() ---允許處理遲到的數據 
.sideOutputLateData() ---將遲到的數據放入側輸出流
.getSideOutput() ----獲取側輸出流

Window API總覽

   

 

 

 

2. EventTime、IngestionTime、ProcessingTime

在Flink的流式處理中,會涉及到時間的不同概念,如下圖所示

                             

不同的時間語義有不同的應用場景,我們往往更關心事件時間(Event Time--故事發生的時間)。 

Event Time事件創建的時間。它通常由事件中的時間戳描述,例如采集的日志數據中,每一條日志都會記錄自己的生成時間,Flink通過時間戳分配器訪問事件時間戳。

Ingestion Time事件-數據進入Flink的時間。

Processing Time:是每一個執行基於時間操作的算子的本地系統時間,與機器相關,默認的時間屬性就是Processing Time(事件進入算子的時間)

例如,一條日志進入Flink的時間為2017-11-12 10:00:00.123,到達Window的系統時間為2017-11-12 10:00:01.234,日志的內容如下:

2017-11-02 18:37:15.624 INFO Fail over to rm2

對於業務來說,要統計1min內的故障日志個數,哪個時間是最有意義的?—— eventTime,因為我們要根據日志的生成時間進行統計。

在代碼中設置Event Time

可直接在代碼中,對執行環境調用setStreamTimeCharacteristic方法,設置流的時間特性;
具體的時間,還需要從數據中提取時間戳(timestamp);

val env = StreamExecutionEnvironment.getExecutionEnvironment //從調用時刻開始給env創建的每一個stream追加時間特征

 env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) 

 沒有設置系統默認按processing time;

 

 3. Watermark--解決亂序問題

  我們知道,流處理從事件產生,到流經source,再到operator,中間是有一個過程和時間的,雖然大部分情況下,流到operator的數據都是按照事件產生的時間順序來的,但是也不排除由於網絡、分布式等原因,導致亂序的產生,所謂亂序,就是指Flink接收到的事件的先后順序不是嚴格按照事件的Event Time順序排列的

亂序數據的影響

     

  那么此時出現一個問題,一旦出現亂序,如果只根據eventTime決定window的運行(遇到一個時間戳達到了窗口的時間,不應該立刻觸發窗口的計算,而是等待一段時間,等遲到的數據來了再關閉窗口),我們不能明確數據是否全部到位,但又不能無限期的等下去,此時必須要有個機制來保證一個特定的時間后,必須觸發window去進行計算了,這個特別的機制(延遲觸發機制),就是Watermark

  • Watermark是一種衡量Event Time進展的機制,它是數據本身的一個隱藏屬性,數據本身攜帶着對應的Watermark。
  • Watermark是用於處理亂序事件的,而正確的處理亂序事件,通常用Watermark機制結合window來實現。
  • 數據流中的Watermark用於表示timestamp小於Watermark的數據,都已經到達了,因此,window的執行也是由Watermark觸發的。
  • Watermark可以理解成一個延遲觸發機制,我們可以設置Watermark的延時時長t,每次系統會校驗已經到達的數據中最大的maxEventTime,然后認定eventTime小於maxEventTime - t的所有數據都已經到達,如果有窗口的停止時間等於maxEventTime – t,那么這個窗口被觸發執行。
  • watermark用來讓程序自己平衡延遲和結果正確性;

 watermark的特點:

        

watermark的傳遞

   多個輸入分區,並行;假如設置為eventtime,1個任務有多個分區,一個分區對應一個分區watermark,數據來一個更新分區內的watermark(只更新比它大的--遞增--只漲不跌);多個分區中取最小的watermark為准;

   

watermark的引入:

 Event Time的使用一定要指定數據源的時間戳;  對於排好序的數據,只需要指定時間戳就夠了,不需要延遲觸發。

//注意單位是毫秒,所以根據時間戳的不同,可能需要乘1000 dataStream.assignAscendingTimestamps(_.timestamp * 1000) //已經排好序的,就不需延遲觸發了
 dataStream.assignTimestampsAndWatermarks( // 同時分配時間戳和水位線 new BoundOutOfOrdernessTimestampExtractor[SensorReading](Time.milliseconds(1000)){ //1000ms是等待延遲的時間 override def extractTimestamp(element: SensorReading): Long = { element.timestamp * 1000 //取timestamp作為時間戳 單位毫秒 } })
比如當前5s的數據來了,當前設置了延遲1s,水位線watermark就是4s(不認為5s之前的數據都來了,只認為4s之前的數據都來了)

 Flink暴露了TimestampAssigner接口供我們實現,使我們可以自定義如何從事件數據中抽取時間戳和生產watermark

 dataStream.assignTimestampsAndWatermarks(new MyAssigner()) //自定義MyAssigner可以有兩種類型,都繼承自TimestampAssigner

 TimestampAssigner 定義了抽取時間戳,以及生產watermark的方法,有兩種類型:

    ①AssignerWithPeriodicWatermarks

  • 周期性的生成watermark:系統會周期性的(processingTime)將watermark插入到流中;
  • 默認周期是200毫秒,可以使用ExecutionConfig.setAutoWatermarkInterval()方法進行設置;
  • 升序和前面亂序的處理BoundedOutOfOrderness,都是基於周期性watermark的。

    ②AssignerWithPunctuatedWatermarks

  •  沒有時間周期規律,可打斷的生產watermark。

 watermark的設定:

  • 在Flink中,watermark由應用程序開發人員生成,這通常需要對應的領域有一定的了解;
  • 如果watermark設置的延遲太久,收到結果的速度可能就會很慢,解決辦法是在水位線到達之前輸出一個近似結果;
  • 而如果watermark到達得太早,則可能收到錯誤結果,不過Flink處理遲到數據的機制可以解決這個問題。

 

4. Window可以分成兩類:

   時間窗口TimeWindow滾動時間窗口(Tumbling Window)、滑動時間窗口(Sliding Window)、會話時間窗口(Session Window)。

   計數窗口CountWindow:按照指定的數據條數生成一個Window,與時間無關。分為滾動計數窗口、滑動計數窗口。

TimeWindow

  • 1. 滾動窗口(Tumbling Windows)

    將數據依據固定的窗口長度對數據進行切片

    特點時間對齊,窗口長度固定,沒有重疊。  它是步長 = site的滑動窗口;  

      使用場景:商業BI分析統計(關注的商業指標往往是某個時間段的指標,如一天或一周的銷售額,每個時間段的聚合操作);

    滾動窗口分配器將每個元素分配到一個指定窗口大小的窗口中,滾動窗口有一個固定的大小,並且不會出現重疊。例如:如果你指定了一個5分鍾大小的滾動窗口,窗口的創建如下圖所示:

                    

import org.apache.flink.api.java.tuple.Tuple import org.apache.flink.streaming.api.TimeCharacteristic import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor import org.apache.flink.streaming.api.scala.{DataStream, KeyedStream, StreamExecutionEnvironment, WindowedStream} import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows import org.apache.flink.streaming.api.windowing.time.Time import org.apache.flink.streaming.api.windowing.windows.TimeWindow import org.apache.flink.api.scala._ object StreamEventTimeApp { def main(args: Array[String]): Unit = { //環境 val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment //聲明使用eventTime;引入EventTime 從調用時刻開始給env創建的每一個stream追加時間特征  env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
val dstream: DataStream[String]
= env.socketTextStream("hadoop101", 7777) val textWithTsDStream: DataStream[(String, Long, Int)] = dstream.map { text => val arr: Array[String] = text.split(" ") (arr(0), arr(1).toLong, 1) } // 1 告知 flink如何獲取數據中的event時間戳 2 告知延遲的watermark為 3s val textWithEventTimeDStream: DataStream[(String, Long, Int)] = textWithTsDStream.assignTimestampsAndWatermarks( new BoundedOutOfOrdernessTimestampExtractor[(String, Long, Int)](Time.milliseconds(3000)) { //time別導錯包了 override def extractTimestamp(element: (String, Long, Int)): Long = { return element._2 } }).setParallelism(1) //每5秒開一個窗口 統計key的個數 5秒是一個數據的時間戳為准 val textKeyStream: KeyedStream[(String, Long, Int), Tuple] = textWithEventTimeDStream.keyBy(0) textKeyStream.print("textKey: ") //滾動窗口 val windowStream: WindowedStream[(String, Long, Int), Tuple, TimeWindow] = textKeyStream.window(TumblingEventTimeWindows.of(Time.milliseconds(5000))) windowStream.sum(2).print("windows: ").setParallelism(1) env.execute() }

[kris@hadoop101 gmall]$ nc -lk 7777 abc 1000 abc 3000 abc 4000 abc 5000 abc 6000 abc 7000 abc 7500 abc 8000 abc 9000 abc 12000 abc 12999 abc 14000 abc 15000 abc 17000 abc 18000 textKey: :8> (abc,1000,1) textKey: :8> (abc,3000,1) textKey: :8> (abc,4000,1) textKey: :8> (abc,5000,1) textKey: :8> (abc,6000,1) textKey: :8> (abc,7000,1) textKey: :8> (abc,7500,1) textKey: :8> (abc,8000,1) Window: > (abc,1000,3) textKey: :8> (abc,9000,1) textKey: :8> (abc,12000,1) textKey: :8> (abc,12999,1) Window: > (abc,5000,6) textKey: :8> (abc,14000,1) textKey: :8> (abc,15000,1) textKey: :8> (abc,17000,1) textKey: :8> (abc,18000,1) Window: > (abc,12000,3)   滾動窗口:   X秒開一個窗口,上例中5s開一個窗;   上例watermark 3s   第n次發車時間:nX+3,車上攜帶的[X, nX)秒內的     如第一次車上攜帶 [0, 5)以內的,在第 5 + 3 = 8s時間點發車
     第二次車上攜帶 [5, 10)以內的,在第10 + 3 = 13s時間點發車
     第三次車上攜帶 [10, 15)以內的,在第15 + 3 = 18s時間點發車;


延遲3s, 不認為當前時間的數據都來了, 認為當前時間 - 3s之前的數據都來了;
上例中如果是不同key的結果如下: 在KeyBy之前分配的時間戳和watermark,並行度為1,都是在一個slot里邊,都是一個任務; KeyBy之后是想把它分到不同的分區,那么前邊的watermark(多個輸入多個輸出,對於前邊source而言是多個輸出,這個watermark要廣播出去) 不同key的數據來了之后,相當於先來的key的watermark也會跟着漲;
<===> watermark的傳遞; textKey::6> (a,2000,1) textKey::6> (a,3000,1) textKey::2> (b,4000,1) textKey::2> (b,5000,1) textKey::6> (a,6000,1) textKey::2> (b,7000,1) textKey::6> (a,7999,1) windows: :6> (a,2000,2) windows: :2> (b,4000,1) textKey::2> (b,9000,1) textKey::6> (a,9998,1) textKey::2> (b,11000,1) textKey::6> (a,12998,1) textKey::2> (b,13000,1) windows: :2> (b,5000,3) windows: :6> (a,6000,3)

 

  • 2. 滑動窗口(Sliding Windows)

    滑動窗口是固定窗口的更廣義的一種形式,滑動窗口由固定的窗口長度和滑動間隔組成

    特點窗口長度固定,有重疊

     適用場景:對最近一個時間段內的統計(求某接口最近5min的失敗率來決定是否要報警);

      靈活;連續的波浪;比如股票交易所它是最近24小時的漲跌幅度,隨時往后算隨時往后划;

 滑動窗口分配器將元素分配到固定長度的窗口中,與滾動窗口類似,窗口的大小由窗口大小參數來配置,另一個窗口滑動參數控制滑動窗口開始的頻率。因此,滑動窗口如果滑動參數小於窗口大小的話,窗口是可以重疊的,在這種情況下元素會被分配到多個窗口中。

例如,你有10分鍾的窗口和5分鍾的滑動,那么每個窗口中5分鍾的窗口里包含着上個10分鍾產生的數據,如下圖所示:

         

       

測試代碼:

import org.apache.flink.api.java.tuple.Tuple
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala.{DataStream, KeyedStream, StreamExecutionEnvironment, WindowedStream}
import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
import org.apache.flink.streaming.api.windowing.assigners.{SlidingEventTimeWindows, TumblingEventTimeWindows}
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow

object WindowApi {

  def main(args: Array[String]): Unit = {
    //環境
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    //env.setParallelism(1) 全局並行度設為1
    //聲明使用eventTime
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    val dstream: DataStream[String] = env.socketTextStream("hadoop101", 7777)
    val textWithTsDStream = dstream.map{
      text =>
        val arr: Array[String] = text.split(" ")
        (arr(0), arr(1).toLong, 1)
    }
    // 1 告知 flink如何獲取數據中的event時間戳  2 告知延遲的watermark
    val textWithEventTimeDStream: DataStream[(String, Long, Int)] = textWithTsDStream.assignTimestampsAndWatermarks(
      new BoundedOutOfOrdernessTimestampExtractor[(String, Long, Int)](Time.milliseconds(3000)) {
        override def extractTimestamp(element: (String, Long, Int)): Long = {
          return element._2
        }
      }).setParallelism(1) //只要它的並行度為1就可以了,  其他的算子不需設並行度為1

    val textKeyStream: KeyedStream[(String, Long, Int), Tuple] = textWithEventTimeDStream.keyBy(0)
    textKeyStream.print("textKey:")
    //滾動窗口
    //val windowStream: WindowedStream[(String, Long, Int), Tuple, TimeWindow] = textKeyStream.window(TumblingEventTimeWindows.of(Time.milliseconds(5000)))
    //滑動窗口 大小4s, 步長2s, 延遲3s
    val windowStream: WindowedStream[(String, Long, Int), Tuple, TimeWindow] = textKeyStream.window(SlidingEventTimeWindows.of(Time.milliseconds(4000L), Time.milliseconds(2000L)))

    windowStream.sum(2).print("windows: ").setParallelism(1)

    env.execute("Window Stream")
  }
}
View Code

 

如果watermark = 0,窗口大小為5,步長為3s的滑動窗口:

 val textWithEventTimeDStream: DataStream[(String, Long, Int)] = textWithTsDStream.assignTimestampsAndWatermarks( new BoundedOutOfOrdernessTimestampExtractor[(String, Long, Int)](Time.milliseconds(0)) { //time別導錯包了 override def extractTimestamp(element: (String, Long, Int)): Long = { return element._2 } }).setParallelism(1) //滑動窗口 val windowStream: WindowedStream[(String, Long, Int), Tuple, TimeWindow] = textKeyStream.window(SlidingEventTimeWindows.of(Time.milliseconds(5000L), Time.milliseconds(3000L))) windowStream.sum(2).print("windows: ").setParallelism(1) watermark=0,窗口大小5s, 步長3s : [kris@hadoop101 ~]$ nc -lk 7777 abc 500 abc 1000 abc 1998 abc 1999 abc 2000 --[0, 2) abc 3000 abc 4000 abc 4998 abc 4999 abc 5000 --[0, 5) abc 6000 abc 7000 abc 7998 abc 7999 abc 8000 --[3, 8) abc 9000 abc 10000 abc 10998 abc 10999 abc 11000 --[6, 11) abc 12000 abc 13000 abc 13999 abc 14000 --[9, 14) textKey::8> (abc,500,1) textKey::8> (abc,1000,1) textKey::8> (abc,1998,1) textKey::8> (abc,1999,1) windows: > (abc,500,4) --[0, 2) textKey::8> (abc,2000,1) textKey::8> (abc,3000,1) textKey::8> (abc,4000,1) textKey::8> (abc,4998,1) textKey::8> (abc,4999,1) windows: > (abc,500,9) --[0, 5) textKey::8> (abc,5000,1) textKey::8> (abc,6000,1) textKey::8> (abc,7000,1) textKey::8> (abc,7998,1) textKey::8> (abc,7999,1) windows: > (abc,3000,9) --[3, 8) textKey::8> (abc,8000,1) textKey::8> (abc,9000,1) textKey::8> (abc,10000,1) textKey::8> (abc,10998,1) textKey::8> (abc,10999,1) windows: > (abc,6000,9) --[6, 11) textKey::8> (abc,11000,1) textKey::8> (abc,12000,1) textKey::8> (abc,13000,1) textKey::8> (abc,13999,1) windows: > (abc,9000,8) --[9, 14) textKey::8> (abc,14000,1)

關於滑動窗口觸發執行的時間點:

watermark=0,窗口大小10s, 步長5s, [0, 5) [0, 10) [5, 15) [10, 20) [15, 25) [20, 30) watermark=0,窗口大小5s, 步長2s, [0, 1) [0, 3) [0, 5) [2, 7) [4, 9) watermark=0,窗口大小5s, 步長3s, [0, 2) [0, 5) [3, 8) [6, 11) [9, 14) watermark=0,窗口大小4s, 步長2s, [0, 2) [0, 4) [2, 6) [4, 8) [6, 10) 觸發窗口執行的時間點為:如果是整數倍就是步長,否則就是余數  如watermark=0,窗口大小4s, 步長2s,觸發窗口執行的時間點為2s ; watermark=0,窗口大小6s, 步長3s,觸發窗口執行的時間點為3s ; watermark=0,窗口大小10s, 步長5s, 觸發窗口執行的時間點為5s ; ---------以上為整數倍,以下為余數----------- watermark=0,窗口大小5s, 步長3s,觸發窗口執行時間點為2s (5%3=2); watermark=0,窗口大小5s, 步長2s,觸發窗口執行時間點為1s (5%2=1);

 

 

 如果watermark = 3,窗口大小為5,步長為1s的滑動窗口:

 val textWithEventTimeDStream: DataStream[(String, Long, Int)] = textWithTsDStream.assignTimestampsAndWatermarks( new BoundedOutOfOrdernessTimestampExtractor[(String, Long, Int)](Time.milliseconds(3000)) { override def extractTimestamp(element: (String, Long, Int)): Long = { return element._2 } }).setParallelism(1) //只要它的並行度為1就可以了, 其他的算子不需設並行度為1  val windowStream: WindowedStream[(String, Long, Int), Tuple, TimeWindow] = textKeyStream.window(SlidingEventTimeWindows.of(Time.milliseconds(5000L), Time.milliseconds(1000L))) Time.milliseconds(5000L)---開車接多少人(5s內的), Time.milliseconds(1000L)---步長決定了觸發開車的時機,如果為5000L則跟滾動窗口是一樣的; Time.milliseconds(3000L)---水位線是3s, 1s時觸發開車,但延遲3s; 5s的窗口大小,1s的步長,3s的延遲 [kris@hadoop101 ~]$ nc -lk 7777 abc 100 abc 1000 abc 3998 abc 3999 abc 5000 abc 8000 abc 10000 =======================> textKey::8> (abc,100,1) textKey::8> (abc,1000,1) textKey::8> (abc,3998,1) textKey::8> (abc,3999,1) //開車的時間點在1s,雖然成可以裝下5s內的,但是開車的時候那里邊只有1個 windows: > (abc,100,1) --1s開車 textKey::8> (abc,5000,1) //開車的時間點在2s,但延遲3s,5s內的都可以,數據時間還不到5s,前邊有多少算多少;(將2s以前,5s內的接走) windows: > (abc,100,2) --2s開車 textKey::8> (abc,8000,1) //8000--> -3000 = 5000,4000,3000 ,2000,1000(這兩個發車時間點前邊已經發過了)各會發車 windows: > (abc,100,2) --3s開車 windows: > (abc,100,4) --4s開車 windows: > (abc,100,4) --5s開車 textKey::8> (abc,10000,1) //10000--> -3000 = 7000[2000, 7000),6000[1000, 6000) windows: > (abc,1000,4) --6s開車 windows: > (abc,3998,3) --7s開車 滑動窗口: 窗口大小5s(X), 步長1s(Y), 水位線watermark 3s(Z) 1s開車, 延遲3s, 1 + 3 = 4s, 帶走[0, 4)以內的,但時間只是到了1s, 即[0, 1)以內的 2s開車, 延遲3s, 2 + 3 = 5s, 帶走[0, 5)以內的,但時間只是到了2s, 即[0, 2)以內的 3s開車, 延遲3s, 3 + 3 = 6s, 帶走[1, 6)以內的,但時間只是到了3s, 即[0, 3)以內的 4s開車, 延遲3s, 4 + 3 = 7s, 帶走[2, 7)以內的,但時間只是到了4s, 即[0, 4)以內的 5s開車, 延遲3s, 5 + 3 = 8s, 帶走[3, 8)以內的,但時間只是到了5s, 即[0, 5)以內的 6s開車, 延遲3s, 6 + 3 = 9s, 帶走[4, 9)以內的,但時間只是到了6s, 即[1, 6)以內的 7s開車, 延遲3s, 7 + 3 = 10s, 帶走[5, 10)以內的,但時間只是到了7s, 即[2, 7)以內的 ... nYs開車, 延遲Zs, n + Z, 帶走[n+Z-X ,n+Z)以內的,但時間只是到了nYs, 即[nY-X, nY)以內的

 

 

watermark = 2s, 窗口大小6s, 步長3s :

 

 // 1 告知 flink如何獲取數據中的event時間戳 2 告知延遲的watermark val textWithEventTimeDStream: DataStream[(String, Long, Int)] = textWithTsDStream.assignTimestampsAndWatermarks( new BoundedOutOfOrdernessTimestampExtractor[(String, Long, Int)](Time.milliseconds(2000)) { override def extractTimestamp(element: (String, Long, Int)): Long = { return element._2 } }).setParallelism(1) //只要它的並行度為1就可以了, 其他的算子不需設並行度為1 //滑動窗口 大小6s, 步長3s, 延遲2s val windowStream: WindowedStream[(String, Long, Int), Tuple, TimeWindow] = textKeyStream.window(SlidingEventTimeWindows.of(Time.seconds(6), Time.seconds(3))) [kris@hadoop101 ~]$ nc -lk 7777 abc 1000 abc 2000 abc 3000 abc 4998 abc 4999 abc 5000 -- abc 7998 abc 7999 abc 8000 -- abc 10998 abc 10999 abc 11000 -- abc 13998 abc 13999 abc 14000 -- abc 16998 abc 16999 abc 17000 -- abc 19998 abc 19999 abc 20000 -- textKey::8> (abc,1000,1) textKey::8> (abc,2000,1) textKey::8> (abc,3000,1) textKey::8> (abc,4998,1) textKey::8> (abc,4999,1) windows: :8> (abc,1000,2) --[0, 3) textKey::8> (abc,5000,1) textKey::8> (abc,7998,1) textKey::8> (abc,7999,1) windows: :8> (abc,1000,6) --[0, 6) textKey::8> (abc,8000,1) textKey::8> (abc,10998,1) textKey::8> (abc,10999,1) windows: :8> (abc,3000,7) --[3, 9) textKey::8> (abc,11000,1) textKey::8> (abc,13998,1) textKey::8> (abc,13999,1) windows: :8> (abc,7998,6) --[6, 12) textKey::8> (abc,14000,1) textKey::8> (abc,16998,1) textKey::8> (abc,16999,1) windows: :8> (abc,10998,6) --[9, 15) textKey::8> (abc,17000,1) textKey::8> (abc,19998,1) textKey::8> (abc,19999,1) windows: :8> (abc,13998,6) --[12, 18) textKey::8> (abc,20000,1) 3s開車, 延遲2s, 3 + 2 = 5s, 帶走[0, 5)以內的,但時間只是到了3s, 即[0, 3)以內的 6s開車, 延遲2s, 6 + 2 = 8s, 帶走[2, 8)以內的,但時間只是到了6s, 即[0, 6)以內的 9s開車, 延遲2s, 9 + 2 = 11s, 帶走[5, 11)以內的,但時間只是到了9s, 即[3, 9)以內的 12s開車, 延遲2s, 12 + 2 = 14s, 帶走[8, 14)以內的,但時間只是到了12s, 即[6, 12)以內的 15s開車, 延遲2s, 15 + 2 = 17s, 帶走[11, 17)以內的,但時間只是到了15s, 即[9, 15)以內的 18s開車, 延遲2s, 18 + 2 = 20s, 帶走[14, 20)以內的,但時間只是到了18s, 即[12, 18)以內的 ...

 

 

 

 

 

  • 3.  會話窗口(Session Windows)

    由一系列事件組合一個指定時間長度的timeout間隙組成,類似於web應用的session,也就是一段時間沒有接收到新數據就會生成新的窗口

    特點時間無對齊

    session窗口分配器通過session活動來對元素進行分組,session窗口跟滾動窗口和滑動窗口相比,不會有重疊和固定的開始時間和結束時間的情況,相反,當它在一個固定的時間周期內不再收到元素,即非活動間隔產生,那個這個窗口就會關閉。一個session窗口通過一個session間隔來配置,這個session間隔定義了非活躍周期的長度,當這個非活躍周期產生,那么當前的session將關閉並且后續的元素將被分配到新的session窗口中去。

            

 

object StreamEventTimeApp {
  def main(args: Array[String]): Unit = {
    //環境
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    //聲明使用eventTime
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    val dstream: DataStream[String] = env.socketTextStream("hadoop101", 7777)

    val textWithTsDStream: DataStream[(String, Long, Int)] = dstream.map { text =>
      val arr: Array[String] = text.split(" ")
      (arr(0), arr(1).toLong, 1)
    }
    // 1 告知 flink如何獲取數據中的event時間戳  2 告知延遲的watermark
    val textWithEventTimeDStream: DataStream[(String, Long, Int)] = textWithTsDStream.assignTimestampsAndWatermarks(
      new BoundedOutOfOrdernessTimestampExtractor[(String, Long, Int)](Time.milliseconds(3000)) { //time別導錯包了
        override def extractTimestamp(element: (String, Long, Int)): Long = {
          return element._2
        }
      }).setParallelism(1)
    //每5秒開一個窗口 統計key的個數  5秒是一個數據的時間戳為准
    val textKeyStream: KeyedStream[(String, Long, Int), Tuple] = textWithEventTimeDStream.keyBy(0)
    textKeyStream.print("textKey: ")
    //滾動窗口
    //val windowDStream: WindowedStream[(String, Long, Int), Tuple, TimeWindow] = textKeyStream.window(TumblingEventTimeWindows.of(Time.milliseconds(5000)))
    //滑動窗口
    //val windowStream: WindowedStream[(String, Long, Int), Tuple, TimeWindow] = textKeyStream.window(SlidingEventTimeWindows.of(Time.milliseconds(5000L), Time.milliseconds(1000L)))

   //會話窗口
    val windowStream: WindowedStream[(String, Long, Int), Tuple, TimeWindow] = textKeyStream.window(EventTimeSessionWindows.withGap(Time.milliseconds(5000L)))
    windowStream.sum(2).print("windows: ").setParallelism(1)
    env.execute()
  }
}

只能兩次時間的間隔是否滿足條件
在觸發水位5s的基礎上再加延遲3s,
[kris@hadoop101 gmall]$ nc -lk 7777
abc 1000
abc 7000
abc 10000
=======>>>
textKey: :8> (abc,1000,1)
textKey: :8> (abc,7000,1)
textKey: :8> (abc,10000,1) //在上一個基礎上+延遲時間3s才會開車
windows: > (abc,1000,1)


[kris@hadoop101 gmall]$ nc -lk 7777
aaa 1000
aaa 2000
aaa 7001
aaa 9000
aaa 10000
=====>>
textKey: :5> (aaa,1000,1)
textKey: :5> (aaa,2000,1)
textKey: :5> (aaa,7001,1) //兩個時間點之間相差達到鴻溝5s了,在這個基礎之上再加3s才能開車;
textKey: :5> (aaa,9000,1)
textKey: :5> (aaa,10000,1)
windows: > (aaa,1000,2)

 

CountWindow

CountWindow根據窗口中相同key元素的數量來觸發執行,執行時只計算元素數量達到窗口大小的key對應的結果

注意:CountWindow的window_size指的是相同Key的元素的個數,不是輸入的所有元素的總數

  滾動窗口

默認的CountWindow是一個滾動窗口,只需要指定窗口大小即可,當元素數量達到窗口大小時,就會觸發窗口的執行。

   滑動窗口

滑動窗口和滾動窗口的函數名是完全一致的,只是在傳參數時需要傳入兩個參數,一個是window_size,一個是sliding_size。

下面代碼中的sliding_size設置為了2,也就是說,每收到兩個相同key的數據就計算一次,每一次計算的window范圍是5個元素。

WindowAPI

 

 Windowall是所有數據都在一個分區上;keyBy之后是分到各個分區再window去處理

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM