網站獨立訪客數(UV)的統計
另外一個統計流量的重要指標是網站的獨立訪客數(Unique Visitor,UV)。UV指的是一段時間(比如一小時)內訪問網站的總人數,1 天內同一訪客的多次訪問
只記錄為一個訪客。通過 IP 和 cookie 一般是判斷 UV 值的兩種方式。當客戶端第一次訪問某個網站服務器的時候,網站服務器會給這個客戶端的電腦發出一個 Cookie,
通常放在這個客戶端電腦的 C 盤當中。在這個 Cookie 中會分配一個獨一無二的編號,這其中會記錄一些訪問服務器的信息,如訪問時間,訪問了哪些頁面等等。當你下
次再訪問這個服務器的時候,服務器就可以直接從你的電腦中找到上一次放進去的Cookie 文件,並且對其進行一些更新,但那個獨一無二的編號是不會變的。
當然,對於 UserBehavior 數據源來說,我們直接可以根據 userId 來區分不同的用戶。
總量統計
package come.atguigu.networkflow_analysis import org.apache.flink.streaming.api.TimeCharacteristic import org.apache.flink.streaming.api.scala._ import org.apache.flink.streaming.api.scala.function.AllWindowFunction import org.apache.flink.streaming.api.windowing.time.Time import org.apache.flink.streaming.api.windowing.windows.TimeWindow import org.apache.flink.util.Collector //定義樣例類 case class UserBehavior(userId:Long, itemId:Long, categoryId:Int, behavior:String, timestamp:Long) case class UvCount(windowEnd: Long, count: Long ) object UniqueVisitor { def main(args: Array[String]): Unit = { val env = StreamExecutionEnvironment.getExecutionEnvironment env.setParallelism(4) env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) val inputStream:DataStream[String] = env.readTextFile("C:\\Users\\DELL\\IdeaProjects\\UserBehaviorAnalysis\\NetworkFlowAnalysis\\src\\main\\resources\\UserBehavior.csv") // 將數據轉換成樣例類類型,並提取timestamp定義watermark val dataStream: DataStream[UserBehavior] = inputStream .map(data => { val dataArray = data.split(",") UserBehavior(dataArray(0).toLong, dataArray(1).toLong, dataArray(2).toInt, dataArray(3), dataArray(4).toLong)}) .assignAscendingTimestamps(_.timestamp*1000L) val uvStream:DataStream[UvCount] = dataStream .filter(_.behavior == "pv") .timeWindowAll(Time.hours(1)) .apply( new UvCountResult() ) uvStream.print() env.execute("uv job") } } class UvCountResult() extends AllWindowFunction[UserBehavior, UvCount, TimeWindow]{ override def apply(window: TimeWindow, input: Iterable[UserBehavior], out: Collector[UvCount]): Unit = { var idSet = Set[Long]() for( userBehavior <- input ){ idSet += userBehavior.userId } out.collect(UvCount(window.getEnd, idSet.size)) } }
增量統計
package come.atguigu.networkflow_analysis import org.apache.flink.api.common.functions.AggregateFunction import org.apache.flink.streaming.api.TimeCharacteristic import org.apache.flink.streaming.api.scala.function.AllWindowFunction import org.apache.flink.streaming.api.scala._ import org.apache.flink.streaming.api.windowing.time.Time import org.apache.flink.streaming.api.windowing.windows.TimeWindow import org.apache.flink.util.Collector //定義樣例類 case class UserBehavior(userId:Long, itemId:Long, categoryId:Int, behavior:String, timestamp:Long) case class UvCount(windowEnd: Long, count: Long ) object UniqueVisistorIncr { def main(args: Array[String]): Unit = { val env = StreamExecutionEnvironment.getExecutionEnvironment env.setParallelism(1) env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) val inputStream:DataStream[String] = env.readTextFile("C:\\Users\\DELL\\IdeaProjects\\UserBehaviorAnalysis\\NetworkFlowAnalysis\\src\\main\\resources\\UserBehavior.csv") // 將數據轉換成樣例類類型,並提取timestamp定義watermark val dataStream: DataStream[UserBehavior] = inputStream .map(data => { val dataArray = data.split(",") UserBehavior(dataArray(0).toLong, dataArray(1).toLong, dataArray(2).toInt, dataArray(3), dataArray(4).toLong) }) .assignAscendingTimestamps(_.timestamp*1000L) val uvStream:DataStream[UvCount] = dataStream .filter(_.behavior == "pv") .timeWindowAll(Time.hours(1)) .aggregate(new UvCountAgg(), new UvCountResultWithIncreAgg()) uvStream.print() env.execute("uv job") } } class UvCountResult() extends AllWindowFunction[UserBehavior, UvCount, TimeWindow]{ override def apply(window: TimeWindow, input: Iterable[UserBehavior], out: Collector[UvCount]): Unit = { var idSet = Set[Long]() for( userBehavior <- input ){ idSet += userBehavior.userId } out.collect(UvCount(window.getEnd, idSet.size)) } } class UvCountAgg extends AggregateFunction[UserBehavior, Set[Long], Long]{ override def add(value: UserBehavior, acc: Set[Long]): Set[Long] = acc + value.userId override def createAccumulator(): Set[Long] = Set[Long]() override def getResult(acc: Set[Long]): Long = acc.size override def merge(acc: Set[Long], acc1: Set[Long]): Set[Long] = acc ++ acc1 } class UvCountResultWithIncreAgg extends AllWindowFunction[Long, UvCount, TimeWindow]{ override def apply(window: TimeWindow, input: Iterable[Long], out: Collector[UvCount]): Unit = { out.collect( UvCount(window.getEnd, input.head)) } }
布隆過濾統計
我們把所有數據的 userId 都存在了窗口計算的狀態里,在窗口收集數據的過程中,狀態會不斷增大。一般情況下,只要不超出內存的承受范圍,
這種做法也沒什么問題;但如果我們遇到的數據量很大呢?
把所有數據暫存放到內存里,顯然不是一個好注意。我們會想到,可以利用 redis這種內存級 k-v 數據庫,為我們做一個緩存。但如果我們遇到的情況非常極端,數
據大到驚人呢?比如上億級的用戶,要去重計算 UV。
如果放到 redis 中,億級的用戶 id(每個 20 字節左右的話)可能需要幾 G 甚至幾十 G 的空間來存儲。當然放到 redis 中,用集群進行擴展也不是不可以,但明顯
代價太大了。
一個更好的想法是,其實我們不需要完整地存儲用戶 ID 的信息,只要知道他在不在就行了。所以其實我們可以進行壓縮處理,用一位(bit)就可以表示一個用戶
的狀態。這個思想的具體實現就是布隆過濾器(Bloom Filter)。
本質上布隆過濾器是一種數據結構,比較巧妙的概率型數據結構(probabilisticdata structure),特點是高效地插入和查詢,可以用來告訴你 “某樣東西一定不存
在或者可能存在”。
它本身是一個很長的二進制向量,既然是二進制的向量,那么顯而易見的,存放的不是 0,就是 1。相比於傳統的 List、Set、Map 等數據結構,它更高效、占用
空間更少,但是缺點是其返回的結果是概率性的,而不是確切的。
我們的目標就是,利用某種方法(一般是 Hash 函數)把每個數據,對應到一個位圖的某一位上去;如果數據存在,那一位就是 1,不存在則為 0。
注意這里我們用到了 redis 連接存取數據,所以需要加入 redis 客戶端的依賴:
<dependencies> <dependency> <groupId>redis.clients</groupId> <artifactId>jedis</artifactId> <version>2.8.1</version> </dependency> </dependencies>
package come.atguigu.networkflow_analysis import org.apache.flink.configuration.Configuration import org.apache.flink.streaming.api.TimeCharacteristic import org.apache.flink.streaming.api.scala._ import org.apache.flink.streaming.api.scala.function.ProcessWindowFunction import org.apache.flink.streaming.api.windowing.time.Time import org.apache.flink.streaming.api.windowing.triggers._ import org.apache.flink.streaming.api.windowing.windows.TimeWindow import org.apache.flink.util.Collector import redis.clients.jedis.Jedis //定義樣例類 case class UserBehavior(userId:Long, itemId:Long, categoryId:Int, behavior:String, timestamp:Long) case class UvCount(windowEnd: Long, count: Long ) object UvWithBloomFilter { def main(args: Array[String]): Unit = { val env = StreamExecutionEnvironment.getExecutionEnvironment env.setParallelism(1) env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) val inputStream:DataStream[String] = env.readTextFile("C:\\Users\\DELL\\IdeaProjects\\UserBehaviorAnalysis\\NetworkFlowAnalysis\\src\\main\\resources\\UserBehavior.csv") // 將數據轉換成樣例類類型,並提取timestamp定義watermark val dataStream: DataStream[UserBehavior] = inputStream .map(data => { val dataArray = data.split(",") UserBehavior(dataArray(0).toLong, dataArray(1).toLong, dataArray(2).toInt, dataArray(3), dataArray(4).toLong) }) .assignAscendingTimestamps(_.timestamp*1000L) val uvStream:DataStream[UvCount] = dataStream .filter(_.behavior == "pv") .map( data => ("uv", data.userId)) .keyBy(_._1) .timeWindow(Time.hours(1)) .trigger(new Mytrigger()) .process(new UvCountResultWithBloomFilter()) uvStream.print() env.execute("uv job") } } class Mytrigger() extends Trigger[(String, Long), TimeWindow]{ override def onEventTime(l: Long, w: TimeWindow, triggerContext: Trigger.TriggerContext): TriggerResult = TriggerResult.CONTINUE override def onProcessingTime(l: Long, w: TimeWindow, triggerContext: Trigger.TriggerContext): TriggerResult =TriggerResult.CONTINUE override def clear(w: TimeWindow, triggerContext: Trigger.TriggerContext): Unit = {} override def onElement(t: (String, Long), l: Long, w: TimeWindow, triggerContext: Trigger.TriggerContext): TriggerResult = TriggerResult.FIRE_AND_PURGE } class UvCountResultWithBloomFilter extends ProcessWindowFunction[(String, Long), UvCount, String, TimeWindow]{ var jedis: Jedis = _ var bloom:Bloom = _ override def open(parameters: Configuration): Unit = { jedis = new Jedis("192.168.1.122",6379) bloom = new Bloom(1<<30) } //每次一個數據,主要是要用布隆過濾器判斷redis位圖中對應的位置是否為1 override def process(key: String, context: Context, elements: Iterable[(String, Long)], out: Collector[UvCount]): Unit = { val storedKey = context.window.getEnd.toString val countMap = "countMap" var count = 0L if(jedis.hget(countMap, storedKey)!= null ){ count = jedis.hget(countMap,storedKey).toLong } // 取userId,計算hash值,判斷是否在位圖中 val userId = elements.last._2.toString val offset = bloom.hash(userId,61) val isExist = jedis.getbit(storedKey, offset) if(!isExist){ jedis.setbit(storedKey, offset, true) jedis.hset(countMap, storedKey, (count+1).toString) } } } //自定義一個布隆過濾器 class Bloom(size:Long) extends Serializable{ // 定義位圖的大小,應該是2的整次冪 private val cap = size //實現一個hash函數 def hash(str:String, seed:Int): Long = { var result = 0 for(i <- 0 until str.length){ result = result*seed + str.charAt(i) } // 返回一個在cap范圍內的一個值 (cap-1) & result } }