flink(三) 電商用戶行為分析(三)實時流量統計(一)熱門頁面瀏覽量、網站總瀏覽量


1 模塊創建和數據准備

  在 UserBehaviorAnalysis 下 新 建 一 個 maven module 作 為 子 項 目 , 命 名 為NetworkFlowAnalysis。在這個子模塊中,我們同樣並沒有引入更多的依賴,所以也
不需要改動 pom 文件。
  在 src/main/目錄下,將默認源文件目錄 java 改名為 scala。將 apache 服務器的日志文件 apache.log 復制到資源文件目錄 src/main/resources 下,我們將從這里讀取
數據。
  當然,我們也可以仍然用 UserBehavior.csv 作為數據源,這時我們分析的就不是每一次對服務器的訪問請求了,而是具體的頁面瀏覽(“pv”)操作。

2 基於服務器 log 的熱門頁面瀏覽量統計

  我們現在要實現的模塊是 “實時流量統計”。對於一個電商平台而言,用戶登錄的入口流量、不同頁面的訪問流量都是值得分析的重要數據,而這些數據,
可以簡單地從 web 服務器的日志中提取出來
 
  我們在這里先實現“熱門頁面瀏覽數”的統計,也就是讀取服務器日志中的每一行 log,統計在一段時間內用戶訪問每一個 url 的次數,然后排序輸出顯示。
具體做法為:每隔 5 秒,輸出最近 10 分鍾內訪問量最多的前 N 個 URL。可以看出,這個需求與之前“實時熱門商品統計”非常類似,所以我們完全可以借鑒此
前的代碼。
package come.atguigu.networkflow_analysis

import java.text.SimpleDateFormat

import com.sun.jmx.snmp.Timestamp
import org.apache.flink.api.common.functions.AggregateFunction
import org.apache.flink.api.common.state.{ListState, ListStateDescriptor}
import org.apache.flink.api.java.tuple.Tuple
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.function.WindowFunction
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector

import scala.collection.mutable.ListBuffer

case class ApacheLogEvent(ip: String, userId: String, eventTime: Long, method: String, url: String)

case class PageViewCount(url:String, windowEnd:Long, count:Long )

object NetworkTopNPage {
  def main(args: Array[String]): Unit = {
    //創建流處理環境
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    val inputStream:DataStream[String] = env.readTextFile("C:\\Users\\DELL\\IdeaProjects\\UserBehaviorAnalysis\\NetworkFlowAnalysis\\src\\main\\resources\\apache.log")

    val dataStream:DataStream[ApacheLogEvent] = inputStream
      .map(data =>{
        val dataArray = data.split(" ")

        val simpleDataFormat = new SimpleDateFormat("dd/MM/yyyy:HH:mm:ss")
        val timestamp = simpleDataFormat.parse(dataArray(3)).getTime

        ApacheLogEvent(dataArray(0), dataArray(1), timestamp, dataArray(5), dataArray(6))
      })
      .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[ApacheLogEvent](Time.seconds(60)) {
        override def extractTimestamp(element: ApacheLogEvent): Long = element.eventTime
      })

    val aggStream = dataStream
      .keyBy(_.url)
      .timeWindow(Time.minutes(10),Time.seconds(5))
      .aggregate(new PageCountAgg(), new PageCountWindowResult())

    val resultStream = aggStream
      .keyBy(_.windowEnd)
      .process(new TopNHotPage(3))

    resultStream.print()
    env.execute("top n page job")

  }

}

class PageCountAgg() extends AggregateFunction[ApacheLogEvent, Long, Long]{
  override def add(in: ApacheLogEvent, accumulator: Long): Long = accumulator + 1

  override def createAccumulator(): Long = 0L

  override def getResult(accumulator: Long): Long = accumulator

  override def merge(a: Long, b: Long): Long = a+b

}

class PageCountWindowResult() extends WindowFunction[Long, PageViewCount, String, TimeWindow]{
  override def apply(key: String, window: TimeWindow, input: Iterable[Long], out: Collector[PageViewCount]): Unit = {
    out.collect(PageViewCount(key, window.getEnd, input.head))
  }
}

class TopNHotPage(n: Int) extends  KeyedProcessFunction[Long, PageViewCount, String]{
  lazy val pageCountListState: ListState[PageViewCount] = getRuntimeContext.getListState(new ListStateDescriptor[PageViewCount]("pagecount-list", classOf[PageViewCount]))
  override def processElement(value: PageViewCount, context: KeyedProcessFunction[Long, PageViewCount, String]#Context, collector: Collector[String]): Unit = {
    pageCountListState.add(value)
    context.timerService().registerEventTimeTimer(value.windowEnd+1)
  }

  override def onTimer(timestamp: Long, ctx: KeyedProcessFunction[Long, PageViewCount, String]#OnTimerContext, out: Collector[String]): Unit = {
    // 獲取收到的所有 URL 訪問量
    val allPageCountList: ListBuffer[PageViewCount] = ListBuffer()
    val iter = pageCountListState.get().iterator()
    while(iter.hasNext){
      allPageCountList += iter.next()
    }
    // 提前清除狀態中的數據,釋放空間
    pageCountListState.clear()

    // 按照訪問量從大到小排序
    val sortedPageCountList = allPageCountList.sortWith(_.count > _.count).take(n)

    //將排名信息格式化成string,方便監控顯示
    val result:StringBuilder = new StringBuilder
    result.append("時間: ").append(new Timestamp(timestamp - 1)).append("\n")

    //遍歷sorted列表,輸出TopN信息
    for(i <- sortedPageCountList.indices){
      //獲取當前商品的count信息
      val currentItemCount = sortedPageCountList(i)
      result.append("Top").append(i+1).append(":")
        .append(" 頁面url").append(currentItemCount.url)
        .append(" 訪問量=").append(currentItemCount.count)
        .append("\n")
    }

    result.append("====================================\n\n")

    // 控制輸出頻率
    Thread.sleep(1000)
    out.collect(result.toString())
  }
}

基於時間延遲的代碼

package come.atguigu.networkflow_analysis

import java.text.SimpleDateFormat

import com.sun.jmx.snmp.Timestamp
import org.apache.flink.api.common.functions.AggregateFunction
import org.apache.flink.api.common.state.{ListState, ListStateDescriptor, MapState, MapStateDescriptor}
import org.apache.flink.api.java.tuple.Tuple
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.function.WindowFunction
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector

import scala.collection.mutable.ListBuffer


case class ApacheLogEvent(ip: String, userId: String, eventTime: Long, method: String, url: String)
case class PageViewCount(url:String, windowEnd:Long, count:Long )

object NetworkTopNPageLateness {
  def main(args: Array[String]): Unit = {
    //創建流處理環境
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    //val inputStream:DataStream[String] = env.readTextFile("C:\\Users\\DELL\\IdeaProjects\\UserBehaviorAnalysis\\NetworkFlowAnalysis\\src\\main\\resources\\apache.log")
    val inputStream = env.socketTextStream("localhost", 777)

    val dataStream:DataStream[ApacheLogEvent] = inputStream
      .map(data =>{
        val dataArray = data.split(" ")
        val simpleDataFormat = new SimpleDateFormat("dd/MM/yyyy:HH:mm:ss")
        val timestamp = simpleDataFormat.parse(dataArray(3)).getTime

        ApacheLogEvent(dataArray(0), dataArray(1), timestamp, dataArray(5), dataArray(6))
      })
      .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[ApacheLogEvent](Time.seconds(60)) {
        override def extractTimestamp(element: ApacheLogEvent): Long = element.eventTime
      })

    val lateOutputTag = new OutputTag[ApacheLogEvent]("late data")

    val aggStream = dataStream
      .keyBy(_.url)
      .timeWindow(Time.minutes(10),Time.seconds(5))
      .allowedLateness(Time.minutes(1))
      .sideOutputLateData(lateOutputTag)
      .aggregate(new PageCountAgg(), new PageCountWindowResult())

    val lateDataStream = aggStream.getSideOutput(lateOutputTag)

    val resultStream = aggStream
      .keyBy(_.windowEnd)
      .process(new TopNHotPage(3))


    dataStream.print("data")
    aggStream.print("agg")
    lateDataStream.print("late")
    resultStream.print("result")
    env.execute("top n page job")

  }

}


class PageCountAgg() extends AggregateFunction[ApacheLogEvent, Long, Long]{
  override def add(in: ApacheLogEvent, accumulator: Long): Long = accumulator + 1

  override def createAccumulator(): Long = 0L

  override def getResult(accumulator: Long): Long = accumulator

  override def merge(a: Long, b: Long): Long = a+b

}

class PageCountWindowResult() extends WindowFunction[Long, PageViewCount, String, TimeWindow]{
  override def apply(key: String, window: TimeWindow, input: Iterable[Long], out: Collector[PageViewCount]): Unit = {
    out.collect(PageViewCount(key, window.getEnd, input.head))
  }
}

class TopNHotPage(n: Int) extends  KeyedProcessFunction[Long, PageViewCount, String]{
  lazy val pageCountMapState: MapState[String, Long] = getRuntimeContext.getMapState(new MapStateDescriptor[String, Long]("pagecount-map", classOf[String], classOf[Long]))
  override def processElement(value: PageViewCount, context: KeyedProcessFunction[Long, PageViewCount, String]#Context, collector: Collector[String]): Unit = {
    pageCountMapState.put(value.url, value.count)
 context.timerService().registerEventTimeTimer(value.windowEnd+1)
    context.timerService().registerEventTimeTimer(value.windowEnd+60*1000L)
  }

  override def onTimer(timestamp: Long, ctx: KeyedProcessFunction[Long, PageViewCount, String]#OnTimerContext, out: Collector[String]): Unit = {
    if(timestamp == ctx.getCurrentKey + 60*1000L){

      pageCountMapState.clear()
      return
    }

    val allPageCountList: ListBuffer[(String, Long)] = ListBuffer()
    val iter = pageCountMapState.entries().iterator()
    while(iter.hasNext){
      val entry = iter.next()
      allPageCountList += ((entry.getKey, entry.getValue))
    }


    val sortedPageCountList = allPageCountList.sortWith(_._2 > _._2 ).take(n)

    //將排名信息格式化成string,方便監控顯示
    val result:StringBuilder = new StringBuilder
    result.append("時間: ").append(new Timestamp(timestamp - 1)).append("\n")

    //遍歷sorted列表,輸出TopN信息
    for(i <- sortedPageCountList.indices){
      //獲取當前商品的count信息
      val currentItemCount = sortedPageCountList(i)
      result.append("Top").append(i+1).append(":")
        .append(" 頁面url").append(currentItemCount._1)
        .append(" 訪問量=").append(currentItemCount._2)
        .append("\n")
    }

    result.append("====================================\n\n")

    // 控制輸出頻率
    Thread.sleep(1000)
    out.collect(result.toString())
  }
}

 

3 基於埋點日志數據的網絡流量統計

  我們發現,從 web 服務器 log 中得到的 url,往往更多的是請求某個資源地址(/*.js、/*.css),如果要針對頁面進行統計往往還需要進行過濾。而在實際電商應
用中,相比每個單獨頁面的訪問量,我們可能更加關心整個電商網站的網絡流量。這個指標,除了合並之前每個頁面的統計結果之外,還可以通過統計埋點日志數據
中的“pv”行為來得到。
 
3.1 網站總瀏覽量(PV)的統計
 
  衡量網站流量一個最簡單的指標,就是網站的頁面瀏覽量(Page View,PV)。用戶每次打開一個頁面便記錄 1 次 PV,多次打開同一頁面則瀏覽量累計。
一般來說,PV 與來訪者的數量成正比,但是 PV 並不直接決定頁面的真實來訪者數量,如同一個來訪者通過不斷的刷新頁面,也可以制造出非常高的 PV。
  我們知道,用戶瀏覽頁面時,會從瀏覽器向網絡服務器發出一個請求(Request),網絡服務器接到這個請求后,會將該請求對應的一個網頁(Page)發送給瀏覽器,
從而產生了一個 PV。所以我們的統計方法,可以是從 web 服務器的日志中去提取對應的頁面訪問然后統計,就向上一節中的做法一樣;也可以直接從埋點日志中提
取用戶發來的頁面請求,從而統計出總瀏覽量。
  所以,接下來我們用 UserBehavior.csv 作為數據源,實現一個網站總瀏覽量的統計。我們可以設置滾動時間窗口,實時統計每小時內的網站 PV。
package come.atguigu.networkflow_analysis

import java.util.Properties

import org.apache.flink.api.common.functions.AggregateFunction
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.function.WindowFunction
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer
import org.apache.flink.util.Collector

import scala.tools.cmd.Spec.Accumulator

//定義樣例類
case class UserBehavior(userId:Long, itemId:Long, categoryId:Int, behavior:String, timestamp:Long)

case class PvCount(windowEnd:Long, count:Long)


object PageView {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    val inputStream:DataStream[String] = env.readTextFile("C:\\Users\\DELL\\IdeaProjects\\UserBehaviorAnalysis\\NetworkFlowAnalysis\\src\\main\\resources\\UserBehavior.csv")


    // 將數據轉換成樣例類類型,並提取timestamp定義watermark
    val dataStream:DataStream[UserBehavior] = inputStream
      .map(data =>{
        val dataArray = data.split(",")
        UserBehavior(dataArray(0).toLong, dataArray(1).toLong, dataArray(2).toInt, dataArray(3), dataArray(4).toLong)})
      .assignAscendingTimestamps(_.timestamp*1000L)


    val pvStream:DataStream[PvCount] = dataStream
      .filter(_.behavior == "pv")
      .map(data => ("pv", 1L))
      .keyBy(_._1)
      .timeWindow(Time.hours(1))
      .aggregate(new PvCountAgg(), new PvCountResult())

    pvStream.print()
    env.execute("pv job")
  }

}

class PvCountAgg() extends AggregateFunction[(String, Long), Long, Long]{
  override def add(in: (String, Long), acc: Long): Long = acc + 1

  override def createAccumulator(): Long = 0L

  override def getResult(acc: Long): Long = acc

  override def merge(acc: Long, acc1: Long): Long = acc + acc1
}

class PvCountResult() extends WindowFunction[Long, PvCount, String, TimeWindow]{
  override def apply(key: String, window: TimeWindow, input: Iterable[Long], out: Collector[PvCount]): Unit = {
    out.collect(PvCount(window.getEnd, input.head))
  }
}

性能優化后代碼

package come.atguigu.networkflow_analysis

import org.apache.flink.api.common.functions.{AggregateFunction, MapFunction}
import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.streaming.api.scala.function.WindowFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector

import scala.util.Random


//定義樣例類
case class UserBehavior(userId:Long, itemId:Long, categoryId:Int, behavior:String, timestamp:Long)

case class PvCount(windowEnd:Long, count:Long)

object PageViewOp {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(4)
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    val inputStream:DataStream[String] = env.readTextFile("C:\\Users\\DELL\\IdeaProjects\\UserBehaviorAnalysis\\NetworkFlowAnalysis\\src\\main\\resources\\UserBehavior.csv")


    // 將數據轉換成樣例類類型,並提取timestamp定義watermark
    val dataStream: DataStream[UserBehavior] = inputStream
      .map(data => {
        val dataArray = data.split(",")
        UserBehavior(dataArray(0).toLong, dataArray(1).toLong, dataArray(2).toInt, dataArray(3), dataArray(4).toLong)})
      .assignAscendingTimestamps(_.timestamp*1000L)


    val pvStream:DataStream[PvCount] = dataStream
      .filter(_.behavior == "pv")
      .map(new MyMapper()) // 主要是為了能夠將數據分區進而利用並行計算以及防止數據傾斜
      .keyBy(_._1)
      .timeWindow(Time.hours(1))
      .aggregate(new PvCountAgg(), new PvCountResult())

    val pvTotalStream:DataStream[PvCount] = pvStream
        .keyBy(_.windowEnd)
        .process(new TotalPvCountResult())

    pvTotalStream.print()
    env.execute("pv job")
  }

}


class PvCountAgg() extends AggregateFunction[(String, Long), Long, Long]{
  override def add(in: (String, Long), acc: Long): Long = acc + 1

  override def createAccumulator(): Long = 0L

  override def getResult(acc: Long): Long = acc

  override def merge(acc: Long, acc1: Long): Long = acc + acc1
}

class PvCountResult() extends WindowFunction[Long, PvCount, String, TimeWindow]{
  override def apply(key: String, window: TimeWindow, input: Iterable[Long], out: Collector[PvCount]): Unit = {
    out.collect(PvCount(window.getEnd, input.head))
  }
}

class MyMapper() extends MapFunction[UserBehavior, (String, Long)]{
  override def map(value: UserBehavior): (String, Long) = (Random.nextString(10), 1L)

}

class TotalPvCountResult() extends KeyedProcessFunction[Long, PvCount, PvCount]{
  lazy val totalCountState: ValueState[Long] = getRuntimeContext.getState(new ValueStateDescriptor[Long]("total-count", classOf[Long]))
  override def processElement(value: PvCount, context: KeyedProcessFunction[Long, PvCount, PvCount]#Context, collector: Collector[PvCount]): Unit = {
    totalCountState.update(totalCountState.value() + value.count)
    context.timerService().registerEventTimeTimer(value.windowEnd + 1)

  }

  override def onTimer(timestamp: Long, ctx: KeyedProcessFunction[Long, PvCount, PvCount]#OnTimerContext, out: Collector[PvCount]): Unit = {
    out.collect(PvCount(ctx.getCurrentKey, totalCountState.value()))
    totalCountState.clear()
  }
}

 

 

 

 

 

 

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM