維度表,作為數據倉庫里面的概念,是維度屬性的集合,比如時間維、地點維;
但這里要討論流計算中的維度表問題,
流計算中維表問題和數據倉庫中有所不同,往往是因為通過agent采集到的數據比較有限,在做數據業務的時候,需要先實時的把這些維度信息給補全;
這個問題其實就是,主數據流和多個靜態表或半靜態表之間的join問題。
在flink中稱為side input問題,https://cwiki.apache.org/confluence/display/FLINK/FLIP-17+Side+Inputs+for+DataStream+API
解決維表問題考慮到點,
a. 對元數據庫的讀壓力;如果分析程序有1000並發,是否需要讀1000次
b. 讀維表數據不能拖慢主數據流的throughput,每秒千萬條數據量
c. 動態維表更新問題和一致性問題;元數據是不斷變化的,如何把更新同步到各個並發上
d. 冷啟動問題,如何保證主數據流流過的時候,維表數據已經ready,否則會出現數據無法處理
e. 超大維表數據會導致流量抖動和頻繁gc,比如幾十萬條的實例數據,可能上百兆
下面談談我們解決這個問題的思路,
1. 最簡單的版本,每個進程都會獨立的去從元數據里面讀取元數據;
這樣的優點是簡單,c,d問題天然解決;但只能適用於數據量較小的場景,否則並發太大,a,肯定就無法滿足
2. 隨着業務量的擴大,處理程序的並發越來越大,1,很快會達到瓶頸
我們就采用新的方案,這個方案是在Jstorm環境實現的,用一個spout讀,然后廣播給所有的處理進程
這個方案主要解決a,c的問題,
但是也引入了d,e的問題,
解決d,Jstorm支持讓某個spout在job啟動后等待一段時間,所以可以讓主數據流spout等待幾分鍾再開始讀數據,這樣保證數據到的時候,維表數據已經ready;這個解法每次重啟job都要等好幾分鍾,體驗挺差的,但是勉強可以work
e問題,一個spout廣播超大維表到幾百並發的線程,首先就是會隊列滿,因為jstorm發一份數據到所有並發的時候,是需要產生幾百份真實數據在隊列中的;然后GC也會很嚴重,因為大量的臨時對象會產生釋放,在傳輸和進程cache過程中,會導致業務抖動
這個問題只能增加內存和worker數來解決,否則job有可能會完全hang死
我們也用Chronicle Map(https://github.com/OpenHFT/Chronicle-Map)來嘗試解決內存使用和gc的問題
BTW,有同學問,如果讓數據和維表數據都 shuffle by key,是不是可以緩解這個問題
如果數據量比較小,可以考慮,但是對於我們的主數據流的數據量,是沒法shuffle的,所以需要在每個並發上保留全量的維表信息
2.1 用Flink帶替換Jstorm
Flink雖然在window,亂序,一致性等方面做了很大的改進,但是在這個問題上仍然沒有很好的解,上面提到的side input也沒有實現出來;
並且Flink隨着更多的高層的封裝,程序員的開發自由度是降低的,和JStorm比,所以如果用Flink解決上面的問題,沒有本質變化,可能JStorm更麻煩;
需要用ConnectedStreams去joine數據流和side input流,
對於d問題,沒法直接解決
對於e問題,因為flink對內存管理做的比較好,gc問題有所緩解,但是job抖動的問題還是會存在
因為廣播這么大的數據,會中斷主數據流的數據處理,也會大大增加checkpoint的時間,如下圖,可以看到30分鍾一次的同步
BTW,Flink保障一致性,提供checkpoint機制,但也增加復雜性,這個地方處理不好會有很多問題
比如,如果在source中同步讀數據庫數據,如果讀庫的時間比較長,就會hang住主數據流,因為其他operator都會等它完成checkpoint,寫JStorm的程序員需要注意這點,Flink需要更精細的控制,任何operator,任何並發的hang都會導致整個任務hang
我個人嘗試使用flink本身的機制,statebackend,rocksdb等來更優雅的解決這個問題,但是沒有發現比較好的方法,或者實現過於復雜
3. Redis版本
這其實是把1,2方法做了綜合
使用redis來做cache,只用一個job,負責從元數據庫同步數據到redis,這樣就解決a,c
然后所有的並發都從redis直接查詢需要的元數據,這樣就解決d
對於b,在並發上做local cache,只有第一次需要真正查詢redis,后續定期異步更新就好,不會影響到主數據流
對於e,因為現在不需要一下全量的讀取維表數據到內存,用到的時候才去讀,分攤了負載,也可以得到緩解
該方案當前線上跑着,還算比較穩定
這個方案最大的缺點是增加依賴,對於需要全球多region大規模部署的應用,增加依賴是成本極高的
同時要額外保障redis和同步job的穩定性
BTW,這里不建議local cache用LRU,因為要考慮到當redis掛掉或同步job掛掉的時候,不能影響主數據流,所以我只會異步的更新local cache,但不去做過期,這樣就算redis掛了,也只是影響更新的實例,大大降低故障發生概率
總結,
分享一下自己的一些實戰經驗,希望可以拋磚引玉,找到更合理,優雅的方案