Apache Hudi 在 B 站構建實時數據湖的實踐


簡介: B 站選擇 Flink + Hudi 的數據湖技術方案,以及針對其做出的優化。

本文作者喻兆靖,介紹了為什么 B 站選擇 Flink + Hudi 的數據湖技術方案,以及針對其做出的優化。主要內容為:

  1. 傳統離線數倉痛點
  2. 數據湖技術方案
  3. Hudi 任務穩定性保障
  4. 數據入湖實踐
  5. 增量數據湖平台收益
  6. 社區貢獻
  7. 未來的發展與思考

一、傳統離線數倉痛點

1. 痛點

之前 B 站數倉的入倉流程大致如下所示:

img

在這種架構下產生了以下幾個核心痛點:
  1. 大規模的數據落地 HDFS 后,只能在凌晨分區歸檔后才能查詢並做下一步處理;
  2. 數據量較大的 RDS 數據同步,需要在凌晨分區歸檔后才能處理,並且需要做排序、去重以及 join 前一天分區的數據,才能產生出當天的數據;
  3. 僅能通過分區粒度讀取數據,在分流等場景下會出現大量的冗余 IO。

總結一下就是:

  • 調度啟動晚;
  • 合並速度慢;
  • 重復讀取多。

2. 痛點思考

  • 調度啟動晚

    思路:既然 Flink 落 ODS 是准實時寫入的,有明確的文件增量概念,可以使用基於文件的增量同 步,將清洗、補維、分流等邏輯通過增量的方式進行處理,這樣就可以在 ODS 分區未歸檔的時 候就處理數據,理論上數據的延遲只取決於最后一批文件的處理時間。

  • 合並速度慢

    思路:既然讀取已經可以做到增量化了,那么合並也可以做到增量化,可以通過數據湖的能力結 合增量讀取完成合並的增量化。

  • 重復讀取多

    思路:重復讀取多的主要原因是分區的粒度太粗了,只能精確到小時/天級別。我們需要嘗試一 些更加細粒度的數據組織方案,將 Data Skipping 可以做到字段級別,這樣就可以進行高效的數 據查詢了。

3. 解決方案: Magneto - 基於 Hudi 的增量數據湖平台

以下是基於 Magneto 構建的入倉流程:

img

  • Flow

    • 使用流式 Flow 的方式,統一離線和實時的 ETL Pipline
  • Organizer

    • 數據重組織,加速查詢
    • 支持增量數據的 compaction
  • Engine

    • 計算層使用 Flink,存儲層使用 Hudi
  • Metadata

    • 提煉表計算 SQL 邏輯
    • 標准化 Table Format 計算范式

二、數據湖技術方案

1. Iceberg 與 Hudi 的取舍

1.1 技術細節對比

img

1.2 社區活躍度對比

統計截止至 2021-08-09

img

1.3 總結

大致可以分為以下幾個主要緯度來進行對比:

  • 對 Append 的支持

    Iceberg 設計之初的主要支持方案,針對該場景做了很多優化。 Hudi 在 0.9 版本中對 Appned 模式進行了支持,目前在大部分場景下和 Iceberg 的差距不大, 目前的 0.10 版本中仍然在持續優化,與 Iceberg 的性能已經非常相近了。

  • 對 Upsert 的支持

    Hudi 設計之初的主要支持方案,相對於 Iceberg 的設計,性能和文件數量上有非常明顯的優 勢,並且 Compaction 流程和邏輯全部都是高度抽象的接口。 Iceberg 對於 Upsert 的支持啟動較晚,社區方案在性能、小文件等地方與 Hudi 還有比較明顯 的差距。

  • 社區活躍度

    Hudi 的社區相較於 Iceberg 社區明顯更加活躍,得益於社區活躍,Hudi 對於功能的豐富程度與 Iceberg 拉開了一定的差距。

綜合對比,我們選擇了 Hudi 作為我們的數據湖組件,並在其上繼續優化我們需要的功能 ( Flink 更好的集成、Clustering 支持等)

2. 選擇 Flink + Hudi 作為寫入方式

我們選擇 Flink + Hudi 的方式集成 Hudi 的主要原因有三個:

  1. 我們部分自己維護了 Flink 引擎,支撐了全公司的實時計算,從成本上考慮不想同時維護兩套計算引擎,尤其是在我們內部 Spark 版本也做了很多內部修改的情況下。
  2. Spark + Hudi 的集成方案主要有兩種 Index 方案可供選擇,但是都有劣勢:

    • Bloom Index:使用 Bloom Index 的話,Spark 會在寫入的時候,每個 task 都去 list 一遍所有的文件,讀取 footer 內寫入的 Bloom 過濾數據,這樣會對我們內部壓力已經非常大的 HDFS 造成非常恐怖的壓力。
    • Hbase Index:這種方式倒是可以做到 O(1) 的找到索引,但是需要引入外部依賴,這樣會使整個方案變的比較重。
  3. 我們需要和 Flink 增量處理的框架進行對接。

3. Flink + Hudi 集成的優化

3.1 Hudi 0.8 版本集成 Flink 方案

img

針對 Hudi 0.8 版本集成暴露出來的問題,B站和社區合作進行了優化與完善。

3.2 Bootstrap State 冷啟動

背景:支持在已經存在 Hudi 表啟動 Flink 任務寫入,從而可以做到由 Spark on Hudi 到 Flink on Hudi 的方案切換

原方案:

img

問題:每個 Task 處理全量數據,然后選擇屬於當前 Task 的 HoodieKey 存入 state 優化方案。

img

  • 每個 Bootstrap Operator 在初始化時,加載屬於當前 Task 的 fileId 相關的 BaseFile 和 logFile;
  • 將 BaseFile 和 logFile 中的 recordKey 組裝成 HoodieKey,通過 Key By 的形式發送給 BucketAssignFunction,然后將 HoodieKey 作為索引存儲在 BucketAssignFunction 的 state 中。

效果:通過將 Bootstrap 功能單獨抽出一個 Operator,做到了索引加載的可擴展性,加載速度提升 N (取決於並發度) 倍。

3.3 Checkpoint 一致性優化

背景:在 Hudi 0.8 版本的 StreamWriteFunction 中,存在極端情況下的數據一致性問題。

原方案:

img

問題:CheckpointComplete不在CK生命周期內,存在CK成功但是instant沒有commit的情 況,從而導致出現數據丟失。

優化方案:

img

3.4 Append 模式支持及優化

背景:Append 模式是用於支持不需要 update 的數據集時使用的模式,可以在流程中省略索引、 合並等不必要的處理,從而大幅提高寫入效率。

img

主要修改:

  • 支持每次 FlushBucket 寫入一個新的文件,避免出現讀寫的放大;
  • 添加參數,支持關閉 BoundedInMemeoryQueue 內部的限速機制,在 Flink Append 模式下只需要將 Queue 的大小和 Bucket buffer 設置成同樣的大小就可以了;
  • 針對每個 CK 產生的小文件,制定自定義 Compaction 計划;
  • 通過以上的開發和優化之后,在純 Insert 場景下性能可達原先 COW 的 5 倍。

三、Hudi 任務穩定性保障

1. Hudi 集成 Flink Metrics

通過在關鍵節點上報 Metric,可以比較清晰的掌握整個任務的運行情況:

img

img

2. 系統內數據校驗

img

3. 系統外數據校驗

img

四、數據入湖實踐

1. CDC數據入湖

1.1 TiDB入湖方案

由於目前開源的各種方案都沒辦法直接支持 TiDB 的數據導出,直接使用 Select 的方式會影響數 據庫的穩定性,所以拆成了全量 + 增量的方式:

  1. 啟動 TI-CDC,將 TIDB 的 CDC 數據寫入對應的 Kafka topic;
  2. 利用 TiDB 提供的 Dumpling 組件,修改部分源碼,支持直接寫入 HDFS;
  3. 啟動 Flink 將全量數據通過 Bulk Insert 的方式寫入 Hudi;
  4. 消費增量的 CDC 數據,通過 Flink MOR 的方式寫入 Hudi。

1.2 MySQL 入湖方案

MySQL 的入湖方案是直接使用開源的 Flink-CDC,將全量和增量數據通過一個 Flink 任務寫入 Kafka topic:

  1. 啟動 Flink-CDC 任務將全量數據以及 CDC 數據導入 Kafka topic;
  2. 啟動 Flink Batch 任務讀取全量數據,通過 Bulk Insert 寫入 Hudi;
  3. 切換為 Flink Streaming 任務將增量 CDC 數據通過 MOR 的方式寫入 Hudi。

img

2. 日志數據增量入湖

  • 實現 HDFSStreamingSource 和 ReaderOperator,增量同步 ODS 的數據文件,並且通過寫入 ODS 的分區索引信息,減少對 HDFS 的 list 請求;
  • 支持 transform SQL 配置化,允許用戶進行自定義邏輯轉化,包括但不限於維表 join、自定義 udf、按字段分流等;
  • 實現 Flink on Hudi 的 Append 模式,大幅提升不需要合並的數據寫入速率。

img

五、增量數據湖平台收益

  • 通過 Flink 增量同步大幅度提升了數據同步的時效性,分區就緒時間從 2:00~5:00 提前到 00:30 分內;
  • 存儲引擎使用 Hudi,提供用戶基於 COW、MOR 的多種查詢方式,讓不同用戶可以根據自己 的應用場景選擇合適的查詢方式,而不是單純的只能等待分區歸檔后查詢;
  • 相較於之前數倉的 T+1 Binlog 合並方式,基於 Hudi 的自動 Compaction 使得用戶可以將 Hive 當成 MySQL 的快照進行查詢;
  • 大幅節約資源,原先需要重復查詢的分流任務只需要執行一次,節約大約 18000 core。

六、社區貢獻

上述優化都已經合並到 Hudi 社區,B站在未來會進一步加強 Hudi 的建設,與社區一起成⻓。

部分核心PR

Log in - ASF JIRA

Log in - ASF JIRA

Log in - ASF JIRA

Log in - ASF JIRA

Log in - ASF JIRA

Log in - ASF JIRA

Log in - ASF JIRA

七、未來的發展與思考

  • 平台支持流批一體,統一實時與離線邏輯;
  • 推進數倉增量化,達成 Hudi ODS -> Flink -> Hudi DW -> Flink -> Hudi ADS 的全流程;
  • 在 Flink 上支持 Hudi 的 Clustering,體現出 Hudi 在數據組織上的優勢,並探索 Z-Order 等加速多維查詢的性能表現;
  • 支持 inline clustering。

原文鏈接
本文為阿里雲原創內容,未經允許不得轉載。 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM