先說結論:要把hive上的bitmap數據同步到clickhouse的bitmap里面
參考連接:
https://blog.csdn.net/nazeniwaresakini/article/details/108166089
https://blog.csdn.net/qq_27639777/article/details/111005838
https://zhuanlan.zhihu.com/p/351365841
https://blog.csdn.net/yizishou/article/details/78342499
https://github.com/RoaringBitmap/CRoaring
1、Clickhouse的RoaringBitmap結構
目標是將Hive的Binary類型能順利轉成Clickhouse的Bitmap類型
Hive的Binary類型是二進制數組byte[]
Clickhouse的Bitmap類型是,一般是通過groupBitmap方式構建出來的,比如:
select series_id, groupBitmapState(toUInt32(dvid)) bitmap列 FROM dms_pds_flow_interest_dvid_city_day_all group by series_id
其中關鍵sql是:groupBitmapState,源碼對應位置是:AggregateFunctionGroupBitmap.cpp注冊的;
這個C++代碼的關鍵點是:
createAggregateFunctionBitmap<AggregateFunctionGroupBitmapData>
代表通過函數:createAggregateFunctionBitmap來創建bitmap類型:AggregateFunctionGroupBitmapData
然后跟進這個AggregateFunctionGroupBitmapData類,文件(AggregateFunctionGroupBitmapData.h)
結構:
內部:
其中bitmap的一些計算函數邏輯,就是這個AggregateFunctionGroupBitmapData.h文件實現的;比如:
select bitmapOrCardinality(bitmap_a , bitmap_b) 是取兩個bitmap的並集;
那么實現就是:
言歸正傳,根據Rbitmap的數據結構:
參考連接:https://zhuanlan.zhihu.com/p/351365841
1、首先,將 32bit int(無符號的)類型數據 划分為 2^16 個桶(即使用數據的前16位二進制作為桶的編號), 每個桶有一個Container(可以理解為容器也可以理解為這個桶,容器和桶在這里可以理解為一個東西,只是說法不一樣而已) 來存放一個數值的低16位。 2、在存儲和查詢數值時,將數值 k 划分為高 16 位和低 16 位,取高 16 位值找到對應的桶, 然后在將低 16 位值存放在相應的 Container 中。這樣說可能比較抽象不易理解,下面以一個例子來幫助大家理解。
大概意思是,在clickhouse的Rbitmap里面,為了優化存儲空間,會將一個32位的數據,分成高16位和低16位;
高16位會被作為key存儲到short[] keys中,低16位則被看做value
比如我要存儲666這個數字,需要將666划分成高16位和低16位,通過高16位定位到當前桶是5,定位到豎着排列的桶未知后,在將低16位的值存儲到橫着排列的數組中;
之前看clickhouse源碼中C++里面返回的roaring和roaring64map到底是啥,在看CRoaring源碼,創建Rbitmap的地方:
其中的關鍵點是:
上面意思是定義一個結構體,類型是roaring_array_t , 變量名是:high_low_container
這個就是圖片里面說的高16位和低16位的存儲模型,然后查看roaring_array_t的結構:
然后查看ROARING_CONTAINER_T,也就是低16位類型是,因為clickhouse是C++編寫的,因此構建的數組其實是:struct container_s {}指向的各個子類
返回Clickhouse的源碼,要開辟的子類就是:
這樣就又回到了Clickhouse的Rbitmap。雖然轉了一圈,但是已經知道這個Rbitmap底層存儲的其實是數組
2、hive或者sparksql里面的RoaringBitmap
參考hive制作bitmap的連接:https://github.com/sunyaf/bitmapudf
關鍵就是了解UDAF里面的函數:
// 輸入輸出都是Object inspectors public ObjectInspector init(Mode m, ObjectInspector[] parameters) throws HiveException; // AggregationBuffer保存數據處理的臨時結果 abstract AggregationBuffer getNewAggregationBuffer() throws HiveException; // 重新設置AggregationBuffer public void reset(AggregationBuffer agg) throws HiveException; // 處理輸入記錄 public void iterate(AggregationBuffer agg, Object[] parameters) throws HiveException; // 處理全部輸出數據中的部分數據 public Object terminatePartial(AggregationBuffer agg) throws HiveException; // 把兩個部分數據聚合起來 public void merge(AggregationBuffer agg, Object partial) throws HiveException; // 輸出最終結果 public Object terminate(AggregationBuffer agg) throws HiveException;
我們要兼容hive的Rbitmap和Clickhouse的Rbitmap,只需要關鍵方法:terminate到底返回了什么
查看代碼:
所以關鍵代碼就是:myagg.getPartial()
hive里面返回的Rbitmap其實最終是java的二進制數組;
所以要想Hive的Rbitmap和Clickhouse的Rbitmap能夠兼容,就是演變成:Hive的二進制數組如何有效的存儲到Clickhouse里面
3、Clickhouse的Roaringbitmap是如何存儲的
在回看Clickhouse的Rbitmap,比如看添加像Rbitmap里面添加內容。它的api是:
RoaringBitmap.add(1);
RoaringBitmap.add(2);
其源碼是:
//如果基數超過32個,則會將數據存儲到Rbitmap void toLarge() { //通過智能指針建立對象 rb = std::make_shared<RoaringBitmap>(); //C++ 里面的for循環,翻譯成java就是:for (A x:small) for (const auto & x : small) //將smallSet的數據存儲到Rbitmap里面 rb->add(static_cast<Value>(x.getValue())); //清空smallSet small.clear(); }
void add(T value) { //判斷存儲個數是否小於32 if (isSmall()) { if (small.find(value) == small.end()) { //如果插入的元素沒有超過smallSet的容量,則添加到smallSet if (!small.full()) small.insert(value); //如果插入的元素個數超過了smallSet容量,則插入RoaringBitmap else { toLarge(); rb->add(static_cast<Value>(value)); } } } //如果超過32則按照 else { rb->add(static_cast<Value>(value)); } }
其中內部的寫入是:
void write(DB::WriteBuffer & out) const { //判斷基數,是否超過32來判斷底層的存儲 UInt8 kind = isLarge() ? BitmapKind::Bitmap : BitmapKind::Small; //寫入一個UInt8的標識到buf中,0代表使用smallset 1代表使用RoaringBitmap writeBinary(kind, out); //smallSet的寫入 if (BitmapKind::Small == kind) { small.write(out); } //Rbitmap的寫入 else if (BitmapKind::Bitmap == kind) { //得到要寫入內存的Rbitmap字節大小 auto size = rb->getSizeInBytes(); writeVarUInt(size, out);
//通過指針占有並管理另一對象 std::unique_ptr<char[]> buf(new char[size]); rb->write(buf.get()); out.write(buf.get(), size); } }
其中getSizeInBytes() 這個方法要去CRoaring里面找:
/** * How many bytes are required to serialize this bitmap (meant to be * compatible with Java and Go versions) * * Setting the portable flag to false enable a custom format that * can save space compared to the portable format (e.g., for very * sparse bitmaps). */ size_t getSizeInBytes(bool portable = true) const { if (portable) return api::roaring_bitmap_portable_size_in_bytes(&roaring); else return api::roaring_bitmap_size_in_bytes(&roaring); }
追下去的大概意思就是:
一個header頭部大小
一個Ritmao里面Container數組存儲元素個數
然后header ++ Container數組元素的字節大小
writeVarInt(size , out)的參考連接:https://blog.csdn.net/B_e_a_u_tiful1205/article/details/106064778
所以要寫入Rbitmap,需要存儲結構是:
1、writeBinary(1, out) : java中的Byte(1) 2、 auto size = rb->getSizeInBytes(); writeVarUInt(size, out); 就是像buffer中寫入需要序列化的的字節大小 3、將RoaringBitmap轉化成字節數組
參考一位大神的的,對應java結果就是:https://blog.csdn.net/qq_27639777/article/details/111005838
Byte(1), VarInt(SerializedSizeInBytes), ByteArray(RoaringBitmap)
4、將java的Rbitmap轉成Clickhouse的Rbitmap
在clickhouse中構建一個bitmap:
select bitmapToArray(bitmapBuild([toUInt32(3), toUInt32(4), toUInt32(100)]));
然后對bitmap做一個編碼:
SELECT base64Encode(toString(bitmapBuild([toUInt32(3), toUInt32(4), toUInt32(100)])));
在反過來,將編碼轉回bitmap
1、構建表:
CREATE TABLE test_index.spark_bitmap_test( dt LowCardinality(String) COMMENT '日期', dim_type Int32 COMMENT '維度類型', dim_id Int32 COMMENT '緯度值', encode String COMMENT '編碼', compare_encode AggregateFunction(groupBitmap, UInt32) MATERIALIZED base64Decode(encode) ) Engine = AggregatingMergeTree() PARTITION BY toYYYYMMDD(toDate(dt)) PRIMARY KEY (dim_type, dim_id) ORDER BY (dim_type, dim_id) SETTINGS index_granularity = 4;
2、將編碼插入到bitmap
insert into test_index.spark_bitmap_test values ('2021-12-14' , 1 , 2370 , 'AAMDAAAABAAAAGQAAAA=');
3、查詢:
select dt , dim_type , dim_id , encode , bitmapToArray(compare_encode) as arr , bitmapCardinality(compare_encode) as encode from test_index.spark_bitmap_test;
以上操作就是為了證明,如果在java中能夠將bitmap進行編碼,這樣通過clickhouse的物化視圖自動將編碼字符串轉成bitmap
結合之前分析的源碼:
1、小於32的用smallSet存儲 1):Byte(0) 2):Buffer(RoaringBitmap需要序列化的字節大小) 2、大於32的用RoaringBitmap存儲 1):Byte(0) 2):VarInt(SerializedSizeInBytes) RoaringBitmap需要序列化的字節大小
3):RoaringBitmap的字節數組
綜上轉成java/scala代碼:
import com.test.bitmap.VarInt import org.roaringbitmap.RoaringBitmap import org.roaringbitmap.buffer.{ImmutableRoaringBitmap, MutableRoaringBitmap} import java.io.{ByteArrayOutputStream, DataOutputStream} import java.nio.{ByteBuffer, ByteOrder} import java.util.Base64 object TestBitmapSeries { def main(args: Array[String]): Unit = { val rb = RoaringBitmap.bitmapOf(3, 4, 100) println("starting with bitmap " + rb) //當位圖的基數少於32時,僅使用SmallSet存儲 if (rb.getCardinality <= 32) { //分配緩沖區大小 val initBuffer = ByteBuffer.allocate(2 + 4 * rb.getCardinality) val bos = if (initBuffer.order eq ByteOrder.LITTLE_ENDIAN) initBuffer else initBuffer.slice.order(ByteOrder.LITTLE_ENDIAN) bos.put(new Integer(0).toByte) bos.put(rb.getCardinality.toByte) rb.toArray.foreach(i => bos.putInt(i)) val result = Base64.getEncoder.encodeToString(bos.array()) println("小於32的encode :"+result) } else { //rb.serializedSizeInBytes() 需要序列化的字節數 val seriesByteSize: Int = rb.serializedSizeInBytes() //VarInt.varIntSize返回編碼需要的長度(二進制條件下:>>>) val varIntLen = VarInt.varIntSize(seriesByteSize) //初始化 val initBuffer: ByteBuffer = ByteBuffer.allocate(1 + varIntLen + rb.serializedSizeInBytes()) //字節高低序列,好像意思是在內存的存儲方式 val bos = if (initBuffer.order eq ByteOrder.LITTLE_ENDIAN) initBuffer else initBuffer.slice.order(ByteOrder.LITTLE_ENDIAN) bos.put(new Integer(1).toByte) //TODO VarInt.putVarInt(rb.serializedSizeInBytes(), bos) val baos = new ByteArrayOutputStream() rb.serialize(new DataOutputStream(baos)) bos.put(baos.toByteArray()) val result: String = Base64.getEncoder.encodeToString(bos.array()) println("大於32的encode :"+result) } } }
結果和Clickhouse的編解碼一致
5、利用sparkSql批量序列化RoaringBitmap,然后寫入clickhouse
https://github.com/niutaofan/spark_bitmap.git