解決方案二
1. 增加reduce 的jvm內存
2. 增加reduce 個數
3. customer partition
4. 其他優化的討論.
5. reduce sort merge排序算法的討論
6. 正在實現中的hive skewed join.
7. pipeline
8. distinct
9. index 尤其是bitmap index
方式1
既然reduce 本身的計算需要以合適的內存作為支持,在硬件環境容許的情況下,增加reduce 的內存大小顯然有改善數據傾斜的可能,這種方式尤其適合數據分布第一種情況,單個值有大量記錄, 這種值的所有紀錄已經超過了分配給reduce 的內存,無論你怎么樣分區這種情況都不會改變. 當然這種情況的限制也非常明顯,
1. 內存的限制存在
2. 可能會對集群其他任務的運行產生不穩定的影響.
方式2
這個對於數據分布第二種情況有效,唯一值較多,單個唯一值的記錄數不會超過分配給reduce 的內存. 如果發生了偶爾的數據傾斜情況,增加reduce 個數可以緩解偶然情況下的某些reduce 不小心分配了多個較多記錄數的情況. 但是對於第一種數據分布無效.
方式3
一種情況是某個領域知識告訴你數據分布的顯著類型,比如hadoop definitive guide 里面的溫度問題,一個固定的組合(觀測站點的位置和溫度) 的分布是固定的, 對於特定的查詢如果前面兩種方式都沒用,實現自己的partitioner 也許是一個好的方式.
方式5
reduce 分配的內存遠小於處理的數據量時,會產生multi-pass sort 的情況是瓶頸,那么就要問
1. 這種排序是有必要的嘛?
2. 是否有其他排序算法或優化可以根據特定情況降低他瓶頸的閾值?
3. map reduce 適合處理這種情況嘛?
關於問題1. 如果是group by , 那么對於數據分布情況1 ,hash 比sort 好非常多,即使某一個reduce 比其他reduce 處理多的多的數據,hash 的計算方式也不會差距太大.
問題2. 一個是如果實現block shuffle 肯定會極大的減少排序本身的成本, 另外,如果分區之后的reduce 不是使用copy –> sort-merge –> reduce 的計算方式, 在copy 之后將每個block 的頭部信息保存在內存中,不用sort – merge 也可以直接計算reduce, 只不過這時候變成了隨機訪問,而不是現在的sort-merge 之后的順序訪問. block shuffle 的實現有兩種類型,一種是當hadoop 中真正有了列數據格式的時候,數據有更大的機會已經排過序並且按照block 來切分,一般block 為1M ( 可以關注avro-806 ) , 這時候的mapper 什么都不做,甚至連計算分區的開銷都小了很多倍,直接進入reduce 最后一步,第二種類型為沒有列數據格式的支持,需要mapper 排序得到之后的block 的最大最小值,reduce 端在內存中保存最大最小值,copy 完成后直接用這個值來做隨機讀然后進行reduce. ( block shuffle 的實現可以關注 MAPREDUCE-4039 , hash 計算可以關注 MAPREDUCE-1639)
問題3 . map reduce 只有兩個函數,一個map 一個 reduce, 一旦發生數據傾斜就是partition 失效了,對於join 的例子,某一個key 分配了過多的記錄數,對於只有一次partittion的機會,分配錯了數據傾斜的傷害就已經造成了,這種情況很難調試,但是如果你是基於map-reduce-reduce 的方式計算,那么對於同一個key 不需要分配到同一個reduce 中,在第一個reduce 中得到的結果可以在第二個reduce 才匯總去重,第二個reduce 不需要sort – merge 的步驟,因為前一個reduce 已經排過序了,中間的reduce 處理的數據不用關心partition 怎么分,處理的數據量都是一樣大,而第二個reduce 又不使用sort-merge 來排序,不會遇到現在的內存大小的問題,對於skewed join 這種情況瓶頸自然小很多.
方式6
目前hive 有幾個正在開發中的處理skewed join 情況的jira case, HIVE-3086 , HIVE-3286 ,HIVE-3026 . 簡單介紹一下就是facebook 希望通過手工處理提前枚舉的方式列出單個傾斜的值,在join 的時候將這些值特殊列出當作map join 來處理,對於其他值使用原來的方式. 我個人覺得這太不伸縮了,值本身沒有考慮應用過濾條件和優化方式之后的數據量大小問題,他們提前列出的值都是基於整個分區的. join key 如果為組合key 的情況也應該沒有考慮,對metastore 的儲存問題有限制,對輸入的大表和小表都會scan 兩次( 一次處理非skew key , 一次處理skew key 做map join), 對輸出表也會scan 兩次(將兩個結果進行merge) , skew key 必須提前手工列出這又存在額外維護的成本,目前因為還沒有完整的開發完到能夠投入生產的情況,所以等所有特性處理完了有了文檔在看看這個處理方式是否有效,我個人認為的思路應該是接着bucked map join 的思路往下走,只不過不用提前處理cluster key 的問題, 這時候cluster key 的選擇應該是join key + 某個能分散join key 的列, 這等於將大表的同一個key 的值分散到了多個不同的reduce 中,而小表的join key 也必須cluster 到跟大表對應的同一個key , join 中對於數據分布第二種情況不用太難,增加reduce 個數就好,主要是第一種,需要大表的join key 能夠分散,對於同樣join key 的小表又能夠匹配到所有大表中的記錄. 這種思路就是不用掃描大表兩遍或者結果輸出表,不需要提前手工處理,數據是動態sample 的應用了過濾條件之后的數據,而不是提前基於統計數據的不准確結果. 這個基本思路跟tenzing 里面描述的distributed hash join 是一樣的,想辦法切成合適的大小然后用hash 和 map join .
方式7
當同時出現join 和group 的時候, 那么這兩個操作應該是以pipeline (管道) 的方式執行. 在join 的時候就可以直接使用group 的操作符減少大量的數據,而不是等待join 完成,然后寫入磁盤,group 又讀取磁盤做group操作. HIVE-2206 正在做這個優化. hive 里面是沒有pipeline 這個概念的. 像是cloudera 的crunch 或者twitter 的Scalding 都是有這種概念的.
方式8
distinct 本身就是group by 的一種簡寫,我原先以為count(distinct x)這種跟group by 是一樣的,但是發現hive 里面distinct 明顯比group by 要慢,可能跟group by 會有map 端的combiner有關, 另外觀察到hive 在預估count(distinct x) 的reduce 個數比group by 的個數要少 , 所以hive 中使用count(distinct x) , 要么盡量把reduce 個數設置大,直接設置reduce 個數或者hive.exec.reducers.bytes.per.reducer 調小,我個人比較喜歡調后面一個,hive 目前的reduce 個數沒有統計信息的情況下就是用map端輸入之前的數值, 如果你是join 之后還用count(distinct x) 的話,這個默認值一般都會悲劇,如果有where 條件並能過濾一定數量的數據,那么默認reduce 個數可能就還好一點. 不管怎樣,多浪費一點reduce slot 總比等十幾甚至幾十分鍾要好, 或者轉換成group by 的寫法也不錯,寫成group by 的時候distributed by 也很有幫助.
方式9
hive 中的index 就是物化視圖,對於group by 和distinct 的情況等於變成了map 端在做計算,自然不存在傾斜. 尤其是bitmap index , 對於唯一值比較少的列優勢更大,不過index 麻煩的地方在於需要判斷你的sql 是不是常用sql , 另外如果create index 的時候沒有選你查詢的時候用的字段,這個index 是不能用的( hive 中是永遠不可能有DBMS中的用index 去lookup 或者join 原始表這種概念的)
總結
數據傾斜沒有一勞永逸的方式可以解決,了解你的數據集的分布情況,然后了解你所使用計算框架的運行機制和瓶頸,針對特定的情況做特定的優化,做多種嘗試,觀察是否有效
