1. Iceberg構建數據湖
核心思想
在時間軸上跟蹤表的所有變化;
- 快照表示表數據文件的一個完整集合;
- 每次更新操作會生成一個新的快照;
特性
① 優化數據入庫流程
- Iceberg提供ACID事務能力,上游數據寫入即可見,不影響當前數據處理任務,這大大簡化了ETL;
- Iceberg提供upsert/merge into能力,可以極大地縮小數據入庫延遲;
② 支持更多的分析引擎
- 優秀的內核抽象使之不綁定特定引擎,目前在支持的有spark、flink、presto、hive;
- Iceberg提供了java native API,不用特定引擎也可以訪問Iceberg表;
③ 統一數據存儲和靈活的文件組織
- 提供了基於流式的增量計算模型和基於批處理的全量表計算模型,批任務和流任務可以使用相同 的存儲模型(HDFS、OZONE),數據不再孤立;
- Iceberg支持隱藏分區和分區進化,方便業務進行數據分區策略更新;
- 支持Parquet、ORC、Avro行列存兼顧;
④ 增量讀取處理能力
- Iceberg支持通過流式方式讀取增量數據;
- Spark struct streaming支持;
- Flink table source支持;
文件發布
Apache Iceberg的實現細節
快照設計方式
快照隔離
• 讀操作僅適用當前已生成快照
• 寫操作會生成新的隔離快照,並在寫完成后原子性提交 l
對於文件列表的所有修改都是原子操作
• 在分區中追加數據
• 合並或是重寫分區
元數據組織形式
實現基於快照的跟蹤方式;
• 記錄表的結構,分區信息,參數等
• 跟蹤老的快照以確保能夠最終回收
表的元數據是不可修改的, 並且始終向前迭代;
當前的快照可以回退;
事務性提交
寫操作必須
• 記錄當前元數據的版本--base version
• 創建新的元數據以及manifest文件
• 原子性的將base version 替換為新的版本
原子性替換保證了線性的歷史
原子性替換需要依靠以下操作來保證
沖突解決--樂觀鎖
• 假定當前沒有其他的寫操作
• 遇到沖突則基於當前最新的元數據進行重試
• 元數據管理器所提供的能力
• HDFS或是本地文件系統所提供的原子化的rename能力
2. IceBerg結合Flink的應用場景
場景① 構建近實時Data Pipeline
IceBerg提供了增量拉取的能力,類似Huidi增量查詢的方式;IceBerg可以做到分鍾級別;
場景② CDC數據實時攝入攝出
FlinkCDC增量數據寫入IceBerg
場景③ 近實時場景的流批統一
原有的架構為lambda架構,分為離線鏈路和 實時鏈路;
場景④ 從Iceberg歷史數據啟動Flink任務
場景⑤ 通過Iceberg數據來訂正實時聚合結果
3. Apache Flink如何集成Apache Iceberg
如何對齊Flink和Iceberg的Schema ;
Flink SQL、Flink Table API、Iceberg API,它們之間類型互相對比映射,Iceberg屏蔽了跟底下文件交互的細節,只需考慮上層即可。
Flink 記錄如何寫入 Iceberg 表的 AVRO 文件 ?
實現表級別的語義,Flink中每個字段都給它設計成了一個writer的樣式,ID Int -> IntWriter... 形成 一顆樹形結構,再遍歷寫入iceberg;
如何設計 Iceberg Sink 的 Operator ?
如何設計flink中的算子才能sink到iceberg中,並且實現exactly-once語義、容錯機制(checkpoint、state)等功能;
每個並發寫入為一個IcebergStreamWriter,寫到了兩個partition,但是還沒commit提交,這時可以設計一個IcebergFilesCommitter(單並發節點來進行提交,如果有多個commit提交會有沖突),統一提交給Iceberg;
Flink算子的State,保存數據的狀態(就可以實現復雜的業務邏輯),spark中沒有狀態,但特殊算子如updateStateBykey才會有狀態;
如何設計 Operator 的 State ?
如何在failover的時候保證 exactly-once 語義?
如果中途某次checkpoint失敗,其他checkpoint都正常。 如何設計來保證數據不丟且語義正確?
兩個算子 IcebergStreamWriter不給state為空(因為它會把寫成功 的數據文件傳給FileCommitter), IceberFileCommitter有state 它做checkpoint時,保存到flink中的stateBackend中,再提交到Iceberg中的表中;
StateBackend是flink中存儲state的方式,checkpoint也會存儲到StateBackend中;