Spark Memory Management


This article applies to Spark 1.6.0 and later.
Spark 1.6.0 introduced management of off-heap memory and an improved memory management model (SPARK-11389).

Physically, Spark's memory is divided into on-heap and off-heap memory; logically, it is divided into execution memory and storage memory.
Execution memory satisfies the memory needs of operators during task execution, for example buffering the map-side intermediate results produced during a shuffle.
Storage memory is mainly used to store persisted RDD data and broadcast variables.

Off-heap memory

The following snippet (from Spark 2.1) shows how off-heap memory is divided between execution and storage:

  protected[this] val maxOffHeapMemory = conf.getSizeAsBytes("spark.memory.offHeap.size", 0)
  protected[this] val offHeapStorageMemory =
    (maxOffHeapMemory * conf.getDouble("spark.memory.storageFraction", 0.5)).toLong

  offHeapExecutionMemoryPool.incrementPoolSize(maxOffHeapMemory - offHeapStorageMemory)
  offHeapStorageMemoryPool.incrementPoolSize(offHeapStorageMemory)

(Figure: off-heap memory allocation)
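The split itself is plain arithmetic, and can be reproduced in a small standalone sketch (the object `OffHeapSplit` and its function are illustrative helpers, not Spark API):

```scala
object OffHeapSplit {
  // Mirrors the snippet above: storage receives storageFraction of the
  // off-heap pool and execution receives the remainder.
  def splitOffHeap(maxOffHeapMemory: Long,
                   storageFraction: Double = 0.5): (Long, Long) = {
    val storage = (maxOffHeapMemory * storageFraction).toLong
    val execution = maxOffHeapMemory - storage
    (execution, storage)
  }
}
```

With spark.memory.offHeap.size = 1g and the default spark.memory.storageFraction of 0.5, both pools receive 512 MB.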

On-heap memory

The division of on-heap memory is shown in the figure below.

(Figure: on-heap memory allocation)

  • Total memory
    In Spark 2.1 this is obtained with the following code

    val systemMemory = conf.getLong("spark.testing.memory", Runtime.getRuntime.maxMemory)
    
  • System reserved memory

    Reserved memory is defined by the constant RESERVED_SYSTEM_MEMORY_BYTES, fixed at 300 MB.
    The total memory is required to be at least 1.5 times the reserved memory:
    val minSystemMemory = (reservedMemory * 1.5).ceil.toLong
    The following check enforces this:

    if (systemMemory < minSystemMemory) {
      throw new IllegalArgumentException(s"System memory $systemMemory must " +
        s"be at least $minSystemMemory. Please increase heap size using the --driver-memory " +
        s"option or spark.driver.memory in Spark configuration.")
    }
    // SPARK-12759 Check executor memory to fail fast if memory is insufficient
    if (conf.contains("spark.executor.memory")) {
      val executorMemory = conf.getSizeAsBytes("spark.executor.memory")
      if (executorMemory < minSystemMemory) {
        throw new IllegalArgumentException(s"Executor memory $executorMemory must be at least " +
          s"$minSystemMemory. Please increase executor memory using the " +
          s"--executor-memory option or spark.executor.memory in Spark configuration.")
      }
    }
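A concrete consequence of this check: with 300 MB reserved, the minimum workable heap is 450 MB (471859200 bytes), which is why driver or executor heaps below that value fail fast at startup. A minimal standalone sketch of the same rule (illustrative names, not Spark API):

```scala
object ReservedMemoryCheck {
  // RESERVED_SYSTEM_MEMORY_BYTES in Spark is fixed at 300 MB.
  val ReservedMemory: Long = 300L * 1024 * 1024

  // Same 1.5x rule as the check above.
  val MinSystemMemory: Long = (ReservedMemory * 1.5).ceil.toLong

  def check(systemMemory: Long): Unit =
    require(systemMemory >= MinSystemMemory,
      s"System memory $systemMemory must be at least $MinSystemMemory")
}
```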
    
  • Usable Spark memory

    Usable Spark memory = (system memory - reserved memory) * spark.memory.fraction

     val usableMemory = systemMemory - reservedMemory
     val memoryFraction = conf.getDouble("spark.memory.fraction", 0.6)
     (usableMemory * memoryFraction).toLong
    
  • Storage memory
    Storage memory = usable Spark memory * spark.memory.storageFraction

     onHeapStorageRegionSize =
         (maxMemory * conf.getDouble("spark.memory.storageFraction", 0.5)).toLong
    
  • Execution memory

    Execution memory = usable Spark memory - storage memory

    private[spark] class UnifiedMemoryManager private[memory] (
        conf: SparkConf,
        val maxHeapMemory: Long,
        onHeapStorageRegionSize: Long,
        numCores: Int)
      extends MemoryManager(
        conf,
        numCores,
        onHeapStorageRegionSize,
        maxHeapMemory - onHeapStorageRegionSize)
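Putting the formulas above together, the full on-heap sizing can be sketched end to end (a simplified standalone model with illustrative names; the defaults 0.6 and 0.5 correspond to spark.memory.fraction and spark.memory.storageFraction):

```scala
object OnHeapSizing {
  val ReservedMemory: Long = 300L * 1024 * 1024 // 300 MB

  // Returns (usable, storage, execution) for a given JVM heap size.
  def size(systemMemory: Long,
           memoryFraction: Double = 0.6,
           storageFraction: Double = 0.5): (Long, Long, Long) = {
    val usable = ((systemMemory - ReservedMemory) * memoryFraction).toLong
    val storage = (usable * storageFraction).toLong
    val execution = usable - storage
    (usable, storage, execution)
  }
}
```

For a 1 GB heap with the defaults, usable memory is about 434 MB, split roughly 217 MB each between storage and execution.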

  • Dynamic adjustment between storage and execution memory

    Storage can borrow as much execution memory as is free until execution reclaims its space. When this happens, cached blocks will be evicted from memory until sufficient borrowed memory is released to satisfy the execution memory request.

Similarly, execution can borrow as much storage memory as is free. However, execution memory is never evicted by storage due to the complexities involved in implementing this. The implication is that attempts to cache blocks may fail if execution has already eaten up most of the storage space, in which case the new blocks will be evicted immediately according to their respective storage levels.

The passage above is Spark's official comment on this memory arbitration. The key points are:

- When execution memory is free, storage can borrow it; when execution needs that memory back, storage releases what it borrowed. This is safe because storage can always spill to local disk if memory runs short.

- When storage memory is free, execution can borrow it as well, but the borrowed memory cannot be returned to storage while execution is still using it. Execution memory holds the intermediate results of an in-flight computation, and freeing it would cause subsequent computation to fail.
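These two borrowing rules can be captured in a toy model (deliberately simplified and not Spark's actual implementation, which also does per-task accounting and block-level eviction):

```scala
// Toy unified pool: execution can evict storage it has borrowed from,
// but storage can only use memory that execution leaves free.
class ToyUnifiedPool(total: Long, storageRegion: Long) {
  private var executionUsed = 0L
  private var storageUsed = 0L

  // Execution may grow into free memory and may evict cached blocks,
  // but only down to the storage region floor.
  def acquireExecution(bytes: Long): Long = {
    val evictableStorage = math.max(storageUsed - storageRegion, 0L)
    val free = total - executionUsed - storageUsed
    val granted = math.min(bytes, free + evictableStorage)
    val evicted = math.max(granted - free, 0L)
    storageUsed -= evicted       // cached blocks are dropped from memory
    executionUsed += granted
    granted
  }

  // Storage never evicts execution memory; it fails instead
  // (in Spark the block would then spill or be dropped per its storage level).
  def acquireStorage(bytes: Long): Boolean = {
    val free = total - executionUsed - storageUsed
    if (bytes <= free) { storageUsed += bytes; true } else false
  }

  def usage: (Long, Long) = (executionUsed, storageUsed)
}
```

Starting from a 100-byte pool with a 50-byte storage region, storage can cache 80 bytes, but a later 40-byte execution request evicts 20 bytes of the borrowed storage, after which a further storage request fails.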
  • User memory

    This portion of memory is entirely at the user's disposal, for example for storing user-defined data structures.
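In terms of the quantities above, user memory is what remains of the heap after reserved memory and the unified pool: (system memory - 300 MB) * (1 - spark.memory.fraction). A quick sketch (the object name is illustrative):

```scala
object UserMemoryEstimate {
  val ReservedMemory: Long = 300L * 1024 * 1024 // 300 MB

  // Heap left over for user data structures once reserved memory and
  // the unified (execution + storage) pool are carved out.
  def userMemory(systemMemory: Long, memoryFraction: Double = 0.6): Long = {
    val usable = systemMemory - ReservedMemory
    usable - (usable * memoryFraction).toLong
  }
}
```

For a 1 GB heap with the default fraction of 0.6, roughly 290 MB remains for user data.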

