需求
統計每一個小時窗口內的獨立訪客數(UV):同一個用戶在同一個窗口內無論訪問多少次,都只計一次
難點:如果用戶量很多,用Set等數據結構在內存中保存所有userId來去重並不現實,隨時都可能OOM。這時可以利用布隆過濾器:先判斷userId是否可能已經存在,不存在則計數+1並寫入過濾器,存在則不計數,這樣能節省大量的內存空間。代價是布隆過濾器存在誤判(可能把新用戶誤認為已出現過),所以得到的UV是略微偏小的近似值
利用Flink內置(shaded)的Guava布隆過濾器來實現(不依賴Redis)
package project

import java.lang
import java.sql.Timestamp

import org.apache.flink.api.common.functions.AggregateFunction
import org.apache.flink.shaded.guava18.com.google.common.hash.{BloomFilter, Funnels}
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.function.ProcessWindowFunction
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector

/**
 * UV (unique visitor) count: how many distinct users produced a "pv" event
 * in each one-hour tumbling event-time window.
 *
 * De-duplication is done with a Bloom filter (a bit array plus hash
 * functions) instead of a Set, so memory stays bounded even when a window
 * sees on the order of 100 million users. Because a Bloom filter can report
 * false positives, the emitted count is an approximation that may be
 * slightly lower than the true UV.
 */
object UvByBloomFilterWithoutRedis {

  case class UserBehavior(userId: Long, itemId: Long, categoryId: Long, behavior: String, timestamp: Long)

  /** Input file used when no path is passed as the first program argument. */
  private val DefaultInputPath: String =
    "D:\\flink-tutorial\\FlinkSZ1128\\src\\main\\resources\\UserBehavior.csv"

  /** Expected number of distinct users per window; sizes the Bloom filter. */
  private val ExpectedUsersPerWindow: Int = 100000000

  /** Target false-positive probability of the Bloom filter. */
  private val FalsePositiveRate: Double = 0.01

  /**
   * Builds and runs the job.
   *
   * @param args optional; `args(0)` overrides the CSV input path
   */
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    env.setParallelism(1)

    val inputPath = args.headOption.getOrElse(DefaultInputPath)

    val stream = env
      .readTextFile(inputPath)
      .map { line =>
        val fields = line.split(",")
        // The last column is multiplied by 1000 before being used as the
        // event timestamp (presumably seconds -> milliseconds; confirm
        // against the data set).
        UserBehavior(
          fields(0).toLong,
          fields(1).toLong,
          fields(2).toLong,
          fields(3),
          fields(4).toLong * 1000L
        )
      }
      .filter(_.behavior == "pv")
      .assignAscendingTimestamps(_.timestamp) // timestamps are assumed to be ascending
      // Route every record to the same constant key so that a single
      // window instance sees all users.
      .map(r => ("key", r.userId))
      .keyBy(_._1)
      .timeWindow(Time.hours(1))
      .aggregate(new UvAggFunc, new UvProcessFunc)

    stream.print()
    env.execute()
  }

  /**
   * Incremental aggregate whose accumulator is (count so far, Bloom filter
   * of the user ids seen so far). The result is the count.
   */
  class UvAggFunc extends AggregateFunction[(String, Long), (Long, BloomFilter[lang.Long]), Long] {

    override def createAccumulator(): (Long, BloomFilter[lang.Long]) =
      (0L, BloomFilter.create(Funnels.longFunnel(), ExpectedUsersPerWindow, FalsePositiveRate))

    override def add(value: (String, Long), accumulator: (Long, BloomFilter[lang.Long])): (Long, BloomFilter[lang.Long]) = {
      val (uvCount, bloom) = accumulator
      // `put` returns true only when it changed at least one bit, i.e. exactly
      // when `mightContain` would have returned false beforehand. Using its
      // result avoids hashing the id twice.
      if (bloom.put(value._2)) (uvCount + 1, bloom) else accumulator
    }

    /** Returns the distinct-user count held in the accumulator. */
    override def getResult(accumulator: (Long, BloomFilter[lang.Long])): Long = accumulator._1

    /**
     * Not supported: two (count, filter) pairs cannot be combined into a
     * correct count, because users present in both filters would be counted
     * twice. The job above uses non-merging tumbling windows.
     */
    override def merge(a: (Long, BloomFilter[lang.Long]), b: (Long, BloomFilter[lang.Long])): (Long, BloomFilter[lang.Long]) =
      throw new UnsupportedOperationException(
        "UvAggFunc cannot merge accumulators; use it only with non-merging windows")
  }

  /**
   * Formats the pre-aggregated UV count of a window together with the
   * window's start and end time.
   */
  class UvProcessFunc extends ProcessWindowFunction[Long, String, String, TimeWindow] {

    override def process(key: String, context: Context, elements: Iterable[Long], out: Collector[String]): Unit = {
      val start = new Timestamp(context.window.getStart)
      val end = new Timestamp(context.window.getEnd)
      // `elements` carries the single result produced by UvAggFunc.
      elements.headOption.foreach { uv =>
        out.collect(s"窗口開始時間為$start 到 $end 的uv 為 $uv")
      }
    }
  }
}
利用Redis的bitmap手動實現一個簡單的布隆過濾器(為了方便理解,只使用一個哈希函數)
import java.sql.Timestamp

import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.function.ProcessWindowFunction
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.triggers.{Trigger, TriggerResult}
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector
import redis.clients.jedis.Jedis

/**
 * UV (unique visitor) count per one-hour tumbling event-time window, using
 * a hand-rolled, single-hash Bloom filter stored in Redis.
 *
 * Redis layout (both keyed by the window end timestamp as a string):
 *   - hash "UvCount": field = window end, value = UV count
 *   - bitmap named after the window end: the Bloom filter bit array
 */
object UvByBloomFilter {

  case class UserBehavior(userId: Long, itemId: Long, categoryId: Long, behavior: String, timestamp: Long)

  /** Input file used when no path is passed as the first program argument. */
  private val DefaultInputPath: String =
    "/Users/yuanzuo/Desktop/flink-tutorial/FlinkSZ1128/src/main/resources/UserBehavior.csv"

  private val RedisHost: String = "localhost"
  private val RedisPort: Int = 6379

  /** Name of the Redis hash that maps window end -> UV count. */
  private val UvCountKey: String = "UvCount"

  /** Length of the per-window bit array; a power of two, as `hash` expects. */
  private val BitmapSize: Long = 1L << 20

  /**
   * Builds and runs the job.
   *
   * @param args optional; `args(0)` overrides the CSV input path
   */
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    env.setParallelism(1)

    val inputPath = args.headOption.getOrElse(DefaultInputPath)

    val stream = env
      .readTextFile(inputPath)
      .map { line =>
        val fields = line.split(",")
        // The last column is multiplied by 1000 before being used as the
        // event timestamp (presumably seconds -> milliseconds; confirm
        // against the data set).
        UserBehavior(
          fields(0).toLong,
          fields(1).toLong,
          fields(2).toLong,
          fields(3),
          fields(4).toLong * 1000L
        )
      }
      .filter(_.behavior == "pv")
      .assignAscendingTimestamps(_.timestamp) // timestamps are assumed to be ascending
      // Route every record to the same constant key so that a single
      // window instance sees all users.
      .map(r => ("key", r.userId))
      .keyBy(_._1)
      .timeWindow(Time.hours(1))
      .trigger(new UvTrigger)
      .process(new UvProcessFunc)

    stream.print()
    env.execute()
  }

  /**
   * Fires and purges the window on every element so that the window never
   * buffers more than one record; the running state lives in Redis instead.
   */
  class UvTrigger extends Trigger[(String, Long), TimeWindow] {

    /** Called once per element: evaluate the window now and drop its contents. */
    override def onElement(element: (String, Long), timestamp: Long, window: TimeWindow, ctx: Trigger.TriggerContext): TriggerResult =
      TriggerResult.FIRE_AND_PURGE

    override def onProcessingTime(time: Long, window: TimeWindow, ctx: Trigger.TriggerContext): TriggerResult =
      TriggerResult.CONTINUE

    /**
     * Called when an event-time timer of the window fires (per the original
     * author's note, when the window closes). Reads the final UV count of
     * the window from Redis and prints it with the window end time.
     */
    override def onEventTime(time: Long, window: TimeWindow, ctx: Trigger.TriggerContext): TriggerResult = {
      // A short-lived connection is used here and always released, so that
      // repeated timer callbacks do not leak sockets.
      val jedis = new Jedis(RedisHost, RedisPort)
      try {
        val windowEnd = window.getEnd
        println((new Timestamp(windowEnd), jedis.hget(UvCountKey, windowEnd.toString)))
      } finally {
        jedis.close()
      }
      TriggerResult.CONTINUE
    }

    override def clear(window: TimeWindow, ctx: Trigger.TriggerContext): Unit = {}
  }

  /**
   * Updates the Redis-backed Bloom filter and UV counter for the window with
   * the single element delivered by [[UvTrigger]]. Emits nothing downstream.
   */
  class UvProcessFunc extends ProcessWindowFunction[(String, Long), String, String, TimeWindow] {

    // Created lazily on first use; transient so the connection itself is
    // never part of the serialized function.
    @transient lazy val jedis = new Jedis(RedisHost, RedisPort)

    /** Releases the Redis connection when the function is shut down. */
    override def close(): Unit = jedis.close()

    override def process(key: String, context: Context, elements: Iterable[(String, Long)], out: Collector[String]): Unit = {
      val windowEnd = context.window.getEnd.toString

      // The trigger purges the window after every element, so the iterable
      // holds exactly one record.
      elements.headOption.foreach { case (_, userId) =>
        // Index of this user's bit in the window's bit array.
        val idx = hash(userId.toString, BitmapSize)

        // SETBIT returns the bit's previous value. If it was 0 the user has
        // definitely not been seen in this window, so bump the counter.
        // HINCRBY creates the field when absent and increments atomically,
        // replacing the former read-then-write sequence.
        val seenBefore: Boolean = jedis.setbit(windowEnd, idx, true)
        if (!seenBefore) {
          jedis.hincrBy(UvCountKey, windowEnd, 1L)
        }
      }
    }
  }

  /**
   * Single polynomial hash used to pick a bit index (kept deliberately
   * simple; a real Bloom filter would use several hash functions).
   *
   * @param value string to hash
   * @param size  length of the bit array; the result is masked with
   *              `size - 1`, so `size` should be a power of two for the
   *              indices to cover the whole array
   * @return the result of `(size - 1) & h`, where `h` is the polynomial hash
   */
  def hash(value: String, size: Long): Long = {
    val seed = 61 // prime multiplier chosen by the original author to reduce collisions
    // Long arithmetic wraps on overflow; the mask below keeps the index
    // non-negative whenever size - 1 is non-negative.
    val polynomial = value.foldLeft(0L)((acc, ch) => acc * seed + ch)
    (size - 1) & polynomial
  }
}