本課主題
- BlockManager 運行實例
- BlockManager 原理流程圖
- BlockManager 源碼解析
引言
BlockManager 是管理整個Spark運行時的數據讀寫的,當然也包含數據存儲本身,在這個基礎之上進行讀寫操作,由於 Spark 本身是分布式的,所以 BlockManager 也是分布式的,
BlockManager 原理流程圖
[下圖是 BlockManager 原理流程圖]
BlockManager 運行實例
從 Application 啟動的角度來觀察BlockManager
- 在 Application 啟動的時候會在 spark-env.sh 中注冊 BlockMangerMaster 以及 MapOutputTracker,其中:
- BlockManagerMaster:對整集群的 Block 數據進行管理;
- MapOutputTracker:跟蹤所有的 Mapper 的輸出;
- BlockManagerMasterEndpoint 本身是一個消息體,會負責通過遠程消息通信的方式去管理所有節點的 BlockManager;
- 每個啟動一個 ExecutorBackend 都會實例化 BlockManager 並通過遠程通信的方式注冊給 BlockMangerMaster;實際上是 Executor 中的 BlockManager 注冊給 Driver 上的 BlockMangerMasterEndpoiont;(BlockManger 是 Driver 中的一個普通的對象而己,所以無法直接對一個對象做HA)
- MemoryStore 是 BlockManager 中專門負責內存數據存儲和讀寫的類,MemoryStore 是以 一個又一個 Block 為單位的
- DiskStore 是 BlockManager 中專門負責磁盤數據存儲和讀寫的類;
- DiskBlockManager:管理 LogicalBlock 與 Disk 上的 PhysicalBlock 之間的映射關聯並負責磁盤的文件的創建,讀寫等;
[下圖是 Spark-Shell 啟動時的日志信息-1]
[下圖是 Spark-Shell 啟動時的日志信息-2]
從 Job 運行的角度來觀察BlockManager
- 首先通過 MemoryStore 來存儲廣播變量
- 在 Driver 中是通過 BlockManagerInfo 來管理集群中每個 ExecutorBackend 中的 BlockManager 中的元數據信息的;
- 當改變了具體的 ExecutorBackend 上的 Block 的信息后就必需發消息給 Driver 中的 BlockManagerMaster 來更新相應的 BlockManagerInfo 的信息
- 當執行第二個 Stage 之后,第二個 Stage 會向 Driver 中的 MapOutputTrackerMasterEndpoint 發消息請求上一個 Stage 中相應的輸出,此時 MapOutputTrackerMaster 會把上一個 Stage 的輸出數據的元數據信息發送給當前請求的 Stage
[下圖是 Spark-Shell 作業運行時的日志信息-1]
[下圖是 Spark-Shell 作業運行時的日志信息-2]
BlockManager 源碼解析
- BlockManager 會運行在 driver 和 Executor 上面,在 driver 上面的 BlockManager 是負責管理整個集群所有 Executor 中的 BlockManager,BlockManager 本身也是 Master-Slave 結構的,所謂Master-Slave 結構就是一切的調度和工作都是由 Master 去觸發的,Slave本身就是專注於干活的,而 Executor 在啟動的時候,一定會實例化 BlockManager。
[下圖是 Executor.scala 中調用 blockManager.initialize 方法的實現]
[下圖是 SparkContext.scala 中調用 blockManager.initialize 方法的實現]
BlockManager主要提供了讀取和寫數據的接口,可以從本地或者是遠程讀取和寫數據,讀寫數據可以基於內存、磁盤或者是堆外空間 (OffHeap)。如果想使用 BlockManager 的話,必須調用 initialize 方法。程序進行 Shuffle 的時候是通過 BlockManager 去管理的。
[下圖是 BlockManager.scala 中 BlockManager 類] - 基於應用程序的 AppId 去初始化 BlockManager,這個 initialize 方法也會啟動 BlockTransferService 和 ShuffleClient,同時注冊 BlockManagerMaster,啟動 BlockManagerWorker endpoint,當 Executor 實例化的時候會通過 BlockManager.initialize 來實例化 Executor 上的 BlockManager 並且會創建 BlockManagerSlaveEndpoint 這個消息循環體來接受 Driver 中的 BlockManagerMaster 發過來的指令,例如刪除 Block 的指令。 當 BlockManagerSlaveEndpoint 實例化后,Executor 上的 BlockManager 需要向 Driver 上的 BlockManagerMasterEndpoint 注冊
[下圖是 BlockManager.scala 中 initialize 方法]
[下圖是 BlockManager.scala 中 slaveEndpoint 變量]
[下圖是 BlockManagerMaster.scala 中 registerBlockManager 方法]
[下圖是 BlockManagerMessage.scala 中 RegisterBlockManager case class] - 發送消息到 BlockManagerSlaveEndpoint
[下圖是 BlockManagerSlaveEndpoint.scala 中 reveiceAndReply 方法] - BlockManagerMasterEndpoint 接受到 Executor 上的注冊信息並進行處理,每一個 BlockManager 都會對應一個 BlockManagerInfo,然后通過 executorId 看看能不能找到 BlockManagerId,BlockManagerMaster 包含了集群中整個 BlockManager 注冊的信息。經過了這幾個步驟后完成了注冊的工作,這跟 Spark-Shell 啟動時的日志信息是一致的。
[下圖是 BlockManagerMasterEndpoint.scala 中 reveiceAndReply 方法]
[下圖是 BlockManagerMasterEndpoint.scala 中 register 方法]
[下圖是 BlockManagerMasterEndpoint.scala 中 blockManagerInfo 數據結構]
[下圖是 BlockManagerMasterEndpoint.scala 中 removeExecutor 方法]
[下圖是 BlockManagerMasterEndpoint.scala 中 removeBlockManager 方法] - BlockManagerMaster 只有一個 dropFromMemory 是指當我們內存不夠的話,我們嘗試釋放一些內存給要使用的應用程序。
- 當注冊本身沒有問題之后接下來的事情就把相關的功能完成
參考資料
資料來源來至 DT大數據夢工廠 大數據傳奇行動
第38課:BlockManager架構原理、運行流程圖和源碼解密
第39課:BlockManager解密進階:BlockManager初始化和注冊解密、BlockManagerMaster工作解密、BlockTransferService解密、本地數據讀寫解密、遠程數據讀寫解密
Spark源碼圖片取自於 Spark 1.6.0版本