大數據:計算管理


  • 背景:

    • 2017 年,阿里內部 MaxCompute 集群上游 200 多萬個任務,每天存儲資源、計算資源消耗都很大。如何降低計算資源的消耗,提高任務執行的性能,提升任務產出的時間,是計算平台和 ETL 開發工程師孜孜追求的目標。

 

一、系統優化

  • 系統優化:通過分析計算系統的數據運行情況,判斷計算系統對內存、CPU、Instance 個數這些資源的運用是否合理;
    • 合理:指計算時間更短,使用的資源更少;
  • Hadoop 等分布式計算系統的資源評估:
    • 評估方式:根據 Map 任務的輸入的數據量進行靜態評估;
      • 評估產出:是否存在長尾任務;
        • 長尾:多個 Instance 同時計算數據,但由於各個 Instance 上的數據量分布不均勻等情況,使得有的 Instance 先計算完,而有的要等執行很久,計算完成時間相差很大;
    •  弊端 / 問題:
      • 對於普通的 Map 任務,評估一般符合預期;而對於 Reduce 任務,其輸入來自於 Map 的輸出,但進行評估是也是根據 Map 任務的輸出進行,這樣評估的結果經常和實際需要的資源數相差很大;
    • 解決方案:
      • 在任務穩定的情況下,基於任務的歷史執行情況進行評估,即采用 HBO(History - Based Optimizer,基於歷史的優化器);

 

  • CBO(Cost - Based Optimizer,基於代價的優化器):
    • MaxCompute 的 CBO:

      • 在 Oracle 的 CBO 的基礎上,改變了收集統計信息的方式:
        • 采用各種抽樣統計算法,通過較少的資源獲得大量的統計信息,基於先進的優化模型,具備了完善的 CBO 能力;
      • 優點:與傳統的大數據計算系統相比,性能提升明顯
    • Oracle 的 CBO:

      • 評估方式:根據收集到的表、分區、索引等統計信息,計算每種執行方式的代價(Cost),進而選擇其中最優的(也就是代價最小的)執行方式
      • 優點:收集到的統計信息越多、越准確,Oracle 的 CBO 可能生出代價更小的執行計划;
      • 弊端 / 問題:
        1. 對表和列上統計信息的收集需要付出代價,尤其是在大數據環境下,表的體量巨大,需要消耗大量的資源來收集統計信息;
        2. 消耗大量資源收集到的統計信息,其利用率卻很低;

 

1、HBO

  • HBO(History - Based Optimizer,基於歷史的優化器),根據任務的歷史執行情況,為任務分配更合理的資源;
    • 分配的資源內存、CPU、Instance 個數
      • Instance :指操作系統中一系列的進程以及為這些進程所分配的內存塊;
  • HBO 是對集群資源分配的一種優化,概括起來就是:任務執行歷史 + 集群狀態信息 + 優化原則 → 更優的執行配置;

 1/1)背景

  1. MaxCompute 原資源分配策略

    1. MaxCompute 最初分配 MR 執行過程的 Instance個數的算法
      • MR執行過程:Map 任務、Reduce 任務;
    2. 在 Instance 分配算法的基礎上,根據歷史數據統計各個 Instance 處理的數據量
      1. Map Instance
        • fivenum ( round ( rt$map_input_bytes / 1024 / 1024, 2) )
          [1] 0.00  4.11  16.59  60.66  4921.94
      2. Reduce Instance
        • fivenum ( round ( rt$map_input_bytes / 1024 / 1024, 2) )
          [1]  0.00  0.00  0.75  24.87  192721.83
      3. Join Instance
        • fivenum ( round ( rt$map_input_bytes / 1024 / 1024, 2) )
          [1]  0.00  0.02  1.82  22.15  101640.31
      • 從上面內容可以看出:
        1. 大部分的 Instance 處理的數據量遠遠沒有達到預期,即一個 Instance 處理 256 MB 的數據;
        2. 有些 Instance 處理的數據量很大,很容易導致任務長尾;
      • 總結:默認的 Instance 算法下,小任務存在資源浪費,而大任務卻資源不足;(需要有更合理的資源分配方法,HBO 應運而生)
  2. HBO 的提出

    • 問題分析及解決思路:通過數據分析,發現在系統中存在大量的周期性調度的腳本(物理計划穩定),且這些腳本的輸入一般比較穩定,如果能對這部分腳本進行優化,那么對整個集群的計算資源的使用率會得到顯著提升;
    • 使用 HBO,根據任務的執行歷史情況,為其分配更合理的計算資源;
      • HBO 一般通過自適應調整系統參數,來達到控制計算資源的目的;

 

 1/2)HBO 原理

  • HBO 分配資源的步驟:
    1. 前提:最近 7 天內,任務代碼沒有發生變更,且任務運行 4 次;
    2. Instance 分配邏輯:基礎資源估算值 + 加權資源估算值;
      • 最終的 Instance 個數為:基礎資源估算值 + 加權資源估算值;
      • 加權資源:指對已經按分配邏輯進行分配好基礎資源的每個 Map Task / Reduce Task,再追加一部分資源;
  1. 基礎資源數量的邏輯

    1. 對於 Map Task:Map 數量平均每個 Map 能處理的數據量
      1. 估算用戶提交的任務所需要的 Map 數量;
        • 根據期望的每個 Map 能處理的數據量,再結合用戶提交任務的輸入數據量,估算出用戶提交的任務所需要的 Map 數量;
      2. 采用分層的方式,提供平均每個 Map 能處理的數據量;
        • 分層的目的:為了保證集群上任務的整體吞吐量,保證集群的資源不會被一些超大任務占有;
    2. 對於 Reduce Task:Reduce 數量平均每個 Reduce 能處理的數據量
      • 計算 Reduce 數量的方法
        • Hive 的計算方法:使用 Map 的輸入數據量計算 Reduce 的數量;
        • MaxCompute 的計算方法:使用最近 7 天 Reduce 對應的 Map 的平均輸出數據量,作為 Reduce 的輸入數據量,用於計算 Reduce 的數量;
      1. 估算用戶提交的任務所需要的 Reduce 數量
        • 根據期望的每個 Reduce 能處理的數據量,再結合用戶提交任務的 Reduce 的輸入數據量,估算出用戶提交的任務所需要的 Reduce 數量;
      2. 采用分層的方式,提供平均每個 Reduce 能處理的數據量;
        • 分層的目的:為了保證集群上任務的整體吞吐量,保證集群的資源不會被一些超大任務占有;
  2. 加權資源數量的邏輯

    • 加權資源:對每個 Map Task 和 Reduce Task,在其基礎資源的基礎上再追加一部分資源;
    • 以下是追加原理:追加多少、怎么追加
    1. 對於 Map Task
      1. 系統先初始化期望的每個 Map 能處理的數據量;
      2. 拿該 Map 在最近一段時間內的平均處理速度,與系統設定的期望值做比較:
        • 如果平均處理速度小於期望值,則按照同等比例對基礎資源數量進行加權,估算出該 Map 的加權資源數量;
        • 一段時間:一般用最近 7 天;
    2. 對於 Reduce Task
      • 方法同 Map Task;
  3. CPU / 內存分配邏輯

    • 類似於 Instance 分配邏輯:基礎資源估算值 + 加權資源估算值;

 

 1/3)HBO 效果

  1. 提高 CPU 利用率
  2. 提高內存利用率
  3. 提高 Instance 並發數
  4. 降低執行時長

 

 1/4)HBO 改進與優化

  • 特殊任務場景:有些業務在特定場合下依舊有數據量暴漲的情況發生,如,“雙 11” 和 “雙 12” 期間,這個日常生成的 HBO 計划就不適用了;
  • 解決方法HBO 增加了根據數據量動態調整 Instance 數的功能,主要根據 Map 的數量增長情況進行調整

 

2、CBO

  • MaxCompute 引入的基於代價的優化器(CBO):根據收集的統計信息,計算每種執行方式的代價,進而選擇最優的執行方式;

 2/1)優化器原理

  • 優化器(Optimizer)引入了 Volcano 模型,該模型是基於代價的優化器(CBO),並且引入了重新排序 Join(Join Reorder)自動 MapJoin(Auto MapJoin)優化規則等,同時基於 Volcano 模型的優化器會盡最大的搜索寬度來獲取最優計划;
  • 優化器功能結構模塊:
    • Meta Manager(元數據)、Statistics(統計信息)、Rule Set(優化規則集)、Volcano Planner Core(核心計划器)等,如下圖:
  1. Meta Manager

    • 功能:提供元數據信息
      • 元數據信息:包括表的元數據、統計信息元數據等;以及一些基本的元數據,如是否是分區表、表有哪些列等;
      • 當優化器在選擇計划時,需要根據元數據的一些信息進行優化;
        • 如,表分區裁剪(TableScan Partition Prunning)優化時,需要通過 Meta 信息獲取表數據有哪些分區,然后根據過濾條件來裁剪分區。
  2. Statistics

    • 功能:提供准確的統計信息
      • 統計信息:如,表的 Count 值、列的 Distinct 值、TopN 值等;
      • 收集統計信息:優化器提供了 UDF 來收集統計信息;(包括 Distinct 值、TopN 值等)
        • 注:Count 值等信息是由 Meta 直接提供的;
      • 優化器只有擁有准確的統計信息,才能計算出真正的最優的計划;
        • 如,Join 是選擇 Hash Join 還是 Merge Join,優化器會根據 Join 的輸入數據量(即 Count 值)來進行選擇;
  3. Rule Set

    • 功能優化規則
      • 選擇:根據不同情況選擇不同的優化點,再由優化器根據代價模型(Cost Model)來選擇啟用哪些優化規則;
        • 如,工程合並規則(Project Merge Rule):將臨近的兩個 Project 合並成一個 Project;
        • 如,過濾條件下推規則(Filter Push Down):將過濾條件盡量下推,使得數據先進行過濾,再進行其他計算;(以較少其他操作的數據量)
    • 優化規則分類
      • Substitute:被認為是優化了肯定好的規則;
      • Explore Rule:優化后需要考慮多種優化結果;
      • Build Rule:可以認為優化后的結果不能再次使用規則再進行優化;
    • 所有的優化規則,都放在優化規則集中;
    • MaxCompute 優化器中的優化規則,由用戶通過 set 等命令控制使用;
  4. Volcano Planner Core

    • 功能:將所有信息(Meta 信息、統計信息、優化規則)統一起來處理,然后根據代價模型的計算,獲得一個最優計划;
    1. 代價模型
      • 功能 / 原理:代價模型根據不同操作符(如,Join、Project 等)計算出不同的代價,然后再計算出整個計划中最小代價的計划
      • MaxCompute 的代價模型提供的 Cost 由 3 個維度組成:行數I / O 開銷CPU 開銷;(3 個衡量標准)
        • 通過這 3 個維度衡量每個一操作符的代價;
    2. 工作原理:
      • 將需要輸入給 Planner 的數據,用 Compiler 解析為一個 “計划樹”,簡稱 “RelNode 樹”,樹的每個節點簡稱 RelNode
    3. Volcano Planner 創建
      • Planner 的創建:主要是將 Planner 在優化過程中所用到的信息傳遞給執行計划器;如規則集,用戶指定要使用的規則
        • 信息:RelNode 的 Meta 計算值、RelNode 的代價計算值;
        • 信息的由來:
          • Meta Provider:每個 RelNode 的 Meta 計算;
            • 如,RowCount 值計算、Distinct 值計算等;
          • 代價模型:計算每個 RelNode 的代價等;
    4. Planner 優化

      • Planner 的優化過程:

      1. 規則匹配(Rule Match)

        • 規則匹配:指 RelNode 滿足規則的優化條件而建立的一種匹配關系;(就是給所有的 RelNode,在規則集中找相匹配的規則
        • 操作步驟:
          • Planner 首先將整個 RelNode 樹的每一個 RelNode 注冊到 Planner 內部;同時在注冊過程中,在規則集中找到與每個 RelNode 匹配的規則,然后加入到規則應用(Rule Apply)的隊列中;
            • 整個注冊過程處理結束后,所有與 RelNode 可以匹配的規則,全部加入到隊列中,以后應用時只要從隊列中取出來就可以了;
      2. 規則應用(Rule Apply)

        • 主要任務:優化每個規則隊列中的規則
        • 優化過程:
          1. 從規則隊列(Rule Queue)中彈出(Pop)一個已經匹配成功的規則進行優化
            • 如果優化成功后,會產生至少一個新的 RelNode;新的 RelNode 與未優化時的 RelNode 存在差異;
            • 彈出:彈出一個規則后,規則隊列中就少一個規則;
          2. 使用新的 RelNode 再次進行注冊以及規則匹配操作,再把匹配的規則加入到規則應用的規則隊列中,然后接着下次規則應用;
          3. 結束對規則的優化:
            • Planner 會一直應用所有的規則,包括后來疊加的規則,直到不會有新的規則匹配到,則優化結束,得到一個最優計划
            • 產出:新的 “RelNode 樹”,也就是新的 RelNode 數據節點集合;
      3. 代價計算(Cost Compute)

        • 代價計算的時期:
          • 每當規則應用之后,如果規則優化成功,則會產生新的 RelNode,在新的 RelNode 注冊過程中,有一個步驟是計算 RelNode 的代價;
        • 代價計算的過程:
          1. 由代價模型對每個 RelNode 的代價進行估算和累加:
            1. 如果不存在代價,或者 Child 的代價還沒有估算(默認是最大值),則忽略;
            2. 如果存在代價,則會將本身的代價和 Child (即輸入的所有 RelNode)的代價進行累加;
          2. 若累加結果小於 Best(期望值),則認為優化后的 RelNode 是當前最優的;並且會對其 Parent 進行遞歸估算代價,即傳播代價計算(Propagate Calculate Cost);
            • Parent :指與每個 RelNode 對應的沒有被解析前的數據;
            • 思考:

              1. 代價評估的過程和規則優化的過程是同步的,如果已經找到滿足的方案(累積代價小於 Best),但是規則優化還未結束,是否要繼續優化規則?
              2. 如果還繼續優化規則,是不是可以找到多種滿足期望的方案?
              3. 是不是要對比所有滿足期望的方案的累積代價,選擇最小的代價對應的方案,作為最優的方案?

 

 2/2)優化器新特性(或者說是新功能)

  1. 重新排序 Join(Join Reorder)

    • Join 是關系型數據庫中最重要的操作符之一,Join 的性能也直接關系到整個 SQL 的性能;
    • Join 排序算法的兩種實現:MapJoin、Merge Join;
      • 對於小數據量,MapJoin 比 Merge Join 性能更優;
    • 功能將 Join 的所有不同輸入進行一個全排序,找到代價最小的一個排列
      • 業務背景:排序之前只是保持了用戶書寫 SQL 語句的 Join 順序,這樣的 Join 順序不一定是最優的,所以通過重排序 Join 規則可以實現最好的選擇,提供更優的性能;
  2. 自動 MapJoin(Auto MapJoin)

    • 功能:將 Join 的所有不同輸入進行一個全排序,找到代價最小的一個排列
    • 實現方式:充分利用優化器代價模型進行估算,獲得更優的 MapJoin 方式,而不是通過 Hint 方式來處理;
    • 業務背景:之前通過 Hint 方式來指定是否使用 MapJoin,這樣對用戶不是很友好,且使用不方便;

 

 2/3)優化器使用

  • MaxCompute 優化器提供了許多優化規則,將內部已經實現的優化規則進行分類,並提供 set 等命令方便用戶使用;
    • 一些基礎優化規則都會默認打開,用戶可以自己任意搭配使用;
  • 優化器提供的 Flag 有:

    • 規則白名單 —— odps.optimizer.cbo.rule.filter.white
    • 規則黑名單 —— odps.optimizer.cbo.rule.filter.black
  • 使用規則:

    • 使用優化規則:將需要使用的優化規則的縮寫名稱加入白名單即可;
      • 例 2:set odps.optimizer.cbo.rule.filter.white = pojr, hj ;
        • 表示:使用重排序 Join 規則和自動 MapJoin 規則;
        • 重排序規則 Join = pojr 、自動 MapJoin = hj ;
    • 關閉優化規則:將想要關閉的優化規則的縮寫名稱加入黑名單即可;
      • 例 1:set odps.optimizer.cbo.rule.filter.black = xxx, yyy ;
        • 表示:將優化規則 xxx 和 yyy 關閉;

 

 2/4)注意事項

  • 背景:

    • 由於用戶書寫 SQL 語句時可能存在一些不確定因素,所有應盡量避免這些因素帶來的性能影響,甚至結果非預期;
  • 例:Optimizer 會提供謂詞下推(Predicate Push Down)優化,主要目的是盡量早的進行謂詞過濾,以減少后續操作的數據量,提供性能;但需要注意的是:
    1. UDF

      • 優化器不會任意下推帶有用戶意圖的函數
        • 原因:不同用戶書寫的函數含義不一樣,不可以一概而論;
        • 解決方法:如果用戶需要下推 UDF,需要自己改動 SQL;
          • 好處:用戶自己控制 UDF 執行的邏輯,最了解自己的 UDF 使用在 SQL 的哪個部分,而不是優化器任意下推;
    2. 不確定函數

      • 優化器不會任意下推不確定函數;(如,sample 函數)
        • 例 1:如果用戶將 sample 函數寫在 where 子句中,同時語句中存在 Join,則優化器是不會下推到 TableScan 的,因為可能改變願意;
          • SELECT *
            FROM t1
            JOIN t2
            ON t1.c1 = t2.d1
            WHERE sample( 4, 1) = true ;
            • sample 函數在 Join 之后執行,而不會直接在 TableScan 后執行;
        • 例 2:如果用戶需要對 TableScan 進行抽樣,則需要自己修改 SQL來達到目的,否則優化器進行下推可能會錯誤的理解用戶意圖
          • SELECT *
            FROM
            (
                SELECT *
                FROM t1
                WHERE sample(4, 1) = true
            ) t1
            JOIN t2
            ON t1.c1 = t2.d1 ;
    3. 隱式類型轉換

      • 書寫 SQL 語句時,應盡量避免 Join Key 存在隱式類型轉換
        • 如,String = Bigint,會轉換為 ToDouble(String) = toDouble(Bigint),這是不是用戶的原版意圖,數據庫本身不得而知;
      • 存在隱式類型轉換可能會引發兩種后果:
        1. 轉換失敗,報錯;
        2. 雖然轉換成功了,但結果與用戶期望的不一致;

 

 

二、任務優化

  • SQL / MR 從提交到最后執行,在 MaxCompute 中的細化步驟:
    1. SQL / MR 作業一般會生成 MapReduce 任務,在 MaxCompute 中則會生成 MaxCompute Instance,通過唯一 ID 進行標識
    2. Fuxi Job:對於 MaxCompute Instance,會生成一個或多個由 Fuxi Task 組成的有向無環圖,即 Fuxi Job;
      • Fuxi Task:也就是 Map 端的一個熟人分片數據;
      • MaxCompute Instance 和Fuxi Job 類似於 Hive 中 Job 的概念;
    3. Fuxi Task(任務類型):主要包含三種類型,分別是 Map、Reduce、Join,類似於 Hive 中 Task 的概念;
    4. Fuxi Instance:真正的計算單元,和 Hive 中的概念類似,一般和槽位(slot)對應;

 

1、Map 傾斜

  • Map 傾斜:數據在 Map Instance 上的分布不均勻,即有的 Map Instance 上分布的數據量很大,有的 Map Instance 分布的數據量很少;
    • 后果(Map 端長尾現象):有的 Map Instance 的資源浪費,有的 Map Instance 的資源不夠,計算所用時間很長,導致最終整個 Map 端的計算時間變長;

 1/1)背景

  • Map 端是 MR 任務的起始階段,Map 端的主要功能是從磁盤中將數據讀入內存,Map 端的兩個主要過程,如下圖:
      1. 輸入分片

        • 每個輸入分片會讓一個 Map Instance 來處理;
        • 默認情況下,一個 Pangu 文件系統的一個文件快的大小(默認為 256 MB)為一個分片(Fuxi Task);
      2. Map 讀數據階段

        • 調節 Map Instance 的個數:如,set  odps.mapper.split.size = 256;
        • 控制每個 Map Instance 讀取文件的個數:如,set odps.mapper.merge.limit.size = 64;
          • 如果輸入數據的文件大小差異比較大,每個 Map Instance 讀取的數據量和讀取時間差異也會很大;(長尾現象 / Map 傾斜)
      3. Map Instance 輸出結果時

        • 輸出結果會暫時放在一個環形內存緩沖區;(當該緩沖區快要溢出時會在本地文件系統中創建一個溢出文件,即 Write Dump
      4. 寫入磁盤

        • 在寫入磁盤之前,線程首先根據 Reduce Instance 的個數划分分區,數據將會根據 Key 值 Hash 到不同的分區上,一個 Reduce Instance 對應一個分區的數據;
          • Reduce Instance 的個數的確定,在下面的 Reduce 傾斜中介紹;
        • Map 端也會做部分聚合操作,以減少輸入 Reduce 端的數據量;
      • 問題:

        1. 由於各個 Map Instance 的數據是根據 Hash 分配的,因此也會導致有些 Reduce Instance 分配到大量的數據,而有些 Reduce Instance 卻分配到很少數據,甚至沒有分配到數據;(也就是 Map 端數據傾斜,會連帶影響 Reduce 端數據傾斜;)
        2. 在 Map 端讀數據時,由於讀入數據的文件大小分布不均勻,因此會導致有些 Map Instance 讀取並且處理的數據特別多,而有些 Map Instance 處理的數據特別少,造成 Map 端長尾;一般有兩種情況可能會導致 Map 端長尾:
          1. 上游表文件的大小特別不均勻,並且小文件特別多,導致當前表 Map 端讀取的數據分布不均勻,引起長尾
            • 上游表文件:數據倉庫中的維表和事實表;
          2. Map 端做聚合時,由於某些 Map Instance 讀取文件的某個值特別多而引起長尾,主要指 Count Distinct 操作

 

 1/2)方案

  1. 針對第一種情況導致的 Map 端長尾

    • 情況一:上游表文件的大小特別不均勻,並且小文件特別多,導致當前表 Map 端讀取的數據分布不均勻,引起長尾
    • 優化方案合並上游的小文件同時調節本節點的小文件的參數
      • 兩個參數:
        1. 一種參數,用於調節 Map 任務的 Map Instance 的個數;
          • 例:set  odps.sql.mapper.merge.limit.size = 64 ;
        2. 另一種參數,用於調節單個 Map Instance 讀取的小文件個數;
          • 例:set  odps.sql.mapper.split.size = 256 ;
  2. 針對第二種情況導致 Map 端長尾

    • 情況二:Map 端做聚合時,由於某些 Map Instance 讀取文件的某個值特別多而引起長尾,主要指 Count Distinct 操作
      • 聚合:指數據將要從 Map 端輸出,然后徐進入 Reduce 端的時候,對數據在 Map 端做的操作;
      • 聚合目的:減少 Reduce 端的數據量;
    • 實例說明由於某個值特別多而引起長尾:

      • 例:獲取收集 APP 日志明細中的前一個頁面的頁面組信息;
        • pre_page:前一個頁面的頁面標志;
        • page_ut 表:存儲的手機 APP 的頁面組;
        • pre_page 只能通過匹配正則或者所屬的頁面組信息,進行笛卡爾積 Join;
        • 思路:pre_page 只能通過正則或者所屬的頁面組信息,進行笛卡爾積 Join;
        • 原始代碼:
          • SELECT ...
            FROM
            (
                SELECT ds
                              , unique_id
                              , pre_page
                FROM tmp _app_ut_1
                WHERE ds = '${bizdate}'
                AND pre_page is not null
            ) a
            LEFT OUTER JOIN
            (
                SELECT t.*
                             , length(t.page_type_rule) rule_length
                FROM    page_ut t
                WHERE ds = '${bizdate}'
                AND      is_enable = 'Y'
            ) b
            ON 1 = 1
            WHERE a.pre_page rlike b.page_type_rule ;
        • 運行代碼,LogView 日志如下圖:
          1. L1_Stg4:MapJoin 小表的分發階段;
          2. M3_Stg1:讀取明細日志表的 Map 階段;與 MapJoin 小表的 Join 操作也發生在這個階段;
          3. R5_3_Stg2:進行分組排序的階段;
        • 問題分析:

          • M3_Stg1 階段,單個 Instance 的處理時間達到了 40 分鍾,而且長尾的 Instance 個數比較固定,應是不同的 Map 讀入的文件塊分別不均勻導致的,文件塊大的 Map 數據量比較大,在與小表進行笛卡爾積 操作時,非常耗時,造成 Map 端長尾
        • 解決方法:

          • 使用 “distribute by rand()” 來打亂數據分布,使數據盡可能分布均勻
        • 修改后的代碼如下
          • SELECT ...
            FROM
            (
                SELECT ds
                              , unique_id
                              , pre_page
                FROM tmp _app_ut_1
                WHERE ds = '${bizdate}'
                AND pre_page is not null
                DISTRIBUTE BY rand()
            ) a
            LEFT OUTER JOIN
            (
                SELECT t.*
                             , length(t.page_type_rule) rule_length
                FROM    page_ut t
                WHERE ds = '${bizdate}'
                AND      is_enable = 'Y'
            ) b
            ON 1 = 1
            WHERE a.pre_page rlike b.page_type_rule ;
        • 執行上述代碼,LogView 日志如下圖:
        • 原因分析:

          • 通過 “distribute by rand()” ,將 Map 端分發后的數據重新按照隨機值再進行一次分發;
            1. 不加人隨機分配函數時:Map 階段需要與使用 MapJoin 的小表進行笛卡爾積操作,Map 端完成了大小表的分發和笛卡爾積操作;
            2. 加入隨機分配函數后:Map 端只負責數據的分發,不再有復雜的聚合或笛卡爾積操作,因此不會導致 Map 端長尾;

 

 1/3)思考

  • Map 端長尾的根本原因由於讀入的文件塊的數據分布不均勻,再加上 UDF 函數性能、Join、聚合操作等,導致讀入數據量大的 Map Instance 耗時較長
  • 實際開發過程中,如果遇到 Map 端長尾情況,解決思路:

    1. 首先,考慮如何讓 Map Instance 讀取的數據量足夠均勻;
    2. 然后,判斷是哪些操作導致 Map Instance 比較慢;
    3. 最后,考慮這些操作是否必須在 Map 端完成,在其他階段是否會做得更好;

 

2、Join 傾斜

  • Join 的功能:MaxCompute SQL 在 Join 執行階段會將 Join Key 相同的數據分發到同一個執行 Instance 上處理

 2/1)背景

  1. Join 的執行原理

    • Join 操作需要參與 Map 和 Reduce 的整個階段;
    • 例:通過 Join 的SQL 代碼,來看整個 Map 和 Reduce 階段的執行過程以及數據變化,進而對 Join 執行原理有所了解;
      • SELECT student_id, student_name, course_id
        FROM    student
        LEFT JOIN student_course
        ON student.student_id = student_course.student_id ;
      • 過程理解:MaxCompute SQL 在 Join 執行階段會將 Join Key 相同的數據分發到同一個執行 Instance 上處理;
        • 長尾情況:如果某個 Key 上的數據量比較大,則會導致該 Instance 執行時間較長;
        • 長尾表現:在執行日志中該 Join Task 的大部分 Instance 都已執行完成,但少數幾個 Instance 一致處於執行中;
  2. MaxCompute SQL 執行中的 Join 階段的 3 中數據傾斜場景

    1. Join 的某輸入比較小
      • 可以采用 MapJoin,避免分發引起長尾;
    2. Join 的每路輸入都較大,且長尾是空值導致的
      • 可以將空值處理成隨機值,避免聚集;
    3. Join 的每路輸入都較大,且長尾是熱點值導致的
      • 可以對熱點值和非熱點值分別進行處理,再合並數據;
  3. 如何確認 Join 是否發生數據傾斜
    1. 打開 MaxCompute SQL 執行時產生的 LogView 日志,點開日志可以看到每個 Fuxi Task 的詳細執行信息,如下圖:
      • 可以看到每一個 Map、Join、Reduce 的Fuxi Task 任務;
    2. 根據上圖,點擊其中一個 Join 任務,可以看到有 115 個 Instance 長尾;再點擊 StdOut,可以查看 Instance 讀入的數據量,如下圖:
        • 圖 13.10 顯示,Join 的一路輸入讀取的數據量是 1389941257 行;
        • 長尾情況:如果 Long-Tails 中 Instance 讀入的數據量遠超過其他 Instance 讀取的數據量,則表示某個 Instance 處理的數據量超大導致長尾;

 

 2/2)方案

  • 針對上面的 3 中傾斜場景,給出 3 中對應的解決方案
  1. Join 的某輸入比較小

    • 方案采用 MapJoin
    1. MapJoin的原理
      • MapJoin 的原理:將 Join 操作提前到 Map 端執行,將小表讀入內存,順序掃描大表完成 Join;
        • 優點:可以避免因為分發 Key 不均勻導致數據傾斜;
        • 弊端:MapJoin 的使用有限制,必須是 Join 中的從表比較小才可用;
          • 從表:左外連接中的右表,或者右外連接中的左表;
    2. MapJoin 的使用方法
      • 具體操作在代碼中 select 后加上 “/*+mapjoin(a)*/” 即可
        • a:代表小表(或者子查詢)的別名;
        • 例:MaxCompute 已經可以自動選擇是否使用 MapJoin ,可以不使用顯式 Hint:
          • SELECT    /*+MAPJOIN(b)*/
                            a.c2
                            ,b.c3
            
            FROM
            (
                    SELECT    C1
                             ,C2
                    FROM      t1
            ) a
            LETF OUTER JOIN
            (
                    SELECT    c1
                             ,c3
                    FROM      t2
            ) b
            on    a.c1 = b.c1;
    • 使用 MapJoin 時,對小表的大小有限制,默認小表讀入內存后的大小不能超過 512 MB,但是用戶可以通過設置 “set odps.sql.mapjoin.memory.max = 2048” 加大內存,最大為 2048 MB;
  2. Join 因為空值導致長尾

    • 方案將空值處理成隨機值
    • 原因:空值無法關聯上,只是分發到一處,因此處理成隨機值既不會影響關聯結果,也能很好的避免聚集導致長尾;
    • 例:
      • SELECT    ...
        FROM                        table_a
        LEFT OUTER JOIN      table_b
        ON    coalesce(table_a.key, rand()*9999) = table_b.key    --當 key 值為空值時用隨機值代替
  3. Join 因為熱點值導致長尾

    • 場景:因為熱點值導致長尾,且 Join 的輸入比較大,無法使用 MapJoin;
    • 方案先將熱點 key 取出,用熱點 key 將主表數據切分成熱點數據和非熱點數據兩部分,分別處理,最后合並
    • 以實例闡述操作步驟:
      • 例:淘寶的 PV 日志關聯商品維表,取商品屬性為例;
        1. 取熱點 key:將 PV 大於 50000 的商品 ID 取出到臨時表中
          • INSERT    OVERWRITE TABLE topk_item
            SELECT    item_id
            FROM
            (
                            SELECT    item_id
                                            ,count(1) as cnt
                            FROM        pv        --pv表
                            WHERE    ds = '${bizdate}'
                            AND        url_type = 'ipv'
                            AND        item_id is not null
                            GROUP BY item_id
            ) a
            WHERE    cnt >= 50000
        2. 取出非熱點數據
          • 操作步驟:將主表(pv 表)和熱點 key 表(topk_item 表)外關聯后,通過條件 “bl.item_id is null” 取關聯不到的數據,即非熱點商品的日志數據;此時需要使用 MapJoin;再用熱點數據關聯商品維表;(因為已經排除了熱點數據,所以不會長尾;)
          • 代碼示例:
            • SELECT ...
              FROM
              (
                      SELECT    *
                      FROM        item        --商品表
                      WHERE    ds = '${bizdate}'
              ) a
              RIGHT OUTER JOIN
              (
                      SELECT    /*+MAPJOIN*/
                                      b2.*
                      FROM
                      (
                                      SELECT    item_id
                                      FROM        topk_item        --熱點表
                                      WHERE    ds = '${bizdate}'
                      ) b1
                      RIGHT OUTER JOIN
                      (
                                      SELECT    *
                                      FROM        pv            --PV表
                                      WHERE    ds = '${bizdate}'
                                      AND          url_type = 'ipv'
                      ) b2
                      ON b1.item_id = coalesce(b2.item_id, concat("tbcdm", rand())
                      WHERE    b1.item_id is null
              ) 1
              ON a.item_id = coalesce(1.item_id, concat("tbcdm", rand())
        3. 取出熱點數據
          1. 取到熱點商品的日志數據:將主表(pv 表)和熱點 key 表(topk_item 表)內關聯,此時需要使用 MapJoin;
          2. 取到熱點商品的維表數據:同時,需要將商品維表(item 表)和熱點 key 表(topk_item 表)內關聯;
            • 因為維表數據只有熱點商品的數據,數據量比較小,可以使用 MapJoin 避免長尾;
          3. 將上兩步匯總的日志數據外關聯維表數據;
            • SELECT    /*+MAPJOIN*/
                              ...
              FROM
              (
                              SELECT    /*+MAPJOIN*/
                                              b2.*
                              FROM
                              (
                                              SELECT    item_id
                                              FROM        topk_item
                                              WHERE     ds = '${bizdate}'
                              ) b1
                              JOIN
                              (
                                              SELECT    *
                                              FROM        pv        --pv 表
                                              WHERE    ds = '${bizdate}'
                                              AND          url_type = 'ipv'
                                              AND          item_id is not null
                              ) b2
                              ON            (b1.item_id = b2.item_id)
              ) 1
              LEFT OUTER JOIN
              (
                              SELECT    /*+MAPJOIN*/
                                              a2.*
                              FROM
                              (
                                              SELECT    item_id
                                              FROM       topk_item
                                              WHERE    ds = '${bizdate}'
                              ) a1
                              JOIN
                              (
                                              SELECT    *
                                              FROM        item        --商品表
                                              WHERE    ds = '${bizdate}'
                              ) a2
                              ON            (a1.item_id = a2.item_id)
              ) a
              ON            a.item_id = 1.item_id
        4. 將上面取到的非熱點數據和熱點數據通過 “union all” 合並后,即得到完整的日志數據,並且關聯了商品信息;
  • 針對傾斜問題,MaxCompute 系統提供了專門的參數用來解決長尾問題:

    • 例 1:開啟 / 關閉功能
      • set odps.sql.skewjoin = true / false
    • 例 2:設置傾斜的 key 及對應的值
      • set odps.sql.skewinfo = skewed_src: (skewed_key)    --設置傾斜的 key 值(skewed_ey)
    • 優點:簡單方便;
    • 弊端:
      1. 如果傾斜值發生變化,需要修改代碼,而且一般無法提前知道變化;
      2. 如果傾斜至比較多,則不方便在參數中設置;
    • 需要根據實際情況,選擇拆分代碼或者設置參數;

 

 2/3)思考

  • 當大表和大表 Join因為熱點值發生傾斜時,雖然可以通過修改代碼來解決,但是修改起來很麻煩,代碼改動也很大,且影響閱讀;而 MaxCompute 現有的參數設置使用不夠靈活,傾斜值多的時候,不可能將所有值都列在參數中,且傾斜值可能經常變動;

 

3、Reduce 傾斜

 3/1)背景

  • Reduce 端對 Map 端梳理后的有序 key-value 鍵值對進行聚合,即 進行 Count、Sum、Avg 等聚合操作,得到最終聚合的結果
  • Distinct:MaxCompute SQL 中支持的語法;
    • 功能:用於對字段去重;
    • 原理:將需要去重的字段以及 Group By 字段聯合作為 key 將數據分發到 Reduce 端;
  • Reduce 端長尾原因:key 的數據分布不均勻;

    • 因為 Distinct 操作,數據無法在 Map 端的 Shuffle 階段根據 Group By 先做一個聚合操作,以減少傳輸的數據量,而是將所有的數據都傳輸到 Reduce 端,當 key 的數據分布不均勻時,就會導致 Reduce 端長尾;
  • 造成 Reduce 端長尾的 4 種情況:

    1. 對同一個表按照維度對不同的列進行 Count Distinct 操作,造成 Map 端數據膨脹,從而使下游的 Join 和 Reduce 出現鏈路上的長尾;
    2. Map 端直接做聚合時,出現 key 值分布不均勻,造成 Reduce 端長尾;
    3. 動態分區數過多時,可能造成小文件過多,從而引起 Reduce 端長尾;
    4. 多個 Distinct 同時出現在一端 SQL 代碼中時,數據會被分發多次,不僅會造成數據膨脹 N 倍,還會把長尾現象放大 N 倍;

 

 3/2)方案

  • 針對上述的造成 Reduce 端長尾的 4 中情況,給出解決方案;
  1. Map 端直接做聚合時,出現 key 值分布不均勻,造成 Reduce 端長尾

    • 解決方案對熱點 key 單獨處理,然后通過 “Union All” 合並;(具體操作與 “Join 因為熱點值導致長尾” 的處理方式一樣)
  2. 動態分區數過多時,可能造成小文件過多,從而引起的 Reduce 端長尾

    • 解決方案:把符合不同條件的數據放到不同的分區,避免通過多次 “Insert Overwrite” 寫入表中,特別是分區數比較多時,能夠很多的簡化代碼;
      • 弊端:動態分區也可能帶來小文件過多的困擾;
      • 例:
        • INSERT OVERWRITE TABLE part_test PARTITION(ds)
          SELECT    *
          FROM        part_test ;
        • 假設有 K 個 Map Instance,N 個目標分區,那么在最壞的情況下,可能產生K x N 個小文件,而過多的小文件會對文件系統造成巨大的管理壓力;
        • 解決方法:
          • MaxCompute 對動態分區的處理是引入額外一級的 Reduce Task,把相同的目標分區交由同一個(或少量幾個)Reduce Instance 來寫入,避免小文件過多,並且這個 Reduce 肯定是最后一個 Reduce Task 操作;(MaxCompute 是默認開啟這個功能的,也就是將下面參數設置為 true)
          • set odps.sql.reshuffle.dynamicpt = true;
  3. 多個 Distinct 同時出現在一段 SQL 代碼中時,數據會被分發多次,不僅會造成數據膨脹 N 倍,還會把長尾現象方法 N 倍

    • 例:在 7 天、30天等時間范圍內,分 PC 端、無線端、所有終端,計算支付買家和支付商品數,其中支付買家數和支付商品數指標需要去重;
      • 方案一使用 Distinct 去重

        • 因為需要根據日期、終端等多種條件組合對買家和商品進行去重,因此需要有 12 個 Count Distinct 計算;
        • 下圖為統計代碼和該代碼運行的 LogView 日志,可以看出節點運行時間:1 h 14 min,數據膨脹;
      • 方案二:不使用 Distinct 去重
        1. 計算支付買家數:
          1. 先分別進行查詢,執行 Group By 原表粒度 + buyer_id,計算出 PC 端、無線端、所有終端以及 7 天、30 天等統計口徑下的 buyer_id(這里可以理解為買家支付的次數);
          2. 在子查詢外,Group By 原表粒度:當上一步的 Count 值大於 0 時,說明這一買家在這個統一口徑下油鍋支付,計入支付買家數,否則不計入;
        2. 計算支付商品數:
          • 與計算支付買家數的方法一樣,按照上兩步操作進行;
        3. 對支付買家數和支付商品數進行 Join 操作;
          • 代碼示例:(僅示例支付買家數計算)
            • SELECT    t2.seller_id
                              ,t2.price_seg_id
                              ,SUM(case when pay_ord_byr_cnt_1w_001 > 0 then 1 else 0 end) AS pay_ord_byr_cnt_1w_001    --最近 7 天支付買家數
                              ,SUM(case when pay_ord_byr_cnt_1w_002 > 0 then 1 else 0 end) AS pay_ord_byr_cnt_1w_002    --最近 7 天 PC 端支付買家數
                              ,SUM(case when pay_ord_byr_cnt_1w_003 > 0 then 1 else 0 end) AS pay_ord_byr_cnt_1w_003    --最近 7 天無線端支付買家數
                              ,SUM(case when pay_ord_byr_cnt_1m_002 > 0 then 1 else 0 end) AS pay_ord_byr_cnt_1m_002    --最近 30 天支付買家數
                              ,SUM(case when pay_ord_byr_cnt_1m_003 > 0 then 1 else 0 end) AS pay_ord_byr_cnt_1m_003    --最近 30 天 PC 端支付買家數
                              ,SUM(case when pay_ord_byr_cnt_1m_004 > 0 then 1 else 0 end) AS pay_ord_byr_cnt_1m_004    --最近 30 天無線端支付買家數
              FROM
              (
                      SELECT    a1.seller_id
                                      ,a2.price_seg_id
                                      ,buyer_id
                                      ,COUNT(buyer_id) AS pay_ord_byr_cnt_1m_002    --最近 30 天支付買家數
                                      ,COUNT(CASE WHEN is_wireless = 'N' THEN buyer_id ELSE NULL END) AS pay_ord_byr_cnt_1m_003    --最近 30 天 PC 端支付買家數
                                      ,COUNT(CASE WHEN is_wireless = 'Y' THEN buyer_id ELSE NULL END) AS pay_ord_byr_cnt_1m_004    --最近 30 天無線端支付買家數
                                      ,COUNT(case when a1.ds >= To_CHAR(DATEADD(TO_DATE('${bizdate}', 'yyyymmdd'), -6, 'dd'), 'yyyymmdd') then buyer_id else null end) AS pay_ord_byr_cnt_1w_001    --最近 7 天支付買家數
                                      ,COUNT(case when a1.ds >= To_CHAR(DATEADD(TO_DATE('${bizdate}', 'yyyymmdd'), -6, 'dd'), 'yyyymmdd') and is_wireless = 'N' THEN buyer_id ELSE NULL END) AS pay_ord_byr_cnt_1w_002    --最近 7 天 PC 端支付買家數
                                      ,COUNT(case when a1.ds >= To_CHAR(DATEADD(TO_DATE('${bizdate}', 'yyyymmdd'), -6, 'dd'), 'yyyymmdd') and is_wireless = 'Y' THEN buyer_id ELSE NULL END) AS pay_ord_byr_cnt_1w_003    --最近 7 天無線端支付買家數
              FROM
                          (
                              select *
                              from    table_pay        --支付表
                          ) a1
              JOIN ( SELECT item_id
                                      ,price_seg_id
                         FROM    tag_itm        --商品 tag 表
                         WHERE  ds = '${bizdate}'
                      ) a2
              ON ( a1.item_id = a2.item_id )
              GROUP BY a1.seller_id               --原表粒度
                              ,a2.price_seg_id        --原表粒度
                              ,buyer_id
              )  t2
              GROUP BY t2.seller_id               --原表粒度
                              ,t2.price_seg_id        --原表粒度
            • 修改后運行時間為 13 min,整體運行的 LogView 日志如下圖:數據沒有膨脹;

 

 3/3)思考

  1. 對 Multi Distinct 的思考:

    1. 上述方案中如果出現多個需要去重的指標,那么在把不同指標 Join 在一起之前,一定要確保指標的粒度是原始表的數據粒度;
      • 如,支付買家數和支付商品數,在子查詢中指標粒度分別是:原始表的數據粒度 + buyer_id 和原始表的數據粒度 + item_id,這時兩個指標不是同一數據粒度,所以不能 Join,需要再套一層代碼,分別把指標 Group By 到 “原始表的數據粒度”,然后再進行 Join 操作;
    2. 在性能和代碼簡潔、可維護之間需要根據具體情況進行權衡
      • 情況 1
        • 修改前的 Multi Distinct 代碼的可讀性比較強,代碼簡潔,便於維護;修改后的代碼較為復雜;
          • 特點:一般代碼改動比較大,需要投入一定的時間成本;
          • 解決思路:可以考慮做成自動化,通過檢測代碼、優化代碼自動生成;
      • 情況 2
        • 當出現的Distinct 個數不多、表的數據量也不是很大、表的數據分布較均勻時,可以不使用 Multi Distinct 進行計算;
    3. 考慮上述兩種情況的另一種處理方式

      • 情況 1 及處理方式:當代碼比較臃腫時,也可以將上述子查詢落到中間表里,這樣數據模型更合理、復用性更強、層次更清晰;
      • 情況 2 及處理方式:當需要去除類似的多個 Distinct 時,可以查一下是否有更細粒度的表可用,避免重復計算;
  2. 兩個要注意的問題

    1. 目前 Reduce 端數據傾斜很多是由 Count Distinct 問題引起的,因此,在 ETL 開發工作中應該予以重視 Count Distinct 問題,避免數據膨脹;
    2. 對於一些表的 Join 階段的 NULL 值問題,應該對表的數據分布要有清楚的認識,在開發時解決這個問題;

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM