Flink模擬項目: 計算最熱門Top N商品


  為了統計每個窗口下最熱門的商品,我們需要再次按窗口進行分組,這里根據ItemViewCount中的windowEnd進行keyBy()操作。然后使用ProcessFunction實現一個自定義的TopN函數TopNHotItems來計算點擊量排名前3名的商品,並將排名結果格式化成字符串,便於后續輸出。

 .keyBy("windowEnd")
    .process(new TopNHotItems(3));  // 求點擊量前3名的商品

  ProcessFunction是Flink提供的一個low-level API,用於實現更高級的功能。它主要提供了定時器timer的功能(支持EventTime或ProcessingTime)。本案例中我們將利用timer來判斷何時收齊了某個window下所有商品的點擊量數據。由於Watermark的進度是全局的,在processElement方法中,每當收到一條數據ItemViewCount,我們就注冊一個windowEnd+1的定時器(Flink框架會自動忽略同一時間的重復注冊)。windowEnd+1的定時器被觸發時,意味着收到了windowEnd+1的Watermark,即收齊了該windowEnd下的所有商品窗口統計值。我們在onTimer()中處理將收集的所有商品及點擊量進行排序,選出TopN,並將排名信息格式化成字符串后進行輸出。

這里我們還使用了ListState<ItemViewCount>來存儲收到的每條ItemViewCount消息,保證在發生故障時,狀態數據的不丟失和一致性。ListState是Flink提供的類似Java List接口的State API,它集成了框架的checkpoint機制,自動做到了exactly-once的語義保證。

// 求某個窗口中前 N 名的熱門點擊商品,key 為窗口時間戳,輸出為 TopN 的結果字符串
  class TopNHotItems(topSize: Int) extends KeyedProcessFunction[Tuple, ItemViewCount, String] {
    private var itemState : ListState[ItemViewCount] = _

    override def open(parameters: Configuration): Unit = {
      super.open(parameters)
      // 命名狀態變量的名字和狀態變量的類型
      val itemsStateDesc = new ListStateDescriptor[ItemViewCount]("itemState-state", classOf[ItemViewCount])
      // 定義狀態變量
      itemState = getRuntimeContext.getListState(itemsStateDesc)
    }

    override def processElement(input: ItemViewCount, context: KeyedProcessFunction[Tuple, ItemViewCount, String]#Context, collector: Collector[String]): Unit = {
      // 每條數據都保存到狀態中
      itemState.add(input)
      // 注冊 windowEnd+1 的 EventTime Timer, 當觸發時,說明收齊了屬於windowEnd窗口的所有商品數據
      // 也就是當程序看到windowend + 1的水位線watermark時,觸發onTimer回調函數
      context.timerService.registerEventTimeTimer(input.windowEnd + 1)
    }

    override def onTimer(timestamp: Long, ctx: KeyedProcessFunction[Tuple, ItemViewCount, String]#OnTimerContext, out: Collector[String]): Unit = {
      // 獲取收到的所有商品點擊量
      val allItems: ListBuffer[ItemViewCount] = ListBuffer()
      import scala.collection.JavaConversions._
      for (item <- itemState.get) {
        allItems += item
      }
      // 提前清除狀態中的數據,釋放空間
      itemState.clear()
      // 按照點擊量從大到小排序
      val sortedItems = allItems.sortBy(_.count)(Ordering.Long.reverse).take(topSize)
      // 將排名信息格式化成 String, 便於打印
      val result: StringBuilder = new StringBuilder
      result.append("====================================\n")
      result.append("時間: ").append(new Timestamp(timestamp - 1)).append("\n")

      for(i <- sortedItems.indices){
        val currentItem: ItemViewCount = sortedItems(i)
        // e.g.  No1:  商品ID=12224  瀏覽量=2413
        result.append("No").append(i+1).append(":")
.append("  商品ID=").append(currentItem.itemId)
.append("  瀏覽量=").append(currentItem.count).append("\n")
      }
      result.append("====================================\n\n")
      // 控制輸出頻率,模擬實時滾動結果
      Thread.sleep(1000)
      out.collect(result.toString)
    }
  }

最后我們可以在main函數中將結果打印輸出到控制台,方便實時觀測:

.print();

  至此整個程序代碼全部完成,我們直接運行main函數,就可以在控制台看到不斷輸出的各個時間點統計出的熱門商品。

2.2.5 完整代碼

最終完整代碼如下:

case class UserBehavior(userId: Long, itemId: Long, categoryId: Int, behavior: String, timestamp: Long)

case class ItemViewCount(itemId: Long, windowEnd: Long, count: Long)

object HotItems {

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    env.setParallelism(1)
    val stream = env
      .readTextFile("YOUR_PATH\\resources\\UserBehavior.csv")
      .map(line => {
        val linearray = line.split(",")
        UserBehavior(linearray(0).toLong, linearray(1).toLong, linearray(2).toInt, linearray(3), linearray(4).toLong)
      })
      .assignAscendingTimestamps(_.timestamp * 1000)
      .filter(_.behavior=="pv")
      .keyBy("itemId")
      .timeWindow(Time.minutes(60), Time.minutes(5))
      .aggregate(new CountAgg(), new WindowResultFunction())    
      .keyBy(1)
      .process(new TopNHotItems(3))
      .print()

    env.execute("Hot Items Job")
  }

  // COUNT 統計的聚合函數實現,每出現一條記錄加一
  class CountAgg extends AggregateFunction[UserBehavior, Long, Long] {
    override def createAccumulator(): Long = 0L
    override def add(userBehavior: UserBehavior, acc: Long): Long = acc + 1
    override def getResult(acc: Long): Long = acc
    override def merge(acc1: Long, acc2: Long): Long = acc1 + acc2
  }
  // 用於輸出窗口的結果
  class WindowResultFunction extends WindowFunction[Long, ItemViewCount, Tuple, TimeWindow] {
    override def apply(key: Tuple, window: TimeWindow, aggregateResult: Iterable[Long],
                       collector: Collector[ItemViewCount]) : Unit = {
      val itemId: Long = key.asInstanceOf[Tuple1[Long]].f0
      val count = aggregateResult.iterator.next
      collector.collect(ItemViewCount(itemId, window.getEnd, count))
    }
  }

// 求某個窗口中前 N 名的熱門點擊商品,key 為窗口時間戳,輸出為 TopN 的結果字符串
  class TopNHotItems(topSize: Int) extends KeyedProcessFunction[Tuple, ItemViewCount, String] {
    private var itemState : ListState[ItemViewCount] = _

    override def open(parameters: Configuration): Unit = {
      super.open(parameters)
      // 命名狀態變量的名字和狀態變量的類型
      val itemsStateDesc = new ListStateDescriptor[ItemViewCount]("itemState-state", classOf[ItemViewCount])
      // 從運行時上下文中獲取狀態並賦值
      itemState = getRuntimeContext.getListState(itemsStateDesc)
    }

    override def processElement(input: ItemViewCount, context: KeyedProcessFunction[Tuple, ItemViewCount, String]#Context, collector: Collector[String]): Unit = {
      // 每條數據都保存到狀態中
      itemState.add(input)
      // 注冊 windowEnd+1 的 EventTime Timer, 當觸發時,說明收齊了屬於windowEnd窗口的所有商品數據
      // 也就是當程序看到windowend + 1的水位線watermark時,觸發onTimer回調函數
      context.timerService.registerEventTimeTimer(input.windowEnd + 1)
    }

    override def onTimer(timestamp: Long, ctx: KeyedProcessFunction[Tuple, ItemViewCount, String]#OnTimerContext, out: Collector[String]): Unit = {
      // 獲取收到的所有商品點擊量
      val allItems: ListBuffer[ItemViewCount] = ListBuffer()
      import scala.collection.JavaConversions._
      for (item <- itemState.get) {
        allItems += item
      }
      // 提前清除狀態中的數據,釋放空間
      itemState.clear()
      // 按照點擊量從大到小排序
      val sortedItems = allItems.sortBy(_.count)(Ordering.Long.reverse).take(topSize)
      // 將排名信息格式化成 String, 便於打印
      val result: StringBuilder = new StringBuilder
      result.append("====================================\n")
      result.append("時間: ").append(new Timestamp(timestamp - 1)).append("\n")

      for(i <- sortedItems.indices){
        val currentItem: ItemViewCount = sortedItems(i)
        // e.g.  No1:  商品ID=12224  瀏覽量=2413
        result.append("No").append(i+1).append(":")
.append("  商品ID=").append(currentItem.itemId)
.append("  瀏覽量=").append(currentItem.count).append("\n")
      }
      result.append("====================================\n\n")
      // 控制輸出頻率,模擬實時滾動結果
      Thread.sleep(1000)
      out.collect(result.toString)
    }
  }
}

2.2.6 更換Kafka 作為數據源

實際生產環境中,我們的數據流往往是從Kafka獲取到的。如果要讓代碼更貼近生產實際,我們只需將source更換為Kafka即可:

val properties = new Properties()
properties.setProperty("bootstrap.servers", "localhost:9092")
properties.setProperty("group.id", "consumer-group")
properties.setProperty("key.deserializer",
"org.apache.kafka.common.serialization.StringDeserializer")
properties.setProperty("value.deserializer",
"org.apache.kafka.common.serialization.StringDeserializer")
properties.setProperty("auto.offset.reset", "latest")

val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
env.setParallelism(1)

val stream = env
  .addSource(new FlinkKafkaConsumer[String]("hotitems", new SimpleStringSchema(), properties))

 

當然,根據實際的需要,我們還可以將Sink指定為Kafka、ES、Redis或其它存儲,這里就不一一展開實現了。

 

 

  

 

  

  


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM