spark 源碼分析之十六 -- Spark內存存儲剖析


上篇spark 源碼分析之十五 -- Spark內存管理剖析 講解了Spark的內存管理機制,主要是MemoryManager的內容。跟Spark的內存管理機制最密切相關的就是內存存儲,本篇文章主要介紹Spark內存存儲。

總述

跟內存存儲的相關類的關系如下:

 

 

MemoryStore是負責內存存儲的類,其依賴於BlockManager、SerializerManager、BlockInfoManager、MemoryManager。

BlockManager是BlockEvictionHandler的實現類,負責實現dropFromMemory方法,必要時從內存中把block丟掉,可能會轉儲到磁盤上。

SerializerManager是負責持久化的一個類,可以參考文章spark 源碼分析之十三 -- SerializerManager剖析做深入了解。

BlockInfoManager是一個實現了對block讀寫時的一個鎖機制,具體可以看下文。

MemoryManager 是一個內存管理器,從Spark 1.6 以后,其存儲內存池大小和執行內存池大小是可以動態擴展的。即存儲內存和執行內存必要時可以從對方內存池借用空閑內存來滿足自己的使用需求。可以參考文章 spark 源碼分析之十五 -- Spark內存管理剖析 做深入了解。

BlockInfo 保存了跟block相關的信息。

BlockId的name不同的類型有不同的格式,代表不同的block類型。

StorageLevel 表示block的存儲級別,它本身是支持序列化的。

當存儲一個集合為序列化字節數組時,失敗的結果由 PartiallySerializedBlock 返回。

當存儲一個集合為Java對象數組時,失敗的結果由 PartiallyUnrolledIterator 返回。

RedirectableOutputStream 是對另一個outputstream的包裝outputstream,負責直接將數據中轉到另一個outputstream中。

ValueHolder是一個內存中轉站,其有一個getBuilder方法可以獲取到MemoryEntryBuilder對象,該對象會負責將中轉站的數據轉換為對應的可以保存到MemStore中的MemoryEntry。

我們逐個來分析其源碼:

BlockInfo

它記錄了block 的相關信息。

level: StorageLevel 類型,代表block的存儲級別

classTag:block的對應類,用於選擇序列化類

tellMaster:block 的變化是否告知master。大部分情況下都是需要告知的,除了廣播的block。

size: block的大小(in byte)

readerCount:block 讀的次數

writerTask:當前持有該block寫鎖的taskAttemptId,其中 BlockInfo.NON_TASK_WRITER 表示非 task任務 持有鎖,比如driver線程,BlockInfo.NO_WRITER 表示沒有任何代碼持有寫鎖。

BlockId

A Block can be uniquely identified by its filename, but each type of Block has a different set of keys which produce its unique name. If your BlockId should be serializable, be sure to add it to the BlockId.apply() method.

其子類,在上圖中已經標明。

BlockInfoManager

文檔介紹如下:

Component of the BlockManager which tracks metadata for blocks and manages block locking. The locking interface exposed by this class is readers-writer lock. Every lock acquisition is automatically associated with a running task and locks are automatically released upon task completion or failure. This class is thread-safe.

 

它有三個成員變量,如下:

infos 保存了 Block-id 和 block信息的對應關系。

writeLocksByTask 保存了每一個任務和任務持有寫鎖的block-id

readLockByTasks 保存了每一個任務和任務持有讀鎖的block-id,因為讀鎖是可重入的,所以 ConcurrentHashMultiset 是支持多個重復值的。

方法如下:

 

1. 注冊task

2. 獲取當前task

3. 獲取讀鎖

思路:如果block存在,並且沒有task在寫,則直接讀即可,否則進入鎖等待區等待。

4. 獲取寫鎖

思路:如果block存在,且沒有task在讀,也沒有task在寫,則在寫鎖map上記錄task,表示已獲取寫鎖,否則進入等待區等待

5. 斷言有task持有寫鎖寫block

6. 寫鎖降級

思路:首先把和block綁定的task取出並和當前task比較,若是同一個task,則調用unlock方法

7. 釋放鎖:

思路:若當前任務持有寫鎖,則直接釋放,否則讀取次數減1,並且從讀鎖記錄中刪除一條讀鎖記錄。最后喚醒在鎖等待區等待的task。

8. 獲取為寫一個新的block獲取寫鎖

9. 釋放掉指定task的所有鎖

思路:先獲取該task的讀寫鎖記錄,然后移除寫鎖記錄集中的每一條記錄,移除讀鎖記錄集中的每一條讀鎖記錄。

10. 移除並釋放寫鎖

讀寫鎖記錄清零,解除block-id和block信息的綁定。

 

還有一些查詢方法,不再做詳細說明。

簡單總結一下:

讀鎖支持可重入,即可以重復獲取讀鎖。可以獲取讀鎖的條件是:沒有task在寫該block,對有沒有task在讀block沒有要求。

寫鎖當且僅當一個task獲取,可以獲取寫鎖的條件是:沒有task在讀block,沒有task在寫block。

注意,這種設計可以用在一個block的讀的次數遠大於寫的次數的情況下。我們可以來做個假設:假設一個block寫的次數遠超過讀的次數,同時多個task寫同一個block的操作就變成了串行的,寫的效率,因為只有一個BlockInfoManager對象,即一個鎖,即所有在鎖等待區等待的writer們都在競爭一個鎖。對於讀的次數遠超過寫的次數情況下,reader們可以肆無忌憚地讀取數據數據,基本處於無鎖情況下,幾乎沒有了鎖切換帶來的開銷,並且可以允許不同task同時讀取同一個block的數據,讀的吞吐量也提高了。

總之,BlockInfoManager自己實現了block的一套讀寫鎖機制,這種讀寫鎖的設計思路是非常經典和值得學習的。 

 

 

RedirectableOutputStream

 

文檔說明:

A wrapper which allows an open [[OutputStream]] to be redirected to a different sink.

 

即這個類可以將outputstream重定向到另一個outputstream。

源碼也很簡單:

os成員變量就是重定向的目標outputstream

 

MemoryEntry

memoryEntry本質上就是內存中一個block,指向了存儲在內存中的真實數據。

如上圖,它有兩個子類:

其中,DeserializedMemoryEntry 是用來保存反序列化之后的java對象數組的,value是一個數據,保存着真實的反序列化數據,size表示,classTag記錄着數組中被擦除的數據的Class類型,這種數據只能保存在堆內內存中。

SerializedMemoryEntry 是用來保存序列化之后的ByteBuffer數組的,buffer中記錄的是真實的Array[ByteBuffer]數據。memoryMode表示數據存儲的內存區域,堆外內存還是堆內內存,classTag記錄着序列化前被擦除的數據的Class類型,size表示字節數據大小。

 

MemoryEntryBuilder

build方法將內存數據構建到MemoryEntry中

 

ValuesHolder

本質上來說,就是一個內存中轉站。數據被臨時寫入到這個中轉站,然后調用其getBuilder方法獲取 MemoryEntryBuilder 對象,這個對象用於構建MemoryEntry 對象。

storeValues用於寫入數據,estimateSize用於評估holder中內存的大小。調用getBuilder之后會返回 MemoryEntryBuilder對象,后續可以拿這個builder創建MemoryEntry

調用getBuilder之后,會關閉流,禁止數據寫入。

它有兩個子類:用於中轉Java對象的DeserializedValuesHolder和用於中轉字節數據的SerializedValuesHolder。

其實現類具體如下:

1. DeserializedValuesHolder

2. SerializedValuesHolder

 

 

接下來,我們看一下Spark內存存儲中的重頭戲 -- MemoryStore

MemoryStore

 

文檔說明:

Stores blocks in memory, either as Arrays of deserialized Java objects or as serialized ByteBuffers.

 

類內部結構如下:

對成員變量的說明:

entries 本質上就是在內存中保存blockId和block內容的一個map,它的 accessOrder為true,即最近訪問的會被移動到鏈表尾部。

onHeapUnrollMemoryMap 記錄了taskAttemptId和需要攤開一個block需要的堆內內存大小的關系

offHeapUnrollMemoryMap 記錄了taskAttemptId和需要攤開一個block需要的堆外內存大小的關系

unrollMemoryThreshold 表示在攤開一個block 之前給request分配的初始內存,可以通過 spark.storage.unrollMemoryThreshold 來調整,默認是 1MB

 

下面,開門見山,直接剖析比較重要的方法:

1. putBytes:這個方法只被BlockManager調用,其中_bytes回調用於生成直接被緩存的ChunkedByteBuffer:

思路:先從MemoryManager中申請內存,如果申請成功,則調用回調方法 _bytes 獲取ChunkedByteBuffer數據,然后封裝成 SerializedMemoryEntry對象 ,最后將封裝好的SerializedMemoryEntry對象緩存到 entries中。

2. 把迭代器中值保存為內存中的Java對象

思路:轉換為DeserializedValueHolder對象,進而調用putIterator方法,ValueHolder就是一個抽象,使得putIterator既可以緩存序列化的字節數據又可以緩存Java對象數組。

3. 把迭代器中值保存為內存中的序列化字節數據

思路:轉換為 SerializedValueHolder 對象,進而調用putIterator方法。

MAX_ROUND_ARRARY_LENGTH和unrollMemoryThreshold的定義如下:

1 public static int MAX_ROUNDED_ARRAY_LENGTH = Integer.MAX_VALUE - 15;
2 private val unrollMemoryThreshold: Long = conf.getLong("spark.storage.unrollMemoryThreshold", 1024 * 1024)

 

unrollMemoryThreshold 默認是 1MB,可以通過 spark.storage.unrollMemoryThreshold 參數調整大小。

4. putIterator方法由參數ValueHolder,使得緩存字節數據和Java對象可以放到一個方法來。 方法2跟3 都調用了 putIterator 方法,如下:

思路:

第一步:定義攤開內存初始化大小,攤開內存增長率,攤開內存檢查頻率等變量。

第二步:向MemoryManager請求申請攤開初始內存,若成功,則記錄這筆攤開內存。

第三步:然后進入223~240行的while循環,在這個循環里:

  • 循環條件:如果還有值需要攤開並且上次內存申請是成功的,則繼續進行該次循環
  • 不斷想ValueHolder中add數據。如果攤開的元素個數不是UNROLL_MEMORY_CHECK_PERIOD的整數倍,則攤開個數加1;否則,查看ValueHolder中的內存是否大於了已分配內存,若大於,則請求MemoryManager分配內存,並將分配的內存累加到已分配內存中。

第四步:

若上一次向MemoryManager申請內存成功,則從ValueHolder中獲取builder,並且計算准確內存開銷。查看准確內存是否大於了已分配內存,若大於,則請求MemoryManager分配內存,並將分配的內存累加到已分配內存中。

否則,否則打印內存使用情況,返回為攤開該block申請的內存

第五步:

若上一次向MemoryManager申請內存成功,首先調用MemoryEntryBuilder的build方法構建出可以直接存入內存的MemoryEntry,並向MemoryManager請求釋放攤開內存,申請存儲內存,並確保存儲內存申請成功。最后將數據存入內存的entries中。 

否則打印內存使用情況,返回為攤開該block申請的內存

其實之前不是很理解unroll這個詞在這里的含義,一直譯作攤開,它其實指的就是集合的數據轉儲到中轉站這個操作,攤開內存指這個操作需要的內存。

下面來看一下這個方法里面依賴的常量和方法: 

4. 1 unrollMemoryThreshold 在上一個方法已做說明。UNROLL_MEMORY_CHECK_PERIOD 和 UNROLL_MEMORY_GROWTH_FACTOR 常量定義如下:

即,UNROLL_MEMORY_CHECK_PERIOD默認是16,UNROLL_MEMORY_GROWTH_FACTOR 默認是 1.5

4.2 reserveUnrollMemoryForThisTask方法源碼如下,思路大致上是先從MemoryManager 申請攤開內存,若成功,則根據memoryMode在堆內或堆外記錄攤開內存的map上記錄新分配的內存。

4.3 releaseUnrollMemoryForThisTask方法如下,實現思路:先根據memoryMode獲取到對應記錄堆內或堆外內存的使用情況的map,然后在該task的攤開內存上減去這筆內存開銷,如果減完之后,task使用內存為0,則直接從map中移除對該task的內存記錄。

4.4 日志打印block攤開內存和當前內存使用情況

 

5. 獲取緩存的值:

思路:直接根據blockId從entries中取出MemoryEntry數據,然后根據MemoryEntry類型取出數據即可。

 

6. 移除Block或清除緩存,比較簡單,不做過多說明:

 

7. 嘗試驅逐block來釋放指定大小的內存空間來存儲給定的block,代碼如下:

 

該方法有三個參數:要分配內存的blockId,block大小,內存類型(堆內還是堆外)。

第 469~485 行:dropBlock 方法思路: 先從MemoryEntry中獲取data,再調用 BlockManager從內存中驅逐出該block,如果該block 的StorageLevel允許落地到磁盤,則先落到磁盤,再從內存中刪除之,最后更新該block的StorageLevel,最后檢查新的StorageLevel,若該block還在內存或磁盤中,則釋放鎖,否則,直接從BlockInfoManager中刪除之。

第 443 行: 找到block對應的rdd。

第451~467 行:先給entries上鎖,然后遍歷entries集合,檢查block 是否可以從內存中驅逐,若可以則把它加入到selectedBlocks集合中,並把該block大小累加到freedMemory中。

461行的 lockForWriting 方法,不堵塞,即如果第一次拿不到寫鎖,則一直不停地輪詢,直到可以拿到寫鎖為止。那么問題來了,為什么要先獲取寫鎖呢?因為寫鎖具有排他性並且不具備可重入性,一旦拿到寫鎖,其他鎖就不能再訪問該block了。

487行~ 528 行:若計划要釋放的內存小於存儲新block需要的內存大小,則直接釋放寫鎖,不從內存中驅逐之前選擇的block,直接返回。

若計划要釋放的內存不小於存儲新block需要的內存大小,則遍歷之前選擇的每一個block,獲取entry,並調用dropMemory方法,返回釋放的內存大小。finally 代碼塊是防止在dropMemory過程中,該線程被中斷,其余block寫鎖不能被釋放的情況。

其依賴的方法如下:

存儲內存失敗之后,會返回 PartiallySerializedBlock 或者 PartiallyUnrolledIterator。

PartiallyUnrolledIterator 是一個Iterator,可以用來遍歷block數據,同時負責釋放攤開內存。

PartiallySerializedBlock 它可以將失敗的block轉化成 PartiallyUnrolledIterator 用來遍歷,可以直接丟棄失敗的block,也可以把數據轉儲到給定的可以落地的outputstream中,同時釋放攤開內存。

 

總結:

本篇文章主要講解了Spark的內存存儲相關的內容,重點講解了BlockInfoManager實現的鎖機制、跟ValuesHolder中轉站相關的MemoryEntry、EmmoryEntryBuilder等相關內容以及內存存儲中的重頭戲 -- MemStore相關的Block存儲、Block釋放、為新Block驅逐內存等等功能。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM