《深入理解Spark:核心思想與源碼分析》——SparkContext的初始化(伯篇)——執行環境與元數據清理器


《深入理解Spark:核心思想與源碼分析》一書前言的內容請看鏈接《深入理解SPARK:核心思想與源碼分析》一書正式出版上市

《深入理解Spark:核心思想與源碼分析》一書第一章的內容請看鏈接《第1章 環境准備》

《深入理解Spark:核心思想與源碼分析》一書第二章的內容請看鏈接《第2章 SPARK設計理念與基本架構》

由於本書的第3章內容較多,所以打算分別開辟四篇隨筆分別展現。本文展現第3章第一部分的內容:

第3章 SparkContext的初始化

“道生一,一生二,二生三,三生萬物。”

——《道德經》

n 本章導讀:

  SparkContext的初始化是Driver應用程序提交執行的前提,本章內容以local模式為主,並按照代碼執行順序講解,這將有助於首次接觸Spark的讀者理解源碼。讀者朋友如果能邊跟蹤代碼,邊學習本章內容,也許是快速理解SparkContext初始化過程的便捷途徑。已經熟練使用Spark的開發人員可以選擇跳過本章內容。

  本章將在介紹SparkContext初始化過程的同時,向讀者介紹各個組件的作用,為閱讀后面的章節打好基礎。Spark中的組件很多,就其功能而言涉及到網絡通信、分布式、消息、存儲、計算、緩存、測量、清理、文件服務、Web UI的方方面面。

 

3.1 SparkContext概述

  Spark Driver用於提交用戶應用程序,實際可以看作Spark的客戶端。了解Spark Driver的初始化,有助於讀者理解用戶應用程序在客戶端的處理過程。

  Spark Driver的初始化始終圍繞着SparkContext的初始化。SparkContext可以算得上是所有Spark應用程序的發動機引擎,轎車要想跑起來,發動機首先要啟動。SparkContext初始化完畢,才能向Spark集群提交任務。在平坦的公路上,發動機只需以較低的轉速,較低的功率就可以游刃有余;在山區,你可能需要一台能夠提供大功率的發動機,這樣才能滿足你轉山的體驗。這些參數都是通過駕駛員操作油門、檔位等傳送給發動機的,而SparkContext的配置參數則由SparkConf負責,SparkConf就是你的操作面板。

SparkConf的構造很簡單,主要是通過ConcurrentHashMap來維護各種Spark的配置屬性。SparkConf代碼結構見代碼清單3-1。Spark的配置屬性都是以“spark.”開頭的字符串。

 

代碼清單3-1  SparkConf代碼結構

class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging {
  import SparkConf._
  def this() = this(true)
  private val settings = new ConcurrentHashMap[String, String]()
  if (loadDefaults) {
    // 加載任何以spark.開頭的系統屬性
    for ((key, value) <- Utils.getSystemProperties if key.startsWith("spark.")) {
      set(key, value)
    }
  }
//其余代碼省略

 

現在開始介紹SparkContext,SparkContext的初始化步驟如下:

1)         創建Spark執行環境SparkEnv;

2)         創建RDD清理器metadataCleaner;

3)         創建並初始化Spark UI;

4)         Hadoop相關配置及Executor環境變量的設置

5)         創建任務調度TaskScheduler;

6)         創建和啟動DAGScheduler;

7)         TaskScheduler的啟動;

8)         初始化塊管理器BlockManager(BlockManager是存儲體系的主要組件之一,將在第4章介紹);

9)         啟動測量系統MetricsSystem;

10)     創建和啟動Executor分配管理器ExecutorAllocationManager;

11)     ContextCleaner的創建與啟動;

12)     Spark環境更新;

13)     創建DAGSchedulerSource和BlockManagerSource;

14)     將SparkContext標記為激活。

 

SparkContext的主構造器參數為SparkConf,其實現如下。

class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationClient {
private val creationSite: CallSite = Utils.getCallSite()
  private val allowMultipleContexts: Boolean =
    config.getBoolean("spark.driver.allowMultipleContexts", false)
  SparkContext.markPartiallyConstructed(this, allowMultipleContexts)

上面代碼中的CallSite存儲了線程棧中最靠近棧頂的用戶類及最靠近棧底的Scala或者Spark核心類信息。Utils.getCallSite的詳細信息見附錄A。SparkContext默認只有一個實例(由屬性spark.driver.allowMultipleContexts來控制,用戶需要多個SparkContext實例時,可以將其設置為true),方法markPartiallyConstructed用來確保實例的唯一性,並將當前SparkContext標記為正在構建中。

  接下來會對SparkConf進行拷貝,然后對各種配置信息進行校驗,代碼如下。

  private[spark] val conf = config.clone()
  conf.validateSettings()

  if (!conf.contains("spark.master")) {
    throw new SparkException("A master URL must be set in your configuration")
  }
  if (!conf.contains("spark.app.name")) {
    throw new SparkException("An application name must be set in your configuration")
  }

 

從上面校驗的代碼看到必須指定屬性spark.master 和spark.app.name,否則會拋出異常,結束初始化過程。spark.master用於設置部署模式,spark.app.name指定應用程序名稱。

3.2 創建執行環境SparkEnv

       SparkEnv.createDriverEnv方法有三個參數,conf、isLocal和 listenerBus。

  val isLocal = (master == "local" || master.startsWith("local["))
  private[spark] val listenerBus = new LiveListenerBus
  conf.set("spark.executor.id", "driver")

  private[spark] val env = SparkEnv.createDriverEnv(conf, isLocal, listenerBus)
  SparkEnv.set(env)

  

上面代碼中的conf是對SparkConf的拷貝,isLocal標識是否是單機模式,listenerBus采用監聽器模式維護各類事件的處理,在3.14節會詳細介紹。

SparkEnv的方法createDriverEnv最終調用create創建SparkEnv。SparkEnv的構造步驟如下:

1)         創建安全管理器SecurityManager;

2)         創建基於Akka的分布式消息系統ActorSystem;

3)         創建Map任務輸出跟蹤器mapOutputTracker;

4)         實例化ShuffleManager;

5)         創建ShuffleMemoryManager;

6)         創建塊傳輸服務BlockTransferService;

7)         創建BlockManagerMaster;

8)         創建塊管理器BlockManager;

9)         創建廣播管理器BroadcastManager;

10)     創建緩存管理器CacheManager;

11)     創建HTTP文件服務器HttpFileServer;

12)     創建測量系統MetricsSystem;

13)     創建SparkEnv;

 

3.2.1 安全管理器SecurityManager

  SecurityManager主要對權限、賬號進行設置,如果使用Hadoop YARN作為集群管理器,則需要使用證書生成 secret key登錄,最后給當前系統設置默認的口令認證實例,此實例采用匿名內部類實現,參見代碼清單3-2。

代碼清單3-2  SecurityManager的實現

  private val secretKey = generateSecretKey()

  // 使用HTTP連接設置口令認證
  if (authOn) {
    Authenticator.setDefault(
      new Authenticator() {
        override def getPasswordAuthentication(): PasswordAuthentication = {
          var passAuth: PasswordAuthentication = null
          val userInfo = getRequestingURL().getUserInfo()
          if (userInfo != null) {
            val  parts = userInfo.split(":", 2)
            passAuth = new PasswordAuthentication(parts(0), parts(1).toCharArray())
          }
          return passAuth
        }
      }
    )
  }

   

3.2.2 基於Akka的分布式消息系統ActorSystem

  ActorSystem是Spark中最基礎的設施,Spark既使用它發送分布式消息,又用它實現並發編程。怎么,消息系統可以實現並發?要解釋清楚這個問題,首先應該簡單的介紹下Scala語言的Actor並發編程模型:Scala認為Java線程通過共享數據以及通過鎖來維護共享數據的一致性是糟糕的做法,容易引起鎖的爭用,降低並發程序的性能,甚至會引入死鎖的問題。在Scala中只需要自定義類型繼承Actor,並且提供act方法,就如同Java里實現Runnable接口,需要實現run方法一樣。但是不能直接調用act方法,而是通過發送消息的方式(Scala發送消息是異步的),傳遞數據。如:

  Actor ! message

  Akka是Actor編程模型的高級類庫,類似於JDK 1.5之后越來越豐富的並發工具包,簡化了程序員並發編程的難度。ActorSystem便是Akka提供的用於創建分布式消息通信系統的基礎類。Akka的具體信息見附錄B。

  正式因為Actor輕量級的並發編程、消息發送以及ActorSystem支持分布式消息發送等特點,Spark選擇了ActorSystem。

  SparkEnv中創建ActorSystem時用到了AkkaUtils工具類,見代碼清單3-3。AkkaUtils.createActorSystem方法用於啟動ActorSystem,見代碼清單3-4。AkkaUtils使用了Utils的靜態方法startServiceOnPort, startServiceOnPort最終會回調方法startService: Int => (T, Int),此處的startService實際是方法doCreateActorSystem。真正啟動ActorSystem是由doCreateActorSystem方法完成的,doCreateActorSystem的具體實現細節請見附錄B。Spark的Driver中Akka的默認訪問地址是akka://sparkDriver,Spark的Executor中Akka的默認訪問地址是akka:// sparkExecutor。如果不指定ActorSystem的端口,那么所有節點的ActorSystem端口在每次啟動時隨機產生。關於startServiceOnPort的實現,請見附錄A。

代碼清單3-3  ActorSystem的創建和啟動

    val (actorSystem, boundPort) =
      Option(defaultActorSystem) match {
        case Some(as) => (as, port)
        case None =>
          val actorSystemName = if (isDriver) driverActorSystemName else executorActorSystemName
          AkkaUtils.createActorSystem(actorSystemName, hostname, port, conf, securityManager)
      }

  

代碼清單3-4  ActorSystem的創建和啟動

def createActorSystem(
      name: String,
      host: String,
      port: Int,
      conf: SparkConf,
      securityManager: SecurityManager): (ActorSystem, Int) = {
    val startService: Int => (ActorSystem, Int) = { actualPort =>
      doCreateActorSystem(name, host, actualPort, conf, securityManager)
    }
    Utils.startServiceOnPort(port, startService, conf, name)
  }

  

3.2.3 map任務輸出跟蹤器mapOutputTracker

  mapOutputTracker用於跟蹤map階段任務的輸出狀態,此狀態便於reduce階段任務獲取地址及中間輸出結果。每個map任務或者reduce任務都會有其唯一標識,分別為mapId和reduceId。每個reduce任務的輸入可能是多個map任務的輸出,reduce會到各個map任務的所在節點上拉取Block,這一過程叫做shuffle。每批shuffle過程都有唯一的標識shuffleId。

  這里先介紹下MapOutputTrackerMaster。MapOutputTrackerMaster內部使用mapStatuses:TimeStampedHashMap[Int, Array[MapStatus]]來維護跟蹤各個map任務的輸出狀態。其中key對應shuffleId,Array存儲各個map任務對應的狀態信息MapStatus。由於MapStatus維護了map輸出Block的地址BlockManagerId,所以reduce任務知道從何處獲取map任務的中間輸出。MapOutputTrackerMaster還使用cachedSerializedStatuses:TimeStampedHashMap[Int, Array[Byte]]維護序列化后的各個map任務的輸出狀態。其中key對應shuffleId,Array存儲各個序列化MapStatus生成的字節數組。

  Driver和Executor處理MapOutputTrackerMaster的方式有所不同:

如果當前應用程序是Driver,則創建MapOutputTrackerMaster,然后創建MapOutputTrackerMasterActor,並且注冊到ActorSystem中。

如果當前應用程序是Executor,則創建MapOutputTrackerWorker,並從ActorSystem中找到MapOutputTrackerMasterActor。

無論是Driver還是Executor,最后都由mapOutputTracker的屬性trackerActor持有MapOutputTrackerMasterActor的引用,參見代碼清單3-5。

代碼清單3-5  registerOrLookup方法用於查找或者注冊Actor的實現

def registerOrLookup(name: String, newActor: => Actor): ActorRef = {
      if (isDriver) {
        logInfo("Registering " + name)
        actorSystem.actorOf(Props(newActor), name = name)
      } else {
        AkkaUtils.makeDriverRef(name, conf, actorSystem)
      }
    }

    val mapOutputTracker =  if (isDriver) {
      new MapOutputTrackerMaster(conf)
    } else {
      new MapOutputTrackerWorker(conf)
}

    mapOutputTracker.trackerActor = registerOrLookup(
      "MapOutputTracker",
      new MapOutputTrackerMasterActor(mapOutputTracker.asInstanceOf[MapOutputTrackerMaster], conf))

  

在后面章節大家會知道map任務的狀態正是由Executor向持有的MapOutputTrackerMasterActor發送消息,將map任務狀態同步到mapOutputTracker的mapStatuses和cachedSerializedStatuses的。Executor究竟是如何找到MapOutputTrackerMasterActor的?registerOrLookup方法通過調用AkkaUtils.makeDriverRef找到MapOutputTrackerMasterActor,實際正是利用ActorSystem提供的分布式消息機制實現的,具體細節參見附錄B。這里第一次使用到了Akka提供的功能,以后大家會漸漸感覺到使用Akka的便捷。

 

3.2.4 實例化ShuffleManager

  ShuffleManager負責管理本地及遠程的block數據的shuffle操作。ShuffleManager默認為通過反射方式生成的SortShuffleManager的實例,可以指定屬性spark.shuffle.manager來顯示控制使用HashShuffleManager。SortShuffleManager通過持有的IndexShuffleBlockManager間接操作BlockManager中的DiskBlockManager將map結果寫入本地,並根據shuffleId、mapId寫入索引文件,也能通過MapOutputTrackerMaster中維護的mapStatuses從本地或者其他遠程節點讀取文件。有讀者可能會問,為什么需要shuffle?Spark作為並行計算框架,同一個作業會被划分為多個任務在多個節點上並行執行,reduce的輸入可能存在於多個節點上,因此需要通過“洗牌”將所有reduce的輸入匯總起來,這個過程就是shuffle。這個問題以及對ShuffleManager的具體使用會在第5章和第6章詳述。ShuffleManager的實例化見代碼清單3-6。代碼清單3-6最后創建的ShuffleMemoryManager,將在3.2.5節介紹。

代碼清單3-6  ShuffleManager的實例化及ShuffleMemoryManager的創建

 

    val shortShuffleMgrNames = Map(
      "hash" -> "org.apache.spark.shuffle.hash.HashShuffleManager",
      "sort" -> "org.apache.spark.shuffle.sort.SortShuffleManager")
    val shuffleMgrName = conf.get("spark.shuffle.manager", "sort")
    val shuffleMgrClass = shortShuffleMgrNames.get
OrElse(shuffleMgrName.toLowerCase, shuffleMgrName)
    val shuffleManager = instantiateClass[ShuffleManager](shuffleMgrClass)

    val shuffleMemoryManager = new ShuffleMemoryManager(conf)

   

3.2.5 shuffle線程內存管理器ShuffleMemoryManager

  ShuffleMemoryManager負責管理shuffle線程占有內存的分配與釋放,並通過threadMemory:mutable.HashMap[Long, Long]緩存每個線程的內存字節數,見代碼清單3-7。

代碼清單3-7  ShuffleMemoryManager的數據結構

private[spark] class ShuffleMemoryManager(maxMemory: Long) extends Logging {
  private val threadMemory = new mutable.HashMap[Long, Long]()  // threadId -> memory bytes
  def this(conf: SparkConf) = this(ShuffleMemoryManager.getMaxMemory(conf))

   

getMaxMemory方法用於獲取shuffle所有線程占用的最大內存,實現如下。

def getMaxMemory(conf: SparkConf): Long = {
    val memoryFraction = conf.getDouble("spark.shuffle.memoryFraction", 0.2)
    val safetyFraction = conf.getDouble("spark.shuffle.safetyFraction", 0.8)
    (Runtime.getRuntime.maxMemory * memoryFraction * safetyFraction).toLong
  }

  

從上面代碼可以看出,shuffle所有線程占用的最大內存的計算公式為:

Java運行時最大內存 * Spark的shuffle最大內存占比 * Spark的安全內存占比

可以配置屬性spark.shuffle.memoryFraction修改Spark的shuffle最大內存占比,配置屬性spark.shuffle.safetyFraction修改Spark的安全內存占比。


 

注意:ShuffleMemoryManager通常運行在Executor中, Driver中的ShuffleMemoryManager 只有在local模式下才起作用。


 

 

3.2.6 塊傳輸服務BlockTransferService

  BlockTransferService默認為NettyBlockTransferService(可以配置屬性spark.shuffle.blockTransferService使用NioBlockTransferService),它使用Netty提供的異步事件驅動的網絡應用框架,提供web服務及客戶端,獲取遠程節點上Block的集合。

val blockTransferService =
      conf.get("spark.shuffle.blockTransferService", "netty").toLowerCase match {
        case "netty" =>
          new NettyBlockTransferService(conf, securityManager, numUsableCores)
        case "nio" =>
          new NioBlockTransferService(conf, securityManager)
      }

  

NettyBlockTransferService的具體實現將在第4章詳細介紹。這里大家可能覺得奇怪,這樣的網絡應用為何也要放在存儲體系?大家不妨先帶着疑問,直到你真正了解存儲體系。

3.2.7 BlockManagerMaster介紹

BlockManagerMaster負責對Block的管理和協調,具體操作依賴於BlockManagerMasterActor。Driver和Executor處理BlockManagerMaster的方式不同:

如果當前應用程序是Driver,則創建BlockManagerMasterActor,並且注冊到ActorSystem中。

如果當前應用程序是Executor,則從ActorSystem中找到BlockManagerMasterActor。

無論是Driver還是Executor,最后BlockManagerMaster的屬性driverActor將持有對BlockManagerMasterActor的引用。BlockManagerMaster的創建代碼如下。

val blockManagerMaster = new BlockManagerMaster(registerOrLookup(
      "BlockManagerMaster",
      new BlockManagerMasterActor(isLocal, conf, listenerBus)), conf, isDriver)

  

registerOrLookup已在3.2.3節介紹過了,不再贅述。BlockManagerMaster及BlockManagerMasterActor的具體實現將在第4章詳細介紹。

3.2.8 創建塊管理器BlockManager

  BlockManager負責對Block的管理,只有在BlockManager的初始化方法initialize被調用后,它才是有效的。BlockManager作為存儲系統的一部分,具體實現見第4章。BlockManager的創建代碼如下。

val blockManager = new BlockManager(executorId, actorSystem, blockManagerMaster,
      serializer, conf, mapOutputTracker, shuffleManager, blockTransferService, securityManager,
      numUsableCores)

  

3.2.9 創建廣播管理器BroadcastManager

  BroadcastManager用於將配置信息和序列化后的RDD、Job以及ShuffleDependency等信息在本地存儲。如果為了容災,也會復制到其他節點上。創建BroadcastManager的代碼實現如下。

val broadcastManager = new BroadcastManager(isDriver, conf, securityManager)

  

BroadcastManager必須在其初始化方法initialize被調用后,才能生效。Initialize方法實際利用反射生成廣播工廠實例broadcastFactory(可以配置屬性spark.broadcast.factory指定,默認為org.apache.spark.broadcast.TorrentBroadcastFactory)。BroadcastManager的廣播方法newBroadcast實際代理了工廠broadcastFactory的newBroadcast方法來生成廣播或者非廣播對象。BroadcastManager的Initialize及newBroadcast方法見代碼清單3-8。

代碼清單3-8  BroadcastManager的實現

  

  private def initialize() {
    synchronized {
      if (!initialized) {
        val broadcastFactoryClass = conf.get("spark.broadcast.factory", "org.apache.spark.broadcast.TorrentBroadcastFactory")
        broadcastFactory =
          Class.forName(broadcastFactoryClass).newInstance.asInstanceOf[BroadcastFactory]
        broadcastFactory.initialize(isDriver, conf, securityManager)
        initialized = true
      }
    }
  }

  private val nextBroadcastId = new AtomicLong(0)

  def newBroadcast[T: ClassTag](value_ : T, isLocal: Boolean) = {
    broadcastFactory.newBroadcast[T](value_, isLocal, nextBroadcastId.getAndIncrement())
  }

  def unbroadcast(id: Long, removeFromDriver: Boolean, blocking: Boolean) {
    broadcastFactory.unbroadcast(id, removeFromDriver, blocking)
  }
}

  

3.2.10 創建緩存管理器CacheManager

  CacheManager用於緩存RDD某個分區計算后中間結果,緩存計算結果發生在迭代計算的時候,將在6.1節講到。而CacheManager將在4.14節詳細描述。創建CacheManager的代碼如下。

val cacheManager = new CacheManager(blockManager)

  

3.2.11 HTTP文件服務器HttpFileServer

         參見代碼清單3-9。HttpFileServer主要提供對jar及其他文件的http訪問,這些jar包包括用戶上傳的jar包。端口由屬性spark.fileserver.port配置,默認為0,表示隨機生成端口號。

代碼清單3-9  HttpFileServer的創建

  val httpFileServer =
      if (isDriver) {
        val fileServerPort = conf.getInt("spark.fileserver.port", 0)
        val server = new HttpFileServer(conf, securityManager, fileServerPort)
        server.initialize()
        conf.set("spark.fileserver.uri",  server.serverUri)
        server
      } else {
        null
      }

  

HttpFileServer的初始化過程,見代碼清單3-10,主要包括以下步驟:

1)         使用Utils工具類創建文件服務器的根目錄及臨時目錄(臨時目錄在運行時環境關閉時會刪除)。Utils工具的詳細介紹,見附錄A。

2)         創建存放jar包及其他文件的文件目錄。

3)         創建並啟動HTTP服務。

代碼清單3-10         HttpFileServer的初始化

  def initialize() {
    baseDir = Utils.createTempDir(Utils.getLocalDir(conf), "httpd")
    fileDir = new File(baseDir, "files")
    jarDir = new File(baseDir, "jars")
    fileDir.mkdir()
    jarDir.mkdir()
    logInfo("HTTP File server directory is " + baseDir)
    httpServer = new HttpServer(conf, baseDir, securityManager, requestedPort, "HTTP file server")
    httpServer.start()
    serverUri = httpServer.uri
    logDebug("HTTP file server started at: " + serverUri)
  }

  

HttpServer的構造和start方法的實現中,再次使用了Utils的靜態方法startServiceOnPort,因此會回調doStart方法,見代碼清單3-11。有關jetty的API使用參見附錄C。

代碼清單3-11         HttpServer的啟動

  def start() {
    if (server != null) {
      throw new ServerStateException("Server is already started")
    } else {
      logInfo("Starting HTTP Server")
      val (actualServer, actualPort) =
        Utils.startServiceOnPort[Server](requestedPort, doStart, conf, serverName)
      server = actualServer
      port = actualPort
    }
  }

   

doStart方法中啟動內嵌的jetty所提供的HTTP服務,見代碼清單3-12。

代碼清單3-12         HttpServer的啟動功能實現

  private def doStart(startPort: Int): (Server, Int) = {
    val server = new Server()
    val connector = new SocketConnector
    connector.setMaxIdleTime(60 * 1000)
    connector.setSoLingerTime(-1)
    connector.setPort(startPort)
    server.addConnector(connector)

    val threadPool = new QueuedThreadPool
    threadPool.setDaemon(true)
    server.setThreadPool(threadPool)
    val resHandler = new ResourceHandler
    resHandler.setResourceBase(resourceBase.getAbsolutePath)

    val handlerList = new HandlerList
    handlerList.setHandlers(Array(resHandler, new DefaultHandler))

    if (securityManager.isAuthenticationEnabled()) {
      logDebug("HttpServer is using security")
      val sh = setupSecurityHandler(securityManager)
      // make sure we go through security handler to get resources
      sh.setHandler(handlerList)
      server.setHandler(sh)
    } else {
      logDebug("HttpServer is not using security")
      server.setHandler(handlerList)
    }

    server.start()
    val actualPort = server.getConnectors()(0).getLocalPort

    (server, actualPort)
  }

  

3.2.12 創建測量系統MetricsSystem

  MetricsSystem是Spark的測量系統,創建MetricsSystem的代碼如下。

  val metricsSystem = if (isDriver) {
      MetricsSystem.createMetricsSystem("driver", conf, securityManager)
    } else {
      conf.set("spark.executor.id", executorId)
      val ms = MetricsSystem.createMetricsSystem("executor", conf, securityManager)
      ms.start()
      ms
    }

  

上面調用的createMetricsSystem方法實際創建了MetricsSystem,代碼如下。

  def createMetricsSystem(
      instance: String, conf: SparkConf, securityMgr: SecurityManager): MetricsSystem = {
    new MetricsSystem(instance, conf, securityMgr)
  }

  

構造MetricsSystem的過程最重要的是調用了MetricsConfig的initialize方法,見代碼清單3-13。

代碼清單3-13         MetricsConfig的初始化

def initialize() {
    setDefaultProperties(properties)

    var is: InputStream = null
    try {
      is = configFile match {
        case Some(f) => new FileInputStream(f)
        case None => Utils.getSparkClassLoader.getResourceAsStream(METRICS_CONF)
      }

      if (is != null) {
        properties.load(is)
      }
    } catch {
      case e: Exception => logError("Error loading configure file", e)
    } finally {
      if (is != null) is.close()
    }

    propertyCategories = subProperties(properties, INSTANCE_REGEX)
    if (propertyCategories.contains(DEFAULT_PREFIX)) {
      import scala.collection.JavaConversions._

      val defaultProperty = propertyCategories(DEFAULT_PREFIX)
      for { (inst, prop) <- propertyCategories
            if (inst != DEFAULT_PREFIX)
            (k, v) <- defaultProperty
            if (prop.getProperty(k) == null) } {
        prop.setProperty(k, v)
      }
    }
  }

   

從以上實現可以看出,MetricsConfig的initialize方法主要負責加載metrics.properties文件中的屬性配置,並對屬性進行初始化轉換。

例如:將屬性

{*.sink.servlet.path=/metrics/json, applications.sink.servlet.path=/metrics/applications/json, *.sink.servlet.class=org.apache.spark.metrics.sink.MetricsServlet, master.sink.servlet.path=/metrics/master/json}

轉換為

Map(applications -> {sink.servlet.class=org.apache.spark.metrics.sink.MetricsServlet, sink.servlet.path=/metrics/applications/json}, master -> {sink.servlet.class=org.apache.spark.metrics.sink.MetricsServlet, sink.servlet.path=/metrics/master/json}, * -> {sink.servlet.class=org.apache.spark.metrics.sink.MetricsServlet, sink.servlet.path=/metrics/json})

 

3.2.13 創建SparkEnv

  當所有的基礎組件准備好后,最終使用下面的代碼創建執行環境SparkEnv。

new SparkEnv(executorId, actorSystem, serializer, closureSerializer, cacheManager,
      mapOutputTracker, shuffleManager, broadcastManager, blockTransferService,
 blockManager, securityManager, httpFileServer, sparkFilesDir, 
metricsSystem, shuffleMemoryManager, conf)

  


 

注意:serializer和closureSerializer都是使用Class.forName反射生成的org.apache.spark.serializer.JavaSerializer類的實例,其中closureSerializer實例特別用來對Scala中的閉包進行序列化。


 

3.3 創建metadataCleaner

  SparkContext為了保持對所有持久化的RDD的跟蹤,使用類型是TimeStampedWeakValueHashMap的persistentRdds緩存。metadataCleaner的功能是清除過期的持久化RDD。創建metadataCleaner的代碼如下。

  private[spark] val persistentRdds = new TimeStampedWeakValueHashMap[Int, RDD[_]]
  private[spark] val metadataCleaner =
    new MetadataCleaner(MetadataCleanerType.SPARK_CONTEXT, this.cleanup, conf)

  

我們仔細看看MetadataCleaner的實現,見代碼清單3-14。

代碼清單3-14         MetadataCleaner的實現

private[spark] class MetadataCleaner(
    cleanerType: MetadataCleanerType.MetadataCleanerType,
    cleanupFunc: (Long) => Unit,
    conf: SparkConf)
  extends Logging
{
  val name = cleanerType.toString

  private val delaySeconds = MetadataCleaner.getDelaySeconds(conf, cleanerType)
  private val periodSeconds = math.max(10, delaySeconds / 10)
  private val timer = new Timer(name + " cleanup timer", true)

  private val task = new TimerTask {
    override def run() {
      try {
        cleanupFunc(System.currentTimeMillis() - (delaySeconds * 1000))
        logInfo("Ran metadata cleaner for " + name)
      } catch {
        case e: Exception => logError("Error running cleanup task for " + name, e)
      }
    }
  }

  if (delaySeconds > 0) {
    timer.schedule(task, delaySeconds * 1000, periodSeconds * 1000)
  }

  def cancel() {
    timer.cancel()
  }
}

  

從MetadataCleaner的實現可以看出其實質是一個用TimerTask實現的定時器,不斷調用cleanupFunc: (Long) => Unit這樣的函數參數。構造metadataCleaner時的函數參數是cleanup,用於清理persistentRdds中的過期內容,代碼如下。

  private[spark] def cleanup(cleanupTime: Long) {
    persistentRdds.clearOldValues(cleanupTime)
  }

  

 未完待續。。。

 

后記:自己犧牲了7個月的周末和下班空閑時間,通過研究Spark源碼和原理,總結整理的《深入理解Spark:核心思想與源碼分析》一書現在已經正式出版上市,目前亞馬遜、京東、當當、天貓等網站均有銷售,歡迎感興趣的同學購買。我開始研究源碼時的Spark版本是1.2.0,經過7個多月的研究和出版社近4個月的流程,Spark自身的版本迭代也很快,如今最新已經是1.6.0。目前市面上另外2本源碼研究的Spark書籍的版本分別是0.9.0版本和1.2.0版本,看來這些書的作者都與我一樣,遇到了這種問題。由於研究和出版都需要時間,所以不能及時跟上Spark的腳步,還請大家見諒。但是Spark核心部分的變化相對還是很少的,如果對版本不是過於追求,依然可以選擇本書。

 

京東(現有滿100減30活動):http://item.jd.com/11846120.html 

當當:http://product.dangdang.com/23838168.html 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM