flink(二) 電商用戶行為分析(二)實時熱門商品統計(計算最熱門 Top N 商品)


1.簡介

  首先要實現的是實時熱門商品統計,我們將會基於 UserBehavior 數據集來進行分析。
  項目主體用 Scala 編寫,采用 IDEA 作為開發環境進行項目編寫,采用 maven作為項目構建和管理工具。首先我們需要搭建項目框架。

2 創建 Maven 項目

2.1 項目框架搭建
  打開 IDEA,創建一個 maven 項目,命名為 UserBehaviorAnalysis。由於包含了多個模塊,我們可以以 UserBehaviorAnalysis 作為父項目,並在其下建一個名為
HotItemsAnalysis 的子項目,用於實時統計熱門 top N 商品。
  在 UserBehaviorAnalysis 下 新 建 一 個 maven module 作 為 子 項 目 , 命 名 為HotItemsAnalysis。
  父項目只是為了規范化項目結構,方便依賴管理,本身是不需要代碼實現的,所以 UserBehaviorAnalysis 下的 src 文件夾可以刪掉。
2.2 聲明項目中工具的版本信息
  我們整個項目需要的工具的不同版本可能會對程序運行造成影響,所以應該在最外層的 UserBehaviorAnalysis 中聲明所有子模塊共用的版本信息。
  在 pom.xml 中加入以下配置:
       UserBehaviorAnalysis\pom.xml
    <properties>
        <flink.version>1.10.0</flink.version>
        <scala.binary.version>2.11</scala.binary.version>
        <kafka.version>2.2.0</kafka.version>
    </properties>
2.3 添加項目依賴
對 於 整 個 項 目 而 言 , 所 有 模 塊 都 會 用 到 flink 相 關 的 組 件 , 所 以 我們在UserBehaviorAnalysis 中引入公有依賴:
UserBehaviorAnalysis/pom.xml
    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-scala_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka_${scala.binary.version}</artifactId>
            <version>${kafka.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
    </dependencies>
同樣,對於 maven 項目的構建,可以引入公有的插件:
    <build>
    <plugins>
    <!-- 該插件用於將 Scala 代碼編譯成 class 文件 -->
    <plugin>
        <groupId>net.alchim31.maven</groupId>
        <artifactId>scala-maven-plugin</artifactId>
        <version>3.4.6</version>
        <executions>
            <execution>
                <!-- 聲明綁定到 maven 的 compile 階段 -->
                <goals>
                    <goal>testCompile</goal>
                </goals>
            </execution>
        </executions>
    </plugin>
    <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-assembly-plugin</artifactId>
        <version>3.0.0</version>
    <configuration>
        <descriptorRefs>
            <descriptorRef>
                jar-with-dependencies
            </descriptorRef>
        </descriptorRefs>
    </configuration>
        <executions>
            <execution>
                <id>make-assembly</id>
                <phase>package</phase>
                <goals>
                    <goal>single</goal>
                </goals>
            </execution>
        </executions>
    </plugin>
    </plugins>
    </build>
在 HotItemsAnalysis 子模塊中,我們並沒有引入更多的依賴,所以不需要改動pom 文件。
2.4 數據准備
  在 src/main/目錄下,可以看到已有的默認源文件目錄是 java,我們可以將其改名為 scala。將數據文件 UserBehavior.csv 復制到資源文件目錄 src/main/resources 下,
我們將從這里讀取數據。
至此,我們的准備工作都已完成,接下來可以寫代碼了。

3  模塊代碼實現

計算最熱門 Top N 商品
  為了統計每個窗口下最熱門的商品,我們需要再次按窗口進行分組,這里根據ItemViewCount 中的 windowEnd 進行 keyBy()操作。然后使用 ProcessFunction 實現
一個自定義的 TopN 函數 TopNHotItems 來計算點擊量排名前 5 名的商品,並將排名結果格式化成字符串,便於后續輸出。
 
package com.atguigu.hotitems_analysis


import java.util.Properties

import com.sun.jmx.snmp.Timestamp
import org.apache.flink.api.common.functions.AggregateFunction
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.api.common.state.{ListState, ListStateDescriptor}
import org.apache.flink.api.java.tuple.{Tuple, Tuple1}
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.function.WindowFunction
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer
import org.apache.flink.util.Collector

import scala.collection.mutable.ListBuffer
import scala.tools.cmd.Spec.Accumulator

//定義樣例類
case class UserBehavior(userId:Long, itemId:Long, categoryId:Int, behavior:String, timestamp:Long)

//定義窗口聚合結果的樣例類
case class ItemViewCount(itemID:Long, windowEnd:Long, count:Long)

object HotItems {
  def main(args: Array[String]): Unit = {
    //創建流處理環境
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    //從文件讀取數據
    //val inputStream:DataStream[String] = env.readTextFile("C:\\Users\\DELL\\IdeaProjects\\UserBehaviorAnalysis\\HotItemAnalysis\\src\\main\\resources\\UserBehavior.csv")
    // 從kafka中讀取數據
    val properties = new Properties()
    properties.setProperty("bootstrap.servers","192.168.1.122:9092,192.168.1.133:9092,192.168.1.144:9092")
    properties.setProperty("group.id","consumer-group")
    properties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    properties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    properties.setProperty("auto.offset.reset", "latest")
    val inputStream: DataStream[String] = env.addSource(new FlinkKafkaConsumer[String]("hotitems", new SimpleStringSchema(), properties))



    // 將數據轉換成樣例類類型,並提取timestamp定義watermark
    val dataStream:DataStream[UserBehavior] = inputStream
      .map(data =>{
        val dataArray = data.split(",")
        UserBehavior(dataArray(0).toLong, dataArray(1).toLong, dataArray(2).toInt, dataArray(3), dataArray(4).toLong)
      })
      .assignAscendingTimestamps(_.timestamp*1000L)

    // 對數據進行轉換,過濾出pv行為,開窗聚合統計個數,並自定義單個窗口輸出的結果
    val aggStream:DataStream[ItemViewCount] = dataStream
        .filter(_.behavior == "pv")
        .keyBy("itemId")
        .timeWindow(Time.hours(1),Time.minutes(5))
        .aggregate(new CountAgg(),new ItemCountWindowResult())

    //對窗口聚合結果按照窗口進行分組,並做排序取TopN輸出
    val resultStream:DataStream[String] = aggStream
        .keyBy("windowEnd")
        .process(new TopNHotItem(5))

    resultStream.print()

    env.execute("hot items job")
  }

}

//自定義預聚合函數
class CountAgg() extends AggregateFunction[UserBehavior, Long, Long]{
  override def add(value: UserBehavior, accumulator: Long): Long = accumulator + 1

  override def createAccumulator(): Long = 0L

  override def getResult(accumulator: Long): Long = accumulator

  override def merge(a: Long, b: Long): Long = a+b

}

class ItemCountWindowResult() extends  WindowFunction[Long, ItemViewCount, Tuple, TimeWindow]{
  override def apply(key: Tuple, window: TimeWindow, input: Iterable[Long], out: Collector[ItemViewCount]): Unit = {
    val itemId = key.asInstanceOf[Tuple1[Long]].f0
    val windEnd = window.getEnd
    val count = input.iterator.next()
    out.collect(ItemViewCount(itemId,windEnd,count))
  }

}

class TopNHotItem(n: Int) extends KeyedProcessFunction[Tuple, ItemViewCount, String]{
  //定義一個ListState,用來保存當前窗口所有的count結果
  lazy val itemCountListState: ListState[ItemViewCount] = getRuntimeContext.getListState(new ListStateDescriptor[ItemViewCount]("itemcount-list", classOf[ItemViewCount]))

  override def processElement(value: ItemViewCount, ctx: KeyedProcessFunction[Tuple, ItemViewCount, String]#Context, collector: Collector[String]): Unit = {
    // 每來一條數據,就把他保存到狀態中
    itemCountListState.add(value)

    //注冊定時器,在windowEnd+100觸發
    ctx.timerService().registerEventTimeTimer(value.windowEnd + 100)


  }

  //定時器觸發時,從狀態中取數據,然后排序輸出
  override def onTimer(timestamp: Long, ctx: KeyedProcessFunction[Tuple, ItemViewCount, String]#OnTimerContext, out: Collector[String]): Unit = {
    // 先把狀態的數據提取到一個ListBuffer中
    val allItemCountList: ListBuffer[ItemViewCount] = ListBuffer()
    import scala.collection.JavaConversions._
    for( itemCount <- itemCountListState.get()){
      allItemCountList += itemCount
    }

    //按照count值大小排序
    val sortedItemCountList = allItemCountList.sortBy(_.count)(Ordering.Long.reverse).take(n)

    //清除狀態
    itemCountListState.clear()

    //將排名信息格式化成string,方便監控顯示
    val result:StringBuilder = new StringBuilder
    result.append("時間: ").append(new Timestamp(timestamp - 100)).append("\n")

    //遍歷sorted列表,輸出TopN信息
    for(i <- sortedItemCountList.indices){
      //獲取當前商品的count信息
      val currentItemCount = sortedItemCountList(i)
      result.append("Top").append(i+1).append(":")
        .append(" 商品ID=").append(currentItemCount.itemID)
        .append(" 訪問量=").append(currentItemCount.count)
        .append("\n")
    }

    result.append("====================================\n\n")

    // 控制輸出頻率
    Thread.sleep(1000)
    out.collect(result.toString())
  }

}

 

 

 

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM