Hive優化-大表join大表優化


Hive優化-大表join大表優化

  5、大表join大表優化

      如果Hive優化實戰2中mapjoin中小表dim_seller很大呢?比如超過了1GB大小?這種就是大表join大表的問題。首先引入一個具體的問題場景,然后基於此介紹各自優化方案。

   5.1、問題場景

      問題場景如下:

      A表為一個匯總表,匯總的是賣家買家最近N天交易匯總信息,即對於每個賣家最近N天,其每個買家共成交了多少單,總金額是多少,假設N取90天,匯總值僅取成交單數。

      A表的字段有:buyer_id、seller_id、pay_cnt_90day。

      B表為賣家基本信息表,其字段有seller_id、sale_level,其中sale_levels是賣家的一個分層評級信息,比如吧賣家分為6個級別:S0、S1、S2、S3、S4和S5。

      要獲得的結果是每個買家在各個級別的賣家的成交比例信息,比如:

      某買家:S0:10%;S1:20%;S2:20%;S3:10%;S4:20%;S5:10%。

      正如mapjoin中的例子一樣,第一反應是直接join兩表並統計:

      select

         m.buyer_id,

        sum(pay_cnt_90day)  as pay_cnt_90day,

        sum(case when m.sale_level = 0  then pay_cnt_90day  end)  as pay_cnt_90day_s0,

        sum(case when m.sale_level = 1  then pay_cnt_90day  end)  as pay_cnt_90day_s1,

        sum(case when m.sale_level = 2  then pay_cnt_90day  end)  as pay_cnt_90day_s2,

        sum(case when m.sale_level = 3  then pay_cnt_90day  end)  as pay_cnt_90day_s3,

        sum(case when m.sale_level = 4  then pay_cnt_90day  end)  as pay_cnt_90day_s4,

        sum(case when m.sale_level = 5  then pay_cnt_90day  end)  as pay_cnt_90day_s5

      from (

        select  a.buer_id,  a.seller_id,  b.sale_level, a.pay_cnt_90day

        from (  select buyer_id,  seller_id,  pay_cnt_90day   from table_A)  a

        join

               (select seller_id,  sale_level  from table_B)  b

        on  a.seller_id  = b.seller_id

        )  m

      group by m.buyer_id

      但是此SQL會引起數據傾斜,原因在於賣家的二八准則,某些賣家90天內會有幾百萬甚至上千萬的買家,但是大部分的賣家90天內買家的數目並不多,join table_A和table_B的時候,

    ODPS會按照seller_id進行分發,table_A的大賣家引起了數據傾斜。

      但是數據本身無法用mapjoin table_B解決,因為賣家超過千萬條,文件大小有幾個GB,超過了1GB的限制。

   5.2、優化方案1:轉為mapjoin

      一個很正常的想法是,盡管B表無法直接mapjoin, 但是是否可以間接mapjoin它呢?

      實際上此思路有兩種途徑:限制行和限制列。

      限制行的思路是不需要join B全表,而只需要join其在A表中存在的,對於本問題場景,就是過濾掉90天內沒有成交的賣家。

      限制列的思路是只取需要的字段。

      加上如上的限制后,檢查過濾后的B表是否滿足了Hive  mapjoin的條件,如果能滿足,那么添加過濾條件生成一個臨時B表,然后mapjoin該表即可。采用此思路的語句如下:

      

      select

         m.buyer_id,

        sum(pay_cnt_90day)  as pay_cnt_90day,

        sum(case when m.sale_level = 0  then pay_cnt_90day  end)  as pay_cnt_90day_s0,

        sum(case when m.sale_level = 1  then pay_cnt_90day  end)  as pay_cnt_90day_s1,

        sum(case when m.sale_level = 2  then pay_cnt_90day  end)  as pay_cnt_90day_s2,

        sum(case when m.sale_level = 3  then pay_cnt_90day  end)  as pay_cnt_90day_s3,

        sum(case when m.sale_level = 4  then pay_cnt_90day  end)  as pay_cnt_90day_s4,

        sum(case when m.sale_level = 5  then pay_cnt_90day  end)  as pay_cnt_90day_s5

      from ( 

        select  /*+mapjoin(b)*/

          a.buer_id,  a.seller_id,  b.sale_level, a.pay_cnt_90day

        from (  select buyer_id,  seller_id,  pay_cnt_90day   from table_A)  a

        join

               (

           select seller_id,  sale_level  from table_B b0

           join 

           (select seller_id from table_A group by seller_id) a0

             on b0.seller_id = a0.selller_id

          )  b

        on  a.seller_id  = b.seller_id

        )  m

      group by m.buyer_id

      此方案在一些情況可以起作用,但是很多時候還是無法解決上述問題,因為大部分賣家盡管90天內買家不多,但還是有一些的,過濾后的B表仍然很多。

 

  5.3、優化方案2:join時用case when語句

      此種解決方案應用場景是:傾斜的值是明確的而且數量很少,比如null值引起的傾斜。其核心是將這些引起傾斜的值隨機分發到Reduce,其主要核心邏輯在於join時對這些特殊值concat隨機數,

    從而達到隨機分發的目的。此方案的核心邏輯如下:

       select a.user_id, a.order_id, b.user_id

      from table_a a join table_b b

      on (case when a.user_is is null then concat('hive', rand()) else a.user_id end) = b.user_id

      Hive 已對此進行了優化,只需要設置參數skewinfo和skewjoin參數,不修改SQL代碼,例如,由於table_B的值“0” 和“1”引起了傾斜,值需要做如下設置:

      set hive.optimize.skewinfo=table_B:(selleer_id) [ ( "0") ("1") ) ] 

      set hive.optimize.skewjoin = true;

      但是方案2因為無法解決本問題場景的傾斜問題,因為傾斜的賣家大量存在而且動態變化。

   

  5.4 、優化方案3:倍數B表,再取模join

     1、通用方案

      此方案的思路是建立一個numbers表,其值只有一列int 行,比如從1到10(具體值可根據傾斜程度確定),然后放大B表10倍,再取模join。代碼如下:

      

      select

         m.buyer_id,

        sum(pay_cnt_90day)  as pay_cnt_90day,

        sum(case when m.sale_level = 0  then pay_cnt_90day  end)  as pay_cnt_90day_s0,

        sum(case when m.sale_level = 1  then pay_cnt_90day  end)  as pay_cnt_90day_s1,

        sum(case when m.sale_level = 2  then pay_cnt_90day  end)  as pay_cnt_90day_s2,

        sum(case when m.sale_level = 3  then pay_cnt_90day  end)  as pay_cnt_90day_s3,

        sum(case when m.sale_level = 4  then pay_cnt_90day  end)  as pay_cnt_90day_s4,

        sum(case when m.sale_level = 5  then pay_cnt_90day  end)  as pay_cnt_90day_s5

      from (

        select  a.buer_id,  a.seller_id,  b.sale_level, a.pay_cnt_90day

        from (  select buyer_id,  seller_id,  pay_cnt_90day   from table_A)  a

        join

               (

          select  /*+mapjoin(members)*/

            seller_id,  sale_level ,member

          from table_B

            join members

          )  b

        on  a.seller_id  = b.seller_id

          and mod(a.pay_cnt_90day,10)+1 = b.number 

        )  m

      group by m.buyer_id

         此思路的核心在於,既然按照seller_id分發會傾斜,那么再人工增加一列進行分發,這樣之前傾斜的值的傾斜程度會減少到原來的1/10,可以通過配置numbers表改放大倍數來降低傾斜程度,

      但這樣做的一個弊端是B表也會膨脹N倍。

    2、專用方案

        通用方案的思路把B表的每條數據都放大了相同的倍數,實際上這是不需要的,只需要把大賣家放大倍數即可:需要首先知道大賣家的名單,即先建立一個臨時表動態存放每天最新的大賣家(

      比如dim_big_seller),同時此表的大賣家要膨脹預先設定的倍數(1000倍)。

        在A表和B表分別新建一個join列,其邏輯為:如果是大賣家,那么concat一個隨機分配正整數(0到預定義的倍數之間,本例為0~1000);如果不是,保持不變。具體代碼如下:

      

      select

         m.buyer_id,

        sum(pay_cnt_90day)  as pay_cnt_90day,

        sum(case when m.sale_level = 0  then pay_cnt_90day  end)  as pay_cnt_90day_s0,

        sum(case when m.sale_level = 1  then pay_cnt_90day  end)  as pay_cnt_90day_s1,

        sum(case when m.sale_level = 2  then pay_cnt_90day  end)  as pay_cnt_90day_s2,

        sum(case when m.sale_level = 3  then pay_cnt_90day  end)  as pay_cnt_90day_s3,

        sum(case when m.sale_level = 4  then pay_cnt_90day  end)  as pay_cnt_90day_s4,

        sum(case when m.sale_level = 5  then pay_cnt_90day  end)  as pay_cnt_90day_s5

      from (

        select  a.buer_id,  a.seller_id,  b.sale_level, a.pay_cnt_90day

        from (  

          select  /*+mapjoin(big)*/

             buyer_id,  seller_id,  pay_cnt_90day,

             if(big.seller_id is not null, concat(  table_A.seller_id,  'rnd',  cast(  rand() * 1000 as bigint ), table_A.seller_id)  as seller_id_joinkey

              from table_A

               left outer join

             --big表seller_id有重復,請注意一定要group by 后再join,保證table_A的行數保持不變

             (select seller_id  from dim_big_seller  group by seller_id)big

             on table_A.seller_id = big.seller_id

        )  a

        join

               (

          select  /*+mapjoin(big)*/

            seller_id,  sale_level ,

            --big表的seller_id_joinkey生成邏輯和上面的生成邏輯一樣

            coalesce(seller_id_joinkey,table_B.seller_id) as seller_id_joinkey

          from table_B

            left out join

          --table_B表join大賣家表后大賣家行數擴大1000倍,其它賣家行數保持不變

          (select seller_id, seller_id_joinkey from dim_big_seller) big

          on table_B.seller_id= big.seller_id

          )  b

        on  a.seller_id_joinkey= b.seller_id_joinkey

          and mod(a.pay_cnt_90day,10)+1 = b.number 

        )  m

      group by m.buyer_id

      相比通用方案,專用方案的運行效率明細好了許多,因為只是將B表中大賣家的行數放大了1000倍,其它賣家的行數保持不變,但同時代碼復雜了很多,而且必須首先建立大數據表。

   5.5 、方案4:動態一分為二

      實際上方案2和3都用了一分為二的思想,但是都不徹底,對於mapjoin不能解決的問題,終極解決方案是動態一分為二,即對傾斜的鍵值和不傾斜的鍵值分開處理,不傾斜的正常join即可,傾斜的把他們找出來做mapjoin,最后union all其結果即可。

      但是此種解決方案比較麻煩,代碼復雜而且需要一個臨時表存放傾斜的鍵值。代碼如下:

      --由於數據傾斜,先找出90天買家超過10000的賣家

      insert overwrite table  temp_table_B

      select 

        m.seller_id,  n.sale_level

      from (

        select   seller_id

        from (

          select seller_id,count(buyer_id) as byr_cnt

          from table_A

          group by seller_id

          ) a

        where a.byr_cnt >10000

        ) m

      left join 

      (

       select seller_id, sale_level  from table_B

      ) n

        on m.seller_id = n.seller_id;

      

      --對於90天買家超過10000的賣家直接mapjoin,對其它賣家直接正常join即可。

      

      select

         m.buyer_id,

        sum(pay_cnt_90day)  as pay_cnt_90day,

        sum(case when m.sale_level = 0  then pay_cnt_90day  end)  as pay_cnt_90day_s0,

        sum(case when m.sale_level = 1  then pay_cnt_90day  end)  as pay_cnt_90day_s1,

        sum(case when m.sale_level = 2  then pay_cnt_90day  end)  as pay_cnt_90day_s2,

        sum(case when m.sale_level = 3  then pay_cnt_90day  end)  as pay_cnt_90day_s3,

        sum(case when m.sale_level = 4  then pay_cnt_90day  end)  as pay_cnt_90day_s4,

        sum(case when m.sale_level = 5  then pay_cnt_90day  end)  as pay_cnt_90day_s5

      from (

        select  a.buer_id,  a.seller_id,  b.sale_level, a.pay_cnt_90day

        from (  select buyer_id,  seller_id,  pay_cnt_90day   from table_A)  a

        join

               (

          select seller_id,  a.sale_level 

           from table_A  a

           left join temp_table_B b

          on a.seller_id = b.seller_id

          where b.seller_id is not null

          )  b

        on  a.seller_id  = b.seller_id

       union all

       

       select  /*+mapjoin(b)*/

          a.buer_id,  a.seller_id,  b.sale_level, a.pay_cnt_90day

        from ( 

           select buyer_id,  seller_id,  pay_cnt_90day   

          from table_A

          )  a

        join

               (

           select seller_id,  sale_level  from table_B 

          )  b

        on  a.seller_id  = b.seller_id

     )  m  group by m.buyer_id

     ) m

     group by m.buyer_id

 

    總結:方案1、2以及方案3中的同用方案不能保證解決大表join大表問題,因為它們都存在種種不同的限制和特定使用場景。

    而方案3的專用方案和方案4是推薦的優化方案,但是它們都需要新建一個臨時表來存儲每日動態變化的大賣家。相對方案4來說,方案3的專用方案不需要對代碼框架進行修改,但是B表會被放大,所以一定要是是維度表,不然統計結果會是錯誤的。方案4最通用,自由度最高,但是對代碼的更改也最大,甚至修改更難代碼框架,可以作為終極方案使用。

    

    參考資料:《離線和實時大數據開發實戰》


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM