Spark從1.6.0版本開始,內存管理模塊就發生了改變,舊版本的內存管理模塊是實現了StaticMemoryManager 類,現在被稱為"legacy"。"Legacy"模式默認被置為不可用,這就意味着當你用Spark1.5.x和Spark1.6.x運行相同的代碼會有不同的結果,應當多加注意。考慮的兼容性,可以通過設置spark.memory.useLegacyMode為可用,默認是false.
這篇文章介紹自spark1.6.0版本后的新的內存管理模型,它實現的是UnifiedMemoryManager類。


在這張圖中你可以看到三個主要內存區域。
1.Reserved Memory.這部分內存是被系統預留的,它的大小也是被硬編碼的。在Spark1.6.0版本,它的大小是300MB,這就意味着這部分內存不能計入Spark內存計算,除非重新編譯源碼或設置spark.testing.reservedMemory,它的大小是不可改變的,因為park.testing.reservedMemory只是一個測試參數所以在生產中不推薦使用。注意,這部分內存只是被稱為“Reserved",實際上它不會被spark用來干任何事情 ,但是它限制了你在spark中可分配的內存大小。即使你想將全部JVM堆內存用於spark緩存數據,也不能使用這部分空閑內存(不是真的就浪費了,其實它存儲了Spark的一些內部對象)。供參考,如果你不能為executor至少1.5 * Reserved Memory = 450MB的堆內存,任務將會失敗並提示”please use larger heap size“的錯誤信息。
2.User Memory.這部分內存是分配Spark Memory內存之后的部分,而且這部分用來干什么完全取決於你。你可以用來存儲RDD transformations過程使用的數據結構。例如,你可以通過mapPartitions transformation 重寫Spark aggregation,mapPartitions transformations 保存hash表保證aggregation運行。這部分數據就保存在User Memory。再次強調,這是User Memory它完全由你決定存什么、如何使用,Spark完全不會管你拿這塊區域用來做什么,怎么用,也不會考慮你的代碼在這塊區域是否會導致內存溢出。
3.Spark Memory.這部分內存就是由Spark管理了。這部分內存大小的計算:(“Java Heap” – “Reserved Memory”) * spark.memory.fraction,而且在spark1.6.0版本默認大小為: (“Java Heap” – 300MB) * 0.75。例如:如果堆內存大小有4G,將有2847MB的Spark Memory,Spark Memory=(4*1024MB-300)*0.75=2847MB。這部分內存會被分成兩部分:Storage Memory和Execution Memory,而且這兩部分的邊界由spark.memory.storageFraction參數設定,默認是0.5即50%。新的內存管理模型中的優點是,這個邊界不是固定的,在內存壓力下這個邊界是可以移動的。如一個區域內存不夠用時可以從另一區域借用內存。下邊來討論如何移動及使用的:
1.Storage Memory.這部分內存即可以用來緩存spark數據也可以用來做unroll序列化數據的臨時空間。廣播變量以block的形式也存儲在這里。你奇怪的是unroll,因為你可能會說,並不需要那么多空間去unroll block使其可用——在沒有足夠內存去unroll bolock的情況下,如果得到持久化級別的允許,將直接在這部分內存unroll block。至於廣播變量,當它的持久化級別為MEMORY_AND_DISK時,就會緩存到此。
2.Execution Memory.這部分內存用於存儲執行task過程中的一些對象。例如,它可以用來shuflle map端的中間緩存,也可以用來存儲hash aggregation過程的hash table.在沒有足夠內存的時候,這部分內存支持溢室到磁盤,但是這部分內存的blocks不會被其它線程的task擠出去。
下邊我們來說一下Storage Memory 和Execution Memory之間的邊界移動。從Execution Memory的本質來看,你不能將這部分內存空間的數據擠出去,因為這部分內存的數據是用來計算的中間結果,如果計算過程找不到原來存到這的block數據任務就會失敗。但是對於Storage Memory內存就不會這樣,它只是用來緩存內存中數據,如果將里邊的block數據驅逐出去,就會更新block 元數據映射信息使用到時告知該block被移除了,要想再拿到這些數據從HDD中讀取即可(或者如果緩存級別沒有溢寫就重新計算)。
所以,我們只能Execution Memory可以向Storage Memory擠用空間,反之不可。那么當什么時候會發生Execution Memory 向Storage Memory擠用空間呢?有兩種可能:
- 只要Storage Memory有可用空間,就可以增大Execution Memory 大小,減少Storage Memory 大小。
- Storage Memory的空間大小已經超出了初始設定的大小,並且將這部分空間全部占用,在這種情況下就可以強制將從Storage Memory中移出Blocks,減少它的空間到初始大小。
反過來,在只有當Execution Memory空間有空余時,Storage Memory才可以向Execution Memory借用空間,也就是說Execution Memory只要不夠用了就可以向Storage Memory擠占空間不管Storage Memory有沒有空余,而Storage Memory只能當Execution Memory有空余時才要以借用不能搶占。
初始Storage Memory 大小:“Spark Memory” * spark.memory.storageFraction = (“Java Heap” – “Reserved Memory”) * spark.memory.fraction * spark.memory.storageFraction。根據默認值,即(“Java Heap” – 300MB) * 0.75 * 0.5 = (“Java Heap” – 300MB) * 0.375. 如果Java Heap=4G,那么就有1423.5MB大小的Storage Memory空間。
這就意味着當我們使用Spark cacheu並加載全部數據到executor中時,至少要將Storage Memory大小等於默認初始值大小。因為當Storage Memory區域還沒滿時,Execution Memory區域已經膨脹大於其初始設定大小時,我們不能強制將Execution Memory搶占的空間數據驅逐,所以最終Storage Memory會變小。
希望這篇文章可以幫你更好的理解spark新的內存管理機制,並以此來應用。