Spark Source Code Analysis – SparkEnv


SparkEnv is created in two places. Since it holds many of Spark's core runtime modules, such as the BlockManager, it is an important class.
On the driver side, SparkEnv is created while SparkContext is being initialized:

  // Create the Spark execution environment (cache, map output tracker, etc)
  private[spark] val env = SparkEnv.createFromSystemProperties(
    "<driver>",  // 表示是driver, 下面的executor則是executorid
    System.getProperty("spark.driver.host"),
    System.getProperty("spark.driver.port").toInt,
    true,        // isDriver
    isLocal)
  SparkEnv.set(env)

On the executor side, it is created when the Executor is initialized:

  // Initialize Spark environment (using system properties read above)
  val env = SparkEnv.createFromSystemProperties(executorId, slaveHostname, 0, false, false)
  SparkEnv.set(env)
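
Everything stored in the env is later reached through SparkEnv.get instead of being threaded through constructors. A minimal sketch of that consumption pattern (the calling code here is hypothetical; blockManager and serializer are real SparkEnv fields, listed below):

  // hypothetical code running on a thread where SparkEnv.set(env) has run
  val env = SparkEnv.get
  val blockManager = env.blockManager    // e.g. to read/write cached blocks
  val ser = env.serializer.newInstance() // SerializerInstance is not thread-safe,
                                         // so each thread creates its own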

 

SparkEnv Class

Holds all of the runtime environment objects for a running Spark instance: the serializer, the Akka actor system, the block manager, the map output tracker, and so on.

/**
 * Holds all the runtime environment objects for a running Spark instance (either master or worker),
 * including the serializer, Akka actor system, block manager, map output tracker, etc. Currently
 * Spark code finds the SparkEnv through a thread-local variable, so each thread that accesses these
 * objects needs to have the right SparkEnv set. You can get the current environment with
 * SparkEnv.get (e.g. after creating a SparkContext) and set it with SparkEnv.set.
 */
class SparkEnv (
    val executorId: String,
    val actorSystem: ActorSystem,
    val serializerManager: SerializerManager,
    val serializer: Serializer,
    val closureSerializer: Serializer,
    val cacheManager: CacheManager,
    val mapOutputTracker: MapOutputTracker,
    val shuffleFetcher: ShuffleFetcher,
    val broadcastManager: BroadcastManager,
    val blockManager: BlockManager,
    val connectionManager: ConnectionManager,
    val httpFileServer: HttpFileServer,
    val sparkFilesDir: String,
    val metricsSystem: MetricsSystem) {
}

SparkEnv Object

Scala idiomatically uses the companion object as the class's interface.
Besides the basic get and set,
the interesting part is createFromSystemProperties, which creates this whole set of critical objects.

object SparkEnv extends Logging {
  private val env = new ThreadLocal[SparkEnv] // ThreadLocal, so each thread accesses its own copy
  @volatile private var lastSetSparkEnv : SparkEnv = _ // caches the most recently set SparkEnv; volatile so other threads can see it

  def set(e: SparkEnv) {
    lastSetSparkEnv = e
    env.set(e)
  }

  /**
   * Returns the ThreadLocal SparkEnv, if non-null. Else returns the SparkEnv
   * previously set in any thread.
   */
  def get: SparkEnv = {
    Option(env.get()).getOrElse(lastSetSparkEnv) // fall back to lastSetSparkEnv when no thread-local value is set
  }

  /**
   * Returns the ThreadLocal SparkEnv.
   */
  def getThreadLocal : SparkEnv = {
    env.get() // returns only the thread-local value (may be null)
  }

  def createFromSystemProperties(
      executorId: String,
      hostname: String,
      port: Int,
      isDriver: Boolean,
      isLocal: Boolean): SparkEnv = {

    val (actorSystem, boundPort) = AkkaUtils.createActorSystem("spark", hostname, port)

    val classLoader = Thread.currentThread.getContextClassLoader

    // Create an instance of the class named by the given Java system property, or by
    // defaultClassName if the property is not set, and return it as a T
    def instantiateClass[T](propertyName: String, defaultClassName: String): T = {
      val name = System.getProperty(propertyName, defaultClassName)
      Class.forName(name, true, classLoader).newInstance().asInstanceOf[T]
    }

    val serializerManager = new SerializerManager

    val serializer = serializerManager.setDefault(
      System.getProperty("spark.serializer", "org.apache.spark.serializer.JavaSerializer"))

    val closureSerializer = serializerManager.get(
      System.getProperty("spark.closure.serializer", "org.apache.spark.serializer.JavaSerializer"))

    // BlockManager; registerOrLookup creates the actor object only on the driver,
    // executors just obtain a remote reference to it
    val blockManagerMaster = new BlockManagerMaster(registerOrLookup(
      "BlockManagerMaster",
      new BlockManagerMasterActor(isLocal)))
    val blockManager = new BlockManager(executorId, actorSystem, blockManagerMaster, serializer)

    val connectionManager = blockManager.connectionManager

    val broadcastManager = new BroadcastManager(isDriver)

    val cacheManager = new CacheManager(blockManager)

    // MapOutputTracker; likewise, the actor itself is created only on the driver
    val mapOutputTracker = new MapOutputTracker()
    mapOutputTracker.trackerActor = registerOrLookup(
      "MapOutputTracker",
      new MapOutputTrackerActor(mapOutputTracker))
    
    // ShuffleFetcher
    val shuffleFetcher = instantiateClass[ShuffleFetcher](
      "spark.shuffle.fetcher", "org.apache.spark.BlockStoreShuffleFetcher")

    val httpFileServer = new HttpFileServer()
    httpFileServer.initialize()
    System.setProperty("spark.fileserver.uri", httpFileServer.serverUri)

    val metricsSystem = if (isDriver) {
      MetricsSystem.createMetricsSystem("driver")
    } else {
      MetricsSystem.createMetricsSystem("executor")
    }
    metricsSystem.start()
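
    // sparkFilesDir (also from the Spark source, included so the constructor
    // call below compiles): where files added via SparkContext.addFile land;
    // a temporary directory on the driver, the working directory on executors
    val sparkFilesDir: String = if (isDriver) {
      Utils.createTempDir().getAbsolutePath
    } else {
      "."
    }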

    new SparkEnv(
      executorId,
      actorSystem,
      serializerManager,
      serializer,
      closureSerializer,
      cacheManager,
      mapOutputTracker,
      shuffleFetcher,
      broadcastManager,
      blockManager,
      connectionManager,
      httpFileServer,
      sparkFilesDir,
      metricsSystem)
  }
}
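
The get/set pair above is a "thread-local with global fallback" idiom: a ThreadLocal for per-thread lookup, plus a volatile field so that threads which never called set still see the most recently set value. A self-contained sketch of the same pattern (EnvHolder and Demo are made-up names for illustration):

object EnvHolder {
  private val local = new ThreadLocal[String]   // plays the role of env
  @volatile private var lastSet: String = _     // plays the role of lastSetSparkEnv

  def set(v: String): Unit = { lastSet = v; local.set(v) }
  def get: String = Option(local.get()).getOrElse(lastSet)
}

object Demo extends App {
  EnvHolder.set("driver-env")                   // set on the main thread
  val t = new Thread(new Runnable {
    // this thread never called set(), so its ThreadLocal slot is null,
    // but the volatile fallback still returns the last value set anywhere
    def run(): Unit = println(EnvHolder.get)    // prints "driver-env"
  })
  t.start()
  t.join()
}

The trade-off is visible here too: a thread that never called set may silently pick up an environment set by an unrelated thread, which is acceptable only because there is normally at most one SparkEnv per JVM.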

