1,Spark參數調優


Spark調優

一、代碼規范

  • 調優順序:spark任務的調優順序依次是代碼規范、資源參數(並行度)、數據傾斜、shuffle調優、業務層面

1.1 避免創建重復RDD

  • 對於新手,或者一些較為復雜的spark任務,可能會忘記之前對於某一份數據已經創建過一個RDD,而重復創建,造成不必要的計算;

1.2 盡量復用同一個RDD

  • 下游需要使用key-value類型和key類型的兩個RDD,這兩個RDD的數據完全相同,只是格式不同,那么就只需要創建key-value這一個RDD就行,而使用key類型的RDD直接復用key-value類型的RDD就行了;因此,對於需要使用數據相同,格式不同的數據源時,最好復用字段較多的RDD;

1.3 多次使用的RDD要持久化

  • 當一個RDD被使用了多次,比如上面的復用同一個RDD,那么這個RDD就要做持久化,否則這個RDD就會被計算多次;例如,a = rdd1.map(); b = rdd1.map(); 那么就需要對 rdd1做持久化rdd1.persist(),否則rdd1就會被計算兩次;

1.4 使用高性能算子

  • 用reduceByKey替代groupByKey求聚合:前者是map-side預聚合算子,會在map端預聚合,類似於Combiner;
  • 用combineByKey代替groupByKey求topN:前者可以自定義分區內合並和分區間合並的計算邏輯,也是預聚合;
  • mapPartition替代map:一次調用處理一個分區的數據,對於需要在map中創建很多重復對象的場景,最好使用mapPartition,同時注意OOM問題;
  • foreachPartition替代foreach:道理同mapPartition一樣;在需要將rdd的數據寫入MySQL時,后者是一條一條數據插入,並且每條數據都會創建一次數據庫連接;而前者則是一個分區操作一次,性能有很高的提升;

1.5 好習慣

  • 廣播大變量:當需要在算子中使用大變量(1g以內)時,最好將大變量廣播到Executor中,例如:rdd1.filter(x=>slant.contains(x)),如果slant在20M~1G之間,就可以將slant廣播;

  • filter后coalesce:由於filter后,各個分區中的數據不再均衡,使用coalesce再平衡一下分區數據;

  • 優化數據結構:對於算子中的數據結構,能用數組就不要用集合類型,最好使用字符串代替對象,用基本類型代替字符串;

  • 使用Kryo序列化:spark中的三個場景會涉及到序列化,算子中使用外部變量、將自定義對象作為RDD中的類型、可序列化的持久化策略(如MEMORY_ONLY_SER),使用kryo的性能會高很多;使用Kryo序列化時,最好注冊所有的自定義類;conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer") conf.registerKryoClasses(Array(classOf[MyClass1], classOf[MyClass2]));

  • persist后unpersist:unpersist是立即釋放緩存,對復用的RDD使用persist緩存后,需要使用行動算子提交job后,才會真正的緩存,然后再使用unpersist釋放緩存;所以當persist緩存的RDD不會再使用時,最好是手動unpersist釋放緩存;

二、參數調優

資源參數


1.1 --num-executors 100

  • 參數解釋:任務可以申請的Excutor最大數量,並不是一次性分配100個Excutor;Excutor數量會在任務的運行過程中動態調整,有 job處於pending狀態則申請Excutor,一個Excutor空閑時間過長則將其移除;Excutor的數量決定了任務的並行度;

  • 申請Excutor:當有任務處於pending狀態(積壓)超過一定時間,就認為資源不足,需要申請Excutor;

    何時申請:當pending積壓的任務超過spark.dynamicAllocation.schedulerBacklogTimeout(1秒)就申請
    申請多少:申請數量 = 正在運行和pending的任務數量 * spark.dynamicAllocation.executorAllocationRatio(1)/ 並行度
    
  • 移除Excutor:

    spark.dynamicAllocation.enabled(false)決定是否使用資源動態分配;必須開啟外部shuffle;
    spark.dynamicAllocation.executorIdleTimeout (60s)空閑60s就會被回收(並且沒有緩存);
    
  • 決定任務的並行度:executor的數量就是工作節點的數量,直接決定了任務的並行度;准確的說是由executor*core決定的;這只是物理上提供的最大並行度,而任務實際的並行度還是由程序中設置的並行度決定,也就是RDD的分區數;

1.2 --executor-memory 5g

  • 參數解釋:每個executor的內存大小;對於spark調優和OOM異常,通常都是對executor的內存做調整,spark內存模型也是指executor的內存分配,所以executor的內存管理是非常重要的;
  • 內存分配:該參數是總的內存分配,而在任務運行中,會根據spark內存模型對這個總內存再次細分;在實際生產中,通常需要根據程序中使用的緩存內存和計算內存,來划分不同的比例,從而合理的利用內存,避免OOM,提高性能;

1.3 --executor-cores 4

  • 參數解釋:每個executor的核數;是每個executor內部的並行度,即一個executor中可同時執行的task數量;
  • 並行度:core的數量決定了一個executor同時執行的task數量,如果task數量越多,則意味着占用的executor內存也越多;所以,在executor內存固定的情況下,可以通過增加executor數量,減少core數量,使任務總並行度不變的前提下,降低OOM風險;如果任務需要廣播大變量,可以增大core數,使更多的task共用廣播變量;

1.4 --driver-memory

  • 參數解釋:driver端的內存大小;如果要collect大量數據到driver端,或者要廣播大變量時,就需要調大driver端的內存;一般給個3G、4G就夠了;

內存參數


spark.storage.memoryFraction、spark.shuffle.memoryFraction(spark1.6之前靜態內存管理)

  • 參數解釋:在spark1.6之前,使用的是靜態內存管理,而這兩個參數就是用來決定緩存內存和執行內存大小的;在spark1.6及之后,采用的是統一內存管理(也叫動態內存管理),這兩個參數就廢棄了(但也可以讓它生效)

spark.memory.fraction(spark1.6及之后,統一內存管理)

  • 參數解釋:spark1.6及之后采用的是統一內存管理,也叫動態內存管理,顧名思義,就是緩存內存和執行內存統一管理,並且是動態的;首先解釋“統一”:spark.memory.fraction是堆內內存中用於執行、shuffle、緩存的內存比例;這個值越低,則執行時溢出到磁盤更頻繁、同時緩存被逐出內存也更頻繁;一般使用默認值就好了,spark2.2默認是0.6,那么剩下的0.4就是用於存儲用戶的數據結構(比如map算子中定義的中間數據)以及spark內部的元數據;

spark.memory.storageFraction

  • 參數解釋:存儲內存不會被逐出內存的總量,這個是基於spark.memory.fraction的占比;這個值越高,則執行、shuffle的內存就越少,從而溢寫到磁盤就越頻繁;一般使用默認值就好了,spark2.2默認是0.5;

spark.kryoserializer.buffer.max

  • 參數解釋:kryo序列化時使用的緩存大小;如果collect大量數據到driver端,可能會拋buffer limit exceeded異常,這個時候就要調大該參數;默認是64m,掛了就設置為1024m;如果序列化的一個對象很大,那么就需要增大改參數的值spark.kryoserializer.buffer(默認64k);

dfs.client.block.write.locateFollowingBlock.retries

  • 參數解釋:寫入塊后嘗試關閉的次數;Unable to close file because the last block does not have enough number of replicas異常的原因;2.7.4已修復;默認是5,掛了就設置為6;

spark.driver.maxResultSize

  • 參數解釋:一次collect到driver端的最大內存大小,Total size of serialized results of 16 tasks (1048.5 MB) is bigger than spark.driver.maxResultSize (1024.0 MB)異常時需要調大該值;默認1g,掛了就設置為2g,0表示不限制;

shuffle參數


spark.shuffle.file.buffer

  • 參數解釋:shuffle write時,會先寫到BufferedOutputStream緩沖區中,然后再溢寫到磁盤;該參數就是緩存區大小,默認32k,建議設置為64k;

spark.shuffle.spill.batchSize

  • 參數解釋:shuffle在spill溢寫過程中需要將數據序列化和反序列化,這個是一個批次處理的條數;默認是10000,可以調大該值,2萬5萬都可以;

spark.shuffle.io.maxRetries

  • 參數解釋:shuffle read拉取數據時,由於網絡異常或者gc導致拉取失敗,會自動重試,改參數就是配置重試次數,在數據量達到十億、百億級別的時候,最好調大該參數以增加穩定性;默認是3次,建議設置為10到20;

spark.shuffle.io.retryWait

  • 參數解釋:該參數是 spark.shuffle.io.maxRetries的重試間隔,默認是5s,建議設置為20s;

spark.reducer.maxSizeInFlight

  • 參數解釋:shuffle read拉取數據時的緩存區大小,也就是一次拉取的數據大小;注意是從5個節點拉取48M的數據,而不是從一個節點獲取48M;默認48m,建議設置為96m;
  • 原理解釋:從遠程節點拉取數據時,是並行的從發送5個請求,每個請求拉取的最大長度是 48M / 5,但是拉取時都是以block為最小單位的,所以實際獲取的有可能會大於這個值;

spark.reducer.maxReqsInFlight

  • 參數解釋:shuffle read時,一個task的一個批次同時發送的請求數量;默認是 Int的最大值;
  • 原理解釋:構造遠程請求時,單個請求大小限制是 48M / 5,而在一次拉取遠程block數據時,是按批次拉取,一個批次的大小限制是 48M,所以理想情況下一個批次會發送5個請求;但如果block的分布不均勻,導致一個請求的請求大小遠小於 48M / 5 (例如1M),而一個批次的大小限制是48M,所以這個批次就會發送48個請求;當節點數較多時,一個task的一個批次可能會發送非常多的請求,導致某些節點的入站連接數過多,從而導致失敗;

spark.reducer.maxReqSizeShuffleToMem

  • 參數解釋:shuffle read時,從遠程拉取block如果大於這個值就會強行落盤,默認是Long的最大值,建議小於2G,一般設為200M,spark2.2開始生效;(spark2.3開始換成了這個參數spark.maxRemoteBlockSizeFetchToMem);shuffle read這個部分的參數在spark的版本更新中變化較大,所以在優化時一定要根據集群的spark版本設置對應的參數;
  • 原理解釋:一次拉取請求中,如果要拉取的數據比較大,內存放不下,就直接落盤;對於數據傾斜比較嚴重的任務,有可能一個block非常大,而沒有足夠的內存存放時就會OOM,所以最好限制該參數的大小;還有一個原因就是 netty的最大限制是2G,所以大於2G肯定會報錯;spark2.4該參數的默認值是:Int的最大值-512 (2G,減512用來存儲元數據);spark3.0的最大值也是2G,並且給了默認值200M;

spark.reducer.maxBlocksInFlightPerAddress

  • 參數解釋:shuffle read時,一個節點同時被拉取的最大block數,如果太多可能會導致executor服務或nodemanager崩潰;默認Int的最大值;(spark2.2.1開始支持);

  • 原理解釋:shuffle read時每個task都會從shuffle write所在的節點拉取自己的block數據,如果一個shuffle write的executor運行了9個task,就會write9個data文件;如果shuffle read有1000核,那么同時運行1000個task,每個task要到shuffle write所在的executor獲取9個block,極端情況下一個shuffle write的executor會被請求9000次;當節點數非常多時,一個shuffle write的executor會同時被很多節點拉取block,從而導致失敗;

文件相關


spark.sql.files.maxPartitionBytes

  • 參數解釋:sparksql讀取文件時,每個分區的最大文件大小,這個參數決定了讀文件時的並行度;默認128M;例如一個300M的text文件,按128M划分為3個切片,所以SparkSQL讀取時最少有3個分區;
  • 原理解釋:sparksql讀取文件的並行度=max(spark默認並行度,切片數量(文件大小/ 該參數));這里要注意壓縮文件是否可分割;但是要注意,對於parquet格式,一個切片對應一個row group;

spark.sql.parquet.compression.codec

  • 參數解釋:parquet格式的壓縮方式,默認是snappy,可以選擇gzip、lzo、或者uncompressed不壓縮;

spark.io.compression.codec

  • 參數解釋:spark中rdd分區、廣播變量、shuffle輸出的壓縮格式,spark2.2默認是lz4;

spark.serializer

  • 參數解釋:spark序列化的實現,這里的序列化是針對shuffle、廣播和rdd cache的序列化方式;默認使用java的序列化方式org.apache.spark.serializer.JavaSerializer性能比較低,所以一般都使用org.apache.spark.serializer.KryoSerializer ,使用Kryo序列化時最好注冊十分需要空間的類型,可以節省很多空間;spark task的序列化由參數spark.closure.serializer配置,目前只支持JavaSerializer;

spark.sql.hive.convertMetastoreParquet

  • 參數解釋:是否采用spark自己的Serde來解析Parquet文件;Spark SQL為了更好的性能,在讀取hive metastore創建的parquet文件時,會采用自己Parquet Serde,而不是采用hive的Parquet Serde來序列化和反序列化,這在處理null值和decimal精度時會有問題;默認為true,設為false即可(會采用與hive相同的Serde);

spark.sql.parquet.writeLegacyFormat

  • 參數解釋:是否使用遺留的(hive的方式)format來寫Parquet文件;由於decimal精度問題,hive讀取spark創建的Parquet文件會報錯;所以這里的spark采用與hive相同的writeFormat來寫Parquet文件,這樣hive在讀取時就不會報錯;並且上下游表的精度最好一致,例如a表的字段精度為decimal(10,2),b表也最好是decimal(10,2);
  • 原理解釋:在hive中decimal類型是固定的用int32來表示,而標准的parquet規范約定,根據精度的不同會采用int32和int64來存儲,而spark就是采用的標准的parquet格式;所以對於精度不同decimal的,底層的存儲類型有變化;所以使用spark存儲的parquet文件,在使用hive讀取時報錯;將spark.sql.parquet.writeLegacyFormat(默認false)配置設為true,即采用與hive相同的format類來讀寫parquet文件;

參考文章


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM