互聯網UV,PU,TopN統計


1. UV、PV、TopN概念

1.1 UV(unique visitor) 即獨立訪客數

  指訪問某個站點或點擊某個網頁的不同IP地址的人數。在同一天內,UV只記錄第一次進入網站的具有獨立IP的訪問者,在同一天內再次訪問該網站則不計數。UV提供了一定時間內不同觀眾數量的統計指標,而沒有反應出網站的全面活動。

1.2 PV(page view)頁面瀏覽量或點擊量

  頁面瀏覽量或點擊量,是衡量一個網站或網頁用戶訪問量。具體的說,PV值就是所有訪問者在24小時(0點到24點)內看了某個網站多少個頁面或某個網頁多少次。PV是指頁面刷新的次數,每一次頁面刷新,就算做一次PV流量。

1.3 TopN

  顧名思義,就是獲取前10或前N的數據。

2. 離線計算UV、PV、TopN

  這里主要使用hive或者MapReduce計算。

2.1 統計每個時段網站的PV和UV

hive> select date,hour,count(url) pv, count(distinct guid) uv from track_log group by date, hour;

date    hour    pv    uv
20160624    18    64972    23938
20160624    19    61162    22330

2.2 hive中創建結果表

create table db_track_daily_hour_visit(
    date string,
    hour string,
    pv string,
    uv string
)
row format delimited fields terminated by "\t";

2.3 結果寫入Hive表(這里最好使用shell腳本去做)

  結果步驟2.1與2.2,把2.1產生的結果數據寫入到2.2的結果表中

hive> insert overwrite table db_track_daily_hour_visit select date, hour, count(url), pv, count(distinct guid) uv from track_log group by date, hour;

2.4 創建crontab命令,每天定時調度2.3的shell腳本

2.5 mysql中創建一張表,永久存儲分析結果

mysql> create table visit(
    -> date int,
    -> hour int,
    -> pv bigint,
    -> uv bigint          
);

2.6 利用sqoop導入數據到Mysql

  注:以下代碼也可以放到crontab里面每天自動執行

$ bin/sqoop --options-file job1/visit.opt
mysql> select * from visit;

+----------+------+-------+-------+
| date     | hour | pv    | uv    |
+----------+------+-------+-------+
| 20160624 |   18 | 64972 | 23938 |
| 20160624 |   19 | 61162 | 22330 |
+----------+------+-------+-------+

3. 實時計算UV、PV、TopN

   在實時流式計算中,最重要的是在任何情況下,消息不重復、不丟失,即Exactly-once。本文以Kafka->Spark Streaming->Redis為例,一方面說明一下如何做到Exactly-once,另一方面說明一下如何計算實時去重指標的。

3.1 關於數據源

  日志格式為:

  2018-02-22T00:00:00+08:00|~|200|~|/test?pcid=DEIBAH&siteid=3

  2018-02-22T00:00:00+08:00|~|200|~|/test?pcid=GLLIEG&siteid=3

  2018-02-22T00:00:00+08:00|~|200|~|/test?pcid=HIJMEC&siteid=8

  2018-02-22T00:00:00+08:00|~|200|~|/test?pcid=HMGBDE&siteid=3

  2018-02-22T00:00:00+08:00|~|200|~|/test?pcid=HIJFLA&siteid=4

  2018-02-22T00:00:01+08:00|~|200|~|/test?pcid=JCEBBC&siteid=9

  2018-02-22T00:00:01+08:00|~|200|~|/test?pcid=KJLAKG&siteid=8

  2018-02-22T00:00:01+08:00|~|200|~|/test?pcid=FHEIKI&siteid=3

  2018-02-22T00:00:01+08:00|~|200|~|/test?pcid=IGIDLB&siteid=3

  2018-02-22T00:00:01+08:00|~|200|~|/test?pcid=IIIJCD&siteid=5

  日志是由測試程序模擬產生的,字段之間由|~|分隔。pcid為計算機pc的id,siteid為網站id

3.2 實時計算需求

  分天、分小時PV、UV;

  分天、分小時、分網站(siteid)PV、UV;

3.3 Spark Streaming消費Kafka數據

  在Spark Streaming中消費Kafka數據,保證Exactly-once的核心有三點:

  ①使用Direct方式連接Kafka;

  ②自己保存和維護Offset;

  ③更新Offset和計算在同一事務中完成;

  

  后面的Spark Streaming程序,主要有以下步驟:

  ①啟動后,先從Redis中獲取上次保存的Offset,Redis中的key為“topic_partition”,即每個分區維護一個Offset;

  ②使用獲取到的Offset,創建DirectStream;

  ③在處理每批次的消息時,利用Redis的事務機制,確保在Redis中指標的計算和Offset的更新維護,在同一事務中完成。只有這兩者同步,才能真正保證消息的Exactly-once。

./spark-submit \
--class com.lxw1234.spark.TestSparkStreaming \
--master local[2] \
--conf spark.streaming.kafka.maxRatePerPartition=20000 \
--jars /data1/home/dmp/lxw/realtime/commons-pool2-2.3.jar,\
/data1/home/dmp/lxw/realtime/jedis-2.9.0.jar,\
/data1/home/dmp/lxw/realtime/kafka-clients-0.11.0.1.jar,\
/data1/home/dmp/lxw/realtime/spark-streaming-kafka-0-10_2.11-2.2.1.jar \
/data1/home/dmp/lxw/realtime/testsparkstreaming.jar \
--executor-memory 4G \
--num-executors 1

  在啟動Spark Streaming程序時候,有個參數最好指定:

  spark.streaming.kafka.maxRatePartition=20000(每秒鍾從topic的每個partition最多消費的消息條數)

  如果程序第一次運行,或者因為某種原因暫停了很久重新啟動時候,會積累很多消息,如果這些消息同時被消費,很有可能會因為內存不夠而掛掉,因此,需要根據實際的數據量大小,以及批次的間隔時間來設置該參數,以限定批次的消息量。

  如果該參數設置20000,而批次間隔時間未10秒,那么每個批次最多從Kafka中消費20萬消息。

3.4 Redis中的數據模型

  ① 分小時、分網站PV

    普通K-V結構,計算時候使用incr命令遞增,

    Key為 “site_pv_網站ID_小時”,

    如:site_pv_9_2018-02-21-00、site_pv_10_2018-02-12-01

  ② 分小時PV、分天PV(分天的暫無)

    普通K-V結構,計算時候使用incr命令遞增,

    Key為 “pv_小時”,如:pv_2018-02-21-14、pv_2018-02-22-03

    該數據模型用於計算按小時及按天總PV。

  ③ 分小時、分網站UV

    Set結構,計算時候使用sadd命令添加,

    Key為 “site_uv_網站ID_小時”,如:site_uv_8_2018-02-21-12、site_uv_6_2019-09-12-09

    該數據模型用戶計算按小時和網站的總UV(獲取時候使用SCARD命令獲取Set元素個數

  ④ 分小時UV、分天UV(分天的暫無)

    Set結構,計算時候使用sadd命令添加,

    Key為 “uv_小時”,如:uv_2018-02-21-08、uv_2018-02-20-09

    該數據模型用戶計算按小時及按天的總UV(獲取時候使用SCARD命令獲取Set元素個數

  注意:這些Key對應的時間,均有實際消息中的第一個字段(時間)而定。

3.5 代碼程序

  maven依賴包

<dependencies>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-core_2.11</artifactId>
      <version>${spark.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-client</artifactId>
      <version>${hadoop.version}</version>
    </dependency>

    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-sql_2.11</artifactId>
      <version>${spark.version}</version>
    </dependency>

    <!-- streaming kafka redis start-->
    <dependency>
      <groupId>org.apache.commons</groupId>
      <artifactId>commons-pool2</artifactId>
      <version>2.3</version>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-streaming_2.11</artifactId>
      <version>${spark.version}</version>
    </dependency>
    <dependency>
      <groupId>redis.clients</groupId>
      <artifactId>jedis</artifactId>
      <version>2.9.0</version>
    </dependency>
    <dependency>
      <groupId>org.apache.kafka</groupId>
      <artifactId>kafka-clients</artifactId>
      <version>2.1.0</version>
    </dependency>
    <dependency>
      <groupId>org.apache.kafka</groupId>
      <artifactId>kafka_2.11</artifactId>
      <version>2.1.0</version>
    </dependency>

    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
      <version>2.4.3</version>
    </dependency>
    <!-- streaming kafka redis end -->

    <dependency>
      <groupId>ch.ethz.ganymed</groupId>
      <artifactId>ganymed-ssh2</artifactId>
      <version>262</version>
    </dependency>

    <dependency>
      <groupId>org.scala-lang</groupId>
      <artifactId>scala-xml</artifactId>
      <version>2.11.0-M4</version>
    </dependency>
  </dependencies>

  kafka偏移量管理工具類KafkaOffsetUtils:

package com.swordfall.common

import java.time.Duration
import org.apache.kafka.clients.consumer.{Consumer, ConsumerConfig, KafkaConsumer, NoOffsetForPartitionException}
import org.apache.kafka.common.TopicPartition
import scala.collection.JavaConversions._
import scala.collection.mutable

object KafkaOffsetUtils {

  /**
    * 獲取最小offset
    *
    * @param consumer   消費者
    * @param partitions topic分區
    * @return
    */
  def getEarliestOffsets(consumer: Consumer[_, _], partitions: Set[TopicPartition]): Map[TopicPartition, Long] = {
    consumer.seekToBeginning(partitions)
    partitions.map(tp => tp -> consumer.position(tp)).toMap
  }

  /**
    * 獲取最小offset
    * Returns the earliest (lowest) available offsets, taking new partitions into account.
    *
    * @param kafkaParams kafka客戶端配置
    * @param topics      獲取offset的topic
    */
  def getEarliestOffsets(kafkaParams: Map[String, Object], topics: Iterable[String]): Map[TopicPartition, Long] = {
    val newKafkaParams = mutable.Map[String, Object]()
    newKafkaParams ++= kafkaParams
    newKafkaParams.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
    val consumer: KafkaConsumer[String, Array[Byte]] = new KafkaConsumer[String, Array[Byte]](newKafkaParams)
    consumer.subscribe(topics)
    val parts = consumer.assignment()
    consumer.seekToBeginning(parts)
    consumer.pause(parts)
    val offsets = parts.map(tp => tp -> consumer.position(tp)).toMap
    consumer.unsubscribe()
    consumer.close()
    offsets
  }

  /**
    * 獲取最大offset
    *
    * @param consumer   消費者
    * @param partitions topic分區
    * @return
    */
  def getLatestOffsets(consumer: Consumer[_, _], partitions: Set[TopicPartition]): Map[TopicPartition, Long] = {
    consumer.seekToEnd(partitions)
    partitions.map(tp => tp -> consumer.position(tp)).toMap
  }

  /**
    * 獲取最大offset
    * Returns the latest (highest) available offsets, taking new partitions into account.
    *
    * @param kafkaParams kafka客戶端配置
    * @param topics      需要獲取offset的topic
    **/
  def getLatestOffsets(kafkaParams: Map[String, Object], topics: Iterable[String]): Map[TopicPartition, Long] = {
    val newKafkaParams = mutable.Map[String, Object]()
    newKafkaParams ++= kafkaParams
    newKafkaParams.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest")
    val consumer: KafkaConsumer[String, Array[Byte]] = new KafkaConsumer[String, Array[Byte]](newKafkaParams)
    consumer.subscribe(topics)
    val parts = consumer.assignment()
    consumer.seekToEnd(parts)
    consumer.pause(parts)
    val offsets = parts.map(tp => tp -> consumer.position(tp)).toMap
    consumer.unsubscribe()
    consumer.close()
    offsets
  }

  /**
    * 獲取消費者當前offset
    *
    * @param consumer   消費者
    * @param partitions topic分區
    * @return
    */
  def getCurrentOffsets(consumer: Consumer[_, _], partitions: Set[TopicPartition]): Map[TopicPartition, Long] = {
    partitions.map(tp => tp -> consumer.position(tp)).toMap
  }

  /**
    * 獲取offsets
    *
    * @param kafkaParams kafka參數
    * @param topics      topic
    * @return
    */
  def getCurrentOffset(kafkaParams: Map[String, Object], topics: Iterable[String]): Map[TopicPartition, Long] = {
    val offsetResetConfig = kafkaParams.getOrElse(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest").toString.toLowerCase()
    val newKafkaParams = mutable.Map[String, Object]()
    newKafkaParams ++= kafkaParams
    newKafkaParams.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "none")
    val consumer: KafkaConsumer[String, Array[Byte]] = new KafkaConsumer[String, Array[Byte]](newKafkaParams)
    consumer.subscribe(topics)
    val notOffsetTopicPartition = mutable.Set[TopicPartition]()

    try {
      consumer.poll(Duration.ofMillis(0))
    } catch {
      case ex: NoOffsetForPartitionException  =>
        println(s"consumer topic partition offset not found:${ex.partition()}")
        notOffsetTopicPartition.add(ex.partition())
    }

    val parts = consumer.assignment().toSet
    consumer.pause(parts)
    val topicPartition = parts.diff(notOffsetTopicPartition)
    //獲取當前offset
    val currentOffset = mutable.Map[TopicPartition, Long]()
    topicPartition.foreach(x => {
      try {
        currentOffset.put(x, consumer.position(x))
      } catch {
        case ex: NoOffsetForPartitionException =>
          println(s"consumer topic partition offset not found:${ex.partition()}")
          notOffsetTopicPartition.add(ex.partition())
      }
    })
    //獲取earliiestOffset
    val earliestOffset = getEarliestOffsets(consumer, parts)
    earliestOffset.foreach(x => {
      val value = currentOffset.get(x._1)
      if (value.isEmpty){
        currentOffset(x._1) = x._2
      }else if (value.get < x._2){
        println(s"kafka data is lost from partition:${x._1} offset ${value.get} to ${x._2}")
        currentOffset(x._1) = x._2
      }
    })
    // 獲取lastOffset
    val lateOffset = if (offsetResetConfig.equalsIgnoreCase("earliest")){
      getLatestOffsets(consumer, topicPartition)
    }else {
      getLatestOffsets(consumer, parts)
    }

    lateOffset.foreach(x => {
      val value = currentOffset.get(x._1)
      if (value.isEmpty || value.get > x._2){
        currentOffset(x._1) = x._2
      }
    })
    consumer.unsubscribe()
    consumer.close()
    currentOffset.toMap
  }
}

  Redis資源管理工具類InternalRedisClient:

package com.swordfall.streamingkafka

import org.apache.commons.pool2.impl.GenericObjectPoolConfig
import redis.clients.jedis.JedisPool

/**
  * @Author: Yang JianQiu
  * @Date: 2019/9/10 0:19
  */
object InternalRedisClient extends Serializable {

  @transient private var pool: JedisPool = null

  def makePool(redisHost: String, redisPort: Int, redisTimeout: Int, maxTotal: Int, maxIdle: Int, minIdle: Int): Unit ={
      makePool(redisHost, redisPort, redisTimeout, maxTotal, maxIdle, minIdle, true, false, 10000)
  }

  def makePool(redisHost: String, redisPort: Int, redisTimeout: Int, maxTotal: Int, maxIdle: Int, minIdle: Int, testOnBorrow: Boolean, testOnReturn: Boolean, maxWaitMills: Long): Unit ={
    if (pool == null){
      val poolConfig = new GenericObjectPoolConfig()
      poolConfig.setMaxTotal(maxTotal)
      poolConfig.setMaxIdle(maxIdle)
      poolConfig.setMinIdle(minIdle)
      poolConfig.setTestOnBorrow(testOnBorrow)
      poolConfig.setTestOnReturn(testOnReturn)
      poolConfig.setMaxWaitMillis(maxWaitMills)
      pool = new JedisPool(poolConfig, redisHost, redisPort, redisTimeout)

      val hook = new Thread{
        override def run = pool.destroy()
      }
      sys.addShutdownHook(hook.run)
    }
  }

  def getPool: JedisPool = {
    if (pool != null) pool else null
  }

}

  核心Spark Streaming處理kafka數據,並統計UV、PV到redis里面,同時在redis里面維護kafka偏移量:

package com.swordfall.streamingkafka

import com.swordfall.common.KafkaOffsetUtils
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, HasOffsetRanges, KafkaUtils, LocationStrategies}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import redis.clients.jedis.Pipeline

/**
  * 獲取topic最小的offset
  */
object SparkStreamingKafka {

  def main(args: Array[String]): Unit = {
    val brokers = "192.168.187.201:9092"
    val topic = "nginx"
    val partition: Int = 0 // 測試topic只有一個分區
    val start_offset: Long = 0L

    // Kafka參數
    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> brokers,
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> "test",
      "enable.auto.commit" -> (false: java.lang.Boolean),
      "auto.offset.reset" -> "latest"
    )

    // Redis configurations
    val maxTotal = 10
    val maxIdle = 10
    val minIdle = 1
    val redisHost = "192.168.187.201"
    val redisPort = 6379
    val redisTimeout = 30000
    // 默認db,用戶存放Offset和pv數據
    val dbDefaultIndex = 8
    InternalRedisClient.makePool(redisHost, redisPort, redisTimeout, maxTotal, maxIdle, minIdle)

    val conf = new SparkConf().setAppName("SparkStreamingKafka").setIfMissing("spark.master", "local[2]")
    val ssc = new StreamingContext(conf, Seconds(10))

    // 從Redis獲取上一次存的Offset
    val jedis = InternalRedisClient.getPool.getResource
    jedis.select(dbDefaultIndex)
    val topic_partition_key = topic + "_" + partition

    val lastSavedOffset = jedis.get(topic_partition_key)
    var fromOffsets: Map[TopicPartition, Long] = null
    if (null != lastSavedOffset){
      var lastOffset = 0L
      try{
        lastOffset = lastSavedOffset.toLong
      }catch{
        case ex: Exception => println(ex.getMessage)
          println("get lastSavedOffset error, lastSavedOffset from redis [" + lastSavedOffset + "]")
          System.exit(1)
      }
      // 設置每個分區起始的Offset
      fromOffsets = Map{ new TopicPartition(topic, partition) -> lastOffset }

      println("lastOffset from redis -> " + lastOffset)
    }else{
      //等於null,表示第一次, redis里面沒有存儲偏移量,但是可能會存在kafka存在一部分數據丟失或者過期,但偏移量沒有記錄在redis里面,
      // 偏移量還是按0的話,會導致偏移量范圍出錯,故需要拿到earliest或者latest的偏移量
      fromOffsets = KafkaOffsetUtils.getCurrentOffset(kafkaParams, List(topic))
    }
    InternalRedisClient.getPool.returnResource(jedis)


    // 使用Direct API 創建Stream
    val stream = KafkaUtils.createDirectStream[String, String](
      ssc,
      LocationStrategies.PreferConsistent,
      ConsumerStrategies.Assign[String, String](fromOffsets.keys.toList, kafkaParams, fromOffsets)
    )

    // 開始處理批次消息
    stream.foreachRDD{
      rdd =>
        val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
        val result = processLogs(rdd)
        println("================= Total " + result.length + " events in this batch ..")

        val jedis = InternalRedisClient.getPool.getResource
        // redis是單線程的,下一次請求必須等待上一次請求執行完成后才能繼續執行,然而使用Pipeline模式,客戶端可以一次性的發送多個命令,無需等待服務端返回。這樣就大大的減少了網絡往返時間,提高了系統性能。
        val pipeline: Pipeline = jedis.pipelined()
        pipeline.select(dbDefaultIndex)
        pipeline.multi() // 開啟事務

        // 逐條處理消息
        result.foreach{
          record =>

            // 增加網站小時pv
            val site_pv_by_hour_key = "site_pv_" + record.site_id + "_" + record.hour
            pipeline.incr(site_pv_by_hour_key)

            // 增加小時總pv
            val pv_by_hour_key = "pv_" + record.hour
            pipeline.incr(pv_by_hour_key)

            // 使用set保存當天每個小時網站的uv
            val site_uv_by_hour_key = "site_uv_" + record.site_id + "_" + record.hour
            pipeline.sadd(site_uv_by_hour_key, record.user_id)

            // 使用set保存當天每個小時的uv
            val uv_by_hour_key = "uv_" + record.hour
            pipeline.sadd(uv_by_hour_key, record.user_id)
        }

        // 更新Offset
        offsetRanges.foreach{
          offsetRange =>
            println("partition: " + offsetRange.partition + " fromOffset: " + offsetRange.fromOffset + " untilOffset: " + offsetRange.untilOffset)
            val topic_partition_key = offsetRange.topic + "_" + offsetRange.partition
            pipeline.set(topic_partition_key, offsetRange.untilOffset + "")
        }

        pipeline.exec() // 提交事務
        pipeline.sync() // 關閉pipeline

        InternalRedisClient.getPool.returnResource(jedis)
    }

    ssc.start()
    ssc.awaitTermination()
  }

  case class MyRecord(hour: String, user_id: String, site_id: String)

  def processLogs(messages: RDD[ConsumerRecord[String, String]]): Array[MyRecord] = {
    messages.map(_.value()).flatMap(parseLog).collect()
  }

  def parseLog(line: String): Option[MyRecord] = {
    val ary: Array[String] = line.split("\\|~\\|", -1)
    try{
      val hour = ary(0).substring(0, 13).replace("T", "-")
      val uri = ary(2).split("[=|&]", -1)
      val user_id = uri(1)
      val site_id = uri(3)
      return scala.Some(MyRecord(hour, user_id, site_id))
    }catch{
      case ex: Exception => println(ex.getMessage)
    }
    return None
  }

}

4. 總結

【github地址】

https://github.com/SwordfallYeung/SparkStreamingDemo

【參考資料】

http://www.cj318.cn/?p=4

https://blog.csdn.net/liam08/article/details/80155006

http://www.ikeguang.com/2018/08/03/statistic-hive-daily-week-month/

https://dongkelun.com/2018/06/25/KafkaUV/

https://blog.csdn.net/wwwzydcom/article/details/89506227

http://lxw1234.com/archives/2018/02/901.htm

https://blog.csdn.net/qq_35946969/article/details/83654369

 

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM