簡介: 介紹了順豐科技數倉的架構,趟過的一些問題、使用 Hudi 來優化整個 job 狀態的實踐細節,以及未來的一些規划。
本文作者為劉傑,介紹了順豐科技數倉的架構,趟過的一些問題、使用 Hudi 來優化整個 job 狀態的實踐細節,以及未來的一些規划。主要內容為:
- 數倉架構
- Hudi 代碼躺過的坑
- 狀態優化
- 未來規划
順豐科技早在 2019 年引入 Hudi ,當時是基於 Spark 批處理,2020 年對數據的實時性要求更高公司對架構進行了升級,在社區 Hudi on Flink 的半成品上持續優化實現 Binlog 數據 CDC 入湖。在 Hudi 社區飛速發展的同時公司今年對數倉也提出了新的要求,最終采用 Flink + Hudi 的方式來寬表的實時化。過程中遇到了很多問題主要有兩點:
- Hudi Master 代碼當時存在一些漏洞;
- 寬表涉及到多個 Join,Top One 等操作使得狀態很大。
慶幸的是社區的修復速度很給力加上 Hudi 強大 upsert 能力使這兩個問題得到以有效的解決。
一、數倉架構
感興趣的同學可以參考之前順豐分享的
二、Hudi 代碼趟過的坑
在去年我們是基於 Hudi 0.6 左右進行的 Hudi on Flink 的實踐,代碼較老。為了擁抱社區我們使用最新 master 代碼進行實踐,在大數據量寫入場景中,發現了一個比較隱秘的丟數問題,這個問題花了將近兩周的時間才定位到。
1. Hudi StreamWriteFunction 算子核心流程梳理
flush 會調用 client 的 api 去創建一個 WriteHandle,然后把 WriteHandle 放入 Map 進行緩存,一個 handle 可以理解為對應一個文件的 cow。
如果一個 fileld 在同一 checkpoint 期間被多次寫入,則后一次是基於前一次的 cow, 它的 handle 是一個 FlinkMergeAndReplaceHandle
,判斷一個 fileld 是否之前被寫入過就是根據上面 Map 緩存得來的。
StreamWriteFunction
執行 snapshotState 時會把內存的所有分組數據一次進行 flush, 之后對 client 的 handle 進行清空。
2. 場景還原
Hudi 本身是具備 upsert 能力的,所以我們開始認為 Hudi Sink 在 At Least Once 模式下是沒問題的,並且 At Least Once 模式下 Flink 算子不需要等待 Barrier 對齊,能夠處理先到的數據使得處理速度更快,於是我們在 Copy On Write 場景中對 Flink CheckpointingMode 設置了 AT_LEAST_ONCE。
writeFunction 的上游是文件 BucketAssignFunction
fileld 分配算子,假如有一批 insert 數據 A、B、C、D 屬於同一個分區並且分配到同一個 BucketAssignFunction
的 subtask ,但是 A、B 和 C、D 是相鄰兩個不同的 checkpoint。
當 A 進入 BucketAssignFunction
時如果發現沒有新的小文件可以使用,就會創建一個新的 fileld f0,當 B 流入時也會給他分配到 f0 上。同時因為是 AT_LEAST_ONCE 模式,C、D 數據都有可能被處理到也被分配到了 f0 上。也就是說 在 AT_LEAST_ONCE 模式下由於 C、D 數據被提前處理,導致 A、B、C、D 4 條屬於兩個 checkpoint 的 insert 數據被分配到了同一個 fileld。
writeFunction 有可能當接收到 A、B、C 后這個算子的 barrier 就對齊了,會把 A、B、C 進行 flush,而 D 將被遺留到下一個 checkpoint 才處理。A、B、C 是 insert 數據所以就會直接創建一個文件寫入,D 屬於下一個 checkpoint ,A、B、C 寫入時創建的 handle 已被清理了,等到下一個 checkpoint 執行 flush。因為 D 也是 insert 數據所以也會直接創建一個文件寫數據,但是 A、B、C、D 的 fileld 是一樣的,導致最終 D 創建的文件覆蓋了 A、B、C 寫入的文件最終導致 A、B、C 數據丟失。
這個問題之所以難定位是因為具有一定隨機性,每次丟失的數據都不太一樣,而且小數據量不易出現。最終通過開啟 Flink 的 Queryable State 進行查詢, 查找丟失數據的定位到 fileld, 發現 ABCD state 的 instant 都是 I,然后解析對應 fileld 的所有版本進行跟蹤還原。
三、狀態優化
我們對線上最大的離線寬邊進行了實時化的,寬表字段較多,涉及到多個表對主表的 left join 還包括一些 Top One 的計算,這些算子都會占用 state. 而我們的數據周期較長需要保存 180 天數據。估算下來狀態大小將會達到上百 T,這無疑會對狀態的持久化帶來很大的壓力。但是這些操作放入 Hudi 來做就顯得輕而易舉。
1. Top One 下沉 Hudi
在 Hudi 中有一個 write.precombine.field
配置項用來指定使用某個字段對 flush 的數據去重,當出現多條數據需要去重時就會按照整個字段進行比較,保留最大的那條記錄,這其實和 Top One 很像。
我們在 SQL 上將 Top One 的排序邏輯組合成了一個字段設置為 Hudi 的 write.precombine.field
,同時把這個字段寫入 state,同一 key 的數據多次進來時都會和 state 的 write.precombine.field
進行比較更新。
Flink Top One 的 state 默認是保存整記錄的所有字段,但是我們只保存了一個字段,大大節省了 state 的大小。
2. 多表 Left Join 下沉 Hudi
2.1 Flink SQL join
我們把這個場景簡化成如下一個案例,假如有寬表 t_p 由三張表組成
在 Flink SQL join 算子內部會維護一個左表和右表的 state,這都是每個 table 的全字段,且多一次 join 就會多出一個 state. 最終導致 state 大小膨脹,如果 join 算子上游是一個 append 流,state 大小膨脹的效果更明顯。
2.2 把 Join 改寫成 Union All
對於上面案例每次 left join 只是補充了幾個字段,我們想到用 union all 的方式進行 SQL 改寫,union all 需要補齊所有字段,缺的字段用 null 補。我們認為 null 補充的字段不是有效字段。 改成從 union all 之后要求 Hudi 具備局部更新的能力才能達到 join 的效果。
- 當收到的數據是來自 t0 的時候就只更新 id 和 name 字段;
- 同理 ,數據是來自 t1 的時候就只更新 age 字段;
- t2 只更新 sex 字段。
不幸的是 Hudi 的默認實現是全字段覆蓋,也就是說當收到 t0 的數據時會把 age sex 覆蓋成 null, 收到 t1 數據時會把 name sex 覆蓋成 null。這顯然是不可接受的。這就要求我們對 Hudi sink 進行改造。
2.3 Hudi Union All 實現
Hudi 在 cow 模式每條記錄的更新寫入都是對舊數據進行 copy 覆蓋寫入,似乎只要知道這條記錄來自哪個表,哪幾個字段是有效的字段就選擇性的對 copy 出來的字段進行覆蓋即可。但是在分區變更的場景中就不是那么好使了。在分區變更的場景中,數據從一個分區變到另一個分區的邏輯是把舊分區數據刪掉,往新分區新增數據。這可能會把一些之前局部更新的字段信息丟失掉。細聊下來 Hudi on Flink 涉及到由幾個核心算子組成 pipeline。
- RowDataToHoodieFunction:這是對收入的數據進行轉化成一個 HudiRecord,收到數據是包含全字段的,我們在轉化 HudiRecord 的時候只選擇了有效字段進行轉化。
- BoostrapFunction:在任務恢復的時候會讀取文件加載索引數據,當任務恢復后次算子不做數據轉化處理。
- BucketAssignFunction:這個算子用來對記錄分配 location,loaction 包含兩部分信息。一是分區目錄,另一個是 fileld。fileld 用來標識記錄將寫入哪個文件,一旦記錄被確定寫入哪個文件,就會發記錄按照 fileld 分組發送到 StreamWriteFunction,StreamWriteFunction 再按文件進行批量寫入。
原生的 BucketAssignFunction 的算子邏輯如下圖,當收到一條記錄時會先從 state 里面進行查找是否之前有寫過這條記錄,如果有就會找對應的 location。如果分區沒有發生變更,就把當前這條記錄也分配給這個location,如果在 state 中沒有找到 location 就會新創建一個 location,把這個新的location 分配給當前記錄,並更新到 state。
總之這個 state 存儲的 location 就是告訴當前記錄應該從哪個文件進行更新或者寫入。遇到分區變更的場景會復雜一點。假如一條記錄從 2020 分區變更成了 2021,就會創建一條刪除的記錄,它的 loaction 是 state 中的 location。這條記錄讓下游進行實際的刪除操作,然后再創建一個新的 location (分區是 2021) 發送到下游進行 insert。
對於 StreamWriteFunction 在 Insert 場景中,假如收到了如下 3 條數據 {id:1,name:zs},{id:1,age:20},{id:1,sex:man},在執行 flush 時會創建一個全字段的空記錄 {id:null,name:null,age:null,sex:null},然后依次和 3 條記錄進行合並。注意,這個合並過程只會選擇有效字段的合並。如下圖:
四、未來規划
parquet 元數據信息收集,parquet 文件可以從 footer 里面得到每個行列的最大最小等信息,我們計划在寫入文件的后把這些信息收集起來,並且基於上一次的 commit 的元數據信息進行合並,生成一個包含所有文件的元數據文件,這樣可以在讀取數據時進行謂詞下推進行文件的過濾。
公司致力於打造基於 Hudi 作為底層存儲,Flink 作為流批一體化的 SQL 計算引擎,Flink 的批處理 Hudi 這塊還涉足不深,未來可能會計划用 Flink 對 Hudi 實現 clustering 等功能,在 Flink 引擎上完善 Hudi 的批處理功能。
原文鏈接
本文為阿里雲原創內容,未經允許不得轉載。