[Spark內核] 第40課:CacheManager徹底解密:CacheManager運行原理流程圖和源碼詳解


本課主題 

  • CacheManager 運行原理圖
  • CacheManager 源碼解析

 

CacheManager 運行原理圖

[下圖是CacheManager的運行原理圖]

首先 RDD 是通過 iterator 來進行計算:

  • CacheManager 會通過 BlockManager 從 Local 或者 Remote 獲取數據直接通過 RDD 的 compute 進行計算,有可能需要考慮 checkpoint;
  • 通過 BlockManager 首先從本地獲取數據,如果獲得不到數據的話會從遠程獲取數據
  • 首先檢查看當前的 RDD 是否進行了 CheckPoint ,如果進行了話就直接讀取 checkpoint 的數據,否則的話就必需進行計算;因為此時 RDD 需要緩存,所以計算如果需要通過 BlockManager 再次進行持久
  • 如果持久化的時候只是緩存到磁盤中,就直接使用 BlockManager 的 doPut 方法寫入磁盤即可(需要考慮 Replication)。
  • 如果指定了內存做緩存的話,優先保存到內存中,此時會使用MemoryStore.unrollSafely 方法來嘗試安全的將數據保存在內存中,如果內存不夠的話,會使用一個方法來整理一部份內存空間,然后基於整理出來的內存空間放入我們想緩存的最新數據;
  • 直接通過 RDD 的 compute 進行計算,有可能需要考慮 checkpoint;

  

CacheManager 源碼解析 

  1. CacheManager 管理的是緩存中的數據,緩存可以是基於內存的緩存,也可以是基於磁盤的緩存
  2. CacheManager 需要通過 BlockManager 來操作數據;
  3. 每當 Task 運行的時候會調用 RDD 的 Compute 方法進行計算,而 Compute 方法會調用 iterator 方法;
    [下圖是 MapPartitionRDD.scala 的 compute 方法]

    這個方法是 final 級別不能覆寫但可以被子類去使用,可以看見 RDD 是優先使用內存的,這個方法很關鍵!!如果存儲級別不等於 NONE 的情況下,程序會先找 CacheManager 獲得數據,否則的話會看有沒有進行 Checkpoint
    [下圖是 RDD.scala 的 iterator 方法]

    以下是 Spark 中的 StorageLevel
    [下圖是 StorageLevel.scala 的 StorageLevel 對象]

  4. Cache 在工作的時候會最大化的保留數據,但是數據不一定絕對完整,因為當前的計算如果需要內存空間的話,那么內存中的數據必需讓出空間,這是因為執行比緩存重要!此時如何在RDD 持久化的時候同時指定了可以把數據放左Disk 上,那么部份 Cache 的數據可以從內存轉入磁盤,否則的話,數據就會丟失!
    假設現在 Cache 了一百萬個數據分片,但是我下一個步驟計算的時候,我需要內存,思考題:你覺得是我現在需要的內存重要呢,還是你曾經 Cache 占用的空間重要呢?亳無疑問,肯定是現在計算重要。所以 Cache 占用的空間需要從內存中除掉,如果你程序的 StorageLevel 是 MEMEORY_AND_DISK 的話,這時候在內存可能是 Drop 到磁盤上,如果你程序的 StorageLevel 是 MEMEORY_ONLY 的話,那就會出去數據丟失的情況。
    你進行Cache時,BlockManager 會幫你進行管理,我們可以通過 Key 到 BlockManager 中找出曾經緩存的數據。
    [下圖是 CacheManager.scala 的 getOrCompute 方法]

    [下圖是 CacheManager.scala 的 getOrCompute 方法內部具體的實現]

    如果有 BlockManager.get() 方法沒有返回任何數據,就調用 acquireLockForPartition 方法,因為會有可能多條線程在操作數據,Spark 有一個東西叫慢任務StraggleTask 推遲,StraggleTask 推遲的時候一般都會運行兩個任務在兩台機器上,你可能在你當前機器上沒有發現這個內容,同時有遠程也沒有發現這個內容,只不過在你返回的那一刻,別人已經算完啦!
    [下圖是 CacheManager.scala 的 getOrCompute 方法內部具體的實現]

    [下圖是 CacheManager.scala 的 getOrCompute 方法內部具體的實現]

    最后還是通過 BlockManager.get 來獲得數據
    [下圖是 CacheManager.scala 的 acquireLockForPartition 方法]
  5. 具體 CacheManager 在獲得緩存數據的時候會通過 BlockManager 來抓到數據,優先在本地找數據或者的話就遠程抓取數據
    [下圖是 BlockManager.scala 的 get 方法]

    BlockManger.getLocal 然后轉過來調用 doGetLocal 方法,在 doGetLocal 的實現中看到緩存其實不竟竟在內存中,可以在內存、磁盤、也可以在 OffHeap (Tachyon) 中
    [下圖是 BlockManager.scala 的 getLocal 方法]
  6. 在第5步調用了 getLocal 方法后轉過調用了 doGetLocal
    [下圖是 BlockManager.scala 的 doGetLocal 方法]






  7. 在第5步中如果本地沒有緩存的話就調用 getRemote 方法從遠程抓取數據
    [下圖是 BlockManager.scala 的 getRemote 方法]


  8. 如果 CacheManager 沒有通過 BlockManager 獲得緩存內容的話,其實會通過 RDD 的 computeOrReadCheckpoint 方法來獲得數據。
    [下圖是 RDD.scala 的 computeOrReadChcekpoint 方法]

    上述首先檢查看當前的 RDD 是否進行了 Checkpoint ,如果進行了話就直接讀取 checkpoint 的數據,否則的話就必需進行計算; Checkpoint 本身很重要;計算之后通過 putInBlockManager 會把數據按照 StorageLevel 重新緩存起來
    [下圖是 CacheManager.scala 的 putInBlockManager 方法]


  9. 你如果把數據緩存在內存中,你需要注意的是內存空間夠不夠,此時會調用 memoryStore 中的 unrollSafety 方法,里面有一個循環在內存中放數據。
    [下圖是 MemoryStore.scala 中的 unrollSafely 方法]

 

 

參考資料 

資料來源來至 DT大數據夢工廠 大數據傳奇行動 第40課:CacheManager徹底解密:CacheManager運行原理流程圖和源碼詳解

Spark源碼圖片取自於 Spark 1.6.0版本

 

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM