Tuning Spark

Because of the in-memory nature of most Spark computations, Spark programs can be bottlenecked by any resource in the cluster: CPU, network bandwidth, or memory. Most often, if the data fits in memory, the bottleneck is network bandwidth, but sometimes, you also need to do some tuning, such as storing RDDs in serialized form, to decrease memory usage. This guide will cover two main topics: data serialization, which is crucial for good network performance and can also reduce memory use, and memory tuning. We also sketch several smaller topics.


 

Data Serialization

Serialization plays an important role in the performance of any distributed application. Formats that are slow to serialize objects into, or consume a large number of bytes, will greatly slow down the computation. Often, this will be the first thing you should tune to optimize a Spark application. Spark aims to strike a balance between convenience (allowing you to work with any Java type in your operations) and performance. It provides two serialization libraries:

  • Java serialization: By default, Spark serializes objects using Java's ObjectOutputStream framework, and can work with any class you create that implements java.io.Serializable. You can also control the performance of your serialization more closely by extending java.io.Externalizable. Java serialization is flexible but often quite slow, and leads to large serialized formats for many classes.
  • Kryo serialization: Spark can also use the Kryo library (version 2) to serialize objects more quickly. Kryo is significantly faster and more compact than Java serialization (often as much as 10x), but does not support all Serializable types and requires you to register the classes you'll use in the program in advance for best performance.

 

You can switch to using Kryo by calling System.setProperty("spark.serializer", "spark.KryoSerializer") before creating your SparkContext. The only reason it is not the default is because of the custom registration requirement, but we recommend trying it in any network-intensive application.

 

Finally, to register your classes with Kryo, create a public class that extends spark.KryoRegistrator and set the spark.kryo.registrator system property to point to it, as follows:

import com.esotericsoftware.kryo.Kryo

class MyRegistrator extends spark.KryoRegistrator {
  override def registerClasses(kryo: Kryo) {
    kryo.register(classOf[MyClass1])
    kryo.register(classOf[MyClass2])
  }
}

// Make sure to set these properties *before* creating a SparkContext!
System.setProperty("spark.serializer", "spark.KryoSerializer")
System.setProperty("spark.kryo.registrator", "mypackage.MyRegistrator")
val sc = new SparkContext(...)
The Kryo documentation describes more advanced registration options, such as adding custom serialization code.

 

If your objects are large, you may also need to increase the spark.kryoserializer.buffer.mb system property. The default is 32, but this value needs to be large enough to hold the largest object you will serialize.

Finally, if you don’t register your classes, Kryo will still work, but it will have to store the full class name with each object, which is wasteful.


 

Memory Tuning

There are three considerations in tuning memory usage: the amount of memory used by your objects (you may want your entire dataset to fit in memory), the cost of accessing those objects, and the overhead of garbage collection (if you have high turnover in terms of objects).

By default, Java objects are fast to access, but can easily consume a factor of 2-5x more space than the “raw” data inside their fields. This is due to several reasons:

  • Each distinct Java object has an “object header”, which is about 16 bytes and contains information such as a pointer to its class. For an object with very little data in it (say one Int field), this can be bigger than the data.
  • Java Strings have about 40 bytes of overhead over the raw string data (since they store it in an array of Chars and keep extra data such as the length), and store each character as two bytes due to Unicode. Thus a 10-character string can easily consume 60 bytes.
  • Common collection classes, such as HashMap and LinkedList, use linked data structures, where there is a “wrapper” object for each entry (e.g. Map.Entry). This object not only has a header, but also pointers (typically 8 bytes each) to the next object in the list.
  • Collections of primitive types often store them as “boxed” objects such as java.lang.Integer.
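The per-object overheads above can be sketched numerically. This is a rough model, not a measurement: the 16-byte header and 40-byte String overhead are the guide's approximations and vary by JVM and settings.

```scala
// Rough size model for the overheads described above (approximate values).
object SizeEstimates {
  val headerBytes = 16          // approximate per-object header
  val stringOverheadBytes = 40  // approximate fixed overhead of a String

  // Strings store each character as a 2-byte UTF-16 code unit.
  def stringSizeBytes(chars: Int): Int = stringOverheadBytes + 2 * chars
}

// A 10-character string: about 60 bytes for 10 bytes of ASCII data.
println(SizeEstimates.stringSizeBytes(10))
```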

This section will discuss how to determine the memory usage of your objects, and how to improve it – either by changing your data structures, or by storing data in a serialized format. We will then cover tuning Spark’s cache size and the Java garbage collector.


 

Determining Memory Consumption

The best way to size the amount of memory consumption your dataset will require is to create an RDD, put it into cache, and look at the SparkContext logs on your driver program. The logs will tell you how much memory each partition is consuming, which you can aggregate to get the total size of the RDD. You will see messages like this:

INFO BlockManagerMasterActor: Added rdd_0_1 in memory on mbk.local:50311 (size: 717.5 KB, free: 332.3 MB)

 

This means that partition 1 of RDD 0 consumed 717.5 KB.
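A minimal sketch of this measurement workflow (assumes an existing SparkContext named sc; the dataset here is hypothetical):

```scala
// Cache an RDD and force materialization; the driver log then prints
// an "Added rdd_..." line with the in-memory size of each partition.
val data = sc.parallelize(1 to 1000000, 8)
data.cache()
data.count()  // triggers computation, caching, and the log messages
```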


 

Tuning Data Structures

The first way to reduce memory consumption is to avoid the Java features that add overhead, such as pointer-based data structures and wrapper objects. There are several ways to do this:

  1. Design your data structures to prefer arrays of objects, and primitive types, instead of the standard Java or Scala collection classes (e.g. HashMap). The fastutil library provides convenient collection classes for primitive types that are compatible with the Java standard library.
  2. Avoid nested structures with a lot of small objects and pointers when possible.
  3. Consider using numeric IDs or enumeration objects instead of strings for keys.
  4. If you have less than 32 GB of RAM, set the JVM flag -XX:+UseCompressedOops to make pointers be four bytes instead of eight. Also, on Java 7 or later, try -XX:+UseCompressedStrings to store ASCII strings as just 8 bits per character. You can add these options in spark-env.sh.
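As an example of point 1, a primitive array avoids the per-element headers, boxing, and next-pointers that a linked collection pays (a sketch; exact savings depend on the JVM):

```scala
// 1000 ints as a primitive Array[Int]: ~4 KB of contiguous data,
// versus a linked list of boxed Integers with a header and pointer
// per element.
val compact: Array[Int] = Array.tabulate(1000)(i => i)

// Summing with a while loop keeps the hot path free of boxing.
def sum(xs: Array[Int]): Int = {
  var s = 0
  var i = 0
  while (i < xs.length) { s += xs(i); i += 1 }
  s
}

println(sum(compact))
```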

 


 

Serialized RDD Storage

When your objects are still too large to efficiently store despite this tuning, a much simpler way to reduce memory usage is to store them in serialized form, using the serialized StorageLevels in the RDD persistence API, such as MEMORY_ONLY_SER. Spark will then store each RDD partition as one large byte array. The only downside of storing data in serialized form is slower access times, due to having to deserialize each object on the fly. We highly recommend using Kryo if you want to cache data in serialized form, as it leads to much smaller sizes than Java serialization (and certainly than raw Java objects).
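A sketch of persisting an RDD in serialized form (assumes an existing SparkContext sc; the HDFS path is a placeholder, and the import follows the pre-1.0 package layout this guide uses):

```scala
import spark.storage.StorageLevel

// Each partition is stored as one large serialized byte array:
// a smaller footprint, at the cost of deserializing on each access.
val lengths = sc.textFile("hdfs://namenode:9000/data/input").map(_.length)
lengths.persist(StorageLevel.MEMORY_ONLY_SER)
lengths.count()
```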


 

Garbage Collection Tuning

JVM garbage collection can be a problem when you have large “churn” in terms of the RDDs stored by your program. (It is usually not a problem in programs that just read an RDD once and then run many operations on it.) When Java needs to evict old objects to make room for new ones, it will need to trace through all your Java objects and find the unused ones. The main point to remember here is that the cost of garbage collection is proportional to the number of Java objects, so using data structures with fewer objects (e.g. an array of Ints instead of a LinkedList) greatly lowers this cost. An even better method is to persist objects in serialized form, as described above: now there will be only one object (a byte array) per RDD partition. Before trying other techniques, the first thing to try if GC is a problem is to use serialized caching.


 

GC can also be a problem due to interference between your tasks’ working memory (the amount of space needed to run the task) and the RDDs cached on your nodes. We will discuss how to control the space allocated to the RDD cache to mitigate this.

Measuring the Impact of GC

The first step in GC tuning is to collect statistics on how frequently garbage collection occurs and the amount of time spent on GC. This can be done by adding -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCTimeStamps to your SPARK_JAVA_OPTS environment variable. The next time your Spark job is run, you will see messages printed in the worker's logs each time a garbage collection occurs. Note these logs will be on your cluster's worker nodes (in the stdout files in their work directories), not on your driver program.
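In spark-env.sh this could look like the following (a config sketch):

```shell
# Make each worker JVM log every garbage collection to its stdout file.
SPARK_JAVA_OPTS="-verbose:gc -XX:+PrintGCDetails -XX:+PrintGCTimeStamps"
export SPARK_JAVA_OPTS
```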


 

Cache Size Tuning

One important configuration parameter for GC is the amount of memory that should be used for caching RDDs. By default, Spark uses 66% of the configured executor memory (spark.executor.memory or SPARK_MEM) to cache RDDs. This means that 33% of memory is available for any objects created during task execution.

In case your tasks slow down and you find that your JVM is garbage-collecting frequently or running out of memory, lowering this value will help reduce the memory consumption. To change this to, say, 50%, you can call System.setProperty("spark.storage.memoryFraction", "0.5"). Combined with the use of serialized caching, using a smaller cache should be sufficient to mitigate most of the garbage collection problems. In case you are interested in further tuning the Java GC, continue reading below.


Advanced GC Tuning

To further tune garbage collection, we first need to understand some basic information about memory management in the JVM:

  • Java Heap space is divided in to two regions Young and Old. The Young generation is meant to hold short-lived objects while the Old generation is intended for objects with longer lifetimes.

  • The Young generation is further divided into three regions [Eden, Survivor1, Survivor2].

  • A simplified description of the garbage collection procedure: When Eden is full, a minor GC is run on Eden and objects that are alive from Eden and Survivor1 are copied to Survivor2. The Survivor regions are swapped. If an object is old enough or Survivor2 is full, it is moved to Old. Finally when Old is close to full, a full GC is invoked.

 

The goal of GC tuning in Spark is to ensure that only long-lived RDDs are stored in the Old generation and that the Young generation is sufficiently sized to store short-lived objects. This will help avoid full GCs to collect temporary objects created during task execution. Some steps which may be useful are:

 

  • Check if there are too many garbage collections by collecting GC stats. If a full GC is invoked multiple times before a task completes, it means that there isn't enough memory available for executing tasks.

  • In the GC stats that are printed, if the OldGen is close to being full, reduce the amount of memory used for caching. This can be done using the spark.storage.memoryFraction property. It is better to cache fewer objects than to slow down task execution!

  • If there are too many minor collections but not many major GCs, allocating more memory for Eden would help. You can set the size of Eden to be an over-estimate of how much memory each task will need. If the size of Eden is determined to be E, then you can set the size of the Young generation using the option -Xmn=4/3*E. (The scaling up by 4/3 is to account for space used by survivor regions as well.)

  • As an example, if your task is reading data from HDFS, the amount of memory used by the task can be estimated using the size of the data block read from HDFS. Note that the size of a decompressed block is often 2 or 3 times the size of the block. So if we wish to have 3 or 4 tasks' worth of working space, and the HDFS block size is 64 MB, we can estimate the size of Eden to be 4*3*64MB.

  • Monitor how the frequency and time taken by garbage collection changes with the new settings.

Our experience suggests that the effect of GC tuning depends on your application and the amount of memory available. There are many more tuning options described online, but at a high level, managing how frequently full GC takes place can help in reducing the overhead.
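The Eden-sizing arithmetic from the HDFS example above can be written out explicitly (all numbers are the guide's own example values):

```scala
// 64 MB HDFS blocks, ~3x size after decompression, working space for 4 tasks.
val blockMB = 64
val decompressionFactor = 3
val concurrentTasks = 4

val edenMB = concurrentTasks * decompressionFactor * blockMB  // 4*3*64 = 768
// Young generation is 4/3 of Eden, leaving room for the survivor spaces:
val youngGenMB = edenMB * 4 / 3                               // i.e. -Xmn1024m

println(s"Eden ~$edenMB MB, young generation ~$youngGenMB MB")
```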


 

Other Considerations

Level of Parallelism

Clusters will not be fully utilized unless you set the level of parallelism for each operation high enough. Spark automatically sets the number of “map” tasks to run on each file according to its size (though you can control it through optional parameters to SparkContext.textFile, etc.), and for distributed “reduce” operations, such as groupByKey and reduceByKey, it uses the largest parent RDD's number of partitions. You can pass the level of parallelism as a second argument (see the spark.PairRDDFunctions documentation), or set the system property spark.default.parallelism to change the default. In general, we recommend 2-3 tasks per CPU core in your cluster.
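For example, a reduce operation can take the parallelism level as an extra argument (a sketch; assumes sc, a pair RDD named pairs, and a hypothetical 16-core cluster):

```scala
// 16 cores * 3 tasks per core = 48 reduce tasks.
val numPartitions = 16 * 3
val counts = pairs.reduceByKey(_ + _, numPartitions)

// Alternatively, change the default once, before creating the SparkContext:
System.setProperty("spark.default.parallelism", "48")
```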


 

Memory Usage of Reduce Tasks

Sometimes, you will get an OutOfMemoryError not because your RDDs don't fit in memory, but because the working set of one of your tasks, such as one of the reduce tasks in groupByKey, was too large. Spark's shuffle operations (sortByKey, groupByKey, reduceByKey, join, etc.) build a hash table within each task to perform the grouping, which can often be large. The simplest fix here is to increase the level of parallelism, so that each task's input set is smaller. Spark can efficiently support tasks as short as 200 ms, because it reuses worker JVMs across all tasks and has a low task launching cost, so you can safely increase the level of parallelism to more than the number of cores in your cluster.


 

Broadcasting Large Variables

Using the broadcast functionality available in SparkContext can greatly reduce the size of each serialized task, and the cost of launching a job over a cluster. If your tasks use any large object from the driver program inside of them (e.g. a static lookup table), consider turning it into a broadcast variable. Spark prints the serialized size of each task on the master, so you can look at that to decide whether your tasks are too large; in general tasks larger than about 20 KB are probably worth optimizing.
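A sketch of turning a large driver-side table into a broadcast variable (loadTable, records, and key are hypothetical names; assumes an existing SparkContext sc):

```scala
// Broadcast ships the table to each worker once, instead of serializing
// it into every task closure.
val lookupTable: Map[String, Int] = loadTable()   // hypothetical loader
val broadcastTable = sc.broadcast(lookupTable)

val resolved = records.map(r => broadcastTable.value.getOrElse(r.key, -1))
```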

Summary

This has been a short guide to point out the main concerns you should know about when tuning a Spark application – most importantly, data serialization and memory tuning. For most programs, switching to Kryo serialization and persisting data in serialized form will solve most common performance issues. Feel free to ask on the Spark mailing list about other tuning best practices.


