[Spark內核] 第38課:BlockManager架構原理、運行流程圖和源碼解密


本課主題

  • BlockManager 運行實例
  • BlockManager 原理流程圖
  • BlockManager 源碼解析

 

引言

BlockManager 是管理整個Spark運行時的數據讀寫的,當然也包含數據存儲本身,在這個基礎之上進行讀寫操作,由於 Spark 本身是分布式的,所以 BlockManager 也是分布式的,

 

BlockManager 原理流程圖

[下圖是 BlockManager 原理流程圖]

 

BlockManager 運行實例

從 Application 啟動的角度來觀察BlockManager

  1. 在 Application 啟動的時候會在 spark-env.sh 中注冊 BlockMangerMaster 以及 MapOutputTracker,其中:
    • BlockManagerMaster:對整集群的 Block 數據進行管理;
    • MapOutputTracker:跟蹤所有的 Mapper 的輸出;
  2. BlockManagerMasterEndpoint 本身是一個消息體,會負責通過遠程消息通信的方式去管理所有節點的 BlockManager;
  3. 每個啟動一個 ExecutorBackend 都會實例化 BlockManager 並通過遠程通信的方式注冊給 BlockMangerMaster;實際上是 Executor 中的 BlockManager 注冊給 Driver 上的 BlockMangerMasterEndpoiont;(BlockManger 是 Driver 中的一個普通的對象而己,所以無法直接對一個對象做HA)
  4. MemoryStore 是 BlockManager 中專門負責內存數據存儲和讀寫的類,MemoryStore 是以 一個又一個 Block 為單位的
  5. DiskStore 是 BlockManager 中專門負責磁盤數據存儲和讀寫的類;
  6. DiskBlockManager:管理 LogicalBlock 與 Disk 上的 PhysicalBlock 之間的映射關聯並負責磁盤的文件的創建,讀寫等;

[下圖是 Spark-Shell 啟動時的日志信息-1]


[下圖是 Spark-Shell 啟動時的日志信息-2]

 

從 Job 運行的角度來觀察BlockManager

  1. 首先通過 MemoryStore 來存儲廣播變量
  2. 在 Driver 中是通過 BlockManagerInfo 來管理集群中每個 ExecutorBackend 中的 BlockManager 中的元數據信息的;
  3. 當改變了具體的 ExecutorBackend 上的 Block 的信息后就必需發消息給 Driver 中的 BlockManagerMaster 來更新相應的 BlockManagerInfo 的信息
  4. 當執行第二個 Stage 之后,第二個 Stage 會向 Driver 中的 MapOutputTrackerMasterEndpoint 發消息請求上一個 Stage 中相應的輸出,此時 MapOutputTrackerMaster 會把上一個 Stage 的輸出數據的元數據信息發送給當前請求的 Stage  

[下圖是 Spark-Shell 作業運行時的日志信息-1]

[下圖是 Spark-Shell 作業運行時的日志信息-2]

 

BlockManager 源碼解析

  1. BlockManager 會運行在 driver 和 Executor 上面,在 driver 上面的 BlockManager 是負責管理整個集群所有 Executor 中的 BlockManager,BlockManager 本身也是 Master-Slave 結構的,所謂Master-Slave 結構就是一切的調度和工作都是由 Master 去觸發的,Slave本身就是專注於干活的,而 Executor 在啟動的時候,一定會實例化 BlockManager。
    [下圖是 Executor.scala 中調用 blockManager.initialize 方法的實現]

    [下圖是 SparkContext.scala 中調用 blockManager.initialize 方法的實現]

    BlockManager主要提供了讀取和寫數據的接口,可以從本地或者是遠程讀取和寫數據,讀寫數據可以基於內存、磁盤或者是堆外空間 (OffHeap)。如果想使用 BlockManager 的話,必須調用 initialize 方法。
    程序進行 Shuffle 的時候是通過 BlockManager 去管理的。
    [下圖是 BlockManager.scala 中 BlockManager 類]
  2. 基於應用程序的 AppId 去初始化 BlockManager,這個 initialize 方法也會啟動 BlockTransferService 和 ShuffleClient,同時注冊 BlockManagerMaster,啟動 BlockManagerWorker endpoint,當 Executor 實例化的時候會通過 BlockManager.initialize 來實例化 Executor 上的 BlockManager 並且會創建 BlockManagerSlaveEndpoint 這個消息循環體來接受 Driver 中的 BlockManagerMaster 發過來的指令,例如刪除 Block 的指令。 當 BlockManagerSlaveEndpoint 實例化后,Executor 上的 BlockManager 需要向 Driver 上的 BlockManagerMasterEndpoint 注冊
    [下圖是 BlockManager.scala 中 initialize 方法]

    [下圖是 BlockManager.scala 中 slaveEndpoint 變量]

    [下圖是 BlockManagerMaster.scala 中 registerBlockManager 方法]

    [下圖是 BlockManagerMessage.scala 中 RegisterBlockManager case class]

  3. 發送消息到 BlockManagerSlaveEndpoint
    [下圖是 BlockManagerSlaveEndpoint.scala 中 reveiceAndReply 方法]

  4. BlockManagerMasterEndpoint 接受到 Executor 上的注冊信息並進行處理,每一個 BlockManager 都會對應一個 BlockManagerInfo,然后通過 executorId 看看能不能找到 BlockManagerId,BlockManagerMaster 包含了集群中整個 BlockManager 注冊的信息。經過了這幾個步驟后完成了注冊的工作,這跟 Spark-Shell 啟動時的日志信息是一致的。
    [下圖是 BlockManagerMasterEndpoint.scala 中 reveiceAndReply 方法]

    [下圖是 BlockManagerMasterEndpoint.scala 中 register 方法]

    [下圖是 BlockManagerMasterEndpoint.scala 中 blockManagerInfo 數據結構]

    [下圖是 BlockManagerMasterEndpoint.scala 中 removeExecutor 方法]

    [下圖是 BlockManagerMasterEndpoint.scala 中 removeBlockManager 方法]

  5. BlockManagerMaster 只有一個 dropFromMemory 是指當我們內存不夠的話,我們嘗試釋放一些內存給要使用的應用程序。
  6. 當注冊本身沒有問題之后接下來的事情就把相關的功能完成

 

參考資料 

資料來源來至 DT大數據夢工廠 大數據傳奇行動 
第38課:BlockManager架構原理、運行流程圖和源碼解密
第39課:BlockManager解密進階:BlockManager初始化和注冊解密、BlockManagerMaster工作解密、BlockTransferService解密、本地數據讀寫解密、遠程數據讀寫解密

Spark源碼圖片取自於 Spark 1.6.0版本

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM