概述
所謂的BitMap算法就是位圖算法,簡單說就是用一個bit位來標記某個元素所對應的value,而key即是該元素,由於BitMap使用了bit位來存儲數據,因此可以大大節省存儲空間,這是很常用的數據結構,比如用於Bloom Filter中、用於無重復整數的排序等等。bitmap通常基於數組來實現,數組中每個元素可以看成是一系列二進制數,所有元素組成更大的二進制集合。
基本思想
我用一個簡單的例子來詳細介紹BitMap算法的原理。假設我們要對0-7內的5個元素(4,7,2,5,3)進行排序(這里假設元素沒有重復)。我們可以使用BitMap算法達到排序目的。要表示8個數,我們需要8個byte,
1.首先我們開辟一個字節(8byte)的空間,將這些空間的所有的byte位都設置為0
2.然后便利這5個元素,第一個元素是4,因為下邊從0開始,因此我們把第五個字節的值設置為1
3.然后再處理剩下的四個元素,最終8個字節的狀態如下圖
4.現在我們遍歷一次bytes區域,把值為1的byte的位置輸出(2,3,4,5,7),這樣便達到了排序的目的
從上面的例子我們可以看出,BitMap算法的思想還是比較簡單的,關鍵的問題是如何確定10進制的數到2進制的映射圖。
MAP映射:
假設需要排序或則查找的數的總數N=100000000,BitMap中1bit代表一個數字,1個int = 4Bytes = 4*8bit = 32 bit,那么N個數需要N/32 int空間。所以我們需要申請內存空間的大小為int a[1 + N/32],其中:a[0]在內存中占32為可以對應十進制數0-31,依次類推:
a[0]-----------------------------> 0-31
a[1]------------------------------> 32-63
a[2]-------------------------------> 64-95
a[3]--------------------------------> 96-127
那么十進制數如何轉換為對應的bit位,下面介紹用位移將十進制數轉換為對應的bit位:
1.求十進制數在對應數組a中的下標
十進制數0-31,對應在數組a[0]中,32-63對應在數組a[1]中,64-95對應在數組a[2]中………,使用數學歸納分析得出結論:對於一個十進制數n,其在數組a中的下標為:a[n/32]
2.求出十進制數在對應數a[i]中的下標
例如十進制數1在a[0]的下標為1,十進制數31在a[0]中下標為31,十進制數32在a[1]中下標為0。 在十進制0-31就對應0-31,而32-63則對應也是0-31,即給定一個數n可以通過模32求得在對應數組a[i]中的下標。
3.位移
對於一個十進制數n,對應在數組a[n/32][n%32]中,但數組a畢竟不是一個二維數組,我們通過移位操作實現置1
a[n/32] |= 1 << n % 32
移位操作:
a[n>>5] |= 1 << (n & 0x1F)
n & 0x1F 保留n的后五位 相當於 n % 32 求十進制數在數組a[i]中的下標。
BitMap簡單使用示例
用戶標簽使用BitMap的數據結構來存儲,比如表示用戶對應的標簽表如下所示:
如果使用標簽,也就是一個標簽對應多個用戶,如下所示,比較簡單一看就會:
讓每一個標簽存儲包含此標簽的所有用戶 ID,每一個標簽都是一個獨立的 Bitmap。這樣,實現用戶的去重和查詢統計,就變得一目了然:
對上面例子的使用:
在用戶群做交集和並集運算的時候,例如:
1,如何查找使用蘋果手機的程序員用戶?
2.如何查找所有男性或者00后的用戶?
3,同樣是剛才的例子,我們給定 90 后用戶的 Bitmap,再給定一個全量用戶的 Bitmap。最終要求出的是存在於全量用戶,但又不存在於 90 后用戶的部分。
如何求出呢?我們可以使用異或操作,即相同位為 0,不同位為 1。 (1^1)
BitMap的代碼實現
java實現:
1 /** 2 * ClassName BitMap4.java 3 * author Rhett.wang 4 * version 1.0.0 5 * Description TODO 6 * createTime 2020年01月24日 07:53:00 7 */ 8 public class BitMap4 { 9 //保存數據的 10 private byte[] bits; 11 12 //能夠存儲多少數據 13 private int capacity; 14 15 16 public BitMap4(int capacity){ 17 this.capacity = capacity; 18 19 //1bit能存儲8個數據,那么capacity數據需要多少個bit呢,capacity/8+1,右移3位相當於除以8 20 bits = new byte[(capacity >>3 )+1]; 21 } 22 23 public void add(int num){ 24 // num/8得到byte[]的index 25 int arrayIndex = num >> 3; 26 27 // num%8得到在byte[index]的位置 28 int position = num & 0x07; 29 30 //將1左移position后,那個位置自然就是1,然后和以前的數據做|,這樣,那個位置就替換成1了。 31 bits[arrayIndex] |= 1 << position; 32 } 33 34 public boolean contain(int num){ 35 // num/8得到byte[]的index 36 int arrayIndex = num >> 3; 37 38 // num%8得到在byte[index]的位置 39 int position = num & 0x07; 40 41 //將1左移position后,那個位置自然就是1,然后和以前的數據做&,判斷是否為0即可 42 return (bits[arrayIndex] & (1 << position)) !=0; 43 } 44 45 public void clear(int num){ 46 // num/8得到byte[]的index 47 int arrayIndex = num >> 3; 48 49 // num%8得到在byte[index]的位置 50 int position = num & 0x07; 51 52 //將1左移position后,那個位置自然就是1,然后對取反,再與當前值做&,即可清除當前的位置了. 53 bits[arrayIndex] &= ~(1 << position); 54 55 } 56 57 public static void main(String[] args) { 58 BitMap4 bitmap = new BitMap4(100); 59 bitmap.add(7); 60 System.out.println("插入7成功"); 61 62 boolean isexsit = bitmap.contain(7); 63 System.out.println("7是否存在:"+isexsit); 64 65 bitmap.clear(7); 66 isexsit = bitmap.contain(7); 67 System.out.println("7是否存在:"+isexsit); 68 } 69 }
PYTHON代碼實現:
1 class BitMap(): 2 def __init__(self,max): 3 self.size=int((max +31 -1)/31) 4 self.array=[0 for i in range(self.size)] 5 6 def bitindex(self,num): 7 return num%31 8 9 def set_1(self,num): 10 elemindex=(num//31) 11 byteindex=self.bitindex(num) 12 ele=self.array[elemindex] 13 self.array[elemindex] = ele| (1<< byteindex) 14 15 16 def test_1(self,i): 17 elemindex=(i//31) 18 bytearray= self.bitindex(i) 19 if self.array[elemindex] & (1 << bytearray): 20 return True 21 return False 22 23 if __name__ =="__main__": 24 Max = ord('z') 25 shuffle_array=[x for x in 'qwelajkda'] 26 ret =[] 27 bitmap =BitMap(Max) 28 for c in shuffle_array: 29 bitmap.set_1(ord(c)) 30 31 for i in range(Max+1): 32 if bitmap.test_1(i): 33 ret.append(chr(i)) 34 35 print(u'原始數組是:%s' % shuffle_array) 36 print(u'排序以后的數組是:%s' % ret)
scala代碼實現
1 /** 2 * ClassName BitMap.java 3 * author Rhett.wang 4 * version 1.0.0 5 * Description TODO 6 * createTime 2020年01月24日 10:30:00 7 */ 8 class BitMap(bitmap:Array[Byte], length:Int) { 9 10 } 11 object BitMap { 12 var bitmap:Array[Int]=Array() 13 def main(args: Array[String]) { 14 15 var bitmaps=TestBit(100) 16 setBit(32) 17 println(getBit(32)) 18 println(getBit(11)) 19 } 20 def TestBit(length:Int):Unit={ 21 bitmap= new Array[Int]((length >> 5).toInt + (if ((length & 31) > 0) 1 22 else 0)) 23 } 24 25 def getBit(index: Long): Int = { 26 var intData: Int = bitmap(((index - 1) >> 5).toInt) 27 var offset: Int = ((index - 1) & 31).toInt 28 return intData >> offset & 0x01 29 } 30 31 def setBit(index: Long) { 32 var belowIndex: Int = ((index - 1) >> 5).toInt 33 var offset: Int = ((index - 1) & 31).toInt 34 var inData: Int = bitmap(belowIndex) 35 bitmap(belowIndex) = inData | (0x01 << offset) 36 } 37 38 /* def clear(num:Int):Unit={ 39 var arrayIndex: Int = num >> 5 40 var position: Int = num & 0x1F 41 bitmap(arrayIndex) =(bitmap(arrayIndex) & ~(1 << position)).toByte 42 }*/ 43 44 45 }
Roaring BitMap的原理和使用
位圖索引被廣泛用於數據庫和搜索引擎中,通過利用位級並行,它們可以顯著加快查詢速度。但是,位圖索引會占用大量的內存,因此我們會更喜歡壓縮位圖索引。 Roaring Bitmaps 就是一種十分優秀的壓縮位圖索引,后文統稱 RBM。壓縮位圖索引有很多種,比如基於 RLE(Run-Length Encoding,運行長度編碼)的WAH (Word Aligned Hybrid Compression Scheme) 和 Concise (Compressed ‘n’ Composable Integer Set)。相比較前者, RBM 能提供更優秀的壓縮性能和更快的查詢效率。
RBM 的用途和 Bitmap 很差不多(比如說索引),只是說從性能、空間利用率各方面更優秀了。目前 RBM 已經在很多成熟的開源大數據平台中使用,簡單列幾個作為參考。
1 Apache Lucene and derivative systems such as Solr and Elasticsearch, 2 Metamarkets’ Druid, 3 Apache Spark, 4 Apache Hive, 5 eBay’s Apache Kylin, 6 ……
RBM 的主要思想並不復雜,簡單來講,有如下三條:
1,我們將 32-bit 的范圍 ([0, n)) 划分為 2^16 個桶,每一個桶有一個 Container 來存放一個數值的低16位;
2,在存儲和查詢數值的時候,我們將一個數值 k 划分為高 16 位(k % 2^16)
和低 16 位(k mod 2^16)
,取高 16 位找到對應的桶,然后在低 16 位存放在相應的 Container 中;
3,容器的話, RBM 使用兩種容器結構: Array Container 和 Bitmap Container。Array Container 存放稀疏的數據,Bitmap Container 存放稠密的數據。即,若一個 Container 里面的 Integer 數量小於 4096,就用 Short 類型的有序數組來存儲值。若大於 4096,就用 Bitmap 來存儲值。
如下圖,就是官網給出的一個例子,三個容器分別代表了三個數據集:
the list of the first 1000 multiples of 62
all integers [216, 216 + 100)
all even numbers in [2216, 3216)
舉例說明:
看完前面的還不知道在說什么?沒關系,舉個栗子說明就好了。現在我們要將 821697800 這個 32 bit 的整數插入 RBM 中,整個算法流程是這樣的:
821697800 對應的 16 進制數為 30FA1D08, 其中高 16 位為 30FA, 低16位為 1D08。
我們先用二分查找從一級索引(即 Container Array)中找到數值為 30FA 的容器(如果該容器不存在,則新建一個),從圖中我們可以看到,該容器是一個 Bitmap 容器。
找到了相應的容器后,看一下低 16 位的數值 1D08,它相當於是 7432,因此在 Bitmap 中找到相應的位置,將其置為 1 即可。
下面介紹到的是RoaringBitmap的核心,三種Container。
通過上面的介紹我們知道,每個32位整形的高16位已經作為key存儲在RoaringArray中了,那么Container只需要處理低16位的數據。
ArrayContainer
結構很簡單,只有一個short[] content,將16位value直接存儲。
short[] content始終保持有序,方便使用二分查找,且不會存儲重復數值。
因為這種Container存儲數據沒有任何壓縮,因此只適合存儲少量數據。
ArrayContainer占用的空間大小與存儲的數據量為線性關系,每個short為2字節,因此存儲了N個數據的ArrayContainer占用空間大致為2N字節。存儲一個數據占用2字節,存儲4096個數據占用8kb。
根據源碼可以看出,常量DEFAULT_MAX_SIZE值為4096,當容量超過這個值的時候會將當前Container替換為BitmapContainer。
BitmapContainer
這種Container使用long[]存儲位圖數據。我們知道,每個Container處理16位整形的數據,也就是0~65535,因此根據位 圖的原理,需要65536個比特來存儲數據,每個比特位用1來表示有,0來表示無。每個long有64位,因此需要1024個long來提供65536個 比特。
因此,每個BitmapContainer在構建時就會初始化長度為1024的long[]。這就意味着,不管一個BitmapContainer中只存儲了1個數據還是存儲了65536個數據,占用的空間都是同樣的8kb。
解釋一下為什么這里用的 4096 這個閾值?因為一個 Integer 的低 16 位是 2Byte,因此對應到 Arrary Container 中的話就是 2Byte * 4096 = 8KB;同樣,對於 Bitmap Container 來講,2^16 個 bit 也相當於是 8KB。
RunContainer
RunContainer中的Run指的是行程長度壓縮算法(Run Length Encoding),對連續數據有比較好的壓縮效果。
它的原理是,對於連續出現的數字,只記錄初始數字和后續數量。即:
對於數列11,它會壓縮為11,0;
對於數列11,12,13,14,15,它會壓縮為11,4;
對於數列11,12,13,14,15,21,22,它會壓縮為11,4,21,1;
源碼中的short[] valueslength中存儲的就是壓縮后的數據。
這種壓縮算法的性能和數據的連續性(緊湊性)關系極為密切,對於連續的100個short,它能從200字節壓縮為4字節,但對於完全不連續的100個short,編碼完之后反而會從200字節變為400字節。
如果要分析RunContainer的容量,我們可以做下面兩種極端的假設:
最好情況,即只存在一個數據或只存在一串連續數字,那么只會存儲2個short,占用4字節
最壞情況,0~65535的范圍內填充所有的奇數位(或所有偶數位),需要存儲65536個short,128kb
代碼測試示例:
1 import org.roaringbitmap.RoaringBitmap; 2 3 import java.util.function.Consumer; 4 5 /** 6 * ClassName RBitMap.java 7 * author Rhett.wang 8 * version 1.0.0 9 * Description TODO 10 * createTime 2020年01月25日 21:09:00 11 */ 12 public class RBitMap { 13 public static void main(String[] args) { 14 test1(); 15 } 16 private static void test1(){ 17 //向rr中添加1、2、3、1000四個數字 18 RoaringBitmap rr = RoaringBitmap.bitmapOf(1,2,3,1000); 19 //創建RoaringBitmap rr2 20 RoaringBitmap rr2 = new RoaringBitmap(); 21 //向rr2中添加10000-12000共2000個數字 22 rr2.add(10000L,12000L); 23 //返回第3個數字是1000,第0個數字是1,第1個數字是2,則第3個數字是1000 24 rr.select(3); 25 //返回value = 2 時的索引為 1。value = 1 時,索引是 0 ,value=3的索引為2 26 rr.rank(2); 27 //判斷是否包含1000 28 rr.contains(1000); // will return true 29 //判斷是否包含7 30 rr.contains(7); // will return false 31 32 //兩個RoaringBitmap進行or操作,數值進行合並,合並后產生新的RoaringBitmap叫rror 33 RoaringBitmap rror = RoaringBitmap.or(rr, rr2); 34 //rr與rr2進行位運算,並將值賦值給rr 35 rr.or(rr2); 36 //判斷rror與rr是否相等,顯然是相等的 37 boolean equals = rror.equals(rr); 38 if(!equals) throw new RuntimeException("bug"); 39 // 查看rr中存儲了多少個值,1,2,3,1000和10000-12000,共2004個數字 40 long cardinality = rr.getLongCardinality(); 41 System.out.println(cardinality); 42 //遍歷rr中的value 43 for(int i : rr) { 44 System.out.println(i); 45 } 46 //這種方式的遍歷比上面的方式更快 47 rr.forEach((Consumer<? super Integer>) i -> { 48 System.out.println(i.intValue()); 49 }); 50 } 51 }
在RoaringBitmap中,32位整數被分成了2^16個塊。任何一個32位整數的前16位決定放在哪個塊里。后16位就是放在這個塊里的內容。比如0xFFFF0000和0xFFFF0001,前16位都是FFFF,表明這兩個數應該放在一個塊里。后16位分別是0和1。在這個塊中指保存0和1就可以了,不需要保存完整的整數。
在最開始的時候,一個塊中包含一個長度為4的short數組,后16位所對應的值就存在這個short數組里。注意在插入的時候要保持順序性,這里就需要用到二分查找來加快速度了。如果當塊中的元素大於short數組的長度時,就需要重新分配更大的數組,把當前數組copy過去,並把新值插入對應的位置。擴展數組大小和STL中vector的方式類似,不過並不是完全的加倍,而且上限是4096,也就是說最多只保存4096個元素。那么問題來了,超過了4096怎么辦呢?
一個塊里最多可能需要存放2^16個元素,那么如果是用short來存放,最多需要65536個short,那么就是131072個byte。如果換一種方式,用位來存儲元素,那么就需要65536個bit,相當於1024個long型數組,即2048個int,也就是4096個short。
所以,當一個塊中元素數量小於等於4096的時候,用有序short數組來保存元素,而當元素數量大於4096的時候,用長度為1024的long數組來按位表示元素是否存在。
當bitmap中有多個塊的時候,塊的信息是用數組來保存的。這個數組同樣需要保持順序性,也是用二分查找找到一個塊的位置。所以,當一個整數過來之后,首先根據前16位計算出塊的key,然后在塊的數組中二分查找。找到的話,就把后16位保存在這個塊中。找不到,就創建一個新塊,把后16位保存在塊中,再把塊插入對應的位置。
大數據分析常用去重算法分析-Kylin
首先,請大家思考一個問題:在大數據處理領域中,什么環節是你最不希望見到的?以我的觀點來看,shuffle 是我最不願意見到的環節,因為一旦出現了非常多的 shuffle,就會占用大量的磁盤和網絡 IO,從而導致任務進行得非常緩慢。而今天我們所討論的去重分析,就是一個會產生非常多 shuffle 的場景,先大概介紹一下shuffle原理:
Shuffle描述着數據從map task輸出到reduce task輸入的這段過程。在分布式情況下,reduce task需要跨節點去拉取其它節點上的map task結果。這一過程將會產生網絡資源消耗和內存,磁盤IO的消耗。
先來看以下場景:
我們有一張商品訪問表,表上有 item 和 user_id 兩個列,我們希望求商品的 UV,這是去重非常典型的一個場景。我們的數據是存儲在分布式平台上的,分別在數據節點 1 和 2 上。
我們從物理執行層面上想一下這句 SQL 背后會發生什么故事:首先分布式計算框架啟動任務, 從兩個節點上去拿數據, 因為 SQL group by 了 item 列, 所以需要以 item 為 key 對兩個表中的原始數據進行一次 shuffle。我們來看看需要 shuffle 哪些數據:因為 select/group by了 item,所以 item 需要 shuffle 。但是,user_id 我們只需要它的一個統計值,能不能不 shuffle 整個 user_id 的原始值呢?
如果只是簡單的求 count 的話, 每個數據節點分別求出對應 item 的 user_id 的 count, 然后只要 shuffle 這個 count 就行了,因為count 只是一個數字, 所以 shuffle 的量非常小。但是由於分析的指標是 count distinct,我們不能簡單相加兩個節點user_id 的 count distinct 值,我們只有得到一個 key 對應的所有 user_id 才能統計出正確的 count distinct值,而這些值原先可能分布在不同的節點上,所以我們只能通過 shuffle 把這些值收集到同一個節點上再做去重。而當 user_id 這一列的數據量非常大的時候,需要 shuffle 的數據量也會非常大。我們其實最后只需要一個 count 值,那么有辦法可以不 shuffle 整個列的原始值嗎?我下面要介紹的兩種算法就提供了這樣的一種思路,使用更少的信息位,同樣能夠求出該列不重復元素的個數(基數)
第一種要介紹的算法是一種精確的去重算法,主要利用了 Bitmap 的原理。Bitmap 也稱之為 Bitset,它本質上是定義了一個很大的 bit 數組,每個元素對應到 bit 數組的其中一位。例如有一個集合[2,3,5,8]對應的 Bitmap 數組是[001101001],集合中的 2 對應到數組 index 為 2 的位置,3 對應到 index 為 3 的位置,下同,得到的這樣一個數組,我們就稱之為 Bitmap。很直觀的,數組中 1 的數量就是集合的基數。追本溯源,我們的目的是用更小的存儲去表示更多的信息,而在計算機最小的信息單位是 bit,如果能夠用一個 bit 來表示集合中的一個元素,比起原始元素,可以節省非常多的存儲。
這就是最基礎的 Bitmap,我們可以把 Bitmap 想象成一個容器,我們知道一個 Integer 是32位的,如果一個 Bitmap 可以存放最多 Integer.MAX_VALUE 個值,那么這個 Bitmap 最少需要 32 的長度。一個 32 位長度的 Bitmap 占用的空間是512 M (2^32/8/1024/1024),這種 Bitmap 存在着非常明顯的問題:這種 Bitmap 中不論只有 1 個元素或者有 40 億個元素,它都需要占據 512 M 的空間。回到剛才求 UV 的場景,不是每一個商品都會有那么多的訪問,一些爆款可能會有上億的訪問,但是一些比較冷門的商品可能只有幾個用戶瀏覽,如果都用這種 Bitmap,它們占用的空間都是一樣大的,這顯然是不可接受的。
對於上節說的問題,有一種設計的非常的精巧 Bitmap,叫做 Roaring Bitmap,能夠很好地解決上面說的這個問題。我們還是以存放 Integer 值的 Bitmap 來舉例,Roaring Bitmap 把一個 32 位的 Integer 划分為高 16 位和低 16 位,取高 16 位找到該條數據所對應的 key,每個 key 都有自己的一個 Container。我們把剩余的低 16 位放入該 Container 中。依據不同的場景,有 3 種不同的 Container,分別是 Array Container、Bitmap Container 和 Run Container,下文將一一介紹。
首先第一種,是 Roaring Bitmap 初始化時默認的 Container,叫做 Array Container。Array Container 適合存放稀疏的數據,Array Container 內部的數據結構是一個 short array,這個 array 是有序的,方便查找。數組初始容量為 4,數組最大容量為 4096。超過最大容量 4096 時,會轉換為 Bitmap Container。這邊舉例來說明數據放入一個 Array Container 的過程:有 0xFFFF0000 和 0xFFFF0001 兩個數需要放到 Bitmap 中, 它們的前 16 位都是 FFFF,所以他們是同一個 key,它們的后 16 位存放在同一個 Container 中; 它們的后 16 位分別是 0 和 1, 在 Array Container 的數組中分別保存 0 和 1 就可以了,相較於原始的 Bitmap 需要占用 512M 內存來存儲這兩個數,這種存放實際只占用了 2+4=6 個字節(key 占 2 Bytes,兩個 value 占 4 Bytes,不考慮數組的初始容量)。
第二種 Container 是 Bitmap Container,其原理就是上文說的 Bitmap。它的數據結構是一個 long 的數組,數組容量固定為 1024,和上文的 Array Container 不同,Array Container 是一個動態擴容的數組。這邊推導下 1024 這個值:由於每個 Container 還需處理剩余的后 16 位數據,使用 Bitmap 來存儲需要 8192 Bytes(2^16/8), 而一個 long 值占 8 個 Bytes,所以一共需要 1024(8192/8)個 long 值。所以一個 Bitmap container 固定占用內存 8 KB(1024 * 8 Byte)。當 Array Container 中元素到 4096 個時,也恰好占用 8 k(4096*2Bytes)的空間,正好等於 Bitmap 所占用的 8 KB。而當你存放的元素個數超過 4096 的時候,Array Container 的大小占用還是會線性的增長,但是 Bitmap Container 的內存空間並不會增長,始終還是占用 8 K,所以當 Array Container 超過最大容量(DEFAULT_MAX_SIZE)會轉換為 Bitmap Container。
我們自己在 Kylin 中實踐使用 Roaring Bitmap 時,我們發現 Array Container 隨着數據量的增加會不停地 resize 自己的數組,而 Java 數組的 resize 其實非常消耗性能,因為它會不停地申請新的內存,同時老的內存在復制完成前也不會釋放,導致內存占用變高,所以我們建議把 DEFAULT_MAX_SIZE 調得低一點,調成 1024 或者 2048,減少 Array Container 后期 reszie 數組的次數和開銷。
最后一種 Container 叫做Run Container,這種 Container 適用於存放連續的數據。比如說 1 到 100,一共 100 個數,這種類型的數據稱為連續的數據。這邊的Run指的是Run Length Encoding(RLE),它對連續數據有比較好的壓縮效果。原理是對於連續出現的數字, 只記錄初始數字和后續數量。例如: 對於 [11, 12, 13, 14, 15, 21, 22],會被記錄為 11, 4, 21, 1。很顯然,該 Container 的存儲占用與數據的分布緊密相關。最好情況是如果數據是連續分布的,就算是存放 65536 個元素,也只會占用 2 個 short。而最壞的情況就是當數據全部不連續的時候,會占用 128 KB 內存。
總結:用一張圖來總結3種 Container 所占的存儲空間,可以看到元素個數達到 4096 之前,選用 Array Container 的收益是最好的,當元素個數超過了 4096 時,Array Container 所占用的空間還是線性的增長,而 Bitmap Container 的存儲占用則與數據量無關,這個時候 Bitmap Container 的收益就會更好。而 Run Container 占用的存儲大小完全看數據的連續性, 因此只能畫出一個上下限范圍 [4 Bytes, 128 KB]。
我們再來看一下Bitmap 在 Kylin 中的應用,Kylin 中編輯 measure 的時候,可以選擇 Count Distinct,且Return Type 選為 Precisely,點保存就可以了。但是事情沒有那么簡單,剛才上文在講 Bitmap 時,一直都有一個前提,放入的值都是數值類型,但是如果不是數值類型的值,它們不能夠直接放入 Bitmap,這時需要構建一個全區字典,做一個值到數值的映射,然后再放入 Bitmap 中。
在 Kylin 中構建全局字典,當列的基數非常高的時候,全局字典會成為一個性能的瓶頸。針對這種情況,社區也一直在努力做優化,這邊簡單介紹幾種優化的策略,更詳細的優化策略可以見文末的參考鏈接。
1)當一個列的值完全被另外一個列包含,而另一個列有全局字典,可以復用另一個列的全局字典
2)當精確去重指標不需要跨 Segment 聚合的時候,可以使用這個列的 Segment 字典代替(這個列需要字典編碼)。在 Kylin 中,Segment 就相當於時間分片的概念。當不會發生跨 Segments 的分析時,這個列的 Segment 字典就可以代替這個全局字典。
3)如果你的 cube 包含很多的精確去重指標,可以考慮將這些指標放到不同的列族上。不止是精確去重,像一些復雜 measure,我們都建議使用多個列族去存儲,可以提升查詢的性能。
快手BitBase數據庫
如上圖所示,首先將原始數據的一列的某個值抽象成 bitmap(比特數組),舉例:city=bj,city 是維度,bj (北京) 是維度值,抽象成 bitmap 值就是10100,表示第0個用戶在 bj,第1個用戶不在北京,依次類推。然后將多維度之間的組合轉換為 bitmap 計算:bitmap 之間做與、或、非、異或,舉例:比如在北京的用戶,且興趣是籃球,這樣的用戶有多少個,就轉換為圖中所示的兩個 bitmap 做與運算,得到橙色的 bitmap,最后,再對 bitmap 做 count 運算。count 表示統計“1”的個數,list 是列舉“1”所在的數組 index,業務上表示 userId。
SparkSQL自定義聚合函數(UDAF)實現bitmap函數
創建表:使用phoenix在HBase中創建測試表,字段使用VARBINARY類型
1 CREATE TABLE IF NOT EXISTS test_binary ( 2 date VARCHAR NOT NULL, 3 dist_mem VARBINARY 4 CONSTRAINT test_binary_pk PRIMARY KEY (date) 5 ) SALT_BUCKETS=6;
創建完成后使用RoaringBitmap序列化數據存入數據庫:
自定義代碼:
1 import org.apache.spark.sql.Row; 2 import org.apache.spark.sql.expressions.MutableAggregationBuffer; 3 import org.apache.spark.sql.expressions.UserDefinedAggregateFunction; 4 import org.apache.spark.sql.types.DataType; 5 import org.apache.spark.sql.types.DataTypes; 6 import org.apache.spark.sql.types.StructField; 7 import org.apache.spark.sql.types.StructType; 8 import org.roaringbitmap.RoaringBitmap; 9 10 import java.io.*; 11 import java.util.ArrayList; 12 import java.util.List; 13 14 /** 15 * 實現自定義聚合函數Bitmap 16 */ 17 public class UdafBitMap extends UserDefinedAggregateFunction { 18 @Override 19 public StructType inputSchema() { 20 List<StructField> structFields = new ArrayList<>(); 21 structFields.add(DataTypes.createStructField("field", DataTypes.BinaryType, true)); 22 return DataTypes.createStructType(structFields); 23 } 24 25 @Override 26 public StructType bufferSchema() { 27 List<StructField> structFields = new ArrayList<>(); 28 structFields.add(DataTypes.createStructField("field", DataTypes.BinaryType, true)); 29 return DataTypes.createStructType(structFields); 30 } 31 32 @Override 33 public DataType dataType() { 34 return DataTypes.LongType; 35 } 36 37 @Override 38 public boolean deterministic() { 39 //是否強制每次執行的結果相同 40 return false; 41 } 42 43 @Override 44 public void initialize(MutableAggregationBuffer buffer) { 45 //初始化 46 buffer.update(0, null); 47 } 48 49 @Override 50 public void update(MutableAggregationBuffer buffer, Row input) { 51 // 相同的executor間的數據合並 52 // 1. 輸入為空直接返回不更新 53 Object in = input.get(0); 54 if(in == null){ 55 return ; 56 } 57 // 2. 源為空則直接更新值為輸入 58 byte[] inBytes = (byte[]) in; 59 Object out = buffer.get(0); 60 if(out == null){ 61 buffer.update(0, inBytes); 62 return ; 63 } 64 // 3. 源和輸入都不為空使用bitmap去重合並 65 byte[] outBytes = (byte[]) out; 66 byte[] result = outBytes; 67 RoaringBitmap outRR = new RoaringBitmap(); 68 RoaringBitmap inRR = new RoaringBitmap(); 69 try { 70 outRR.deserialize(new DataInputStream(new ByteArrayInputStream(outBytes))); 71 inRR.deserialize(new DataInputStream(new ByteArrayInputStream(inBytes))); 72 outRR.or(inRR); 73 ByteArrayOutputStream bos = new ByteArrayOutputStream(); 74 outRR.serialize(new DataOutputStream(bos)); 75 result = bos.toByteArray(); 76 } catch (IOException e) { 77 e.printStackTrace(); 78 } 79 buffer.update(0, result); 80 } 81 82 @Override 83 public void merge(MutableAggregationBuffer buffer1, Row buffer2) { 84 //不同excutor間的數據合並 85 update(buffer1, buffer2); 86 } 87 88 @Override 89 public Object evaluate(Row buffer) { 90 //根據Buffer計算結果 91 long r = 0l; 92 Object val = buffer.get(0); 93 if (val != null) { 94 RoaringBitmap rr = new RoaringBitmap(); 95 try { 96 rr.deserialize(new DataInputStream(new ByteArrayInputStream((byte[]) val))); 97 r = rr.getLongCardinality(); 98 } catch (IOException e) { 99 e.printStackTrace(); 100 } 101 } 102 return r; 103 } 104 }
調用例子:
1 /** 2 * 使用自定義函數解析bitmap 3 * 4 * @param sparkSession 5 * @return 6 */ 7 private static void udafBitmap(SparkSession sparkSession) { 8 try { 9 Properties prop = PropUtil.loadProp(DB_PHOENIX_CONF_FILE); 10 // JDBC連接屬性 11 Properties connProp = new Properties(); 12 connProp.put("driver", prop.getProperty(DB_PHOENIX_DRIVER)); 13 connProp.put("user", prop.getProperty(DB_PHOENIX_USER)); 14 connProp.put("password", prop.getProperty(DB_PHOENIX_PASS)); 15 connProp.put("fetchsize", prop.getProperty(DB_PHOENIX_FETCHSIZE)); 16 // 注冊自定義聚合函數 17 sparkSession.udf().register("bitmap",new UdafBitMap()); 18 sparkSession 19 .read() 20 .jdbc(prop.getProperty(DB_PHOENIX_URL), "test_binary", connProp) 21 // sql中必須使用global_temp.表名,否則找不到 22 .createOrReplaceGlobalTempView("test_binary"); 23 //sparkSession.sql("select YEAR(TO_DATE(date)) year,bitmap(dist_mem) memNum from global_temp.test_binary group by YEAR(TO_DATE(date))").show(); 24 sparkSession.sql("select date,bitmap(dist_mem) memNum from global_temp.test_binary group by date").show(); 25 } catch (Exception e) { 26 e.printStackTrace(); 27 } 28 }
總結
感謝網絡大神的分享:技術共勉
https://kyligence.io/zh/blog/count-distinct-bitmap/
https://www.jianshu.com/p/ded6e8ecd0d1
https://blog.csdn.net/xiongbingcool/article/details/81282118
http://www.sohu.com/a/327627247_315839
https://blog.csdn.net/xywtalk/article/details/52590275
https://blog.csdn.net/qq_22520215/article/details/77893326