一、對於數據傾斜的發生一般都是一個key對應的數據過大,而導致Task執行過慢,或者內存溢出(OOM),一般是發生在shuffle的時候,比如reduceByKey,groupByKey,sortByKey等,容易產生數據傾斜。
那么針對數據傾斜我們如何解決呢?我們可以首先觀看log日志,以為log日志報錯的時候會提示在哪些行,然后就去檢查發生shuffle的地方,這些地方比較容易發生數據傾斜。
其次,因為我們都是測試的,所以都是在client端進行的,也可以觀察WebUI,上面也會有所有對應的stage的划分等。
解決方案:
①聚合源數據,我們的數據一般來自Hive表,那么在生成Hive表的時候對數據進行聚合,按照key進行分組,將key所對應的value以另一種方式存儲,比如拼接成一個字符串這樣的,我們就可以省略groupByKey和reduceByKey的操作,那么我們就避免了shuffle的產生,如果不能完美的拼接成字符串,那我們也至少可以減少數據量,提高一點性能
②過濾key操作,這種方式就有點粗暴了,如果你老大允許的話,這也是一種不錯的方案。
③提高並行度,我們可以通過提高shuffle的reduce的並行度來提高reduce端的task執行數量,從而分擔數據壓力,但是如果出現之前運行時OOM了,加大了reduce端的task的數量,可以運行了,但是執行時間一長就要放棄這種方案。
④雙重聚合,用於groupByKey和reduceByKey,比較使用join(hive中join優化也有類似的雙重聚合操作設置參數hive.mao.aggr=true和hive.groupby.skewindata=true具體過程這里不做介紹)先加入隨機數進行分組,然后前綴去掉在進行分組
⑤將reduce的join轉換成map的join,如果兩個RDD進行join,有一個表比較小的話,可以將小表進行broadcast,這樣每個節點都會用一個小表,如果兩個表都很大,可以先將兩個表按相同的方式進行分區操作,最后合並。雖然map join替換了reduce join ,但這是我們消耗了部分內存換來的,所以我們需要考慮OOM現象。
⑥sample抽樣分解聚合,也就是說將傾斜的key單獨拉出來,然后用一個RDD進行打亂join
⑦隨機數+擴容表,也就是說通過flatMap進行擴容,然后將隨機數打入進去,再進行join,但是這樣不能從根本上解決數據傾斜,只能緩解這種現象。
⑧如果你所在公司很有錢,可以直接加機器,硬件足夠高,這些也就不是問題了。
------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
shuffle方面的性能調優:
設置map輸出文件的合並參數 .set(“spark.shuffle.consolidation”,"true"),默認是不開啟的,設置為true,則無法並發執行。
設置map端的內存緩沖區大小,和reduce端內存的大小,這個主要針對文件過大,導致性能低,但是調優效果不是很明顯。
map端參數: .set("spark.shuffle.file.buffer","64k")
reduce端參數 .set("spark.shuffle.memoryFraction","0.3")
這兩個參數根據我們觀察日志的讀寫文件的多少來調節的,適量調節,如果是stand-alone模式觀察4040頁面,如果是yarn模式直接進入yarn Ulog日志查看。
shuffle中有以下幾種shuffle,具體情況根據你的業務要求來取舍。hashshuffle+consolidation、sortshuffle、鎢絲shuffle(tungsten-sort他里面有自己的內存機制,可以有效的防止內存溢出現象)
mappartitions的使用必須要慎重!因為mappartition是取出一個分區的數據進行操作,那么如果數據過大我們就有可能造成OOM溢出現象的發生,所以mappartitions的使用應該只能在數據量小的情況下。
默認shuffle的內存占用為20%,持久化占用60%,根據具體業務調整。
----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
JVM方面性能調優:
內存不足的時候會導致minor gc的頻繁,導致spark停止工作。
頻繁進行gc的時候,可能有些年輕代里面的對象被回收,但是因為內存性能不足的問題,導致傳入了老年代,而如果老年代里面內存溢滿,就會進行full gc操作,也就是全局清理機制,這個過程時間會很長,十幾秒中至幾十分鍾,這樣就導致了spark的性能變低。
調優:
降低cache操作內存的占比,大不了用persist操作,將一部分數據寫入磁盤或者是進行序列化操作,減少RDD緩存的內存占用,降低cache操作的內存占用,那么算子函數的內存占比就上去了,可以減少頻繁的gc操作,簡單來說就是讓task執行算子函數的時候擁有更多可用的內存。
spark.storage.memoryFraction=0.6 cache的默認占用內存是60%。上述已說明
executor對外內存的設置,如果我們發現shuffle output file not found的錯誤,那么我們就需要調節一下對外內存了
(寫在spark-submit參數中)--conf spark.yarn.executor.memoryOverhead=2048(基於yarn模式)
這個參數不是在sparkContext中調節的,而是在spark-submit中指定的參數,可以防止OOM溢出現象。
偶爾也有可能出現連接等待超時的現象,因為executor跨接點拉取數據的時候,可能另一個executor正在進行JVM垃圾回收(所有線程停止),導致連接報錯,not found,這時候我們就需要調節連接參數,要在spark-submit中指定spark.core.connection.ack.wait.timeout=300設置等待時間稍微長一點即可。