Spark Memory Management


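By purpose, Spark memory falls into two categories: execution memory and storage memory. Execution memory is what Spark tasks use for computations such as shuffles, joins, sorts, and aggregations. Storage memory is used to cache data and to propagate internal data across the cluster. Storage memory also contains unroll memory, which is used while deserialized (non-serialized) data is being saved into storage memory. The incoming deserialized data may be very large, and reading all of it at once could cause an out-of-memory error, so Spark reads it incrementally: it records the estimated size of what it has read so far as unroll memory, and when the unroll memory runs short it requests more from storage. If the incoming data turns out to fit, Spark clears the unroll memory (returning it to storage memory), requests the corresponding amount of storage memory, and only then actually keeps the data in memory. (Note that these updates to unroll memory and storage memory only change counters; they do not actually allocate or release memory.)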


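Starting with version 1.6, Spark replaced its earlier static memory management model with a new dynamic (unified) one. The new memory model is as follows: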

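Let T be the total JVM memory. 300 MB of it is reserved; of the remaining T - 300 MB, the fraction shared by execution and storage is set by spark.memory.fraction (default 0.6). Within that shared region, storage's share is set by spark.memory.storageFraction (default 0.5). Within storage memory, each task's unroll memory starts at 1 MB and can grow up to the storage limit.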

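The main difference between the two models is that in the new model execution and storage can use each other's memory, and unroll memory no longer has a separate cap. In the old model execution and storage memory were isolated: when one side ran short while the other had plenty free, the side that was short could not use the other side's memory.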

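Spark can run in either an on-heap or an off-heap memory mode. Storage memory, however, appears to use only the on-heap mode; only execution memory can choose between on-heap and off-heap. Spark manages allocation and release through a mechanism called Tungsten, which, much like an operating system's paging, splits memory into pages and translates logical memory addresses into physical ones, so that on-heap and off-heap memory are managed in one uniform way.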

Spark memory management configuration

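| Parameter | Default | Description |
|---|---|---|
| spark.memory.fraction | 0.6 | Fraction of (JVM heap - 300 MB) shared by execution and storage memory. |
| spark.memory.storageFraction | 0.5 | Fraction of the shared execution/storage region that goes to storage. The default is one half, so storage memory defaults to (JVM heap - 300 MB) * 0.6 * 0.5. |
| spark.memory.offHeap.enabled | false | Whether off-heap mode is enabled. |
| spark.memory.offHeap.size | 0 | Size of off-heap memory when off-heap mode is enabled. Requires spark.memory.offHeap.enabled=true. |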

Spark memory management flowchart

[Figure: Spark memory management flow (spark-store)]

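MemoryStore is the entry point through which Spark caches data (using storage memory); it is called by BlockManager. MemoryStore saves data in two forms: serialized data stored into storage memory, and deserialized data stored in memory. For deserialized data, the input may be very large, so to avoid running out of memory MemoryStore first estimates its size and requests unroll memory, and only actually stores the data once it knows the data fits. Spark currently has two memory managers (selected by spark.memory.useLegacyMode; by default the new UnifiedMemoryManager is used). The memory manager owns the StorageMemoryPool and the ExecutionMemoryPool and is responsible for acquiring memory from them and releasing memory back to them. There is only one memory manager per JVM. The flow is as follows: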

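  1. To store serialized data (putBytes), MemoryStore first requests storage memory from the StorageMemoryPool. If storage memory is short, storage borrows execution memory that is currently unused, and if that is still not enough, cached blocks are evicted. To store deserialized data (putIterator), it first estimates the size of the incoming data and requests unroll memory from the StorageMemoryPool; if there is enough memory, it releases the unroll memory and requests storage memory equal to the data size.
  2. If there is enough memory, the data is wrapped in a MemoryEntry and put into MemoryStore's map.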

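MemoryConsumer is the entry point through which running Spark tasks request memory (execution memory). MemoryConsumer is an abstract class; classes that need memory for their computations extend it. TaskMemoryManager manages the execution memory requested by a task: each task attempt gets its own TaskMemoryManager, while the MemoryManager is shared by the whole executor JVM. A task can use on-heap or off-heap memory, as chosen by spark.memory.offHeap.enabled. The flow is as follows: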

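  1. The MemoryConsumer requests execution memory from the TaskMemoryManager.
  2. The TaskMemoryManager requests memory from the ExecutionMemoryPool. If execution memory is short, it borrows storage memory; if that is still not enough, cached storage blocks are forcibly evicted from memory (written to disk if their storage level allows) to free memory.
  3. If the ExecutionMemoryPool still grants less than requested, MemoryConsumer.spill is called to spill the consumer's in-memory data to disk and free memory.
  4. Depending on the configured memory mode, on-heap or off-heap memory is allocated for the MemoryConsumer.
  5. The allocated memory is wrapped in a MemoryBlock; each MemoryBlock corresponds to one page.
  6. The TaskMemoryManager maintains the task's pageTable, through which the task can look up the MemoryBlock for a given page number.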

Source code walkthrough

Spark's memory management spans quite a few modules and packages, mainly:

  • core/java/org.apache.spark.memory
  • core/scala/org.apache.spark.memory
  • core/scala/org.apache.spark.storage/memory
  • common/unsafe/java/org.apache.spark.unsafe.memory

The core/scala/org.apache.spark.memory package holds the core memory management classes, including MemoryManager, MemoryPool, and their concrete implementations.

The core/scala/org.apache.spark.storage/memory package has a single class, MemoryStore, which writes data blocks into storage memory in serialized or deserialized form.

The core/java/org.apache.spark.memory package contains three classes, MemoryConsumer, TaskMemoryManager, and MemoryMode, which implement how Spark tasks acquire and release execution memory.

The common/unsafe/java/org.apache.spark.unsafe.memory package is called by TaskMemoryManager's acquire/release methods and performs the actual memory allocation.

The core memory management classes (the core/scala/org.apache.spark.memory package) are introduced first, followed by the classes that acquire and release execution memory and storage memory, respectively.

Core memory management classes (core/scala/org.apache.spark.memory)

MemoryPool

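MemoryPool is the building block of MemoryManager. It has two concrete subclasses, ExecutionMemoryPool and StorageMemoryPool, for execution memory and storage memory respectively. MemoryPool and its subclasses mainly track how execution/storage memory is used (memory used, memory free, total pool size, and so on). When more memory is needed, Spark consults the MemoryPool to decide whether enough is available. Note that MemoryPool and its subclasses only track memory usage; they never actually allocate or free memory.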

```scala
/** Returns the current size of the pool. */
final def poolSize: Long = lock.synchronized {
  _poolSize
}

/** Returns the amount of free memory in the pool. */
final def memoryFree: Long = lock.synchronized {
  _poolSize - memoryUsed
}

/** Grows the pool by `delta` bytes. */
final def incrementPoolSize(delta: Long): Unit = lock.synchronized {
  require(delta >= 0)
  _poolSize += delta
}

/** Shrinks the pool by `delta` bytes. */
final def decrementPoolSize(delta: Long): Unit = lock.synchronized {
  require(delta >= 0)
  require(delta <= _poolSize)
  require(_poolSize - delta >= memoryUsed)
  _poolSize -= delta
}
```
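To make the "bookkeeping only" point concrete, here is a small Java sketch of a pool that tracks nothing but two counters. It is a hypothetical stand-in, not Spark's MemoryPool: the class BookkeepingPool and its use and transfer methods are invented for illustration, while decrementPoolSize keeps the same three checks as the Scala code above. transfer shows how capacity moves from one pool to the other when execution and storage borrow from each other.

```java
// Hypothetical, simplified stand-in for Spark's MemoryPool: pure bookkeeping, no real allocation.
final class BookkeepingPool {
  private final Object lock = new Object();
  private long poolSize;
  private long memoryUsed;

  BookkeepingPool(long initialSize) {
    this.poolSize = initialSize;
  }

  long poolSize()   { synchronized (lock) { return poolSize; } }
  long memoryUsed() { synchronized (lock) { return memoryUsed; } }
  long memoryFree() { synchronized (lock) { return poolSize - memoryUsed; } }

  void incrementPoolSize(long delta) {
    synchronized (lock) {
      if (delta < 0) throw new IllegalArgumentException("delta < 0");
      poolSize += delta;
    }
  }

  void decrementPoolSize(long delta) {
    synchronized (lock) {
      // Same three checks as the Scala code: non-negative, not larger than the pool,
      // and the shrunken pool must still cover what is already in use.
      if (delta < 0 || delta > poolSize || poolSize - delta < memoryUsed) {
        throw new IllegalArgumentException("cannot shrink pool by " + delta);
      }
      poolSize -= delta;
    }
  }

  /** Records usage only; returns false if the pool does not have that much free. */
  boolean use(long bytes) {
    synchronized (lock) {
      if (bytes > poolSize - memoryUsed) return false;
      memoryUsed += bytes;
      return true;
    }
  }

  /** Moves capacity from one pool to another, the way the two pools "borrow" from each other. */
  static void transfer(BookkeepingPool from, BookkeepingPool to, long bytes) {
    from.decrementPoolSize(bytes);
    to.incrementPoolSize(bytes);
  }
}
```

In UnifiedMemoryManager the two pool updates happen inside the manager's own synchronized methods, so the pair is atomic there; this sketch's transfer is not.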

ExecutionMemoryPool

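ExecutionMemoryPool tracks the memory used by tasks. It tries to give every task a reasonable share of memory, so that early tasks cannot grab most of it and force later tasks to spill to disk over and over. With N active tasks, ExecutionMemoryPool ensures that each task can obtain at least 1/(2N) of the pool before it is forced to spill, and at most 1/N. Because N changes over time, the pool tracks it and, whenever it changes, calls notifyAll to wake waiting tasks so they recompute 1/(2N) and 1/N.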

acquireMemory

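acquireMemory allocates memory to a task and returns the amount actually granted, or 0 if nothing can be granted. In some situations the method blocks the task until enough memory becomes free (for example, when the number of tasks grows while older tasks already hold most of the memory, so that a newcomer cannot get at least 1/(2N)). This guarantees that every task has a chance to obtain 1/(2N) of the total execution memory.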

```scala
private[memory] def acquireMemory(
    numBytes: Long,
    taskAttemptId: Long,
    maybeGrowPool: Long => Unit = (additionalSpaceNeeded: Long) => Unit,
    computeMaxPoolSize: () => Long = () => poolSize): Long = lock.synchronized {
  assert(numBytes > 0, s"invalid number of bytes requested: $numBytes")

  // If memoryForTask does not contain this task, the task is new: wake up waiting tasks
  // so they recompute maxMemoryPerTask and minMemoryPerTask
  if (!memoryForTask.contains(taskAttemptId)) {
    memoryForTask(taskAttemptId) = 0L
    lock.notifyAll()
  }

  // Keep looping until the task has reached the most it may hold, or there is enough
  // free memory to give it (it would hold at least the per-task minimum)
  while (true) {
    val numActiveTasks = memoryForTask.keys.size
    val curMem = memoryForTask(taskAttemptId)

    // First try to get enough memory from storage (take storage's free memory, or
    // reclaim memory that storage previously borrowed)
    maybeGrowPool(numBytes - memoryFree)

    // Maximum size the pool would have after potentially growing the pool.
    // This is used to compute the upper bound of how much memory each task can occupy. This
    // must take into account potential free memory as well as the amount this pool currently
    // occupies. Otherwise, we may run into SPARK-12155 where, in unified memory management,
    // we did not take into account space that could have been freed by evicting cached blocks.
    // Compute the maximum (1/N) and minimum (1/2N) memory per task
    val maxPoolSize = computeMaxPoolSize()
    val maxMemoryPerTask = maxPoolSize / numActiveTasks
    val minMemoryPerTask = poolSize / (2 * numActiveTasks)

    // Most we could grant in principle: min(bytes requested, room left under the per-task max)
    val maxToGrant = math.min(numBytes, math.max(0, maxMemoryPerTask - curMem))
    // What we can actually grant: min(maxToGrant, memory currently free)
    val toGrant = math.min(maxToGrant, memoryFree)

    // If we cannot grant the full request and the task would still hold less than the
    // per-task minimum, block until another task releases memory
    if (toGrant < numBytes && curMem + toGrant < minMemoryPerTask) {
      logInfo(s"TID $taskAttemptId waiting for at least 1/2N of $poolName pool to be free")
      lock.wait()
    } else {
      // Otherwise record the grant and return it
      memoryForTask(taskAttemptId) += toGrant
      return toGrant
    }
  }
  0L  // Never reached
}
```

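The per-task bounds can be checked in isolation. The following Java sketch restates one iteration of the loop above as a pure function; the class FairShare and its parameter list are hypothetical. It returns the number of bytes that would be granted, or -1 where the real method would call lock.wait(), and it leaves out maybeGrowPool and the locking.

```java
// Hypothetical, single-threaded restatement of one iteration of ExecutionMemoryPool.acquireMemory.
final class FairShare {
  /** Returns bytes granted, or -1 if the real method would block in lock.wait() instead. */
  static long tryGrant(long numBytes, long curMem, int numActiveTasks,
                       long poolSize, long maxPoolSize, long memoryFree) {
    long maxMemoryPerTask = maxPoolSize / numActiveTasks;      // 1/N
    long minMemoryPerTask = poolSize / (2L * numActiveTasks);  // 1/(2N)
    // Cap the request at the room left under the per-task maximum...
    long maxToGrant = Math.min(numBytes, Math.max(0, maxMemoryPerTask - curMem));
    // ...and at what is actually free right now.
    long toGrant = Math.min(maxToGrant, memoryFree);
    if (toGrant < numBytes && curMem + toGrant < minMemoryPerTask) {
      return -1; // would wait until another task releases memory
    }
    return toGrant;
  }
}
```

With a 1000-byte pool shared by 4 active tasks, the bounds are 125 bytes (1/(2N)) and 250 bytes (1/N): a new task asking for 300 bytes when only 100 are free would wait, but with 200 free it would be granted 200.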
releaseMemory

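releaseMemory releases numBytes of memory held by the task with the given taskAttemptId. At the end the method calls notifyAll to wake tasks that are waiting for memory, so they can recompute 1/(2N) and 1/N.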

```scala
def releaseMemory(numBytes: Long, taskAttemptId: Long): Unit = lock.synchronized {
  val curMem = memoryForTask.getOrElse(taskAttemptId, 0L)
  var memoryToFree = if (curMem < numBytes) {
    logWarning(
      s"Internal error: release called on $numBytes bytes but task only has $curMem bytes " +
        s"of memory from the $poolName pool")
    curMem
  } else {
    numBytes
  }
  if (memoryForTask.contains(taskAttemptId)) {
    memoryForTask(taskAttemptId) -= memoryToFree
    if (memoryForTask(taskAttemptId) <= 0) {
      memoryForTask.remove(taskAttemptId)
    }
  }
  lock.notifyAll() // Notify waiters in acquireMemory() that memory has been freed
}
```

StorageMemoryPool

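StorageMemoryPool tracks how much memory is used for storage (caching). There is one for each memory mode, on-heap and off-heap, and it mainly exposes methods to acquire and release memory (acquireMemory, releaseMemory, freeSpaceToShrinkPool). Note that this class only does memory bookkeeping: it records memory usage and never allocates or frees physical memory. Because its memory counters can be updated by several threads at once, all of StorageMemoryPool's memory-related methods are synchronized.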

acquireMemory

acquireMemory reserves space for caching a block. If the pool does not have enough free memory, it first frees some space.

```scala
/**
 * @param blockId
 * @param numBytesToAcquire size of the block to store
 * @param numBytesToFree    space the StorageMemoryPool must free first (when the
 *                          currently free storage space is insufficient)
 * @return whether numBytesToAcquire bytes of memory could be provided
 */
def acquireMemory(
    blockId: BlockId,
    numBytesToAcquire: Long,
    numBytesToFree: Long): Boolean = lock.synchronized {
  assert(numBytesToAcquire >= 0)
  assert(numBytesToFree >= 0)
  assert(memoryUsed <= poolSize)
  // If the pool is short of space, numBytesToFree bytes must be freed before the block fits
  if (numBytesToFree > 0) {
    memoryStore.evictBlocksToFreeSpace(Some(blockId), numBytesToFree, memoryMode)
  }
  // Check whether the remaining free memory covers the requested amount
  val enoughMemory = numBytesToAcquire <= memoryFree
  if (enoughMemory) {
    _memoryUsed += numBytesToAcquire
  }
  enoughMemory
}
```

releaseMemory

releaseMemory releases memory from the pool. It only adjusts the counters and does not actually free any space.

```scala
def releaseMemory(size: Long): Unit = lock.synchronized {
  if (size > _memoryUsed) {
    logWarning(s"Attempted to release $size bytes of storage " +
      s"memory when we only have ${_memoryUsed} bytes")
    _memoryUsed = 0
  } else {
    _memoryUsed -= size
  }
}
```

freeSpaceToShrinkPool

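freeSpaceToShrinkPool shrinks the StorageMemoryPool; it is what lets Spark resize the ExecutionMemoryPool and StorageMemoryPool dynamically. It first covers as much of the requested space as it can with the pool's free memory; if that is not enough, it calls memoryStore.evictBlocksToFreeSpace to evict blocks for the rest. Finally it returns how much space the StorageMemoryPool can give up.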

```scala
def freeSpaceToShrinkPool(spaceToFree: Long): Long = lock.synchronized {
  val spaceFreedByReleasingUnusedMemory = math.min(spaceToFree, memoryFree)
  val remainingSpaceToFree = spaceToFree - spaceFreedByReleasingUnusedMemory
  if (remainingSpaceToFree > 0) {
    // If reclaiming free memory did not adequately shrink the pool, begin evicting blocks:
    val spaceFreedByEviction =
      memoryStore.evictBlocksToFreeSpace(None, remainingSpaceToFree, memoryMode)
    // When a block is released, BlockManager.dropFromMemory() calls releaseMemory(), so we do
    // not need to decrement _memoryUsed here. However, we do need to decrement the pool size.
    spaceFreedByReleasingUnusedMemory + spaceFreedByEviction
  } else {
    spaceFreedByReleasingUnusedMemory
  }
}
```
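The order of operations in freeSpaceToShrinkPool (unused memory first, eviction only for the remainder) fits in a few lines. In this Java sketch the eviction step is passed in as a function standing in for memoryStore.evictBlocksToFreeSpace; the names ShrinkPool and freeSpaceToShrink are hypothetical.

```java
import java.util.function.LongUnaryOperator;

// Hypothetical restatement of StorageMemoryPool.freeSpaceToShrinkPool.
final class ShrinkPool {
  /**
   * Unused memory is given up first; blocks are evicted only for the remainder.
   * `evict` stands in for memoryStore.evictBlocksToFreeSpace and returns the bytes it freed.
   */
  static long freeSpaceToShrink(long spaceToFree, long memoryFree, LongUnaryOperator evict) {
    long fromUnused = Math.min(spaceToFree, memoryFree);
    long remaining = spaceToFree - fromUnused;
    if (remaining > 0) {
      return fromUnused + evict.applyAsLong(remaining);
    }
    return fromUnused;
  }
}
```

Because the real eviction is all-or-nothing (it returns 0 when it cannot free enough), the pool may end up shrinking by less than requested; the caller only moves the amount actually returned.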

MemoryManager

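MemoryManager is the abstract class that manages execution memory and storage memory. Execution memory is mainly used by tasks for computations and transformations such as shuffles, joins, sorts, and aggregations. Storage memory is used by BlockManager to hold block data. There is only one MemoryManager per JVM. MemoryManager provides methods to acquire and release both execution and storage memory.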

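Spark supports two memory modes: on-heap and off-heap. On-heap memory is managed by the JVM, while off-heap memory is read and written directly through Java's Unsafe API (wrapped by Spark's Platform class), which avoids GC overhead and saves memory space.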

UnifiedMemoryManager

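MemoryManager has two implementations: StaticMemoryManager and UnifiedMemoryManager. StaticMemoryManager is the memory manager used before 1.6; its execution and storage regions are fixed and cannot be adjusted dynamically. UnifiedMemoryManager is the newer memory manager, in which execution and storage can borrow from each other and resize dynamically.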

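The shared execution/storage region is (JVM memory - 300 MB) * spark.memory.fraction (default 0.6), and storage's share of it is spark.memory.storageFraction (default 0.5). In other words, storage defaults to 0.3 of (JVM heap - 300 MB). Storage can borrow as much of execution's unused memory as it likes, until execution runs short. When execution needs that memory back, cached storage blocks are evicted from memory (to disk) to release the memory storage borrowed from execution. Likewise, execution can borrow as much of storage's unused memory as it likes, but memory that execution has borrowed is never reclaimed by storage.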

getMaxMemory

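getMaxMemory returns the maximum size of the execution/storage region managed by the MemoryManager. It works as follows. First, 300 MB is reserved by default. Second, the system memory (here the JVM's maximum heap, Runtime.getRuntime.maxMemory) must be at least 1.5 times the reserved memory; this bound is called the minimum system memory. Third, if spark.executor.memory is set, it must not be smaller than the minimum system memory either. The resulting execution/storage region is: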

$$
\text{usable memory} = (\text{system memory} - \text{reserved memory}) \times \text{spark.memory.fraction}
$$

```scala
private def getMaxMemory(conf: SparkConf): Long = {
  // System memory
  val systemMemory = conf.getLong("spark.testing.memory", Runtime.getRuntime.maxMemory)
  // Reserved memory (300 MB by default)
  val reservedMemory = conf.getLong("spark.testing.reservedMemory",
    if (conf.contains("spark.testing")) 0 else RESERVED_SYSTEM_MEMORY_BYTES)
  // The minimum system memory is 1.5 times the reserved memory
  val minSystemMemory = (reservedMemory * 1.5).ceil.toLong
  if (systemMemory < minSystemMemory) {
    throw new IllegalArgumentException(s"System memory $systemMemory must " +
      s"be at least $minSystemMemory. Please increase heap size using the --driver-memory " +
      s"option or spark.driver.memory in Spark configuration.")
  }
  // The configured executor memory must not be smaller than the minimum system memory
  if (conf.contains("spark.executor.memory")) {
    val executorMemory = conf.getSizeAsBytes("spark.executor.memory")
    if (executorMemory < minSystemMemory) {
      throw new IllegalArgumentException(s"Executor memory $executorMemory must be at least " +
        s"$minSystemMemory. Please increase executor memory using the " +
        s"--executor-memory option or spark.executor.memory in Spark configuration.")
    }
  }
  // Maximum usable memory = (system memory - reserved memory) * (execution & storage fraction)
  val usableMemory = systemMemory - reservedMemory
  val memoryFraction = conf.getDouble("spark.memory.fraction", 0.6)
  (usableMemory * memoryFraction).toLong
}
```

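As a worked example of getMaxMemory, the following Java sketch repeats its arithmetic outside Spark and also derives the initial storage region by multiplying by spark.memory.storageFraction. The class and method names are hypothetical; it assumes the 300 MB reserved memory and the default fractions described above, and it leaves out the spark.testing.* overrides and the spark.executor.memory check.

```java
// Hypothetical helper mirroring the arithmetic of UnifiedMemoryManager.getMaxMemory
// plus the initial storage region; it is not Spark's API.
final class MaxMemorySketch {
  static final long MB = 1024L * 1024L;
  static final long RESERVED_SYSTEM_MEMORY_BYTES = 300 * MB;

  /** (systemMemory - reserved) * memoryFraction, after the 1.5x-reserved sanity check. */
  static long maxMemory(long systemMemory, double memoryFraction) {
    long minSystemMemory = (long) Math.ceil(RESERVED_SYSTEM_MEMORY_BYTES * 1.5);
    if (systemMemory < minSystemMemory) {
      throw new IllegalArgumentException("System memory " + systemMemory
          + " must be at least " + minSystemMemory);
    }
    long usableMemory = systemMemory - RESERVED_SYSTEM_MEMORY_BYTES;
    return (long) (usableMemory * memoryFraction);
  }

  /** Initial storage region: maxMemory * spark.memory.storageFraction. */
  static long storageRegion(long maxMemory, double storageFraction) {
    return (long) (maxMemory * storageFraction);
  }
}
```

For a 4096 MB heap this gives (4096 - 300) * 0.6 = 2277.6 MB for the shared region and about 1138.8 MB for the initial storage region; a heap smaller than 450 MB (1.5 * 300 MB) is rejected.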
acquireExecutionMemory

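Acquires execution memory. Maximum execution memory = maximum size of the shared execution/storage region - min(storage memory in use, initial storage region size). In other words, execution can use all of storage's free memory, and it can also reclaim its own memory that storage previously borrowed.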

```scala
override private[memory] def acquireExecutionMemory(
    numBytes: Long,
    taskAttemptId: Long,
    memoryMode: MemoryMode): Long = synchronized {
  assertInvariants()
  assert(numBytes >= 0)
  val (executionPool, storagePool, storageRegionSize, maxMemory) = memoryMode match {
    case MemoryMode.ON_HEAP => (
      onHeapExecutionMemoryPool,
      onHeapStorageMemoryPool,
      onHeapStorageRegionSize,
      maxHeapMemory)
    case MemoryMode.OFF_HEAP => (
      offHeapExecutionMemoryPool,
      offHeapStorageMemoryPool,
      offHeapStorageMemory,
      maxOffHeapMemory)
  }

  /**
   * Grows the execution pool by evicting cached blocks, which shrinks the storage pool.
   */
  def maybeGrowExecutionPool(extraMemoryNeeded: Long): Unit = {
    if (extraMemoryNeeded > 0) {
      val memoryReclaimableFromStorage = math.max(
        storagePool.memoryFree,
        storagePool.poolSize - storageRegionSize)
      if (memoryReclaimableFromStorage > 0) {
        // Call storagePool.freeSpaceToShrinkPool to free memory held by storage
        val spaceToReclaim = storagePool.freeSpaceToShrinkPool(
          math.min(extraMemoryNeeded, memoryReclaimableFromStorage))
        storagePool.decrementPoolSize(spaceToReclaim)
        executionPool.incrementPoolSize(spaceToReclaim)
      }
    }
  }

  /**
   * Maximum memory available to execution (execution can use all of storage's free memory
   * and can also reclaim its own memory that storage previously borrowed)
   */
  def computeMaxExecutionPoolSize(): Long = {
    maxMemory - math.min(storagePool.memoryUsed, storageRegionSize)
  }

  executionPool.acquireMemory(
    numBytes, taskAttemptId, maybeGrowExecutionPool, computeMaxExecutionPoolSize)
}
```

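The two nested helpers above reduce to two formulas. This Java sketch (hypothetical class and method names, byte counts as plain longs) evaluates them for a given pool state: how much execution may reclaim from storage, and the ceiling on the execution pool.

```java
// Hypothetical restatement of the formulas inside UnifiedMemoryManager.acquireExecutionMemory.
final class ExecutionGrowth {
  /** Storage memory execution may reclaim: max(storage free, storage borrowed beyond its region). */
  static long reclaimableFromStorage(long storageFree, long storagePoolSize, long storageRegionSize) {
    return Math.max(storageFree, storagePoolSize - storageRegionSize);
  }

  /** Upper bound on the execution pool: maxMemory - min(storage used, storage region). */
  static long maxExecutionPoolSize(long maxMemory, long storageUsed, long storageRegionSize) {
    return maxMemory - Math.min(storageUsed, storageRegionSize);
  }
}
```

For example, with maxMemory = 1000 and storageRegionSize = 500, a storage pool that has grown to 700 with 650 in use yields 200 reclaimable bytes (the part borrowed beyond its region) and an execution ceiling of 500, because storage keeps min(650, 500) = 500.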
acquireStorageMemory

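Acquires storage memory. If storage memory is short, storage can borrow part of execution's free memory; if, even after borrowing, storage's free memory still does not cover the request, blocks in storage are evicted (dropped to disk) to free memory.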

```scala
override def acquireStorageMemory(
    blockId: BlockId,
    numBytes: Long,
    memoryMode: MemoryMode): Boolean = synchronized {
  assertInvariants()
  assert(numBytes >= 0)
  val (executionPool, storagePool, maxMemory) = memoryMode match {
    case MemoryMode.ON_HEAP => (
      onHeapExecutionMemoryPool,
      onHeapStorageMemoryPool,
      maxOnHeapStorageMemory)
    case MemoryMode.OFF_HEAP => (
      offHeapExecutionMemoryPool,
      offHeapStorageMemoryPool,
      maxOffHeapMemory)
  }
  // If the request exceeds the maximum memory, give up immediately
  if (numBytes > maxMemory) {
    // Fail fast if the block simply won't fit
    logInfo(s"Will not store $blockId as the required space ($numBytes bytes) exceeds our " +
      s"memory limit ($maxMemory bytes)")
    return false
  }
  // If the request exceeds storage's free memory, borrow from execution's free memory
  if (numBytes > storagePool.memoryFree) {
    val memoryBorrowedFromExecution = Math.min(executionPool.memoryFree, numBytes)
    executionPool.decrementPoolSize(memoryBorrowedFromExecution)
    storagePool.incrementPoolSize(memoryBorrowedFromExecution)
  }
  // If free memory is still not enough, this call evicts blocks from storage
  // (to disk) to free enough memory
  storagePool.acquireMemory(blockId, numBytes)
}
```
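The storage-side decision can be summarized as a small Java function. It is a hypothetical sketch, not Spark code: it assumes that after borrowing, whatever storage still lacks has to come from evicting cached blocks, and it returns that amount (or -1 for the fail-fast case).

```java
// Hypothetical summary of UnifiedMemoryManager.acquireStorageMemory's borrowing decision.
final class StoragePlan {
  /** Borrowed from execution only when storage lacks room, capped by execution's free memory. */
  static long borrowed(long numBytes, long storageFree, long executionFree) {
    return numBytes > storageFree ? Math.min(executionFree, numBytes) : 0L;
  }

  /**
   * Bytes of cached blocks that would still have to be evicted after borrowing,
   * or -1 if the block is larger than the storage limit and is rejected outright.
   */
  static long bytesToEvict(long numBytes, long maxStorageMemory,
                           long storageFree, long executionFree) {
    if (numBytes > maxStorageMemory) return -1L;  // fail fast, like the Scala code
    long freeAfterBorrow = storageFree + borrowed(numBytes, storageFree, executionFree);
    return Math.max(0L, numBytes - freeAfterBorrow);
  }
}
```

With 30 bytes free in storage and 40 free in execution, a 100-byte block borrows 40 bytes from execution and still needs 30 bytes of evictions.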

MemoryStore(core/scala/org.apache.spark.storage/memory)

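MemoryStore is the class through which Spark uses storage memory: it keeps data blocks in the storage memory it has acquired and provides methods to read the stored data back from memory. When storage memory runs short, it is responsible for dropping blocks held in memory (to disk, if their storage level allows) and releasing the memory they occupied. Before storing data, MemoryStore calls the MemoryManager's acquire methods to check whether the StorageMemoryPool can provide enough memory. If not, the MemoryManager first tries to make room by evicting cached blocks (MemoryStore.evictBlocksToFreeSpace, which calls BlockEvictionHandler.dropFromMemory); if that is still not enough, the acquire call returns false and the block is not stored in memory. If enough memory is available, the block is stored in memory directly.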

putBytes

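Stores data in serialized form. It first obtains enough space from the StorageMemoryPool through the MemoryManager, then wraps the data in a SerializedMemoryEntry, stores it in memory, and records the mapping from blockId to the entry for later lookups.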

```scala
def putBytes[T: ClassTag](
    blockId: BlockId,
    size: Long,
    memoryMode: MemoryMode,
    _bytes: () => ChunkedByteBuffer): Boolean = {
  require(!contains(blockId), s"Block $blockId is already present in the MemoryStore")
  // Check whether there is enough memory; if not, cached blocks are eventually dropped
  // through BlockEvictionHandler to free space
  if (memoryManager.acquireStorageMemory(blockId, size, memoryMode)) {
    // We acquired enough memory for the block, so go ahead and put it
    val bytes = _bytes()
    assert(bytes.size == size)
    // Create the memory entry and register it in the map
    val entry = new SerializedMemoryEntry[T](bytes, memoryMode, implicitly[ClassTag[T]])
    entries.synchronized {
      entries.put(blockId, entry)
    }
    logInfo("Block %s stored as bytes in memory (estimated size %s, free %s)".format(
      blockId, Utils.bytesToString(size), Utils.bytesToString(maxMemory - blocksMemoryUsed)))
    true
  } else {
    false
  }
}
```

putIteratorAsValues

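Tries to store the given block in memory in deserialized form. To avoid running out of memory when the block is too large, the method periodically checks, while iterating over the block's values, whether there is still enough unroll memory, and requests more from storage when there is not (this step does not actually store data; it only records the estimated size). If the block's final size fits, storage has enough room: the corresponding amount is deducted from storage, and only then is the block actually kept in memory. Until that point the method only does arithmetic on memory counters, and any unroll memory reserved beyond the block's final size is released, so the block never ends up holding more memory than it needs.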

```scala
private[storage] def putIteratorAsValues[T](
    blockId: BlockId,
    values: Iterator[T],
    classTag: ClassTag[T]): Either[PartiallyUnrolledIterator[T], Long] = {
  require(!contains(blockId), s"Block $blockId is already present in the MemoryStore")
  var elementsUnrolled = 0
  var keepUnrolling = true
  val initialMemoryThreshold = unrollMemoryThreshold
  val memoryCheckPeriod = 16
  // Memory currently reserved by this task for this unrolling operation
  var memoryThreshold = initialMemoryThreshold
  val memoryGrowthFactor = 1.5
  var unrollMemoryUsedByThisBlock = 0L
  var vector = new SizeTrackingVector[T]()(classTag)

  keepUnrolling =
    reserveUnrollMemoryForThisTask(blockId, initialMemoryThreshold, MemoryMode.ON_HEAP)
  // If the initial unroll memory was obtained from storage memory, count it for this block
  if (!keepUnrolling) {
    logWarning(s"Failed to reserve initial memory threshold of " +
      s"${Utils.bytesToString(initialMemoryThreshold)} for computing block $blockId in memory.")
  } else {
    unrollMemoryUsedByThisBlock += initialMemoryThreshold
  }

  while (values.hasNext && keepUnrolling) {
    vector += values.next()
    // Every memoryCheckPeriod elements, check whether there is still enough memory
    if (elementsUnrolled % memoryCheckPeriod == 0) {
      // If the estimated size read so far exceeds the unroll memory, grow the reservation
      val currentSize = vector.estimateSize()
      if (currentSize >= memoryThreshold) {
        // Ask for somewhat more than strictly needed
        val amountToRequest = (currentSize * memoryGrowthFactor - memoryThreshold).toLong
        // Try to reserve amountToRequest more bytes
        keepUnrolling =
          reserveUnrollMemoryForThisTask(blockId, amountToRequest, MemoryMode.ON_HEAP)
        // On success, add to the unroll memory used; on failure, the loop exits
        if (keepUnrolling) {
          unrollMemoryUsedByThisBlock += amountToRequest
        }
        // memoryThreshold is updated whether or not the request succeeded
        memoryThreshold += amountToRequest
      }
    }
    elementsUnrolled += 1
  }

  // If keepUnrolling is still true, all the unroll memory needed was obtained
  if (keepUnrolling) {
    val arrayValues = vector.toArray
    vector = null
    // Wrap the values in a memory entry
    val entry =
      new DeserializedMemoryEntry[T](arrayValues, SizeEstimator.estimate(arrayValues), classTag)
    val size = entry.size
    def transferUnrollToStorage(amount: Long): Unit = {
      memoryManager.synchronized {
        // Release the unroll memory
        releaseUnrollMemoryForThisTask(MemoryMode.ON_HEAP, amount)
        // Acquire the same amount of storage memory
        val success = memoryManager.acquireStorageMemory(blockId, amount, MemoryMode.ON_HEAP)
        assert(success, "transferring unroll memory to storage memory failed")
      }
    }
    // Acquire storage memory if necessary to store this block in memory.
    val enoughStorageMemory = {
      // The unroll memory does not exceed the actual size of the values (the loop above
      // only worked with estimated sizes)
      if (unrollMemoryUsedByThisBlock <= size) {
        // Acquire the extra memory needed
        val acquiredExtra = memoryManager.acquireStorageMemory(
          blockId, size - unrollMemoryUsedByThisBlock, MemoryMode.ON_HEAP)
        // Move the unroll memory over to storage memory
        if (acquiredExtra) {
          transferUnrollToStorage(unrollMemoryUsedByThisBlock)
        }
        acquiredExtra
      } else { // unrollMemoryUsedByThisBlock > size
        // If this task attempt already owns more unroll memory than is necessary to store the
        // block, then release the extra memory that will not be used.
        val excessUnrollMemory = unrollMemoryUsedByThisBlock - size
        releaseUnrollMemoryForThisTask(MemoryMode.ON_HEAP, excessUnrollMemory)
        transferUnrollToStorage(size)
        true
      }
    }
    // If the unroll memory was moved to storage, put the memory entry into entries
    if (enoughStorageMemory) {
      entries.synchronized {
        entries.put(blockId, entry)
      }
      logInfo("Block %s stored as values in memory (estimated size %s, free %s)".format(
        blockId, Utils.bytesToString(size), Utils.bytesToString(maxMemory - blocksMemoryUsed)))
      Right(size)
    } else {
      // As quoted, this assertion compares a value with itself and therefore always holds
      assert(currentUnrollMemoryForThisTask >= currentUnrollMemoryForThisTask,
        "released too much unroll memory")
      Left(new PartiallyUnrolledIterator(
        this,
        unrollMemoryUsedByThisBlock,
        unrolled = arrayValues.toIterator,
        rest = Iterator.empty))
    }
  } else {
    // Not enough unroll memory could be obtained: log it and return a PartiallyUnrolledIterator
    // We ran out of space while unrolling the values for this block
    logUnrollFailureMessage(blockId, vector.estimateSize())
    Left(new PartiallyUnrolledIterator(
      this, unrollMemoryUsedByThisBlock, unrolled = vector.iterator, rest = values))
  }
}
```
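The growth schedule of the unroll reservation is easier to see with concrete numbers. The following Java sketch replays only the unrolling loop, under the assumption that every element has the same estimated size (the real code asks SizeTrackingVector.estimateSize()). UnrollSketch, elementSize and budget are hypothetical names, with budget standing in for what reserveUnrollMemoryForThisTask can hand out.

```java
// Hypothetical replay of the unrolling loop in MemoryStore.putIteratorAsValues.
final class UnrollSketch {
  /**
   * Returns the unroll memory reserved, or -1 if a reservation failed (the real code then
   * returns a PartiallyUnrolledIterator).
   */
  static long unroll(int numElements, long elementSize, long initialThreshold, long budget) {
    final int memoryCheckPeriod = 16;
    final double memoryGrowthFactor = 1.5;
    if (initialThreshold > budget) return -1;        // initial reservation failed
    long reserved = initialThreshold;
    long memoryThreshold = initialThreshold;
    long estimatedSize = 0;
    for (int elementsUnrolled = 0; elementsUnrolled < numElements; elementsUnrolled++) {
      estimatedSize += elementSize;                  // vector += values.next()
      if (elementsUnrolled % memoryCheckPeriod == 0 && estimatedSize >= memoryThreshold) {
        long amountToRequest = (long) (estimatedSize * memoryGrowthFactor - memoryThreshold);
        if (reserved + amountToRequest > budget) return -1;  // keepUnrolling = false
        reserved += amountToRequest;
        memoryThreshold += amountToRequest;
      }
    }
    return reserved;
  }
}
```

For 40 elements of 100 bytes and a 1024-byte initial threshold, the checks after the 1st, 17th and 33rd elements grow the reservation to 2550 and then 4950 bytes. Assuming the final size estimate is also 4000 bytes, the real method would then release the 950-byte excess and transfer the remaining 4000 bytes of unroll memory to storage.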

evictBlocksToFreeSpace

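When storage memory is short, evictBlocksToFreeSpace is called to evict blocks from memory and free space. Blocks that belong to the same RDD as the block being stored are never selected, and if the evictable blocks together cannot free enough space, nothing is evicted and the method returns 0, so storing the new block fails.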

```scala
private[spark] def evictBlocksToFreeSpace(
    blockId: Option[BlockId],
    space: Long,
    memoryMode: MemoryMode): Long = {
  assert(space > 0)
  memoryManager.synchronized {
    var freedMemory = 0L
    val rddToAdd = blockId.flatMap(getRddId)
    // Blocks selected for eviction
    val selectedBlocks = new ArrayBuffer[BlockId]
    // A block can be evicted if its memory mode matches the requested mode and it does not
    // belong to the same RDD as the block being stored
    def blockIsEvictable(blockId: BlockId, entry: MemoryEntry[_]): Boolean = {
      entry.memoryMode == memoryMode && (rddToAdd.isEmpty || rddToAdd != getRddId(blockId))
    }
    entries.synchronized {
      val iterator = entries.entrySet().iterator()
      // While less than the requested space has been freed and entries has more blocks,
      // keep selecting blocks from entries
      while (freedMemory < space && iterator.hasNext) {
        val pair = iterator.next()
        val blockId = pair.getKey
        val entry = pair.getValue
        // Check whether this entry's block can be evicted
        if (blockIsEvictable(blockId, entry)) {
          // Take a write lock so that a block being read is not evicted. lockForWriting is
          // called non-blocking here: if the write lock cannot be obtained, the block is skipped
          if (blockInfoManager.lockForWriting(blockId, blocking = false).isDefined) {
            selectedBlocks += blockId
            freedMemory += pair.getValue.size
          }
        }
      }
    }

    // Drops a block from memory
    def dropBlock[T](blockId: BlockId, entry: MemoryEntry[T]): Unit = {
      val data = entry match {
        case DeserializedMemoryEntry(values, _, _) => Left(values)
        case SerializedMemoryEntry(buffer, _, _) => Right(buffer)
      }
      // Let blockEvictionHandler drop the block (writing it to disk if needed; it eventually
      // calls remove to delete it from entries)
      val newEffectiveStorageLevel =
        blockEvictionHandler.dropFromMemory(blockId, () => data)(entry.classTag)
      if (newEffectiveStorageLevel.isValid) {
        blockInfoManager.unlock(blockId)
      } else {
        blockInfoManager.removeBlock(blockId)
      }
    }

    // Only evict if enough space can be freed in the end
    if (freedMemory >= space) {
      logInfo(s"${selectedBlocks.size} blocks selected for dropping " +
        s"(${Utils.bytesToString(freedMemory)} bytes)")
      for (blockId <- selectedBlocks) {
        // Drop each selected block
        val entry = entries.synchronized { entries.get(blockId) }
        if (entry != null) {
          dropBlock(blockId, entry)
        }
      }
      logInfo(s"After dropping ${selectedBlocks.size} blocks, " +
        s"free memory is ${Utils.bytesToString(maxMemory - blocksMemoryUsed)}")
      freedMemory
    } else {
      blockId.foreach { id =>
        logInfo(s"Will not store $id")
      }
      selectedBlocks.foreach { id =>
        blockInfoManager.unlock(id)
      }
      0L
    }
  }
}
```
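The selection phase can be sketched in Java. The sketch assumes entries is a LinkedHashMap created in access order, so that iteration starts with the least recently used block; the Block class, the string ids and the use of -1 for "not an RDD block" are hypothetical, and the non-blocking write lock taken by the real code is left out.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model of the selection loop in MemoryStore.evictBlocksToFreeSpace.
final class EvictionSketch {
  /** A cached block: the RDD it belongs to (-1 for non-RDD blocks) and its size. */
  static final class Block {
    final int rddId;
    final long size;
    Block(int rddId, long size) { this.rddId = rddId; this.size = size; }
  }

  /**
   * Selects blocks in iteration order, skipping blocks of the RDD being stored, until `space`
   * bytes are covered. Returns an empty list if that is impossible, since nothing is evicted then.
   */
  static List<String> select(LinkedHashMap<String, Block> entries, long space, int rddToAdd) {
    List<String> selected = new ArrayList<>();
    long freed = 0;
    for (Map.Entry<String, Block> e : entries.entrySet()) {
      if (freed >= space) break;
      Block b = e.getValue();
      if (rddToAdd < 0 || b.rddId != rddToAdd) {
        selected.add(e.getKey());
        freed += b.size;
      }
    }
    if (freed < space) return new ArrayList<>();
    return selected;
  }
}
```

Touching rdd_1_0 moves it to the end of the iteration order, so a request for 60 bytes while storing a block of RDD 2 selects rdd_1_1 and then rdd_1_0, skipping rdd_2_0; a request for 100 bytes selects nothing because the evictable blocks only add up to 70.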

tungsten(common/unsafe/java/org.apache.spark.unsafe.memory)

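Spark's memory management mechanism here is called Tungsten. It manages on-heap and off-heap allocation in a unified way and is responsible for actually allocating and freeing memory and for addressing it. Although StorageMemoryPool also has an off-heap mode, storage appears to use only on-heap storage (data saved into storage memory is simply wrapped in a MemoryEntry and put into a LinkedHashMap). Tungsten's memory management is used only for execution memory, i.e. memory requested by tasks can be either off-heap or on-heap.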

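For off-heap memory, Spark manages physical memory itself (allocating and freeing it), so a memory offset is the actual address of the data, and the data can be accessed directly through a pointer to that address. For on-heap memory, the JVM manages memory and GC can move objects, so the physical address of the data cannot simply be stored in a pointer and used for access.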

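Tungsten uses a scheme similar to an operating system's pages and in-page offsets to manage on-heap and off-heap addresses uniformly, with pages holding the mapping from logical to physical addresses. Tungsten divides physical memory into a series of pages, and data within a page is accessed through an in-page offset. Tungsten represents the memory address of data as a 64-bit long: the upper 13 bits are the page number, so Tungsten supports at most 8192 pages, and the lower 51 bits are the offset within the page. To access data, Spark first decodes the page number and in-page offset from the long address, then looks up the physical address of that page by its page number; the data's physical address is the page's physical address plus the in-page offset. (The logical and physical addresses here are an additional abstraction at the application level, distinct from the operating system's logical and physical addresses, but the idea is the same.) The flow is shown below: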

[Figure: Tungsten address translation flow (spark-store)]
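The 13/51-bit split can be written out directly. This Java sketch encodes and decodes such an address and resolves it against a table of page base offsets; the constants follow the numbers given above, but the class TungstenAddress and the resolve helper are hypothetical (in Spark, an on-heap page also carries a base object, not just an offset).

```java
// Hypothetical encoder/decoder for a 13-bit page number + 51-bit in-page offset address.
final class TungstenAddress {
  static final int PAGE_NUMBER_BITS = 13;
  static final int OFFSET_BITS = 64 - PAGE_NUMBER_BITS;        // 51
  static final int PAGE_TABLE_SIZE = 1 << PAGE_NUMBER_BITS;    // 8192 pages
  static final long OFFSET_MASK = (1L << OFFSET_BITS) - 1;     // lower 51 bits

  static long encode(int pageNumber, long offsetInPage) {
    if (pageNumber < 0 || pageNumber >= PAGE_TABLE_SIZE) {
      throw new IllegalArgumentException("bad page number " + pageNumber);
    }
    return (((long) pageNumber) << OFFSET_BITS) | (offsetInPage & OFFSET_MASK);
  }

  static int decodePageNumber(long address) {
    return (int) (address >>> OFFSET_BITS);   // unsigned shift: page 4096+ sets the sign bit
  }

  static long decodeOffset(long address) {
    return address & OFFSET_MASK;
  }

  /** "Physical" address = base offset of the page + offset within the page. */
  static long resolve(long[] pageBaseOffsets, long address) {
    return pageBaseOffsets[decodePageNumber(address)] + decodeOffset(address);
  }
}
```

The unsigned shift matters: pages numbered 4096 and above set the top bit of the long, and an arithmetic shift would return a negative page number for them.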

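The logical address is the same in on-heap and off-heap mode, but the physical address differs. In off-heap mode the physical address is a 64-bit absolute memory address; in on-heap mode it is an offset relative to a JVM object. To unify the two modes, Tungsten represents both on-heap and off-heap addresses with MemoryLocation.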

MemoryLocation

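MemoryLocation represents an on-heap or off-heap physical memory address. In on-heap mode, obj is the JVM object that serves as the base, and offset is the offset relative to obj (0 by default); in off-heap mode, obj is null and offset is the absolute memory address.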

```java
public class MemoryLocation {

  // Base object for on-heap memory (null for off-heap memory)
  @Nullable
  Object obj;

  // Offset relative to obj on-heap, or the absolute memory address off-heap
  long offset;

  public MemoryLocation(@Nullable Object obj, long offset) {
    this.obj = obj;
    this.offset = offset;
  }

  public MemoryLocation() {
    this(null, 0);
  }

  public void setObjAndOffset(Object newObj, long newOffset) {
    this.obj = newObj;
    this.offset = newOffset;
  }

  public final Object getBaseObject() {
    return obj;
  }

  public final long getBaseOffset() {
    return offset;
  }
}
```

MemoryBlock

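MemoryBlock is a subclass of MemoryLocation that represents a block of Tungsten memory. It adds length, which records the size of the memory, and pageNumber, which records the page number of the memory.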

```java
public class MemoryBlock extends MemoryLocation {

  private final long length;

  public int pageNumber = -1;

  public MemoryBlock(@Nullable Object obj, long offset, long length) {
    super(obj, offset);
    this.length = length;
  }

  public long size() {
    return length;
  }

  public static MemoryBlock fromLongArray(final long[] array) {
    return new MemoryBlock(array, Platform.LONG_ARRAY_OFFSET, array.length * 8L);
  }
}
```

MemoryAllocator

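Tungsten's actual memory allocation and release are done by implementations of the MemoryAllocator interface. MemoryAllocator has two implementations, HeapMemoryAllocator and UnsafeMemoryAllocator, responsible for on-heap and off-heap allocation and release respectively. MemoryAllocator declares two methods, allocate and free, for allocating and releasing memory.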

HeapMemoryAllocator

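HeapMemoryAllocator handles allocation and release in on-heap mode. It allocates memory by creating a long array, and since the JVM reclaims memory automatically, its free method does not reclaim memory itself. HeapMemoryAllocator is optimized for large allocations: freed large blocks are cached in a map (through weak references), and a later request of the same size returns a cached block instead of creating a new array, which reduces GC pressure.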

```java
// Cache of freed memory blocks, keyed by size
private final Map<Long, LinkedList<WeakReference<MemoryBlock>>> bufferPoolsBySize =
  new HashMap<>();

public MemoryBlock allocate(long size) throws OutOfMemoryError {
  // Only sizes that qualify for pooling go through the cache
  if (shouldPool(size)) {
    // Return the first cached block of this size that is still reachable
    synchronized (this) {
      final LinkedList<WeakReference<MemoryBlock>> pool = bufferPoolsBySize.get(size);
      if (pool != null) {
        while (!pool.isEmpty()) {
          final WeakReference<MemoryBlock> blockReference = pool.pop();
          final MemoryBlock memory = blockReference.get();
          if (memory != null) {
            assert (memory.size() == size);
            return memory;
          }
        }
        bufferPoolsBySize.remove(size);
      }
    }
  }
  // Allocate a long array of (size + 7) / 8 elements, i.e. size rounded up to whole longs
  long[] array = new long[(int) ((size + 7) / 8)];
  // The array is the base object; Platform.LONG_ARRAY_OFFSET is the offset of its first element
  return new MemoryBlock(array, Platform.LONG_ARRAY_OFFSET, size);
}

public void free(MemoryBlock memory) {
  final long size = memory.size();
  // Only sizes that qualify for pooling are cached
  if (shouldPool(size)) {
    // Cache the freed block so a later request of the same size can reuse it
    synchronized (this) {
      LinkedList<WeakReference<MemoryBlock>> pool = bufferPoolsBySize.get(size);
      if (pool == null) {
        pool = new LinkedList<>();
        bufferPoolsBySize.put(size, pool);
      }
      pool.add(new WeakReference<>(memory));
    }
  } else {
    // Do nothing
  }
}
```

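The pooling idea can be reproduced with plain arrays. In this Java sketch PoolingHeapAllocator is hypothetical, and it assumes a 1 MB threshold for "large" blocks. Blocks are cached through WeakReference, so a cached block is reused only as long as the GC has not cleared it.

```java
import java.lang.ref.WeakReference;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.Map;

// Hypothetical on-heap allocator modelled on HeapMemoryAllocator's pooling idea.
final class PoolingHeapAllocator {
  // Assumption: only blocks of at least 1 MB are pooled.
  static final long POOLING_THRESHOLD_BYTES = 1024 * 1024;

  private final Map<Long, LinkedList<WeakReference<long[]>>> bufferPoolsBySize = new HashMap<>();

  private static boolean shouldPool(long size) {
    return size >= POOLING_THRESHOLD_BYTES;
  }

  synchronized long[] allocate(long size) {
    if (shouldPool(size)) {
      LinkedList<WeakReference<long[]>> pool = bufferPoolsBySize.get(size);
      if (pool != null) {
        while (!pool.isEmpty()) {
          long[] cached = pool.pop().get();
          if (cached != null) {
            return cached;        // reuse a block the GC has not reclaimed yet
          }
        }
        bufferPoolsBySize.remove(size);
      }
    }
    return new long[(int) ((size + 7) / 8)];   // round up to whole 8-byte words
  }

  synchronized void free(long[] block, long size) {
    if (shouldPool(size)) {
      bufferPoolsBySize.computeIfAbsent(size, k -> new LinkedList<>())
          .add(new WeakReference<>(block));
    }
    // Small blocks: nothing to do, the GC reclaims them once they are unreferenced.
  }
}
```

Because the pool holds only weak references, it never keeps memory alive on its own: reuse works between GCs, and after a collection the allocator simply falls back to creating a new array.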
UnsafeMemoryAllocator

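UnsafeMemoryAllocator handles allocation and release in off-heap mode; it calls Java's Unsafe methods (through Spark's Platform class) to allocate and free memory directly.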

```java
@Override
public MemoryBlock allocate(long size) throws OutOfMemoryError {
  long address = Platform.allocateMemory(size);
  return new MemoryBlock(null, address, size);
}

@Override
public void free(MemoryBlock memory) {
  assert (memory.obj == null) :
    "baseObject not null; are you trying to use the off-heap allocator to free on-heap memory?";
  Platform.freeMemory(memory.offset);
}
```

Execution memory allocation (core/java/org.apache.spark.memory)

TaskMemoryManager

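TaskMemoryManager manages the memory used by a task. It keeps track of the task's page table, the pages that have been allocated, the task's consumers, and so on. Its main jobs are allocating and releasing execution memory and translating Tungsten addresses into actual memory addresses.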

acquireExecutionMemory

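Allocates N bytes of memory for the given consumer; if memory is short, it calls consumers' spill methods to free memory. This method does not actually allocate memory; it only adjusts memory counters (through the ExecutionMemoryPool). The flow is as follows: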

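  1. First request memory for the current task from execution memory (this may evict cached storage blocks to disk). If enough is granted, return it.
  2. If not, free memory from the task's other consumers by calling their spill method (which writes their in-memory data to disk), requesting memory again after each successful spill and stopping as soon as the request is satisfied.
  3. If memory is still short, free some of the requesting consumer's own memory by spilling it, and finally return the amount granted to the consumer.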

```java
public long acquireExecutionMemory(long required, MemoryConsumer consumer) {
  assert(required >= 0);
  assert(consumer != null);
  MemoryMode mode = consumer.getMode();
  synchronized (this) {
    long got = memoryManager.acquireExecutionMemory(required, taskAttemptId, mode);

    // If memory is short, first try to free memory by spilling the task's other consumers
    // (those in the same memory mode that currently hold memory)
    if (got < required) {
      for (MemoryConsumer c: consumers) {
        if (c != consumer && c.getUsed() > 0 && c.getMode() == mode) {
          try {
            // Ask the other consumer to spill
            long released = c.spill(required - got, consumer);
            if (released > 0) {
              logger.debug("Task {} released {} from {} for {}", taskAttemptId,
                Utils.bytesToString(released), c, consumer);
              // Request the remaining (required - got) bytes from execution memory again
              got += memoryManager.acquireExecutionMemory(required - got, taskAttemptId, mode);
              // Stop as soon as enough memory has been obtained
              if (got >= required) {
                break;
              }
            }
          } catch (IOException e) {
            logger.error("error while calling spill() on " + c, e);
            throw new OutOfMemoryError("error while calling spill() on " + c + " : "
              + e.getMessage());
          }
        }
      }
    }

    // If there were no other consumers, or they could not free enough, spill this consumer
    if (got < required) {
      try {
        long released = consumer.spill(required - got, consumer);
        if (released > 0) {
          logger.debug("Task {} released {} from itself ({})", taskAttemptId,
            Utils.bytesToString(released), consumer);
          got += memoryManager.acquireExecutionMemory(required - got, taskAttemptId, mode);
        }
      } catch (IOException e) {
        logger.error("error while calling spill() on " + consumer, e);
        throw new OutOfMemoryError("error while calling spill() on " + consumer + " : "
          + e.getMessage());
      }
    }

    consumers.add(consumer);
    logger.debug("Task {} acquired {} for {}", taskAttemptId, Utils.bytesToString(got), consumer);
    return got;
  }
}
```
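The spill order can be modelled without Spark. In this hypothetical Java sketch, a counter named free stands in for the ExecutionMemoryPool and each Consumer spills everything it holds; as in the code above, other consumers are spilled first and the requester only as a last resort. Unlike the real MemoryConsumer, which updates its own usage, the sketch records usage itself, and it ignores memory modes.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of the spill order in TaskMemoryManager.acquireExecutionMemory.
final class SpillSketch {
  /** Stand-in for a MemoryConsumer: holds `used` bytes and can spill all of them. */
  static final class Consumer {
    final String name;
    long used;
    Consumer(String name, long used) { this.name = name; this.used = used; }
    long spill() { long released = used; used = 0; return released; }
  }

  long free;                                  // what the execution pool can still grant
  final List<Consumer> consumers = new ArrayList<>();
  final List<String> spilled = new ArrayList<>();

  SpillSketch(long free) { this.free = free; }

  private long grant(long bytes) {
    long g = Math.min(bytes, free);
    free -= g;
    return g;
  }

  long acquire(long required, Consumer requester) {
    long got = grant(required);
    // 1) Spill other consumers that currently hold memory, re-requesting after each spill.
    for (Consumer c : consumers) {
      if (got >= required) break;
      if (c != requester && c.used > 0) {
        free += c.spill();
        spilled.add(c.name);
        got += grant(required - got);
      }
    }
    // 2) Still short: spill the requesting consumer itself.
    if (got < required && requester.used > 0) {
      free += requester.spill();
      spilled.add(requester.name);
      got += grant(required - got);
    }
    requester.used += got;
    if (!consumers.contains(requester)) consumers.add(requester);
    return got;
  }
}
```

In the test scenario, a sorter holding 80 of 100 bytes is spilled when an aggregator asks for 50 and only 20 are free; a later request from the aggregator that no other consumer can help with spills the aggregator itself.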

allocatePage

Called by a MemoryConsumer to request a memory page. The flow is as follows:

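  1. Call acquireExecutionMemory to obtain memory.
  2. Allocate a pageNumber.
  3. Call allocate to actually allocate the memory (on-heap, a long array large enough for the acquired bytes is created; off-heap, the memory is allocated manually) and wrap it in a MemoryBlock.
  4. Set pageTable[pageNumber] = page and return the wrapped MemoryBlock.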

```java
public MemoryBlock allocatePage(long size, MemoryConsumer consumer) {
  assert(consumer != null);
  assert(consumer.getMode() == tungstenMemoryMode);
  if (size > MAXIMUM_PAGE_SIZE_BYTES) {
    throw new IllegalArgumentException(
      "Cannot allocate a page with more than " + MAXIMUM_PAGE_SIZE_BYTES + " bytes");
  }

  long acquired = acquireExecutionMemory(size, consumer);
  if (acquired <= 0) {
    return null;
  }

  final int pageNumber;
  synchronized (this) {
    // Find the lowest free page number
    pageNumber = allocatedPages.nextClearBit(0);
    if (pageNumber >= PAGE_TABLE_SIZE) {
      releaseExecutionMemory(acquired, consumer);
      throw new IllegalStateException(
        "Have already allocated a maximum of " + PAGE_TABLE_SIZE + " pages");
    }
    allocatedPages.set(pageNumber);
  }
  MemoryBlock page = null;
  try {
    // Actually allocate the memory, wrapped in a MemoryBlock
    page = memoryManager.tungstenMemoryAllocator().allocate(acquired);
  } catch (OutOfMemoryError e) {
    logger.warn("Failed to allocate a page ({} bytes), try again.", acquired);
    synchronized (this) {
      acquiredButNotUsed += acquired;
      allocatedPages.clear(pageNumber);
    }
    return allocatePage(size, consumer);
  }
  page.pageNumber = pageNumber;
  pageTable[pageNumber] = page;
  if (logger.isTraceEnabled()) {
    logger.trace("Allocate page number {} ({} bytes)", pageNumber, acquired);
  }
  return page;
}
```

freePage

Steps for freeing a memory block:

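  1. Set the page's pageTable entry to null.
  2. Clear the page's pageNumber in allocatedPages.
  3. Free the memory (off-heap memory must be freed explicitly; on-heap nothing needs to be done, because nothing references the MemoryBlock any more and GC will reclaim it).
  4. Release the execution memory (simply subtract it from the task's usage and call notifyAll to wake other threads that are blocked because they have not yet received their minimum share).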

```java
public void freePage(MemoryBlock page, MemoryConsumer consumer) {
  assert (page.pageNumber != -1) :
    "Called freePage() on memory that wasn't allocated with allocatePage()";
  assert(allocatedPages.get(page.pageNumber));
  pageTable[page.pageNumber] = null;
  synchronized (this) {
    allocatedPages.clear(page.pageNumber);
  }
  if (logger.isTraceEnabled()) {
    logger.trace("Freed page number {} ({} bytes)", page.pageNumber, page.size());
  }
  long pageSize = page.size();
  memoryManager.tungstenMemoryAllocator().free(page);
  releaseExecutionMemory(pageSize, consumer);
}
```
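The page-number bookkeeping shared by allocatePage and freePage can be sketched with java.util.BitSet, whose nextClearBit returns the lowest unused number. PageTableSketch is hypothetical: it allocates on-heap long arrays directly and skips the acquireExecutionMemory step.

```java
import java.util.BitSet;

// Hypothetical page table modelled on TaskMemoryManager's allocatedPages / pageTable fields.
final class PageTableSketch {
  static final int PAGE_TABLE_SIZE = 1 << 13;   // 8192 pages, matching the 13-bit page number
  private final long[][] pageTable = new long[PAGE_TABLE_SIZE][];
  private final BitSet allocatedPages = new BitSet(PAGE_TABLE_SIZE);

  /** Returns the page number of a newly allocated on-heap page of `size` bytes. */
  synchronized int allocatePage(long size) {
    int pageNumber = allocatedPages.nextClearBit(0);   // lowest free page number
    if (pageNumber >= PAGE_TABLE_SIZE) {
      throw new IllegalStateException(
          "Have already allocated a maximum of " + PAGE_TABLE_SIZE + " pages");
    }
    allocatedPages.set(pageNumber);
    pageTable[pageNumber] = new long[(int) ((size + 7) / 8)];
    return pageNumber;
  }

  /** Clears the page table slot and the bit so the number can be reused. */
  synchronized void freePage(int pageNumber) {
    if (!allocatedPages.get(pageNumber)) {
      throw new IllegalArgumentException("page " + pageNumber + " is not allocated");
    }
    pageTable[pageNumber] = null;   // on-heap: the GC reclaims the array
    allocatedPages.clear(pageNumber);
  }

  synchronized long[] lookup(int pageNumber) {
    return pageTable[pageNumber];
  }
}
```

Freed numbers are reused, so page 0 is handed out again after it has been freed.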

Address translation

TaskMemoryManager converts between physical and logical addresses:

```java
public long encodePageNumberAndOffset(MemoryBlock page, long offsetInPage) {
  if (tungstenMemoryMode == MemoryMode.OFF_HEAP) {
    // In off-heap mode the offset is an absolute memory address; convert it to an offset
    // relative to the page so that it fits in 51 bits
    offsetInPage -= page.getBaseOffset();
  }
  return encodePageNumberAndOffset(page.pageNumber, offsetInPage);
}

@VisibleForTesting
public static long encodePageNumberAndOffset(int pageNumber, long offsetInPage) {
  assert (pageNumber != -1
```

