a) 實時計算架構
圖 1百分點大數據平台原理示意圖
工欲善其事,必先利其器。一個穩定可靠且高效的底層架構是實時計算的必要基礎。圖 1給出了百分點數據大平台的總體框架,如圖所示,大數據平台包含數據存儲和數據處理兩個層次。
存儲服務層提供了數據處理層需要的各類分布式存儲,包括分布式文件系統(Hadoop HDFS)、分布式SQL數據庫(MySQL)、分布式 NoSQL數據庫(Redis、MongoDB、HBase)、分布式消息隊列(Apache Kafka)、分布式搜索引擎(Apache Solr) 以及必不可少的Apache Zookeeper。
數據處理層由四個部分組成。其中Web應用雲包含了所有直接面對用戶的Web服務,每個Web應用都會產生Web日志以及其他實時數據,這些數據一 方面會及時交由實時計算框架進行處理,另一方面也會定期同步至離線計算框架;實時計算框架會處理接收到的實時數據,並將處理結果輸出到數據查詢框架或者離 線計算框架;離線計算框架則定期對數據進行處理,並將處理結果輸出至數據查詢框架;數據查詢框架提供了一系列應用接口供程序調取需要的各項數據,同時提供 了一些Web工具幫助業務人員對海量數據進行統計、匯總和分析。
在百分點大數據平台中,與實時計算密切相關的有實時計算框架和數據查詢框架,這部分的組件架構和數據流如圖 2所示。
圖 2實時計算框架和數據查詢框架示意
從圖上可以看出,數據采集服務會將收集到的實時數據推送到消息隊列Kafka中;Kafka中的數據會被兩個處理平台 BDM CEP(Big Data Management Complex Event Processing)和Storm消費並處理。Storm是當 下比較流行的開源流處理框架,百分點公司在2013年中開始使用Storm進行數據清洗、統計和一部分分析任務。在引入Storm之前,百分點所有的實時 計算都是基於BDM CEP進行的,它是我們基於中間件ICE開發的一套流處理平台。BDM CEP包含有四類組件:dispatcher負責從 Kafka中讀取消息,根據消息內容分發給相應的worker;worker復雜處理接收到的消息,並將處理結果傳遞給其他worker或者輸出到各類存 儲服務中;config負責維護dispatcher和worker的交互關系和配置信息,並在交互關系或配置更新時及時通知dispatcher和 worker;monitor負責監控dispatcher和worker的運行情況,把監控信息提交給Ganglia,monitor還負責系統異常時 的報警,以及dispatcher和worker發生故障時進行重啟和遷移。數據查詢框架由圖中最下層的三個組件組成,其中 BDM DS(Data Source)封裝了一系列的數據查詢邏輯並以REST API和ICE服務的形式供各種應用調 用;BDM OLAP(Online Analytical Processing)提供了實時查詢用戶行為和標簽明細,以及近實時的用戶多維度統計、匯 總和分析功能,這些功能是以REST API和Web應用方式提供的;BDM Search是對Solr和HBase的一次封裝,以REST API和 ICE服務方式對外提供近實時搜索功能。
百分點公司的主要服務都是運行在這套架構上的,它擁有良好的穩定性和擴展性,一般來說只需要增加水平擴展結點即可提高數據處理能力,這為百分點業務的穩定發展奠定了技術基礎。
b) 實時計算算法
要真正實現大數據實時計算,光有框架是不行的,還必須針對特定業務開發特定的處理流程和算法。相比較離線計算而言,實時計算在算法方面需要考慮的更 多,這是因為實時計算能夠用到的存儲資源遠不如離線,而且處理過程的時間限制要比離線計算嚴格,這都要求實時計算算法必須做相當多的優化。在這一節中,筆 者將以海量計數問題為例介紹百分點公司在實時計算算法方面的經驗。
目前,百分點數據平台上包含了近千萬的電商單品數據,實時追蹤這些單品的瀏覽和交易數據是必須的,這也是做個性化推薦、商品畫像、銷量預測和用戶畫 像等業務的必要前提。我們的問題是:如何設計一種算法使得我們可以實時查看任意單品最近24小時的瀏覽量?這個問題描述起來很簡單,但稍加思索就會發現做 起來並不容易。下面我們先給出一個簡單方案,而后按照一定的原則逐步精化到最佳方案。
c) 簡單方案
圖 3按秒計數方案
看到這個問題時,大部分讀者會很快想到如圖 3所示的算法方案。圖中紅色、藍色和綠色的方塊分別表示不同的單品。在這個方案中,我們為每個單品保存一份瀏覽信息,它包含兩個數據結構:
d) 歷史瀏覽量列表(簡稱歷史),一個列表,列表中每個元素包含一個時間戳和一個整數,分別代表過去24小時中的某一秒及這一秒鍾的瀏覽量,按時 間順序排序。這個列表的最長會包含24*3600=86400個元素,但一般情況下極少有單品時時刻刻都被瀏覽,我們可以假設這個列表的平均長度不超過 10000。
e) 累計瀏覽量(簡稱累計量),一個整數,代表截止到最后一次訪問時的瀏覽量。
如圖所示,假設藍色單品對應的數據是 [(t1, a1), (t2, a2), …, (tn, an)]和A。這表示t1時刻的該單品瀏覽量是a1,t2時刻是a2,tn是最后一次記錄到瀏覽該單品的時刻,瀏覽量是an。截止到tn,該單品的總瀏覽量是A。
當單品瀏覽源源不斷進入到消息隊列時,處理進程(或線程)P1,P2…會實時讀取到這些信息,並修改對應單品的數據信息。例如,P1讀取到t時刻對藍色單品的瀏覽記錄時,會進行下面的操作:
f) 得到當前時刻ct;
g) 對數據庫中藍色單品數據加鎖,加鎖成功后讀取出數據,假設歷史是[(t1, a1), (t2, a2), …, (tn, an)],累計量是A;
h) 累計量遞增,即從A修改為A+1
i) 如果ct=tn,則更新歷史為[(t1, a1), (t2, a2), …, (tn, an+1)],否則更新為 [(t1, a1), (t2, a2), …, (tn, an), (ct,1)];最后刪除時間戳小於ct-24*3600的列表元素,刪除的同時 從累計量中減去對應時刻的瀏覽量,例如只有元素t1> ct-24*3600,則操作完成后的瀏覽量為A+1-a1;
j) 將新的歷史和累計量輸出至數據庫,釋放鎖。
不難驗證這個方案是可以正確得出每個單品24小時內的瀏覽量的,並且只要在資源(計算、存儲和網絡)充足的情況下,數據庫中單品的瀏覽量是實時更新的。這個方案也是分布式實時計算中最簡單最常見的一種模式。
k) 避免鎖
圖 4不包含鎖的方案
第一個方案中需要對數據庫加鎖,無論加鎖粒度多細,都會嚴重影響計算效率。雖然像Redis一類的內存數據庫提供了incr這樣的原子操作,但這種操作多數情況下只適用於整型數據,並不適合本問題的歷史數據。
要想提高實時處理效率,避免鎖是非常重要的。一種常見的做法是將並行操作串行化,就像MapReduce中的Reduce階段一樣,將key相同的 數據交由同一個reducer處理。基於這個原理,我們可以將方案改造為如圖 4所示,我們新增一個數據分發處理過程,它的作用是保證同一個單品的所有數 據都會發送給同一個處理程序。例如將藍色單品交由P1處理,紅色交由P2處理,綠色交由P3處理。這樣P1在處理過程中不需要對數據庫加鎖,因為不存在資 源競爭。這樣可以極大的提高計算效率,於是整個計算過程變為:
l) 得到當前時刻ct;
m) 讀取數據庫中藍色單品信息,假設歷史是[(t1, a1), (t2, a2), …, (tn, an)],累計量是A;
n) 累計遞增,即從A修改為A+1
- o) 如果ct=tn,則更新歷史為[(t1, a1), (t2, a2), …, (tn, an+1)],否則更新為 [(t1, a1), (t2, a2), …, (tn, an), (ct,1)];最后刪除時間戳小於ct-24*3600的列表元素,刪除的同時 從累量中減去對應時刻的瀏覽量;
p) 將新的歷史和累計量輸出至數據庫。
步驟b)和e)省去了鎖操作,整個系統的並發性和吞吐量會得到大大提高。當然,沒有免費的午餐,這種方案的缺點在於存在單點隱患,例如一旦P1由於 某些原因掛掉了,那么藍色單品的數據將得不到及時處理,計數結果將無法保證實時。這種計算過程對系統監控和故障轉移會有很高的要求。
q) 數據分層
圖 5帶有本地緩存的方案
方案二已經可以大大提高計算效率,但這還不夠,我們可以看到在計算步驟b)和e)中總是要把歷史和累計量同時從數據庫中讀出或寫入,實際上這是沒有 必要的,因為只有累計量才是外部必須使用的數據,而歷史只是算法的中間數據。這樣,我們可以區別對待歷史和累計量,我們將歷史和累計量都緩存在計算進程 中,定期更新歷史至數據庫,而累計量則實時更新。新的方案如圖 5所示,計算過程變為:
r) 得到當前時刻ct;
s) 如果本地沒有藍色單品的信息,則從數據庫中讀取藍色單品信息;否則直接使用本地緩存的信息。假設歷史是[(t1, a1), (t2, a2), …, (tn, an)],累計量是A;
t) 累計量遞增,即從A修改為A+1
u) 如果ct=tn,則更新歷史為[(t1, a1), (t2, a2), …, (tn, an+1)],否則更新為 [(t1, a1), (t2, a2), …, (tn, an), (ct,1)];最后刪除時間戳小於ct-24*3600的列表元素,刪除的同時 從累計量中減去對應時刻的瀏覽量;
v) 將新的累計量輸出至數據庫;如果滿足一定的條件(例如上次輸出時間足夠久遠,或者處理的消息量達到一定數量),則將歷史輸出至數據庫。
這種方案可以大大降低數據庫壓力、數據IO和序列化反序列化次數,從而提高整個系統的處理效率。數據分層實際上是計算機中一種常用的路數,例如硬件 中的高速緩存/內存/磁盤,系統IO中的緩沖區/磁盤文件,數據庫的內存索引、系統DNS緩存等等。我們使用的開源搜索引擎Solr就使用了同樣的思路達 到近實時索引。Solr包含磁盤全量索引和實時增加的內存增量索引,並引入了“soft提交”的方式更新新索引。新數據到達后,Solr會使用 “soft”提交的方式更新內存增量索引,在檢索的時候通過同時請求全量索引和增量索引並合並的方式獲得到最新的數據。之后會在服務器空閑的時 候,Solr會把內存增量索引合並到磁盤全量索引中保證數據完整。
當然,這種方案也對系統的穩定性提出了更高的要求,因為一旦P1掛掉那么它緩存的數據將丟失,及時P1及時重啟,這些數據也無法恢復,那么在一段時間內我們將無法得到准確的實時瀏覽量。
w) 模糊化
現在,我們來考慮存儲資源問題。假設時間戳和整型都用long類型(8字節)保存,那么按照方案一中的估計,我們對每個單品的需要記錄的數據大小約 為10000×(8+8)+8=16008字節≈156KB,1000萬單品的數據總量將超過1T,如果考慮到數據庫和本地緩存因素,那么整個系統需要的 存儲量至少是2T!這對於計數這個問題而言顯然是得不償失的,我們必須嘗試將數據量降低,在這個問題中可行的是降低歷史的存儲精度。我們將歷史定義為小時 級別精度,這樣每個單品的歷史至多有24個,數據量最多392字節,1000萬單品的信息總量將變為3.6G,系統總的存儲量不超過8G,這是可以接受 的。如果考慮用int類型代替long類型存儲時間(小時數),則存儲量可以進一步降低到不足6G。這樣新的計算過程變為:
x) 得到當前時刻精確到小時的部分ct;
y) 如果本地沒有藍色單品的信息,則從數據庫中讀取藍色單品信息;否則直接使用本地緩存的信息。假設歷史是[(t1, a1), (t2, a2), …, (tn, an)],累計量是A;
z) 累計量遞增,即從A修改為A+1
aa) 如果ct=tn,則更新歷史為[(t1, a1), (t2, a2), …, (tn, an+1)],否則更新為 [(t1, a1), (t2, a2), …, (tn, an), (ct,1)];最后刪除小時數小於ct-24的列表元素,刪除的同時從累計量中 減去對應時刻的瀏覽量;
ab) 將新的瀏覽量輸出至數據庫;如果滿足一定的條件,則將歷史輸出至數據庫。
在這種方案下,數據庫中存儲的並不是過去24小時內的瀏覽量,而是過去23小時多一點內的。例如在1月2日12:15時數據庫中的瀏覽量實際上是1月1日13:00到1月2日12:15的瀏覽量!
這種降低數據精度的方法我們可以稱之為模糊化,它是用資源換效率的一種方法。在對數據精確性不是特別敏感的領域,這種方法可以大大降低系統資源使用 量、提高系統的處理效率。利用模糊化的實時算法快速得到近似結果,而后用離線算法慢慢修正結果的精確度,是百分點在大數據處理中經常使用的招數。
ac) 局部精化
圖 6局部精華示意圖
有時候,模糊化會掩蓋掉一些重要的細節信息,達不到業務需求的要求。例如,電商有很多的秒殺活動,此時必須及時監測單品瀏覽量,如果我們還按小時維 度進行計算,那顯然不能滿足要求。這種情況下我們就必須對局部數據進行細化,它是模糊化的逆操作,我們稱之為局部精化。如圖 6所示,第k小時的數據是很 敏感的,我們希望它的數據能更實時一些,那我們可以將第k小時的數據切分的更細,對它做10分鍾、分鍾甚至秒級別的計算,而其他時間段仍舊采用小時精度。
這種方案會增加系統的設計和開發難度,而且必須有靈活的配置才能滿足多變的業務需求。
ad) 數據建模
除了局部細化,還有一種方法可以提高數據的精確度,這就是數據建模。在方案四中我們提到在小時精度下,實際上只能得到23小時多一點之前的瀏覽量, 有一部分數據丟失了沒有用到。實際上我們可以將丟棄掉的數據利用起來得到更好的結果。最簡單思路是假設同一小時內單品的瀏覽量是線性增加的,那么我們顯然 可以利用相鄰兩個小時的瀏覽歷史推算出任意時刻的瀏覽量。回到方案四中的例子,1月2日12:15的實時瀏覽量可以通過下面的公式計算得出:
[a0 + (a1-a0)×(60-15)/60] + a1 + … + a24
其中a0代表1月1日12:00到13:00之間的瀏覽量,依次類推,a24代表1月2日12:00到12:15之間的瀏覽量。公式中的 a0 + (a1-a0)×(60-15)/60 估計了1月1日12:15-13:00之間的瀏覽量,這樣就得出了從1月1日12:15到1月2日 12:15之間24小時內的瀏覽量。
圖 7某單品的全天瀏覽分布
我們還可以利用更復雜的瀏覽量分布模型得出精度更高的估計,圖 7給出了某單品一天的瀏覽分布曲線,這個分布適用於絕大多數的商品以及絕大多數的時 間。因此,我們完全可以利用這個分布來更精確的估計每個單品的瀏覽量,利用這個模型我們甚至不需要記錄瀏覽歷史,只需要知道當天0:00到當前的瀏覽總量 就可以計算出前24小時內的瀏覽量,甚至預測接下來的瀏覽量情況!
當然,模型也不是萬能的,模型本身的建立和更新也是有代價的,如果建模方法不恰當或者模型更新不及時,很有可能得出的結果會很差。
ae) 小結
本文首先介紹了百分點公司大數據平台的基本原理,並詳細說明了其中與實時計算相關部分,實時計算框架和數據查詢框架,的系統架構、處理流程和應用。 而后,我們以海量數據計數問題為例,深入淺出的介紹了在百分點公司在實時計算算法中常用的方法和技巧,以及它們適用的場景和可能帶來的問題。這些方法和技 巧具有普遍性和通用性,被廣泛應用於百分點個性化推薦引擎的各個模塊,包括用戶意圖預測、用戶畫像、個性化推薦評分、商品分類等等。如果能在實際業務中靈 活運用這些方法和技巧,則能夠大大提高實時計算的數據規模和處理效率,幫助業務快速發展。希望本文的介紹能夠幫助讀者更好的理解大數據實時計算的方方面 面。