Flink--time-window 的高級用法


 

 

1.現實世界中的時間是不一致的,在 flink 中被划分為事件時間,提取時間,處理時間三種。
2.如果以 EventTime 為基准來定義時間窗口那將形成 EventTimeWindow,要求消息本身就應該攜帶 EventTime 
3.如果以 IngesingtTime 為基准來定義時間窗口那將形成 IngestingTimeWindow,以 source 的 systemTime 為准。 
4.如果以 ProcessingTime 基准來定義時間窗口那將形成 ProcessingTimeWindow,以 operator 的 systemTime 為准。

EventTime 

1.要求消息本身就應該攜帶 EventTime
2.時間對應關系如下 

需求:

EventTime 3 數據: 

1527911155000,boos1,pc1,100.0 1527911156000,boos2,pc1,200.0 1527911157000,boos1,pc1,300.0 1527911158000,boos2,pc1,500.0 1527911159000,boos1,pc1,600.0 1527911160000,boos1,pc1,700.0 1527911161000,boos2,pc2,700.0 1527911162000,boos2,pc2,900.0 1527911163000,boos2,pc2,1000.0 1527911164000,boos2,pc2,1100.0 1527911165000,boos1,pc2,1100.0 1527911166000,boos2,pc2,1300.0 1527911167000,boos2,pc2,1400.0 1527911168000,boos2,pc2,1600.0
1527911169000,boos1,pc2,1300.0
View Code

代碼實現: 

object EventTimeExample {
def main(args: Array[String]) {
//1.創建執行環境,並設置為使用 EventTime
val env = StreamExecutionEnvironment.getExecutionEnvironment
//置為使用 EventTime
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
//2.創建數據流,並進行數據轉化
val source = env.socketTextStream("localhost", 9999)
case class SalePrice(time: Long, boosName: String, productName: String, price: Double)
val dst1: DataStream[SalePrice] = source.map(value => {
val columns = value.split(",")
SalePrice(columns(0).toLong, columns(1), columns(2), columns(3).toDouble)
 })
//3.使用 EventTime 進行求最值操作
val dst2: DataStream[SalePrice] = dst1
//提取消息中的時間戳屬性
.assignAscendingTimestamps(_.time)
.keyBy(_.productName)
.timeWindow(Time.seconds(3))//設置 window 方法一
.max("price")
//4.顯示結果
dst2.print()
//5.觸發流計算
 
 env.execute()
}
}
View Code

當前代碼理論上看沒有任何問題,在實際使用的時候就會出現很多問題,甚至接 收不到數據或者接收到的數據是不准確的;這是因為對於 flink 最初設計的時 候,就考慮到了網絡延遲,網絡亂序等問題,所以提出了一個抽象概念基座水印 

(WaterMark); 

水印分成兩種形式:
第一種:

第二種: 

所以,我們需要考慮到網絡延遲的狀況,那么代碼中就需要添加水印操作:
object EventTimeOperator {
  def main(args: Array[String]): Unit = {
    //創建執行環境,並設置為使用EventTime
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)//注意控制並發數
    //置為使用EventTime
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    val source = env.socketTextStream("localhost", 9999)
    val dst1: DataStream[SalePrice] = source.map(value => {
      val columns = value.split(",")
      SalePrice(columns(0).toLong, columns(1), columns(2), columns(3).toDouble)
    })
    //todo 水印時間  assignTimestampsAndWatermarks
    val timestamps_data = dst1.assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks[SalePrice]{

      var currentMaxTimestamp:Long = 0
      val maxOutOfOrderness = 2000L //最大允許的亂序時間是2s
      var wm : Watermark = null
      val format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS")
      override def getCurrentWatermark: Watermark = {
        wm = new Watermark(currentMaxTimestamp - maxOutOfOrderness)
        wm
      }

      override def extractTimestamp(element: SalePrice, previousElementTimestamp: Long): Long = {
        val timestamp = element.time
        currentMaxTimestamp = Math.max(timestamp, currentMaxTimestamp)
       
      }
    })
    val data: KeyedStream[SalePrice, String] = timestamps_data.keyBy(line => line.productName)
    val window_data: WindowedStream[SalePrice, String, TimeWindow] = data.timeWindow(Time.seconds(3))
    val apply: DataStream[SalePrice] = window_data.apply(new MyWindowFunc)
    apply.print()
    env.execute()

  }
}
case class SalePrice(time: Long, boosName: String, productName: String, price: Double)
class MyWindowFunc extends WindowFunction[SalePrice , SalePrice , String, TimeWindow]{
  override def apply(key: String, window: TimeWindow, input: Iterable[SalePrice], out: Collector[SalePrice]): Unit = {
    val seq = input.toArray
    val take: Array[SalePrice] = seq.sortBy(line => line.price).reverse.take(1)
    for(info <- take){
      out.collect(info)
    }
  }
}

 

ProcessingTime 

對於 processTime 而言,是 flink 處理數據的時間,所以就不關心發過來的數據 是不是有延遲操作,只關心數據具體的處理時間,所以不需要水印處理,操作相 對來說簡單了很多 

object ProcessingTimeExample {
  def main(args: Array[String]) {
    //創建執行環境,並設置為使用EventTime
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(2)//注意控制並發數
    //置為使用ProcessingTime
    env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)

    val source = env.socketTextStream("localhost", 9999)
    case class SalePrice(time: Long, boosName: String, productName: String, price: Double)

    val dst1: DataStream[SalePrice] = source.map(value => {
      val columns = value.split(",")
      SalePrice(columns(0).toLong, columns(1), columns(2), columns(3).toDouble)
    })
    //processTime不需要提取消息中的時間
//    val timestamps_data: DataStream[SalePrice] = dst1.assignAscendingTimestamps(line => line.time)
    val keyby_data: KeyedStream[SalePrice, String] = dst1.keyBy(line => line.productName)
    //TODO 窗口事件是:TumblingProcessingTimeWindows
    val window_data: WindowedStream[SalePrice, String, TimeWindow] = keyby_data.window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
    val max_price: DataStream[SalePrice] = window_data.max("price")
    max_price.print()
    env.execute()
  }
}
View Code

 

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM