一、ReduceFunction的概念
Flink使用ReduceFunction來對窗口中的元素進行增量聚合。要求輸入和輸出的數據類型一致,定義了如何把兩個輸入的元素進行合並來生成相同類型的輸出元素的過程。
二、案例實踐:每隔5秒統計通話日志的數量
1.日志數據對象
case class Log(sid:String,var callOut:String, var callIn:String, callType:String, callTime:Long, duration:Long)
2.統計
1 object TestReduceFunctionByWindow { 2 def main(args: Array[String]): Unit = { 3 4 val env = StreamExecutionEnvironment.getExecutionEnvironment 5 // 數據源 6 var stream = env.socketTextStream("flink101", 8888) 7 .map(line => { 8 var arr = line.split(",") 9 // 轉成日志對象 10 Log(arr(0).trim,arr(1).trim, arr(2).trim, arr(3).trim, arr(4).trim.toLong, arr(5).trim.toLong) 11 }) 12 13 val result = stream.map(log=> (log.sid, 1)) 14 .keyBy(_._1) 15 .timeWindow(Time.seconds(5)) 16 .reduce((t1,t2) => (t1._1, t1._2 + t2._2)) // 輸入和輸出數據類型一致 17 18 // 打印測試 19 result.print() 20 21 env.execute("TestReduceFunctionByWindow") 22 } 23 }
三、總結
使用ReduceFunction能夠快速對兩個相同類型的數據元素按照指定的方法進行聚合邏輯,實現sum功能。