當Spark程序在運行時,會提供一個Web頁面查看Application運行狀態信息。是否開啟UI界面由參數spark.ui.enabled(默認為true)
來確定。下面列出Spark UI一些相關配置參數,默認值,以及其作用。
參數 | 默認值 | 作用描述 |
---|---|---|
spark.ui.enabled | true | 是否開啟UI界面 |
spark.ui.port | 4040(順序探查空閑端口) | UI界面的訪問端口號 |
spark.ui.retainedJobs | 1000 | UI界面顯示的Job個數 |
spark.ui.retailedStages | 1000 | UI界面上顯示的Stage個數 |
spark.ui.timeline.tasks.maximum | 1000 | Stage頁面顯示的Tasks個數 |
spark.ui.killEnabled | true | 是否運行頁面上kill任務 |
spark.ui.threadDumpsEnabled | true | Executors頁面是否可以展示線程運行狀況 |
本文接下來分成兩個部分,第一部分基於Spark-1.6.0的源碼,結合第二部分的圖片內容來描述UI界面在Spark中的實現方式。第二部分以實例展示Spark UI界面顯示的內容。
一、Spark UI界面實現方式
1、UI組件結構
這部分先講UI界面的實現方式,UI界面的實例在本文最后一部分。如果對這部分中的某些概念不清楚,那么最好先把第二部分了解一下。
從下面UI界面的實例可以看出,不同的內容以Tab的形式展現在界面上,對應每一個Tab在下方顯示具體內容。基本上Spark UI界面也是按這個層次關系實現的。
以SparkUI類為容器,各個Tab,如JobsTab, StagesTab, ExecutorsTab等鑲嵌在SparkUI上,對應各個Tab,有頁面內容實現類JobPage, StagePage, ExecutorsPage等頁面。這些類的繼承和包含關系如下圖所示:
2、初始化過程
從上面可以看出,SparkUI類型的對象是UI界面的根對象,它是在SparkContext類中構造出來的。
private var _ui: Option[SparkUI] = None //定義
_ui = //SparkUI對象的生成
if (conf.getBoolean("spark.ui.enabled", true)) {
Some(SparkUI.createLiveUI(this, _conf, listenerBus, _jobProgressListener,
_env.securityManager, appName, startTime = startTime))
} else {
// For tests, do not enable the UI
None
}
_ui.foreach(_.bind()) //啟動jetty。bind方法繼承自WebUI,該類負責和真實的Jetty Server API打交道
上面這段代碼中可以看到SparkUI對象的生成過程,結合上面的類結構圖,可以看到bind方法繼承自WebUI類,進入WebUI類中,
protected val handlers = ArrayBuffer[ServletContextHandler]() // 這個對象在下面bind方法中會使用到。
protected val pageToHandlers = new HashMap[WebUIPage, ArrayBuffer[ServletContextHandler]] // 將page綁定到handlers上
/** 將Http Server綁定到這個Web頁面 */
def bind() {
assert(!serverInfo.isDefined, "Attempted to bind %s more than once!".format(className))
try {
serverInfo = Some(startJettyServer("0.0.0.0", port, handlers, conf, name))
logInfo("Started %s at http://%s:%d".format(className, publicHostName, boundPort))
} catch {
case e: Exception =>
logError("Failed to bind %s".format(className), e)
System.exit(1)
}
}
上面代碼中handlers對象維持了WebUIPage和Jetty之間的關系,org.eclipse.jetty.servlet.ServletContextHandler
是標准jetty容器的handler。而對象pageToHandlers
維持了WebUIPage到ServletContextHandler的對應關系。
各Tab頁以及該頁內容的實現,基本上大同小異。接下來以AllJobsPage頁面為例仔細梳理頁面展示的過程。
3、SparkUI中Tab的綁定
從上面的類結構圖中看到WebUIPage提供了兩個重要的方法,render和renderJson用於相應頁面請求,在WebUIPage的實現類中,具體實現了這兩個方法。
在SparkContext中構造出SparkUI的實例后,會執行SparkUI#initialize方法進行初始化。如下面代碼中,調用SparkUI從WebUI繼承的attacheTab方法,將各Tab頁面綁定到UI上。
def initialize() {
attachTab(new JobsTab(this))
attachTab(stagesTab)
attachTab(new StorageTab(this))
attachTab(new EnvironmentTab(this))
attachTab(new ExecutorsTab(this))
attachHandler(createStaticHandler(SparkUI.STATIC_RESOURCE_DIR, "/static"))
attachHandler(createRedirectHandler("/", "/jobs/", basePath = basePath))
attachHandler(ApiRootResource.getServletHandler(this))
// This should be POST only, but, the YARN AM proxy won't proxy POSTs
attachHandler(createRedirectHandler( "/stages/stage/kill", "/stages/", stagesTab.handleKillRequest, httpMethods = Set("GET", "POST")))
}
4、頁面內容綁定到Tab
在上一節中,JobsTab標簽綁定到SparkUI上之后,在JobsTab上綁定了AllJobsPage和JobPage類。AllJobsPage頁面即訪問SparkUI頁面時列舉出所有Job的那個頁面,JobPage頁面則是點擊單個Job時跳轉的頁面。通過調用JobsTab從WebUITab繼承的attachPage方法與JobsTab進行綁定。
private[ui] class JobsTab(parent: SparkUI) extends SparkUITab(parent, "jobs") {
val sc = parent.sc
val killEnabled = parent.killEnabled
val jobProgresslistener = parent.jobProgressListener
val executorListener = parent.executorsListener
val operationGraphListener = parent.operationGraphListener
def isFairScheduler: Boolean =
jobProgresslistener.schedulingMode.exists(_ == SchedulingMode.FAIR)
attachPage(new AllJobsPage(this))
attachPage(new JobPage(this))
}
5、頁面內容的展示
知道了AllJobsPage頁面如何綁定到SparkUI界面后,接下來分析這個頁面的內容是如何顯示的。進入AllJobsPage類,主要觀察render方法。在頁面展示上Spark直接利用了Scala對html/xml的語法支持,將頁面的Html代碼嵌入Scala程序中。具體的頁面生成過程可以查看下面源碼中的注釋。這里可以結合第二部分的實例進行查看。
def render(request: HttpServletRequest): Seq[Node] = {
val listener = parent.jobProgresslistener //獲取jobProgresslistener對象,頁面展示的數據都是從這里讀取
listener.synchronized {
val startTime = listener.startTime // 獲取application的開始時間,默認值為-1L
val endTime = listener.endTime // 獲取application的結束時間,默認值為-1L
val activeJobs = listener.activeJobs.values.toSeq // 獲取當前application中處於active狀態的job
val completedJobs = listener.completedJobs.reverse.toSeq // 獲取當前application中完成狀態的job
val failedJobs = listener.failedJobs.reverse.toSeq // 獲取當前application中失敗狀態的job
val activeJobsTable =
jobsTable(activeJobs.sortBy(_.submissionTime.getOrElse(-1L)).reverse)
val completedJobsTable =
jobsTable(completedJobs.sortBy(_.completionTime.getOrElse(-1L)).reverse)
val failedJobsTable =
jobsTable(failedJobs.sortBy(_.completionTime.getOrElse(-1L)).reverse)
val shouldShowActiveJobs = activeJobs.nonEmpty
val shouldShowCompletedJobs = completedJobs.nonEmpty
val shouldShowFailedJobs = failedJobs.nonEmpty
val completedJobNumStr = if (completedJobs.size == listener.numCompletedJobs) {
s"${completedJobs.size}"
} else {
s"${listener.numCompletedJobs}, only showing ${completedJobs.size}"
}
val summary: NodeSeq =
<div>
<ul class="unstyled">
<li>
<strong>Total Uptime:</strong> // 顯示當前Spark應用運行時間
{// 如果還沒有結束,就用系統當前時間減開始時間。如果已經結束,就用結束時間減開始時間
if (endTime < 0 && parent.sc.isDefined) {
UIUtils.formatDuration(System.currentTimeMillis() - startTime)
} else if (endTime > 0) {
UIUtils.formatDuration(endTime - startTime)
}
}
</li>
<li>
<strong>Scheduling Mode: </strong> // 顯示調度模式,FIFO或FAIR
{listener.schedulingMode.map(_.toString).getOrElse("Unknown")}
</li>
{
if (shouldShowActiveJobs) { // 如果有active狀態的job,則顯示Active Jobs有多少個
<li>
<a href="#active"><strong>Active Jobs:</strong></a>
{activeJobs.size}
</li>
}
}
{
if (shouldShowCompletedJobs) { // 如果有完成狀態的job,則顯示Completed Jobs的個數
<li id="completed-summary">
<a href="#completed"><strong>Completed Jobs:</strong></a>
{completedJobNumStr}
</li>
}
}
{
if (shouldShowFailedJobs) { // 如果有失敗狀態的job,則顯示Failed Jobs的個數
<li>
<a href="#failed"><strong>Failed Jobs:</strong></a>
{listener.numFailedJobs}
</li>
}
}
</ul>
</div>
var content = summary // 將上面的html代碼寫入content變量,在最后統一顯示content中的內容
val executorListener = parent.executorListener // 這里獲取EventTimeline中的信息
content ++= makeTimeline(activeJobs ++ completedJobs ++ failedJobs,
executorListener.executorIdToData, startTime)
// 然后根據當前application中是否存在active, failed, completed狀態的job,將這些信息顯示在頁面上。
if (shouldShowActiveJobs) {
content ++= <h4 id="active">Active Jobs ({activeJobs.size})</h4> ++
activeJobsTable // 生成active狀態job的展示表格,具體形式可參看第二部分。按提交時間倒序排列
}
if (shouldShowCompletedJobs) {
content ++= <h4 id="completed">Completed Jobs ({completedJobNumStr})</h4> ++
completedJobsTable
}
if (shouldShowFailedJobs) {
content ++= <h4 id ="failed">Failed Jobs ({failedJobs.size})</h4> ++
failedJobsTable
}
val helpText = """A job is triggered by an action, like count() or saveAsTextFile().""" +
" Click on a job to see information about the stages of tasks inside it."
UIUtils.headerSparkPage("Spark Jobs", content, parent, helpText = Some(helpText)) // 最后將content中的所有內容全部展示在頁面上
}
}
接下來以activeJobsTable代碼為例分析Jobs信息展示表格的生成。這里主要的方法是makeRow,接收的是上面代碼中的activeJobs, completedJobs, failedJobs。這三個對象都是包含在JobProgressListener對象中的,在JobProgressListener中的定義如下:
// 這三個對象用於存儲數據的主要是JobUIData類型,
val activeJobs = new HashMap[JobId, JobUIData]
val completedJobs = ListBuffer[JobUIData]()
val failedJobs = ListBuffer[JobUIData]()
將上面三個對象傳入到下面這段代碼中,繼續執行。
private def jobsTable(jobs: Seq[JobUIData]): Seq[Node] = {
val someJobHasJobGroup = jobs.exists(_.jobGroup.isDefined)
val columns: Seq[Node] = { // 顯示的信息包括,Job Id(Job Group)以及Job描述,Job提交時間,Job運行時間,總的Stage/Task數,成功的Stage/Task數,以及一個進度條
<th>{if (someJobHasJobGroup) "Job Id (Job Group)" else "Job Id"}</th>
<th>Description</th>
<th>Submitted</th>
<th>Duration</th>
<th class="sorttable_nosort">Stages: Succeeded/Total</th>
<th class="sorttable_nosort">Tasks (for all stages): Succeeded/Total</th>
}
def makeRow(job: JobUIData): Seq[Node] = {
val (lastStageName, lastStageDescription) = getLastStageNameAndDescription(job)
val duration: Option[Long] = {
job.submissionTime.map { start => // Job運行時長為系統時間,或者結束時間減去開始時間
val end = job.completionTime.getOrElse(System.currentTimeMillis())
end - start
}
}
val formattedDuration = duration.map(d => // 格式化任務運行時間,顯示為a h:b m:c s格式UIUtils.formatDuration(d)).getOrElse("Unknown")
val formattedSubmissionTime = // 獲取Job提交時間job.submissionTime.map(UIUtils.formatDate).getOrElse("Unknown")
val jobDescription = UIUtils.makeDescription(lastStageDescription, parent.basePath) // 獲取任務描述
val detailUrl = // 點擊單個Job下面鏈接跳轉到JobPage頁面,傳入參數為jobId
"%s/jobs/job?id=%s".format(UIUtils.prependBaseUri(parent.basePath), job.jobId)
<tr id={"job-" + job.jobId}>
<td sorttable_customkey={job.jobId.toString}>
{job.jobId} {job.jobGroup.map(id => s"($id)").getOrElse("")}
</td>
<td>
{jobDescription}
<a href={detailUrl} class="name-link">{lastStageName}</a>
</td>
<td sorttable_customkey={job.submissionTime.getOrElse(-1).toString}>
{formattedSubmissionTime}
</td>
<td sorttable_customkey={duration.getOrElse(-1).toString}>{formattedDuration}</td>
<td class="stage-progress-cell">
{job.completedStageIndices.size}/{job.stageIds.size - job.numSkippedStages}
{if (job.numFailedStages > 0) s"(${job.numFailedStages} failed)"}
{if (job.numSkippedStages > 0) s"(${job.numSkippedStages} skipped)"}
</td>
<td class="progress-cell"> // 進度條
{UIUtils.makeProgressBar(started = job.numActiveTasks, completed = job.numCompletedTasks,
failed = job.numFailedTasks, skipped = job.numSkippedTasks,
total = job.numTasks - job.numSkippedTasks)}
</td>
</tr>
}
<table class="table table-bordered table-striped table-condensed sortable">
<thead>{columns}</thead> // 顯示列名
<tbody>
{jobs.map(makeRow)} // 調用上面的row生成方法,具體顯示Job信息
</tbody>
</table>
}
從上面這些代碼中可以看到,Job頁面顯示的所有數據,都是從JobProgressListener對象中獲得的。SparkUI可以理解成一個JobProgressListener對象的消費者,頁面上顯示的內容都是JobProgressListener內在的展現。
在接下來一篇文章 Spark-1.6.0之Application運行信息記錄器JobProgressListener中會分析運行狀態數據是如何寫入JobProgressListener中的。
二、Spark UI界面實例
默認情況下,當一個Spark Application運行起來后,可以通過訪問hostname:4040端口來訪問UI界面。hostname是提交任務的Spark客戶端ip地址,端口號由參數spark.ui.port(默認值4040,如果被占用則順序往后探查)
來確定。由於啟動一個Application就會生成一個對應的UI界面,所以如果啟動時默認的4040端口號被占用,則嘗試4041端口,如果還是被占用則嘗試4042,一直找到一個可用端口號為止。
下面啟動一個Spark ThriftServer服務,並用beeline命令連接該服務,提交sql語句運行。則ThriftServer對應一個Application,每個sql語句對應一個Job,按照Job的邏輯划分Stage和Task。
1、Jobs頁面
連接上該端口后,顯示的就是上面的頁面,也是Job的主頁面。這里會顯示所有Active,Completed, Cancled以及Failed狀態的Job。默認情況下總共顯示1000條Job信息,這個數值由參數spark.ui.retainedJobs(默認值1000)
來確定。
從上面還看到,除了Jobs選項卡之外,還可顯示Stages, Storage, Enviroment, Executors, SQL以及JDBC/ODBC Server選項卡。分別如下圖所示。
2、Stages頁面
3、Storage頁面
4、Enviroment頁面
5、Executors頁面
6、單個Job包含的Stages頁面
7、Task頁面