spark 源碼分析之十五 -- Spark內存管理剖析


本篇文章主要剖析Spark的內存管理體系。

在上篇文章 spark 源碼分析之十四 -- broadcast 是如何實現的?中對存儲相關的內容沒有做過多的剖析,下面計划先剖析Spark的內存機制,進而進入內存存儲,最后再剖析磁盤存儲。本篇文章主要剖析內存管理機制。

整體介紹

Spark內存管理相關類都在 spark core 模塊的 org.apache.spark.memory 包下。

文檔對這個包的解釋和說明如下:

This package implements Spark's memory management system. This system consists of two main components, a JVM-wide memory manager and a per-task manager:

- org.apache.spark.memory.MemoryManager manages Spark's overall memory usage within a JVM. This component implements the policies for dividing the available memory across tasks and for allocating memory between storage (memory used caching and data transfer) and execution (memory used by computations, such as shuffles, joins, sorts, and aggregations).
- org.apache.spark.memory.TaskMemoryManager manages the memory allocated by individual tasks. Tasks interact with TaskMemoryManager and never directly interact with the JVM-wide MemoryManager. Internally, each of these components have additional abstractions for memory bookkeeping: - org.apache.spark.memory.MemoryConsumers are clients of the TaskMemoryManager and correspond to individual operators and data structures within a task. The TaskMemoryManager receives memory allocation requests from MemoryConsumers and issues callbacks to consumers in order to trigger spilling when running low on memory. - org.apache.spark.memory.MemoryPools are a bookkeeping abstraction used by the MemoryManager to track the division of memory between storage and execution.

 

即內存管理主要涉及了兩個組件:JVM 范圍的內存管理和單個任務的內存管理。

  1. MemoryManager管理Spark在JVM中的總體內存使用情況。該組件實現了跨任務划分可用內存以及在存儲(內存使用緩存和數據傳輸)和執行(計算使用的內存,如shuffle,連接,排序和聚合)之間分配內存的策略。
  2. TaskMemoryManager管理由各個任務分配的內存。任務與TaskMemoryManager交互,永遠不會直接與JVM范圍的MemoryManager交互。

 

在TaskMemoryManager內部,每個組件都有額外的記憶簿來記錄內存使用情況:

 

  • MemoryConsumers是TaskMemoryManager的客戶端,對應於任務中的各個運算符和數據結構。TaskMemoryManager接收來自MemoryConsumers的內存分配請求,並向消費者發出回調,以便在內存不足時觸發溢出。
  • MemoryPools是MemoryManager用來跟蹤存儲和執行之間內存划分的薄記抽象。 

如圖:

 

MemoryManager的兩種實現: 

There are two implementations of org.apache.spark.memory.MemoryManager which vary in how they handle the sizing of their memory pools: 
- org.apache.spark.memory.UnifiedMemoryManager, the default in Spark 1.6+, enforces soft boundaries between storage and execution memory, allowing requests for memory in one region to be fulfilled by borrowing memory from the other.
- org.apache.spark.memory.StaticMemoryManager enforces hard boundaries between storage and execution memory by statically partitioning Spark's memory and preventing storage and execution from borrowing memory from each other. This mode is retained only for legacy compatibility purposes.

 

org.apache.spark.memory.MemoryManager有兩種實現,它們在處理內存池大小方面有所不同:

  • org.apache.spark.memory.UnifiedMemoryManager,Spark 1.6+中的默認值,強制存儲內存和執行內存之間的軟邊界,允許通過從另一個區域借用內存來滿足一個區域中的內存請求。
  • org.apache.spark.memory.StaticMemoryManager 通過靜態分區Spark的內存,強制存儲內存和執行內存之間的硬邊界並防止存儲和執行從彼此借用內存。 僅為了傳統兼容性目的而保留此模式。

先來一張自己畫的類圖,對涉及類之間的關系有一個比較直接的認識:

 

下面我們逐一對涉及的類做說明。

MemoryMode

內存模式:主要分堆內內存和堆外內存,MemoryMode是一個枚舉類,從本質上來說,ON_HEAP和OFF_HEAP都是MemoryMode的子類。

MemoryPool

文檔說明如下:

Manages bookkeeping for an adjustable-sized region of memory. This class is internal to the MemoryManager. 

 

即它負責管理可調大小的內存區域的簿記工作。可以這樣理解,內存就是一個金庫,它是一個負責記賬的管家,主要負責記錄內存的借出歸還。這個類專門為MempryManager而設計。

給內存記賬,其實從本質上來說,它不是Spark內存管理部分的核心功能,但是又很重要,它的核心方法都是被MemoryManager來調用的。

理解了這個類,其子類就比較好理解了。記賬的管家有兩種實現,分別是StorageMemoryPool和ExecutionMemoryPool。

StorageMemoryPool

文檔解釋:

Performs bookkeeping for managing an adjustable-size pool of memory that is used for storage (caching).

 

說白了,它就是專門給負責存儲或緩存的內存區域記賬的。

其類結構如下:

它有三種方法:

1. acquireMemory:獲取N個字節的內存給指定的block,如果有必要,即內存不夠用了,可以將其他的從內存中驅除。源碼如下:

圖中標記的邏輯,參照下文MemoryStore的剖析。

2. releaseMemory:釋放內存。源碼如下:

很簡單,就只是在統計值_memoryUsed 上面做減法。

3. freeSpaceToShrinkPool:可用空間通過`spaceToFree`字節縮小此存儲內存池的大小。源碼如下:

 

簡單地可以看出,這個方法是在收縮存儲內存池之前調用的,因為這個方法返回值是要收縮的值。

收縮存儲內存池是為了擴大執行內存池,即這個方法是在收縮存儲內存,擴大執行內存時用的,這個方法只是為了縮小存儲內存池作准備的,並沒有真正的縮小存儲內存池。

實現思路,首先先計算需要驅逐的內存大小,如果需要驅逐內存,則跟 acquireMemory 方法類似,調用MemoryStore 的 evictBlocksToFreeSpace方法,否則直接返回。

總結:這個類是給存儲內存池記賬的,也負責不夠時或內存池不滿足縮小條件時,通知MemoryStore驅逐內存。

 

ExecutionMemoryPool

文檔解釋:

Implements policies and bookkeeping for sharing an adjustable-sized pool of memory between tasks. 
Tries to ensure that each task gets a reasonable share of memory,
instead of some task ramping up to a large amount first and then causing others to spill to disk repeatedly.
If there are N tasks, it ensures that each task can acquire at least 1 / 2N of the memory before it has to spill,
and at most 1 / N. Because N varies dynamically, we keep track of the set of active tasks and redo the calculations
of 1 / 2N and 1 / N in waiting tasks whenever this set changes. This is all done by synchronizing access to mutable
state and using wait() and notifyAll() to signal changes to callers. Prior to Spark 1.6, this arbitration of memory
across tasks was performed by the ShuffleMemoryManager.

 

實現策略和簿記,以便在任務之間共享可調大小的內存池。 嘗試確保每個任務獲得合理的內存份額,而不是首先增加大量任務然后導致其他任務重復溢出到磁盤。

如果有N個任務,它確保每個任務在溢出之前至少可以獲取1 / 2N的內存,最多1 / N.

由於N動態變化,我們會跟蹤活動任務的集合並在每當任務集合改變時重做等待任務中的1 / 2N和1 / N的計算。

這一切都是通過同步對可變狀態的訪問並使用 wait() 和 notifyAll() 來通知對調用者的更改來完成的。 在Spark 1.6之前,跨任務的內存仲裁由ShuffleMemoryManager執行。 

 
類內部結構如下:

memoryForTask聲明如下:

1 @GuardedBy("lock")
2 private val memoryForTask = new mutable.HashMap[Long, Long]()

其中,key 指的是 taskAttemptId, value 是內存使用情況(以byte計算)。它用來記錄每一個任務內存使用情況。

它也有三類方法:

1. 獲取總的或每一個任務的內存使用大小,源碼如下:

memoryForTask 記錄了每一個task使用的內存大小。

 

2. 給一個任務分配內存,源碼如下:

numBytes表示申請的內存大小(in byte),taskAttemptId 表示申請內存的 task id,maybeGrowPool 表示一個可能會增加執行池大小的回調。 它接受一個參數(Long),表示應該擴展此池的所需內存量。computeMaxPoolSize 表示在此給定時刻返回此池的最大允許大小的回調。這不是字段,因為在某些情況下最大池大小是可變的。 例如,在統一內存管理中,可以通過驅逐緩存塊來擴展執行池,從而縮小存儲池。

如果之前該任務沒有申請過,則將(taskAttemptId <- 0) 放入到 memoryForTask map 中, 然后釋放鎖並喚醒lock鎖等待區的線程。

被喚醒的因為synchronized實現的是一個互斥鎖,所以當前僅當只有一個線程執行while循環。

首先根據 (需要的內存大小 - 池總空閑內存大小)來確認是否需要擴大池,由於存儲池可能會偷執行池的內存,所以需要執行 maybeGrowPool 方法。

computeMaxPoolSize計算出此時該池允許的最大內存大小。然后分別算出每個任務最大分配內存和最小分配內存。進而計算出分配給該任務的最大分配大小(maxToGrant)和實際分配大小(toGrant)。

如果實際分配大小 小於需要分配的內存大小 並且 當前任務占有內存 + 實際分配內存 < 每個任務最小分配內存,則該線程進入鎖wait區等待,等待內存可用時喚醒,否則將內存分配給任務。

可以看到這個方法中的wait和notify方法並不是成對的,因為新添加的taskAttemptId不能滿足內存可用的條件。因為這個鎖是從外部傳過來的,即MemoryManager也可能對其做了操作,使內存空余下來,可供任務分配。

3. 釋放task內存,源碼如下:

它有兩個方法,分別是釋放當前任務已經使用的所有內存空間 releaseAllMemoryForTask 和釋放當前任務的指定大小的內存空間 releaseMemory。

思路:

releaseAllMemoryForTask 先計算好當前任務使用的全部內存,然后調用 releaseMemory 方法釋放內存。

releaseMemory 方法則會比對當前使用內存和要釋放的內存,如果要釋放的內存大小小於 當前使用的 ,做減法即可。釋放之后的任務內存如果小於等於0,則移除task即可,最后通知lock鎖等待區的對象,讓其重新分配內存。

在這個記賬的實現里,每一個來的task不一定是可以分配到內存的,所以,鎖在其中起了很大的資源協調的作用,也防止了內存的溢出。

 

MemoryManager

文檔說明:

An abstract memory manager that enforces how memory is shared between execution and storage. In this context, execution memory refers to that used for computation in shuffles, joins, sorts and aggregations, while storage memory refers to that used for caching and propagating internal data across the cluster. There exists one MemoryManager per JVM.

一種抽象內存管理器,用於強制執行和存儲之間共享內存的方式。在這個上下文下,執行內存是指用於在shuffle,join,sort和aggregation中進行計算的內存,而存儲內存是指用於在群集中緩存和傳播內部數據的內存。 每個JVM都有一個MemoryManager。

先來說一下其依賴的MemoryPool,源碼如下:

MemoryPool中的lock對象就是MemoryManager對象

存儲內存池和執行內存池分別有兩個:堆內和堆外。

onHeapStorageMemory和onHeapExecutionMemory 是從構造方法傳過來的,先不予考慮。

maxOffHeapMemory 默認是 0, 可以根據 spark.memory.offHeap.size 參數設置,文檔對這個參數的說明如下:

The absolute amount of memory in bytes which can be used for off-heap allocation. 
This setting has no impact on heap memory usage, so if your executors' total memory consumption must fit within some hard limit 
then be sure to shrink your JVM heap size  accordingly. This must be set to a positive value when spark.memory.offHeap.enabled=true.

 

存儲堆外內存 = 最大堆外內存(offHeapStorageMemory) X 堆外存儲內存占比,這個占比默認是0.5,可以根據 spark.memory.storageFraction 來調節

執行堆外內存 = 最大堆外內存 - 存儲堆外內存

還有跟 Tungsten 管理內存有關的常量:

這三個常量分別定義了tungsten的內存形式、內存頁大小和內存分配器。

 

其方法解釋如下:

1. 獲取存儲池最大使用內存,抽象方法,待子類實現。

 

2. 獲取已使用內存

3. 獲取內存,這也是抽象方法,待子類實現

 

4. 釋放內存

這些請求都委托給對應的MemoryPool來做了

1.6 之前 使用MemoryManager子類 StaticMemoryManager 來做內存管理。

StaticMemoryManager

這個靜態內存管理中的執行池和存儲池之間有嚴格的界限,兩個池的大小永不改變。

注意:如果想使用這個內存管理方式,設置 spark.memory.useLegacyMode 為 true即可(默認是false)

 

下面我們重點看1.6 之后的默認使用的MemoryManager子類 -- UnifiedMemoryManager

UnifiedMemoryManager

先來看文檔說明:

這個MemoryManager保證了存儲池和執行池之間的軟邊界,即可以互相借用內存來滿足彼此動態的內存需求變化。執行和存儲的占比由 spark.memory.storageFraction 配置,默認是0.6,即偏向於存儲池。其中存儲池的默認占比是由 spark.memory.storageFraction 參數決定,默認是 0.5 ,即 存儲池默認占比 = 0.6 * 0.5 = 0.3 ,即存儲池默認占比為0.3。存儲池可以盡可能多的向執行池借用空閑內存。但是當執行池需要它的內存的時候,會把一部分內存池的內存對象從內存中驅逐出,直到滿足執行池的內存需求。類似地,執行池也可以盡可能地借用存儲池中的空閑內存,不同的是,執行內存不會被存儲池驅逐出內存,也就是說,緩存block時可能會因為執行池占用了大量的內存池不能釋放導致緩存block失敗,在這種情況下,新的block會根據StorageLevel做相應處理。

 

我們主要來看其實現的父類MemoryManager 的方法:

1. 獲取存儲池最大使用內存:

其中,maxHeapMemory 是從構造方法傳進來的成員變量,maxOffHeapMemory 是根據參數 spark.memory.offHeap.size 配置生成的。

可以看出,存儲池的允許的最大使用內存是實時變化的,因為總內存不變,執行池內存使用情況隨任務執行情況變化而變化。

 

2. 獲取內存,逐一來看:

實現思路:先根據存儲方式(堆內還是堆外)確定存儲池,執行池,存儲區域內存大小和最大總內存。

然后調用執行池的 acquireMemory 方法申請內存,computeMaxExecutionPoolSize是隨存儲的實時變化而變化的,增大ExecutionPool的回調也被調用來確保有足夠空間可供執行池分配。

acquireUnrollMemory 直接調用 acquireStorageMemory 方法。

acquireStorageMemory實現思路:先根據存儲方式(堆內還是堆外)確定存儲池,執行池,存儲區域內存大小和最大總內存。

存儲內存如果大於最大內存,直接存儲失敗,否則,繼續查看所需內存大小是否大於內存池最大空閑內存,如果大於,則從執行池中申請足夠的空閑空間,注意,真正申請的空間大小在0 和numBytes - storagePool.memoryFree 之間,繼續調用storagePool的acquireMemory 方法去申請內存,如果不夠申請,則會驅逐出舊或空的block塊。

最后,我們來看一下其伴生對象:

首先 apply 方法就類似於工廠方法的創造方法。我們對比下面的一張圖,來說明一下Spark內存結構:

系統內存:可以根據 spark.testing.memory 參數來配置(主要用於測試),默認是JVM 的可以使用的最大內存。

保留內存:可以根據 spark.testing.reservedMemory 參數來配置(主要用於測試), 默認是 300M

最小系統內存:保留內存 * 1.5 后,再向下取整

系統內存的約束:系統內存必須大於最小保留內存,即 系統可用內存必須大於 450M, 可以通過 --driver-memory 或  spark.driver.memory 或 --executor-memory 或spark.executor.memory 來調節

可用內存 = 系統內存 - 保留內存

堆內內存占比默認是0.6, 可以根據 spark.memory.fraction 參數來調節

最大堆內內存 = 堆內可用內存 * 堆內內存占比

堆內內存存儲池占比默認是 0.5 ,可以根據spark.memory.storageFraction 來調節。

默認堆內存儲內存大小 = 最大堆內內存 * 堆內內存存儲池占比。即堆內存儲池內存大小默認是 (系統JVM最大可用內存 -  300M)* 0.6 * 0.5, 即約等於JVM最大可用內存的三分之一。

注意: 下圖中的spark.memory.fraction是0.75,是Spark 1.6 的默認配置。在Spark 2.4.3 中默認是0.6。

 圖片來源:https://0x0fff.com/spark-memory-management/

至此,Saprk 的內存管理模塊基本上剖析完畢。

總結:先介紹了內存的管理池,即MemoryPool的實現,然后重點分析了Spark 1.6 以后的內存管理機制,着重說明Spark內部的內存是如何划分以及如何動態調整內存的。

 

注,關於堆內內存和堆外內存的介紹,可參照:https://www.jianshu.com/p/50be08b54bee

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM