In real-world work, UV is usually computed with count(distinct userId), but this does not scale well. Suppose you are aggregating across multiple dimensions: the day you want to roll a dimension up, you have to start over from the raw layer, which costs a lot of time on large datasets. Instead, you can roll up from the finest-grained aggregate itself by defining a custom aggregate function that collects user ids into a bitmap and serializes the bitmap to a byte array; the rolled-up UV is then just a union of those stored bitmaps.
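This works because bitmap union is exact and lossless: OR-ing the per-dimension bitmaps of user ids yields precisely the distinct-user set of the coarser dimension, whereas a plain distinct count cannot be re-aggregated. A minimal sketch of the RoaringBitmap operations involved (the ids are made up for illustration):

import org.roaringbitmap.RoaringBitmap

object RoaringBitmapDemo {
  def main(args: Array[String]): Unit = {
    // Two fine-grained slices, e.g. users seen under two different codes
    val slice1: RoaringBitmap = RoaringBitmap.bitmapOf(1, 2, 3)
    val slice2: RoaringBitmap = RoaringBitmap.bitmapOf(3, 4)

    slice1.or(slice2)                // in-place union: {1, 2, 3, 4}
    println(slice1.getCardinality)   // 4 -- the exact rolled-up uv
  }
}

Note that RoaringBitmap stores 32-bit integers, so this only works while user ids fit in an Int; for genuinely 64-bit ids you would need the library's 64-bit variant (Roaring64NavigableMap) instead.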
1) Initial aggregation
package org.shydow.UDF

import org.apache.spark.sql.{Encoder, Encoders}
import org.apache.spark.sql.expressions.Aggregator
import org.roaringbitmap.RoaringBitmap

/**
 * Collects ids into a RoaringBitmap; both the buffer and the output are the
 * bitmap serialized as a byte array, so the result can be stored and
 * OR-merged later.
 *
 * @author shydow
 * @date 2021/12/13 22:55
 */
class BitmapGenUDAF extends Aggregator[Int, Array[Byte], Array[Byte]] {

  override def zero: Array[Byte] = {
    // Build an empty bitmap
    val bm: RoaringBitmap = RoaringBitmap.bitmapOf()
    // Serialize the bitmap to a byte array
    BitmapUtil.serBitmap(bm)
  }

  override def reduce(b: Array[Byte], a: Int): Array[Byte] = {
    // Deserialize the buffer into a bitmap, add the incoming id, re-serialize
    val bitmap: RoaringBitmap = BitmapUtil.deSerBitmap(b)
    bitmap.add(a)
    BitmapUtil.serBitmap(bitmap)
  }

  override def merge(b1: Array[Byte], b2: Array[Byte]): Array[Byte] = {
    // Union (bitwise OR) the two partial bitmaps
    val bitmap1: RoaringBitmap = BitmapUtil.deSerBitmap(b1)
    val bitmap2: RoaringBitmap = BitmapUtil.deSerBitmap(b2)
    bitmap1.or(bitmap2)
    BitmapUtil.serBitmap(bitmap1)
  }

  override def finish(reduction: Array[Byte]): Array[Byte] = reduction

  override def bufferEncoder: Encoder[Array[Byte]] = Encoders.BINARY

  override def outputEncoder: Encoder[Array[Byte]] = Encoders.BINARY
}
package org.shydow.UDF

import java.io.{ByteArrayInputStream, ByteArrayOutputStream, DataInputStream, DataOutputStream}

import org.roaringbitmap.RoaringBitmap

/**
 * @author shydow
 * @date 2021/12/13 22:45
 */
object BitmapUtil {

  /**
   * Serialize a bitmap to a byte array
   */
  def serBitmap(bm: RoaringBitmap): Array[Byte] = {
    val stream = new ByteArrayOutputStream()
    val dataOutput = new DataOutputStream(stream)
    bm.serialize(dataOutput)
    stream.toByteArray
  }

  /**
   * Deserialize a byte array back into a bitmap
   */
  def deSerBitmap(bytes: Array[Byte]): RoaringBitmap = {
    val bm: RoaringBitmap = RoaringBitmap.bitmapOf()
    val stream = new ByteArrayInputStream(bytes)
    val inputStream = new DataInputStream(stream)
    bm.deserialize(inputStream)
    bm
  }
}
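These classes assume the RoaringBitmap library is on the classpath. For reference, a build.sbt sketch (the version number is an assumption, not from the original; use whatever release is current):

// build.sbt -- the version shown is illustrative
libraryDependencies += "org.roaringbitmap" % "RoaringBitmap" % "0.9.25"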
package org.shydow.UDF

import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.types.{LongType, StringType, StructField, StructType}
import org.roaringbitmap.RoaringBitmap

/**
 * @author shydow
 * @date 2021/12/13 22:25
 */
object TestBehaviorAnalysis {
  def main(args: Array[String]): Unit = {
    val spark: SparkSession = SparkSession.builder()
      .appName("analysis")
      .master("local[*]")
      .getOrCreate()
    spark.sparkContext.setLogLevel("WARN")

    // id is read as Long but must fit in an Int, since RoaringBitmap holds 32-bit values
    val schema = StructType(Seq(
      StructField("id", LongType),
      StructField("eventType", StringType),
      StructField("code", StringType),
      StructField("timestamp", LongType)
    ))
    val frame: DataFrame = spark.read.schema(schema).csv("data/OrderLog.csv")
    frame.createOrReplaceTempView("order_log")

    /**
     * Baseline: compute uv with count(distinct)
     */
    spark.sql(
      s"""
         |select
         |  eventType,
         |  count(1) as pv,
         |  count(distinct id) as uv
         |from order_log
         |group by eventType
         |""".stripMargin).show()

    /**
     * Compute uv with the custom UDAF
     */
    import org.apache.spark.sql.functions.udaf // wrapping an Aggregator with udaf() requires Spark 3.0+
    spark.udf.register("gen_bitmap", udaf(new BitmapGenUDAF))

    // gen_bitmap returns a byte array; to get the actual cardinality we need a plain UDF
    def card(byteArray: Array[Byte]): Int = {
      val bitmap: RoaringBitmap = BitmapUtil.deSerBitmap(byteArray)
      bitmap.getCardinality
    }
    spark.udf.register("get_card", card _)

    spark.sql(
      s"""
         |select
         |  eventType,
         |  count(1) as pv,
         |  gen_bitmap(id) as uv_arr,
         |  get_card(gen_bitmap(id)) as uv
         |from order_log
         |group by eventType
         |""".stripMargin).show()

    spark.close()
  }
}
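The payoff of keeping uv_arr as a byte array is that the finest-grained result can be persisted and re-aggregated later without touching the raw log again. A sketch of that persistence step (the output path and Parquet format are assumptions for illustration):

    // Hypothetical persistence of the finest-grained aggregate; the path and
    // format are illustrative, not from the original
    val dws: DataFrame = spark.sql(
      s"""
         |select eventType, count(1) as pv, gen_bitmap(id) as uv_arr
         |from order_log
         |group by eventType
         |""".stripMargin)
    dws.write.mode("overwrite").parquet("data/dws_uv")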
2) Roll-up aggregation
package org.shydow.UDF

import org.apache.spark.sql.{Encoder, Encoders}
import org.apache.spark.sql.expressions.Aggregator
import org.roaringbitmap.RoaringBitmap

/**
 * OR-merges the serialized bitmaps produced by BitmapGenUDAF; used to roll
 * a fine-grained result up to a coarser dimension.
 *
 * @author shydow
 * @date 2021/12/14 8:36
 */
class BitmapOrMergeUDAF extends Aggregator[Array[Byte], Array[Byte], Array[Byte]] {

  override def zero: Array[Byte] = {
    val bitmap: RoaringBitmap = RoaringBitmap.bitmapOf()
    BitmapUtil.serBitmap(bitmap)
  }

  // The input rows are already serialized bitmaps, so reduce performs the
  // same OR-union as merge
  override def reduce(b: Array[Byte], a: Array[Byte]): Array[Byte] = {
    val bitmap1: RoaringBitmap = BitmapUtil.deSerBitmap(b)
    val bitmap2: RoaringBitmap = BitmapUtil.deSerBitmap(a)
    bitmap1.or(bitmap2)
    BitmapUtil.serBitmap(bitmap1)
  }

  override def merge(b1: Array[Byte], b2: Array[Byte]): Array[Byte] = {
    val bitmap1: RoaringBitmap = BitmapUtil.deSerBitmap(b1)
    val bitmap2: RoaringBitmap = BitmapUtil.deSerBitmap(b2)
    bitmap1.or(bitmap2)
    BitmapUtil.serBitmap(bitmap1)
  }

  override def finish(reduction: Array[Byte]): Array[Byte] = reduction

  override def bufferEncoder: Encoder[Array[Byte]] = Encoders.BINARY

  override def outputEncoder: Encoder[Array[Byte]] = Encoders.BINARY
}
package org.shydow.UDF

import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.types.{LongType, StringType, StructField, StructType}
import org.roaringbitmap.RoaringBitmap

/**
 * @author shydow
 * @date 2021/12/13 22:25
 */
object TestBehaviorAnalysis {
  def main(args: Array[String]): Unit = {
    val spark: SparkSession = SparkSession.builder()
      .appName("analysis")
      .master("local[*]")
      .getOrCreate()
    spark.sparkContext.setLogLevel("WARN")

    val schema = StructType(Seq(
      StructField("id", LongType),
      StructField("eventType", StringType),
      StructField("code", StringType),
      StructField("timestamp", LongType)
    ))
    val frame: DataFrame = spark.read.schema(schema).csv("data/OrderLog.csv")
    frame.createOrReplaceTempView("order_log")

    /**
     * Baseline: compute uv with count(distinct) at the finest granularity
     */
    spark.sql(
      s"""
         |select
         |  eventType,
         |  code,
         |  count(1) as pv,
         |  count(distinct id) as uv
         |from order_log
         |where code is not null
         |group by eventType, code
         |""".stripMargin).show()

    /**
     * Compute uv with the custom UDAF
     */
    import org.apache.spark.sql.functions.udaf
    spark.udf.register("gen_bitmap", udaf(new BitmapGenUDAF))

    // gen_bitmap returns a byte array; to get the actual cardinality we need a plain UDF
    def card(byteArray: Array[Byte]): Int = {
      val bitmap: RoaringBitmap = BitmapUtil.deSerBitmap(byteArray)
      bitmap.getCardinality
    }
    spark.udf.register("get_card", card _)

    // Finest-grained result: (eventType, code), keeping the serialized bitmap
    val res: DataFrame = spark.sql(
      s"""
         |select
         |  eventType,
         |  code,
         |  count(1) as pv,
         |  gen_bitmap(id) as uv_arr,
         |  get_card(gen_bitmap(id)) as uv
         |from order_log
         |where code is not null
         |group by eventType, code
         |""".stripMargin)
    res.createTempView("dws_stat")

    // Roll up from (eventType, code) to eventType by OR-merging the stored bitmaps
    spark.udf.register("bitmapOr", udaf(new BitmapOrMergeUDAF))
    spark.sql(
      s"""
         |select
         |  eventType,
         |  sum(pv) as total_pv,
         |  bitmapOr(uv_arr),
         |  get_card(bitmapOr(uv_arr)) as total_uv
         |from dws_stat
         |group by eventType
         |""".stripMargin).show()

    spark.close()
  }
}
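Because the rolled-up column is itself still a serialized bitmap, the same trick composes: the eventType-level result could be stored and OR-merged again. As a hypothetical extension (not in the original), the dws_stat view can also be collapsed across all event types into one overall figure:

    // Hypothetical further roll-up: a single overall row across every eventType
    spark.sql(
      s"""
         |select
         |  sum(pv) as total_pv,
         |  get_card(bitmapOr(uv_arr)) as total_uv
         |from dws_stat
         |""".stripMargin).show()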
