在前一篇中介紹了使用API做Distinct Count,但是精確計算的API都較慢,那有沒有能更快的優化解決方案呢?
1. Bitmap介紹
《編程珠璣》上是這樣介紹bitmap的:
Bitmap是一個十分有用的數據結構。所謂的Bitmap就是用一個bit位來標記某個元素對應的Value,而Key即是該元素。由於采用了Bit為單位來存儲數據,因此在內存占用方面,可以大大節省。
簡而言之——用一個bit(0或1)表示某元素是否出現過,其在bitmap的位置對應於其index。《編程珠璣》給出了一個用bitmap做排序的例子:
/* Copyright (C) 1999 Lucent Technologies */
/* From 'Programming Pearls' by Jon Bentley */
/* bitsort.c -- bitmap sort from Column 1
* Sort distinct integers in the range [0..N-1]
*/
#include <stdio.h>
#define BITSPERWORD 32
#define SHIFT 5
#define MASK 0x1F
#define N 10000000
int a[1 + N / BITSPERWORD];
void set(int i) { a[i >> SHIFT] |= (1 << (i & MASK)); }
void clr(int i) { a[i >> SHIFT] &= ~(1 << (i & MASK)); }
int test(int i) { return a[i >> SHIFT] & (1 << (i & MASK)); }
int main() {
int i;
for (i = 0; i < N; i++)
clr(i);
/* Replace above 2 lines with below 3 for word-parallel init
int top = 1 + N/BITSPERWORD;
for (i = 0; i < top; i++)
a[i] = 0;
*/
while (scanf("%d", &i) != EOF)
set(i);
for (i = 0; i < N; i++)
if (test(i))
printf("%d\n", i);
return 0;
}
上面代碼中,用int的數組存儲bitmap,對於每一個待排序的int數,其對應的index為其int值。
2. Distinct Count優化
index生成
為了使用bitmap做Distinct Count,首先需得到每個用戶(uid)對應(在bitmap中)的index。有兩種辦法可以得到從1開始編號index表(與uid一一對應):
- hash,但是要找到無碰撞且hash值均勻分布[1, +∞)區間的hash函數是非常困難的;
- 維護一張uid與index之間的映射表,並增量更新
比較兩種方法,第二種方法更為簡單可行。
UV計算
在index生成完成后,RDD[(uid, V)]
與RDD[(uid, index)]
join得到index化的RDD。bitmap的開源實現有EWAH,采用RLE(Run Length Encoding)壓縮,很好地解決了存儲空間的浪費。Distinct Count計算轉變成了求bitmap中1的個數:
// distinct count for rdd(not pair) and the rdd must be sorted in each partition
def distinctCount(rdd: RDD[Int]): Int = {
val bitmap = rdd.aggregate[EWAHCompressedBitmap](new EWAHCompressedBitmap())(
(u: EWAHCompressedBitmap, v: Int) => {
u.set(v)
u
},
(u1: EWAHCompressedBitmap, u2: EWAHCompressedBitmap) => u1.or(u2)
)
bitmap.cardinality()
}
// the tuple_2 is the index
def groupCount[K: ClassTag](rdd: RDD[(K, Int)]): RDD[(K, Int)] = {
val grouped: RDD[(K, EWAHCompressedBitmap)] = rdd.combineByKey[EWAHCompressedBitmap](
(v: Int) => EWAHCompressedBitmap.bitmapOf(v),
(c: EWAHCompressedBitmap, v: Int) => {
c.set(v)
c
},
(c1: EWAHCompressedBitmap, c2: EWAHCompressedBitmap) => c1.or(c2))
grouped.map(t => (t._1, t._2.cardinality()))
}
但是,在上述計算中,由於EWAHCompressedBitmap的set方法要求int值是升序的,也就是說RDD的每一個partition的index應是升序排列:
// sort pair RDD by value
def sortPairRDD[K](rdd: RDD[(K, Int)]): RDD[(K, Int)] = {
rdd.mapPartitions(iter => {
iter.toArray.sortWith((x, y) => x._2.compare(y._2) < 0).iterator
})
}
為了避免排序,可以為每一個uid生成一個bitmap,然后在Distinct Count時將bitmap進行or運算亦可:
rdd.reduceByKey(_ or _)
.mapValues(_._2.cardinality())
3. 參考資料
[1] 周海鵬, Bitmap的秘密.