1. 背景
在構建精准用戶畫像時,面臨着這樣一個問題:日志采集不能成功地收集用戶的所有ID,且每條業務線有各自定義的UID用來標識用戶,從而造成了用戶ID的零碎化。因此,為了做用戶標簽的整合,用戶ID之間的強打通(亦稱為ID-Mapping)成了迫切的需求。大概三年前,在知乎上有這樣一個與之相類似的問題:如何用MR實現並查集以對海量數據pair做聚合;目前為止還無人解答。本文將提供一個可能的基於MR計算框架的解決方案,以實現大數據下的ID強打通。
首先,簡要地介紹下Android設備常見的ID:
- IMEI(International Mobile Equipment Identity),即通常所說的手機序列號、手機“串號”,用於在移動電話網絡中識別每一部獨立的手機等行動通訊裝置;序列號共有15位數字,前6位(TAC)是型號核准號碼,代表手機類型。接着2位(FAC)是最后裝配號,代表產地。后6位(SNR)是串號,代表生產順序號。最后1位(SP)一般為0,是檢驗碼,備用。
- MAC(Media Access Control)一般代指MAC位址,為網卡的標識,用來定義網絡設備的位置。
- IMSI(International Mobile SubscriberIdentification Number),儲存在SIM卡中,可用於區別移動用戶的有效信息;其總長度不超過15位,同樣使用0~9的數字。其中MCC是移動用戶所屬國家代號,占3位數字,中國的MCC規定為460;MNC是移動網號碼,最多由兩位數字組成,用於識別移動用戶所歸屬的移動通信網;MSIN是移動用戶識別碼,用以識別某一移動通信網中的移動用戶。
- Android ID是系統隨機生成的設備ID 為一串64位的編碼(十六進制的字符串),通過它可以知道設備的壽命(在設備恢復出廠設置或刷機后,該值可能會改變)。
- IDFA (Identifier for Advertisers) 是蘋果推出來的用於廣告標識的設備ID,同一設備上的不同APP所獲取的IDFA是一致的;但是用戶可以自主更改IDFA,所以IDFA並不是和設備一一綁定的。
2. 設計
從圖論的角度出發,ID強打通更像是將小連通圖合並成一個大連通圖;比如,在日志中出現如下三條記錄,分別表示三個ID集合(小連通圖):
A B C
C D
D E
通過將三個小連通圖合並,便可得到一個大連通圖——完整的ID集合列表A B C D E
。淘寶明風介紹了如何用Spark GraphX通過outerJoinVertices等運算符來做大數據下的多圖合並;針對ID強打通的場景,也可采用類似的思路:日志數據構建大的稀疏圖,然后采用自join的方式做打通。但是,我並沒有選用GraphX,理由如下:
- GraphX只支持有向圖,而不支持無向圖,而ID之間的關聯關系是一個無向連通圖;
- GraphX的join操作不完全可控,“不完全可控”是指在做圖合並時我們需要做過濾山寨設備、一對多的ID等操作,而在GraphX封裝好的join算子上實現過濾操作則成本過高。
因而,基於MR計算模型(Spark框架)我設計新的ID打通算法;算法流程如下:打通的map階段將ID集合id_set
中每一個Id做key然后進行打散(id_set.map(id -> id_set))
),Reduce階段按key做id_set
的合並。通過觀察發現:僅需要兩步MR便可完成上述打通的操作。以上面的例子做說明,第一步MR完成后,打通ID集合為:A B C D
、 C D E
,第二步MR完成后便得到完整的ID集合列表A B C D E
。但是,在兩步MR過程中,所有的key都會對應一個聚合結果,而其中一些聚合結果只是中間結果。故而引入了key_set
用於保存聚合時的key值,加入了第三步MR,通過比較key_set
與id_set
來對中間聚合結果進行過濾。算法的偽代碼如下:
MR step1:
Map:
input: id_set
process: flatMap id_set;
output: id -> (id_set, 1)
Rduce:
process: reduceByKey
output: id -> (id_set, empty key_set, int_value)
MR step2:
Map:
input: id -> (id_set, empty key_set, int_value)
process: flatMap id_set, if have id_aggregation, then add key to key_set
output: id -> (id_set, key_set, int_value)
Reduce:
process: reduceByKey
output: id -> (id_set, key_set, int_value)
MR step3:
Map:
input: id -> (id_set, empty key_set, int_value)
process: flatMap id_set, if have id_aggregation, then add key to key_set
output: id -> (id_set, key_set, int_value)
Reduce:
process: reduceByKey
output: id -> (id_set, key_set, int_value)
Filters:
process: if have id_aggregation, then add key to key_set
filter: if no id_aggregation or key_set == id_set
distinct
3. 實現
針對上述ID強打通算法,Spark實現代碼如下:
case class DvcId(id: String, value: String)
val log: RDD[mutable.Set[DvcId]]
// MR1
val rdd1: RDD[(DvcId, (mutable.Set[DvcId], mutable.Set[DvcId], Int))] = log
.flatMap { set =>
set.map(t => (t, (set, 1)))
}.reduceByKey { (t1, t2) =>
t1._1 ++= t2._1
val added = t1._2 + t2._2
(t1._1, added)
}.map { t =>
(t._1, (t._2._1, mutable.Set.empty[DvcId], t._2._2))
}
// MR2
val rdd2: RDD[(DvcId, (mutable.Set[DvcId], mutable.Set[DvcId], Int))] = rdd1
.flatMap(flatIdSet).reduceByKey(tuple3Add)
// MR3
val rdd3: RDD[(DvcId, (mutable.Set[DvcId], mutable.Set[DvcId], Int))] = rdd2
.flatMap(flatIdSet).reduceByKey(tuple3Add)
// filter
val rdd4 = rdd3.filter { t =>
t._2._2 += t._1
t._2._3 == 1 || (t._2._1 -- t._2._2).isEmpty
}.map(_._2._1).distinct()
// flat id_set
def flatIdSet(row: (DvcId, (mutable.Set[DvcId], mutable.Set[DvcId], Int))) = {
row._2._3 match {
case 1 =>
Array((row._1, (row._2._1, row._2._2, row._2._3)))
case _ =>
row._2._2 += row._1 // add key to keySet
row._2._1.map(d => (d, (row._2._1, row._2._2, row._2._3))).toArray
}
}
def tuple3Add(t1: (mutable.Set[DvcId], mutable.Set[DvcId], Int),
t2: (mutable.Set[DvcId], mutable.Set[DvcId], Int)) = {
t1._1 ++= t2._1
t1._2 ++= t2._2
val added = t1._3 + t2._3
(t1._1, t1._2, added)
}
其中,引入常量1是為了標記該條記錄是否發生了ID聚合的情況。
ID強打通算法實現起來比較簡單,但是在實際的應用時,日志數據往往是帶噪聲的:
- 有山寨設備;
- ID之間存在着一對多的情況,比如,各業務線的UID的靠譜程度不一,有的UID會對應到多個設備。
另外,ID強打通后是HDFS的離線數據,為了提供線上服務、保證ID之間的一一對應關系,應選擇何種分布式數據庫、表應如何設計、如何做到數據更新時而不影響線上服務等等,則是另一個需要思考的問題。