示例
使用org.roaringbitmap.Roaringbitmap ,通過序列化和反序列化的方式來進行計算
這里有兩個例子,主要是最終的返回結果類型不同
使用org.roaringbitmap.buffer.*的方式,使用內存映射文件來進行計算,主要是 ImmutableRoaringBitmap 和 MutableRoaringBitmap
適用場景
多維度條件查詢
多組標簽的人群圈選
01.預計算 - 預聚合和重聚合
02.BitMap Index
03.Inverted index
正排索引(forward index)與倒排索引(inverted index)
倒排索引是一種以關鍵字和文檔編號結合,並以關鍵字作為主鍵的索引結構
預計算是數據倉庫領域常見的一種提升查詢效率的方式,通過將全部或部分計算結果提前計算好並存儲下來,
對於后續的相關的查詢可以直接重用之前的預計算結果,從而加速查詢速度。
在多維分析或報表等查詢模式相對比較固定的場景中,我們可以通過預聚合,將需要處理的數據量下降成百上千倍。
此外對於預計算來說,由於用戶的查詢維度,過濾條件,統計方式非常多,考慮到預計算的計算和存儲代價,
不太可能把每種可能的查詢條件都進行預計算,
通常的方式是按照較細粒度進行分組聚合,然后對於后續更粗粒度的分組聚合查詢,可以使用預計算的結果進行重聚合,
Roaringbitmap for Spark 聚合代碼
01.數據映射
不能在 bitmap 中直接使用,需要用 INT 類型的用戶 ID 來標識所有的用戶。
同時原 hive 表中也是不包含 INT 類型的用戶 ID 這個字段的,
所以需要提前准備好 bitmap 分群方案所需的 bitmap_hive 表
一個映射函數,能夠將統計字段的取值范圍映射成自然數
1)row_number() over() 函數,但是在操作億級別行的數據時,會造成數據傾斜
2)一種針對億級行大數據量的全局唯一連續數字 ID 生成方法
02.bit_mapping: 接受Integer類型字段作為參數,內部維護Bitmap數據結構,將輸入數據插入Bitmap中,
並把Bitmap序列化二進制數據作為輸出結果。
如何將 Hive 表中的關系型數據以 bitmap 的形式保存 ? 字節數組 .在程序中序列化roaringbitmap,將二進制數據寫入數據庫
a binaryFile data sourceon Spark, Array[Byte] is represented as a BinaryType
The data type representing Array[Byte] values. Please use the singleton DataTypes.BinaryType
03.數據類型
hive類型 說明 java類型 實例
1).tinyint 1byte有符號的整數 byte 30
2).smalint 2byte有符號的整數 short 30
3).int 4byte有符號的整數 int 30
4).bigint 8byte有符號的整數 long 30
5).boolean 布爾類型true或false boolean true
6).float 單精度 float 3.33
7).double 雙精度 double 3.22
8).string 字符序列,單雙即可 string 'ggj';"tyhjk"
9).timestamp 時間戳,精確的納秒 timestamp '169030219333'
10).binary 字節數組 byte[]
04.re_count_distinct: 接受二進制數據作為參數,反序列化位Bitmap,merge同一分區的多個Bitmap,把Bitmap的cardinality作為結果輸出。
RoaringBitMap
* In-place bitwise OR (union) operation. The current bitmap is modified.
public void or(final RoaringBitmap x2) {}
* serialized Roaring objects with an incorrect byte order
public void serialize(DataOutput out) throws IOException {}
public int serializedSizeInBytes()
* Deserialize (retrieve) this bitmap.
public void deserialize(DataInput in) throws IOException {}
相關系統案例
01.Bitmap索引是應該在數據寫入的同時實時構建呢,還是應該在數據從內存persist到硬盤的時候批量構建
02.如何分別為每個維度列構建Bitmap索引
03.Bitmap索引如何進行壓縮處理?
標簽屬性
標簽划分成枚舉類型(enum)、連續值類型(continuous)、日期類型(date)
nominal 標稱- 枚舉: 標簽取值從維表中選擇,標簽和取值之間的邏輯關系只有等於、不等於,共 2 種
標簽構成 : 標簽划分成單一標簽和復合標簽
每種標簽的 bitmap 構建和運算轉換規則
對部分標簽的邊界值情況進行處理
代碼示例 讀取IntegerType數據
讀取Inter數據,利用Roaringbitmap,返回不重復的個數
`
import org.apache.spark.sql.Row;
import org.apache.spark.sql.expressions.MutableAggregationBuffer;
import org.apache.spark.sql.expressions.UserDefinedAggregateFunction;
import org.apache.spark.sql.types.DataType;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
import org.roaringbitmap.RoaringBitmap;
import java.io.*;
import java.util.ArrayList;
import java.util.List;
public class RoaringBitMapByteUDAF extends UserDefinedAggregateFunction {
/**
* // 聚合函數的輸入數據結構
*/
@Override
public StructType inputSchema() {
List<StructField> structFields = new ArrayList<>();
structFields.add(DataTypes.createStructField("field", DataTypes.IntegerType, true));
return DataTypes.createStructType(structFields);
}
/**
* 聚緩存區數據結構 //聚合的中間過程中產生的數據的數據類型定義
*/
@Override
public StructType bufferSchema() {
List<StructField> structFields = new ArrayList<>();
structFields.add(DataTypes.createStructField("field", DataTypes.BinaryType, true));
return DataTypes.createStructType(structFields);
}
/**
* 聚合函數返回值數據結構
*/
@Override
public DataType dataType() {
return DataTypes.LongType;
}
/**
* 聚合函數是否是冪等的,即相同輸入是否總是能得到相同輸出
*/
@Override
public boolean deterministic() {
//是否強制每次執行的結果相同
return true;
}
/**
* 初始化緩沖區
*/
@Override
public void initialize(MutableAggregationBuffer buffer) {
//初始化
buffer.update(0, null);
}
/**
* 給聚合函數傳入一條新數據進行處理
* buffer.getInt(0)獲取的是上一次聚合后的值
* //用輸入數據input更新buffer值,類似於combineByKey
*/
@Override
public void update(MutableAggregationBuffer buffer, Row input) {
// 相同的executor間的數據合並
Object in = input.get(0);
Object out = buffer.get(0);
RoaringBitmap outRR = new RoaringBitmap();
// 1. 輸入為空直接返回不更新
if(in == null){
return ;
}
// 2. 源為空則直接更新值為輸入
int inInt = Integer.valueOf(in.toString());
byte[] inBytes = null ;
if(out == null){
System.out.println(inInt);
outRR.add(inInt);
try{
// 將RoaringBitmap的數據轉成字節數組
ByteArrayOutputStream bos = new ByteArrayOutputStream();
DataOutputStream ndos = new DataOutputStream(bos);
outRR.serialize(ndos);
inBytes = bos.toByteArray();
ndos.close();
} catch (IOException e) {
e.printStackTrace();
}
buffer.update(0, inBytes);
return ;
}
// 3. 源和輸入都不為空使用 bitmap去重合並
byte[] outBytes = (byte[]) buffer.get(0);
System.out.println(outBytes.length);
byte[] result = outBytes;
try {
outRR.deserialize(new DataInputStream(new ByteArrayInputStream(outBytes)));
outRR.add(inInt);
System.out.println("去重后的" + String.valueOf(outRR.getCardinality()));
ByteArrayOutputStream boss = new ByteArrayOutputStream();
DataOutputStream ndosn = new DataOutputStream(boss);
outRR.serialize(ndosn);
result = boss.toByteArray();
ndosn.close();
} catch (IOException e) {
e.printStackTrace();
}
buffer.update(0, result);
}
/**
* 合並聚合函數緩沖區
* //合並兩個buffer,將buffer2合並到buffer1.在合並兩個分區聚合結果的時候會被用到,類似於reduceByKey
* //這里要注意該方法沒有返回值,
* 在實現的時候是把buffer2合並到buffer1中去,你需要實現這個合並細節。
*/
@Override
public void merge(MutableAggregationBuffer buffer1, Row buffer2) {
//不同excutor間的數據合並
// 合並兩個聚合buffer,該函數在聚合並兩個部分聚合數據集的時候調用
//update(buffer1, buffer2);
RoaringBitmap inRBM = new RoaringBitmap();
RoaringBitmap outRBM = new RoaringBitmap();
Object out = buffer1.get(0);
byte[] inBytes = (byte[]) buffer2.get(0);
if(out == null){
buffer1.update(0, inBytes);
return ;
}
byte[] outBitBytes = (byte[]) out;
byte[] resultBit = outBitBytes;
if (out != null) {
try {
outRBM.deserialize(new DataInputStream(new ByteArrayInputStream(outBitBytes)));
System.out.println("去重后的 outRBM " + String.valueOf(outRBM.getCardinality()));
inRBM.deserialize(new DataInputStream(new ByteArrayInputStream(inBytes)));
System.out.println("去重后的 inRBM " + String.valueOf(inRBM.getCardinality()));
RoaringBitmap rror = RoaringBitmap.or(outRBM, inRBM) ;
ByteArrayOutputStream bos = new ByteArrayOutputStream();
DataOutputStream ndosn = new DataOutputStream(bos);
rror.serialize(ndosn);
resultBit = bos.toByteArray();
ndosn.close();
} catch (IOException e) {
e.printStackTrace();
}
buffer1.update(0, resultBit);
}
}
/**
* 計算最終結果
*/
@Override
public Object evaluate(Row buffer) {
//根據Buffer計算結果
long r = 2L;
Object val = buffer.get(0);
if (val != null) {
RoaringBitmap rr = new RoaringBitmap();
try {
rr.deserialize(new DataInputStream(new ByteArrayInputStream((byte[]) val)));
r = rr.getCardinality();
// getLongCardinality()
} catch (IOException e) {
e.printStackTrace();
}
}
return r;
}
}
`
讀取BinaryType 返回數據不重復的計數
讀取 BinaryType 數據,即讀取RoaringBitmap序列化數據 利用Roaringbitmap,返回不重復的個數 --
參考學習於 sparkSQL自定義聚合函數(UDAF)實現bitmap函數 https://blog.csdn.net/xiongbingcool/article/details/81282118
`
import org.apache.spark.sql.Row;
import org.apache.spark.sql.expressions.MutableAggregationBuffer;
import org.apache.spark.sql.expressions.UserDefinedAggregateFunction;
import org.apache.spark.sql.types.DataType;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
import org.roaringbitmap.RoaringBitmap;
import java.io.*;
import java.util.ArrayList;
import java.util.List;
/**
* 實現自定義聚合函數Bitmap
*/
public class RoaringBitMapByteDistinctUDAF extends UserDefinedAggregateFunction {
/**
* // 聚合函數的輸入數據結構
*/
@Override
public StructType inputSchema() {
List<StructField> structFields = new ArrayList<>();
structFields.add(DataTypes.createStructField("field", DataTypes.BinaryType, true));
return DataTypes.createStructType(structFields);
}
/**
* 聚緩存區數據結構
*/
@Override
public StructType bufferSchema() {
List<StructField> structFields = new ArrayList<>();
structFields.add(DataTypes.createStructField("field", DataTypes.BinaryType, true));
return DataTypes.createStructType(structFields);
}
/**
* 聚合函數返回值數據結構
*/
@Override
public DataType dataType() {
return DataTypes.LongType;
}
/**
* 聚合函數是否是冪等的,即相同輸入是否總是能得到相同輸出
*/
@Override
public boolean deterministic() {
//是否強制每次執行的結果相同
return false;
}
/**
* 初始化緩沖區
*/
@Override
public void initialize(MutableAggregationBuffer buffer) {
//初始化
buffer.update(0, null);
}
/**
* 給聚合函數傳入一條新數據進行處理
*/
@Override
public void update(MutableAggregationBuffer buffer, Row input) {
// 相同的executor間的數據合並
// 1. 輸入為空直接返回不更新
Object in = input.get(0);
if(in == null){
return ;
}
// 2. 源為空則直接更新值為輸入
byte[] inBytes = (byte[]) in;
Object out = buffer.get(0);
if(out == null){
buffer.update(0, inBytes);
return ;
}
// 3. 源和輸入都不為空使用bitmap去重合並
byte[] outBytes = (byte[]) out;
byte[] result = outBytes;
RoaringBitmap outRR = new RoaringBitmap();
RoaringBitmap inRR = new RoaringBitmap();
try {
outRR.deserialize(new DataInputStream(new ByteArrayInputStream(outBytes)));
inRR.deserialize(new DataInputStream(new ByteArrayInputStream(inBytes)));
outRR.or(inRR);
ByteArrayOutputStream bos = new ByteArrayOutputStream();
outRR.serialize(new DataOutputStream(bos));
result = bos.toByteArray();
} catch (IOException e) {
e.printStackTrace();
}
buffer.update(0, result);
}
/**
* 合並聚合函數緩沖區
*/
@Override
public void merge(MutableAggregationBuffer buffer1, Row buffer2) {
//不同excutor間的數據合並
update(buffer1, buffer2);
}
/**
* 計算最終結果
*/
@Override
public Object evaluate(Row buffer) {
//根據Buffer計算結果
long r = 0L;
Object val = buffer.get(0);
if (val != null) {
RoaringBitmap rr = new RoaringBitmap();
try {
rr.deserialize(new DataInputStream(new ByteArrayInputStream((byte[]) val)));
r = rr.getCardinality();
// getLongCardinality()
} catch (IOException e) {
e.printStackTrace();
}
}
return r;
}
}
`
使用Buffer方式
`
import org.apache.spark.sql.Row;
import org.apache.spark.sql.expressions.MutableAggregationBuffer;
import org.apache.spark.sql.expressions.UserDefinedAggregateFunction;
import org.apache.spark.sql.types.DataType;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
import org.roaringbitmap.buffer.ImmutableRoaringBitmap;
import org.roaringbitmap.buffer.MutableRoaringBitmap;
import java.io.*;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
public class MutableRoaringBitmapUDAF extends UserDefinedAggregateFunction {
/**
* // 聚合函數的輸入數據結構
*/
@Override
public StructType inputSchema() {
List<StructField> structFields = new ArrayList<>();
structFields.add(DataTypes.createStructField("field", DataTypes.IntegerType, true));
return DataTypes.createStructType(structFields);
}
/**
* 聚緩存區數據結構 //聚合的中間過程中產生的數據的數據類型定義
*/
@Override
public StructType bufferSchema() {
List<StructField> structFields = new ArrayList<>();
structFields.add(DataTypes.createStructField("field", DataTypes.BinaryType, true));
return DataTypes.createStructType(structFields);
}
/**
* 聚合函數返回值數據結構
*/
@Override
public DataType dataType() {
return DataTypes.LongType;
}
/**
* 聚合函數是否是冪等的,即相同輸入是否總是能得到相同輸出
*/
@Override
public boolean deterministic() {
//是否強制每次執行的結果相同
return true;
}
/**
* 初始化緩沖區
*/
@Override
public void initialize(MutableAggregationBuffer buffer) {
//初始化
buffer.update(0, null);
}
/**
* 給聚合函數傳入一條新數據進行處理
* buffer.getInt(0)獲取的是上一次聚合后的值
* //用輸入數據input更新buffer值,類似於combineByKey
*/
@Override
public void update(MutableAggregationBuffer buffer, Row input) {
// 相同的executor間的數據合並
Object in = input.get(0);
Object out = buffer.get(0);
MutableRoaringBitmap outRR = new MutableRoaringBitmap();
// 1. 輸入為空直接返回不更新
if(in == null){
return ;
}
// 2. 源為空則直接更新值為輸入
int inInt = Integer.valueOf(in.toString());
byte[] inBytes = null ;
if(out == null){
outRR.add(inInt);
try{
// 將RoaringBitmap的數據轉成字節數組
ByteArrayOutputStream bos = new ByteArrayOutputStream();
DataOutputStream ndos = new DataOutputStream(bos);
outRR.serialize(ndos);
inBytes = bos.toByteArray();
ndos.close();
} catch (IOException e) {
e.printStackTrace();
}
buffer.update(0, inBytes);
return ;
}
// 3. 源和輸入都不為空使用 bitmap去重合並
byte[] outBytes = (byte[]) buffer.get(0);
byte[] result = outBytes;
try {
outRR.deserialize(new DataInputStream(new ByteArrayInputStream(outBytes)));
outRR.add(inInt);
ByteArrayOutputStream boss = new ByteArrayOutputStream();
DataOutputStream ndosn = new DataOutputStream(boss);
outRR.serialize(ndosn);
result = boss.toByteArray();
ndosn.close();
} catch (IOException e) {
e.printStackTrace();
}
buffer.update(0, result);
}
/**
* 合並聚合函數緩沖區
* //合並兩個buffer,將buffer2合並到buffer1.在合並兩個分區聚合結果的時候會被用到,類似於reduceByKey
* //這里要注意該方法沒有返回值,
* 在實現的時候是把buffer2合並到buffer1中去,你需要實現這個合並細節。
*/
@Override
public void merge(MutableAggregationBuffer buffer1, Row buffer2) {
//不同excutor間的數據合並
// 合並兩個聚合buffer,該函數在聚合並兩個部分聚合數據集的時候調用
//update(buffer1, buffer2);
Object out = buffer1.get(0);
byte[] outBitBytes = (byte[]) out;
byte[] resultBit = outBitBytes;
byte[] inBytes = (byte[]) buffer2.get(0);
ImmutableRoaringBitmap inRBM = new ImmutableRoaringBitmap(ByteBuffer.wrap(inBytes));
if(out == null){
buffer1.update(0, inBytes);
return ;
}
if (out != null) {
try {
ImmutableRoaringBitmap outRBM = new ImmutableRoaringBitmap(ByteBuffer.wrap(outBitBytes));
outRBM.toMutableRoaringBitmap().or(inRBM);
ByteArrayOutputStream bos = new ByteArrayOutputStream();
DataOutputStream ndosn = new DataOutputStream(bos);
outRBM.serialize(ndosn);
resultBit = bos.toByteArray();
ndosn.close();
} catch (IOException e) {
e.printStackTrace();
}
buffer1.update(0, resultBit);
}
}
/**
* 計算最終結果
*/
@Override
public Object evaluate(Row buffer) {
//根據Buffer計算結果
long r = 0L;
Object val = buffer.get(0);
if (val != null) {
ImmutableRoaringBitmap rr = new ImmutableRoaringBitmap(ByteBuffer.wrap((byte[]) val));
r = rr.getCardinality();
}
return r;
}
}
`
附錄
倒排索引
文檔檢索系統中最常用的數據結構。通過倒排索引,可以根據單詞快速獲取包含這個單詞的文檔列表。
倒排索引主要由兩個部分組成:“單詞詞典”和“倒排文件”。
①單詞詞典包含了所有粒度的拆分詞;
②倒排文件則保存了該詞對應的所有相關信息。
參考:
hive udf 讀寫存儲到hbase的roaringbitmap https://blog.csdn.net/qq_34748569/article/details/105252559
如何在Spark中實現Count Distinct重聚合 https://developer.aliyun.com/article/723521
SparkSQL用UDAF實現Bitmap函數 https://my.oschina.net/wangzhiwubigdata/blog/4392249
Spark筆記之使用UDAF(User Defined Aggregate Function) https://www.cnblogs.com/cc11001100/p/9471859.html
Inverted index 倒排索引 https://www.cnblogs.com/ycx95/p/9177274.html
時序數據庫技術體系 – Druid 多維查詢之Bitmap索引 https://blog.csdn.net/matrix_google/article/details/82878214
java 讀取文件流 https://www.cnblogs.com/zhzhlong/p/11420084.html
bitmap用戶分群方法在貝殼DMP的實踐和應用 https://cloud.tencent.com/developer/news/683175
JDBC 將RoaringBitmap寫入greenplum https://www.jianshu.com/p/af6a7ef67518
https://stackoverflow.com/questions/53075020/why-does-spark-infer-a-binary-instead-of-an-arraybyte-when-creating-a-datafram
User Defined Aggregate Functions (UDAFs) http://spark.apache.org/docs/latest/sql-ref-functions-udf-aggregate.html
User-Defined Aggregate Functions(UDAF) Using Apache Spark https://www.nitendratech.com/spark/udaf-apache-spark/
基於bitmap實現用戶畫像的標簽圈人功能 https://blog.51cto.com/sbp810050504/2420208
https://ragrawal.wordpress.com/2015/11/03/spark-custom-udaf-example/
spark 編寫udaf函數求中位數 https://cloud.tencent.com/developer/article/1507271
RoaringBitmap的使用 https://www.liangzl.com/get-article-detail-148556.html