Computing UV in Spark SQL with custom UDF/UDAF functions (using bitmaps)


In real work, UV (unique visitors) is usually computed with count(distinct userId). That works, but it is inefficient, and distinct counts cannot be reused: if you aggregate over several dimensions and later want to roll up to a coarser dimension, you have to go back to the raw layer and count again, which is very slow on large data. The alternative is to aggregate once at the finest granularity and roll that result up, which requires custom aggregate functions: each group's distinct users are kept in a bitmap, and the bitmap is serialized to a byte array so it can be stored and merged later.
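To see why per-group distinct counts cannot simply be summed while bitmaps can, here is a toy sketch using RoaringBitmap directly (the user ids are made up):

import org.roaringbitmap.RoaringBitmap

// Group A saw users {1, 2}; group B saw users {2, 3}.
val a: RoaringBitmap = RoaringBitmap.bitmapOf(1, 2)
val b: RoaringBitmap = RoaringBitmap.bitmapOf(2, 3)

// Summing per-group distinct counts double-counts user 2: 2 + 2 = 4.
// OR-merging the bitmaps deduplicates across groups instead:
a.or(b)                   // mutates a in place
println(a.getCardinality) // 3, the correct rolled-up UV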

1) First-pass aggregation

package org.shydow.UDF

import org.apache.spark.sql.{Encoder, Encoders}
import org.apache.spark.sql.expressions.Aggregator
import org.roaringbitmap.RoaringBitmap
/**
 * @author shydow
 * @date 2021/12/13 22:55
 */

class BitmapGenUDAF extends Aggregator[Int, Array[Byte], Array[Byte]] {
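  // Note: RoaringBitmap stores 32-bit integers, so this aggregator takes Int input
  // and Spark casts the Long id column down, which is only safe while ids fit in an
  // Int (genuinely 64-bit ids would need Roaring64NavigableMap instead). The buffer
  // is a serialized byte array so Spark can shuffle it via Encoders.BINARY; the cost
  // is one deserialize/serialize round trip per reduce/merge call.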

  override def zero: Array[Byte] = {
    // construct an empty bitmap
    val bm: RoaringBitmap = RoaringBitmap.bitmapOf()
    // serialize the bitmap into a byte array
    BitmapUtil.serBitmap(bm)
  }

  override def reduce(b: Array[Byte], a: Int): Array[Byte] = {
    // deserialize the buffer back into a bitmap
    val bitmap: RoaringBitmap = BitmapUtil.deSerBitmap(b)
    bitmap.add(a)
    BitmapUtil.serBitmap(bitmap)
  }

  override def merge(b1: Array[Byte], b2: Array[Byte]): Array[Byte] = {
    val bitmap1: RoaringBitmap = BitmapUtil.deSerBitmap(b1)
    val bitmap2: RoaringBitmap = BitmapUtil.deSerBitmap(b2)
    bitmap1.or(bitmap2)
    BitmapUtil.serBitmap(bitmap1)
  }

  override def finish(reduction: Array[Byte]): Array[Byte] = reduction

  override def bufferEncoder: Encoder[Array[Byte]] = Encoders.BINARY

  override def outputEncoder: Encoder[Array[Byte]] = Encoders.BINARY
}
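The aggregator above assumes the RoaringBitmap library (org.roaringbitmap:RoaringBitmap) is on the classpath; in sbt that is roughly the following, where the version is an assumption and any recent release should work:

libraryDependencies += "org.roaringbitmap" % "RoaringBitmap" % "0.9.44"

The BitmapUtil helper used for the byte-array round trips follows.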
package org.shydow.UDF

import java.io.{ByteArrayInputStream, ByteArrayOutputStream, DataInputStream, DataOutputStream}

import org.roaringbitmap.RoaringBitmap

/**
 * @author shydow
 * @date 2021/12/13 22:45
 */
object BitmapUtil {

  /**
   * Serialize a bitmap to a byte array
   */
  def serBitmap(bm: RoaringBitmap): Array[Byte] = {
    val stream = new ByteArrayOutputStream()
    val dataOutput = new DataOutputStream(stream)
    bm.serialize(dataOutput)
    stream.toByteArray
  }

  /**
   * Deserialize a byte array back to a bitmap
   */
  def deSerBitmap(bytes: Array[Byte]): RoaringBitmap = {
    val bm: RoaringBitmap = RoaringBitmap.bitmapOf()
    val stream = new ByteArrayInputStream(bytes)
    val inputStream = new DataInputStream(stream)
    bm.deserialize(inputStream)
    bm
  }
}
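A quick round-trip sanity check of these two helpers (a minimal sketch):

val original: RoaringBitmap = RoaringBitmap.bitmapOf(1, 5, 42)
val bytes: Array[Byte] = BitmapUtil.serBitmap(original)
val restored: RoaringBitmap = BitmapUtil.deSerBitmap(bytes)
assert(restored.getCardinality == 3 && restored.contains(42))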
package org.shydow.UDF

import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.types.{LongType, StringType, StructField, StructType}
import org.roaringbitmap.RoaringBitmap

/**
 * @author shydow
 * @date 2021/12/13 22:25
 */

object TestBehaviorAnalysis {
  def main(args: Array[String]): Unit = {

    val spark: SparkSession = SparkSession.builder()
      .appName("analysis")
      .master("local[*]")
      .getOrCreate()
    spark.sparkContext.setLogLevel("WARN")
    import spark.implicits._

    val schema = StructType(Seq(
      StructField("id", LongType),
      StructField("eventType", StringType),
      StructField("code", StringType),
      StructField("timestamp", LongType))
    )
    val frame: DataFrame = spark.read.schema(schema).csv("data/OrderLog.csv")
    frame.createOrReplaceTempView("order_log")

    /**
     * Compute UV with count(distinct)
     */
    spark.sql(
      s"""
         |select
         |  eventType,
         |  count(1) as pv,
         |  count(distinct id) as uv
         |from order_log
         |group by eventType
         |""".stripMargin).show()

    /**
     * Compute UV with the custom UDAF
     */
    import org.apache.spark.sql.functions.udaf
    spark.udf.register("gen_bitmap", udaf(new BitmapGenUDAF))  // 這個函數出來的是字節數組,如果要計算具體的基數得寫一個udf

    def card(byteArray: Array[Byte]): Int = {
      val bitmap: RoaringBitmap = BitmapUtil.deSerBitmap(byteArray)
      bitmap.getCardinality
    }
    spark.udf.register("get_card", card _)

    spark.sql(
      s"""
         |select
         |  eventType,
         |  count(1) as pv,
         |  gen_bitmap(id) as uv_arr,
         |  get_card(gen_bitmap(id)) as uv
         |from order_log
         |group by eventType
         |""".stripMargin).show()

    spark.close()
  }
}
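For reference, data/OrderLog.csv is read without a header and must match the declared schema (id, eventType, code, timestamp). A few hypothetical rows, purely for illustration (an empty code field is read as null):

1001,create,,1639400000
1001,pay,ch01,1639400060
1002,create,,1639400120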

 

2) Roll-up aggregation

The driver below is the same TestBehaviorAnalysis, now aggregating per (eventType, code) and keeping the serialized bitmaps, so the eventType totals can be rolled up from the intermediate result instead of the raw log.

package org.shydow.UDF

import org.apache.spark.sql.{Encoder, Encoders}
import org.apache.spark.sql.expressions.Aggregator
import org.roaringbitmap.RoaringBitmap

/**
 * @author shydow
 * @date 2021/12/14 8:36
 */

class BitmapOrMergeUDAF extends Aggregator[Array[Byte], Array[Byte], Array[Byte]]{
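  // The input and the buffer are both serialized bitmaps, so reduce and merge
  // perform the same operation: deserialize both sides, OR them, re-serialize.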
  override def zero: Array[Byte] = {
    val bitmap: RoaringBitmap = RoaringBitmap.bitmapOf()
    BitmapUtil.serBitmap(bitmap)
  }

  override def reduce(b: Array[Byte], a: Array[Byte]): Array[Byte] = {
    val bitmap1: RoaringBitmap = BitmapUtil.deSerBitmap(b)
    val bitmap2: RoaringBitmap = BitmapUtil.deSerBitmap(a)
    bitmap1.or(bitmap2)
    BitmapUtil.serBitmap(bitmap1)
  }

  override def merge(b1: Array[Byte], b2: Array[Byte]): Array[Byte] = {
    val bitmap1: RoaringBitmap = BitmapUtil.deSerBitmap(b1)
    val bitmap2: RoaringBitmap = BitmapUtil.deSerBitmap(b2)
    bitmap1.or(bitmap2)
    BitmapUtil.serBitmap(bitmap1)
  }

  override def finish(reduction: Array[Byte]): Array[Byte] = reduction

  override def bufferEncoder: Encoder[Array[Byte]] = Encoders.BINARY

  override def outputEncoder: Encoder[Array[Byte]] = Encoders.BINARY
}

package org.shydow.UDF

import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.types.{LongType, StringType, StructField, StructType}
import org.roaringbitmap.RoaringBitmap

/**
 * @author shydow
 * @date 2021/12/13 22:25
 */

object TestBehaviorAnalysis {
  def main(args: Array[String]): Unit = {

    val spark: SparkSession = SparkSession.builder()
      .appName("analysis")
      .master("local[*]")
      .getOrCreate()
    spark.sparkContext.setLogLevel("WARN")
    import spark.implicits._

    val schema = StructType(Seq(
      StructField("id", LongType),
      StructField("eventType", StringType),
      StructField("code", StringType),
      StructField("timestamp", LongType))
    )
    val frame: DataFrame = spark.read.schema(schema).csv("data/OrderLog.csv")
    frame.createOrReplaceTempView("order_log")

    /**
     * Compute UV with count(distinct)
     */
    spark.sql(
      s"""
         |select
         |  eventType,
         |  code,
         |  count(1) as pv,
         |  count(distinct id) as uv
         |from order_log
         |where code is not null
         |group by eventType, code
         |""".stripMargin).show()

    /**
     * Compute UV with the custom UDAF
     */
    import org.apache.spark.sql.functions.udaf
    spark.udf.register("gen_bitmap", udaf(new BitmapGenUDAF))  // 這個函數出來的是字節數組,如果要計算具體的基數得寫一個udf

    def card(byteArray: Array[Byte]): Int = {
      val bitmap: RoaringBitmap = BitmapUtil.deSerBitmap(byteArray)
      bitmap.getCardinality
    }
    spark.udf.register("get_card", card _)

    val res: DataFrame = spark.sql(
      s"""
         |select
         |  eventType,
         |  code,
         |  count(1) as pv,
         |  gen_bitmap(id) as uv_arr,
         |  get_card(gen_bitmap(id)) as uv
         |from order_log
         |where code is not null
         |group by eventType, code
         |""".stripMargin)
    res.createTempView("dws_stat")

    spark.udf.register("bitmapOr", udaf(new BitmapOrMergeUDAF))
    spark.sql(
      s"""
        |select
        | eventType,
        | sum(pv) as total_pv,
        | bitmapOr(uv_arr) as uv_merged,
        | get_card(bitmapOr(uv_arr)) as total_uv
        |from dws_stat
        |group by eventType
        |""".stripMargin).show()


    spark.close()
  }
}
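The roll-up above runs in the same session, but the real payoff comes from persisting the finest-grained bitmaps so that later roll-ups never touch the raw data. A minimal sketch of that workflow (the Parquet path is an assumption, and gen_bitmap/bitmapOr/get_card must be re-registered in any new session):

// Persist the per-(eventType, code) result; BINARY columns survive Parquet.
res.write.mode("overwrite").parquet("warehouse/dws_stat")

// Later, possibly in another job: roll up straight from the DWS layer.
spark.read.parquet("warehouse/dws_stat").createOrReplaceTempView("dws_stat")
spark.sql(
  """
    |select eventType, sum(pv) as total_pv, get_card(bitmapOr(uv_arr)) as total_uv
    |from dws_stat
    |group by eventType
    |""".stripMargin).show()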

 

