Apache Spark源碼走讀之6 -- 存儲子系統分析


歡迎轉載,轉載請注明出處,徽滬一郎。

楔子

Spark計算速度遠勝於Hadoop的原因之一就在於中間結果是緩存在內存而不是直接寫入到disk,本文嘗試分析Spark中存儲子系統的構成,並以數據寫入和數據讀取為例,講述清楚存儲子系統中各部件的交互關系。

存儲子系統概覽

上圖是Spark存儲子系統中幾個主要模塊的關系示意圖,現簡要說明如下

  • CacheManager  RDD在進行計算的時候,通過CacheManager來獲取數據,並通過CacheManager來存儲計算結果
  • BlockManager   CacheManager在進行數據讀取和存取的時候主要是依賴BlockManager接口來操作,BlockManager決定數據是從內存(MemoryStore)還是從磁盤(DiskStore)中獲取
  • MemoryStore   負責將數據保存在內存或從內存讀取
  • DiskStore        負責將數據寫入磁盤或從磁盤讀入
  • BlockManagerWorker  數據寫入本地的MemoryStore或DiskStore是一個同步操作,為了容錯還需要將數據復制到別的計算結點,以防止數據丟失的時候還能夠恢復,數據復制的操作是異步完成,由BlockManagerWorker來處理這一部分事情
  • ConnectionManager 負責與其它計算結點建立連接,並負責數據的發送和接收
  • BlockManagerMaster 注意該模塊只運行在Driver Application所在的Executor,功能是負責記錄下所有BlockIds存儲在哪個SlaveWorker上,比如RDD Task運行在機器A,所需要的BlockId為3,但在機器A上沒有BlockId為3的數值,這個時候Slave worker需要通過BlockManager向BlockManagerMaster詢問數據存儲的位置,然后再通過ConnectionManager去獲取,具體參看“數據遠程獲取一節”

支持的操作

由於BlockManager起到實際的存儲管控作用,所以在講支持的操作的時候,以BlockManager中的public api為例

  • put  數據寫入
  • get      數據讀取
  • remoteRDD 數據刪除,一旦整個job完成,所有的中間計算結果都可以刪除

啟動過程分析

上述的各個模塊由SparkEnv來創建,創建過程在SparkEnv.create中完成

    val blockManagerMaster = new BlockManagerMaster(registerOrLookup(
      "BlockManagerMaster",
      new BlockManagerMasterActor(isLocal, conf)), conf)
    val blockManager = new BlockManager(executorId, actorSystem, blockManagerMaster, serializer, conf)

    val connectionManager = blockManager.connectionManager
    val broadcastManager = new BroadcastManager(isDriver, conf)
    val cacheManager = new CacheManager(blockManager)

這段代碼容易讓人疑惑,看起來像是在所有的cluster node上都創建了BlockManagerMasterActor,其實不然,仔細看registerOrLookup函數的實現。如果當前節點是driver則創建這個actor,否則建立到driver的連接。

    def registerOrLookup(name: String, newActor: => Actor): ActorRef = {
      if (isDriver) {
        logInfo("Registering " + name)
        actorSystem.actorOf(Props(newActor), name = name)
      } else {
        val driverHost: String = conf.get("spark.driver.host", "localhost")
        val driverPort: Int = conf.getInt("spark.driver.port", 7077)
        Utils.checkHost(driverHost, "Expected hostname")
        val url = s"akka.tcp://spark@$driverHost:$driverPort/user/$name"
        val timeout = AkkaUtils.lookupTimeout(conf)
        logInfo(s"Connecting to $name: $url")
        Await.result(actorSystem.actorSelection(url).resolveOne(timeout), timeout)
      }
    }

初始化過程中一個主要的動作就是BlockManager需要向BlockManagerMaster發起注冊

數據寫入過程分析

數據寫入的簡要流程

  1. RDD.iterator是與storage子系統交互的入口
  2. CacheManager.getOrCompute調用BlockManager的put接口來寫入數據
  3. 數據優先寫入到MemoryStore即內存,如果MemoryStore中的數據已滿則將最近使用次數不頻繁的數據寫入到磁盤
  4. 通知BlockManagerMaster有新的數據寫入,在BlockManagerMaster中保存元數據
  5. 將寫入的數據與其它slave worker進行同步,一般來說在本機寫入的數據,都會另先一台機器來進行數據的備份,即replicanumber=1

序列化與否

寫入的具體內容可以是序列化之后的bytes也可以是沒有序列化的value. 此處有一個對scala的語法中Either, Left, Right關鍵字的理解。

數據讀取過程分析

 def get(blockId: BlockId): Option[Iterator[Any]] = {
    val local = getLocal(blockId)
    if (local.isDefined) {
      logInfo("Found block %s locally".format(blockId))
      return local
    }
    val remote = getRemote(blockId)
    if (remote.isDefined) {
      logInfo("Found block %s remotely".format(blockId))
      return remote
    }
    None
  }

本地讀取

首先在查詢本機的MemoryStore和DiskStore中是否有所需要的block數據存在,如果沒有則發起遠程數據獲取。

遠程讀取

遠程獲取調用路徑, getRemote->doGetRemote, 在doGetRemote中最主要的就是調用BlockManagerWorker.syncGetBlock來從遠程獲得數據

def syncGetBlock(msg: GetBlock, toConnManagerId: ConnectionManagerId): ByteBuffer = {
    val blockManager = blockManagerWorker.blockManager
    val connectionManager = blockManager.connectionManager
    val blockMessage = BlockMessage.fromGetBlock(msg)
    val blockMessageArray = new BlockMessageArray(blockMessage)
    val responseMessage = connectionManager.sendMessageReliablySync(
        toConnManagerId, blockMessageArray.toBufferMessage)
    responseMessage match {
      case Some(message) => {
        val bufferMessage = message.asInstanceOf[BufferMessage]
        logDebug("Response message received " + bufferMessage)
        BlockMessageArray.fromBufferMessage(bufferMessage).foreach(blockMessage => {
            logDebug("Found " + blockMessage)
            return blockMessage.getData
          })
      }
      case None => logDebug("No response message received")
    }
    null
  }

上述這段代碼中最有意思的莫過於sendMessageReliablySync,遠程數據讀取毫無疑問是一個異步i/o操作,這里的代碼怎么寫起來就像是在進行同步的操作一樣呢。也就是說如何知道對方發送回來響應的呢?

別急,繼續去看看sendMessageReliablySync的定義

def sendMessageReliably(connectionManagerId: ConnectionManagerId, message: Message)
      : Future[Option[Message]] = {
    val promise = Promise[Option[Message]]
    val status = new MessageStatus(
      message, connectionManagerId, s => promise.success(s.ackMessage))
    messageStatuses.synchronized {
      messageStatuses += ((message.id, status))
    }
    sendMessage(connectionManagerId, message)
    promise.future
  }

要是我說秘密在這里,你肯定會說我在扯淡,但確實在此處。注意到關鍵字Promise和Future沒。

如果這個future執行完畢,返回s.ackMessage。我們再看看這個ackMessage是在什么地方被寫入的呢。看一看ConnectionManager.handleMessage中的代碼片段


case bufferMessage: BufferMessage => {
        if (authEnabled) {
          val res = handleAuthentication(connection, bufferMessage)
          if (res == true) {
            // message was security negotiation so skip the rest
            logDebug("After handleAuth result was true, returning")
            return
          }
        }
        if (bufferMessage.hasAckId) {
          val sentMessageStatus = messageStatuses.synchronized {
            messageStatuses.get(bufferMessage.ackId) match {
              case Some(status) => {
                messageStatuses -= bufferMessage.ackId
                status
              }
              case None => {
                throw new Exception("Could not find reference for received ack message " +
                  message.id)
                null
              }
            }
          }
          sentMessageStatus.synchronized {
            sentMessageStatus.ackMessage = Some(message)
            sentMessageStatus.attempted = true
            sentMessageStatus.acked = true
            sentMessageStaus.markDone()
          }

注意,此處的所調用的sentMessageStatus.markDone就會調用在sendMessageReliablySync中定義的promise.Success. 不妨看看MessageStatus的定義。

 class MessageStatus(
      val message: Message,
      val connectionManagerId: ConnectionManagerId,
      completionHandler: MessageStatus => Unit) {

    var ackMessage: Option[Message] = None
    var attempted = false
    var acked = false

    def markDone() { completionHandler(this) }
  }

我想至此調用關系搞清楚了,scala中的Future和Promise理解起來還有有點費勁。

TachyonStore

 在Spark的最新源碼中,Storage子系統引入了TachyonStore. TachyonStore是在內存中實現了hdfs文件系統的接口,主要目的就是盡可能的利用內存來作為數據持久層,避免過多的磁盤讀寫操作。

有關該模塊的功能介紹,可以參考http://www.meetup.com/spark-users/events/117307472/

小結

一點點疑問,目前在Spark的存儲子系統中,通信模塊里傳遞的數據即有“心跳檢測消息”,“數據同步的消息”又有“數據獲取之類的信息流”。如果可能的話,要將心跳檢測與數據同步即數據獲取所使用的網卡分離以提高可靠性。

參考資料

  1. Spark源碼分析之-Storage模塊 http://jerryshao.me/architecture/2013/10/08/spark-storage-module-analysis/

  2. Tachyon  http://www.slideshare.net/rxin/a-tachyon-2013-0509sparkmeetup?qid=39ee582d-e0bf-41d2-ab01-dc2439abc626&v=default&b=&from_search=2

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM