優化源於痛點(┬_┬)
有沒有痛點取決於業務場景的需求;有多痛取決於當前方案對業務的契合度
讓我們從業務場景①、當前方案②切入,聯立①②來推導當前痛點③吧!
話不多說,開始分析
①業務場景:
1.表的數據量很大,時間長了可能會到百億級的數據
2.表中的部分數據需要更新
3.需要查看歷史變更記錄
4.更新數量很低,但更新頻率可能比較高
②當前方案:
采用了hive的拉鏈表,講這個的博客比較多,我只講一講操作。我們現在是每天指定時間執行一次拉鏈表的操作,更改全部走kafka,從接口讀到更新后存入kafka等待明日執行更新,使用的時候從代碼中根據最新時間和狀態字段對數據進行過濾
③當前缺陷:
1.無法實現准實時。看似能實現准實時,但其實只能以日更新的方式跑批。因為在百億這個數據量級下,如果使用time來分辨每條數據的最終時間,數據分析的時間成本會過於高昂,只能使用類似於20200811這樣的dt(datatime)作為分區字段來尋求較快的對比,因此,只能以每天執行一次任務的形式跑批(如果數據量較小,可以實現實時,但是數據量較小的話也沒必要用hive,所以...)
2.每次數據分析的時候都要對數據狀態進行過濾,非常浪費算力,還會嚴重拉慢分析速度,分析job多的話會很頭疼
3.每條數據分析都要加相關篩選,以前的sql改起來直接炸裂
ps:另一種方案是每天都從這張表里拉出一張新的干凈表,這樣可以規避2,3,但是會新增一個問題,就是每天都需要浪費內存和算力以及時間去維護一張新表,且依然無法實現准實時
那么問題來了,是否存在什么更合適的方案來解決這些痛點呢?
我想,是有的。
根據痛點③,反推我們的預期目標④;
根據目標④,嘗試推導出優化思路⑤;
落地思路⑤,成為最終的優化方案⑥
④預期目標:
1.實現准實時,能夠在接口更新數據結束后半小時內完成數據的更新
2.實現物理刪除,讓分析任務縱享絲滑
3.更改盡可能小,最好不要對上下層的操作有任何影響
⑤優化思路:
1.從痛點來看,不能實現准實時的主要原因是更新數據和干凈數據混在一張表里,篩選步驟嚴重影響了計算效率。因此,我們首先要將更新數據和干凈的預期數據分離成兩張表
2.如果不希望影響前后過程,那么我們期望實現物理刪除。全量的數據覆蓋是我們無法接受的,但如果數據按照分區分區較為均勻,每個分區的數據量我們都可以接受,對若干個分區的數據做覆蓋,HDFS表示:小菜一碟。因此,我們需要找出一個能讓數據分布較為均勻的分區字段,一般情況下,就是dt。接着,按照分區進行覆蓋,就能實現物理刪除啦!
⑥優化方案:
我們先把百億級的數據分為兩種情況
1.由時間累計產生的海量數據,數據分布較為均勻
2.一次性導入大量數據,數據分布不均勻
第2種情況可以通過以hash取模的結果來強制均衡數據,或者hive分桶的思路或許也可以(這塊我還不太熟練,暫且一提)。第1種情況是我們的理想數據,可以直接按照dt來分區。下面,我按照第1種情況來分析,用同樣的方法略微更換分區方法就可以解決第2種情況。
test:原表,存儲海量數據,數據按照分區字段dt相對均勻的分布
zipper:增量表,保存所有來自接口的更新信息,字段對比原表增加一個statu,0代表新增,1代表刪除。如果沒有modify_time,增加這個字段,如果有,直接用
①創建臨時表delete,start_time為kafka開始接入數據的時間,modify_time為寫入增量表的時間,開始執行時間為kafka停止接入數據后的n分鍾(根據實際需求自行設置),以防數據頻繁接入觸發程序一直執行
with delete as (
select id from zipper where modify_time>=start_time and statu=1
)
②從delete表獲取原表對應的dt分區,hive本身不支持直接in,但是可以用left semi join來代替,這個寫法的性能優於直接join
delete_dt as (
select dt from
test left semi join delete
on test.id=delete.id
)
③從dt分區搜索出所有不需要刪除的數據,覆蓋寫入原表對應分區,實現物理刪除。hive本身不支持直接not in,但是可以用where not exists來替代該邏輯。此處如果有人有更高效的方法,還請告知,我想進一步優化但是沒能成功
dt_data as (
select * from test left semi join delete_dt
on test.dt=delete_dt.dt
)
insert overwrite test
select 原表字段們 from dt_data
where not exists (select id from delete where delete.id = dt_data.id)
④上面已經處理完刪除數據,接下來從zipper獲取新增數據,並插入原表對應分區
insert into test
select 原表字段們
from zipper where time>=start_time and statu=0;
最終代碼
with delete as (
select id from zipper where statu=1
),
delete_dt as (
select dt from
test left semi join delete
on test.id=delete.id
),
dt_data as (
select * from test left semi join delete_dt
on test.dt=delete_dt.dt
)
select * from dt_data
where not exists (select id from delete where delete.id = dt_data.id);
以上就是我的優化方案,所有sql均在spark.sql中執行,優點如下:
1.分析任務不受任何影響
2.每次執行覆蓋的數據少,執行快,可以結合kafka實現准實時更新;
3.需要更新的數據量不大,存儲所有變更記錄的zipper表不會有太多數據,查詢變更記錄方便,原表也一直是干凈的,不需要做后續維護;
4.具備強一致性,通過java統一調度,規避了凌晨跑完該任務后,白天遇到更新要求,ES、Arango實時更新后,執行分析任務時再次寫入ES、Arango,導致數據不一致的情況
5.全程最多為原表新增一個分區字段,完全不影響業務上下游
以上就是本次優化從思考到實現的全過程啦,希望大家喜歡(≧▽≦)