Flink 實現 實時TOPN 需求


需求

  求每個小時內點擊量(pv)最高的前 3 個商品(TOP3),每五分鐘更新一次

bean(數據實體類 UserBehavior、ItemViewCount,定義見下方代碼):

 

利用底層API實現

import java.sql.Timestamp

import org.apache.flink.api.common.functions.AggregateFunction
import org.apache.flink.api.common.state.{ListState, ListStateDescriptor}
import org.apache.flink.api.scala.typeutils.Types
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.function.ProcessWindowFunction
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector

import scala.collection.mutable.ListBuffer

/**
  * Real-time "hot items" job built on the low-level DataStream API.
  *
  * For every one-hour window that slides by five minutes, the job counts the "pv" events of
  * each item and emits a formatted report with the `topSize` most viewed items of that window.
  *
  * Pipeline: read text lines -> parse into [[TopHotItems.UserBehavior]] -> keep "pv" events ->
  * count per item and window -> regroup by window end -> rank and format ([[TopHotItems.TopN]]).
  */
object TopHotItems {

  /** One user-behaviour record parsed from a comma-separated input line. */
  case class UserBehavior(userId: Long, // user id
                          itemId: Long, // item id
                          categoryId: Long, // category id (the original note said "city id" -- confirm)
                          behavior: String, // behaviour type; "pv" marks a page view
                          timestamp: Long) // event time; main multiplies the raw field by 1000

  /** View count of one item inside one window. */
  case class ItemViewCount(itemId: Long,
                           windowStart: Long, // window start time
                           windowEnd: Long, // window end time
                           count: Long) // number of views


  /** Builds and runs the streaming job; `args` is not used. */
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    val stream: DataStream[String] = env
      .readTextFile("filePath")
      // Parse each line into a UserBehavior, then keep only "pv" records.
      .map(line => {
        val arr = line.split(",")
        UserBehavior(arr(0).toLong, arr(1).toLong, arr(2).toLong, arr(3), arr(4).toLong * 1000L)
      })
      .filter(_.behavior.equals("pv"))
      .assignAscendingTimestamps(_.timestamp) // timestamps are assumed to be ascending
      .keyBy(_.itemId) // partition by item id
      .timeWindow(size = Time.hours(1), slide = Time.minutes(5)) // 1 h window, 5 min slide
      // Incremental pre-aggregation plus a window function: only the running count is kept per
      // window, and the window function attaches the window metadata to it.
      .aggregate(preAggregator = new CountAgg, windowFunction = new WindowResult)
      .keyBy(_.windowEnd) // partition by window end so every key holds the counts of one window
      .process(keyedProcessFunction = new TopN(3))

    stream.print()
    env.execute()
  }

  /** Incremental aggregator that counts the records it receives. */
  class CountAgg extends AggregateFunction[UserBehavior, Long, Long] {
    override def createAccumulator(): Long = 0L

    override def add(value: UserBehavior, accumulator: Long): Long = accumulator + 1

    override def getResult(accumulator: Long): Long = accumulator

    override def merge(a: Long, b: Long): Long = a + b
  }

  /** Wraps the pre-aggregated count of a window into an [[ItemViewCount]]. */
  class WindowResult extends ProcessWindowFunction[Long, ItemViewCount, Long, TimeWindow] {
    override def process(key: Long, context: Context, elements: Iterable[Long], out: Collector[ItemViewCount]): Unit = {
      // `elements` is expected to hold the single value produced by the pre-aggregator.
      out.collect(ItemViewCount(key, context.window.getStart, context.window.getEnd, elements.head))
    }
  }

  /**
    * Collects the [[ItemViewCount]]s of one window (the key is the window end) and, once an
    * event-time timer fires, emits a textual report of the `topSize` items with the highest count.
    *
    * @param topSize number of items to include in each report
    */
  class TopN(val topSize: Int) extends KeyedProcessFunction[Long, ItemViewCount, String] {

    // Offset added to the window end when registering the timer that triggers the report.
    private val timerDelayMs = 100L

    // Keyed state: only visible for the current key.
    lazy val listState: ListState[ItemViewCount] = getRuntimeContext.getListState(
      new ListStateDescriptor[ItemViewCount]("list-state", Types.of[ItemViewCount])
    )

    /** Buffers the incoming count and registers the timer that will emit the report. */
    override def processElement(value: ItemViewCount, ctx: KeyedProcessFunction[Long, ItemViewCount, String]#Context, out: Collector[String]): Unit = {
      listState.add(value)
      // Registering the same timestamp again for the same key does not create a duplicate timer.
      ctx.timerService().registerEventTimeTimer(value.windowEnd + timerDelayMs)
    }

    /** Sorts the buffered counts in descending order and emits the first `topSize` entries. */
    override def onTimer(timestamp: Long, ctx: KeyedProcessFunction[Long, ItemViewCount, String]#OnTimerContext, out: Collector[String]): Unit = {
      import scala.collection.JavaConverters._

      // Copy the buffered counts out of state; Option guards against a null iterable.
      val allItems: List[ItemViewCount] =
        Option(listState.get).map(_.asScala.toList).getOrElse(Nil)
      // The state of this window is no longer needed once it has been read.
      listState.clear()

      // Highest view count first, truncated to the requested size.
      val sortedItems: List[ItemViewCount] = allItems.sortBy(-_.count).take(topSize)

      val result = new StringBuilder()
      result.append("----------------------\n")
        .append(s"time: ${timestamp - timerDelayMs} \n")

      // Report the ranked items (the previous code iterated the unsorted, untruncated list).
      sortedItems.zipWithIndex.foreach { case (item, i) =>
        result.append(s" NO.${i+1}  商品id    ${item.itemId} 點擊量    ${item.count}  \n")
      }
      result.append("----------------------\n")
      // NOTE(review): demo-only pacing of the console output. This blocks the operator thread
      // for 5 s per report and should be removed for real workloads.
      Thread.sleep(5000)
      out.collect(result.toString)
    }
  }
}

利用Flink SQL實現

import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.scala._
import org.apache.flink.api.scala._
import org.apache.flink.table.api.{EnvironmentSettings, Tumble}
import org.apache.flink.types.Row

// 使用sql實現實時top n需求
/**
  * Real-time top-N hot items expressed as a Flink SQL query.
  *
  * Reads user-behaviour lines, keeps the "pv" records, registers them as table `t` and prints
  * the three items with the highest count for every one-hour window sliding by five minutes.
  */
object HotItemsSQL {

  /** One user-behaviour record parsed from a comma-separated input line. */
  case class UserBehavior(userId: Long,
                          itemId: Long,
                          categoryId: Long,
                          behavior: String,
                          timestamp: Long)

  /** Builds and runs the job; `args` is not used. */
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    env.setParallelism(1)

    // Table environment on the Blink planner, in streaming mode.
    val plannerSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
    val tableEnv = StreamTableEnvironment.create(env, plannerSettings)

    val rawLines = env
      .readTextFile("/Users/yuanzuo/Desktop/flink-tutorial/FlinkSZ1128/src/main/resources/UserBehavior.csv")

    // Parse every line, keep "pv" records and use the record timestamp as event time.
    val pageViews = rawLines
      .map { line =>
        val fields = line.split(",")
        UserBehavior(
          fields(0).toLong,
          fields(1).toLong,
          fields(2).toLong,
          fields(3),
          fields(4).toLong * 1000L)
      }
      .filter(event => event.behavior.equals("pv"))
      .assignAscendingTimestamps(event => event.timestamp)

    // Register a temporary view exposing the item id and the rowtime attribute `ts`.
    tableEnv.createTemporaryView("t", pageViews, 'itemId, 'timestamp.rowtime as 'ts)

    // Top-N queries are only supported by the Blink planner.
    // Innermost sub-query: equivalent to
    //   stream.keyBy(_.itemId).timeWindow(Time.hours(1), Time.minutes(5)).aggregate(new CountAgg, new WindowResult)
    // Middle sub-query: equivalent to .keyBy(_.windowEnd).process(Sort)
    // Outer query: keeps the first three rows of every window.
    val topNQuery =
      """
        |SELECT *
        |FROM (
        |    SELECT *,
        |           ROW_NUMBER() OVER (PARTITION BY windowEnd ORDER BY icount DESC) as row_num
        |    FROM (
        |          SELECT itemId, count(itemId) as icount,
        |                 HOP_END(ts, INTERVAL '5' MINUTE, INTERVAL '1' HOUR) as windowEnd
        |                 FROM t GROUP BY itemId, HOP(ts, INTERVAL '5' MINUTE, INTERVAL '1' HOUR)
        |    )
        |)
        |WHERE row_num <= 3
        |""".stripMargin

    val ranked = tableEnv.sqlQuery(topNQuery)

    // Keep only the entries of the retract stream whose Boolean flag is true, then print them.
    ranked
      .toRetractStream[Row]
      .filter(change => change._1)
      .print()

    env.execute()
  }
}

 

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM