flink(四) 電商用戶行為分析(四)實時流量統計(二)網站獨立訪客數(UV)


網站獨立訪客數(UV)的統計

  另外一個統計流量的重要指標是網站的獨立訪客數(Unique Visitor,UV)。UV指的是一段時間(比如一小時)內訪問網站的總人數,1 天內同一訪客的多次訪問
只記錄為一個訪客。通過 IP 和 cookie 一般是判斷 UV 值的兩種方式。當客戶端第一次訪問某個網站服務器的時候,網站服務器會給這個客戶端的電腦發出一個 Cookie,
通常放在這個客戶端電腦的 C 盤當中。在這個 Cookie 中會分配一個獨一無二的編號,這其中會記錄一些訪問服務器的信息,如訪問時間,訪問了哪些頁面等等。當你下
次再訪問這個服務器的時候,服務器就可以直接從你的電腦中找到上一次放進去的Cookie 文件,並且對其進行一些更新,但那個獨一無二的編號是不會變的。
當然,對於 UserBehavior 數據源來說,我們直接可以根據 userId 來區分不同的用戶。
總量統計
package come.atguigu.networkflow_analysis

import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.function.AllWindowFunction
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector

//定義樣例類
case class UserBehavior(userId:Long, itemId:Long, categoryId:Int, behavior:String, timestamp:Long)
case class UvCount(windowEnd: Long, count: Long )

object UniqueVisitor {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(4)
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    val inputStream:DataStream[String] = env.readTextFile("C:\\Users\\DELL\\IdeaProjects\\UserBehaviorAnalysis\\NetworkFlowAnalysis\\src\\main\\resources\\UserBehavior.csv")


    // 將數據轉換成樣例類類型,並提取timestamp定義watermark
    val dataStream: DataStream[UserBehavior] = inputStream
      .map(data => {
        val dataArray = data.split(",")
        UserBehavior(dataArray(0).toLong, dataArray(1).toLong, dataArray(2).toInt, dataArray(3), dataArray(4).toLong)})
      .assignAscendingTimestamps(_.timestamp*1000L)


    val uvStream:DataStream[UvCount] = dataStream
      .filter(_.behavior == "pv")
      .timeWindowAll(Time.hours(1))
      .apply( new UvCountResult() )

    uvStream.print()
    env.execute("uv job")
  }
}

class UvCountResult() extends AllWindowFunction[UserBehavior, UvCount, TimeWindow]{
  override def apply(window: TimeWindow, input: Iterable[UserBehavior], out: Collector[UvCount]): Unit = {
    var idSet = Set[Long]()

    for( userBehavior <- input ){
      idSet += userBehavior.userId
    }

    out.collect(UvCount(window.getEnd, idSet.size))


  }
}

 

增量統計
package come.atguigu.networkflow_analysis

import org.apache.flink.api.common.functions.AggregateFunction
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala.function.AllWindowFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector

//定義樣例類
case class UserBehavior(userId:Long, itemId:Long, categoryId:Int, behavior:String, timestamp:Long)
case class UvCount(windowEnd: Long, count: Long )

object UniqueVisistorIncr {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    val inputStream:DataStream[String] = env.readTextFile("C:\\Users\\DELL\\IdeaProjects\\UserBehaviorAnalysis\\NetworkFlowAnalysis\\src\\main\\resources\\UserBehavior.csv")


    // 將數據轉換成樣例類類型,並提取timestamp定義watermark
    val dataStream: DataStream[UserBehavior] = inputStream
      .map(data => {
        val dataArray = data.split(",")
        UserBehavior(dataArray(0).toLong, dataArray(1).toLong, dataArray(2).toInt, dataArray(3), dataArray(4).toLong)
      })
      .assignAscendingTimestamps(_.timestamp*1000L)


    val uvStream:DataStream[UvCount] = dataStream
      .filter(_.behavior == "pv")
      .timeWindowAll(Time.hours(1))
      .aggregate(new UvCountAgg(), new UvCountResultWithIncreAgg())

    uvStream.print()
    env.execute("uv job")
  }

}


class UvCountResult() extends AllWindowFunction[UserBehavior, UvCount, TimeWindow]{
  override def apply(window: TimeWindow, input: Iterable[UserBehavior], out: Collector[UvCount]): Unit = {
    var idSet = Set[Long]()

    for( userBehavior <- input ){
      idSet += userBehavior.userId
    }

    out.collect(UvCount(window.getEnd, idSet.size))


  }
}

class UvCountAgg extends AggregateFunction[UserBehavior, Set[Long], Long]{
  override def add(value: UserBehavior, acc: Set[Long]): Set[Long] = acc + value.userId

  override def createAccumulator(): Set[Long] = Set[Long]()

  override def getResult(acc: Set[Long]): Long = acc.size

  override def merge(acc: Set[Long], acc1: Set[Long]): Set[Long] = acc ++ acc1
}

class UvCountResultWithIncreAgg extends AllWindowFunction[Long, UvCount, TimeWindow]{
  override def apply(window: TimeWindow, input: Iterable[Long], out: Collector[UvCount]): Unit = {
    out.collect( UvCount(window.getEnd, input.head))
  }
}
布隆過濾統計
  我們把所有數據的 userId 都存在了窗口計算的狀態里,在窗口收集數據的過程中,狀態會不斷增大。一般情況下,只要不超出內存的承受范圍,
這種做法也沒什么問題;但如果我們遇到的數據量很大呢?
  把所有數據暫存放到內存里,顯然不是一個好注意。我們會想到,可以利用 redis這種內存級 k-v 數據庫,為我們做一個緩存。但如果我們遇到的情況非常極端,數
據大到驚人呢?比如上億級的用戶,要去重計算 UV。
  如果放到 redis 中,億級的用戶 id(每個 20 字節左右的話)可能需要幾 G 甚至幾十 G 的空間來存儲。當然放到 redis 中,用集群進行擴展也不是不可以,但明顯
代價太大了。
  一個更好的想法是,其實我們不需要完整地存儲用戶 ID 的信息,只要知道他在不在就行了。所以其實我們可以進行壓縮處理,用一位(bit)就可以表示一個用戶
的狀態。這個思想的具體實現就是布隆過濾器(Bloom Filter)。
  本質上布隆過濾器是一種數據結構,比較巧妙的概率型數據結構(probabilisticdata structure),特點是高效地插入和查詢,可以用來告訴你 “某樣東西一定不存
在或者可能存在”。
  它本身是一個很長的二進制向量,既然是二進制的向量,那么顯而易見的,存放的不是 0,就是 1。相比於傳統的 List、Set、Map 等數據結構,它更高效、占用
空間更少,但是缺點是其返回的結果是概率性的,而不是確切的。
  我們的目標就是,利用某種方法(一般是 Hash 函數)把每個數據,對應到一個位圖的某一位上去;如果數據存在,那一位就是 1,不存在則為 0。
注意這里我們用到了 redis 連接存取數據,所以需要加入 redis 客戶端的依賴:
    <dependencies>
        <dependency>
            <groupId>redis.clients</groupId>
            <artifactId>jedis</artifactId>
            <version>2.8.1</version>
        </dependency>
    </dependencies>

 

 
package come.atguigu.networkflow_analysis

import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.function.ProcessWindowFunction
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.triggers._
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector
import redis.clients.jedis.Jedis

//定義樣例類
case class UserBehavior(userId:Long, itemId:Long, categoryId:Int, behavior:String, timestamp:Long)
case class UvCount(windowEnd: Long, count: Long )
object UvWithBloomFilter {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    val inputStream:DataStream[String] = env.readTextFile("C:\\Users\\DELL\\IdeaProjects\\UserBehaviorAnalysis\\NetworkFlowAnalysis\\src\\main\\resources\\UserBehavior.csv")


    // 將數據轉換成樣例類類型,並提取timestamp定義watermark
    val dataStream: DataStream[UserBehavior] = inputStream
      .map(data => {
        val dataArray = data.split(",")
        UserBehavior(dataArray(0).toLong, dataArray(1).toLong, dataArray(2).toInt, dataArray(3), dataArray(4).toLong)
      })
      .assignAscendingTimestamps(_.timestamp*1000L)


    val uvStream:DataStream[UvCount] = dataStream
      .filter(_.behavior == "pv")
        .map( data => ("uv", data.userId))
        .keyBy(_._1)
        .timeWindow(Time.hours(1))
        .trigger(new Mytrigger())
        .process(new UvCountResultWithBloomFilter())

    uvStream.print()
    env.execute("uv job")
  }
}

class Mytrigger() extends Trigger[(String, Long), TimeWindow]{
  override def onEventTime(l: Long, w: TimeWindow, triggerContext: Trigger.TriggerContext): TriggerResult = TriggerResult.CONTINUE

  override def onProcessingTime(l: Long, w: TimeWindow, triggerContext: Trigger.TriggerContext): TriggerResult =TriggerResult.CONTINUE

  override def clear(w: TimeWindow, triggerContext: Trigger.TriggerContext): Unit = {}

  override def onElement(t: (String, Long), l: Long, w: TimeWindow, triggerContext: Trigger.TriggerContext): TriggerResult = TriggerResult.FIRE_AND_PURGE

}


class UvCountResultWithBloomFilter extends ProcessWindowFunction[(String, Long), UvCount, String, TimeWindow]{
  var jedis: Jedis = _
  var bloom:Bloom = _

  override def open(parameters: Configuration): Unit = {
    jedis = new Jedis("192.168.1.122",6379)
    bloom = new Bloom(1<<30)
  }

  //每次一個數據,主要是要用布隆過濾器判斷redis位圖中對應的位置是否為1
  override def process(key: String, context: Context, elements: Iterable[(String, Long)], out: Collector[UvCount]): Unit = {
    val storedKey = context.window.getEnd.toString

    val countMap = "countMap"

    var count = 0L

    if(jedis.hget(countMap, storedKey)!= null ){
      count = jedis.hget(countMap,storedKey).toLong
    }

    // 取userId,計算hash值,判斷是否在位圖中
    val userId = elements.last._2.toString
    val offset = bloom.hash(userId,61)
    val isExist = jedis.getbit(storedKey, offset)

    if(!isExist){
      jedis.setbit(storedKey, offset, true)
      jedis.hset(countMap, storedKey, (count+1).toString)
    }

  }
}

//自定義一個布隆過濾器
class Bloom(size:Long) extends Serializable{
  // 定義位圖的大小,應該是2的整次冪
  private val cap = size

  //實現一個hash函數
  def hash(str:String, seed:Int): Long = {
    var result = 0
    for(i <- 0 until str.length){
      result = result*seed + str.charAt(i)
    }
    // 返回一個在cap范圍內的一個值
    (cap-1) & result
  }

}

 

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM