【翻譯】Spark 調優 (Tuning Spark) 中文版


由於Spark自己的調優guidance已經覆蓋了很多很有價值的點,因此這里直接翻譯一份過來。也作為一個積累。

 

Spark 調優 (Tuning Spark)

由於大多數Spark計算任務是在內存中運行計算,任何集群中的資源限制都可能成為Spark程序的瓶頸,比如:CPU、網絡、帶寬、內存。通常情況下,如果內存能容納所處理數據,主要的瓶頸則僅是網絡帶寬。但有些時候您也需要做一些調優,比如利用RDD序列化存儲來降低內存消耗。本手冊將會涵蓋以下兩個大點:數據序列化(對優化網絡傳輸和降低內存開銷有顯著效果)、內存優化。我們同時會介紹另外的幾個小點。

數據序列化 (Data Serialization)

在分布式應用中序列化起着非常重要的作用。序列化的性能較慢或序列化的結果較大都會拖慢了整體計算的性能。通常來說,序列化應是你在調優Spark程序時首要考慮的因素。Spark做了許多努力來平衡序列化的易用性(即方便地將任意Java類型對象序列化)和性能。Spark提供了兩種序列化庫:

  • Java serialization:默認選擇。Spark利用Java的ObjectOutputStream方式進行序列化,適用於任意實現了java.io.Serializable接口的類。同時你也可以利用java.io.Externalizable接口去實現自定義的序列化。Java serialization方式比較靈活可移植,但性能較低,而且序列化后的結果較大。
  • Kryo serialization:Spark 同時也支持Kryo庫(version 2)去實現更快速的序列化。Kryo一般要比Java自帶序列化快10倍,序列化結果也更小,但需要用戶手動將需要序列化的類手動注冊到Kryo的Register上去使其生效。

配置 sparkConf.set("spark.serializer","org.apache.spark.serializer.KryoSerializer")可是Kryo生效,該序列化指定會在shuffle階段和rdd持久序列化(persist)階段生效。我們推薦您使用Kryo,沒有將其作為默認序列化方案的唯一原因是需要用戶手動注冊。在Spark 2.0.0以后的版本中,Spark內部的shuffle 已使用 Kryo來序列化原始類型、原始類型的數組類型和string類型。

Kryo已在Spark core中,無需另外引入依賴。SparkConf中使用registerKryoClasses方法即可。

val conf = new SparkConf().setMaster(...).setAppName(...) conf.registerKryoClasses(Array(classOf[MyClass1], classOf[MyClass2])) val sc = new SparkContext(conf)

您可通過Kryo官方文檔 來獲取更多的高級配置及自定義內容,包括添加自定義的序列化代碼。如果您的對象確實很大,您可能需要考慮調整spark.kryoserializer.buffer 參數來保證緩沖區有足夠的內存去容納大對象。

最后,即使用戶不手動注冊類到Kryo,Kryo依然能夠起效,只是這種情況下Kryo會對每個類存儲完整的類名,造成不必要的浪費。

內存調優 (Memory Tuning)

內存調優中通常要考慮三個方面:Java對象占用的內存大小、訪問這些對象的開銷以及GC開銷。

通常來說,訪問Java對象時很快的,但訪問對象內部成員字段的時候卻可能會造成2-5倍的損耗,可能原因包括如下:

  • Java object都會有個對象頭,16字節大小,保存着指向該類的指針。在一些字段很小(比如只有一個int)的情況下,對象頭大小甚至比對象數據本身還要大。
  • Java String 類型會有40字節左右的額外開銷,用於保存長度等信息。同時string內默認用utf-16編碼存儲,導致存儲的開銷也比純粹的char[]要大。
  • 集合類型的類(List、Set、Map等)許多用了鏈式結構或樹形結構,相當於每個Entry的一個Wrapper對象。這會導致多余的對象頭和鏈表指針存儲的開銷。
  • 泛型中的原始類型會被強制裝箱成對應的對象類型。

本小節會先從一個Spark內存管理的overview引入,然后再討論用戶的優化策略。我們還會着重描述如何計算你的對象所占用內存的大小以及如何去通過改變數據結構或序列化去優化它。接着我們還會討論一下Spark緩存大小的調優和GC策略的調優。

Spark 內存管理 Overview

Spark的內存主要分為兩類:運行與存儲。運行內存包括shuffle、join、sort、aggregation那個計算所占用的內存,存儲內存包括cache、集群中數據傳輸所占用的內存。Spark程序里運行和存儲實際上是共用一塊內存區域 M,即如果運行不怎么消耗內存,那么存儲模塊課請求所有的可用內存,反之亦然。運行內存在內存不足時會搶占存儲內存,除非存儲內存已經降低到一個特定閾值 R。換句話說,R代表一塊存儲區域的大小,這塊存儲區域是不會被運行占用的。但反過來,由於邏輯過於復雜,存儲不會去搶占運行內存。

如上設計保證了幾個期望的特性。首先,純運行類的應用可以盡可能地申請足夠多的內存,避免了無謂的內存分割浪費。其次,需要cache內存的應用也能保證一個最小大小的存儲不會被清除。最后,這種設計在大多數場景下提供了開箱即用且性能可接受的方案,用戶不必過於關注實際spark程序的內存分布。

以下為兩個相關的配置,但我們不建議普通用戶去主動修改,因為該配置可適用於大多數場景。

  • spark.memory.fraction 表示 M 相對於Java堆空間的比例大小,默認0.6 。剩下 40% 的內存會給用戶的數據結構、Spark內部元數據等,並能作為一個預防因超大對象而導致 OOM的緩沖。該值設定需與JVM的老年代空間(old/tenured)相適應(即不能超過)。
  • spark.memory.storageFraction 表示 R 相對於 M 的比例大小,默認 0.5 。如上文所描述,R 是存儲內存保持最小的不會被搶占的閾值。

內存消耗估計

最好的衡量一個dataset內存消耗大小的方法是創建一個rdd,cache到memory,然后再Spark WebUI的Storage頁面去觀察具體數值。

若要估計一個特定對象的內存消耗,可使用SizeEstimator.estimate()的方法。這是個有效地估計不同數據結構並優化內存的方法,也可用於去衡量預估廣播變量(即java rdd代碼塊中一些外部final傳入的變量)在每個executor的堆內存上的大小。

數據結構類型優化

這里的數據指的是用於傳輸的數據。降低內存消耗的最首要的方法便是避免Java對象機制帶來額外開銷,比如引用類型的對象或wrapper類對象。有幾種優化方法:

  • 優先選擇原始類型、數組,而不是集合類。
  • 避免嵌套或wrapper結構
  • id方面能用數值型的考慮不要用string類型
  • 對於32gb內存以下的機器,添加 -XX:+UseCompressedOops 進行指針壓縮,把8-byte指針壓縮成 4-byte。可在spark-env.sh中配置。

序列化的RDD存儲

當你的對象在經過了如上優化后依然較大時,一個更簡單的方法是序列化的方式去存儲這些對象。這可以通過在cache/persist中設置StorageLevel實現,比如MEMORY_ONLY_SER。Spark會將對象用一個序列化后的large byte array去存儲。當然由於需要即時的反序列化,這會帶來訪問上的性能開銷,因此這方面是需要用戶自行平衡的。Spark極力推薦使用Kryo序列化來提高序列化/反序列化性能以及壓縮存儲的字節數。

垃圾回收調優 (GC調優)

JVM垃圾回收在一些臨時對象創建頻繁的程序里是一個大問題。JVM GC 大致思路就是JVM尋找不再被引用的對象,通過如分代或標記等機制進行清除釋放內存。需要注意的是,Java GC的開銷是和Java對象數量成正比的,所以建議盡可能使用簡單的數據結構或原始類型來降低這部分開銷。另一個更好的方法是,將對象序列化成byte[]保存,則實際上每個RDD分區的主要存儲就是一個大型byte數組。因此,在做其他GC參數優化前,若性能影響不大,上一小節提及的RDD序列化存儲方案是個強烈建議試用的方案。

用戶自己的task執行和RDD的cache緩存的平衡也會產生GC問題(即上文的運行內存與存儲內存),下文將會對此進行討論。

評估GC的影響

GC調優第一步是先收集gc觸發的頻率、gc耗時、造成程序pause的最長耗時等統計數據,在Java參數中添加-verbose:gc -XX:+PrintGCDetails -XX:+PrintGCTimeStamps可獲得日志 (參見 configuration guide做jvm參數調整)。executor的日志在executor的stdout中查看,而並非driver的log。

調優GC

首先需要知道JVM內存管理的一些信息:

  • Java heap被划分為 young 和 old。young負責一些生命周期短的對象,old負責一些生命周期較長以及超過young分配大小的對象。
  • young 被划分為 eden, survivor1, survivor2。
  • 簡單描述GC過程:eden滿了,觸發minor GC,將對象從eden和survivor1拷貝到survivor2。如果對象存活時間足夠長或survivor2滿了,則將其刷入old。如果old接近滿了,則觸發full gc,會同時對young和old進行gc清理。一般認為,full gc會影響整體程序的性能,應盡量的減少。

Spark GC調優的目標是保證只有那些長時間存活的RDD保存在old,同時young能持續有足夠空間存儲短時間存活的對象。這樣可以幫助避免full gc頻繁地去回收task 運行階段創建的臨時對象。以下為一些技巧:

  • 檢查gc統計中是否發生了過多的gc,如果full gc在一個task中發生了過多次,用戶需考慮適當添加executor內存。
  • 如果minor GC較多,major GC/full GC 較少,嘗試分配多點內存給Eden。你可以根據你自己的task的大致使用內存來估計eden區大小 E ,因此 young區大小一般為 4/3 * E 。使用Java參數 -Xmn=4/3*E
  • 如果GC統計中 old 接近滿了,則適當降低spark.memory.fraction,畢竟減少點緩存比GC影響執行性能更能讓人接受。或者,考慮減少young大小;或者,調大JVM的NewRatio參數,大部分JVM該值默認為2,表示old 占了2/3的堆內存,這個比例應當足夠大,且比spark.memory.fraction 的比例大。
  • 嘗試 G1 GC -XX:+UseG1GC。在一些大堆 JVM的場景下,有助於提高gc性能。G1需要額外的堆空間進行對象移動,考慮調大-XX:G1HeapRegionSize
  • 舉個例子,如果你的task在從HDFS讀取數據,task的內存使用可根據讀取的HDFS block大小來估計。需注意通常一個解壓的block是原block的2-3倍大,假設一個executor有個3-4個task在運行,HDFS block size為128M,那么eden區大小考慮為4 * 3 * 128 MB
  • 每當使用新配置后,需繼續監控GC的耗時及頻率。

Spark executors的GC 參數可通過spark.executor.extraJavaOptions配置。關於GC還有很多可以探討的,請大家自行上網去深入學習。

其他優化

並行度

如果並行度設置不夠高,集群資源可能不能被較充分利用。Spark默認會根據textFile文件數或parallelize的去設置map task的個數,reduce task的個數默認則使用最大的父RDD的分區數。用戶可通過設置大多數並行方法的第二個參數指定並行度或修改默認配置spark.default.parallelism。我們認為每個CPU核上並行運行2-3個task是推薦配置。

Reduce Task的內存使用

由於你的RDD無法適配空余內存,程序會報OOM。有時候導致這種情況的可能只是你的其中一個數據集。通常的優化方法有增大並行度使得每個分區的輸入變小或提供自己的partitioner使得數據更加均勻分布。Spark最小可支持的Task運行時長為200ms(依靠executor的JVM重用以及Spark自身的啟動task的低開銷),因此用戶可以放心的增大並行度/分區數甚至可以超過幾倍的core數。

廣播大變量

使用 sparkcontext的broadcast函數可以顯著減少每個序列化task的大小,以及啟動一個job的開銷。如果你的task需要使用driver程序中的一些大變量,則考慮把它變為一個廣播變量。Spark會吧每個task的序列化大小打印到master的log中,用戶可以觀察日志來判斷自己的tasks是否過大。通常來說,一個task序列化大小超過20KB則被認為應當做優化了。

數據本地性

數據本地性會對spark job的性能產生極大影響。如果代碼和數據在一起,則計算會變得很快;但如果兩者分開了,則其中一方需移動到另一方的進程去執行。一般來說,遷移代碼比遷移數據要快。這是Spark構建數據本地性機制的基本原則。

數據本地性可被表示為數據離代碼有多近,從近到遠可被表示為幾個級別:

  • PROCESS_LOCAL 數據在同一個JVM中
  • NODE_LOCAL 數據在同一個結點,比如HDFS的同一個DataNode或同結點另一個executor上。
  • NO_PREF 數據從任何地方訪問速度都一樣,無任何本地性偏好。
  • RACK_LOCAL 數據在同一個機架上,一般定義即是同一個交換機子網下。
  • ANY 數據不在同一個機架上,可能在網絡上。

Spark優先調度tasks都有最佳的本地性,但這不總是有可能做到的。如果有空閑的executor和未處理的數據,Spark會降低數據本地性。此時有兩個選擇:一是等待直到當前節點cpu釋放足夠起一個新task去處理同結點數據;二是立即在別的結點起個新task兵移動數據。

通常Spark依然采取第一種,等待cpu空閑在本地起新task。如果等待超時了,才執行第二種方案移動數據。每個級別間的等待超時參數可以統一配置也可以分別配置,參見 spark.locality。如果你的任務的本地性很差並且執行時間較長,你可能需要調大超時時間。一般來說默認配置適用大多數場景。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2026 CODEPRJ.COM