【使用場景】
兩個RDD進行join的時候,如果數據量都比較大,那么此時可以sample看下兩個RDD中的key分布情況。如果出現數據傾斜,是因為其中某一個RDD中的少數幾個key的數據量過大,而另一個RDD中的所有key都分布比較均勻,此時可以考慮采用本解決方案。
【解決方案】
- 對有數據傾斜那個RDD,使用sample算子采樣出一份樣本,統計下每個key的數量,看看導致數據傾斜數據量最大的是哪幾個key。
- 然后將這幾個key對應的數據從原來的RDD中拆分出來,形成一個單獨的RDD,並給每個key都打上n以內的隨機數作為前綴;不會導致傾斜的大部分key形成另外一個RDD。
- 接着將需要join的另一個RDD,也過濾出來那幾個傾斜key對應的數據並形成一個單獨的RDD,將每條數據膨脹成n條數據,這n條數據都按順序附加一個0~n的前綴,不會導致傾斜的大部分key也形成另外一個RDD。
- 再將附加了隨機前綴的獨立RDD與另一個膨脹n倍的獨立RDD進行join,這樣就可以將原先相同的key打散成n份,分散到多個task中去進行join了。
- 而另外兩個普通的RDD就照常join即可。
- 最后將兩次join的結果使用union算子合並起來即可,就是最終的join結果。
【方案優點】
對於兩個大RDD進行join時的數據傾斜,如果只是某幾個key導致了傾斜,采用該方式可以用最有效的方式打散key進行join。而且只需要針對少數傾斜key對應的數據進行擴容n倍,不需要對全量數據進行擴容,避免了占用過多內存。
【方案局限】
如果導致傾斜的key特別多的話,比如成千上萬個key都導致數據傾斜,就不能使用本解決方案了。
【代碼實現】
代碼實現:https://github.com/wwcom614/Spark
