Flink 用布隆過濾器來實現UV統計


需求

  統計每個小時內的獨立訪客數(UV):同一個用戶在同一小時內無論訪問多少次,都只算一個

難點:如果用戶量很大,用Set等數據結構在內存中去重並不現實,很容易OOM。這時可以利用布隆過濾器:先判斷user是否可能已經存在,不存在則計數+1並把user寫入過濾器,存在則不做計算,這樣能節省大量的內存空間。需要注意的是,布隆過濾器有一定的誤判率(可能把沒出現過的用戶判斷為已存在),所以得到的UV是近似值,可能略微偏低

利用Flink自帶(shaded)的Guava布隆過濾器來實現

package project

import java.lang
import java.sql.Timestamp

import org.apache.flink.api.common.functions.AggregateFunction
import org.apache.flink.shaded.guava18.com.google.common.hash.{BloomFilter, Funnels}
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.function.ProcessWindowFunction
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector


/**
  * UV (unique visitor) job: counts distinct user ids among "pv" events, i.e. page views
  * de-duplicated by `userId`, without any external store.
  *
  * Windowing: `timeWindow(Time.hours(1))` with a single size argument, i.e. one-hour
  * event-time windows that do not overlap.
  *
  * De-duplication uses a Bloom filter (a bit array plus hash functions) kept inside the
  * window accumulator. A Bloom filter can report an unseen id as "might contain", so the
  * reported UV is an approximation that can only be too low, never too high.
  */
object UvByBloomFilterWithoutRedis {

  /** Input file used when no path is given on the command line. */
  private val DefaultInputPath =
    "D:\\flink-tutorial\\FlinkSZ1128\\src\\main\\resources\\UserBehavior.csv"

  /** Expected number of distinct users per window; used to size each Bloom filter. */
  private val ExpectedUsersPerWindow = 100000000

  /** Target false-positive probability of each Bloom filter. */
  private val FalsePositiveRate = 0.01

  /** One parsed CSV record; `timestamp` holds the fifth CSV column multiplied by 1000. */
  case class UserBehavior(userId: Long,
                          itemId: Long,
                          categoryId: Long,
                          behavior: String,
                          timestamp: Long)

  /**
    * Builds and runs the job.
    *
    * @param args optional; `args(0)` is the path of the CSV file to read. When absent,
    *             [[DefaultInputPath]] is used, which matches the previous behaviour.
    */
  def main(args: Array[String]): Unit = {
    val inputPath = args.headOption.getOrElse(DefaultInputPath)

    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    env.setParallelism(1)

    val stream = env
      .readTextFile(inputPath)
      .map { line =>
        val fields = line.split(",")
        // The fifth column is scaled by 1000 before being used as the event timestamp
        // (presumably seconds -> milliseconds; confirm against the data set).
        UserBehavior(
          fields(0).toLong,
          fields(1).toLong,
          fields(2).toLong,
          fields(3),
          fields(4).toLong * 1000L)
      }
      .filter(_.behavior == "pv")
      .assignAscendingTimestamps(_.timestamp) // timestamps are assumed to be ascending
      // Every record gets the same constant key, so all users land in one keyed window.
      .map(r => ("key", r.userId))
      .keyBy(_._1)
      .timeWindow(Time.hours(1))
      .aggregate(new UvAggFunc, new UvProcessFunc)

    stream.print()
    env.execute()
  }

  /**
    * Incremental aggregation whose accumulator is `(uvCount, bloomFilter)`.
    *
    * The input is `(key, userId)`; the result is the current `uvCount`.
    */
  class UvAggFunc extends AggregateFunction[(String, Long), (Long, BloomFilter[lang.Long]), Long] {

    /** Starts every window with a zero count and an empty Bloom filter. */
    override def createAccumulator(): (Long, BloomFilter[lang.Long]) =
      (0L, BloomFilter.create(Funnels.longFunnel(), ExpectedUsersPerWindow, FalsePositiveRate))

    /**
      * Counts the user only if the filter says it has definitely not been seen before.
      * The filter is mutated in place; the count is carried in the returned tuple.
      */
    override def add(value: (String, Long),
                     accumulator: (Long, BloomFilter[lang.Long])): (Long, BloomFilter[lang.Long]) = {
      val (uvCount, bloom) = accumulator
      val userId: lang.Long = value._2
      if (bloom.mightContain(userId)) {
        accumulator
      } else {
        bloom.put(userId)
        (uvCount + 1, bloom)
      }
    }

    /** Returns the de-duplicated count accumulated so far. */
    override def getResult(accumulator: (Long, BloomFilter[lang.Long])): Long = accumulator._1

    /**
      * Not supported: two partial counts cannot simply be added, because the same user may
      * have been counted in both accumulators. This job does not use merging windows; the
      * explicit exception replaces a bare `???` so a future change fails with an explanation.
      */
    override def merge(a: (Long, BloomFilter[lang.Long]),
                       b: (Long, BloomFilter[lang.Long])): (Long, BloomFilter[lang.Long]) =
      throw new UnsupportedOperationException(
        "UvAggFunc cannot merge accumulators: the exact distinct count of two Bloom-filter " +
          "partial results is unknown; use a non-merging window assigner")
  }

  /**
    * Formats the aggregated count together with the window bounds.
    *
    * `elements` holds the single value produced by [[UvAggFunc]] for the window.
    */
  class UvProcessFunc extends ProcessWindowFunction[Long, String, String, TimeWindow] {
    override def process(key: String,
                         context: Context,
                         elements: Iterable[Long],
                         out: Collector[String]): Unit = {
      val start = new Timestamp(context.window.getStart)
      val end = new Timestamp(context.window.getEnd)
      out.collect(s"窗口開始時間為$start 到 $end 的uv 為 ${elements.head}")
    }
  }

}

利用redis的bitmap自己手動實現一個簡單的布隆過濾器

import java.sql.Timestamp

import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.function.ProcessWindowFunction
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.triggers.{Trigger, TriggerResult}
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector
import redis.clients.jedis.Jedis

/**
  * UV (unique visitor) job: counts distinct user ids among "pv" events, de-duplicated with
  * a hand-rolled, single-hash Bloom filter stored in a Redis bitmap.
  *
  * Windowing: `timeWindow(Time.hours(1))` with a single size argument, i.e. one-hour
  * event-time windows that do not overlap.
  *
  * Redis layout, where `windowEnd` is the window end timestamp rendered as a string:
  *  - hash `UvCount`: field `windowEnd` -> UV count of that window
  *  - string key `windowEnd`: the bitmap of that window
  *
  * Results are printed by [[UvByBloomFilter.UvTrigger]]. The process function never emits
  * through its collector, so `stream.print()` itself prints nothing.
  */
object UvByBloomFilter {

  /** Input file used when no path is given on the command line. */
  private val DefaultInputPath =
    "/Users/yuanzuo/Desktop/flink-tutorial/FlinkSZ1128/src/main/resources/UserBehavior.csv"

  private val RedisHost = "localhost"
  private val RedisPort = 6379

  /** Name of the Redis hash that maps window end -> UV count. */
  private val UvCountKey = "UvCount"

  /**
    * Number of bits in each per-window bitmap. It must be a power of two (see [[hash]]).
    * NOTE(review): 2^20 bits with a single hash function collides heavily once the number
    * of distinct users gets anywhere near that size, and every collision is an uncounted
    * user. Size it for the expected user volume.
    */
  private val BitmapSize = 1L << 20

  /** One parsed CSV record; `timestamp` holds the fifth CSV column multiplied by 1000. */
  case class UserBehavior(userId: Long,
                          itemId: Long,
                          categoryId: Long,
                          behavior: String,
                          timestamp: Long)

  /**
    * Builds and runs the job.
    *
    * @param args optional; `args(0)` is the path of the CSV file to read. When absent,
    *             [[DefaultInputPath]] is used, which matches the previous behaviour.
    */
  def main(args: Array[String]): Unit = {
    val inputPath = args.headOption.getOrElse(DefaultInputPath)

    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    env.setParallelism(1)

    val stream = env
      .readTextFile(inputPath)
      .map { line =>
        val fields = line.split(",")
        // The fifth column is scaled by 1000 before being used as the event timestamp
        // (presumably seconds -> milliseconds; confirm against the data set).
        UserBehavior(
          fields(0).toLong,
          fields(1).toLong,
          fields(2).toLong,
          fields(3),
          fields(4).toLong * 1000L)
      }
      .filter(_.behavior == "pv")
      .assignAscendingTimestamps(_.timestamp) // timestamps are assumed to be ascending
      // Every record gets the same constant key, so all users land in one keyed window.
      .map(r => ("key", r.userId))
      .keyBy(_._1)
      .timeWindow(Time.hours(1))
      .trigger(new UvTrigger)
      .process(new UvProcessFunc)

    stream.print()
    env.execute()
  }

  /**
    * Fires and purges the window on every element so the window never buffers more than
    * the element just received, and prints the final count from Redis on event-time timers.
    */
  class UvTrigger extends Trigger[(String, Long), TimeWindow] {

    /** Called once per element: evaluate the window immediately, then drop its contents. */
    override def onElement(element: (String, Long),
                           timestamp: Long,
                           window: TimeWindow,
                           ctx: Trigger.TriggerContext): TriggerResult =
      TriggerResult.FIRE_AND_PURGE

    override def onProcessingTime(time: Long,
                                  window: TimeWindow,
                                  ctx: Trigger.TriggerContext): TriggerResult =
      TriggerResult.CONTINUE

    /**
      * Reads the window's UV count from Redis and prints it as `(windowEndTimestamp,count)`.
      *
      * Per the original author's note this runs when the window closes. This trigger
      * registers no timer of its own, so it presumably relies on a timer registered by
      * Flink's window operator (TODO confirm).
      */
    override def onEventTime(time: Long,
                             window: TimeWindow,
                             ctx: Trigger.TriggerContext): TriggerResult = {
      val jedis = new Jedis(RedisHost, RedisPort)
      try {
        val windowEnd = window.getEnd
        println((new Timestamp(windowEnd), jedis.hget(UvCountKey, windowEnd.toString)))
      } finally {
        // A client is created per call, so release it here rather than leaking it.
        jedis.close()
      }
      TriggerResult.CONTINUE
    }

    /** Nothing to clean up: this trigger registers no timers and keeps no state. */
    override def clear(window: TimeWindow, ctx: Trigger.TriggerContext): Unit = {}
  }

  /**
    * Marks each incoming user in the window's Redis bitmap and increments the window's UV
    * counter the first time the user's bit is seen unset. It emits nothing downstream.
    */
  class UvProcessFunc extends ProcessWindowFunction[(String, Long), String, String, TimeWindow] {

    // Created on first use; @transient keeps the client out of the serialized function.
    @transient lazy val jedis = new Jedis(RedisHost, RedisPort)

    override def process(key: String,
                         context: Context,
                         elements: Iterable[(String, Long)],
                         out: Collector[String]): Unit = {
      val windowEnd = context.window.getEnd.toString

      // With UvTrigger the window holds exactly one element per invocation. Iterating
      // keeps the function correct if the trigger ever fires with more than one.
      elements.foreach { case (_, userId) =>
        val idx = hash(userId.toString, BitmapSize)
        // An unset bit means this user has definitely not been seen in this window.
        if (!jedis.getbit(windowEnd, idx)) {
          jedis.setbit(windowEnd, idx, true)
          // One increment on the Redis side replaces the previous hget/hget/hset sequence.
          // A missing field counts as 0.
          jedis.hincrBy(UvCountKey, windowEnd, 1L)
        }
      }
    }

    /** Releases the Redis client when the function is shut down. */
    override def close(): Unit = {
      try jedis.close()
      finally super.close()
    }
  }

  /**
    * Single polynomial hash used as the only hash function of the Bloom filter.
    *
    * @param value string to hash
    * @param size  length of the bit array. The result is masked with `size - 1`, so it only
    *              covers `[0, size)` uniformly when `size` is a power of two.
    * @return index into the bit array
    */
  def hash(value: String, size: Long): Long = {
    val seed = 61 // multiplier of the polynomial; a prime, per the original author's note
    // Long arithmetic wraps on overflow; the mask below keeps the result non-negative.
    val polynomial = value.foldLeft(0L)((acc, ch) => acc * seed + ch)
    (size - 1) & polynomial
  }
}

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM