一、數據傾斜
數據傾斜一般發生在對數據進行重新划分以及聚合的處理過程中。執行Spark作業時,數據傾斜一般發生在shuffle過程中,因為Spark的shuffle過程需要進行數據的重新划分處理。在執行shuffle過程中,Spark需要將各個節點上相同key的數據拉取到某個處理節點的task中進行處理,如對事實數據按照某個維度key進行聚合或者join等含shuffle操作。在此過程中,如果各個key對應的數據量相差較大,存在某一個或者幾個key對應的數據量特別大,就是發生了數據傾斜。例如一個聚合作業中,大部分key對應100條數據,但是少數個別key卻對應了100萬條左右的數據,那么執行時若一個task處理一個key對應的數據,則大部分task很快(如1秒鍾)完成,個別task要處理100萬條數據,需要花費較多的時間(如1個小時)完成。而整個Spark作業的運行速度是由運行時間最長的task決定的,這里整個作業的運行時間變得很長,不能充分利用Spark的並行能力,極大地影響作業的處理性能。
如圖,是一個簡單的數據傾斜示意圖,在shuffle過程中對數據按照key進行重新划分后,其中一個key(hello)對應的數據量遠大於其他key(world,start)的數據量,從而出現了數據傾斜。這就導致后續的處理任務中,task1的運行時間肯定遠遠高於其他的task2、task3的運行時間。該Spark作業的運行時間也由task1的運行時間決定。因此,在處理過程中出現數據傾斜時,Spark作業的運行會非常緩慢,無法體現出Spark處理大數據的高效並行優勢,甚至可能因為某些task處理的數據量過大導致內存溢出,使得整個作業無法正常執行。
二、數據傾斜的現象
數據傾斜發生后現現象會有兩種情況:
1、大部份的Task執行的時候會很快,當發生數據傾斜后的task會執行很長時間。
2、有時候數據傾斜直接報OOM即:JVM Out Of Memory內存溢出的錯誤。
三、解決方法
(1)過濾傾斜key優化法
如果在Spark作業中發生數據傾斜時,能夠確定只有少數個別的key數據量傾斜較大,而它們又對計算結果影響不大,我們可以采用這種方法。針對發生傾斜的key,預先直接過濾掉它們,使得后面的計算能夠高效地快速完成。
(2)提升shuffle操作的並行度優化法
在Spark的操作算子中,有一些常用的操作算子會觸發shuffle操作,如reduceByKey、aggregateByKey、groupByKey、cogroup、join、repartition等。Spark作業中出現數據傾斜時,很可能就是我們的開發代碼中使用的這些算子的計算過程造成的。因為這些算子的執行會觸發shuffle過程,進而引起數據的重新划分,導致數據傾斜的產生。默認情況下,我們不會考慮shuffle操作的並行度分配,而是交由Spark控制。但是Spark的默認shuffle並行度(即shuffle read task的數量)為200,這對於很多場景都有點過小。在出現數據傾斜時,我們可以顯式提高shuffle操作的並行度,以緩解某些一般的數據傾斜情況。
這種方法需要在使用shuffle類算子時,自己指定並行參數。在開發中我們可以對使用到的shuffle類算子預先傳入一個並行參數,如aggregateByKey(1000),顯式設置該算子執行時shuffle read task的數量為1000,增加實際執行的並行度到1000。這樣增加shuffle read task的數量后,可以讓原本分配給一個task的多個key數據重新划分給多個task,通過提高並行度再次分散數據,從而減少每個task的處理數據量,有效緩解和減輕數據傾斜的影響。
上圖展示了一個並行度優化的簡單情形。如圖所示,由於低並行度發生數據傾斜問題時,在提高了shuffle操作的並行度之后,之前由一個task處理的數據量被重新分散到不同的多個task(task1、task2、task3)中進行處理,這樣原來的各個task每次處理的數據量減少很多,從而緩解了存在的數據傾斜問題,能夠明顯提高處理速度。
顯式提高shuffle操作的並行度的方法也是一種易於實現的優化方法。這種方法可以有效緩解數據量較大而並行度較低的數據傾斜問題,但是它並不能徹底根除傾斜問題。如果數據傾斜的原因是某些(個)key數據量較大,則這時提高並行度並不能改善數據傾斜的低性能。同樣的這種方法適用場景也很有限,只適用於一些普通場景。
(3)兩階段聚合優化法
在Spark執行作業中,涉及聚合操作時往往會比較容易出現數據傾斜現象。如果在Spark作業執行聚合類操作算子如reduceByKey、aggregateByKey等的過程中,發生數據傾斜時,即出現某些key要聚合的數據量較大的情況下,我們可以采用兩階段聚合優化方法,先將相同的數據打亂成不同的局部數據,進行局部聚合后再統一合並聚合。
上圖所示,兩階段聚合方法的第一階段進行數據的拆分與局部聚合,先給每個待聚合的key附加一個隨機前綴,如10以內的隨機數,此時原先一樣的key數據就擴充成了多個不一樣的key’數據。例如圖中原先的數據(hello,1)、(hello,1)、(hello,1)、(hello,1)、(hello,1)、(hello,1)經過這樣的處理后變為(1_hello,1)、(1_hello,1)、(2_hello,1)、(2_hello,1)、(10_hello,1)、(10_hello,1);然后對處理后的數據執行相應的reduceByKey等聚合操作,進行局部聚合,得到局部聚合結果如(1_hello,2)、(2_hello,2)、(10_hello,2)。第二階段進行局部數據的合並與聚合,先去除聚合好后的局部數據中各個key’ 的前綴,得到原先的key數據,再次進行全局聚合操作,得到最終的聚合結果。如圖中先將局聚合數據(1_hello,2)、(2_hello,2)、(10_hello,2)去除前面添加的前綴變成(hello,2)、(hello,2)、(hello,2),然后再進行一次全局聚合得到最終結果(hello,6)。
兩階段聚優化方法通過將原本相同的key先附加隨機前綴的方式,變成多個不同的key’,以達到分散傾斜數據的目的。這種方法可以讓原本被一個task負責的大量數據分散開到多個task中執行局部聚合,進而解決單個task處理數據量過多的問題;然后在局部聚合的基礎上,再次進行全局聚合,得到最終結果。雖然這樣做使得原本只需一步的聚合操作變為兩步聚合操作,但是卻有效地減輕了由於數據傾斜帶來的性能影響,反而能夠加速整個聚合作業的執行速度。
此方法非常適合於由於聚合類的shuffle操作導致的數據傾斜,可以大幅度緩解聚合算子的數據傾斜,迅速提升Spark作業的整體性能。但是該優化方法僅僅適用於聚合類的數據傾斜問題,適用范圍也相對較窄,不適用其他情況的數據傾斜處理。
(4)轉化join優化法
此優化方法適用於針對RDD的join操作,而且join操作中兩個RDD或者表的數據量相差較大,即其中一個RDD或者表的數據量相對比較小,這時我們可以考慮此種方法。例如事實表連接小量的維度表進行代理鍵查找替換的過程,就是一個大表join小表的情形。若進行連接計算的一個RDD數據量相對較小,完全可以先緩存在內存中時,我們可以將較小的RDD中的數據存到一個Broadcast變量(即廣播變量)中,先廣播到各個節點中,即在每個節點的內存中緩存一份;之后對另外一個較大的RDD(數據)執行一個map類算子,在該map函數內,從本節點的Broadcast變量中獲取較小的RDD的全量數據,與當前RDD的各條數據進行比對連接,進而實現預期的join操作。其基本過程如圖,將原來的RDD1 join RDD2的task通過將RDD1作為廣播變量變成了只需要RDD1 map RDD2的task執行,避免了join算子的執行,從而消除了原先join過程中數據傾斜的出現。
這種方法的主要思路是不使用直接的join算子實現連接,而是根據連接表的特點使用廣播變量(Broadcast)和map類算子實現實際的join操作效果,不用經過shuffle過程,從而有效規避join的shuffle操作,徹底避免此過程中數據傾斜的發生。因而,這種優化方法對join操作導致的數據傾斜問題有較好的優化效果,因為它避免了shuffle過程的出現,即從根本上避免了數據傾斜的產生。當然該方法也僅適用於一個大表join一個小表的情況,不適用於兩個大表join的情況,因為如果廣播出去的RDD數據量較大時,廣播過程也會有很大的開銷,甚至有可能發生內存溢出,反而降低整體的性能。
三、總結
在實際的Spark作業中,如果發現有數據傾斜現象,很多情況下,可以根據具體的業務場景和發生原因使用上述方法優化作業的執行性能。但是如果要處理的數據傾斜場景較為復雜時,可能需要組合使用上述多種方法,這時我們要能夠根據自己的實際情況,考慮不同的思路,靈活地運用多種方法,解決自己的數據傾斜問題。