通過clickhouse源碼了解hive/spark的RoaringBitmap寫入Clickhouse的bitmap


 

 先說結論:要把hive上的bitmap數據同步到clickhouse的bitmap里面

參考連接:

https://blog.csdn.net/nazeniwaresakini/article/details/108166089

https://blog.csdn.net/qq_27639777/article/details/111005838

https://zhuanlan.zhihu.com/p/351365841

https://blog.csdn.net/yizishou/article/details/78342499

https://github.com/RoaringBitmap/CRoaring

1、Clickhouse的RoaringBitmap結構

目標是將Hive的Binary類型能順利轉成Clickhouse的Bitmap類型

Hive的Binary類型是二進制數組byte[]

 

 

Clickhouse的Bitmap類型是,一般是通過groupBitmap方式構建出來的,比如:

select 
series_id, 
groupBitmapState(toUInt32(dvid))  bitmap列
FROM 
dms_pds_flow_interest_dvid_city_day_all 
group by 
series_id

其中關鍵sql是:groupBitmapState,源碼對應位置是:AggregateFunctionGroupBitmap.cpp注冊的;

 

 

 這個C++代碼的關鍵點是:

createAggregateFunctionBitmap<AggregateFunctionGroupBitmapData>

代表通過函數:createAggregateFunctionBitmap來創建bitmap類型:AggregateFunctionGroupBitmapData

然后跟進這個AggregateFunctionGroupBitmapData類,文件(AggregateFunctionGroupBitmapData.h)

結構:

 

 

 內部:

 

 

其中bitmap的一些計算函數邏輯,就是這個AggregateFunctionGroupBitmapData.h文件實現的;比如:

select bitmapOrCardinality(bitmap_a , bitmap_b) 是取兩個bitmap的並集;

那么實現就是:

 

 

 言歸正傳,根據Rbitmap的數據結構:

參考連接:https://zhuanlan.zhihu.com/p/351365841

1、首先,將 32bit int(無符號的)類型數據 划分為 2^16 個桶(即使用數據的前16位二進制作為桶的編號),
每個桶有一個Container(可以理解為容器也可以理解為這個桶,容器和桶在這里可以理解為一個東西,只是說法不一樣而已) 來存放一個數值的低16位。

2、在存儲和查詢數值時,將數值 k 划分為高 16 位和低 16 位,取高 16 位值找到對應的桶,
然后在將低 16 位值存放在相應的 Container 中。這樣說可能比較抽象不易理解,下面以一個例子來幫助大家理解。

 

 

 大概意思是,在clickhouse的Rbitmap里面,為了優化存儲空間,會將一個32位的數據,分成高16位和低16位;

高16位會被作為key存儲到short[] keys中,低16位則被看做value

比如我要存儲666這個數字,需要將666划分成高16位和低16位,通過高16位定位到當前桶是5,定位到豎着排列的桶未知后,在將低16位的值存儲到橫着排列的數組中;

 

之前看clickhouse源碼中C++里面返回的roaring和roaring64map到底是啥,在看CRoaring源碼,創建Rbitmap的地方:

 

 

 其中的關鍵點是:

 

 

 上面意思是定義一個結構體,類型是roaring_array_t , 變量名是:high_low_container

這個就是圖片里面說的高16位和低16位的存儲模型,然后查看roaring_array_t的結構:

 

 

 然后查看ROARING_CONTAINER_T,也就是低16位類型是,因為clickhouse是C++編寫的,因此構建的數組其實是:struct container_s {}指向的各個子類

 

 

 

返回Clickhouse的源碼,要開辟的子類就是:

 

 這樣就又回到了Clickhouse的Rbitmap。雖然轉了一圈,但是已經知道這個Rbitmap底層存儲的其實是數組

 

2、hive或者sparksql里面的RoaringBitmap

參考hive制作bitmap的連接:https://github.com/sunyaf/bitmapudf

關鍵就是了解UDAF里面的函數:

// 輸入輸出都是Object inspectors  
public  ObjectInspector init(Mode m, ObjectInspector[] parameters) throws HiveException;  
  
// AggregationBuffer保存數據處理的臨時結果  
abstract AggregationBuffer getNewAggregationBuffer() throws HiveException;  
  
// 重新設置AggregationBuffer  
public void reset(AggregationBuffer agg) throws HiveException;  
  
// 處理輸入記錄  
public void iterate(AggregationBuffer agg, Object[] parameters) throws HiveException;  
  
// 處理全部輸出數據中的部分數據  
public Object terminatePartial(AggregationBuffer agg) throws HiveException;  
  
// 把兩個部分數據聚合起來  
public void merge(AggregationBuffer agg, Object partial) throws HiveException;  
  
// 輸出最終結果  
public Object terminate(AggregationBuffer agg) throws HiveException;  

我們要兼容hive的Rbitmap和Clickhouse的Rbitmap,只需要關鍵方法:terminate到底返回了什么

查看代碼:

 

 所以關鍵代碼就是:myagg.getPartial()

 

 hive里面返回的Rbitmap其實最終是java的二進制數組;

所以要想Hive的Rbitmap和Clickhouse的Rbitmap能夠兼容,就是演變成:Hive的二進制數組如何有效的存儲到Clickhouse里面

3、Clickhouse的Roaringbitmap是如何存儲的

在回看Clickhouse的Rbitmap,比如看添加像Rbitmap里面添加內容。它的api是:

RoaringBitmap.add(1);

RoaringBitmap.add(2);

其源碼是:

//如果基數超過32個,則會將數據存儲到Rbitmap
    void toLarge()
    {
        //通過智能指針建立對象
        rb = std::make_shared<RoaringBitmap>();
        //C++ 里面的for循環,翻譯成java就是:for (A x:small)
        for (const auto & x : small)
            //將smallSet的數據存儲到Rbitmap里面
            rb->add(static_cast<Value>(x.getValue()));
        //清空smallSet
        small.clear();
    }

 

 

 void add(T value)
    {
        //判斷存儲個數是否小於32
        if (isSmall())
        {
            if (small.find(value) == small.end())
            {
                //如果插入的元素沒有超過smallSet的容量,則添加到smallSet
                if (!small.full())
                    small.insert(value);
                //如果插入的元素個數超過了smallSet容量,則插入RoaringBitmap
                else
                {
                    toLarge();
                    rb->add(static_cast<Value>(value));
                }
            }
        }
        //如果超過32則按照
        else
        {
            rb->add(static_cast<Value>(value));
        }
    }

 

其中內部的寫入是:

void write(DB::WriteBuffer & out) const
    {
        //判斷基數,是否超過32來判斷底層的存儲
        UInt8 kind = isLarge() ? BitmapKind::Bitmap : BitmapKind::Small;
        //寫入一個UInt8的標識到buf中,0代表使用smallset 1代表使用RoaringBitmap
        writeBinary(kind, out);
        //smallSet的寫入
        if (BitmapKind::Small == kind)
        {
            small.write(out);
        }
        //Rbitmap的寫入
        else if (BitmapKind::Bitmap == kind)
        {
            //得到要寫入內存的Rbitmap字節大小
            auto size = rb->getSizeInBytes();
            writeVarUInt(size, out);
       //通過指針占有並管理另一對象 std::unique_ptr
<char[]> buf(new char[size]); rb->write(buf.get()); out.write(buf.get(), size); } }

 其中getSizeInBytes() 這個方法要去CRoaring里面找:

/**
     * How many bytes are required to serialize this bitmap (meant to be
     * compatible with Java and Go versions)
     *
     * Setting the portable flag to false enable a custom format that
     * can save space compared to the portable format (e.g., for very
     * sparse bitmaps).
     */
    size_t getSizeInBytes(bool portable = true) const {
        if (portable)
            return api::roaring_bitmap_portable_size_in_bytes(&roaring);
        else
            return api::roaring_bitmap_size_in_bytes(&roaring);
    }

追下去的大概意思就是:

一個header頭部大小

一個Ritmao里面Container數組存儲元素個數

然后header ++ Container數組元素的字節大小

 

writeVarInt(size , out)的參考連接:https://blog.csdn.net/B_e_a_u_tiful1205/article/details/106064778

所以要寫入Rbitmap,需要存儲結構是:

1、writeBinary(1, out)  : java中的Byte(1)
2、
auto size = rb->getSizeInBytes();
writeVarUInt(size, out);
就是像buffer中寫入需要序列化的的字節大小
3、將RoaringBitmap轉化成字節數組

參考一位大神的的,對應java結果就是:https://blog.csdn.net/qq_27639777/article/details/111005838

Byte(1), VarInt(SerializedSizeInBytes), ByteArray(RoaringBitmap)

 

4、將java的Rbitmap轉成Clickhouse的Rbitmap

在clickhouse中構建一個bitmap:

select bitmapToArray(bitmapBuild([toUInt32(3), toUInt32(4), toUInt32(100)]));

 

 

 然后對bitmap做一個編碼:

SELECT base64Encode(toString(bitmapBuild([toUInt32(3), toUInt32(4), toUInt32(100)])));

 

 

 在反過來,將編碼轉回bitmap

1、構建表:

CREATE TABLE test_index.spark_bitmap_test(
  dt LowCardinality(String) COMMENT '日期',
  dim_type Int32 COMMENT '維度類型',
  dim_id Int32 COMMENT '緯度值',
  encode String COMMENT '編碼',
  compare_encode AggregateFunction(groupBitmap, UInt32)
      MATERIALIZED base64Decode(encode)
)
Engine = AggregatingMergeTree()
PARTITION BY toYYYYMMDD(toDate(dt))
PRIMARY KEY (dim_type, dim_id)
ORDER BY (dim_type, dim_id)
SETTINGS index_granularity = 4;

2、將編碼插入到bitmap

insert into test_index.spark_bitmap_test values ('2021-12-14' , 1 , 2370 , 'AAMDAAAABAAAAGQAAAA=');

3、查詢:

select 
       dt , 
       dim_type ,
       dim_id , 
       encode , 
       bitmapToArray(compare_encode) as arr , 
       bitmapCardinality(compare_encode) as encode 
from test_index.spark_bitmap_test;

 

 

 以上操作就是為了證明,如果在java中能夠將bitmap進行編碼,這樣通過clickhouse的物化視圖自動將編碼字符串轉成bitmap

結合之前分析的源碼:

1、小於32的用smallSet存儲
    1):Byte(0)
    2):Buffer(RoaringBitmap需要序列化的字節大小)

2、大於32的用RoaringBitmap存儲
     1):Byte(0)
     2):VarInt(SerializedSizeInBytes)  RoaringBitmap需要序列化的字節大小
   3):RoaringBitmap的字節數組
 
        

綜上轉成java/scala代碼:

import com.test.bitmap.VarInt
import org.roaringbitmap.RoaringBitmap
import org.roaringbitmap.buffer.{ImmutableRoaringBitmap, MutableRoaringBitmap}

import java.io.{ByteArrayOutputStream, DataOutputStream}
import java.nio.{ByteBuffer, ByteOrder}
import java.util.Base64

object TestBitmapSeries {
  def main(args: Array[String]): Unit = {
    val rb = RoaringBitmap.bitmapOf(3, 4, 100)
    println("starting with  bitmap " + rb)

    //當位圖的基數少於32時,僅使用SmallSet存儲
    if (rb.getCardinality <= 32) {
      //分配緩沖區大小
      val initBuffer = ByteBuffer.allocate(2 + 4 * rb.getCardinality)
      val bos = if (initBuffer.order eq ByteOrder.LITTLE_ENDIAN) initBuffer else initBuffer.slice.order(ByteOrder.LITTLE_ENDIAN)
      bos.put(new Integer(0).toByte)
      bos.put(rb.getCardinality.toByte)
      rb.toArray.foreach(i => bos.putInt(i))
      val result = Base64.getEncoder.encodeToString(bos.array())
      println("小於32的encode :"+result)
    } else {
      //rb.serializedSizeInBytes() 需要序列化的字節數
      val seriesByteSize: Int = rb.serializedSizeInBytes()
      //VarInt.varIntSize返回編碼需要的長度(二進制條件下:>>>)
      val varIntLen = VarInt.varIntSize(seriesByteSize)
      //初始化
      val initBuffer: ByteBuffer = ByteBuffer.allocate(1 + varIntLen + rb.serializedSizeInBytes())
      //字節高低序列,好像意思是在內存的存儲方式
      val bos = if (initBuffer.order eq ByteOrder.LITTLE_ENDIAN) initBuffer else initBuffer.slice.order(ByteOrder.LITTLE_ENDIAN)
      bos.put(new Integer(1).toByte)
      //TODO
      VarInt.putVarInt(rb.serializedSizeInBytes(), bos)
      val baos = new ByteArrayOutputStream()
      rb.serialize(new DataOutputStream(baos))
      bos.put(baos.toByteArray())
      val result: String = Base64.getEncoder.encodeToString(bos.array())
      println("大於32的encode :"+result)
    }
  }
}

 

 結果和Clickhouse的編解碼一致

 

5、利用sparkSql批量序列化RoaringBitmap,然后寫入clickhouse

https://github.com/niutaofan/spark_bitmap.git

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM