Flink 窗口聚合函數之ReduceFunction實踐


一、ReduceFunction的概念

Flink使用ReduceFunction來對窗口中的元素進行增量聚合。要求輸入和輸出的數據類型一致,定義了如何把兩個輸入的元素進行合並來生成相同類型的輸出元素的過程。

二、案例實踐:每隔5秒統計通話日志的數量

1.日志數據對象

case class Log(sid:String,var callOut:String, var callIn:String, callType:String, callTime:Long, duration:Long)

 

2.統計

 1 object TestReduceFunctionByWindow {
 2   def main(args: Array[String]): Unit = {
 3 
 4     val env = StreamExecutionEnvironment.getExecutionEnvironment
 5     // 數據源
 6     var stream = env.socketTextStream("flink101", 8888)
 7       .map(line => {
 8         var arr = line.split(",")
 9         // 轉成日志對象
10         Log(arr(0).trim,arr(1).trim, arr(2).trim, arr(3).trim, arr(4).trim.toLong, arr(5).trim.toLong)
11       })
12 
13     val result = stream.map(log=> (log.sid, 1))
14       .keyBy(_._1)
15         .timeWindow(Time.seconds(5))
16         .reduce((t1,t2) => (t1._1, t1._2 + t2._2))  // 輸入和輸出數據類型一致
17 
18     // 打印測試
19     result.print()
20 
21     env.execute("TestReduceFunctionByWindow")
22   }
23 }

三、總結

使用ReduceFunction能夠快速對兩個相同類型的數據元素按照指定的方法進行聚合邏輯,實現sum功能。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM