Flink學習(十五) 滑動事件時間窗口加上水位線開始窗口時間如何確定?(底層源碼)


先看上一節的代碼程序

package com.wyh.windowsApi

import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
import org.apache.flink.streaming.api.functions.{AssignerWithPeriodicWatermarks, AssignerWithPunctuatedWatermarks}
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.watermark.Watermark
import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time


object WindowTest {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    env.setParallelism(1)
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    //周期性生成watermark 默認是200毫秒
    env.getConfig.setAutoWatermarkInterval(100L)

    /**
      * 從文件中讀取數據
      *
      *
      */
    //val stream = env.readTextFile("F:\\flink-study\\wyhFlinkSD\\data\\sensor.txt")

    val stream = env.socketTextStream("localhost", 7777)

    //Transform操作
    val dataStream: DataStream[SensorReading] = stream.map(data => {
      val dataArray = data.split(",")
      SensorReading(dataArray(0).trim, dataArray(1).trim.toLong, dataArray(2).trim.toDouble)
    })
      //===到來的數據是升序的,准時發車,用assignAscendingTimestamps
      //指定哪個字段是時間戳 需要的是毫秒 * 1000
      //      .assignAscendingTimestamps(_.timestamp * 1000)
      //===處理亂序數據
      //      .assignTimestampsAndWatermarks(new MyAssignerPeriodic())
      //==底層也是周期性生成的一個方法 處理亂序數據 延遲1秒種生成水位 同時分配水位和時間戳 括號里傳的是等待延遲的時間
      .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[SensorReading](Time.seconds(1)) {
      override def extractTimestamp(t: SensorReading): Long = {
        t.timestamp * 1000
      }
    })

    //統計10秒內的最小溫度
    val minTemPerWindowStream = dataStream
      .map(data => (data.id, data.temperature))
      .keyBy(0)
      //      .timeWindow(Time.seconds(10)) //開時間窗口  滾動窗口 沒有數據的窗口不會觸發
      //左閉右開 包含開始 不包含結束 延遲1秒觸發的那個時間的數據不包含
      //可以直接調用底層方法,第三個參數傳offset代表時區
      //.window(SlidingEventTimeWindows.of(Time.seconds(15),Time.seconds(5),Time.hours(-8)))
      .timeWindow(Time.seconds(15), Time.seconds(5)) //滑動窗口,每隔5秒輸出一次
      .reduce((data1, data2) => (data1._1, data1._2.min(data2._2))) //用reduce做增量聚合


    minTemPerWindowStream.print("min temp")

    dataStream.print("input data")

    env.execute("window Test")

  }

}


//設置水位線(水印) 這里有兩種方式實現
//一種是周期性生成 一種是以數據的某種特性進行生成水位線(水印)
/**
  * 周期性生成watermark 默認200毫秒
  */
class MyAssignerPeriodic() extends AssignerWithPeriodicWatermarks[SensorReading] {
  val bound: Long = 60 * 1000
  var maxTs: Long = Long.MaxValue

  override def getCurrentWatermark: Watermark = {
    //定義一個規則進行生成
    new Watermark(maxTs - bound)
  }

  //用什么抽取這個時間戳
  override def extractTimestamp(t: SensorReading, l: Long): Long = {
    //保存當前最大的時間戳
    maxTs = maxTs.max(t.timestamp)
    t.timestamp * 1000
  }
}


/**
  * 亂序生成watermark
  * 每來一條數據就生成一個watermark
  */
class MyAssignerPunctuated() extends AssignerWithPunctuatedWatermarks[SensorReading] {
  override def checkAndGetNextWatermark(t: SensorReading, l: Long): Watermark = {
    new Watermark(l)
  }

  override def extractTimestamp(t: SensorReading, l: Long): Long = {
    t.timestamp * 1000
  }
}

開始點源碼 Ctrl + 鼠標左鍵

 

 點進去發現是KededStream里面的其中一個方法,繼續點

 

 我們發現實際上是封裝了一層java代碼,代碼中TimeWindow本身就是一個簡寫,這里發現底層還是.window() 方法 傳入窗口類型參數

 

 我們發現,如果窗口的時間是處理時間就調用滑動處理時間窗口,我們在代碼中設置了事件時間,

env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

所以這里是滑動事件處理時間窗口。

繼續點

 

 點進去我們就看到實現的方法了

 

 先來看最開始的時間是如何生成的,繼續點

 

 我們就看到這樣的一個計算公式

 

 來解釋一下,我們可以看到這樣一個參數,offset,它如果我們沒有設置就默認為0。它本身是用來指定時間的時區的。注意:這里有個參數其實叫windowSize 其實傳進來的是一個滑動步長!!!但是不影響結果

如何在代碼中添加這個offset呢:.window() 方法中傳入

SlidingEventTimeWindows.of()  第三個參數就是offset
.window(SlidingEventTimeWindows.of(Time.seconds(15),Time.seconds(5),Time.hours(-8)))

在這里,就計算出第一個窗口開始時間。

繼續看調用的

我們可以看到,一個for循環追加了好多窗口window.

判斷如果開始時間大於時間戳減去窗口的大小,那么就把當前這個窗口加上一個創建口大小,然后再減去一個滑動步長,再判斷是否大於時間戳減去窗口的大小,以此類推,知道小於,就結束創建,就可以得出最早創建的窗口。

 

 

如果是滾動窗口,傳進來的就是最早結束的時間,直接加上窗口大小

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM