來自字節跳動的管梓越同學一篇關於Apache Hudi在字節跳動推薦系統中EB級數據量實踐的分享。
接下來將分為場景需求、設計選型、功能支持、性能調優、未來展望五部分介紹Hudi在字節跳動推薦系統中的實踐。
在推薦系統中,我們在兩個場景下使用數據湖
- 我們使用BigTable作為整個系統近線處理的數據存儲,這是一個公司自研的組件TBase,提供了BigTable的語義和搜索推薦廣告場景下一些需求的抽象,並屏蔽底層存儲的差異。為了更好的理解,這里可以把它直接看做一個HBase。在這過程中為了能夠服務離線對數據的分析挖掘需求,需要將數據導出到離線存儲中。在過去用戶或是使用MR/Spark直接訪問存儲,或是通過掃庫的方式獲取數據,不符合OLAP場景下的數據訪問特性。因此我們基於數據湖構建BigTable的CDC,提高數據時效,減少近線系統訪問壓力,提供高效的OLAP訪問和用戶友好的SQL消費方式。
- 除此之外,我們還在特征工程與模型訓練的場景中使用數據湖。我們從內部和外部分別獲得兩類實時數據流,一個是來自系統內部回流的Instance,包含了推薦系統Serving時獲得的Feature。另一種是來自端上埋點/多種復雜外部數據源的反饋,這類數據作為Label,和之前的feature共同組成了完整的機器學習樣本。針對這個場景,我們需要實現一個基於主鍵的拼接操作,將Instance和Label Merge到一起。開窗范圍可能長達數十天,千億行量級。需要支持高效得列式選取和謂詞下推。同時還需要支持並發Update等相關能力。
在這兩個場景下存在如下挑戰
- 數據的非常不規整。相比Binlog,WAL沒法獲得一行的全部信息,同時數據大小變化非常大。
- 吞吐量比較大,單表吞吐超百GB/s,單表PB級存儲。
- 數據Schema 復雜。數據存在高維、稀疏等現象。表列數從1000-10000+都有。並且有大量復雜數據類型。
在引擎選型時,我們考察過Hudi,Iceberg,DeltaLake三個最熱門的數據湖引擎。三者在我們的場景下各有優劣,最終基於Hudi對上下游生態的開放,對全局索引的支持,對若干存儲邏輯提供了定制化的開發接口等原因,選擇了Hudi作為存儲引擎。
- 針對實時寫入,選擇了時效性更好的MOR。
- 考察了索引類型,首先因為WAL不能每次都獲取到數據的分區,所以必須要全局索引。在幾種全局索引實現中,為了實現高性能的寫入,HBase是唯一的選擇。另外兩種的實現決定了都和HBase在性能有本質上的差距。
- 在計算引擎上和API上,當時Hudi對Flink的支持還不是特別完善,所以選擇了更為成熟的Spark,為了能靈活實現一些定制功能和邏輯,也因為DataFrame的API語義限制比較多,所以選擇了更底層的RDD API。
功能支持包括存儲語義的MVCC和Schema注冊系統。
首先為了支持WAL語義的寫入,我們實現了針對MVCC的Payload,基於Avro自定義了一套帶時間戳的數據結構實現。並通過視圖訪問的方式對用戶屏蔽了這套邏輯。除此之外還實現了HBase Append的語義,可以實現對List類型的追加寫而非覆蓋寫。
由於Hudi本身的Schema從Write的數據中獲取,這種方式和其他系統對接不是很方便,以及我們需要一些基於Schema的擴展功能,所以我們構建了一個元數據中心來提供元數據相關的操作。
-
首先我們基於一種內部的存儲提供的語義實現了原子變更和異地多活。用戶可以通過接口原子地觸發Schema變更並立刻獲得結果。
-
並通過加入版本號的方法實現了Schema的多版本,Schema而不是把Json傳來傳去。有了多版本也可以實現Schema更靈活的演進。
-
我們還支持了列級別的額外信息編碼,來幫助業務實現一些場景下特有的擴展功能。並把列名替換成了數字來節約使用過程中的開銷。
-
Hudi的Spark Job在使用的時候會在JVM級別構建一個local cache並通過pull的方式和元數據中心同步數據,實現Schema的快速訪問和進程內Schema的單例。
在我們場景下性能挑戰比較大,最大單表數據量達400PB+,日增PB級數據量,總數據量達EB級別,因此我們針對性能和數據特性開發做了一些工作來提高性能。
序列化方面包括如下優化
- Schema:數據使用Avro序列化開銷特別大,而且消耗資源也非常多。針對這個問題,我們首先借助Schema的JVM單例,規避了序列化過程中很多費CPU的比較操作。
- 通過優化Payload邏輯,減少了需要序列化的次數。
- 借助了第三方的Avro序列化實現,通過將序列化過程編譯成字節碼的方式來提高SerDe的速度以及降低內存占用。對這種序列化形式做了修改,以保證我們的復雜Schema也能夠正常編譯。
對於Compaction流程優化如下
- Hudi除了默認的Inline/Async compaction選項之外,還支持Compaction的靈活部署。Compaction Job的作業特性和Ingestion作業其實有較大區別。在同一個Spark Application當中不僅不能針對性設置,也存在資源彈性不足的問題。我們首先構建了獨立部署的腳本,讓Compaction作業可以獨立觸發運行。使用了低成本的混部隊列並可以針對此次Compaction的Plan做資源申請。除此之外還做了基於規則和啟發式的Compaction Strategy,用戶的需求通常是保證天級別或者小時級別的SLA,並針對性地壓縮某些分區的數據,所以提供了針對性壓縮的能力。
- 為了能縮短關鍵Compaction的時間,我們通常會提前做Compaction來避免所有工作都在一個Compaction Job中完成。但是如果一個Compact過的FileGroup又有新的Update,就不得不再次Compact。為了優化整體的效率,我們根據業務信息對一個FileGroup該在何時被壓縮做了啟發式的調度以減少額外的壓縮損耗。該特性的具體收益還在評估中。
- 最后我們對Compaction做了一些流程的優化,比如不使用WriteStatus的Cache等等。
HDFS作為一種面向吞吐設計的存儲,在集群水位比較高的情況下,實時寫入毛刺比較嚴重。通過和HDFS團隊的溝通與合作,做了相關的一些工作。
- 首先把原有的數據HSync操作替換為HFlush,避免了分散性update導致的磁盤IO寫放大。
- 針對場景調參做了激進的pipeline切換設置,並且HDFS團隊開發了靈活的可以控制pipeline的api,來實現這個場景下靈活的配置。
- 最后還通過logfile獨立IO隔離的方式保證了實時寫入的時效性。
還有一些零零碎碎的性能提升,流程修改和Bug Fix,大家感興趣可以找我交流。
未來我們會在以下幾個方面持續迭代。
-
產品化問題:目前使用的API和調參調優方式對用戶要求很高,尤其是調參和運維,需要對Hudi原理有相當的了解才可以完成,不利於用戶推廣使用。
-
生態對接問題:在我們的場景中,技術棧以Flink為主,未來會探索Flink的使用。除此之外上下游使用的應用和環境也比較復雜,非常需要跨語言和通用的接口實現。目前和Spark綁定過於嚴重。
-
成本和性能問題:老生常談的話題,由於我們場景比較大,所以在這塊優化上的收益非常可觀。
-
存儲語義:我們把Hudi當做一種存儲來使用而非一種表格式。所以未來會拓展Hudi的使用場景,需要更豐富的存儲語義,會在這方面做更多的工作。
最后打個廣告,目前我們推薦架構團隊正在招人,工作地包括:北京/上海/杭州/新加坡/山景城等,有興趣的小伙伴可以添加微信qinglingcannotfly或發送簡歷至郵箱: guanziyue.gzy@bytedance.com