Hive增量更新方案
方案一(總結出來業界可行方案):
1、Hive原始表提前規划好以時間分區,初始化裝載源庫記錄為base_table(最新數據)
2、每個相關表都會有一個timestamp列,對每一行操作做了修改,都會重置這列timestamp為當前時間戳;
3、新增數據通過sqoop(支持當天抽取)或者導出當天文件兩種形式,抽取或導入到hive表,記錄為新增表incremental_table
4、(1)如果incremental_table表中數據不涉及到更新,直接導入到以時間分區的base_table表中 (2)如果某幾個分區涉及到更新操作,將這段時間分區內的base_table和incremental_table數據進行合並,有相同主鍵的取timestamp最新的那條,合並后新數據重新寫回base_table分區中;
(3)如果涉及到刪除數據操作,分軟刪除(打標簽)和硬刪除;如果是軟刪除,記錄數據也會存在incremental_table,只是在合並base_table和incremental_table表中過濾掉此記錄寫回base_table即可。如果是硬刪除,需將刪除的數據行放入額外審計表中audit_table,與base_table和incremental_table一起進行合並過濾處理后,再寫回base_table。
采用Oozie、NiFi或者Shell腳本等方式,將上述流程統一做成一個工作流,方便調度。
方案二(如有業務訴求,需要對Hive表具體某條記錄進行delete或update操作):
如果一個HIVE表要實現update和delete功能,該表就必須支持ACID,需開啟ACID,同時必須滿足以下條件:
1、表的存儲格式必須是ORC(STORED AS ORC);
2、表必須進行分桶(CLUSTERED BY (col_name, col_name, ...) INTO num_buckets BUCKETS);
3、Table property中參數transactional必須設定為True(tblproperties('transactional'='true'));
4、以下配置項必須被設定:
Client端:
hive.support.concurrency – true
hive.enforce.bucketing – true
hive.exec.dynamic.partition.mode – nonstrict
hive.txn.manager – org.apache.hadoop.hive.ql.lockmgr.DbTxnManager
服務端:
hive.compactor.initiator.on – true
hive.compactor.worker.threads – 1
hive.txn.manager – org.apache.hadoop.hive.ql.lockmgr.DbTxnManager(經過測試,服務端也需要設定該配置項)