大數據之性能調優方面(數據傾斜、shuffle、JVM等方面)


一、對於數據傾斜的發生一般都是一個key對應的數據過大,而導致Task執行過慢,或者內存溢出(OOM),一般是發生在shuffle的時候,比如reduceByKey,groupByKey,sortByKey等,容易產生數據傾斜。

那么針對數據傾斜我們如何解決呢?我們可以首先觀看log日志,以為log日志報錯的時候會提示在哪些行,然后就去檢查發生shuffle的地方,這些地方比較容易發生數據傾斜。

其次,因為我們都是測試的,所以都是在client端進行的,也可以觀察WebUI,上面也會有所有對應的stage的划分等。

解決方案:

①聚合源數據,我們的數據一般來自Hive表,那么在生成Hive表的時候對數據進行聚合,按照key進行分組,將key所對應的value以另一種方式存儲,比如拼接成一個字符串這樣的,我們就可以省略groupByKey和reduceByKey的操作,那么我們就避免了shuffle的產生,如果不能完美的拼接成字符串,那我們也至少可以減少數據量,提高一點性能

②過濾key操作,這種方式就有點粗暴了,如果你老大允許的話,這也是一種不錯的方案。

③提高並行度,我們可以通過提高shuffle的reduce的並行度來提高reduce端的task執行數量,從而分擔數據壓力,但是如果出現之前運行時OOM了,加大了reduce端的task的數量,可以運行了,但是執行時間一長就要放棄這種方案。

④雙重聚合,用於groupByKey和reduceByKey,比較使用join(hive中join優化也有類似的雙重聚合操作設置參數hive.mao.aggr=true和hive.groupby.skewindata=true具體過程這里不做介紹)先加入隨機數進行分組,然后前綴去掉在進行分組

⑤將reduce的join轉換成map的join,如果兩個RDD進行join,有一個表比較小的話,可以將小表進行broadcast,這樣每個節點都會用一個小表,如果兩個表都很大,可以先將兩個表按相同的方式進行分區操作,最后合並。雖然map join替換了reduce join ,但這是我們消耗了部分內存換來的,所以我們需要考慮OOM現象。

⑥sample抽樣分解聚合,也就是說將傾斜的key單獨拉出來,然后用一個RDD進行打亂join

⑦隨機數+擴容表,也就是說通過flatMap進行擴容,然后將隨機數打入進去,再進行join,但是這樣不能從根本上解決數據傾斜,只能緩解這種現象。

⑧如果你所在公司很有錢,可以直接加機器,硬件足夠高,這些也就不是問題了。

 

------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

shuffle方面的性能調優:

設置map輸出文件的合並參數  .set(“spark.shuffle.consolidation”,"true"),默認是不開啟的,設置為true,則無法並發執行。

設置map端的內存緩沖區大小,和reduce端內存的大小,這個主要針對文件過大,導致性能低,但是調優效果不是很明顯。  

  map端參數:  .set("spark.shuffle.file.buffer","64k")

  reduce端參數  .set("spark.shuffle.memoryFraction","0.3")

這兩個參數根據我們觀察日志的讀寫文件的多少來調節的,適量調節,如果是stand-alone模式觀察4040頁面,如果是yarn模式直接進入yarn Ulog日志查看。

shuffle中有以下幾種shuffle,具體情況根據你的業務要求來取舍。hashshuffle+consolidation、sortshuffle、鎢絲shuffle(tungsten-sort他里面有自己的內存機制,可以有效的防止內存溢出現象)

mappartitions的使用必須要慎重!因為mappartition是取出一個分區的數據進行操作,那么如果數據過大我們就有可能造成OOM溢出現象的發生,所以mappartitions的使用應該只能在數據量小的情況下。

默認shuffle的內存占用為20%,持久化占用60%,根據具體業務調整。

----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

JVM方面性能調優:

內存不足的時候會導致minor gc的頻繁,導致spark停止工作。

頻繁進行gc的時候,可能有些年輕代里面的對象被回收,但是因為內存性能不足的問題,導致傳入了老年代,而如果老年代里面內存溢滿,就會進行full gc操作,也就是全局清理機制,這個過程時間會很長,十幾秒中至幾十分鍾,這樣就導致了spark的性能變低。

調優:

降低cache操作內存的占比,大不了用persist操作,將一部分數據寫入磁盤或者是進行序列化操作,減少RDD緩存的內存占用,降低cache操作的內存占用,那么算子函數的內存占比就上去了,可以減少頻繁的gc操作,簡單來說就是讓task執行算子函數的時候擁有更多可用的內存。

spark.storage.memoryFraction=0.6  cache的默認占用內存是60%。上述已說明

executor對外內存的設置,如果我們發現shuffle output  file  not  found的錯誤,那么我們就需要調節一下對外內存了

(寫在spark-submit參數中)--conf  spark.yarn.executor.memoryOverhead=2048(基於yarn模式)

這個參數不是在sparkContext中調節的,而是在spark-submit中指定的參數,可以防止OOM溢出現象。

偶爾也有可能出現連接等待超時的現象,因為executor跨接點拉取數據的時候,可能另一個executor正在進行JVM垃圾回收(所有線程停止),導致連接報錯,not found,這時候我們就需要調節連接參數,要在spark-submit中指定spark.core.connection.ack.wait.timeout=300設置等待時間稍微長一點即可。

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM