Flink 窗口聚合函數之ProcessWindowFunction實踐


一、ProcessWindowFunction使用場景

前面提到的 ReduceFunction 和 AggregateFunction 都是基於中間狀態實現增量計算的窗口函數,雖然已經滿足絕大多數場景,但在某些情況下,統計更復雜的指標可能需要依賴於窗口中所有的數據元素,或需要操作窗口中的狀態數據和窗口元數據,這時就需要使用到 ProcessWindowsFunction,ProcessWindowsFunction 能夠更加靈活地支持基於窗口全部數 據元素的結果計算,例如對整個窗口數據排序取 TopN,這樣的需要就必須使用 ProcessWindowFunction。

 

二、ProcessWindowFunction業務實踐:每隔5秒統計每個基站的日志數量

1.創建日志數據對象

case class Log(sid:String,var callOut:String, var callIn:String, callType:String, callTime:Long, duration:Long)

 

2.業務實現

import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.function.ProcessWindowFunction
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector

/**
  * 全量聚合函數
  */
object TestProcessFunctionByWindow {
  
  // 每隔5秒統計每個基站的日志數量
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // source
    var stream = env.socketTextStream("flink101", 8888)
      .map(line => {
        var arr = line.split(",")
        Log(arr(0).trim,arr(1).trim, arr(2).trim, arr(3).trim, arr(4).trim.toLong, arr(5).trim.toLong)
      })

    // 設置並行度
    stream.setParallelism(1)
    stream.map(log=> (log.sid, 1))
      .keyBy(_._1)
//        .timeWindow(Time.seconds(5))
      .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
      .process(new MyProcessWindowFunction)  // 一個窗口結束的時候調用一次(在一個並行度中)
        .print()


    env.execute("TestReduceFunctionByWindow")
  }
}

class MyProcessWindowFunction extends ProcessWindowFunction[(String, Int), (String, Long), String, TimeWindow] {

  // 一個窗口結束的時候調用一次(一個分組執行一次),不適合大量數據,全量數據保存在內存中,會造成內存溢出
  override def process(key: String, context: Context, elements: Iterable[(String, Int)], out: Collector[(String, Long)]): Unit = {
    // 聚合,注意:整個窗口的數據保存到Iterable,里面有很多行數據, Iterable的size就是日志的總行數
    out.collect(key, elements.size)
  }
}

  

三、總結

ProcessWindowFunction獲得一個包含窗口所有元素的可迭代器,以及一個具有時間和狀態信息訪問權的上下文對象,這使得它比其他窗口函數提供更大的靈活性。這是以性能和資源消耗為代價的,因為元素不能增量地聚合,而是需要在內部緩沖,直到認為窗口可以處理為止,使用該函數需要注意數據量,數據量太大,全量數據保存在內存中,會造成內存溢出。

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM