This article applies to Spark 1.6.0 and later.
Spark 1.6.0 introduced management of off-heap memory and reworked the memory management model (SPARK-11389).
Physically, memory is divided into on-heap and off-heap memory; logically, it is divided into execution memory and storage memory.
Execution memory satisfies the memory needs of certain operators during task execution; for example, the map-side intermediate results produced during a shuffle are buffered in memory.
Storage memory holds persisted RDD data and broadcast variables.
Off-heap memory
The following snippet (from Spark 2.1) shows how off-heap memory is allocated between execution and storage:
```scala
protected[this] val maxOffHeapMemory = conf.getSizeAsBytes("spark.memory.offHeap.size", 0)
protected[this] val offHeapStorageMemory =
  (maxOffHeapMemory * conf.getDouble("spark.memory.storageFraction", 0.5)).toLong

offHeapExecutionMemoryPool.incrementPoolSize(maxOffHeapMemory - offHeapStorageMemory)
offHeapStorageMemoryPool.incrementPoolSize(offHeapStorageMemory)
```
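As a concrete illustration of the split above, the following standalone sketch (the function name and the 1 GB figure are our own, not from Spark) applies the same arithmetic with the default `spark.memory.storageFraction` of 0.5:

```scala
// Hypothetical standalone version of the off-heap split above; the real code
// reads spark.memory.offHeap.size and spark.memory.storageFraction from SparkConf.
def offHeapSplit(maxOffHeapMemory: Long, storageFraction: Double = 0.5): (Long, Long) = {
  val offHeapStorageMemory = (maxOffHeapMemory * storageFraction).toLong
  val offHeapExecutionMemory = maxOffHeapMemory - offHeapStorageMemory
  (offHeapStorageMemory, offHeapExecutionMemory)
}

@main def demoOffHeapSplit(): Unit = {
  // spark.memory.offHeap.size = 1 GB => half storage, half execution
  val (storage, execution) = offHeapSplit(1024L * 1024 * 1024)
  println(s"off-heap storage = $storage bytes, execution = $execution bytes")
}
```

Note that the execution pool gets the remainder rather than its own fraction, so the two pool sizes always sum exactly to `spark.memory.offHeap.size` even when the fraction does not divide it evenly.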

On-heap memory
On-heap memory is divided as shown in the figure below.

- Total memory
  In Spark 2.1 the total memory is obtained via:

  ```scala
  val systemMemory = conf.getLong("spark.testing.memory", Runtime.getRuntime.maxMemory)
  ```

- Reserved system memory
  The reserved memory is a constant in the code, RESERVED_SYSTEM_MEMORY_BYTES, fixed at 300 MB.
  The total memory is required to be at least 1.5 times the reserved memory:

  ```scala
  val minSystemMemory = (reservedMemory * 1.5).ceil.toLong
  ```

  and the following checks are performed:

  ```scala
  if (systemMemory < minSystemMemory) {
    throw new IllegalArgumentException(s"System memory $systemMemory must " +
      s"be at least $minSystemMemory. Please increase heap size using the --driver-memory " +
      s"option or spark.driver.memory in Spark configuration.")
  }
  // SPARK-12759 Check executor memory to fail fast if memory is insufficient
  if (conf.contains("spark.executor.memory")) {
    val executorMemory = conf.getSizeAsBytes("spark.executor.memory")
    if (executorMemory < minSystemMemory) {
      throw new IllegalArgumentException(s"Executor memory $executorMemory must be at least " +
        s"$minSystemMemory. Please increase executor memory using the " +
        s"--executor-memory option or spark.executor.memory in Spark configuration.")
    }
  }
  ```
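The 1.5x requirement means the smallest viable heap is 450 MB. A small standalone sketch (the helper name is ours, not Spark's) reproduces the check:

```scala
// RESERVED_SYSTEM_MEMORY_BYTES is 300 MB in Spark's UnifiedMemoryManager.
val RESERVED_SYSTEM_MEMORY_BYTES: Long = 300L * 1024 * 1024

// Hypothetical helper mirroring the driver-memory check above.
def checkSystemMemory(systemMemory: Long): Unit = {
  val minSystemMemory = (RESERVED_SYSTEM_MEMORY_BYTES * 1.5).ceil.toLong // 450 MB
  if (systemMemory < minSystemMemory) {
    throw new IllegalArgumentException(
      s"System memory $systemMemory must be at least $minSystemMemory.")
  }
}

@main def demoReservedCheck(): Unit = {
  checkSystemMemory(512L * 1024 * 1024) // a 512 MB heap passes
  println("512 MB heap passes the check")
}
```

A 400 MB heap, by contrast, fails the check, which is why very small `--driver-memory` or `--executor-memory` settings abort at startup rather than at first allocation.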
- Usable Spark memory
  Usable Spark memory = (system memory - reserved memory) * spark.memory.fraction

  ```scala
  val usableMemory = systemMemory - reservedMemory
  val memoryFraction = conf.getDouble("spark.memory.fraction", 0.6)
  (usableMemory * memoryFraction).toLong
  ```
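For instance, with a 1 GB heap and the default fraction of 0.6, usable Spark memory comes out to roughly 434 MB. A quick standalone check (the function name and defaults-as-parameters shape are ours):

```scala
// Hypothetical standalone version of the formula above, with Spark's defaults
// (reserved = 300 MB, spark.memory.fraction = 0.6) as default parameters.
def sparkUsableMemory(systemMemory: Long,
                      reservedMemory: Long = 300L * 1024 * 1024,
                      memoryFraction: Double = 0.6): Long = {
  val usableMemory = systemMemory - reservedMemory
  (usableMemory * memoryFraction).toLong
}

@main def demoUsable(): Unit = {
  // (1024 MB - 300 MB) * 0.6, i.e. roughly 434 MB
  println(s"usable = ${sparkUsableMemory(1024L * 1024 * 1024)} bytes")
}
```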
- Storage memory
  Storage memory = usable Spark memory * spark.memory.storageFraction

  ```scala
  onHeapStorageRegionSize = (maxMemory * conf.getDouble("spark.memory.storageFraction", 0.5)).toLong
  ```
- Execution memory
  Execution memory = usable Spark memory - storage memory

  ```scala
  private[spark] class UnifiedMemoryManager private[memory] (
      conf: SparkConf,
      val maxHeapMemory: Long,
      onHeapStorageRegionSize: Long,
      numCores: Int)
    extends MemoryManager(
      conf,
      numCores,
      onHeapStorageRegionSize,
      maxHeapMemory - onHeapStorageRegionSize)
  ```
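Putting the pieces together, the sketch below (a hypothetical standalone calculation, not Spark code) computes the full on-heap layout for a 1 GB executor heap with all defaults; whatever lies outside the Spark-managed region is left to the user:

```scala
// Hypothetical end-to-end calculation of the on-heap regions, using Spark's
// defaults: reserved = 300 MB, spark.memory.fraction = 0.6, storageFraction = 0.5.
def onHeapLayout(systemMemory: Long): Map[String, Long] = {
  val reserved  = 300L * 1024 * 1024
  val usable    = systemMemory - reserved
  val maxMemory = (usable * 0.6).toLong     // Spark-managed (storage + execution)
  val storage   = (maxMemory * 0.5).toLong  // onHeapStorageRegionSize
  Map(
    "reserved"  -> reserved,
    "storage"   -> storage,
    "execution" -> (maxMemory - storage),   // maxHeapMemory - onHeapStorageRegionSize
    "user"      -> (usable - maxMemory))    // left entirely to the user
}

@main def demoLayout(): Unit = {
  onHeapLayout(1024L * 1024 * 1024).foreach { case (k, v) => println(s"$k = $v bytes") }
}
```

Because execution and user memory are both computed as remainders, the four regions always sum exactly back to the total heap size.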
- Dynamic adjustment between storage and execution memory
Storage can borrow as much execution memory as is free until execution reclaims its space. When this happens, cached blocks will be evicted from memory until sufficient borrowed memory is released to satisfy the execution memory request.
Similarly, execution can borrow as much storage memory as is free. However, execution memory is never evicted by storage due to the complexities involved in implementing this. The implication is that attempts to cache blocks may fail if execution has already eaten up most of the storage space, in which case the new blocks will be evicted immediately according to their respective storage levels.
The passage above is Spark's official source comment on memory adjustment; it can be summarized as follows:
- When execution memory is free, storage can borrow it; when execution needs that memory back, storage releases the borrowed portion by evicting cached blocks. This is safe because storage data that no longer fits in memory can spill to local disk.
- When storage memory is free, execution can borrow it as well, but memory that execution is still using cannot be handed back to storage: execution memory holds the intermediate results of in-flight computation, and evicting it would fail subsequent computation.
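This asymmetry can be illustrated with a toy pool. The class below is an illustrative model of the two rules above, not Spark's actual MemoryPool/MemoryManager implementation: storage borrowings are evictable, execution borrowings are not.

```scala
// Toy model of the unified region: storage and execution each own a region but
// may use the other's free space; only storage's borrowed blocks can be evicted.
class ToyUnifiedPool(storageRegion: Long, executionRegion: Long) {
  var storageUsed: Long = 0L
  var executionUsed: Long = 0L
  private def free: Long = storageRegion + executionRegion - storageUsed - executionUsed

  // Storage may grow into free execution memory, but can never evict execution.
  def acquireStorage(bytes: Long): Boolean =
    if (bytes <= free) { storageUsed += bytes; true } else false

  // Execution may reclaim its region by evicting cached blocks storage borrowed.
  def acquireExecution(bytes: Long): Boolean = {
    if (bytes > free) {
      val borrowedByStorage = math.max(0L, storageUsed - storageRegion)
      storageUsed -= math.min(borrowedByStorage, bytes - free) // evict cached blocks
    }
    if (bytes <= free) { executionUsed += bytes; true } else false
  }
}
```

With a 500/500 split, storage can cache 800 units (borrowing 300 from execution); when execution then requests 400, it evicts 200 units of borrowed blocks to satisfy the request, after which a further storage request fails outright rather than evicting execution.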
- User memory
  This portion of memory is entirely at the user's disposal, for example to hold user-defined data structures.
