《深入理解Spark:核心思想與源碼分析》(Understanding Spark: Core Ideas and Source Code Analysis) - SparkContext Initialization (Part 2): SparkUI, Environment Variables, and Scheduling


For the preface of 《深入理解Spark:核心思想與源碼分析》, see the post announcing the book's publication.

For Chapter 1 of the book, see the post "Chapter 1: Preparing the Environment".

For Chapter 2 of the book, see the post "Chapter 2: Spark's Design Philosophy and Basic Architecture".

Because Chapter 3 covers a lot of ground, it is split across four separate posts.

For the first part of Chapter 3, see "SparkContext Initialization (Part 1)".

This post presents the second part of Chapter 3:

 

3.4 SparkUI in Detail

  Every system needs monitoring, and a browser-accessible page with styling, layout, and rich monitoring data is a simple and effective way to provide it. SparkUI is exactly such a service; its structure is shown in Figure 3-1.

  In large distributed systems, an event-listener mechanism is the most common choice. Why use one? Suppose SparkUI were driven by direct Scala function calls. As the cluster grows, the number of calls grows with it, eventually running into the thread limit of the Driver's JVM and delaying updates to the monitoring data, possibly to the point where it can no longer be shown to users in time. Function calls are in most cases synchronous, so the calling thread blocks, and in a distributed environment network problems can keep a thread occupied for a long time. Replacing function calls with events makes processing asynchronous: the current thread continues with its subsequent logic, threads in the pool can be reused, and the overall concurrency of the system increases greatly. Posted events are buffered in a queue, from which they are taken and dispatched to the listeners registered for them, which then update the monitoring data.
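To make the pattern concrete, here is a minimal, self-contained sketch of such an event bus. All names here are illustrative, not Spark's actual classes: post() enqueues and returns immediately, and a single daemon thread drains the queue and notifies listeners asynchronously.

```scala
import java.util.concurrent.{CountDownLatch, LinkedBlockingQueue}
import scala.collection.mutable.ArrayBuffer

// Minimal model of an asynchronous event bus (illustrative, not Spark code):
// post() enqueues and returns immediately; a daemon thread dispatches events.
object MiniEventBus {
  sealed trait Event
  final case class JobStart(id: Int) extends Event
  case object Shutdown extends Event

  private val queue = new LinkedBlockingQueue[Event](10000)
  private val listeners = ArrayBuffer[Event => Unit]()
  private val stopped = new CountDownLatch(1)

  private val thread = new Thread("mini-listener-bus") {
    setDaemon(true)
    override def run(): Unit = {
      while (true) {
        val event = queue.take() // blocks until an event is available
        if (event == Shutdown) { stopped.countDown(); return }
        listeners.foreach(f => f(event)) // dispatch to every listener
      }
    }
  }

  def addListener(f: Event => Unit): Unit = listeners += f
  def start(): Unit = thread.start()
  def post(event: Event): Unit = queue.offer(event) // non-blocking
  def stop(): Unit = { post(Shutdown); stopped.await() }
}
```

The caller of post is never blocked by slow listeners, which is exactly the property the paragraph above describes.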

 

Figure 3-1  SparkUI architecture

  Let us first briefly introduce the components in Figure 3-1. DAGScheduler is the main source of the various SparkListenerEvents; it posts them to listenerBus's event queue, and listenerBus matches each SparkListenerEvent to the specific SparkListeners interested in it, which update their monitoring statistics, finally rendered by the SparkUI pages. Figure 3-1 also shows that Spark defines many implementations of the listener SparkListener, including JobProgressListener, EnvironmentListener, StorageListener, and ExecutorsListener; their class hierarchy is shown in Figure 3-2.

 

Figure 3-2  SparkListener class hierarchy

3.4.1 listenerBus in Detail

  listenerBus is of type LiveListenerBus. LiveListenerBus implements the listener model: incoming events trigger modifications to the state tracked by each listener, which in turn refreshes the data shown in the UI. LiveListenerBus consists of the following parts:

An event queue: a LinkedBlockingQueue[SparkListenerEvent] with a fixed capacity of 10000.

A listener array: an ArrayBuffer[SparkListener] holding the registered SparkListeners.

A dispatch thread: this Thread continuously takes events from the LinkedBlockingQueue, iterates over the listeners, and invokes the matching listener methods. Every event stays in the LinkedBlockingQueue for a while and is removed once the Thread has processed it, so the name listener bus is quite fitting: events get off at their stop. The implementation of listenerBus is shown in Listing 3-15.

Listing 3-15  Event handling in LiveListenerBus

  private val EVENT_QUEUE_CAPACITY = 10000
  private val eventQueue = new LinkedBlockingQueue[SparkListenerEvent](EVENT_QUEUE_CAPACITY)
  private var queueFullErrorMessageLogged = false
  private var started = false
  // A counter that represents the number of events produced and consumed in the queue
  private val eventLock = new Semaphore(0)

  private val listenerThread = new Thread("SparkListenerBus") {
    setDaemon(true)
    override def run(): Unit = Utils.logUncaughtExceptions {
      while (true) {
        eventLock.acquire()
        // Atomically remove and process this event
        LiveListenerBus.this.synchronized {
          val event = eventQueue.poll
          if (event == SparkListenerShutdown) {
            // Get out of the while loop and shutdown the daemon thread
            return
          }
          Option(event).foreach(postToAll)
        }
      }
    }
  }

  def start() {
    if (started) {
      throw new IllegalStateException("Listener bus already started!")
    }
    listenerThread.start()
    started = true
  }

  def post(event: SparkListenerEvent) {
    val eventAdded = eventQueue.offer(event)
    if (eventAdded) {
      eventLock.release()
    } else {
      logQueueFullErrorMessage()
    }
  }

  def listenerThreadIsAlive: Boolean = synchronized { listenerThread.isAlive }

  def queueIsEmpty: Boolean = synchronized { eventQueue.isEmpty }

  def stop() {
    if (!started) {
      throw new IllegalStateException("Attempted to stop a listener bus that has not yet started!")
    }
    post(SparkListenerShutdown)
    listenerThread.join()
  }

 

The postToAll method called by LiveListenerBus is actually defined in its parent class SparkListenerBus, as shown in Listing 3-16.

Listing 3-16  Listener dispatch in SparkListenerBus

  protected val sparkListeners = new ArrayBuffer[SparkListener]
    with mutable.SynchronizedBuffer[SparkListener]

  def addListener(listener: SparkListener) {
    sparkListeners += listener
  }

  def postToAll(event: SparkListenerEvent) {
    event match {
      case stageSubmitted: SparkListenerStageSubmitted =>
        foreachListener(_.onStageSubmitted(stageSubmitted))
      case stageCompleted: SparkListenerStageCompleted =>
        foreachListener(_.onStageCompleted(stageCompleted))
      case jobStart: SparkListenerJobStart =>
        foreachListener(_.onJobStart(jobStart))
      case jobEnd: SparkListenerJobEnd =>
        foreachListener(_.onJobEnd(jobEnd))
      case taskStart: SparkListenerTaskStart =>
        foreachListener(_.onTaskStart(taskStart))
      case taskGettingResult: SparkListenerTaskGettingResult =>
        foreachListener(_.onTaskGettingResult(taskGettingResult))
      case taskEnd: SparkListenerTaskEnd =>
        foreachListener(_.onTaskEnd(taskEnd))
      case environmentUpdate: SparkListenerEnvironmentUpdate =>
        foreachListener(_.onEnvironmentUpdate(environmentUpdate))
      case blockManagerAdded: SparkListenerBlockManagerAdded =>
        foreachListener(_.onBlockManagerAdded(blockManagerAdded))
      case blockManagerRemoved: SparkListenerBlockManagerRemoved =>
        foreachListener(_.onBlockManagerRemoved(blockManagerRemoved))
      case unpersistRDD: SparkListenerUnpersistRDD =>
        foreachListener(_.onUnpersistRDD(unpersistRDD))
      case applicationStart: SparkListenerApplicationStart =>
        foreachListener(_.onApplicationStart(applicationStart))
      case applicationEnd: SparkListenerApplicationEnd =>
        foreachListener(_.onApplicationEnd(applicationEnd))
      case metricsUpdate: SparkListenerExecutorMetricsUpdate =>
        foreachListener(_.onExecutorMetricsUpdate(metricsUpdate))
      case SparkListenerShutdown =>
    }
  }

  private def foreachListener(f: SparkListener => Unit): Unit = {
    sparkListeners.foreach { listener =>
      try {
        f(listener)
      } catch {
        case e: Exception =>
          logError(s"Listener ${Utils.getFormattedClassName(listener)} threw an exception", e)
      }
    }
  }

 

3.4.2 Constructing JobProgressListener

  We take JobProgressListener as our example of a SparkListener. JobProgressListener is an important part of SparkContext; it tracks task progress by listening to the events on listenerBus. SparkStatusTracker and SparkUI also rely on JobProgressListener for task status tracking. The code that creates JobProgressListener is as follows.

  private[spark] val jobProgressListener = new JobProgressListener(conf)
  listenerBus.addListener(jobProgressListener)

  val statusTracker = new SparkStatusTracker(this)

JobProgressListener uses data structures such as HashMap and ListBuffer to store JobIds and their corresponding JobUIData, grouped by job state (active, completed, failed). StageIds and StageInfos are likewise grouped by stage state (active, completed, skipped, failed), and the one-to-many relationship between a StageId and its JobIds is kept as well. These statistics are ultimately read and rendered by pages such as JobPage and StagePage. The data structures of JobProgressListener are shown in Listing 3-17.

Listing 3-17  State maintained by JobProgressListener

class JobProgressListener(conf: SparkConf) extends SparkListener with Logging {

  import JobProgressListener._

  type JobId = Int
  type StageId = Int
  type StageAttemptId = Int
  type PoolName = String
  type ExecutorId = String

  // Jobs:
  val activeJobs = new HashMap[JobId, JobUIData]
  val completedJobs = ListBuffer[JobUIData]()
  val failedJobs = ListBuffer[JobUIData]()
  val jobIdToData = new HashMap[JobId, JobUIData]

  // Stages:
  val activeStages = new HashMap[StageId, StageInfo]
  val completedStages = ListBuffer[StageInfo]()
  val skippedStages = ListBuffer[StageInfo]()
  val failedStages = ListBuffer[StageInfo]()
  val stageIdToData = new HashMap[(StageId, StageAttemptId), StageUIData]
  val stageIdToInfo = new HashMap[StageId, StageInfo]
  val stageIdToActiveJobIds = new HashMap[StageId, HashSet[JobId]]
  val poolToActiveStages = HashMap[PoolName, HashMap[StageId, StageInfo]]()
  var numCompletedStages = 0 // total number of completed stages
  var numFailedStages = 0 // total number of failed stages

  // Misc:
  val executorIdToBlockManagerId = HashMap[ExecutorId, BlockManagerId]()
  def blockManagerIds = executorIdToBlockManagerId.values.toSeq

  var schedulingMode: Option[SchedulingMode] = None

  // number of non-active jobs and stages (there is no limit for active jobs and stages):
  val retainedStages = conf.getInt("spark.ui.retainedStages", DEFAULT_RETAINED_STAGES)
  val retainedJobs = conf.getInt("spark.ui.retainedJobs", DEFAULT_RETAINED_JOBS) 

JobProgressListener implements methods such as onJobStart, onJobEnd, onStageCompleted, onStageSubmitted, onTaskStart, and onTaskEnd. Driven by listenerBus, these methods update the various Job- and Stage-related data held by JobProgressListener.
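As a simplified illustration of how such callbacks maintain the statistics (the names and event types below are invented for the sketch, not Spark's actual ones), a listener can move each job between "active", "completed", and "failed" collections as events arrive:

```scala
import scala.collection.mutable

// Illustrative sketch, not Spark's actual code: a listener that turns
// job events into UI-style statistics, like JobProgressListener does.
sealed trait JobEvent
final case class JobStarted(jobId: Int) extends JobEvent
final case class JobEnded(jobId: Int, succeeded: Boolean) extends JobEvent

class MiniJobProgressListener {
  val activeJobs = mutable.HashMap[Int, Long]()   // jobId -> start time
  val completedJobs = mutable.ListBuffer[Int]()
  val failedJobs = mutable.ListBuffer[Int]()

  def onEvent(e: JobEvent): Unit = e match {
    case JobStarted(id) =>
      activeJobs(id) = System.currentTimeMillis() // job becomes active
    case JobEnded(id, ok) =>
      activeJobs.remove(id)                       // no longer active
      if (ok) completedJobs += id else failedJobs += id
  }
}
```

A page like AllJobsPage then only has to read these collections to render its summaries.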

3.4.3 Creating and Initializing SparkUI

The creation of SparkUI is shown in Listing 3-18.

Listing 3-18  Declaring SparkUI

  private[spark] val ui: Option[SparkUI] =
    if (conf.getBoolean("spark.ui.enabled", true)) {
      Some(SparkUI.createLiveUI(this, conf, listenerBus, jobProgressListener,
        env.securityManager, appName))
    } else {
      None
    }

  ui.foreach(_.bind())

As the code shows, if the SparkUI service is not needed, the property spark.ui.enabled can be set to false. createLiveUI simply delegates to the create method, shown in Listing 3-19.

 

Listing 3-19  Creating SparkUI

  def createLiveUI(
      sc: SparkContext,
      conf: SparkConf,
      listenerBus: SparkListenerBus,
      jobProgressListener: JobProgressListener,
      securityManager: SecurityManager,
      appName: String): SparkUI =  {
    create(Some(sc), conf, listenerBus, securityManager, appName,
      jobProgressListener = Some(jobProgressListener))
  }

Inside the create method, besides the externally supplied JobProgressListener, several more SparkListeners are registered: EnvironmentListener, which monitors JVM parameters, Spark properties, Java system properties, and the classpath; StorageStatusListener, which maintains the storage status of executors; ExecutorsListener, which prepares executor information for display on the ExecutorsTab; and StorageListener, which prepares executor storage information for display in the BlockManagerUI. Finally the SparkUI itself is constructed; see Listing 3-20.

 

Listing 3-20  Implementation of the create method

  

  private def create(
      sc: Option[SparkContext],
      conf: SparkConf,
      listenerBus: SparkListenerBus,
      securityManager: SecurityManager,
      appName: String,
      basePath: String = "",
      jobProgressListener: Option[JobProgressListener] = None): SparkUI = {

    val _jobProgressListener: JobProgressListener = jobProgressListener.getOrElse {
      val listener = new JobProgressListener(conf)
      listenerBus.addListener(listener)
      listener
    }

    val environmentListener = new EnvironmentListener
    val storageStatusListener = new StorageStatusListener
    val executorsListener = new ExecutorsListener(storageStatusListener)
    val storageListener = new StorageListener(storageStatusListener)

    listenerBus.addListener(environmentListener)
    listenerBus.addListener(storageStatusListener)
    listenerBus.addListener(executorsListener)
    listenerBus.addListener(storageListener)

    new SparkUI(sc, conf, securityManager, environmentListener, storageStatusListener,
      executorsListener, _jobProgressListener, storageListener, appName, basePath)
  }

 

By default the SparkUI allows jobs and stages to be killed from its pages; setting the property spark.ui.killEnabled to false disables this. The initialize method arranges the display and layout of the front end's tabs and pages; see Listing 3-21.

Listing 3-21  Initializing SparkUI

private[spark] class SparkUI private (
    val sc: Option[SparkContext],
    val conf: SparkConf,
    val securityManager: SecurityManager,
    val environmentListener: EnvironmentListener,
    val storageStatusListener: StorageStatusListener,
    val executorsListener: ExecutorsListener,
    val jobProgressListener: JobProgressListener,
    val storageListener: StorageListener,
    var appName: String,
    val basePath: String)
  extends WebUI(securityManager, SparkUI.getUIPort(conf), conf, basePath, "SparkUI")
  with Logging {

  val killEnabled = sc.map(_.conf.getBoolean("spark.ui.killEnabled", true)).getOrElse(false)

  /** Initialize all components of the server. */
  def initialize() {
    attachTab(new JobsTab(this))
    val stagesTab = new StagesTab(this)
    attachTab(stagesTab)
    attachTab(new StorageTab(this))
    attachTab(new EnvironmentTab(this))
    attachTab(new ExecutorsTab(this))
    attachHandler(createStaticHandler(SparkUI.STATIC_RESOURCE_DIR, "/static"))
    attachHandler(createRedirectHandler("/", "/jobs", basePath = basePath))
    attachHandler(
      createRedirectHandler("/stages/stage/kill", "/stages", stagesTab.handleKillRequest))
  }
  initialize()

 

3.4.4 SparkUI Page Layout and Rendering

  How exactly does SparkUI lay out and render its pages? We take JobsTab, which displays the progress and status information of all Jobs, as the example. JobsTab reuses SparkUI's killEnabled, SparkContext, and jobProgressListener, and contains two pages, AllJobsPage and JobPage; see Listing 3-22.

Listing 3-22  Implementation of JobsTab

private[ui] class JobsTab(parent: SparkUI) extends SparkUITab(parent, "jobs") {
  val sc = parent.sc
  val killEnabled = parent.killEnabled
  def isFairScheduler = listener.schedulingMode.exists(_ == SchedulingMode.FAIR)
  val listener = parent.jobProgressListener

  attachPage(new AllJobsPage(this))
  attachPage(new JobPage(this))
} 

AllJobsPage is rendered by its render method, which uses the statistics in jobProgressListener to build summary information for Jobs in the active, completed, and failed states, calls the jobsTable method to generate the table and other HTML elements, and finally wraps everything with UIUtils's headerSparkPage, which supplies the CSS, JavaScript, header, and page layout; see Listing 3-23.

 

Listing 3-23  Implementation of AllJobsPage

def render(request: HttpServletRequest): Seq[Node] = {
    listener.synchronized {
      val activeJobs = listener.activeJobs.values.toSeq
      val completedJobs = listener.completedJobs.reverse.toSeq
      val failedJobs = listener.failedJobs.reverse.toSeq
      val now = System.currentTimeMillis

      val activeJobsTable =
        jobsTable(activeJobs.sortBy(_.startTime.getOrElse(-1L)).reverse)
      val completedJobsTable =
        jobsTable(completedJobs.sortBy(_.endTime.getOrElse(-1L)).reverse)
      val failedJobsTable =
        jobsTable(failedJobs.sortBy(_.endTime.getOrElse(-1L)).reverse)

      val summary: NodeSeq =
        <div>
          <ul class="unstyled">
            {if (startTime.isDefined) {
              // Total duration is not meaningful unless the UI is live
              <li>
                <strong>Total Duration: </strong>
                {UIUtils.formatDuration(now - startTime.get)}
              </li>
            }}
            <li>
              <strong>Scheduling Mode: </strong>
              {listener.schedulingMode.map(_.toString).getOrElse("Unknown")}
            </li>
            <li>
              <a href="#active"><strong>Active Jobs:</strong></a>
              {activeJobs.size}
            </li>
            <li>
              <a href="#completed"><strong>Completed Jobs:</strong></a>
              {completedJobs.size}
            </li>
            <li>
              <a href="#failed"><strong>Failed Jobs:</strong></a>
              {failedJobs.size}
            </li>
          </ul>
        </div> 

jobsTable generates the table data; see Listing 3-24.

 

Listing 3-24  Building the table in jobsTable

private def jobsTable(jobs: Seq[JobUIData]): Seq[Node] = {
    val someJobHasJobGroup = jobs.exists(_.jobGroup.isDefined)

    val columns: Seq[Node] = {
      <th>{if (someJobHasJobGroup) "Job Id (Job Group)" else "Job Id"}</th>
      <th>Description</th>
      <th>Submitted</th>
      <th>Duration</th>
      <th class="sorttable_nosort">Stages: Succeeded/Total</th>
      <th class="sorttable_nosort">Tasks (for all stages): Succeeded/Total</th>
    }

    <table class="table table-bordered table-striped table-condensed sortable">
      <thead>{columns}</thead>
      <tbody>
        {jobs.map(makeRow)}
      </tbody>
    </table>
  }

 

Each row of the table is in turn rendered by the makeRow method; see Listing 3-25.

 

Listing 3-25  Generating a table row

def makeRow(job: JobUIData): Seq[Node] = {
      val lastStageInfo = Option(job.stageIds)
        .filter(_.nonEmpty)
        .flatMap { ids => listener.stageIdToInfo.get(ids.max) }
      val lastStageData = lastStageInfo.flatMap { s =>
        listener.stageIdToData.get((s.stageId, s.attemptId))
      }
      val isComplete = job.status == JobExecutionStatus.SUCCEEDED
      val lastStageName = lastStageInfo.map(_.name).getOrElse("(Unknown Stage Name)")
      val lastStageDescription = lastStageData.flatMap(_.description).getOrElse("")
      val duration: Option[Long] = {
        job.startTime.map { start =>
          val end = job.endTime.getOrElse(System.currentTimeMillis())
          end - start
        }
      }
      val formattedDuration = duration.map(d => UIUtils.formatDuration(d)).getOrElse("Unknown")
      val formattedSubmissionTime = job.startTime.map(UIUtils.formatDate).getOrElse("Unknown")
      val detailUrl =
        "%s/jobs/job?id=%s".format(UIUtils.prependBaseUri(parent.basePath), job.jobId)
      <tr>
        <td sorttable_customkey={job.jobId.toString}>
          {job.jobId} {job.jobGroup.map(id => s"($id)").getOrElse("")}
        </td>
        <td>
          <div><em>{lastStageDescription}</em></div>
          <a href={detailUrl}>{lastStageName}</a>
        </td>
        <td sorttable_customkey={job.startTime.getOrElse(-1).toString}>
          {formattedSubmissionTime}
        </td>
        <td sorttable_customkey={duration.getOrElse(-1).toString}>{formattedDuration}</td>
        <td class="stage-progress-cell">
          {job.completedStageIndices.size}/{job.stageIds.size - job.numSkippedStages}
          {if (job.numFailedStages > 0) s"(${job.numFailedStages} failed)"}
          {if (job.numSkippedStages > 0) s"(${job.numSkippedStages} skipped)"}
        </td>
        <td class="progress-cell">
          {UIUtils.makeProgressBar(started = job.numActiveTasks, completed = job.numCompletedTasks,
           failed = job.numFailedTasks, skipped = job.numSkippedTasks,
           total = job.numTasks - job.numSkippedTasks)}
        </td>
      </tr>
    } 

The attachPage method called in Listing 3-22 lives in JobsTab's parent class WebUITab, which maintains an ArrayBuffer[WebUIPage]; AllJobsPage and JobPage are placed into this ArrayBuffer. See Listing 3-26.

 

Listing 3-26  Implementation of WebUITab

private[spark] abstract class WebUITab(parent: WebUI, val prefix: String) {
  val pages = ArrayBuffer[WebUIPage]()
  val name = prefix.capitalize

  /** Attach a page to this tab. This prepends the page's prefix with the tab's own prefix. */
  def attachPage(page: WebUIPage) {
    page.prefix = (prefix + "/" + page.prefix).stripSuffix("/")
    pages += page
  }

  /** Get a list of header tabs from the parent UI. */
  def headerTabs: Seq[WebUITab] = parent.getTabs

  def basePath: String = parent.getBasePath
}

Once JobsTab is created, the attachTab method adds it to SparkUI's ArrayBuffer[WebUITab], and attachPage creates an org.eclipse.jetty.servlet.ServletContextHandler for each page. Finally, the attachHandler method binds each ServletContextHandler to SparkUI, that is, adds it both to handlers: ArrayBuffer[ServletContextHandler] and to the rootHandler (a ContextHandlerCollection) of the case class ServerInfo. SparkUI extends WebUI, where attachTab is implemented; see Listing 3-27.

 

Listing 3-27  Implementation of WebUI

private[spark] abstract class WebUI( securityManager: SecurityManager, port: Int,
    conf: SparkConf, basePath: String = "", name: String = "") extends Logging {

  protected val tabs = ArrayBuffer[WebUITab]()
  protected val handlers = ArrayBuffer[ServletContextHandler]()
  protected var serverInfo: Option[ServerInfo] = None
  protected val localHostName = Utils.localHostName()
  protected val publicHostName = Option(System.getenv("SPARK_PUBLIC_DNS")).getOrElse(localHostName)
  private val className = Utils.getFormattedClassName(this)

  def getBasePath: String = basePath
  def getTabs: Seq[WebUITab] = tabs.toSeq
  def getHandlers: Seq[ServletContextHandler] = handlers.toSeq
  def getSecurityManager: SecurityManager = securityManager

  /** Attach a tab to this UI, along with all of its attached pages. */
  def attachTab(tab: WebUITab) {
    tab.pages.foreach(attachPage)
    tabs += tab
  }

  /** Attach a page to this UI. */
  def attachPage(page: WebUIPage) {
    val pagePath = "/" + page.prefix
    attachHandler(createServletHandler(pagePath,
      (request: HttpServletRequest) => page.render(request), securityManager, basePath))
    attachHandler(createServletHandler(pagePath.stripSuffix("/") + "/json",
      (request: HttpServletRequest) => page.renderJson(request), securityManager, basePath))
  }

  /** Attach a handler to this UI. */
  def attachHandler(handler: ServletContextHandler) {
    handlers += handler
    serverInfo.foreach { info =>
      info.rootHandler.addHandler(handler)
      if (!handler.isStarted) {
        handler.start()
      }
    }
  }

Because the class containing Listing 3-27 imports org.apache.spark.ui.JettyUtils._, the createServletHandler calls above resolve to JettyUtils's static method createServletHandler. That method creates an anonymous subclass instance of javax.servlet.http.HttpServlet, which handles requests with the function argument (request: HttpServletRequest) => page.render(request), thereby rendering the page for the user. For the implementation of createServletHandler and details on Jetty, see Appendix C.

3.4.5 Starting SparkUI

  After SparkUI is created, the bind method of its parent class WebUI must be called to bind the service to a port. The key line in bind is as follows.

      serverInfo = Some(startJettyServer("0.0.0.0", port, handlers, conf, name))

For the implementation of JettyUtils's static method startJettyServer, see Appendix C. It ultimately starts the Jetty-backed service, which listens on port 4040 by default.

3.5 Hadoop Configuration and Executor Environment Variables

3.5.1 Hadoop Configuration

  By default, Spark uses HDFS as its distributed file system, so the Hadoop configuration must be obtained; the code is as follows.

  val hadoopConfiguration = SparkHadoopUtil.get.newConfiguration(conf)

  The configuration gathered includes:

the Amazon S3 file system's AccessKeyId and SecretAccessKey, loaded into Hadoop's Configuration;

every property in SparkConf whose name starts with spark.hadoop., copied into Hadoop's Configuration;

the SparkConf property spark.buffer.size, copied into Hadoop's Configuration as io.file.buffer.size.


 Note: if the SPARK_YARN_MODE property is set, YarnSparkHadoopUtil is used; otherwise the default is SparkHadoopUtil.
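The copying behavior described above can be sketched as a standalone function. This only models what SparkHadoopUtil.newConfiguration does, under the assumption that the spark.hadoop. prefix is stripped when keys are transferred; it is not Spark's actual code.

```scala
// Sketch (simplified, assumed behavior) of the property copying performed
// when building a Hadoop Configuration from Spark properties:
//  - "spark.hadoop.X" is re-keyed as "X"
//  - "spark.buffer.size" becomes "io.file.buffer.size"
def toHadoopConf(sparkProps: Map[String, String]): Map[String, String] = {
  val hadoopProps = sparkProps.collect {
    case (k, v) if k.startsWith("spark.hadoop.") =>
      k.stripPrefix("spark.hadoop.") -> v
  }
  val bufferSize = sparkProps.get("spark.buffer.size")
    .map(sz => "io.file.buffer.size" -> sz)
  hadoopProps ++ bufferSize
}
```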


 

3.5.2 Executor Environment Variables

  The handling of Executor environment variables is shown in Listing 3-28. The environment variables in executorEnvs are sent to the Master during application registration, described in Section 7.2.2; after the Master sends scheduling decisions to the Workers, each Worker uses the information in executorEnvs to launch Executors. The memory used by an Executor can be specified with the property spark.executor.memory, or with the environment variables SPARK_EXECUTOR_MEMORY or SPARK_MEM.

Listing 3-28  Handling Executor environment variables

private[spark] val executorMemory = conf.getOption("spark.executor.memory")
    .orElse(Option(System.getenv("SPARK_EXECUTOR_MEMORY")))
    .orElse(Option(System.getenv("SPARK_MEM")).map(warnSparkMem))
    .map(Utils.memoryStringToMb)
    .getOrElse(512)

  // Environment variables to pass to our executors.
  private[spark] val executorEnvs = HashMap[String, String]()

  for { (envKey, propKey) <- Seq(("SPARK_TESTING", "spark.testing"))
    value <- Option(System.getenv(envKey)).orElse(Option(System.getProperty(propKey)))} {
    executorEnvs(envKey) = value
  }
  Option(System.getenv("SPARK_PREPEND_CLASSES")).foreach { v =>
    executorEnvs("SPARK_PREPEND_CLASSES") = v
  }
  // The Mesos scheduler backend relies on this environment variable to set executor memory.
  executorEnvs("SPARK_EXECUTOR_MEMORY") = executorMemory + "m"
  executorEnvs ++= conf.getExecutorEnv

  // Set SPARK_USER for user who is running SparkContext.
  val sparkUser = Option {
    Option(System.getenv("SPARK_USER")).getOrElse(System.getProperty("user.name"))
  }.getOrElse {
    SparkContext.SPARK_UNKNOWN_USER
  }
  executorEnvs("SPARK_USER") = sparkUser 

3.6 Creating the Task Scheduler TaskScheduler

  TaskScheduler is another important component of SparkContext, responsible for submitting tasks and requesting task scheduling from the cluster manager; it can be viewed as the client side of task scheduling. The code that creates TaskScheduler is as follows.

  private[spark] var (schedulerBackend, taskScheduler) =
    SparkContext.createTaskScheduler(this, master)

The createTaskScheduler method matches the deployment mode against the master setting, creates a TaskSchedulerImpl, and builds the corresponding SchedulerBackend. To make Spark's initialization flow easier to follow, this chapter uses local mode as the example; the remaining modes are covered in detail in Chapter 6. The code matching local mode against master is as follows.

    master match {
      case "local" =>
        val scheduler = new TaskSchedulerImpl(sc, MAX_LOCAL_TASK_FAILURES, isLocal = true)
        val backend = new LocalBackend(scheduler, 1)
        scheduler.initialize(backend)
        (backend, scheduler)
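Besides the bare "local" case shown above, master strings such as "local[N]" are matched with regular expressions. The following is a self-contained sketch of that kind of matching, with invented helper names; it is simplified and not Spark's actual code.

```scala
// Illustrative sketch of parsing a master string into a local thread count.
// Spark internally uses a similar regex-based match in createTaskScheduler.
val LocalN = """local\[([0-9]+)\]""".r

def localThreads(master: String): Option[Int] = master match {
  case "local"   => Some(1)       // single worker thread
  case LocalN(n) => Some(n.toInt) // "local[N]" -> N worker threads
  case _         => None          // not a local master
}
```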

3.6.1 Creating TaskSchedulerImpl

  TaskSchedulerImpl is constructed as follows:

1) Read configuration from SparkConf, including the number of CPUs allocated per task and the scheduling mode (FAIR or FIFO, defaulting to FIFO; it can be changed via the property spark.scheduler.mode).

2) Create the TaskResultGetter, which processes the task execution results sent back by slaves, using a thread pool created with Executors.newFixedThreadPool (4 threads by default, with thread names starting with task-result-getter).
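The thread pool described in step 2 can be sketched as follows. The pool size and the task-result-getter name prefix follow the description above; the helper name and factory details are illustrative, not Spark's actual code.

```scala
import java.util.concurrent.{Executors, ExecutorService, ThreadFactory}
import java.util.concurrent.atomic.AtomicInteger

// Sketch of a fixed-size pool of named daemon threads, in the style of the
// pool TaskResultGetter uses to deserialize and process task results.
def newResultGetterPool(nThreads: Int = 4): ExecutorService = {
  val counter = new AtomicInteger(0)
  val factory = new ThreadFactory {
    override def newThread(r: Runnable): Thread = {
      val t = new Thread(r, s"task-result-getter-${counter.getAndIncrement()}")
      t.setDaemon(true) // do not keep the JVM alive
      t
    }
  }
  Executors.newFixedThreadPool(nThreads, factory)
}
```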

The main members of TaskSchedulerImpl are shown in Listing 3-29.

Listing 3-29  Implementation of TaskSchedulerImpl

  var dagScheduler: DAGScheduler = null
  var backend: SchedulerBackend = null
  val mapOutputTracker = SparkEnv.get.mapOutputTracker
  var schedulableBuilder: SchedulableBuilder = null
  var rootPool: Pool = null
  // default scheduler is FIFO
  private val schedulingModeConf = conf.get("spark.scheduler.mode", "FIFO")
  val schedulingMode: SchedulingMode = try {
    SchedulingMode.withName(schedulingModeConf.toUpperCase)
  } catch {
    case e: java.util.NoSuchElementException =>
      throw new SparkException(s"Unrecognized spark.scheduler.mode: $schedulingModeConf")
  }

  // This is a var so that we can reset it for testing purposes.
  private[spark] var taskResultGetter = new TaskResultGetter(sc.env, this)

 

TaskSchedulerImpl supports two scheduling modes, FAIR and FIFO. The actual scheduling of tasks ultimately falls to the concrete implementation of the SchedulerBackend interface. For ease of analysis, let us first look at LocalBackend, the SchedulerBackend implementation for local mode. LocalBackend relies on LocalActor for message passing with the ActorSystem; see Listing 3-30.

Listing 3-30  Implementation of LocalBackend

private[spark] class LocalBackend(scheduler: TaskSchedulerImpl, val totalCores: Int)
  extends SchedulerBackend with ExecutorBackend {

  private val appId = "local-" + System.currentTimeMillis
  var localActor: ActorRef = null

  override def start() {
    localActor = SparkEnv.get.actorSystem.actorOf(
      Props(new LocalActor(scheduler, this, totalCores)),
      "LocalBackendActor")
  }

  override def stop() {
    localActor ! StopExecutor
  }

  override def reviveOffers() {
    localActor ! ReviveOffers
  }

  override def defaultParallelism() =
    scheduler.conf.getInt("spark.default.parallelism", totalCores)

  override def killTask(taskId: Long, executorId: String, interruptThread: Boolean) {
    localActor ! KillTask(taskId, interruptThread)
  }

  override def statusUpdate(taskId: Long, state: TaskState, serializedData: ByteBuffer) {
    localActor ! StatusUpdate(taskId, state, serializedData)
  }

  override def applicationId(): String = appId
} 

3.6.2 Initializing TaskSchedulerImpl

  After TaskSchedulerImpl and LocalBackend are created, the initialize method is called on TaskSchedulerImpl. Initialization proceeds as follows:

1) Give TaskSchedulerImpl a reference to LocalBackend.

2) Create the root Pool, which caches the scheduling queue, the scheduling algorithm, and the set of TaskSetManagers, among other information.

3) Create the FIFOSchedulableBuilder, which manipulates the scheduling queue inside the Pool.

The implementation of the initialize method is shown in Listing 3-31.

Listing 3-31  Initializing TaskSchedulerImpl

def initialize(backend: SchedulerBackend) {
    this.backend = backend
    rootPool = new Pool("", schedulingMode, 0, 0)
    schedulableBuilder = {
      schedulingMode match {
        case SchedulingMode.FIFO =>
          new FIFOSchedulableBuilder(rootPool)
        case SchedulingMode.FAIR =>
          new FairSchedulableBuilder(rootPool, conf)
      }
    }
    schedulableBuilder.buildPools()
  } 
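For reference, the FIFO ordering that the builder's pool ultimately applies compares a priority field (in practice the job id) first and breaks ties by stage id. The following is a minimal sketch modeled on Spark's FIFOSchedulingAlgorithm; the Entry case class is invented for illustration.

```scala
// Sketch of FIFO scheduling order: lower priority value (the job id) runs
// first; ties are broken by stage id. Modeled on FIFOSchedulingAlgorithm.
final case class Entry(priority: Int, stageId: Int)

def fifoLessThan(a: Entry, b: Entry): Boolean = {
  val byPriority = math.signum(a.priority - b.priority)
  if (byPriority != 0) byPriority < 0 // earlier job wins
  else a.stageId < b.stageId          // same job: earlier stage wins
}
```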

3.7 Creating and Starting DAGScheduler

  DAGScheduler performs the preparation work before tasks are formally handed to TaskSchedulerImpl for submission, including creating Jobs, dividing the RDDs in the DAG into Stages, submitting Stages, and so on. The code that creates DAGScheduler is as follows.

@volatile private[spark] var dagScheduler: DAGScheduler = _
    dagScheduler = new DAGScheduler(this)

The data structures of DAGScheduler mainly maintain the relationships between jobIds and stageIds, the Stages, the ActiveJobs, and the cached partition locations of RDDs; see Listing 3-32.

Listing 3-32  Data structures maintained by DAGScheduler

  private[scheduler] val nextJobId = new AtomicInteger(0)
  private[scheduler] def numTotalJobs: Int = nextJobId.get()
  private val nextStageId = new AtomicInteger(0)

  private[scheduler] val jobIdToStageIds = new HashMap[Int, HashSet[Int]]
  private[scheduler] val stageIdToStage = new HashMap[Int, Stage]
  private[scheduler] val shuffleToMapStage = new HashMap[Int, Stage]
  private[scheduler] val jobIdToActiveJob = new HashMap[Int, ActiveJob]

  // Stages we need to run whose parents aren't done
  private[scheduler] val waitingStages = new HashSet[Stage]
  // Stages we are running right now
  private[scheduler] val runningStages = new HashSet[Stage]
  // Stages that must be resubmitted due to fetch failures
  private[scheduler] val failedStages = new HashSet[Stage]

  private[scheduler] val activeJobs = new HashSet[ActiveJob]

 // Contains the locations that each RDD's partitions are cached on
  private val cacheLocs = new HashMap[Int, Array[Seq[TaskLocation]]]
  private val failedEpoch = new HashMap[String, Long]

  private val dagSchedulerActorSupervisor =
    env.actorSystem.actorOf(Props(new DAGSchedulerActorSupervisor(this)))

  private val closureSerializer = SparkEnv.get.closureSerializer.newInstance()

While DAGScheduler is being constructed, the initializeEventProcessActor method is called to create the DAGSchedulerEventProcessActor; see Listing 3-33.

Listing 3-33  Initializing DAGSchedulerEventProcessActor

  private[scheduler] var eventProcessActor: ActorRef = _
  private def initializeEventProcessActor() {
    // blocking the thread until supervisor is started, which ensures eventProcessActor is
    // not null before any job is submitted
    implicit val timeout = Timeout(30 seconds)
    val initEventActorReply =
      dagSchedulerActorSupervisor ? Props(new DAGSchedulerEventProcessActor(this))
    eventProcessActor = Await.result(initEventActorReply, timeout.duration).
      asInstanceOf[ActorRef]
  }

  initializeEventProcessActor() 

DAGSchedulerActorSupervisor acts as the supervisor of DAGSchedulerEventProcessActor and is responsible for creating it. As Listing 3-34 shows, DAGSchedulerActorSupervisor applies Akka's one-for-one supervision strategy to DAGSchedulerEventProcessActor. Once DAGSchedulerActorSupervisor creates DAGSchedulerEventProcessActor and registers it with the ActorSystem, the ActorSystem invokes DAGSchedulerEventProcessActor's preStart, at which point taskScheduler acquires its reference to dagScheduler; see Listing 3-35. Listing 3-35 also shows the message types DAGSchedulerEventProcessActor can handle and the corresponding handlers, such as handleJobSubmitted, handleBeginEvent, and handleTaskCompletion. For this chapter it is enough to understand this much; the individual handlers are analyzed in detail in later chapters when they come into play.

Listing 3-34  Supervision strategy of DAGSchedulerActorSupervisor

private[scheduler] class DAGSchedulerActorSupervisor(dagScheduler: DAGScheduler)
  extends Actor with Logging {

  override val supervisorStrategy =
    OneForOneStrategy() {
      case x: Exception =>
        logError("eventProcesserActor failed; shutting down SparkContext", x)
        try {
          dagScheduler.doCancelAllJobs()
        } catch {
          case t: Throwable => logError("DAGScheduler failed to cancel all jobs.", t)
        }
        dagScheduler.sc.stop()
        Stop
    }

  def receive = {
    case p: Props => sender ! context.actorOf(p)
    case _ => logWarning("received unknown message in DAGSchedulerActorSupervisor")
  }
} 

Listing 3-35  Implementation of DAGSchedulerEventProcessActor

private[scheduler] class DAGSchedulerEventProcessActor(dagScheduler: DAGScheduler)
  extends Actor with Logging {
  override def preStart() {
    dagScheduler.taskScheduler.setDAGScheduler(dagScheduler)
  }
  /**
   * The main event loop of the DAG scheduler.
   */
  def receive = {
    case JobSubmitted(jobId, rdd, func, partitions, allowLocal, callSite, listener, properties) =>
      dagScheduler.handleJobSubmitted(jobId, rdd, func, partitions, allowLocal, callSite,
        listener, properties)
    case StageCancelled(stageId) =>
      dagScheduler.handleStageCancellation(stageId)
    case JobCancelled(jobId) =>
      dagScheduler.handleJobCancellation(jobId)
    case JobGroupCancelled(groupId) =>
      dagScheduler.handleJobGroupCancelled(groupId)
    case AllJobsCancelled =>
      dagScheduler.doCancelAllJobs()
    case ExecutorAdded(execId, host) =>
      dagScheduler.handleExecutorAdded(execId, host)
    case ExecutorLost(execId) =>
      dagScheduler.handleExecutorLost(execId, fetchFailed = false)
    case BeginEvent(task, taskInfo) =>
      dagScheduler.handleBeginEvent(task, taskInfo)
    case GettingResultEvent(taskInfo) =>
      dagScheduler.handleGetTaskResult(taskInfo)
    case completion @ CompletionEvent(task, reason, _, _, taskInfo, taskMetrics) =>
      dagScheduler.handleTaskCompletion(completion)
    case TaskSetFailed(taskSet, reason) =>
      dagScheduler.handleTaskSetFailed(taskSet, reason)
    case ResubmitFailedStages =>
      dagScheduler.resubmitFailedStages()
  }
  override def postStop() {
    // Cancel any active jobs in postStop hook
    dagScheduler.cleanUpAfterSchedulerStop()
  }

 

To be continued...

 

Postscript: after giving up seven months of weekends and after-work hours to studying Spark's source code and internals, my book 《深入理解Spark:核心思想與源碼分析》 has now been officially published and is on sale at Amazon, JD, Dangdang, Tmall, and other sites; interested readers are welcome to buy it. When I started studying the source, Spark was at version 1.2.0. After more than seven months of research and nearly four months of the publisher's process, Spark's own releases moved quickly, and the latest is now 1.6.0. The other two Spark source-analysis books currently on the market cover versions 0.9.0 and 1.2.0, so their authors evidently ran into the same problem. Research and publication both take time, so the books cannot keep pace with Spark's releases; I ask for the readers' understanding. The core of Spark changes relatively little, however, so if you are not too particular about the version, this book remains a sound choice.

 

JD (currently running a 30-off-per-100 promotion): http://item.jd.com/11846120.html

Dangdang: http://product.dangdang.com/23838168.html

 

