-
背景:
- 2017 年,阿里內部 MaxCompute 集群上游 200 多萬個任務,每天存儲資源、計算資源消耗都很大。如何降低計算資源的消耗,提高任務執行的性能,提升任務產出的時間,是計算平台和 ETL 開發工程師孜孜追求的目標。
一、系統優化
- 系統優化:通過分析計算系統的數據運行情況,判斷計算系統對內存、CPU、Instance 個數這些資源的運用是否合理;
- 合理:指計算時間更短,使用的資源更少;
- Hadoop 等分布式計算系統的資源評估:
- 評估方式:根據 Map 任務的輸入的數據量進行靜態評估;
- 評估產出:是否存在長尾任務;
- 長尾:多個 Instance 同時計算數據,但由於各個 Instance 上的數據量分布不均勻等情況,使得有的 Instance 先計算完,而有的要等執行很久,計算完成時間相差很大;
- 弊端 / 問題:
- 對於普通的 Map 任務,評估一般符合預期;而對於 Reduce 任務,其輸入來自於 Map 的輸出,但進行評估是也是根據 Map 任務的輸出進行,這樣評估的結果經常和實際需要的資源數相差很大;
- 解決方案:
- 在任務穩定的情況下,基於任務的歷史執行情況進行評估,即采用 HBO(History - Based Optimizer,基於歷史的優化器);
- CBO(Cost - Based Optimizer,基於代價的優化器):
-
MaxCompute 的 CBO:
- 在 Oracle 的 CBO 的基礎上,改變了收集統計信息的方式:
- 采用各種抽樣統計算法,通過較少的資源獲得大量的統計信息,基於先進的優化模型,具備了完善的 CBO 能力;
- 優點:與傳統的大數據計算系統相比,性能提升明顯
- 在 Oracle 的 CBO 的基礎上,改變了收集統計信息的方式:
-
Oracle 的 CBO:
- 評估方式:根據收集到的表、分區、索引等統計信息,計算每種執行方式的代價(Cost),進而選擇其中最優的(也就是代價最小的)執行方式;
- 優點:收集到的統計信息越多、越准確,Oracle 的 CBO 可能生出代價更小的執行計划;
- 弊端 / 問題:
- 對表和列上統計信息的收集需要付出代價,尤其是在大數據環境下,表的體量巨大,需要消耗大量的資源來收集統計信息;
- 消耗大量資源收集到的統計信息,其利用率卻很低;
-
1、HBO
- HBO(History - Based Optimizer,基於歷史的優化器),根據任務的歷史執行情況,為任務分配更合理的資源;
- 分配的資源:內存、CPU、Instance 個數;
- Instance :指操作系統中一系列的進程以及為這些進程所分配的內存塊;
- 分配的資源:內存、CPU、Instance 個數;
- HBO 是對集群資源分配的一種優化,概括起來就是:任務執行歷史 + 集群狀態信息 + 優化原則 → 更優的執行配置;
1/1)背景
-
MaxCompute 原資源分配策略
- MaxCompute 最初分配 MR 執行過程的 Instance個數的算法
- MR執行過程:Map 任務、Reduce 任務;
- 在 Instance 分配算法的基礎上,根據歷史數據統計各個 Instance 處理的數據量
- Map Instance
-
fivenum ( round ( rt$map_input_bytes / 1024 / 1024, 2) ) [1] 0.00 4.11 16.59 60.66 4921.94
-
- Reduce Instance
-
fivenum ( round ( rt$map_input_bytes / 1024 / 1024, 2) ) [1] 0.00 0.00 0.75 24.87 192721.83
-
- Join Instance
-
fivenum ( round ( rt$map_input_bytes / 1024 / 1024, 2) ) [1] 0.00 0.02 1.82 22.15 101640.31
-
- 從上面內容可以看出:
- 大部分的 Instance 處理的數據量遠遠沒有達到預期,即一個 Instance 處理 256 MB 的數據;
- 有些 Instance 處理的數據量很大,很容易導致任務長尾;
- 總結:默認的 Instance 算法下,小任務存在資源浪費,而大任務卻資源不足;(需要有更合理的資源分配方法,HBO 應運而生)
- Map Instance
- MaxCompute 最初分配 MR 執行過程的 Instance個數的算法
-
HBO 的提出
- 問題分析及解決思路:通過數據分析,發現在系統中存在大量的周期性調度的腳本(物理計划穩定),且這些腳本的輸入一般比較穩定,如果能對這部分腳本進行優化,那么對整個集群的計算資源的使用率會得到顯著提升;
- 使用 HBO,根據任務的執行歷史情況,為其分配更合理的計算資源;
- HBO 一般通過自適應調整系統參數,來達到控制計算資源的目的;
1/2)HBO 原理
- HBO 分配資源的步驟:
- 前提:最近 7 天內,任務代碼沒有發生變更,且任務運行 4 次;
- Instance 分配邏輯:基礎資源估算值 + 加權資源估算值;
- 最終的 Instance 個數為:基礎資源估算值 + 加權資源估算值;
- 加權資源:指對已經按分配邏輯進行分配好基礎資源的每個 Map Task / Reduce Task,再追加一部分資源;
-
基礎資源數量的邏輯
- 對於 Map Task:Map 數量、平均每個 Map 能處理的數據量
- 估算用戶提交的任務所需要的 Map 數量;
- 根據期望的每個 Map 能處理的數據量,再結合用戶提交任務的輸入數據量,估算出用戶提交的任務所需要的 Map 數量;
- 采用分層的方式,提供平均每個 Map 能處理的數據量;
- 分層的目的:為了保證集群上任務的整體吞吐量,保證集群的資源不會被一些超大任務占有;
- 估算用戶提交的任務所需要的 Map 數量;
- 對於 Reduce Task:Reduce 數量、平均每個 Reduce 能處理的數據量
- 計算 Reduce 數量的方法
- Hive 的計算方法:使用 Map 的輸入數據量計算 Reduce 的數量;
- MaxCompute 的計算方法:使用最近 7 天 Reduce 對應的 Map 的平均輸出數據量,作為 Reduce 的輸入數據量,用於計算 Reduce 的數量;
- 估算用戶提交的任務所需要的 Reduce 數量
- 根據期望的每個 Reduce 能處理的數據量,再結合用戶提交任務的 Reduce 的輸入數據量,估算出用戶提交的任務所需要的 Reduce 數量;
- 采用分層的方式,提供平均每個 Reduce 能處理的數據量;
- 分層的目的:為了保證集群上任務的整體吞吐量,保證集群的資源不會被一些超大任務占有;
- 計算 Reduce 數量的方法
- 對於 Map Task:Map 數量、平均每個 Map 能處理的數據量
-
加權資源數量的邏輯
- 加權資源:對每個 Map Task 和 Reduce Task,在其基礎資源的基礎上再追加一部分資源;
- 以下是追加原理:追加多少、怎么追加
- 對於 Map Task
- 系統先初始化期望的每個 Map 能處理的數據量;
- 拿該 Map 在最近一段時間內的平均處理速度,與系統設定的期望值做比較:
- 如果平均處理速度小於期望值,則按照同等比例對基礎資源數量進行加權,估算出該 Map 的加權資源數量;
- 一段時間:一般用最近 7 天;
- 對於 Reduce Task
- 方法同 Map Task;
-
CPU / 內存分配邏輯
- 類似於 Instance 分配邏輯:基礎資源估算值 + 加權資源估算值;
1/3)HBO 效果
- 提高 CPU 利用率
- 提高內存利用率
- 提高 Instance 並發數
- 降低執行時長
1/4)HBO 改進與優化
- 特殊任務場景:有些業務在特定場合下依舊有數據量暴漲的情況發生,如,“雙 11” 和 “雙 12” 期間,這個日常生成的 HBO 計划就不適用了;
- 解決方法:HBO 增加了根據數據量動態調整 Instance 數的功能,主要根據 Map 的數量增長情況進行調整;
2、CBO
- MaxCompute 引入的基於代價的優化器(CBO):根據收集的統計信息,計算每種執行方式的代價,進而選擇最優的執行方式;
2/1)優化器原理
- 優化器(Optimizer)引入了 Volcano 模型,該模型是基於代價的優化器(CBO),並且引入了重新排序 Join(Join Reorder)和自動 MapJoin(Auto MapJoin)優化規則等,同時基於 Volcano 模型的優化器會盡最大的搜索寬度來獲取最優計划;
- 優化器功能結構模塊:
- Meta Manager(元數據)、Statistics(統計信息)、Rule Set(優化規則集)、Volcano Planner Core(核心計划器)等,如下圖:
- Meta Manager(元數據)、Statistics(統計信息)、Rule Set(優化規則集)、Volcano Planner Core(核心計划器)等,如下圖:
-
Meta Manager
- 功能:提供元數據信息;
- 元數據信息:包括表的元數據、統計信息元數據等;以及一些基本的元數據,如是否是分區表、表有哪些列等;
- 當優化器在選擇計划時,需要根據元數據的一些信息進行優化;
- 如,表分區裁剪(TableScan Partition Prunning)優化時,需要通過 Meta 信息獲取表數據有哪些分區,然后根據過濾條件來裁剪分區。
- 功能:提供元數據信息;
-
Statistics
- 功能:提供准確的統計信息;
- 統計信息:如,表的 Count 值、列的 Distinct 值、TopN 值等;
- 收集統計信息:優化器提供了 UDF 來收集統計信息;(包括 Distinct 值、TopN 值等)
- 注:Count 值等信息是由 Meta 直接提供的;
- 優化器只有擁有准確的統計信息,才能計算出真正的最優的計划;
- 如,Join 是選擇 Hash Join 還是 Merge Join,優化器會根據 Join 的輸入數據量(即 Count 值)來進行選擇;
- 功能:提供准確的統計信息;
-
Rule Set
- 功能:選擇優化規則;
- 選擇:根據不同情況選擇不同的優化點,再由優化器根據代價模型(Cost Model)來選擇啟用哪些優化規則;
- 如,工程合並規則(Project Merge Rule):將臨近的兩個 Project 合並成一個 Project;
- 如,過濾條件下推規則(Filter Push Down):將過濾條件盡量下推,使得數據先進行過濾,再進行其他計算;(以較少其他操作的數據量)
- 選擇:根據不同情況選擇不同的優化點,再由優化器根據代價模型(Cost Model)來選擇啟用哪些優化規則;
- 優化規則分類:
- Substitute:被認為是優化了肯定好的規則;
- Explore Rule:優化后需要考慮多種優化結果;
- Build Rule:可以認為優化后的結果不能再次使用規則再進行優化;
- 所有的優化規則,都放在優化規則集中;
- MaxCompute 優化器中的優化規則,由用戶通過 set 等命令控制使用;
- 功能:選擇優化規則;
-
Volcano Planner Core
- 功能:將所有信息(Meta 信息、統計信息、優化規則)統一起來處理,然后根據代價模型的計算,獲得一個最優計划;
- 代價模型
- 功能 / 原理:代價模型根據不同操作符(如,Join、Project 等)計算出不同的代價,然后再計算出整個計划中最小代價的計划;
- MaxCompute 的代價模型提供的 Cost 由 3 個維度組成:行數、I / O 開銷、CPU 開銷;(3 個衡量標准)
- 通過這 3 個維度衡量每個一操作符的代價;
- 工作原理:
- 將需要輸入給 Planner 的數據,用 Compiler 解析為一個 “計划樹”,簡稱 “RelNode 樹”,樹的每個節點簡稱 RelNode;
- Volcano Planner 創建
- Planner 的創建:主要是將 Planner 在優化過程中所用到的信息傳遞給執行計划器;如規則集,用戶指定要使用的規則;
- 信息:RelNode 的 Meta 計算值、RelNode 的代價計算值;
- 信息的由來:
- Meta Provider:每個 RelNode 的 Meta 計算;
- 如,RowCount 值計算、Distinct 值計算等;
- 代價模型:計算每個 RelNode 的代價等;
- Meta Provider:每個 RelNode 的 Meta 計算;
- Planner 的創建:主要是將 Planner 在優化過程中所用到的信息傳遞給執行計划器;如規則集,用戶指定要使用的規則;
-
Planner 優化
-
Planner 的優化過程:
-
規則匹配(Rule Match)
- 規則匹配:指 RelNode 滿足規則的優化條件而建立的一種匹配關系;(就是給所有的 RelNode,在規則集中找相匹配的規則)
- 操作步驟:
- Planner 首先將整個 RelNode 樹的每一個 RelNode 注冊到 Planner 內部;同時在注冊過程中,在規則集中找到與每個 RelNode 匹配的規則,然后加入到規則應用(Rule Apply)的隊列中;
- 整個注冊過程處理結束后,所有與 RelNode 可以匹配的規則,全部加入到隊列中,以后應用時只要從隊列中取出來就可以了;
- Planner 首先將整個 RelNode 樹的每一個 RelNode 注冊到 Planner 內部;同時在注冊過程中,在規則集中找到與每個 RelNode 匹配的規則,然后加入到規則應用(Rule Apply)的隊列中;
-
規則應用(Rule Apply)
- 主要任務:優化每個規則隊列中的規則;
- 優化過程:
- 從規則隊列(Rule Queue)中彈出(Pop)一個已經匹配成功的規則進行優化;
- 如果優化成功后,會產生至少一個新的 RelNode;新的 RelNode 與未優化時的 RelNode 存在差異;
- 彈出:彈出一個規則后,規則隊列中就少一個規則;
- 使用新的 RelNode 再次進行注冊以及規則匹配操作,再把匹配的規則加入到規則應用的規則隊列中,然后接着下次規則應用;
- 結束對規則的優化:
- Planner 會一直應用所有的規則,包括后來疊加的規則,直到不會有新的規則匹配到,則優化結束,得到一個最優計划;
- 產出:新的 “RelNode 樹”,也就是新的 RelNode 數據節點集合;
- 從規則隊列(Rule Queue)中彈出(Pop)一個已經匹配成功的規則進行優化;
-
代價計算(Cost Compute)
- 代價計算的時期:
- 每當規則應用之后,如果規則優化成功,則會產生新的 RelNode,在新的 RelNode 注冊過程中,有一個步驟是計算 RelNode 的代價;
- 代價計算的過程:
- 由代價模型對每個 RelNode 的代價進行估算和累加:
- 如果不存在代價,或者 Child 的代價還沒有估算(默認是最大值),則忽略;
- 如果存在代價,則會將本身的代價和 Child (即輸入的所有 RelNode)的代價進行累加;
- 若累加結果小於 Best(期望值),則認為優化后的 RelNode 是當前最優的;並且會對其 Parent 進行遞歸估算代價,即傳播代價計算(Propagate Calculate Cost);
- Parent :指與每個 RelNode 對應的沒有被解析前的數據;
-
思考:
- 代價評估的過程和規則優化的過程是同步的,如果已經找到滿足的方案(累積代價小於 Best),但是規則優化還未結束,是否要繼續優化規則?
- 如果還繼續優化規則,是不是可以找到多種滿足期望的方案?
- 是不是要對比所有滿足期望的方案的累積代價,選擇最小的代價對應的方案,作為最優的方案?
- 由代價模型對每個 RelNode 的代價進行估算和累加:
- 代價計算的時期:
-
2/2)優化器新特性(或者說是新功能)
-
重新排序 Join(Join Reorder)
- Join 是關系型數據庫中最重要的操作符之一,Join 的性能也直接關系到整個 SQL 的性能;
- Join 排序算法的兩種實現:MapJoin、Merge Join;
- 對於小數據量,MapJoin 比 Merge Join 性能更優;
- 功能:將 Join 的所有不同輸入進行一個全排序,找到代價最小的一個排列;
- 業務背景:排序之前只是保持了用戶書寫 SQL 語句的 Join 順序,這樣的 Join 順序不一定是最優的,所以通過重排序 Join 規則可以實現最好的選擇,提供更優的性能;
-
自動 MapJoin(Auto MapJoin)
- 功能:將 Join 的所有不同輸入進行一個全排序,找到代價最小的一個排列;
- 實現方式:充分利用優化器代價模型進行估算,獲得更優的 MapJoin 方式,而不是通過 Hint 方式來處理;
- 業務背景:之前通過 Hint 方式來指定是否使用 MapJoin,這樣對用戶不是很友好,且使用不方便;
2/3)優化器使用
- MaxCompute 優化器提供了許多優化規則,將內部已經實現的優化規則進行分類,並提供 set 等命令方便用戶使用;
- 一些基礎優化規則都會默認打開,用戶可以自己任意搭配使用;
-
優化器提供的 Flag 有:
- 規則白名單 —— odps.optimizer.cbo.rule.filter.white
- 規則黑名單 —— odps.optimizer.cbo.rule.filter.black
-
使用規則:
- 使用優化規則:將需要使用的優化規則的縮寫名稱加入白名單即可;
- 例 2:set odps.optimizer.cbo.rule.filter.white = pojr, hj ;
- 表示:使用重排序 Join 規則和自動 MapJoin 規則;
- 重排序規則 Join = pojr 、自動 MapJoin = hj ;
- 例 2:set odps.optimizer.cbo.rule.filter.white = pojr, hj ;
- 關閉優化規則:將想要關閉的優化規則的縮寫名稱加入黑名單即可;
- 例 1:set odps.optimizer.cbo.rule.filter.black = xxx, yyy ;
- 表示:將優化規則 xxx 和 yyy 關閉;
- 例 1:set odps.optimizer.cbo.rule.filter.black = xxx, yyy ;
- 使用優化規則:將需要使用的優化規則的縮寫名稱加入白名單即可;
2/4)注意事項
-
背景:
- 由於用戶書寫 SQL 語句時可能存在一些不確定因素,所有應盡量避免這些因素帶來的性能影響,甚至結果非預期;
- 例:Optimizer 會提供謂詞下推(Predicate Push Down)優化,主要目的是盡量早的進行謂詞過濾,以減少后續操作的數據量,提供性能;但需要注意的是:
-
UDF
- 優化器不會任意下推帶有用戶意圖的函數;
- 原因:不同用戶書寫的函數含義不一樣,不可以一概而論;
- 解決方法:如果用戶需要下推 UDF,需要自己改動 SQL;
- 好處:用戶自己控制 UDF 執行的邏輯,最了解自己的 UDF 使用在 SQL 的哪個部分,而不是優化器任意下推;
- 優化器不會任意下推帶有用戶意圖的函數;
-
不確定函數
- 優化器不會任意下推不確定函數;(如,sample 函數)
- 例 1:如果用戶將 sample 函數寫在 where 子句中,同時語句中存在 Join,則優化器是不會下推到 TableScan 的,因為可能改變願意;
-
SELECT * FROM t1 JOIN t2 ON t1.c1 = t2.d1 WHERE sample( 4, 1) = true ;
- sample 函數在 Join 之后執行,而不會直接在 TableScan 后執行;
-
- 例 2:如果用戶需要對 TableScan 進行抽樣,則需要自己修改 SQL來達到目的,否則優化器進行下推可能會錯誤的理解用戶意圖
-
SELECT * FROM ( SELECT * FROM t1 WHERE sample(4, 1) = true ) t1 JOIN t2 ON t1.c1 = t2.d1 ;
-
- 例 1:如果用戶將 sample 函數寫在 where 子句中,同時語句中存在 Join,則優化器是不會下推到 TableScan 的,因為可能改變願意;
- 優化器不會任意下推不確定函數;(如,sample 函數)
-
隱式類型轉換
- 書寫 SQL 語句時,應盡量避免 Join Key 存在隱式類型轉換;
- 如,String = Bigint,會轉換為 ToDouble(String) = toDouble(Bigint),這是不是用戶的原版意圖,數據庫本身不得而知;
- 存在隱式類型轉換可能會引發兩種后果:
- 轉換失敗,報錯;
- 雖然轉換成功了,但結果與用戶期望的不一致;
- 書寫 SQL 語句時,應盡量避免 Join Key 存在隱式類型轉換;
-
二、任務優化
- SQL / MR 從提交到最后執行,在 MaxCompute 中的細化步驟:
- SQL / MR 作業一般會生成 MapReduce 任務,在 MaxCompute 中則會生成 MaxCompute Instance,通過唯一 ID 進行標識;
- Fuxi Job:對於 MaxCompute Instance,會生成一個或多個由 Fuxi Task 組成的有向無環圖,即 Fuxi Job;
- Fuxi Task:也就是 Map 端的一個熟人分片數據;
- MaxCompute Instance 和Fuxi Job 類似於 Hive 中 Job 的概念;
- Fuxi Task(任務類型):主要包含三種類型,分別是 Map、Reduce、Join,類似於 Hive 中 Task 的概念;
- Fuxi Instance:真正的計算單元,和 Hive 中的概念類似,一般和槽位(slot)對應;
1、Map 傾斜
- Map 傾斜:數據在 Map Instance 上的分布不均勻,即有的 Map Instance 上分布的數據量很大,有的 Map Instance 分布的數據量很少;
- 后果(Map 端長尾現象):有的 Map Instance 的資源浪費,有的 Map Instance 的資源不夠,計算所用時間很長,導致最終整個 Map 端的計算時間變長;
1/1)背景
- Map 端是 MR 任務的起始階段,Map 端的主要功能是從磁盤中將數據讀入內存,Map 端的兩個主要過程,如下圖:
-
輸入分片
- 每個輸入分片會讓一個 Map Instance 來處理;
- 默認情況下,一個 Pangu 文件系統的一個文件快的大小(默認為 256 MB)為一個分片(Fuxi Task);
-
Map 讀數據階段
- 調節 Map Instance 的個數:如,set odps.mapper.split.size = 256;
- 控制每個 Map Instance 讀取文件的個數:如,set odps.mapper.merge.limit.size = 64;
- 如果輸入數據的文件大小差異比較大,每個 Map Instance 讀取的數據量和讀取時間差異也會很大;(長尾現象 / Map 傾斜)
-
Map Instance 輸出結果時
- 輸出結果會暫時放在一個環形內存緩沖區;(當該緩沖區快要溢出時會在本地文件系統中創建一個溢出文件,即 Write Dump)
-
寫入磁盤
- 在寫入磁盤之前,線程首先根據 Reduce Instance 的個數划分分區,數據將會根據 Key 值 Hash 到不同的分區上,一個 Reduce Instance 對應一個分區的數據;
- Reduce Instance 的個數的確定,在下面的 Reduce 傾斜中介紹;
- Map 端也會做部分聚合操作,以減少輸入 Reduce 端的數據量;
- 在寫入磁盤之前,線程首先根據 Reduce Instance 的個數划分分區,數據將會根據 Key 值 Hash 到不同的分區上,一個 Reduce Instance 對應一個分區的數據;
-
-
問題:
- 由於各個 Map Instance 的數據是根據 Hash 分配的,因此也會導致有些 Reduce Instance 分配到大量的數據,而有些 Reduce Instance 卻分配到很少數據,甚至沒有分配到數據;(也就是 Map 端數據傾斜,會連帶影響 Reduce 端數據傾斜;)
- 在 Map 端讀數據時,由於讀入數據的文件大小分布不均勻,因此會導致有些 Map Instance 讀取並且處理的數據特別多,而有些 Map Instance 處理的數據特別少,造成 Map 端長尾;一般有兩種情況可能會導致 Map 端長尾:
- 上游表文件的大小特別不均勻,並且小文件特別多,導致當前表 Map 端讀取的數據分布不均勻,引起長尾;
- 上游表文件:數據倉庫中的維表和事實表;
- Map 端做聚合時,由於某些 Map Instance 讀取文件的某個值特別多而引起長尾,主要指 Count Distinct 操作;
1/2)方案
-
針對第一種情況導致的 Map 端長尾
- 情況一:上游表文件的大小特別不均勻,並且小文件特別多,導致當前表 Map 端讀取的數據分布不均勻,引起長尾;
- 優化方案:合並上游的小文件,同時調節本節點的小文件的參數;
- 兩個參數:
- 一種參數,用於調節 Map 任務的 Map Instance 的個數;
- 例:set odps.sql.mapper.merge.limit.size = 64 ;
- 另一種參數,用於調節單個 Map Instance 讀取的小文件個數;
- 例:set odps.sql.mapper.split.size = 256 ;
- 一種參數,用於調節 Map 任務的 Map Instance 的個數;
- 兩個參數:
-
針對第二種情況導致 Map 端長尾
- 情況二:Map 端做聚合時,由於某些 Map Instance 讀取文件的某個值特別多而引起長尾,主要指 Count Distinct 操作;
- 聚合:指數據將要從 Map 端輸出,然后徐進入 Reduce 端的時候,對數據在 Map 端做的操作;
- 聚合目的:減少 Reduce 端的數據量;
-
實例說明由於某個值特別多而引起長尾:
- 例:獲取收集 APP 日志明細中的前一個頁面的頁面組信息;
- pre_page:前一個頁面的頁面標志;
- page_ut 表:存儲的手機 APP 的頁面組;
- pre_page 只能通過匹配正則或者所屬的頁面組信息,進行笛卡爾積 Join;
- 思路:pre_page 只能通過正則或者所屬的頁面組信息,進行笛卡爾積 Join;
- 原始代碼:
-
SELECT ... FROM ( SELECT ds , unique_id , pre_page FROM tmp _app_ut_1 WHERE ds = '${bizdate}' AND pre_page is not null ) a LEFT OUTER JOIN ( SELECT t.* , length(t.page_type_rule) rule_length FROM page_ut t WHERE ds = '${bizdate}' AND is_enable = 'Y' ) b ON 1 = 1 WHERE a.pre_page rlike b.page_type_rule ;
-
- 運行代碼,LogView 日志如下圖:
- L1_Stg4:MapJoin 小表的分發階段;
- M3_Stg1:讀取明細日志表的 Map 階段;與 MapJoin 小表的 Join 操作也發生在這個階段;
- R5_3_Stg2:進行分組排序的階段;
-
問題分析:
- M3_Stg1 階段,單個 Instance 的處理時間達到了 40 分鍾,而且長尾的 Instance 個數比較固定,應是不同的 Map 讀入的文件塊分別不均勻導致的,文件塊大的 Map 數據量比較大,在與小表進行笛卡爾積 操作時,非常耗時,造成 Map 端長尾;
-
解決方法:
- 使用 “distribute by rand()” 來打亂數據分布,使數據盡可能分布均勻;
- 修改后的代碼如下
-
SELECT ... FROM ( SELECT ds , unique_id , pre_page FROM tmp _app_ut_1 WHERE ds = '${bizdate}' AND pre_page is not null DISTRIBUTE BY rand() ) a LEFT OUTER JOIN ( SELECT t.* , length(t.page_type_rule) rule_length FROM page_ut t WHERE ds = '${bizdate}' AND is_enable = 'Y' ) b ON 1 = 1 WHERE a.pre_page rlike b.page_type_rule ;
-
-
- 執行上述代碼,LogView 日志如下圖:
-
原因分析:
- 通過 “distribute by rand()” ,將 Map 端分發后的數據重新按照隨機值再進行一次分發;
- 不加人隨機分配函數時:Map 階段需要與使用 MapJoin 的小表進行笛卡爾積操作,Map 端完成了大小表的分發和笛卡爾積操作;
- 加入隨機分配函數后:Map 端只負責數據的分發,不再有復雜的聚合或笛卡爾積操作,因此不會導致 Map 端長尾;
- 通過 “distribute by rand()” ,將 Map 端分發后的數據重新按照隨機值再進行一次分發;
- 執行上述代碼,LogView 日志如下圖:
- 情況二:Map 端做聚合時,由於某些 Map Instance 讀取文件的某個值特別多而引起長尾,主要指 Count Distinct 操作;
1/3)思考
- Map 端長尾的根本原因:由於讀入的文件塊的數據分布不均勻,再加上 UDF 函數性能、Join、聚合操作等,導致讀入數據量大的 Map Instance 耗時較長;
-
實際開發過程中,如果遇到 Map 端長尾情況,解決思路:
- 首先,考慮如何讓 Map Instance 讀取的數據量足夠均勻;
- 然后,判斷是哪些操作導致 Map Instance 比較慢;
- 最后,考慮這些操作是否必須在 Map 端完成,在其他階段是否會做得更好;
2、Join 傾斜
- Join 的功能:MaxCompute SQL 在 Join 執行階段會將 Join Key 相同的數據分發到同一個執行 Instance 上處理;
2/1)背景
-
Join 的執行原理
- Join 操作需要參與 Map 和 Reduce 的整個階段;
- 例:通過 Join 的SQL 代碼,來看整個 Map 和 Reduce 階段的執行過程以及數據變化,進而對 Join 執行原理有所了解;
-
SELECT student_id, student_name, course_id FROM student LEFT JOIN student_course ON student.student_id = student_course.student_id ;
- 過程理解:MaxCompute SQL 在 Join 執行階段會將 Join Key 相同的數據分發到同一個執行 Instance 上處理;
- 長尾情況:如果某個 Key 上的數據量比較大,則會導致該 Instance 執行時間較長;
- 長尾表現:在執行日志中該 Join Task 的大部分 Instance 都已執行完成,但少數幾個 Instance 一致處於執行中;
-
-
MaxCompute SQL 執行中的 Join 階段的 3 中數據傾斜場景
- Join 的某輸入比較小
- 可以采用 MapJoin,避免分發引起長尾;
- Join 的每路輸入都較大,且長尾是空值導致的
- 可以將空值處理成隨機值,避免聚集;
- Join 的每路輸入都較大,且長尾是熱點值導致的
- 可以對熱點值和非熱點值分別進行處理,再合並數據;
- Join 的某輸入比較小
- 如何確認 Join 是否發生數據傾斜
- 打開 MaxCompute SQL 執行時產生的 LogView 日志,點開日志可以看到每個 Fuxi Task 的詳細執行信息,如下圖:
- 可以看到每一個 Map、Join、Reduce 的Fuxi Task 任務;
- 根據上圖,點擊其中一個 Join 任務,可以看到有 115 個 Instance 長尾;再點擊 StdOut,可以查看 Instance 讀入的數據量,如下圖:
- 圖 13.10 顯示,Join 的一路輸入讀取的數據量是 1389941257 行;
- 長尾情況:如果 Long-Tails 中 Instance 讀入的數據量遠超過其他 Instance 讀取的數據量,則表示某個 Instance 處理的數據量超大導致長尾;
- 打開 MaxCompute SQL 執行時產生的 LogView 日志,點開日志可以看到每個 Fuxi Task 的詳細執行信息,如下圖:
2/2)方案
- 針對上面的 3 中傾斜場景,給出 3 中對應的解決方案
-
Join 的某輸入比較小
- 方案:采用 MapJoin;
- MapJoin的原理
- MapJoin 的原理:將 Join 操作提前到 Map 端執行,將小表讀入內存,順序掃描大表完成 Join;
- 優點:可以避免因為分發 Key 不均勻導致數據傾斜;
- 弊端:MapJoin 的使用有限制,必須是 Join 中的從表比較小才可用;
- 從表:左外連接中的右表,或者右外連接中的左表;
- MapJoin 的原理:將 Join 操作提前到 Map 端執行,將小表讀入內存,順序掃描大表完成 Join;
- MapJoin 的使用方法
- 具體操作:在代碼中 select 后加上 “/*+mapjoin(a)*/” 即可;
- a:代表小表(或者子查詢)的別名;
- 例:MaxCompute 已經可以自動選擇是否使用 MapJoin ,可以不使用顯式 Hint:
-
SELECT /*+MAPJOIN(b)*/ a.c2 ,b.c3 FROM ( SELECT C1 ,C2 FROM t1 ) a LETF OUTER JOIN ( SELECT c1 ,c3 FROM t2 ) b on a.c1 = b.c1;
-
- 具體操作:在代碼中 select 后加上 “/*+mapjoin(a)*/” 即可;
- 使用 MapJoin 時,對小表的大小有限制,默認小表讀入內存后的大小不能超過 512 MB,但是用戶可以通過設置 “set odps.sql.mapjoin.memory.max = 2048” 加大內存,最大為 2048 MB;
-
Join 因為空值導致長尾
- 方案:將空值處理成隨機值;
- 原因:空值無法關聯上,只是分發到一處,因此處理成隨機值既不會影響關聯結果,也能很好的避免聚集導致長尾;
- 例:
-
SELECT ... FROM table_a LEFT OUTER JOIN table_b ON coalesce(table_a.key, rand()*9999) = table_b.key --當 key 值為空值時用隨機值代替
-
-
Join 因為熱點值導致長尾
- 場景:因為熱點值導致長尾,且 Join 的輸入比較大,無法使用 MapJoin;
- 方案:先將熱點 key 取出,用熱點 key 將主表數據切分成熱點數據和非熱點數據兩部分,分別處理,最后合並;
- 以實例闡述操作步驟:
- 例:淘寶的 PV 日志關聯商品維表,取商品屬性為例;
- 取熱點 key:將 PV 大於 50000 的商品 ID 取出到臨時表中
-
INSERT OVERWRITE TABLE topk_item SELECT item_id FROM ( SELECT item_id ,count(1) as cnt FROM pv --pv表 WHERE ds = '${bizdate}' AND url_type = 'ipv' AND item_id is not null GROUP BY item_id ) a WHERE cnt >= 50000
-
- 取出非熱點數據
- 操作步驟:將主表(pv 表)和熱點 key 表(topk_item 表)外關聯后,通過條件 “bl.item_id is null” 取關聯不到的數據,即非熱點商品的日志數據;此時需要使用 MapJoin;再用熱點數據關聯商品維表;(因為已經排除了熱點數據,所以不會長尾;)
- 代碼示例:
-
SELECT ... FROM ( SELECT * FROM item --商品表 WHERE ds = '${bizdate}' ) a RIGHT OUTER JOIN ( SELECT /*+MAPJOIN*/ b2.* FROM ( SELECT item_id FROM topk_item --熱點表 WHERE ds = '${bizdate}' ) b1 RIGHT OUTER JOIN ( SELECT * FROM pv --PV表 WHERE ds = '${bizdate}' AND url_type = 'ipv' ) b2 ON b1.item_id = coalesce(b2.item_id, concat("tbcdm", rand()) WHERE b1.item_id is null ) 1 ON a.item_id = coalesce(1.item_id, concat("tbcdm", rand())
-
- 取出熱點數據
- 取到熱點商品的日志數據:將主表(pv 表)和熱點 key 表(topk_item 表)內關聯,此時需要使用 MapJoin;
- 取到熱點商品的維表數據:同時,需要將商品維表(item 表)和熱點 key 表(topk_item 表)內關聯;
- 因為維表數據只有熱點商品的數據,數據量比較小,可以使用 MapJoin 避免長尾;
- 將上兩步匯總的日志數據外關聯維表數據;
-
SELECT /*+MAPJOIN*/ ... FROM ( SELECT /*+MAPJOIN*/ b2.* FROM ( SELECT item_id FROM topk_item WHERE ds = '${bizdate}' ) b1 JOIN ( SELECT * FROM pv --pv 表 WHERE ds = '${bizdate}' AND url_type = 'ipv' AND item_id is not null ) b2 ON (b1.item_id = b2.item_id) ) 1 LEFT OUTER JOIN ( SELECT /*+MAPJOIN*/ a2.* FROM ( SELECT item_id FROM topk_item WHERE ds = '${bizdate}' ) a1 JOIN ( SELECT * FROM item --商品表 WHERE ds = '${bizdate}' ) a2 ON (a1.item_id = a2.item_id) ) a ON a.item_id = 1.item_id
-
- 將上面取到的非熱點數據和熱點數據通過 “union all” 合並后,即得到完整的日志數據,並且關聯了商品信息;
- 取熱點 key:將 PV 大於 50000 的商品 ID 取出到臨時表中
- 例:淘寶的 PV 日志關聯商品維表,取商品屬性為例;
-
針對傾斜問題,MaxCompute 系統提供了專門的參數用來解決長尾問題:
- 例 1:開啟 / 關閉功能
-
set odps.sql.skewjoin = true / false
-
- 例 2:設置傾斜的 key 及對應的值
-
set odps.sql.skewinfo = skewed_src: (skewed_key) --設置傾斜的 key 值(skewed_ey)
-
- 優點:簡單方便;
- 弊端:
- 如果傾斜值發生變化,需要修改代碼,而且一般無法提前知道變化;
- 如果傾斜至比較多,則不方便在參數中設置;
- 需要根據實際情況,選擇拆分代碼或者設置參數;
- 例 1:開啟 / 關閉功能
2/3)思考
- 當大表和大表 Join因為熱點值發生傾斜時,雖然可以通過修改代碼來解決,但是修改起來很麻煩,代碼改動也很大,且影響閱讀;而 MaxCompute 現有的參數設置使用不夠靈活,傾斜值多的時候,不可能將所有值都列在參數中,且傾斜值可能經常變動;
3、Reduce 傾斜
3/1)背景
- Reduce 端:對 Map 端梳理后的有序 key-value 鍵值對進行聚合,即 進行 Count、Sum、Avg 等聚合操作,得到最終聚合的結果;
- Distinct:MaxCompute SQL 中支持的語法;
- 功能:用於對字段去重;
- 原理:將需要去重的字段以及 Group By 字段聯合作為 key 將數據分發到 Reduce 端;
-
Reduce 端長尾原因:key 的數據分布不均勻;
- 因為 Distinct 操作,數據無法在 Map 端的 Shuffle 階段根據 Group By 先做一個聚合操作,以減少傳輸的數據量,而是將所有的數據都傳輸到 Reduce 端,當 key 的數據分布不均勻時,就會導致 Reduce 端長尾;
-
造成 Reduce 端長尾的 4 種情況:
- 對同一個表按照維度對不同的列進行 Count Distinct 操作,造成 Map 端數據膨脹,從而使下游的 Join 和 Reduce 出現鏈路上的長尾;
- Map 端直接做聚合時,出現 key 值分布不均勻,造成 Reduce 端長尾;
- 動態分區數過多時,可能造成小文件過多,從而引起 Reduce 端長尾;
- 多個 Distinct 同時出現在一端 SQL 代碼中時,數據會被分發多次,不僅會造成數據膨脹 N 倍,還會把長尾現象放大 N 倍;
3/2)方案
- 針對上述的造成 Reduce 端長尾的 4 中情況,給出解決方案;
-
Map 端直接做聚合時,出現 key 值分布不均勻,造成 Reduce 端長尾
- 解決方案:對熱點 key 單獨處理,然后通過 “Union All” 合並;(具體操作與 “Join 因為熱點值導致長尾” 的處理方式一樣)
-
動態分區數過多時,可能造成小文件過多,從而引起的 Reduce 端長尾
- 解決方案:把符合不同條件的數據放到不同的分區,避免通過多次 “Insert Overwrite” 寫入表中,特別是分區數比較多時,能夠很多的簡化代碼;
- 弊端:動態分區也可能帶來小文件過多的困擾;
- 例:
-
INSERT OVERWRITE TABLE part_test PARTITION(ds) SELECT * FROM part_test ;
- 假設有 K 個 Map Instance,N 個目標分區,那么在最壞的情況下,可能產生K x N 個小文件,而過多的小文件會對文件系統造成巨大的管理壓力;
- 解決方法:
- MaxCompute 對動態分區的處理是引入額外一級的 Reduce Task,把相同的目標分區交由同一個(或少量幾個)Reduce Instance 來寫入,避免小文件過多,並且這個 Reduce 肯定是最后一個 Reduce Task 操作;(MaxCompute 是默認開啟這個功能的,也就是將下面參數設置為 true)
-
set odps.sql.reshuffle.dynamicpt = true;
-
- 解決方案:把符合不同條件的數據放到不同的分區,避免通過多次 “Insert Overwrite” 寫入表中,特別是分區數比較多時,能夠很多的簡化代碼;
-
多個 Distinct 同時出現在一段 SQL 代碼中時,數據會被分發多次,不僅會造成數據膨脹 N 倍,還會把長尾現象方法 N 倍
- 例:在 7 天、30天等時間范圍內,分 PC 端、無線端、所有終端,計算支付買家和支付商品數,其中支付買家數和支付商品數指標需要去重;
-
方案一:使用 Distinct 去重
- 因為需要根據日期、終端等多種條件組合對買家和商品進行去重,因此需要有 12 個 Count Distinct 計算;
- 下圖為統計代碼和該代碼運行的 LogView 日志,可以看出節點運行時間:1 h 14 min,數據膨脹;
- 方案二:不使用 Distinct 去重
- 計算支付買家數:
- 先分別進行查詢,執行 Group By 原表粒度 + buyer_id,計算出 PC 端、無線端、所有終端以及 7 天、30 天等統計口徑下的 buyer_id(這里可以理解為買家支付的次數);
- 在子查詢外,Group By 原表粒度:當上一步的 Count 值大於 0 時,說明這一買家在這個統一口徑下油鍋支付,計入支付買家數,否則不計入;
- 計算支付商品數:
- 與計算支付買家數的方法一樣,按照上兩步操作進行;
- 對支付買家數和支付商品數進行 Join 操作;
- 代碼示例:(僅示例支付買家數計算)
-
SELECT t2.seller_id ,t2.price_seg_id ,SUM(case when pay_ord_byr_cnt_1w_001 > 0 then 1 else 0 end) AS pay_ord_byr_cnt_1w_001 --最近 7 天支付買家數 ,SUM(case when pay_ord_byr_cnt_1w_002 > 0 then 1 else 0 end) AS pay_ord_byr_cnt_1w_002 --最近 7 天 PC 端支付買家數 ,SUM(case when pay_ord_byr_cnt_1w_003 > 0 then 1 else 0 end) AS pay_ord_byr_cnt_1w_003 --最近 7 天無線端支付買家數 ,SUM(case when pay_ord_byr_cnt_1m_002 > 0 then 1 else 0 end) AS pay_ord_byr_cnt_1m_002 --最近 30 天支付買家數 ,SUM(case when pay_ord_byr_cnt_1m_003 > 0 then 1 else 0 end) AS pay_ord_byr_cnt_1m_003 --最近 30 天 PC 端支付買家數 ,SUM(case when pay_ord_byr_cnt_1m_004 > 0 then 1 else 0 end) AS pay_ord_byr_cnt_1m_004 --最近 30 天無線端支付買家數 FROM ( SELECT a1.seller_id ,a2.price_seg_id ,buyer_id ,COUNT(buyer_id) AS pay_ord_byr_cnt_1m_002 --最近 30 天支付買家數 ,COUNT(CASE WHEN is_wireless = 'N' THEN buyer_id ELSE NULL END) AS pay_ord_byr_cnt_1m_003 --最近 30 天 PC 端支付買家數 ,COUNT(CASE WHEN is_wireless = 'Y' THEN buyer_id ELSE NULL END) AS pay_ord_byr_cnt_1m_004 --最近 30 天無線端支付買家數 ,COUNT(case when a1.ds >= To_CHAR(DATEADD(TO_DATE('${bizdate}', 'yyyymmdd'), -6, 'dd'), 'yyyymmdd') then buyer_id else null end) AS pay_ord_byr_cnt_1w_001 --最近 7 天支付買家數 ,COUNT(case when a1.ds >= To_CHAR(DATEADD(TO_DATE('${bizdate}', 'yyyymmdd'), -6, 'dd'), 'yyyymmdd') and is_wireless = 'N' THEN buyer_id ELSE NULL END) AS pay_ord_byr_cnt_1w_002 --最近 7 天 PC 端支付買家數 ,COUNT(case when a1.ds >= To_CHAR(DATEADD(TO_DATE('${bizdate}', 'yyyymmdd'), -6, 'dd'), 'yyyymmdd') and is_wireless = 'Y' THEN buyer_id ELSE NULL END) AS pay_ord_byr_cnt_1w_003 --最近 7 天無線端支付買家數 FROM ( select * from table_pay --支付表 ) a1 JOIN ( SELECT item_id ,price_seg_id FROM tag_itm --商品 tag 表 WHERE ds = '${bizdate}' ) a2 ON ( a1.item_id = a2.item_id ) GROUP BY a1.seller_id --原表粒度 ,a2.price_seg_id --原表粒度 ,buyer_id ) t2 GROUP BY t2.seller_id --原表粒度 ,t2.price_seg_id --原表粒度
- 修改后運行時間為 13 min,整體運行的 LogView 日志如下圖:數據沒有膨脹;
-
- 代碼示例:(僅示例支付買家數計算)
- 計算支付買家數:
-
- 例:在 7 天、30天等時間范圍內,分 PC 端、無線端、所有終端,計算支付買家和支付商品數,其中支付買家數和支付商品數指標需要去重;
3/3)思考
-
對 Multi Distinct 的思考:
- 上述方案中如果出現多個需要去重的指標,那么在把不同指標 Join 在一起之前,一定要確保指標的粒度是原始表的數據粒度;
- 如,支付買家數和支付商品數,在子查詢中指標粒度分別是:原始表的數據粒度 + buyer_id 和原始表的數據粒度 + item_id,這時兩個指標不是同一數據粒度,所以不能 Join,需要再套一層代碼,分別把指標 Group By 到 “原始表的數據粒度”,然后再進行 Join 操作;
- 在性能和代碼簡潔、可維護之間需要根據具體情況進行權衡
- 情況 1:
- 修改前的 Multi Distinct 代碼的可讀性比較強,代碼簡潔,便於維護;修改后的代碼較為復雜;
- 特點:一般代碼改動比較大,需要投入一定的時間成本;
- 解決思路:可以考慮做成自動化,通過檢測代碼、優化代碼自動生成;
- 修改前的 Multi Distinct 代碼的可讀性比較強,代碼簡潔,便於維護;修改后的代碼較為復雜;
- 情況 2:
- 當出現的Distinct 個數不多、表的數據量也不是很大、表的數據分布較均勻時,可以不使用 Multi Distinct 進行計算;
- 情況 1:
-
考慮上述兩種情況的另一種處理方式
- 情況 1 及處理方式:當代碼比較臃腫時,也可以將上述子查詢落到中間表里,這樣數據模型更合理、復用性更強、層次更清晰;
- 情況 2 及處理方式:當需要去除類似的多個 Distinct 時,可以查一下是否有更細粒度的表可用,避免重復計算;
- 上述方案中如果出現多個需要去重的指標,那么在把不同指標 Join 在一起之前,一定要確保指標的粒度是原始表的數據粒度;
-
兩個要注意的問題
- 目前 Reduce 端數據傾斜很多是由 Count Distinct 問題引起的,因此,在 ETL 開發工作中應該予以重視 Count Distinct 問題,避免數據膨脹;
- 對於一些表的 Join 階段的 NULL 值問題,應該對表的數據分布要有清楚的認識,在開發時解決這個問題;