BitMap算法知識筆記以及在大數據方向的使用


概述

所謂的BitMap算法就是位圖算法,簡單說就是用一個bit位來標記某個元素所對應的value,而key即是該元素,由於BitMap使用了bit位來存儲數據,因此可以大大節省存儲空間,這是很常用的數據結構,比如用於Bloom Filter中、用於無重復整數的排序等等。bitmap通常基於數組來實現,數組中每個元素可以看成是一系列二進制數,所有元素組成更大的二進制集合。

基本思想

 我用一個簡單的例子來詳細介紹BitMap算法的原理。假設我們要對0-7內的5個元素(4,7,2,5,3)進行排序(這里假設元素沒有重復)。我們可以使用BitMap算法達到排序目的。要表示8個數,我們需要8個byte,

  1.首先我們開辟一個字節(8byte)的空間,將這些空間的所有的byte位都設置為0

  2.然后便利這5個元素,第一個元素是4,因為下邊從0開始,因此我們把第五個字節的值設置為1

  3.然后再處理剩下的四個元素,最終8個字節的狀態如下圖

       4.現在我們遍歷一次bytes區域,把值為1的byte的位置輸出(2,3,4,5,7),這樣便達到了排序的目的

從上面的例子我們可以看出,BitMap算法的思想還是比較簡單的,關鍵的問題是如何確定10進制的數到2進制的映射圖。

MAP映射:

假設需要排序或則查找的數的總數N=100000000,BitMap中1bit代表一個數字,1個int = 4Bytes = 4*8bit = 32 bit,那么N個數需要N/32 int空間。所以我們需要申請內存空間的大小為int a[1 + N/32],其中:a[0]在內存中占32為可以對應十進制數0-31,依次類推:

  a[0]-----------------------------> 0-31

  a[1]------------------------------> 32-63

  a[2]-------------------------------> 64-95

  a[3]--------------------------------> 96-127

那么十進制數如何轉換為對應的bit位,下面介紹用位移將十進制數轉換為對應的bit位:

  1.求十進制數在對應數組a中的下標

  十進制數0-31,對應在數組a[0]中,32-63對應在數組a[1]中,64-95對應在數組a[2]中………,使用數學歸納分析得出結論:對於一個十進制數n,其在數組a中的下標為:a[n/32]

  2.求出十進制數在對應數a[i]中的下標

  例如十進制數1在a[0]的下標為1,十進制數31在a[0]中下標為31,十進制數32在a[1]中下標為0。 在十進制0-31就對應0-31,而32-63則對應也是0-31,即給定一個數n可以通過模32求得在對應數組a[i]中的下標。

  3.位移

  對於一個十進制數n,對應在數組a[n/32][n%32]中,但數組a畢竟不是一個二維數組,我們通過移位操作實現置1

  a[n/32] |= 1 << n % 32
  移位操作:
  a[n>>5] |= 1 << (n & 0x1F)

  n & 0x1F 保留n的后五位 相當於 n % 32 求十進制數在數組a[i]中的下標。

BitMap簡單使用示例

用戶標簽使用BitMap的數據結構來存儲,比如表示用戶對應的標簽表如下所示:

 如果使用標簽,也就是一個標簽對應多個用戶,如下所示,比較簡單一看就會:

 讓每一個標簽存儲包含此標簽的所有用戶 ID,每一個標簽都是一個獨立的 Bitmap。這樣,實現用戶的去重和查詢統計,就變得一目了然:

 對上面例子的使用:

在用戶群做交集和並集運算的時候,例如:

1,如何查找使用蘋果手機的程序員用戶?

 2.如何查找所有男性或者00后的用戶?

 3,同樣是剛才的例子,我們給定 90 后用戶的 Bitmap,再給定一個全量用戶的 Bitmap。最終要求出的是存在於全量用戶,但又不存在於 90 后用戶的部分。

 如何求出呢?我們可以使用異或操作,即相同位為 0,不同位為 1。 (1^1)

BitMap的代碼實現

java實現: 

 1 /**
 2  * ClassName BitMap4.java
 3  * author Rhett.wang
 4  * version 1.0.0
 5  * Description TODO
 6  * createTime 2020年01月24日 07:53:00
 7  */
 8 public class BitMap4 {
 9     //保存數據的
10     private byte[] bits;
11 
12     //能夠存儲多少數據
13     private int capacity;
14 
15 
16     public BitMap4(int capacity){
17         this.capacity = capacity;
18 
19         //1bit能存儲8個數據,那么capacity數據需要多少個bit呢,capacity/8+1,右移3位相當於除以8
20         bits = new byte[(capacity >>3 )+1];
21     }
22 
23     public void add(int num){
24         // num/8得到byte[]的index
25         int arrayIndex = num >> 3;
26 
27         // num%8得到在byte[index]的位置
28         int position = num & 0x07;
29 
30         //將1左移position后,那個位置自然就是1,然后和以前的數據做|,這樣,那個位置就替換成1了。
31         bits[arrayIndex] |= 1 << position;
32     }
33 
34     public boolean contain(int num){
35         // num/8得到byte[]的index
36         int arrayIndex = num >> 3;
37 
38         // num%8得到在byte[index]的位置
39         int position = num & 0x07;
40 
41         //將1左移position后,那個位置自然就是1,然后和以前的數據做&,判斷是否為0即可
42         return (bits[arrayIndex] & (1 << position)) !=0;
43     }
44 
45     public void clear(int num){
46         // num/8得到byte[]的index
47         int arrayIndex = num >> 3;
48 
49         // num%8得到在byte[index]的位置
50         int position = num & 0x07;
51 
52         //將1左移position后,那個位置自然就是1,然后對取反,再與當前值做&,即可清除當前的位置了.
53         bits[arrayIndex] &= ~(1 << position);
54 
55     }
56 
57     public static void main(String[] args) {
58         BitMap4 bitmap = new BitMap4(100);
59         bitmap.add(7);
60         System.out.println("插入7成功");
61 
62         boolean isexsit = bitmap.contain(7);
63         System.out.println("7是否存在:"+isexsit);
64 
65         bitmap.clear(7);
66         isexsit = bitmap.contain(7);
67         System.out.println("7是否存在:"+isexsit);
68     }
69 }

PYTHON代碼實現:

 1 class BitMap():
 2     def __init__(self,max):
 3         self.size=int((max +31 -1)/31)
 4         self.array=[0 for i in range(self.size)]
 5 
 6     def bitindex(self,num):
 7         return num%31
 8 
 9     def set_1(self,num):
10         elemindex=(num//31)
11         byteindex=self.bitindex(num)
12         ele=self.array[elemindex]
13         self.array[elemindex] = ele| (1<< byteindex)
14 
15 
16     def test_1(self,i):
17         elemindex=(i//31)
18         bytearray= self.bitindex(i)
19         if self.array[elemindex] & (1 << bytearray):
20             return True
21         return False
22 
23 if __name__ =="__main__":
24     Max = ord('z')
25     shuffle_array=[x for x in 'qwelajkda']
26     ret =[]
27     bitmap =BitMap(Max)
28     for c in shuffle_array:
29         bitmap.set_1(ord(c))
30 
31     for i in range(Max+1):
32         if bitmap.test_1(i):
33             ret.append(chr(i))
34 
35     print(u'原始數組是:%s' % shuffle_array)
36     print(u'排序以后的數組是:%s' % ret)

scala代碼實現

 1 /**
 2   * ClassName BitMap.java
 3   * author Rhett.wang
 4   * version 1.0.0
 5   * Description TODO
 6   * createTime 2020年01月24日 10:30:00
 7   */
 8 class BitMap(bitmap:Array[Byte], length:Int) {
 9 
10 }
11 object BitMap {
12   var bitmap:Array[Int]=Array()
13   def main(args: Array[String]) {
14 
15     var bitmaps=TestBit(100)
16     setBit(32)
17     println(getBit(32))
18     println(getBit(11))
19   }
20   def TestBit(length:Int):Unit={
21     bitmap= new Array[Int]((length >> 5).toInt + (if ((length & 31) > 0) 1
22     else 0))
23   }
24 
25   def getBit(index: Long): Int = {
26     var intData: Int = bitmap(((index - 1) >> 5).toInt)
27     var offset: Int = ((index - 1) & 31).toInt
28     return intData >> offset & 0x01
29   }
30 
31   def setBit(index: Long) {
32     var belowIndex: Int = ((index - 1) >> 5).toInt
33     var offset: Int = ((index - 1) & 31).toInt
34     var inData: Int = bitmap(belowIndex)
35     bitmap(belowIndex) = inData | (0x01 << offset)
36   }
37 
38    /* def clear(num:Int):Unit={
39       var arrayIndex: Int = num >> 5
40       var position: Int = num & 0x1F
41       bitmap(arrayIndex) =(bitmap(arrayIndex) & ~(1 << position)).toByte
42     }*/
43 
44 
45 }

Roaring BitMap的原理和使用

位圖索引被廣泛用於數據庫和搜索引擎中,通過利用位級並行,它們可以顯著加快查詢速度。但是,位圖索引會占用大量的內存,因此我們會更喜歡壓縮位圖索引。 Roaring Bitmaps 就是一種十分優秀的壓縮位圖索引,后文統稱 RBM。壓縮位圖索引有很多種,比如基於 RLE(Run-Length Encoding運行長度編碼)的WAH (Word Aligned Hybrid Compression Scheme) 和 Concise (Compressed ‘n’ Composable Integer Set)。相比較前者, RBM 能提供更優秀的壓縮性能和更快的查詢效率。

RBM 的用途和 Bitmap 很差不多(比如說索引),只是說從性能、空間利用率各方面更優秀了。目前 RBM 已經在很多成熟的開源大數據平台中使用,簡單列幾個作為參考。

1 Apache Lucene and derivative systems such as Solr and Elasticsearch,
2 Metamarkets’ Druid,
3 Apache Spark,
4 Apache Hive,
5 eBay’s Apache Kylin,
6 ……

RBM 的主要思想並不復雜,簡單來講,有如下三條:

1,我們將 32-bit 的范圍 ([0, n)) 划分為 2^16 個桶,每一個桶有一個 Container 來存放一個數值的低16位;

2,在存儲和查詢數值的時候,我們將一個數值 k 划分為高 16 位(k % 2^16)和低 16 位(k mod 2^16),取高 16 位找到對應的桶,然后在低 16 位存放在相應的 Container 中;

3,容器的話, RBM 使用兩種容器結構: Array Container 和 Bitmap Container。Array Container 存放稀疏的數據,Bitmap Container 存放稠密的數據。即,若一個 Container 里面的 Integer 數量小於 4096,就用 Short 類型的有序數組來存儲值。若大於 4096,就用 Bitmap 來存儲值。

如下圖,就是官網給出的一個例子,三個容器分別代表了三個數據集:

the list of the first 1000 multiples of 62

all integers [216, 216 + 100)

all even numbers in [2216, 3216)

舉例說明:

看完前面的還不知道在說什么?沒關系,舉個栗子說明就好了。現在我們要將 821697800 這個 32 bit 的整數插入 RBM 中,整個算法流程是這樣的:

821697800 對應的 16 進制數為 30FA1D08, 其中高 16 位為 30FA, 低16位為 1D08。

我們先用二分查找從一級索引(即 Container Array)中找到數值為 30FA 的容器(如果該容器不存在,則新建一個),從圖中我們可以看到,該容器是一個 Bitmap 容器。

找到了相應的容器后,看一下低 16 位的數值 1D08,它相當於是 7432,因此在 Bitmap 中找到相應的位置,將其置為 1 即可。

下面介紹到的是RoaringBitmap的核心,三種Container。

通過上面的介紹我們知道,每個32位整形的高16位已經作為key存儲在RoaringArray中了,那么Container只需要處理低16位的數據。

ArrayContainer

結構很簡單,只有一個short[] content,將16位value直接存儲。

short[] content始終保持有序,方便使用二分查找,且不會存儲重復數值。

因為這種Container存儲數據沒有任何壓縮,因此只適合存儲少量數據。

ArrayContainer占用的空間大小與存儲的數據量為線性關系,每個short為2字節,因此存儲了N個數據的ArrayContainer占用空間大致為2N字節。存儲一個數據占用2字節,存儲4096個數據占用8kb。

根據源碼可以看出,常量DEFAULT_MAX_SIZE值為4096,當容量超過這個值的時候會將當前Container替換為BitmapContainer。

BitmapContainer

這種Container使用long[]存儲位圖數據。我們知道,每個Container處理16位整形的數據,也就是0~65535,因此根據位 圖的原理,需要65536個比特來存儲數據,每個比特位用1來表示有,0來表示無。每個long有64位,因此需要1024個long來提供65536個 比特。

因此,每個BitmapContainer在構建時就會初始化長度為1024的long[]。這就意味着,不管一個BitmapContainer中只存儲了1個數據還是存儲了65536個數據,占用的空間都是同樣的8kb。

解釋一下為什么這里用的 4096 這個閾值?因為一個 Integer 的低 16 位是 2Byte,因此對應到 Arrary Container 中的話就是 2Byte * 4096 = 8KB;同樣,對於 Bitmap Container 來講,2^16 個 bit 也相當於是 8KB。

RunContainer

RunContainer中的Run指的是行程長度壓縮算法(Run Length Encoding),對連續數據有比較好的壓縮效果。

它的原理是,對於連續出現的數字,只記錄初始數字和后續數量。即:

對於數列11,它會壓縮為11,0;
對於數列11,12,13,14,15,它會壓縮為11,4;
對於數列11,12,13,14,15,21,22,它會壓縮為11,4,21,1;
源碼中的short[] valueslength中存儲的就是壓縮后的數據。

這種壓縮算法的性能和數據的連續性(緊湊性)關系極為密切,對於連續的100個short,它能從200字節壓縮為4字節,但對於完全不連續的100個short,編碼完之后反而會從200字節變為400字節。

如果要分析RunContainer的容量,我們可以做下面兩種極端的假設:

最好情況,即只存在一個數據或只存在一串連續數字,那么只會存儲2個short,占用4字節

最壞情況,0~65535的范圍內填充所有的奇數位(或所有偶數位),需要存儲65536個short,128kb

代碼測試示例:

 1 import org.roaringbitmap.RoaringBitmap;
 2 
 3 import java.util.function.Consumer;
 4 
 5 /**
 6  * ClassName RBitMap.java
 7  * author Rhett.wang
 8  * version 1.0.0
 9  * Description TODO
10  * createTime 2020年01月25日 21:09:00
11  */
12 public class RBitMap {
13     public static void main(String[] args) {
14         test1();
15     }
16     private static void test1(){
17         //向rr中添加1、2、3、1000四個數字
18         RoaringBitmap rr = RoaringBitmap.bitmapOf(1,2,3,1000);
19         //創建RoaringBitmap rr2
20         RoaringBitmap rr2 = new RoaringBitmap();
21         //向rr2中添加10000-12000共2000個數字
22         rr2.add(10000L,12000L);
23         //返回第3個數字是1000,第0個數字是1,第1個數字是2,則第3個數字是1000
24         rr.select(3);
25         //返回value = 2 時的索引為 1。value = 1 時,索引是 0 ,value=3的索引為2
26         rr.rank(2);
27         //判斷是否包含1000
28         rr.contains(1000); // will return true
29         //判斷是否包含7
30         rr.contains(7); // will return false
31 
32         //兩個RoaringBitmap進行or操作,數值進行合並,合並后產生新的RoaringBitmap叫rror
33         RoaringBitmap rror = RoaringBitmap.or(rr, rr2);
34         //rr與rr2進行位運算,並將值賦值給rr
35         rr.or(rr2);
36         //判斷rror與rr是否相等,顯然是相等的
37         boolean equals = rror.equals(rr);
38         if(!equals) throw new RuntimeException("bug");
39         // 查看rr中存儲了多少個值,1,2,3,1000和10000-12000,共2004個數字
40         long cardinality = rr.getLongCardinality();
41         System.out.println(cardinality);
42         //遍歷rr中的value
43         for(int i : rr) {
44             System.out.println(i);
45         }
46         //這種方式的遍歷比上面的方式更快
47         rr.forEach((Consumer<? super Integer>) i -> {
48             System.out.println(i.intValue());
49         });
50     }
51 }

在RoaringBitmap中,32位整數被分成了2^16個塊。任何一個32位整數的前16位決定放在哪個塊里。后16位就是放在這個塊里的內容。比如0xFFFF0000和0xFFFF0001,前16位都是FFFF,表明這兩個數應該放在一個塊里。后16位分別是0和1。在這個塊中指保存0和1就可以了,不需要保存完整的整數。

在最開始的時候,一個塊中包含一個長度為4的short數組,后16位所對應的值就存在這個short數組里。注意在插入的時候要保持順序性,這里就需要用到二分查找來加快速度了。如果當塊中的元素大於short數組的長度時,就需要重新分配更大的數組,把當前數組copy過去,並把新值插入對應的位置。擴展數組大小和STL中vector的方式類似,不過並不是完全的加倍,而且上限是4096,也就是說最多只保存4096個元素。那么問題來了,超過了4096怎么辦呢?

一個塊里最多可能需要存放2^16個元素,那么如果是用short來存放,最多需要65536個short,那么就是131072個byte。如果換一種方式,用位來存儲元素,那么就需要65536個bit,相當於1024個long型數組,即2048個int,也就是4096個short。

所以,當一個塊中元素數量小於等於4096的時候,用有序short數組來保存元素,而當元素數量大於4096的時候,用長度為1024的long數組來按位表示元素是否存在。

當bitmap中有多個塊的時候,塊的信息是用數組來保存的。這個數組同樣需要保持順序性,也是用二分查找找到一個塊的位置。所以,當一個整數過來之后,首先根據前16位計算出塊的key,然后在塊的數組中二分查找。找到的話,就把后16位保存在這個塊中。找不到,就創建一個新塊,把后16位保存在塊中,再把塊插入對應的位置。

 大數據分析常用去重算法分析-Kylin

首先,請大家思考一個問題:在大數據處理領域中,什么環節是你最不希望見到的?以我的觀點來看,shuffle 是我最不願意見到的環節,因為一旦出現了非常多的 shuffle,就會占用大量的磁盤和網絡 IO,從而導致任務進行得非常緩慢。而今天我們所討論的去重分析,就是一個會產生非常多 shuffle 的場景,先大概介紹一下shuffle原理:

Shuffle描述着數據從map task輸出到reduce task輸入的這段過程。在分布式情況下,reduce task需要跨節點去拉取其它節點上的map task結果。這一過程將會產生網絡資源消耗和內存,磁盤IO的消耗。

先來看以下場景:

我們有一張商品訪問表,表上有 item 和 user_id 兩個列,我們希望求商品的 UV,這是去重非常典型的一個場景。我們的數據是存儲在分布式平台上的,分別在數據節點 1 和 2 上。

我們從物理執行層面上想一下這句 SQL 背后會發生什么故事:首先分布式計算框架啟動任務, 從兩個節點上去拿數據, 因為 SQL group by 了 item 列, 所以需要以 item 為 key 對兩個表中的原始數據進行一次 shuffle。我們來看看需要 shuffle 哪些數據:因為 select/group by了 item,所以 item 需要 shuffle 。但是,user_id  我們只需要它的一個統計值,能不能不 shuffle 整個 user_id 的原始值呢?

如果只是簡單的求 count 的話, 每個數據節點分別求出對應 item 的 user_id 的 count, 然后只要 shuffle 這個 count 就行了,因為count 只是一個數字, 所以 shuffle 的量非常小。但是由於分析的指標是 count distinct,我們不能簡單相加兩個節點user_id 的 count distinct 值,我們只有得到一個 key 對應的所有 user_id 才能統計出正確的 count distinct值,而這些值原先可能分布在不同的節點上,所以我們只能通過 shuffle 把這些值收集到同一個節點上再做去重。而當 user_id 這一列的數據量非常大的時候,需要 shuffle 的數據量也會非常大。我們其實最后只需要一個 count 值,那么有辦法可以不 shuffle 整個列的原始值嗎?我下面要介紹的兩種算法就提供了這樣的一種思路,使用更少的信息位,同樣能夠求出該列不重復元素的個數(基數)

第一種要介紹的算法是一種精確的去重算法,主要利用了 Bitmap 的原理。Bitmap 也稱之為 Bitset,它本質上是定義了一個很大的 bit 數組,每個元素對應到 bit 數組的其中一位。例如有一個集合[2,3,5,8]對應的 Bitmap 數組是[001101001],集合中的 2 對應到數組 index 為 2 的位置,3 對應到 index 為 3 的位置,下同,得到的這樣一個數組,我們就稱之為 Bitmap。很直觀的,數組中 1 的數量就是集合的基數。追本溯源,我們的目的是用更小的存儲去表示更多的信息,而在計算機最小的信息單位是 bit,如果能夠用一個 bit 來表示集合中的一個元素,比起原始元素,可以節省非常多的存儲。

這就是最基礎的 Bitmap,我們可以把 Bitmap 想象成一個容器,我們知道一個 Integer 是32位的,如果一個 Bitmap 可以存放最多 Integer.MAX_VALUE 個值,那么這個 Bitmap 最少需要 32 的長度。一個 32 位長度的 Bitmap 占用的空間是512 M (2^32/8/1024/1024),這種 Bitmap 存在着非常明顯的問題:這種 Bitmap 中不論只有 1 個元素或者有 40 億個元素,它都需要占據 512 M 的空間。回到剛才求 UV 的場景,不是每一個商品都會有那么多的訪問,一些爆款可能會有上億的訪問,但是一些比較冷門的商品可能只有幾個用戶瀏覽,如果都用這種 Bitmap,它們占用的空間都是一樣大的,這顯然是不可接受的。

 對於上節說的問題,有一種設計的非常的精巧 Bitmap,叫做 Roaring Bitmap,能夠很好地解決上面說的這個問題。我們還是以存放 Integer 值的 Bitmap 來舉例,Roaring Bitmap 把一個 32 位的 Integer 划分為高 16 位和低 16 位,取高 16 位找到該條數據所對應的 key,每個 key 都有自己的一個 Container。我們把剩余的低 16 位放入該 Container 中。依據不同的場景,有 3 種不同的 Container,分別是 Array Container、Bitmap Container 和 Run Container,下文將一一介紹。

 首先第一種,是 Roaring Bitmap 初始化時默認的 Container,叫做 Array Container。Array Container 適合存放稀疏的數據,Array Container 內部的數據結構是一個 short array,這個 array 是有序的,方便查找。數組初始容量為 4,數組最大容量為 4096。超過最大容量 4096 時,會轉換為 Bitmap Container。這邊舉例來說明數據放入一個 Array Container 的過程:有 0xFFFF0000 和 0xFFFF0001 兩個數需要放到 Bitmap 中, 它們的前 16 位都是 FFFF,所以他們是同一個 key,它們的后 16 位存放在同一個 Container 中; 它們的后 16 位分別是 0 和 1, 在 Array Container 的數組中分別保存 0 和 1 就可以了,相較於原始的 Bitmap 需要占用 512M 內存來存儲這兩個數,這種存放實際只占用了 2+4=6 個字節(key 占 2 Bytes,兩個 value 占 4 Bytes,不考慮數組的初始容量)。

第二種 Container 是 Bitmap Container,其原理就是上文說的 Bitmap。它的數據結構是一個 long 的數組,數組容量固定為 1024,和上文的 Array Container 不同,Array Container 是一個動態擴容的數組。這邊推導下 1024 這個值:由於每個 Container 還需處理剩余的后 16 位數據,使用 Bitmap 來存儲需要 8192 Bytes(2^16/8), 而一個 long 值占 8 個 Bytes,所以一共需要 1024(8192/8)個 long 值。所以一個 Bitmap container 固定占用內存 8 KB(1024 * 8 Byte)。當 Array Container 中元素到 4096 個時,也恰好占用 8 k(4096*2Bytes)的空間,正好等於 Bitmap 所占用的 8 KB。而當你存放的元素個數超過 4096 的時候,Array Container 的大小占用還是會線性的增長,但是 Bitmap Container 的內存空間並不會增長,始終還是占用 8 K,所以當 Array Container 超過最大容量(DEFAULT_MAX_SIZE)會轉換為 Bitmap Container。

我們自己在 Kylin 中實踐使用 Roaring Bitmap 時,我們發現 Array Container 隨着數據量的增加會不停地 resize 自己的數組,而 Java 數組的 resize 其實非常消耗性能,因為它會不停地申請新的內存,同時老的內存在復制完成前也不會釋放,導致內存占用變高,所以我們建議把 DEFAULT_MAX_SIZE 調得低一點,調成 1024 或者 2048,減少 Array Container 后期 reszie 數組的次數和開銷。

 最后一種 Container 叫做Run Container,這種 Container 適用於存放連續的數據。比如說 1 到 100,一共 100 個數,這種類型的數據稱為連續的數據。這邊的Run指的是Run Length Encoding(RLE),它對連續數據有比較好的壓縮效果。原理是對於連續出現的數字, 只記錄初始數字和后續數量。例如: 對於 [11, 12, 13, 14, 15, 21, 22],會被記錄為 11, 4, 21, 1。很顯然,該 Container 的存儲占用與數據的分布緊密相關。最好情況是如果數據是連續分布的,就算是存放 65536 個元素,也只會占用 2 個 short。而最壞的情況就是當數據全部不連續的時候,會占用 128 KB 內存。

 總結:用一張圖來總結3種 Container 所占的存儲空間,可以看到元素個數達到 4096 之前,選用 Array Container 的收益是最好的,當元素個數超過了 4096 時,Array Container 所占用的空間還是線性的增長,而 Bitmap Container 的存儲占用則與數據量無關,這個時候 Bitmap Container 的收益就會更好。而 Run Container 占用的存儲大小完全看數據的連續性, 因此只能畫出一個上下限范圍 [4 Bytes, 128 KB]。

 我們再來看一下Bitmap 在 Kylin 中的應用,Kylin 中編輯 measure 的時候,可以選擇 Count Distinct,且Return Type 選為 Precisely,點保存就可以了。但是事情沒有那么簡單,剛才上文在講 Bitmap 時,一直都有一個前提,放入的值都是數值類型,但是如果不是數值類型的值,它們不能夠直接放入 Bitmap,這時需要構建一個全區字典,做一個值到數值的映射,然后再放入 Bitmap 中。

在 Kylin 中構建全局字典,當列的基數非常高的時候,全局字典會成為一個性能的瓶頸。針對這種情況,社區也一直在努力做優化,這邊簡單介紹幾種優化的策略,更詳細的優化策略可以見文末的參考鏈接。

 1)當一個列的值完全被另外一個列包含,而另一個列有全局字典,可以復用另一個列的全局字典

 2)當精確去重指標不需要跨 Segment 聚合的時候,可以使用這個列的 Segment 字典代替(這個列需要字典編碼)。在 Kylin 中,Segment 就相當於時間分片的概念。當不會發生跨 Segments 的分析時,這個列的 Segment 字典就可以代替這個全局字典。

 3)如果你的 cube 包含很多的精確去重指標,可以考慮將這些指標放到不同的列族上。不止是精確去重,像一些復雜 measure,我們都建議使用多個列族去存儲,可以提升查詢的性能。

 快手BitBase數據庫

 如上圖所示,首先將原始數據的一列的某個值抽象成 bitmap(比特數組),舉例:city=bj,city 是維度,bj (北京) 是維度值,抽象成 bitmap 值就是10100,表示第0個用戶在 bj,第1個用戶不在北京,依次類推。然后將多維度之間的組合轉換為 bitmap 計算:bitmap 之間做與、或、非、異或,舉例:比如在北京的用戶,且興趣是籃球,這樣的用戶有多少個,就轉換為圖中所示的兩個 bitmap 做與運算,得到橙色的 bitmap,最后,再對 bitmap 做 count 運算。count 表示統計“1”的個數,list 是列舉“1”所在的數組 index,業務上表示 userId。

SparkSQL自定義聚合函數(UDAF)實現bitmap函數

創建表:使用phoenix在HBase中創建測試表,字段使用VARBINARY類型

1 CREATE TABLE IF NOT EXISTS test_binary (
2 date VARCHAR NOT NULL,
3 dist_mem VARBINARY
4  CONSTRAINT test_binary_pk PRIMARY KEY (date)
5  ) SALT_BUCKETS=6;

創建完成后使用RoaringBitmap序列化數據存入數據庫:

 自定義代碼:

  1 import org.apache.spark.sql.Row;
  2 import org.apache.spark.sql.expressions.MutableAggregationBuffer;
  3 import org.apache.spark.sql.expressions.UserDefinedAggregateFunction;
  4 import org.apache.spark.sql.types.DataType;
  5 import org.apache.spark.sql.types.DataTypes;
  6 import org.apache.spark.sql.types.StructField;
  7 import org.apache.spark.sql.types.StructType;
  8 import org.roaringbitmap.RoaringBitmap;
  9  
 10 import java.io.*;
 11 import java.util.ArrayList;
 12 import java.util.List;
 13  
 14 /**
 15  * 實現自定義聚合函數Bitmap
 16  */
 17 public class UdafBitMap extends UserDefinedAggregateFunction {
 18     @Override
 19     public StructType inputSchema() {
 20         List<StructField> structFields = new ArrayList<>();
 21         structFields.add(DataTypes.createStructField("field", DataTypes.BinaryType, true));
 22         return DataTypes.createStructType(structFields);
 23     }
 24  
 25     @Override
 26     public StructType bufferSchema() {
 27         List<StructField> structFields = new ArrayList<>();
 28         structFields.add(DataTypes.createStructField("field", DataTypes.BinaryType, true));
 29         return DataTypes.createStructType(structFields);
 30     }
 31  
 32     @Override
 33     public DataType dataType() {
 34         return DataTypes.LongType;
 35     }
 36  
 37     @Override
 38     public boolean deterministic() {
 39         //是否強制每次執行的結果相同
 40         return false;
 41     }
 42  
 43     @Override
 44     public void initialize(MutableAggregationBuffer buffer) {
 45         //初始化
 46         buffer.update(0, null);
 47     }
 48  
 49     @Override
 50     public void update(MutableAggregationBuffer buffer, Row input) {
 51         // 相同的executor間的數據合並
 52         // 1. 輸入為空直接返回不更新
 53         Object in = input.get(0);
 54         if(in == null){
 55             return ;
 56         }
 57         // 2. 源為空則直接更新值為輸入
 58         byte[] inBytes = (byte[]) in;
 59         Object out = buffer.get(0);
 60         if(out == null){
 61             buffer.update(0, inBytes);
 62             return ;
 63         }
 64         // 3. 源和輸入都不為空使用bitmap去重合並
 65         byte[] outBytes = (byte[]) out;
 66         byte[] result = outBytes;
 67         RoaringBitmap outRR = new RoaringBitmap();
 68         RoaringBitmap inRR = new RoaringBitmap();
 69         try {
 70             outRR.deserialize(new DataInputStream(new ByteArrayInputStream(outBytes)));
 71             inRR.deserialize(new DataInputStream(new ByteArrayInputStream(inBytes)));
 72             outRR.or(inRR);
 73             ByteArrayOutputStream bos = new ByteArrayOutputStream();
 74             outRR.serialize(new DataOutputStream(bos));
 75             result = bos.toByteArray();
 76         } catch (IOException e) {
 77             e.printStackTrace();
 78         }
 79         buffer.update(0, result);
 80     }
 81  
 82     @Override
 83     public void merge(MutableAggregationBuffer buffer1, Row buffer2) {
 84         //不同excutor間的數據合並
 85         update(buffer1, buffer2);
 86     }
 87  
 88     @Override
 89     public Object evaluate(Row buffer) {
 90         //根據Buffer計算結果
 91         long r = 0l;
 92         Object val = buffer.get(0);
 93         if (val != null) {
 94             RoaringBitmap rr = new RoaringBitmap();
 95             try {
 96                 rr.deserialize(new DataInputStream(new ByteArrayInputStream((byte[]) val)));
 97                 r = rr.getLongCardinality();
 98             } catch (IOException e) {
 99                 e.printStackTrace();
100             }
101         }
102         return r;
103     }
104 }

調用例子:

 1  /**
 2      * 使用自定義函數解析bitmap
 3      *
 4      * @param sparkSession
 5      * @return
 6      */
 7     private static void udafBitmap(SparkSession sparkSession) {
 8         try {
 9             Properties prop = PropUtil.loadProp(DB_PHOENIX_CONF_FILE);
10             // JDBC連接屬性
11             Properties connProp = new Properties();
12             connProp.put("driver", prop.getProperty(DB_PHOENIX_DRIVER));
13             connProp.put("user", prop.getProperty(DB_PHOENIX_USER));
14             connProp.put("password", prop.getProperty(DB_PHOENIX_PASS));
15             connProp.put("fetchsize", prop.getProperty(DB_PHOENIX_FETCHSIZE));
16             // 注冊自定義聚合函數
17             sparkSession.udf().register("bitmap",new UdafBitMap());
18             sparkSession
19                     .read()
20                     .jdbc(prop.getProperty(DB_PHOENIX_URL), "test_binary", connProp)
21                     // sql中必須使用global_temp.表名,否則找不到
22                     .createOrReplaceGlobalTempView("test_binary");
23             //sparkSession.sql("select YEAR(TO_DATE(date)) year,bitmap(dist_mem) memNum from global_temp.test_binary group by YEAR(TO_DATE(date))").show();
24             sparkSession.sql("select date,bitmap(dist_mem) memNum from global_temp.test_binary group by date").show();
25         } catch (Exception e) {
26             e.printStackTrace();
27         }
28     }  

總結

感謝網絡大神的分享:技術共勉

https://kyligence.io/zh/blog/count-distinct-bitmap/

https://www.jianshu.com/p/ded6e8ecd0d1

https://blog.csdn.net/xiongbingcool/article/details/81282118

http://www.sohu.com/a/327627247_315839

https://blog.csdn.net/xywtalk/article/details/52590275

https://blog.csdn.net/qq_22520215/article/details/77893326

 

 

 

 

 

 

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM