Spark調優



下面調優主要基於2.0以后。

代碼優化

1.語言選擇

如果是ETL並進行單節點機器學習,SparkR或Python。優點:語言相對簡單;缺點:使用語言自身的數據結構時,效率低,因為這些數據需要轉換。

如果用到自定義transformations或自定義類,Scala或Java。優點:性能好;缺點:語言相對復雜。

2.API選擇

  • DataFrames
    • 大多數情況下的最佳選擇
    • 更有效率的儲存(Tungsten,減少GC開銷)和處理(Catalyst優化查詢)
    • 全階段代碼生成
    • 直接內存訪問
    • 沒有提供域對象編程和編譯時檢查
    • 部分算子在特定情況可優化,如order后take、window function等
    • 少數功能沒有相應的算子
  • DataSets
    • 適用於性能影響可接受的復雜ETL管道
    • 通過Catalyst提供查詢優化
    • 提供域對象編程和編譯時檢查
    • 較高GC開銷
    • 打破整個階段的代碼生成
  • RDDs
    • 在Spark 2.x中,基本不需要使用,除非某些功能上面兩種API沒有
    • 增加序列化/反序列化開銷。上面兩種結構不必對整個對象進行反序列化也可訪問對象屬性。
    • 沒有通過Catalyst進行查詢優化
    • 沒有全階段代碼生成
    • 高GC開銷

3.內存

  • 數據類型

盡量用DF,默認通常比自定義有更多優化。

另外,優先用原始數據結構和數組,次之為String而非其他集合和自定義對象。可以把自定義類改寫成String的形式來表示,即用逗號,豎線等隔開每個成員變量,又或者用json字符串存儲。

數字或枚舉key優於string

估計內存消耗的方法:1.cache並在UI的Storage頁查看。2.org.apache.spark.util.SizeEstimator可估計object的大小

  • GC(1.6以后)

    • spark.memory.fraction默認0.6,即堆內存減去reserved的300MB后的60%,它用來存儲計算和cache所需的數據(storageFraction設置這兩類數據的邊界。例如0.4代表當execution需要空間時會趕走超過40%的那部分storage空間),剩下的40%存儲Spark的元數據。

    • GC調優:在UI或者配置一些選項后能在worker節點的日志查看GC信息。

    • 過多full GC,則要增加executor內存。

    • 過多minor GC,則調大伊甸區。

    • 如果老年代空間不足,減少用於cache的內存,即減少fraction,同時減少storageFraction。或者直接調小伊甸區。

    • 調用G1回收器,如果heap size比較大,要調大-XX:G1HeapRegionSize。

    • 關於伊甸區大小的設置,可以根據每個executor同時處理的任務數估計。比如一個executor同時處理4個task,而每份HDFS壓縮數據為128M(解壓大概為2到3倍),可以把伊甸區設置為4 x 3 x 128MB

  • OffHeap(2.0以后)

    嘗試使用Tungsten內存管理,設置spark.memory.ofHeap.enabled開啟,spark.memory.ofHeap.size控制大小。

4.Caching

cache不同操作邏輯都需要用到的RDD或DF。會占用storage空間。

目前Spark自帶的cache適合小量數據,如果數據量大,建議用Alluxio,配合in-memory and ssd caching,或者直接存到HDFS。

cache()是persist(MEMORY_ONLY),首選。內存不夠部分(整個partition)在下次計算時會重新計算。
如下所示,persist可以使用不同的存儲級別進行持久化。能不存disk盡量不存,有時比重新算還慢
MEMORY_AND_DISK
MEMORY_ONLY_SER(序列化存,節省內存(小2到5倍),但要反序列化效率稍低,第二種選擇),MEMORY_AND_DISK_SER
DISK_ONLY
MEMORY_ONLY_2, MEMORY_AND_DISK_2 備2份

對已經不需要的RDD或DF調用unpersist

當persist數據而storage不足時,默認會執行LRU算法,可通過persistencePriority控制,來淘汰之前緩存的數據。不同persist選項有不同操作,比如memory_only時,重用溢出的persisted RDD要重新計算,而memory and disk會將溢出部分寫到磁盤。

4.filter、map、join、partition、UDFs等

  • filter:盡早filter,過濾不必要的數據,有時還能避免讀取部分數據(Catalyst的PushdownPredicate)。

  • mapPartitions替代map(foreachPartitions同理):

    該算子比map更高效,但注意算子內運用iterator-to-iterator轉換,而非一次性將iterator轉換為一個集合(容易OOM)。詳情看high performance spark的“Iterator-to-Iterator Transformations with mapPartitions”

  • broadcast join:實際上是設置spark.sql.autoBroadcastJoinThreshold,是否主動用broadcast join交給Spark DF判斷,因為我們只能知道數據源的大小而不一定知道經過處理后join前數據的大小

  • partition:減少partition用coalesce不會產生shuffle(把同節點的partition合並),例如filter后數據減少了不少時可以考慮減少分塊

    repartition能盡量均勻分布data,在join或cache前用比較合適。

    自定義partitioner(很少用)

  • UDFs:在確定沒有合適的內置sql算子才考慮UDFs

  • groupByKey/ reduceByKey

    對於RDD,少用前者,因為它不會在map-side進行聚合。但注意不要與DF的groupBy + agg和DS的groupByKey + reduceGroups混淆,它們的行為會經過Catalyst優化,會有map-side聚合。

  • flatMap:用來替代map后filter

5.I/O

將數據寫到數據庫中時,開線程池,並使用foreachPartition,分batch提交等。

開啟speculation(有些storage system會產生重復寫入),HDFS系統和Spark在同一個節點

6.廣播變量

比如一個函數要調用一個大的閉包變量時,比如用於查詢的map、機器學習模型等

配置優化

1.並行度

num-executors * executor-cores的2~3倍spark.default.parallelismspark.sql.shuffle.partitions

2.數據序列化Kryo

Spark涉及序列化的場景:閉包變量、廣播變量、自定義類、持久化。

Kryo不是所有可序列化類型都支持,2.0之后,默認情況下,simple types, arrays of simple types, or string type的shuffle都是Kryo序列化。

//通過下面設置,不單單shuffle,涉及序列化的都會用Kryo。開啟之后,scala的map,set,list,tuple,枚舉類等都會啟用。
val conf = new SparkConf().setMaster(...).setAppName(...)
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
//對於自定義類,要登記。
conf.registerKryoClasses(Array(classOf[MyClass1], classOf[MyClass2]))
val sc = new SparkContext(conf)

優化緩存大小spark.kryoserializer.buffr.mb(默認2M)

3.數據本地化

調節任務等待時間 ,通過spark.locality.xxx,可以根據不同級別設置等待時間。

4.規划

資源均分:spark.scheduler.modeFAIR

限制:--max-executor-cores或修改默認spark.cores.max減少core的情況:

  1. Reduce heap size below 32 GB to keep GC overhead < 10%.
  2. Reduce the number of cores to keep GC overhead < 10%.

5.數據儲存

存儲格式:Parquet首選

壓縮格式:選擇splittable文件如 zip(在文件范圍內可分), bzip2(壓縮慢,其他都很好), LZO(壓縮速度最快)。上傳數據分開幾個文件,每個最好不超幾百MB,用maxRecordsPerFile控制。文件不算太大,用gzip(各方面都好,但不能分割)。在conf中通過spark.sql.partuet.compression.codec設置。

6.shuffle

spark.reducer.maxSizeInFlight(48m): reduce端拉取數據的buffer大小,如果內存夠大,且數據量大時,可嘗試96m,反之調低。

spark.reducer.maxReqsInFlight(Int.MaxValue):當reduce端的節點比較多時,過多的請求對發送端不利,為了穩定性,有時可能需要減少。

spark.reducer.maxBlocksInFlightPerAddress(Int.MaxValue):和上面對應,控制每次向多少個節點拉取數據。

spark.maxRemoteBlockSizeFetchToMem(Long.MaxValue):當一個block的數據超過這個值就把這個block拉取到到磁盤。

spark.shuffle.compress(true)

spark.shuffle.file.buffer:所產生准備shuffle的文件的大小,調大可減少溢出磁盤的次數,默認32k,可嘗試64k。

spark.shuffle.sort.bypassMergeThreshold(200):小於這個值時用BypassMergeSortShuffleWriter。

Netty only:

maxRetries * retryWait兩個參數:有可能CG時間長導致拉取不到數據

spark.shuffle.io.preferDirectBufs(true):如果對外內存緊缺,可以考慮關掉。

spark.shuffle.io.numConnectionsPerPeer(1):對於具有許多硬盤和少數主機的群集,這可能導致並發性不足以使所有磁盤飽和,因此用戶可能會考慮增加此值。

7.executor內存壓力和Garbage Collection

收集統計數據:在Spark-submit添加--conf "spark.executor.extraJavaOptions=-XX:+PrintGCDetails -XX:+PrintGCTimeStamps"。這之后能在worker節點的logs里看到信息。也可以在UI查看。

如果full GC被觸發多次,則表明老年代空間不夠用,可以增加executor的內存或減少spark.memory.fraction的值(會影響性能)

如果很多minor GC,分配更多內存給Eden區(-Xmn=4/3*E)。關於伊甸區大小的設置,可以根據每個executor同時處理的任務數估計。比如一個executor同時處理4個task,而每份HDFS壓縮數據為128M(解壓大概為2到3倍),可以把伊甸區設置為4 x 3 x 128MB。

啟用G1收集器也有可能提高效率。

spark.executor.memory是不包含overhead的,所以實際上占用的內存比申請的多。在executor內存中默認40%存一些數據和Spark的元數據,剩下的60%中,默認50% execution(計算) and 50% storage(用於caching) memory,但這是動態變動的,storage可借用execution的空間,當execution需要空間時又會趕走超過50%的那部分storage空間。這里數據量都是按block計算的。

storage過小會cache丟失,代價取決於persist等級,可能直接刪除,需要時重計算淘汰數據或溢出淘汰數據到磁盤,execution過小會有很多I/O。

在1.6之前,內存空間划分為

  • Executionspark.shuffle.memoryFraction(default0.2):buffering intermediate data when performing shuffles, joins, sorts and aggregations
  • Storagespark.storage.memoryFraction(default0.6): caching, broadcasts and large task results
  • Other default 0.2: data structures allocated by user code and internal Spark metadata.

在1.6及之后,內存空間划分為

  • Execution and Storage spark.memory.fraction (1.6時0.75,2.0后0.6)
    • spark.memory.storageFraction(default0.5): 當上面的內存中,storage超過50%,cached data may be evicted
  • Other

8.集群配置

Resource and Job Scheduling

每個spark application運行一系列獨立的executor進程。在application中,多個job可能同時運行,只要他們被分配到不同的線程。

executor調度

Spark’s standalone, YARN modes 和 coarse-grained Mesos mode都是靜態划分,即app被分配最大量的executor,並擁有這些executor,直到app完成。Mesos可以實現動態划分。

Spark可以設置動態調整executor,根據workload決定是否回收executor,這更適合多服務。這機制對上面提到的三種集群部署模式都適用。在動態調整下,如果有task等待executor且有空余資源,就會啟動executor。這里有兩個參數,一個規定task的等待時間,一個規定每次提供executor的間隔時間(開始提供1、然后2、4等)。移除則是根據idletimeout參數設定時間。這種動態調整會帶來一個問題,即reducer端未得到所需數據,mapper端進程就因空閑而退出,導致數據無法獲取。為解決這個問題,在動態調整階段,executor的退出並非馬上退出,而是會持續一段時間。這個做法通過外部shuffle服務完成(所以啟動動態調整時也要啟動外部shuffle服務),reducer端會從service處拉取數據而不是executor。

cache數據也有類似的參數設置spark.dynamicAllocation.cachedExecutorIdleTimeout。

參數調整經驗

  • spark.dynamicAllocation.initialExecutors決定了Executor初始數目。這個值的選取可以根據歷史任務的Executor數目的統計,按照二八原則來設置,例如80%的歷史業務的Executor數目都不大於參數值。同時,也要考慮集群的資源緊張度,當資源比較緊張時,這個值需要設置得小一點。
  • spark.dynamicAllocation.maxExecutors決定了業務最大可以擁有的Executor數目。默認無窮大,要注意過大會使大業務獨占大部分資源,造成小任務沒有資源的情況;過小會導致大任務執行時間超出業務要求。
  • spark.dynamicAllocation.executorIdleTimeout決定了Executor空閑多長時間后會被動態刪除。當這個值比較小時,集群資源會比較充分地共享,但會影響業務的執行時間(在Executor被刪除后,可能需要重新申請新的Executor來執行task);當這個值比較大時,不利於資源的共享,若一些比較大的任務占用資源,遲遲不釋放,就會造成其他任務得不到資源。這個值的選取需要在用戶業務的執行時間和等待時間上做一個權衡。需要注意的是,當spark.dynamicAllocation.maxExecutors為有限值時,spark.dynamicAllocation.executorIdleTimeout過小會導致某些任務不能申請新的資源。例如maxExecutors=10,而某個業務所需的資源大於或等於10個Executor,業務在申請到10個Executor之后,申請到Executor由於空閑(有可能因為task還沒來得及分配到其上)而被刪除,目前社區Spark SQL的邏輯是不會再申請新的Executor的,這樣就會導致任務執行速度變慢。

job調度

FIFO:如果前面的job不需要所有資源,那么第二個job也可啟動。適合長作業、不適合短作業;適合CPU繁忙型。

FAIR:輪詢分配,大概平均的資源,適合多服務。該模式還可以設置pool,對job進行分組,並對各組設置優先級,從而實現job的優先級。可設置的參數有schedulingMode(FIFO和FAIR)、weight(池的優先級,如某個為2,其他為1,則2的那個會獲得比其他多一倍的資源)、minShare(至少資源數)

小文件合並

當Reducer數目比較多時,可能會導致小文件過多。最好在輸出前將文件進行合並。另外,后台應定期對數據進行壓縮和合並。

冷熱數據分離

  • 冷:性能低、數據塊大、備份數少、壓縮率高
  • 熱:反之(壓縮速度快)

參考

書籍:

Spark: The Definitive Guide

High Performance Spark

Spark SQL 內核剖析

文章:

官網Tunning Guide

官網Spark Configuration

Microsoft Azure's Optimize Spark jobs


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM