一、AggregatFunction概念
Flink 的AggregateFunction是一個基於中間計算結果狀態進行增量計算的函數,AggregateFunction接口相對ReduceFunction更加靈活,實現復雜度也相對較高,輸入數據類型和輸出數據類型可以不一致,通常和WindowFunction一起結合使用。
二、案例實踐:每隔3秒計算最近5秒內,每個基站的日志數量
1.創建日志數據對象
case class Log(sid:String,var callOut:String, var callIn:String, callType:String, callTime:Long, duration:Long)
2.業務實現
import org.apache.flink.api.common.functions.AggregateFunction import org.apache.flink.streaming.api.scala._ import org.apache.flink.streaming.api.scala.function.WindowFunction import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows import org.apache.flink.streaming.api.windowing.time.Time import org.apache.flink.streaming.api.windowing.windows.TimeWindow import org.apache.flink.util.Collector /** * 增量聚合函數 */ object TestAggregatFunctionByWindow { // 每隔3秒計算最近5秒內,每個基站的日志數量 def main(args: Array[String]): Unit = { val env = StreamExecutionEnvironment.getExecutionEnvironment // source var stream = env.socketTextStream("flink101", 8888) .map(line => { var arr = line.split(",") Log(arr(0).trim,arr(1).trim, arr(2).trim, arr(3).trim, arr(4).trim.toLong, arr(5).trim.toLong) }) stream.map(log=> (log.sid, 1)) .keyBy(_._1) .window(SlidingProcessingTimeWindows.of(Time.seconds(5), Time.seconds(3))) .aggregate(new MyAggregateFunction, new MyWindowFunction) env.execute("TestAggregatFunctionByWindow") } } // 優點是簡單實現聚合,缺點是不能輸出key // add方法,是來一條數據執行一次 // getResult 窗口結束的時候執行一次 class MyAggregateFunction extends AggregateFunction[(String, Int), Long, Long]{ override def createAccumulator(): Long = 0 // 初始化累加器 override def add(in: (String, Int), acc: Long): Long = acc + in._2 // 定義數據的添加邏輯 override def getResult(acc: Long): Long = acc // 定義計算結果的邏輯 override def merge(acc: Long, acc1: Long): Long = acc + acc1 // 合並分區數據 } // 為了輸出key,輸入數據來自於AggregateFunction,在窗口結束的時候先執行AggregateFunction對象的getResult方法,然后再執行apply方法 /** WindowFunction * * @param IN The type of the input value. * * @param OUT The type of the output value. * * @param KEY The type of the key. */ class MyWindowFunction extends WindowFunction[Long, (String, Long), String, TimeWindow] { override def apply(key: String, window: TimeWindow, input: Iterable[Long], out: Collector[(String, Long)]): Unit = { out.collect((key, input.iterator.next())) // next得到第一個值,迭代器中只有一個值 } }
三、總結
AggregateFunction 接口中定義了三個 需要復寫的方法,其中 add()定義數據的添加邏輯,getResult 定義了根據 accumulator 計 算結果的邏輯,merge 方法定義合並 accumulator 的邏輯。