簡介: B 站選擇 Flink + Hudi 的數據湖技術方案,以及針對其做出的優化。
本文作者喻兆靖,介紹了為什么 B 站選擇 Flink + Hudi 的數據湖技術方案,以及針對其做出的優化。主要內容為:
- 傳統離線數倉痛點
- 數據湖技術方案
- Hudi 任務穩定性保障
- 數據入湖實踐
- 增量數據湖平台收益
- 社區貢獻
- 未來的發展與思考
一、傳統離線數倉痛點
1. 痛點
之前 B 站數倉的入倉流程大致如下所示:
- 大規模的數據落地 HDFS 后,只能在凌晨分區歸檔后才能查詢並做下一步處理;
- 數據量較大的 RDS 數據同步,需要在凌晨分區歸檔后才能處理,並且需要做排序、去重以及 join 前一天分區的數據,才能產生出當天的數據;
- 僅能通過分區粒度讀取數據,在分流等場景下會出現大量的冗余 IO。
總結一下就是:
- 調度啟動晚;
- 合並速度慢;
- 重復讀取多。
2. 痛點思考
- 調度啟動晚
思路:既然 Flink 落 ODS 是准實時寫入的,有明確的文件增量概念,可以使用基於文件的增量同 步,將清洗、補維、分流等邏輯通過增量的方式進行處理,這樣就可以在 ODS 分區未歸檔的時 候就處理數據,理論上數據的延遲只取決於最后一批文件的處理時間。
- 合並速度慢
思路:既然讀取已經可以做到增量化了,那么合並也可以做到增量化,可以通過數據湖的能力結 合增量讀取完成合並的增量化。
- 重復讀取多
思路:重復讀取多的主要原因是分區的粒度太粗了,只能精確到小時/天級別。我們需要嘗試一 些更加細粒度的數據組織方案,將 Data Skipping 可以做到字段級別,這樣就可以進行高效的數 據查詢了。
3. 解決方案: Magneto - 基於 Hudi 的增量數據湖平台
以下是基於 Magneto 構建的入倉流程:
-
Flow
- 使用流式 Flow 的方式,統一離線和實時的 ETL Pipline
-
Organizer
- 數據重組織,加速查詢
- 支持增量數據的 compaction
-
Engine
- 計算層使用 Flink,存儲層使用 Hudi
-
Metadata
- 提煉表計算 SQL 邏輯
- 標准化 Table Format 計算范式
二、數據湖技術方案
1. Iceberg 與 Hudi 的取舍
1.1 技術細節對比
統計截止至 2021-08-09
大致可以分為以下幾個主要緯度來進行對比:
- 對 Append 的支持
Iceberg 設計之初的主要支持方案,針對該場景做了很多優化。 Hudi 在 0.9 版本中對 Appned 模式進行了支持,目前在大部分場景下和 Iceberg 的差距不大, 目前的 0.10 版本中仍然在持續優化,與 Iceberg 的性能已經非常相近了。
- 對 Upsert 的支持
Hudi 設計之初的主要支持方案,相對於 Iceberg 的設計,性能和文件數量上有非常明顯的優 勢,並且 Compaction 流程和邏輯全部都是高度抽象的接口。 Iceberg 對於 Upsert 的支持啟動較晚,社區方案在性能、小文件等地方與 Hudi 還有比較明顯 的差距。
- 社區活躍度
Hudi 的社區相較於 Iceberg 社區明顯更加活躍,得益於社區活躍,Hudi 對於功能的豐富程度與 Iceberg 拉開了一定的差距。
綜合對比,我們選擇了 Hudi 作為我們的數據湖組件,並在其上繼續優化我們需要的功能 ( Flink 更好的集成、Clustering 支持等)
2. 選擇 Flink + Hudi 作為寫入方式
我們選擇 Flink + Hudi 的方式集成 Hudi 的主要原因有三個:
- 我們部分自己維護了 Flink 引擎,支撐了全公司的實時計算,從成本上考慮不想同時維護兩套計算引擎,尤其是在我們內部 Spark 版本也做了很多內部修改的情況下。
-
Spark + Hudi 的集成方案主要有兩種 Index 方案可供選擇,但是都有劣勢:
- Bloom Index:使用 Bloom Index 的話,Spark 會在寫入的時候,每個 task 都去 list 一遍所有的文件,讀取 footer 內寫入的 Bloom 過濾數據,這樣會對我們內部壓力已經非常大的 HDFS 造成非常恐怖的壓力。
- Hbase Index:這種方式倒是可以做到 O(1) 的找到索引,但是需要引入外部依賴,這樣會使整個方案變的比較重。
- 我們需要和 Flink 增量處理的框架進行對接。
3. Flink + Hudi 集成的優化
3.1 Hudi 0.8 版本集成 Flink 方案
3.2 Bootstrap State 冷啟動
背景:支持在已經存在 Hudi 表啟動 Flink 任務寫入,從而可以做到由 Spark on Hudi 到 Flink on Hudi 的方案切換
原方案:
- 每個 Bootstrap Operator 在初始化時,加載屬於當前 Task 的 fileId 相關的 BaseFile 和 logFile;
- 將 BaseFile 和 logFile 中的 recordKey 組裝成 HoodieKey,通過 Key By 的形式發送給 BucketAssignFunction,然后將 HoodieKey 作為索引存儲在 BucketAssignFunction 的 state 中。
效果:通過將 Bootstrap 功能單獨抽出一個 Operator,做到了索引加載的可擴展性,加載速度提升 N (取決於並發度) 倍。
3.3 Checkpoint 一致性優化
背景:在 Hudi 0.8 版本的 StreamWriteFunction 中,存在極端情況下的數據一致性問題。
原方案:
優化方案:
背景:Append 模式是用於支持不需要 update 的數據集時使用的模式,可以在流程中省略索引、 合並等不必要的處理,從而大幅提高寫入效率。
主要修改:
- 支持每次 FlushBucket 寫入一個新的文件,避免出現讀寫的放大;
- 添加參數,支持關閉 BoundedInMemeoryQueue 內部的限速機制,在 Flink Append 模式下只需要將 Queue 的大小和 Bucket buffer 設置成同樣的大小就可以了;
- 針對每個 CK 產生的小文件,制定自定義 Compaction 計划;
- 通過以上的開發和優化之后,在純 Insert 場景下性能可達原先 COW 的 5 倍。
三、Hudi 任務穩定性保障
1. Hudi 集成 Flink Metrics
通過在關鍵節點上報 Metric,可以比較清晰的掌握整個任務的運行情況:
2. 系統內數據校驗
3. 系統外數據校驗
四、數據入湖實踐
1. CDC數據入湖
1.1 TiDB入湖方案
由於目前開源的各種方案都沒辦法直接支持 TiDB 的數據導出,直接使用 Select 的方式會影響數 據庫的穩定性,所以拆成了全量 + 增量的方式:
- 啟動 TI-CDC,將 TIDB 的 CDC 數據寫入對應的 Kafka topic;
- 利用 TiDB 提供的 Dumpling 組件,修改部分源碼,支持直接寫入 HDFS;
- 啟動 Flink 將全量數據通過 Bulk Insert 的方式寫入 Hudi;
- 消費增量的 CDC 數據,通過 Flink MOR 的方式寫入 Hudi。
1.2 MySQL 入湖方案
MySQL 的入湖方案是直接使用開源的 Flink-CDC,將全量和增量數據通過一個 Flink 任務寫入 Kafka topic:
- 啟動 Flink-CDC 任務將全量數據以及 CDC 數據導入 Kafka topic;
- 啟動 Flink Batch 任務讀取全量數據,通過 Bulk Insert 寫入 Hudi;
- 切換為 Flink Streaming 任務將增量 CDC 數據通過 MOR 的方式寫入 Hudi。
2. 日志數據增量入湖
- 實現 HDFSStreamingSource 和 ReaderOperator,增量同步 ODS 的數據文件,並且通過寫入 ODS 的分區索引信息,減少對 HDFS 的 list 請求;
- 支持 transform SQL 配置化,允許用戶進行自定義邏輯轉化,包括但不限於維表 join、自定義 udf、按字段分流等;
- 實現 Flink on Hudi 的 Append 模式,大幅提升不需要合並的數據寫入速率。
五、增量數據湖平台收益
- 通過 Flink 增量同步大幅度提升了數據同步的時效性,分區就緒時間從 2:00~5:00 提前到 00:30 分內;
- 存儲引擎使用 Hudi,提供用戶基於 COW、MOR 的多種查詢方式,讓不同用戶可以根據自己 的應用場景選擇合適的查詢方式,而不是單純的只能等待分區歸檔后查詢;
- 相較於之前數倉的 T+1 Binlog 合並方式,基於 Hudi 的自動 Compaction 使得用戶可以將 Hive 當成 MySQL 的快照進行查詢;
- 大幅節約資源,原先需要重復查詢的分流任務只需要執行一次,節約大約 18000 core。
六、社區貢獻
上述優化都已經合並到 Hudi 社區,B站在未來會進一步加強 Hudi 的建設,與社區一起成⻓。
部分核心PR
七、未來的發展與思考
- 平台支持流批一體,統一實時與離線邏輯;
- 推進數倉增量化,達成 Hudi ODS -> Flink -> Hudi DW -> Flink -> Hudi ADS 的全流程;
- 在 Flink 上支持 Hudi 的 Clustering,體現出 Hudi 在數據組織上的優勢,並探索 Z-Order 等加速多維查詢的性能表現;
- 支持 inline clustering。
原文鏈接
本文為阿里雲原創內容,未經允許不得轉載。