Spark學習筆記(9)---性能調優


性能調優

目錄

調節並行度

並行度:其實就是指的是,Spark作業中,各個stage的task數量,也就代表了Spark作業的在各個階段(stage)的並行度。

假設,現在已經在spark-submit腳本里面,給我們的spark作業分配了足夠多的資源,比如50個executor,每個executor有10G內存,每個executor有3個cpu core。基本已經達到了集群或者yarn隊列的資源上限。
但是task沒有設置,或者設置的很少,比如就設置了,100個task。50個executor,每個executor有3個cpu core,也就是說,你的Application任何一個stage運行的時候,都有總數在150個cpu core,可以並行運行。但是你現在,只有100個task,平均分配一下,每個executor分配到2個task,ok,那么同時在運行的task,只有100個,每個executor只會並行運行2個task。每個executor剩下的一個cpu core,就浪費掉了。

  1. 官方推薦,task數量設置成spark application總cpu core數量的2~3倍,比如150個cpu core,基本要設置task數量為300~500

  2. 如何設置一個Spark Application的並行度

SparkConf conf = new SparkConf()
  .set("spark.default.parallelism", "500")

重構RDD與持久化

  1. RDD架構重構與優化
    盡量去復用RDD,差不多的RDD,可以抽取稱為一個共同的RDD,供后面的RDD計算時,反復使用。

  2. 公共RDD一定要實現持久化
    對於要多次計算和使用的公共RDD,一定要進行持久化。

  3. 持久化,是可以進行序列化的
    如果正常將數據持久化在內存中,那么可能會導致內存的占用過大,這樣的話,也許,會導致OOM內存溢出。
    當純內存無法支撐公共RDD數據完全存放的時候,就優先考慮,使用序列化的方式在純內存中存儲。將RDD的每個partition的數據,序列化成一個大的字節數組,就一個對象;序列化后,大大減少內存的空間占用。
    如果序列化純內存方式,還是導致OOM,內存溢出;就只能考慮磁盤的方式,內存+磁盤的普通方式(無序列化)。
    缺點:在獲取數據的時候需要反序列化

  4. 數據的高可靠性
    在內存資源很充沛的情況下,可以持久化一個副本

廣播大變量

而每個task在處理變量的時候,都會拷貝一份變量的副本,如果變量很大的話,就會耗費很多內存。這時可以采用廣播變量的方式,把這個變量廣播出去,因為廣播變量只在每個節點的Executor才一份副本
廣播變量在初始的時候,就只在Driver上有一份。task在運行的時候,想要使用廣播變量中的數據,此時首先會在自己本地的Executor對應的BlockManager中,嘗試獲取變量副本;如果本地沒有,那么就從Driver遠程拉取變量副本,並保存在本地的BlockManager中;此后這個executor上的task,都會直接使用本地的BlockManager中的副本。
executor的BlockManager除了從driver上拉取,也可能從其他節點的BlockManager上拉取變量副本,總之越近越好。

使用Kryo序列化

默認情況下,Spark內部是使用Java的序列化機制,ObjectOutputStream/ObjectInputStream,對象輸入輸出流機制,來進行序列化。
優點:處理起來比較方便,只是在算子里面使用的變量,必須是實現Serializable接口的。
缺點:默認的序列化機制的效率不高,序列化的速度比較慢;序列化以后的數據,占用的內存空間相對還是比較大。
Kryo序列化機制,比默認的Java序列化機制,速度要快,序列化后的數據要更小,大概是Java序列化機制的1/10。
Kryo序列化機制,一旦啟用以后,會生效的幾個地方:

  1. 算子函數中使用到的外部變量,使用Kryo后,優化網絡傳輸的性能,可以優化集群中內存的占用和消耗
  2. 持久化RDD時進行序列化,StorageLevel.MEMORY_ONLY_SER。持久化RDD占用的內存越少,task執行的時候創建的對象,就不至於頻繁的占滿內存,頻繁發生GC。
  3. shuffle:可以優化網絡傳輸的性能

使用Kryo序列化步驟:

  1. SparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  2. 注冊你使用到的,需要通過Kryo序列化的自定義類。SparkConf.registerKryoClasses(new Class[]{CategorySoryKey.class})

使用fastutil優化數據格式

  • fastutil介紹

fastutil是擴展了Java標准集合框架(Map、List、Set;HashMap、ArrayList、HashSet)的類庫,提供了特殊類型的map、set、list和queue;
fastutil能夠提供更小的內存占用,更快的存取速度;我們使用fastutil提供的集合類,來替代自己平時使用的JDK的原生的Map、List、Set,好處在於,fastutil集合類,可以減小內存的占用,並且在進行集合的遍歷、根據索引(或者key)獲取元素的值和設置元素的值的時候,提供更快的存取速度;
fastutil也提供了64位的array、set和list,以及高性能快速的,以及實用的IO類,來處理二進制和文本類型的文件

fastutil的每一種集合類型,都實現了對應的Java中的標准接口(比如fastutil的map,實現了Java的Map接口),因此可以直接放入已有系統的任何代碼中。
fastutil還提供了一些JDK標准類庫中沒有的額外功能(比如雙向迭代器)。

fastutil除了對象和原始類型為元素的集合,fastutil也提供引用類型的支持,但是對引用類型是使用等於號(=)進行比較的,而不是equals()方法。

  • Spark中應用fastutil的場景
  1. 如果算子函數使用了外部變量;那么第一,你可以使用Broadcast廣播變量優化;第二,可以使用Kryo序列化類庫,提升序列化性能和效率;第三,如果外部變量是某種比較大的集合,那么可以考慮使用fastutil改寫外部變量,首先從源頭上就減少內存的占用,通過廣播變量進一步減少內存占用,再通過Kryo序列化類庫進一步減少內存占用。

  2. 在你的算子函數里,如果要創建比較大的Map、List等集合,可能會占用較大的內存空間,而且可能涉及到消耗性能的遍歷、存取等集合操作;那么此時,可以考慮將這些集合類型使用fastutil類庫重寫,使用了fastutil集合類以后,就可以在一定程度上,減少task創建出來的集合類型的內存占用。避免executor內存頻繁占滿,頻繁喚起GC,導致性能下降。

<dependency>
  <groupId>it.unimi.dsi</groupId>
  <artifactId>fastutil</artifactId>
  <version>7.0.6</version>
</dependency>

UserVisitSessionAnalyzeSpark.java中831行有示例。

調節數據本地化等待時長

PROCESS_LOCAL > NODE_LOCAL > NO_PREF > RACK_LOCAL > ANY
Spark要對任務(task)進行分配的時候, 會計算出每個task要計算的是哪個分片的數據(partition),Spark的task分配算法,會按照上面的順序來進行分配。
可能PROCESS_LOCAL節點的計算資源和計算能力都滿了;Spark會等待一段時間,默認情況下是3s鍾(不是絕對的,還有很多種情況,對不同的本地化級別,都會去等待),到最后,就會選擇一個比較差的本地化級別,比如說,將task分配到靠它要計算的數據所在節點,比較近的一個節點,然后進行計算。

  • 何時調節這個參數

觀察日志,spark作業的運行日志,先用client模式,在本地就直接可以看到比較全的日志。日志里面會顯示,starting task...,PROCESS LOCAL、NODE LOCAL
如果是發現,好多的級別都是NODE_LOCAL、ANY,那么最好就去調節一下數據本地化的等待時長。調節完,應該是要反復調節,每次調節完以后,再來運行,觀察日志
spark.locality.wait, 3s, 6s, 10s...

JVM調優之降低cache操作的內存占比

spark中,堆內存又被划分成了兩塊兒,一塊兒是專門用來給RDD的cache、persist操作進行RDD數據緩存用的;另外一塊兒,用來給spark算子函數的運行使用的,存放函數中自己創建的對象。

默認情況下,給RDD cache操作的內存占比,是0.6,60%的內存都給了cache操作了。但是問題是,如果某些情況下,cache不是那么的緊張,問題在於task算子函數中創建的對象過多,然后內存又不太大,導致了頻繁的minor gc,甚至頻繁full gc,導致spark頻繁的停止工作。性能影響會很大。

可以通過spark ui,如果是spark on yarn的話,那么就通過yarn的界面,去查看你的spark作業的運行統計。可以看到每個stage的運行情況,包括每個task的運行時間、gc時間等等。如果發現gc太頻繁,時間太長。此時就可以適當調價這個比例。

降低cache操作的內存占比,大不了用persist操作,選擇將一部分緩存的RDD數據寫入磁盤,或者序列化方式,配合Kryo序列化類,減少RDD緩存的內存占用;降低cache操作內存占比;對應的,算子函數的內存占比就提升了。這個時候,可能,就可以減少minor gc的頻率,同時減少full gc的頻率。對性能的提升是有一定的幫助的。

spark.storage.memoryFraction,0.6 -> 0.5 -> 0.4 -> 0.2

JVM調優之調節Executor堆外內存與連接等待時長

  • Executor堆外內存

有時候,如果你的spark作業處理的數據量特別特別大,幾億數據量;然后spark作業一運行,時不時的報錯,shuffle file cannot find,executor、task lost,out of memory(內存溢出)

可能是說executor的堆外內存不太夠用,導致executor在運行的過程中,可能會內存溢出;然后可能導致后續的stage的task在運行的時候,可能要從一些executor中去拉取shuffle map output文件,
但是executor可能已經掛掉了,關聯的block manager也沒有了;所以可能會報shuffle output file not found;resubmitting task;executor lost;spark作業徹底崩潰。

--conf spark.yarn.executor.memoryOverhead=2048

spark-submit腳本里面,去用--conf的方式,去添加配置; 切記,不是在你的spark作業代碼中,用new SparkConf().set()這種方式去設置,不要這樣去設置,是沒有用的!一定要在spark-submit腳本中去設置。

默認情況下,這個堆外內存上限大概是300多M;通常項目,真正處理大數據的時候,這里都會出現問題,導致spark作業反復崩潰,無法運行;此時就會去調節這個參數,到至少1G(1024M),甚至說2G、4G

  • 連接等待時長

如果Executor遠程從另一個Executor中拉取數據的時候,那個Executor正好在gc,此時呢,無法建立網絡連接,會卡住;spark默認的網絡連接的超時時長,是60s;如果卡住60s都無法建立連接的話,那么就宣告失敗了。

碰到某某file。一串file id。uuid(dsfsfd-2342vs--sdf--sdfsd)。not found。file lost。很有可能是有那份數據的executor在jvm gc。所以拉取數據的時候,建立不了連接。然后超過默認60s以后,直接宣告失敗。

--conf spark.core.connection.ack.wait.timeout=300

spark-submit腳本,切記,不是在new SparkConf().set()這種方式來設置的。通常來說,可以避免部分的偶爾出現的某某文件拉取失敗,某某文件lost

參考資料

《北風網Spark項目實戰》
github: https://github.com/yangtong123/StudySpark


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM