hive拉鏈表優化·百億量級數據支持准實時更新


優化源於痛點(┬_┬)

有沒有痛點取決於業務場景的需求;有多痛取決於當前方案對業務的契合度

讓我們從業務場景①、當前方案②切入,聯立①②來推導當前痛點③吧!

話不多說,開始分析

 

①業務場景:

1.表的數據量很大,時間長了可能會到百億級的數據

2.表中的部分數據需要更新

3.需要查看歷史變更記錄

4.更新數量很低,但更新頻率可能比較高

 

②當前方案:

采用了hive的拉鏈表,講這個的博客比較多,我只講一講操作。我們現在是每天指定時間執行一次拉鏈表的操作,更改全部走kafka,從接口讀到更新后存入kafka等待明日執行更新,使用的時候從代碼中根據最新時間和狀態字段對數據進行過濾

 

③當前缺陷:

1.無法實現准實時。看似能實現准實時,但其實只能以日更新的方式跑批。因為在百億這個數據量級下,如果使用time來分辨每條數據的最終時間,數據分析的時間成本會過於高昂,只能使用類似於20200811這樣的dt(datatime)作為分區字段來尋求較快的對比,因此,只能以每天執行一次任務的形式跑批(如果數據量較小,可以實現實時,但是數據量較小的話也沒必要用hive,所以...)

2.每次數據分析的時候都要對數據狀態進行過濾,非常浪費算力,還會嚴重拉慢分析速度,分析job多的話會很頭疼

3.每條數據分析都要加相關篩選,以前的sql改起來直接炸裂

ps:另一種方案是每天都從這張表里拉出一張新的干凈表,這樣可以規避2,3,但是會新增一個問題,就是每天都需要浪費內存和算力以及時間去維護一張新表,且依然無法實現准實時

 

那么問題來了,是否存在什么更合適的方案來解決這些痛點呢?

我想,是有的。

根據痛點③,反推我們的預期目標④;

根據目標④,嘗試推導出優化思路⑤;

落地思路⑤,成為最終的優化方案⑥

 

④預期目標:

1.實現准實時,能夠在接口更新數據結束后半小時內完成數據的更新

2.實現物理刪除,讓分析任務縱享絲滑

3.更改盡可能小,最好不要對上下層的操作有任何影響

 

⑤優化思路:

1.從痛點來看,不能實現准實時的主要原因是更新數據和干凈數據混在一張表里,篩選步驟嚴重影響了計算效率。因此,我們首先要將更新數據和干凈的預期數據分離成兩張表

2.如果不希望影響前后過程,那么我們期望實現物理刪除。全量的數據覆蓋是我們無法接受的,但如果數據按照分區分區較為均勻,每個分區的數據量我們都可以接受,對若干個分區的數據做覆蓋,HDFS表示:小菜一碟。因此,我們需要找出一個能讓數據分布較為均勻的分區字段,一般情況下,就是dt。接着,按照分區進行覆蓋,就能實現物理刪除啦!

 

⑥優化方案:

我們先把百億級的數據分為兩種情況

1.由時間累計產生的海量數據,數據分布較為均勻

2.一次性導入大量數據,數據分布不均勻

第2種情況可以通過以hash取模的結果來強制均衡數據,或者hive分桶的思路或許也可以(這塊我還不太熟練,暫且一提)。第1種情況是我們的理想數據,可以直接按照dt來分區。下面,我按照第1種情況來分析,用同樣的方法略微更換分區方法就可以解決第2種情況。

test:原表,存儲海量數據,數據按照分區字段dt相對均勻的分布

zipper:增量表,保存所有來自接口的更新信息,字段對比原表增加一個statu,0代表新增,1代表刪除。如果沒有modify_time,增加這個字段,如果有,直接用

①創建臨時表delete,start_time為kafka開始接入數據的時間,modify_time為寫入增量表的時間,開始執行時間為kafka停止接入數據后的n分鍾(根據實際需求自行設置),以防數據頻繁接入觸發程序一直執行

with delete as (
select id from zipper where modify_time>=start_time and statu=1
)

②從delete表獲取原表對應的dt分區,hive本身不支持直接in,但是可以用left semi join來代替,這個寫法的性能優於直接join

delete_dt as (
select dt from 
test left semi join delete
on test.id=delete.id
)

③從dt分區搜索出所有不需要刪除的數據,覆蓋寫入原表對應分區,實現物理刪除。hive本身不支持直接not in,但是可以用where not exists來替代該邏輯。此處如果有人有更高效的方法,還請告知,我想進一步優化但是沒能成功

dt_data as (
select * from test left semi join delete_dt
on test.dt=delete_dt.dt
)
insert overwrite test
select 原表字段們 from dt_data
where not exists (select id from delete where delete.id = dt_data.id)

④上面已經處理完刪除數據,接下來從zipper獲取新增數據,並插入原表對應分區

insert into test 
select 原表字段們
from zipper where time>=start_time and statu=0;

最終代碼

with delete as (
select id from zipper where statu=1
),
delete_dt as (
select dt from 
test left semi join delete
on test.id=delete.id
),
dt_data as (
select * from test left semi join delete_dt
on test.dt=delete_dt.dt
)
select * from dt_data
where not exists (select id from delete where delete.id = dt_data.id);     

以上就是我的優化方案,所有sql均在spark.sql中執行,優點如下:

1.分析任務不受任何影響

2.每次執行覆蓋的數據少,執行快,可以結合kafka實現准實時更新;

3.需要更新的數據量不大,存儲所有變更記錄的zipper表不會有太多數據,查詢變更記錄方便,原表也一直是干凈的,不需要做后續維護;

4.具備強一致性,通過java統一調度,規避了凌晨跑完該任務后,白天遇到更新要求,ES、Arango實時更新后,執行分析任務時再次寫入ES、Arango,導致數據不一致的情況

5.全程最多為原表新增一個分區字段,完全不影響業務上下游

 

以上就是本次優化從思考到實現的全過程啦,希望大家喜歡(≧▽≦)


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM