1.背景
在0.10.1版本下,用默認的index(FLINK_STATE),在upsert的模式下,幾十億級別的數據更新會很消耗內存以及ckp時過長,因此切換到0.11.0的BUCKET索引;
僅對於當前環境:flink1.13.2 + hudi 0.11.0-(master 2022.04.11) + cow + hdfs;
關鍵配置項:'index.type' = 'BUCKET', 'hoodie.bucket.index.num.buckets' = '256'
關鍵詞 hudi cow FLINK BUCKET FLINK_STATE
2.BUCKET與FLINK_STATE的區別
FLINK_STATE: 簡單的說,hudi的upsert模式需要指定主鍵組,更新時是按照主鍵進行更新的,而數據是存在於hdfs文件上的,那么主鍵與文件名的映射就是必須的 => 依托Flink特性,存在state里面;因此程序第一次加載hudi表的歷史數據時,需要設置 'index.bootstrap.enabled' = 'true' 來加載歷史數據到state里,更新可以跨分區;
BUCKET:簡單的說,就是”基於文件的分桶“,比如設置主鍵為id,桶個數256('hoodie.bucket.index.num.buckets' = '256'),那么計算bucket序號的方法就是(id.hashCode() & Integer.MAX_VALUE) % 256;
而且一旦設置,桶(buckets)的個數是不能變的,對應文件個數是不變的 => 預估數據量來保證合理的文件數量與大小,減少小文件或過度寫放大(因為文件個數不變,單個文件大小會一直增大);優點:無內存(僅指flink|Managed Memory)占用,缺點:文件IO帶來cpu壓力會升高;
BUCKET是基於單個文件的設置,因此不能跨分區;
tips:bucket個數預估可以使用離線導數據看hdfs文件大小來預估;
總結:FLINK_STATE占內存,初始化加載歷史數據慢,可跨分區;BUCKET占磁盤,不可跨分區,省內存;
3.相關配置
flink實時流配置
'connector' = 'hudi',
'path' = 'hdfs://path/',
'index.type' = 'BUCKET', -- bucket索引
'hoodie.parquet.compression.codec'= 'snappy',
'table.type' = 'COPY_ON_WRITE',
'write.operation' = 'upsert',
'write.task.max.size' = '2048',
'write.precombine' = 'true',
'write.precombine.field' = 'update_time',
'write.tasks' = '6',
'write.bucket_assign.tasks' = '6',
'hoodie.bucket.index.hash.field' = 'id', -- 主鍵
'hoodie.bucket.index.num.buckets' = '256', -- 桶個數
'hive_sync.enable'='true',
'hive_sync.table'='TABLE_NAME',
'hive_sync.db'='DB_NAME',
'hive_sync.mode' = 'hms',
'hive_sync.metastore.uris' = 'thrift://HOST:9083',
'hive_sync.skip_ro_suffix' = 'true',
'write.insert.cluster' = 'true',
'write.ignore.failed' = 'true',
'clean.async.enabled' = 'true',
'clean.retain_commits' = '3',
'hoodie.cleaner.commits.retained' = '3',
'hoodie.keep.min.commits' = '4',
'hoodie.keep.max.commits' = '8'
Flink離線導入數據配置
'connector' = 'hudi',
'path' = 'hdfs://PATH',
'hoodie.parquet.compression.codec'= 'snappy',
'index.type' = 'BUCKET',
'table.type' = 'COPY_ON_WRITE',
'write.operation' = 'bulk_insert',
'write.tasks' = '2',
'hoodie.bucket.index.num.buckets' = '256',
'hoodie.bucket.index.hash.field' = 'id'
離線導入完成后,觀察hdfs文件前八位為數字,例如00000000-,00000255-,即設置成功,然后可直接接入實時數據;
注意:從hive導數據到hudi,可以調整一下hive source的並行度:
tableConfig.setInteger(HiveOptions.TABLE_EXEC_HIVE_INFER_SOURCE_PARALLELISM_MAX, source_parallelism_max)
4.性能小結
實時情況:基於目前的數據量,單文件(80M)操作在100ms左右:eg: block read in memory in 171 ms. row count = 617384,十幾張表每次ckp三四分鍾左右,對於我們十幾分鍾的ckp來說可以接受;
離線導數據情況:對於億級別數據的離線導入,資源不算大,十幾分鍾就導入完成了;
注:如果ckp設置太小,cow表情況下,頻繁操作bucket文件,會對集群cpu load產生壓力;