一、Spark 內存介紹
在執行 Spark 的應用程序時,Spark 集群會啟動 Driver 和 Executor 兩種JVM進程。
Driver 程序主要負責:
- 創建 Spark上下文;
- 提交 Spark作業(Job)並將 Job 轉化為計算任務(Task)交給 Executor 計算;
- 協調各個 Executor 進程間任務調度。
Executor 程序主要負責:
- 在工作節點上執行具體的計算任務(Task),並將計算結果返回給 Driver;
- 為需要持久化的 RDD 提供存儲功能。
由於 Driver 的內存管理相對來說較為簡單,本文主要對 Executor 的內存管理進行分析,下文中的 Spark 內存均特指 Executor 的內存。
二、Executor 內存管理
Executor 進程作為一個 JVM 進程,其內存管理建立在 JVM 的內存管理之上,整個大致包含兩種方式:堆內內存和堆外內存。

spark.memory.offHeap.enabled=true # 開啟堆外內存 spark.memory.offHeap.size =1024 # 分配堆外內存的大小
2.1 堆內內存
- Spark 在代碼中 new 一個實例對象。
- JVM 從堆內內存分配空間,創建對象並返回對象飲用。
- Spark 保存該對象的引用,記錄該對象占用的內存。
- Spark 記錄該對象釋放的內存,刪除該對象的引用。
- 等待 JVM 的垃圾回收機制釋放該對象占用的堆內內存。
- Storage 內存:主要用於存儲 Spark 的 cache 數據,例如 RDD 的 cache,Broadcast 變量,Unroll 數據等。需要主要的是,unrolled 的數據如果內存不夠,會存儲在 driver 端。
- Execution 內存:用於存儲 Spark task 執行過程中需要的對象,如 Shuffle、Join、Sort、Aggregation等計算過程中的臨時數據。
- User 內存:分配 Spark Memory 剩余的內存,用戶可以根據需要使用,可以存儲 RDD transformations 需要的數據結構。
- Reserved 內存:這部分內存是預留給系統用的,固定不變。

2.2 堆外內存

三、統一內存管理機制
- 雙方空間都不足時,則存儲到硬盤;如己方空間不足而對方空余時,可借用對方的空間;(存儲空間不足是指不足以放下一個完整的 Block)。
- 執行內存的空間被對方占用后,可讓對方將占用的部分存儲轉存到硬盤,然后“歸還”借用的空間。
- 存儲內存的空間被對方占用后,無法讓對方“歸還”,因為需要考慮到 Shuffle 過程中很多因素,實現起來較為復雜。

四、Spark 內存參數
4.1 統一內存管理參數
參數:spark.memory.fraction 含義:用於執行和存儲的部分(堆空間-300MB)。值越低,溢出和緩存數據逐出的頻率就越高。此配置的目的是為稀疏的,異常大的記錄留出用於內部元數據,用戶數據結構和大小估計不精確的內存。建議將其保留為默認值。 默認值:0.6 (Spark 1.6 默認為 0.75,Spark 2.0+ 默認為 0.6) 參數:spark.memory.storageFraction 含義:可以逐出的存儲內存量,以spark.memory.fraction預留的區域大小的一部分表示。數值越高,則可用於執行的工作內存就越少,任務可能會更頻繁地溢出到磁盤上. 建議將其保留為默認值。 默認值:0.5 參數:spark.memory.offHeap.enabled 含義:如果為true,Spark將嘗試將堆外內存用於某些操作。如果啟用了堆外內存使用,則spark.memory.offHeap.size必須為正。 默認值:false 參數:spark.memory.offHeap.size 含義:可用於堆外分配的絕對內存量(以字節為單位)。此設置對堆內存使用沒有影響,因此,如果執行者的總內存消耗必須在某個硬限制內,那么請確保相應地縮小JVM堆大小。當spark.memory.offHeap.enabled=true時,必須將此值設置為正值. 默認值:0 參數:spark.memory.useLegacyMode 含義:是否啟用Spark 1.5及之前版本中使用的舊版內存管理模式。傳統模式將堆空間嚴格划分為固定大小的區域,如果未調整應用程序,則可能導致過多的溢出. 除非已啟用,否則不會讀取以下不推薦使用的內存分數配置: spark.shuffle.memoryFraction、spark.storage.memoryFraction、spark.storage.unrollFraction 默認值:false 參數:spark.shuffle.memoryFraction 含義:(不建議使用)僅在啟用spark.memory.useLegacyMode讀。在 shuffle 期間用於聚合和協同分組的Java堆的分數。在任何給定時間,用於隨機播放的所有內存映射的集合大小都受到此限制的限制,超出此限制,內容將開始溢出到磁盤。如果經常發生泄漏,請考慮以spark.storage.memoryFraction為代價增加此值. 默認值:0.2 參數:spark.storage.memoryFraction 含義:(不建議使用)僅在啟用spark.memory.useLegacyMode讀。用於Spark的內存緩存的Java堆的分數。這不應大於JVM中對象的"舊"代,默認情況下,該對象的堆大小為0.6,但是如果您配置自己的舊代大小,則可以增加它。 默認值:0.6 參數:spark.storage.unrollFraction 含義:(不建議使用)僅在啟用spark.memory.useLegacyMode讀。spark.storage.memoryFraction分數,用於展開內存中的塊。當沒有足夠的可用存儲空間來完全展開新塊時,通過刪除現有塊來動態分配該塊。 默認值:0.2
4.2 內存參數示例
spark-submit \ --master yarn \ --deploy-mode client \ --executor-memory 18g \ --queue root.exquery \ --class org.apache.spark.examples.SparkPi \ /opt/cloudera/parcels/CDH/lib/spark/examples/lib/spark-examples-1.6.0-cdh5.14.4-hadoop2.6.0-cdh5.14.4.jar \ 100
Spark UI 頁面顯示的內存情況:
可以看出,Storage Memory 的可用內存是 10 GB,這個數咋來的呢?根據前面的內存規則,可以得出以下計算:
systemMemory = spark.executor.memory reservedMemory = 300MB usableMemory = systemMemory - reservedMemory StorageMemory= usableMemory * spark.memory.fraction * spark.memory.storageFraction
代入數據,可以得出以下結果:
systemMemory = 18Gb = 19327352832 字節 reservedMemory = 300MB = 300 * 1024 * 1024 = 314572800 usableMemory = systemMemory - reservedMemory = 19327352832 - 314572800 = 19012780032 StorageMemory= usableMemory * spark.memory.fraction * spark.memory.storageFraction = 19012780032 * 0.6 * 0.5 = 5703834009.6 = 5.312109375GB
結果和 Spark UI 界面展示的 10GB 差不多有一般的差距,這是因為 Spark UI 上顯示的 Storage Memory 可用內存其實等於 Storage 內存和 Execution 內存之和,也就是 usableMemory * spark.memory.fraction。
StorageMemory= usableMemory * spark.memory.fraction * spark.memory.storageFraction = 19012780032 * 0.6 / (1024 * 1024 * 1024) = 10.62421 GB
這個結果和 Spark UI 上看到的結果還是有點出入,為什么呢?這是因為雖然我們設置了 --executor-memory 18g,但是 Spark 的 Executor 端通過 Runtime.getRuntime.maxMemory 拿到的內存其實沒這么大,只有 17179869184 字節,所以 systemMemory = 17179869184,然后計算的數據如下:
systemMemory = 17179869184 字節 reservedMemory = 300MB = 300 * 1024 * 1024 = 314572800 usableMemory = systemMemory - reservedMemory = 17179869184 - 314572800 = 16865296384 StorageMemory= usableMemory * spark.memory.fraction = 16865296384 * 0.6 /(1024 * 1024 * 1024) = 9.42421 GB
我們通過將上面的 16865296384 * 0.6 字節除於 1024 * 1024 * 1024 轉換成 9.42421875 GB,和 UI 上顯示的還是對不上,這是因為 Spark UI 是通過除於 1000 * 1000 * 1000 將字節轉換成 GB,如下:
systemMemory = 17179869184 字節 reservedMemory = 300MB = 300 * 1024 * 1024 = 314572800 usableMemory = systemMemory - reservedMemory = 17179869184 - 314572800 = 16865296384 StorageMemory= usableMemory * spark.memory.fraction = 16865296384 * 0.6 字節 = 16865296384 * 0.6 / (1000 * 1000 * 1000) = 10.1GB
systemMemory 內存之所以為 17179869184(可以通過方法 Runtime.getRuntime.maxMemory( ) 獲得該值),是因為內存分配池的堆部分分為 Eden,Survivor 和 Tenured 三部分空間,而這里面一共包含了兩個 Survivor 區域,而這兩個 Survivor 區域在任何時候我們只能用到其中一個,所以我們可以使用下面的公式進行描述:
ExecutorMemory = Eden + 2 * Survivor + Tenured Runtime.getRuntime.maxMemory = Eden + Survivor + Tenured
上面的 17179869184 字節可能因為 GC 配置不一樣得到的數據不一樣,但是上面的計算公式是一樣的。