在開發過程中大家都會遇到一個常見的問題,那就是數據傾斜。既然遇到問題,那么就應該想辦法解決問題。解決問題首先要了解出現這個問題的原因。
什么是數據傾斜,比如說:在hive中 map階段早就跑完了,reduce階段一直卡在99%。很大情況是發生了數據傾斜,整個任務在等某個節點跑完。 在spark中大部分的task執行的特別快,
剩下的一些task執行的特別慢,要幾分鍾或幾十分鍾才執行完一個task Hive中大表join的時候,容易產生數據傾斜問題,spark中產生shuffle類算子的操作,groupbykey、reducebykey、join等操作
會引起數據傾斜。
數據傾斜的原因:在進行shuffle的時候,必須 將各個節點上相同的 key 拉取到某個節點上的一個 task 來進行處理 ,比如按照key進行聚合或join等操作。此時如果某個 key對應的數據量特別大的話,
就會發生數據傾斜。比如大部分key對應10條數據,但是個別key卻對應了100萬條數據,那么大部分task可能就只 會分配到10條數據,然后1秒鍾就運行完了;但是個別task可能分配到了100萬數據,
要運行一兩個小時。
解決方案:
第一點:直接過濾掉那些引起傾斜的Key。這種方法很簡單,既然你傾斜,那我不用你就完事。比如說,總共有100萬個key。只有2 個key,是數據量達到10 萬的。其他所有的key,對應的數量都是幾十,這樣join
后會引起傾斜。這個時候,自 己可以去取舍,如果業務和需求可以理解和接受的話,在從hive 表查詢源數據的時候,直接在sql 中 用 where 條件,過濾掉某幾個 key 。那么這幾個 原先有大量數據,會導致數據傾
斜的key,被過濾掉之后,那么在的spark作業中,自然就不會發生數據傾斜了。
第二點:Hive ETL做處理
通過Hive ETL預先對數據按照key進行聚合,或者是預先和其他表進行join,然后在Spark作業中針對的數據源就不是原來的Hive表了,而是預處 理后的Hive表。此時由於數據已經預先進行過
聚合或join操作了,那么在Spark作業中也就不需要使用原先的shuffle類算子執行這類操作了。Hive ETL中進行group by或者join等shuffle操作時,還是會出現數據傾斜,導致Hive ETL的速度很慢。我們只是把數據傾斜
的發生提前到了Hive ETL中。
第三點:提高shuffle的操作並行度
在對RDD執行shuffle算子時,給shuffle算子傳入一個參數,比如reduceByKey(1000),該參數就設置了這個shuffle算子執行時shuffle read task 的數量。對於Spark SQL中的shuffle類語句,比如group by、join等。
需要設置一個參數,即spark.sql.shuffle.partitions,該參數代表了shuffle read task的並行度,該值默認是200,對於很多場景來說都有點過小
其原理很簡單,增加shuffle read task的數量,可以 讓原本分配給一個 task 的多個 key 分配給多個 task , 從而讓每個 task 處理比原來更少的數據 。舉例來說, 如果原本有5個key,每個key對應10條數據,這
5個key都是分配給一個task的,那么這個task就要處理50條數據。而增加了shuffle read task以后, 每個task就分配到一個key,即每個task就處理10條數據,那么自然每個task的執行時間都會變短了
