Flink 窗口聚合函數之AggregatFunction實踐


一、AggregatFunction概念

 

Flink 的AggregateFunction是一個基於中間計算結果狀態進行增量計算的函數,AggregateFunction接口相對ReduceFunction更加靈活,實現復雜度也相對較高,輸入數據類型和輸出數據類型可以不一致,通常和WindowFunction一起結合使用。

二、案例實踐:每隔3秒計算最近5秒內,每個基站的日志數量

1.創建日志數據對象

case class Log(sid:String,var callOut:String, var callIn:String, callType:String, callTime:Long, duration:Long)

2.業務實現

import org.apache.flink.api.common.functions.AggregateFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.function.WindowFunction
import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector

/**
  * 增量聚合函數
  */
object TestAggregatFunctionByWindow {

  // 每隔3秒計算最近5秒內,每個基站的日志數量
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // source
    var stream = env.socketTextStream("flink101", 8888)
      .map(line => {
        var arr = line.split(",")
        Log(arr(0).trim,arr(1).trim, arr(2).trim, arr(3).trim, arr(4).trim.toLong, arr(5).trim.toLong)
      })

    stream.map(log=> (log.sid, 1))
      .keyBy(_._1)
        .window(SlidingProcessingTimeWindows.of(Time.seconds(5), Time.seconds(3)))
        .aggregate(new MyAggregateFunction, new MyWindowFunction)

    env.execute("TestAggregatFunctionByWindow")
  }
}


// 優點是簡單實現聚合,缺點是不能輸出key
// add方法,是來一條數據執行一次
// getResult 窗口結束的時候執行一次
class MyAggregateFunction extends AggregateFunction[(String, Int), Long, Long]{
  override def createAccumulator(): Long = 0  // 初始化累加器

  override def add(in: (String, Int), acc: Long): Long = acc + in._2  // 定義數據的添加邏輯

  override def getResult(acc: Long): Long = acc  // 定義計算結果的邏輯

  override def merge(acc: Long, acc1: Long): Long = acc + acc1  // 合並分區數據
}

// 為了輸出key,輸入數據來自於AggregateFunction,在窗口結束的時候先執行AggregateFunction對象的getResult方法,然后再執行apply方法
/** WindowFunction
  * * @param IN The type of the input value.
  * * @param OUT The type of the output value.
  * * @param KEY The type of the key.
  */
class MyWindowFunction extends WindowFunction[Long, (String, Long), String, TimeWindow] {
  override def apply(key: String, window: TimeWindow, input: Iterable[Long], out: Collector[(String, Long)]): Unit = {
    out.collect((key, input.iterator.next()))  // next得到第一個值,迭代器中只有一個值
  }
}

三、總結

AggregateFunction 接口中定義了三個 需要復寫的方法,其中 add()定義數據的添加邏輯,getResult 定義了根據 accumulator 計 算結果的邏輯,merge 方法定義合並 accumulator 的邏輯。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM