一、ProcessWindowFunction使用場景
前面提到的 ReduceFunction 和 AggregateFunction 都是基於中間狀態實現增量計算的窗口函數,雖然已經滿足絕大多數場景,但在某些情況下,統計更復雜的指標可能需要依賴於窗口中所有的數據元素,或需要操作窗口中的狀態數據和窗口元數據,這時就需要使用到 ProcessWindowsFunction,ProcessWindowsFunction 能夠更加靈活地支持基於窗口全部數 據元素的結果計算,例如對整個窗口數據排序取 TopN,這樣的需要就必須使用 ProcessWindowFunction。
二、ProcessWindowFunction業務實踐:每隔5秒統計每個基站的日志數量
1.創建日志數據對象
case class Log(sid:String,var callOut:String, var callIn:String, callType:String, callTime:Long, duration:Long)
2.業務實現
import org.apache.flink.streaming.api.scala._ import org.apache.flink.streaming.api.scala.function.ProcessWindowFunction import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows import org.apache.flink.streaming.api.windowing.time.Time import org.apache.flink.streaming.api.windowing.windows.TimeWindow import org.apache.flink.util.Collector /** * 全量聚合函數 */ object TestProcessFunctionByWindow { // 每隔5秒統計每個基站的日志數量 def main(args: Array[String]): Unit = { val env = StreamExecutionEnvironment.getExecutionEnvironment // source var stream = env.socketTextStream("flink101", 8888) .map(line => { var arr = line.split(",") Log(arr(0).trim,arr(1).trim, arr(2).trim, arr(3).trim, arr(4).trim.toLong, arr(5).trim.toLong) }) // 設置並行度 stream.setParallelism(1) stream.map(log=> (log.sid, 1)) .keyBy(_._1) // .timeWindow(Time.seconds(5)) .window(TumblingProcessingTimeWindows.of(Time.seconds(5))) .process(new MyProcessWindowFunction) // 一個窗口結束的時候調用一次(在一個並行度中) .print() env.execute("TestReduceFunctionByWindow") } } class MyProcessWindowFunction extends ProcessWindowFunction[(String, Int), (String, Long), String, TimeWindow] { // 一個窗口結束的時候調用一次(一個分組執行一次),不適合大量數據,全量數據保存在內存中,會造成內存溢出 override def process(key: String, context: Context, elements: Iterable[(String, Int)], out: Collector[(String, Long)]): Unit = { // 聚合,注意:整個窗口的數據保存到Iterable,里面有很多行數據, Iterable的size就是日志的總行數 out.collect(key, elements.size) } }
三、總結
ProcessWindowFunction獲得一個包含窗口所有元素的可迭代器,以及一個具有時間和狀態信息訪問權的上下文對象,這使得它比其他窗口函數提供更大的靈活性。這是以性能和資源消耗為代價的,因為元素不能增量地聚合,而是需要在內部緩沖,直到認為窗口可以處理為止,使用該函數需要注意數據量,數據量太大,全量數據保存在內存中,會造成內存溢出。