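Introduction:
1) Spark Web UI is built mainly on Jetty, a popular servlet container;
2) Before Spark 2.3, Spark Web UI was the front end that displayed runtime status, resource state, and monitoring metrics, and that data was collected by the metrics system (MetricsSystem);
3) From Spark 2.3 onward, the data shown in Spark Web UI appears to be closely tied to the event bus and ElementTrackingStore, while MetricsSystem serves to expose metrics to external systems.
The details of the Spark UI storage change can be found in the following Spark issues: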
Key-value store abstraction and implementation for storing application data
Use key-value store to keep History Server application listing
Hook up Spark UI to the new key-value store backend
Implement listener for saving application status data in key-value store
Make Environment page use new app state store
Make Executors page use new app state store
The Spark UI can contain the following tabs: Jobs, Stages, Storage, Environment, Executors, SQL

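How is the Spark UI (HTTP server) started?
Next, let us start from the source code and look at how the Spark UI (HTTP server) is started and where the data on its pages comes from.
The HTTP server used by the Spark UI is Jetty. Jetty is written in Java and is a capable servlet engine and HTTP server that can be embedded in a user program, so it does not have to run in a separate JVM process the way Tomcat or JBoss does.
1) SparkUI is started when SparkContext is initialized
The Spark UI (HTTP server) is created during SparkContext initialization: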
...
private var _listenerBus: LiveListenerBus = _
private var _statusStore: AppStatusStore = _
...
private[spark] def ui: Option[SparkUI] = _ui

_listenerBus = new LiveListenerBus(_conf)

// Initialize the app status store and listener before SparkEnv is created so that it gets
// all events.
_statusStore = AppStatusStore.createLiveStore(conf)
listenerBus.addToStatusQueue(_statusStore.listener.get)
...
_ui =
  if (conf.getBoolean("spark.ui.enabled", true)) {
    Some(SparkUI.create(Some(this), _statusStore, _conf, _env.securityManager, appName, "",
      startTime))
  } else {
    // For tests, do not enable the UI
    None
  }
// Bind the UI before starting the task scheduler to communicate
// the bound port to the cluster manager properly
_ui.foreach(_.bind())
...
_ui.foreach(_.setAppId(_applicationId))
...
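Here, _statusStore is the initialized AppStatusStore instance, which wraps a KVStore and an AppStatusListener:
- KVStore stores the monitoring data;
- AppStatusListener is registered on the appStatus queue of the event bus.
_env.securityManager is the security manager initialized in SparkEnv.
SparkContext calls the create() method of the SparkUI companion object, which directly instantiates SparkUI with new, and then calls bind() to bind the SparkUI to the Jetty server.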
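The way the event bus, the status listener, and the key-value store fit together can be sketched with a small toy model. Everything in it (ToyBus, ToyStatusListener, ToyStore, and the two event types) is a hypothetical stand-in written for illustration, not a Spark class; it only shows the idea that the listener turns events into store updates and that the UI later reads from the store.

```scala
import scala.collection.mutable

// Toy model only: these types are hypothetical stand-ins, not Spark classes.
object StatusStoreSketch {
  sealed trait Event
  final case class JobStart(jobId: Int) extends Event
  final case class JobEnd(jobId: Int, succeeded: Boolean) extends Event

  // Stand-in for KVStore: keeps the latest known status of each job.
  final class ToyStore {
    private val jobs = mutable.LinkedHashMap.empty[Int, String]
    def write(jobId: Int, status: String): Unit = jobs.update(jobId, status)
    def jobList: List[(Int, String)] = jobs.toList
  }

  // Stand-in for AppStatusListener: translates events into store updates.
  final class ToyStatusListener(store: ToyStore) {
    def onEvent(event: Event): Unit = event match {
      case JobStart(id)   => store.write(id, "RUNNING")
      case JobEnd(id, ok) => store.write(id, if (ok) "SUCCEEDED" else "FAILED")
    }
  }

  // Stand-in for the listener bus: delivers each posted event to every listener.
  final class ToyBus {
    private val listeners = mutable.ArrayBuffer.empty[Event => Unit]
    def addListener(listener: Event => Unit): Unit = listeners += listener
    def post(event: Event): Unit = listeners.foreach(l => l(event))
  }

  // Wires the three pieces together and returns what a UI page would read.
  def demo(): List[(Int, String)] = {
    val store = new ToyStore
    val listener = new ToyStatusListener(store)
    val bus = new ToyBus
    bus.addListener(e => listener.onEvent(e))
    bus.post(JobStart(0))
    bus.post(JobStart(1))
    bus.post(JobEnd(0, succeeded = true))
    store.jobList
  }
}
```

In this sketch a page never talks to the bus directly; it only queries the store, which is the same separation the text describes for AppStatusStore.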
2) Initialization of the SparkUI object
SparkUI.create() constructs a SparkUI object, and the SparkUI's initialize() method is called while that object is being constructed:
private[spark] class SparkUI private (
    val store: AppStatusStore,
    val sc: Option[SparkContext],
    val conf: SparkConf,
    securityManager: SecurityManager,
    var appName: String,
    val basePath: String,
    val startTime: Long,
    val appSparkVersion: String)
  extends WebUI(securityManager, securityManager.getSSLOptions("ui"), SparkUI.getUIPort(conf),
    conf, basePath, "SparkUI")
  with Logging
  with UIRoot {

  val killEnabled = sc.map(_.conf.getBoolean("spark.ui.killEnabled", true)).getOrElse(false)

  var appId: String = _

  private var streamingJobProgressListener: Option[SparkListener] = None

  /** Initialize all components of the server. */
  def initialize(): Unit = {
    val jobsTab = new JobsTab(this, store)
    attachTab(jobsTab)
    val stagesTab = new StagesTab(this, store)
    attachTab(stagesTab)
    attachTab(new StorageTab(this, store))
    attachTab(new EnvironmentTab(this, store))
    attachTab(new ExecutorsTab(this))
    addStaticHandler(SparkUI.STATIC_RESOURCE_DIR)
    attachHandler(createRedirectHandler("/", "/jobs/", basePath = basePath))
    attachHandler(ApiRootResource.getServletHandler(this))

    // These should be POST only, but, the YARN AM proxy won't proxy POSTs
    attachHandler(createRedirectHandler(
      "/jobs/job/kill", "/jobs/", jobsTab.handleKillRequest, httpMethods = Set("GET", "POST")))
    attachHandler(createRedirectHandler(
      "/stages/stage/kill", "/stages/", stagesTab.handleKillRequest,
      httpMethods = Set("GET", "POST")))
  }
  ...
}
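The SparkUI class has three member fields:
- killEnabled is controlled by the configuration item spark.ui.killEnabled. If it is true, the Spark Web UI shows a switch for forcibly killing the jobs of the Spark application;
- appId is the current application ID;
- streamingJobProgressListener is the listener for Spark Streaming job progress.
The initialize() method works as follows:
- First, it creates five tabs (JobsTab, StagesTab, StorageTab, EnvironmentTab, and ExecutorsTab) and registers each of them with the Web UI by calling attachTab(). A tab here is a tab page in the Spark UI, and the class names correspond one to one with the tab labels shown in the UI.
- Then, it calls addStaticHandler() to create the ServletContextHandler for static resources, and calls createRedirectHandler() to create several redirecting ServletContextHandlers.
- Finally, each of these handlers is registered with the Web UI through attachHandler().
Note: ServletContextHandler is a full-featured Jetty handler that receives and processes HTTP requests and then dispatches them to a servlet.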

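Besides the classes that render pages, each of the tabs above (JobsTab, StagesTab, StorageTab, EnvironmentTab, ExecutorsTab) also relies on static resources such as HTML, JS, CSS, and other files (images): https://github.com/apache/spark/tree/branch-2.4/core/src/main/resources/org/apache/spark/ui/static
3) Calling bind() to start the Jetty server
As shown above, SparkContext creates the SparkUI object during initialization and then calls bind() to bind the SparkUI to the Jetty server. bind() is a method of WebUI, the parent class of SparkUI.
WebUI member fields and getter methods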
protected val tabs = ArrayBuffer[WebUITab]()
protected val handlers = ArrayBuffer[ServletContextHandler]()
protected val pageToHandlers = new HashMap[WebUIPage, ArrayBuffer[ServletContextHandler]]
protected var serverInfo: Option[ServerInfo] = None
protected val publicHostName = Option(conf.getenv("SPARK_PUBLIC_DNS")).getOrElse(
  conf.get(DRIVER_HOST_ADDRESS))
private val className = Utils.getFormattedClassName(this)

def getBasePath: String = basePath
def getTabs: Seq[WebUITab] = tabs
def getHandlers: Seq[ServletContextHandler] = handlers
def getSecurityManager: SecurityManager = securityManager
https://github.com/apache/spark/blob/branch-2.4/core/src/main/scala/org/apache/spark/ui/WebUI.scala
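WebUI has six member fields:
- tabs: a buffer holding the WebUITab instances (the tab pages of the Web UI);
- handlers: a buffer holding the Jetty ServletContextHandler instances;
- pageToHandlers: the mapping from each WebUIPage (the component one level below WebUITab) to its ServletContextHandlers;
- serverInfo: information about the Jetty server behind this Web UI;
- publicHostName: the host name of the Jetty server behind this Web UI. It is read from the SPARK_PUBLIC_DNS environment variable first, falling back to the spark.driver.host configuration item;
- className: the name of the current class, formatted with Utils.getFormattedClassName().
There are four getter methods:
- getTabs() and getHandlers() simply return the values of the corresponding fields;
- getBasePath() returns the Web UI base path defined in the constructor parameters;
- getSecurityManager() returns the security manager passed in through the constructor.
attach/detach methods provided by WebUI
These methods come in pairs, three pairs in total:
- attachTab/detachTab: register and remove a WebUITab;
- attachPage/detachPage: register and remove a WebUIPage;
- attachHandler/detachHandler: register and remove a ServletContextHandler.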
/** Attaches a tab to this UI, along with all of its attached pages. */
def attachTab(tab: WebUITab): Unit = {
  tab.pages.foreach(attachPage)
  tabs += tab
}

/** Detaches a tab from this UI, along with all of its attached pages. */
def detachTab(tab: WebUITab): Unit = {
  tab.pages.foreach(detachPage)
  tabs -= tab
}

/** Detaches a page from this UI, along with all of its attached handlers. */
def detachPage(page: WebUIPage): Unit = {
  pageToHandlers.remove(page).foreach(_.foreach(detachHandler))
}

/** Attaches a page to this UI. */
def attachPage(page: WebUIPage): Unit = {
  val pagePath = "/" + page.prefix
  val renderHandler = createServletHandler(pagePath,
    (request: HttpServletRequest) => page.render(request), securityManager, conf, basePath)
  val renderJsonHandler = createServletHandler(pagePath.stripSuffix("/") + "/json",
    (request: HttpServletRequest) => page.renderJson(request), securityManager, conf, basePath)
  attachHandler(renderHandler)
  attachHandler(renderJsonHandler)
  val handlers = pageToHandlers.getOrElseUpdate(page, ArrayBuffer[ServletContextHandler]())
  handlers += renderHandler
}

/** Attaches a handler to this UI. */
def attachHandler(handler: ServletContextHandler): Unit = {
  handlers += handler
  serverInfo.foreach(_.addHandler(handler))
}

/** Detaches a handler from this UI. */
def detachHandler(handler: ServletContextHandler): Unit = {
  handlers -= handler
  serverInfo.foreach(_.removeHandler(handler))
}

def detachHandler(path: String): Unit = {
  handlers.find(_.getContextPath() == path).foreach(detachHandler)
}

def addStaticHandler(resourceBase: String, path: String = "/static"): Unit = {
  attachHandler(JettyUtils.createStaticHandler(resourceBase, path))
}
https://github.com/apache/spark/blob/branch-2.4/core/src/main/scala/org/apache/spark/ui/WebUI.scala
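The attachPage() flow:
- 1) Call createServletHandler() from the Jetty utility class JettyUtils to create a ServletContextHandler for each of the two rendering methods of WebUIPage, render() and renderJson(). In other words, one WebUIPage corresponds to two handlers.
- 2) Then call the attachHandler() method above to register both handlers with Jetty, and record the page-to-handler mapping in pageToHandlers (in the code shown, only renderHandler is added to that mapping).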
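The attachPage()/detachPage() bookkeeping described above can be tried out in a dependency-free sketch. The names ToyWebUI, Handler, and Page are hypothetical stand-ins, and a handler is reduced to the path it serves; the sketch only mirrors the list and map manipulation from the listing, not Jetty itself.

```scala
import scala.collection.mutable

// Toy model only: Handler, Page and ToyWebUI are hypothetical stand-ins.
object AttachSketch {
  // A handler is reduced to the path it would serve.
  final case class Handler(path: String)
  final class Page(val prefix: String)

  final class ToyWebUI {
    val handlers = mutable.ArrayBuffer.empty[Handler]
    val pageToHandlers = mutable.HashMap.empty[Page, mutable.ArrayBuffer[Handler]]

    def attachHandler(h: Handler): Unit = handlers += h
    def detachHandler(h: Handler): Unit = handlers -= h

    // One page yields two handlers: the HTML view and its "/json" sibling.
    def attachPage(page: Page): Unit = {
      val pagePath = "/" + page.prefix
      val render = Handler(pagePath)
      val renderJson = Handler(pagePath.stripSuffix("/") + "/json")
      attachHandler(render)
      attachHandler(renderJson)
      // As in the listing above, only the render handler is tracked per page.
      val tracked = pageToHandlers.getOrElseUpdate(page, mutable.ArrayBuffer.empty[Handler])
      tracked += render
    }

    // Removes every handler that was tracked for the page.
    def detachPage(page: Page): Unit =
      pageToHandlers.remove(page).foreach(_.foreach(detachHandler))
  }
}
```

In this toy model, attaching a page with prefix "executors" registers "/executors" and "/executors/json", and detaching it removes only the tracked "/executors" handler, because that is all the map remembers.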
Binding WebUI to the Jetty server
/** Binds to the HTTP server behind this web interface. */
def bind(): Unit = {
  assert(serverInfo.isEmpty, s"Attempted to bind $className more than once!")
  try {
    val host = Option(conf.getenv("SPARK_LOCAL_IP")).getOrElse("0.0.0.0")
    serverInfo = Some(startJettyServer(host, port, sslOptions, handlers, conf, name))
    logInfo(s"Bound $className to $host, and started at $webUrl")
  } catch {
    case e: Exception =>
      logError(s"Failed to bind $className", e)
      System.exit(1)
  }
}
https://github.com/apache/spark/blob/branch-2.4/core/src/main/scala/org/apache/spark/ui/WebUI.scala
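Key points of the bind() method:
a) It calls startJettyServer(...), a method defined in JettyUtils.scala, which again shows that SparkUI runs on top of Jetty.
b) The call to startJettyServer(...) passes the host and port arguments. These are the IP address and port used to access the Spark UI, so we need to know where the two values are configured.
√ Code that obtains host: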
val host = Option(conf.getenv("SPARK_LOCAL_IP")).getOrElse("0.0.0.0")
spark-env.sh documents the relevant variables (SPARK_LOCAL_IP, SPARK_PUBLIC_DNS):
# Options read when launching programs locally with
# ./bin/run-example or ./bin/spark-submit
# - HADOOP_CONF_DIR, to point Spark towards Hadoop configuration files
# - SPARK_LOCAL_IP, to set the IP address Spark binds to on this node
# - SPARK_PUBLIC_DNS, to set the public dns name of the driver program

# Options read by executors and drivers running inside the cluster
# - SPARK_LOCAL_IP, to set the IP address Spark binds to on this node
# - SPARK_PUBLIC_DNS, to set the public DNS name of the driver program
# - SPARK_LOCAL_DIRS, storage directories to use on this node for shuffle and RDD data
# - MESOS_NATIVE_JAVA_LIBRARY, to point to your libmesos.so if you use Mesos
https://github.com/apache/spark/blob/branch-2.4/conf/spark-env.sh.template
√ The code that obtains port is in the SparkUI companion object:
private[spark] object SparkUI {
  val DEFAULT_PORT = 4040
  val STATIC_RESOURCE_DIR = "org/apache/spark/ui/static"
  val DEFAULT_POOL_NAME = "default"

  def getUIPort(conf: SparkConf): Int = {
    conf.getInt("spark.ui.port", SparkUI.DEFAULT_PORT)
  }

  /**
   * Create a new UI backed by an AppStatusStore.
   */
  def create(
      sc: Option[SparkContext],
      store: AppStatusStore,
      conf: SparkConf,
      securityManager: SecurityManager,
      appName: String,
      basePath: String,
      startTime: Long,
      appSparkVersion: String = org.apache.spark.SPARK_VERSION): SparkUI = {
    new SparkUI(store, sc, conf, securityManager, appName, basePath, startTime, appSparkVersion)
  }
}
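The host and port lookups above both follow the same "use the configured value, otherwise a default" pattern. A minimal sketch of that pattern follows, assuming the environment and the configuration are passed in as plain maps so it can run without Spark; the object name BindAddressSketch and its method names are made up for illustration.

```scala
// Sketch only: plain maps stand in for the process environment and SparkConf.
object BindAddressSketch {
  val DefaultPort = 4040

  // Mirrors Option(conf.getenv("SPARK_LOCAL_IP")).getOrElse("0.0.0.0").
  def resolveHost(env: Map[String, String]): String =
    env.getOrElse("SPARK_LOCAL_IP", "0.0.0.0")

  // Mirrors conf.getInt("spark.ui.port", DEFAULT_PORT).
  def resolvePort(conf: Map[String, String]): Int =
    conf.get("spark.ui.port").map(_.toInt).getOrElse(DefaultPort)

  def main(args: Array[String]): Unit = {
    val host = resolveHost(sys.env)
    val port = resolvePort(Map("spark.ui.port" -> "4041"))
    println(s"would bind to $host:$port")
  }
}
```

With neither value set, the sketch resolves to 0.0.0.0 and 4040, which matches the defaults visible in the listings above.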
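Spark Web UI rendering
The Spark Web UI is really a three-level tree: the root node is WebUI, the middle nodes are WebUITab, and the leaf nodes are WebUIPage.
The UI is displayed mainly through WebUITab and WebUIPage. In the Spark UI, a tab (WebUITab) can contain one or more pages (WebUIPage), and the tab level is optional.
WebUI definition: it was covered above, so the code is not repeated here. https://github.com/apache/spark/blob/branch-2.4/core/src/main/scala/org/apache/spark/ui/WebUI.scala
WebUITab definition: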
/**
 * A tab that represents a collection of pages.
 * The prefix is appended to the parent address to form a full path, and must not contain slashes.
 */
private[spark] abstract class WebUITab(parent: WebUI, val prefix: String) {
  val pages = ArrayBuffer[WebUIPage]()
  val name = prefix.capitalize

  /** Attach a page to this tab. This prepends the page's prefix with the tab's own prefix. */
  def attachPage(page: WebUIPage) {
    page.prefix = (prefix + "/" + page.prefix).stripSuffix("/")
    pages += page
  }

  /** Get a list of header tabs from the parent UI. */
  def headerTabs: Seq[WebUITab] = parent.getTabs

  def basePath: String = parent.getBasePath
}
Defined in https://github.com/apache/spark/blob/branch-2.4/core/src/main/scala/org/apache/spark/ui/WebUI.scala.
- Because one tab (WebUITab) can contain several pages (WebUIPage), the field val pages = ArrayBuffer[WebUIPage]() in WebUITab holds all the pages that belong to the tab.
- attachPage(...) joins the tab's path prefix with the page's path prefix and then adds the page to the pages buffer.
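The effect of the prefix concatenation is easiest to see with concrete values. The sketch below reuses the same string expression as WebUITab.attachPage in a stripped-down form; Tab and Page here are hypothetical simplified classes, and the two page prefixes are the ones used by the Executors tab later in this article.

```scala
import scala.collection.mutable.ArrayBuffer

// Toy model only: Tab and Page are simplified stand-ins for WebUITab and WebUIPage.
object PrefixSketch {
  final class Page(var prefix: String)

  final class Tab(val prefix: String) {
    val pages = ArrayBuffer.empty[Page]

    // Same string logic as WebUITab.attachPage: prepend the tab prefix and
    // drop a trailing slash left behind by an empty page prefix.
    def attachPage(page: Page): Unit = {
      page.prefix = (prefix + "/" + page.prefix).stripSuffix("/")
      pages += page
    }
  }

  // Attaches a page with an empty prefix and one with "threadDump".
  def executorsPrefixes(): List[String] = {
    val tab = new Tab("executors")
    tab.attachPage(new Page(""))
    tab.attachPage(new Page("threadDump"))
    tab.pages.map(_.prefix).toList
  }
}
```

Calling PrefixSketch.executorsPrefixes() yields "executors" and "executors/threadDump", which is why a page with an empty prefix becomes the landing page of its tab.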
WebUIPage definition:
/**
 * A page that represents the leaf node in the UI hierarchy.
 *
 * The direct parent of a WebUIPage is not specified as it can be either a WebUI or a WebUITab.
 * If the parent is a WebUI, the prefix is appended to the parent's address to form a full path.
 * Else, if the parent is a WebUITab, the prefix is appended to the super prefix of the parent
 * to form a relative path. The prefix must not contain slashes.
 */
private[spark] abstract class WebUIPage(var prefix: String) {
  def render(request: HttpServletRequest): Seq[Node]
  def renderJson(request: HttpServletRequest): JValue = JNothing
}
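Defined in https://github.com/apache/spark/blob/branch-2.4/core/src/main/scala/org/apache/spark/ui/WebUI.scala.
- render(...) renders the page;
- renderJson(...) produces the JSON representation that corresponds to the rendered page.
WebUITab and WebUIPage each have a series of implementation classes. For details, see the code, for example: https://github.com/apache/spark/tree/branch-2.4/core/src/main/scala/org/apache/spark/ui/exec
Rendering a SparkUI page
Take the Executors tab as an example, because it is representative: a single tab shows two pages (ExecutorsPage and ExecutorThreadDumpPage).
The pages under this tab in the Spark UI are: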
executors->ExecutorsPage
http://ip:8088/proxy/application_1558494459870_0005/executors/

executors->ExecutorThreadDumpPage
http://ip:8088/proxy/application_1558494459870_0005/executors/threadDump/?executorId=[executorId or driver]

First, the code of ExecutorsTab:
private[ui] class ExecutorsTab(parent: SparkUI) extends SparkUITab(parent, "executors") {

  init()

  private def init(): Unit = {
    val threadDumpEnabled =
      parent.sc.isDefined && parent.conf.getBoolean("spark.ui.threadDumpsEnabled", true)

    attachPage(new ExecutorsPage(this, threadDumpEnabled))
    if (threadDumpEnabled) {
      attachPage(new ExecutorThreadDumpPage(this, parent.sc))
    }
  }
}
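SparkUITab is a thin wrapper around WebUITab that adds the application name and Spark version properties. ExecutorsTab defines an init() method and calls it from its constructor. init() calls attachPage(...), which SparkUITab inherits from WebUITab, to add ExecutorsPage, and it also adds ExecutorThreadDumpPage when threadDumpEnabled is true.
Next, the code of ExecutorsPage: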
private[ui] class ExecutorsPage(
    parent: SparkUITab,
    threadDumpEnabled: Boolean)
  extends WebUIPage("") {

  def render(request: HttpServletRequest): Seq[Node] = {
    val content =
      <div>
        {
          <div id="active-executors" class="row-fluid"></div> ++
          <script src={UIUtils.prependBaseUri(request, "/static/utils.js")}></script> ++
          <script src={UIUtils.prependBaseUri(request, "/static/executorspage.js")}></script> ++
          <script>setThreadDumpEnabled({threadDumpEnabled})</script>
        }
      </div>

    UIUtils.headerSparkPage(request, "Executors", content, parent, useDataTables = true)
  }
}
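render() renders the page content. The flow is:
1) Assemble the content;
2) Call UIUtils.headerSparkPage() to wrap content in the standard page layout, which is then returned to the browser as the response;
3) While loading the page, the browser runs executorspage.js. This script calls the REST service with the current applicationId to fetch data such as allexecutors, and renders that data onto the executors page using the template executorspage-template.html.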

Source path of the Spark REST service implementation: https://github.com/apache/spark/blob/branch-2.4/core/src/main/scala/org/apache/spark/status/api/v1/
The method that returns the allexecutors data:
@GET
@Path("allexecutors")
def allExecutorList(): Seq[ExecutorSummary] = withUI(_.store.executorList(false))
The store here is exactly the SparkUI's store. This method shows that the data served by the Spark REST service is in fact held in the AppStatusStore object (store) created by SparkContext.
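The same endpoint that executorspage.js calls can be queried from any HTTP client. The following is a minimal sketch, assuming Java 11 or later for java.net.http and a UI reachable at a base URL of your own; the base URL and application ID used in the usage note are placeholders, and the object and method names are made up for illustration.

```scala
import java.net.URI
import java.net.http.{HttpClient, HttpRequest, HttpResponse}

// Sketch only: baseUrl and appId are placeholders supplied by the caller.
object AllExecutorsSketch {
  // Builds the REST URL for the allexecutors resource of one application.
  def allExecutorsUrl(baseUrl: String, appId: String): String = {
    val base = baseUrl.stripSuffix("/")
    s"$base/api/v1/applications/$appId/allexecutors"
  }

  // Performs a blocking GET and returns the raw JSON body as a string.
  def fetchAllExecutors(baseUrl: String, appId: String): String = {
    val request = HttpRequest.newBuilder(URI.create(allExecutorsUrl(baseUrl, appId))).GET().build()
    HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.ofString()).body()
  }
}
```

For a driver running locally, something like AllExecutorsSketch.fetchAllExecutors("http://localhost:4040", appId) would return the JSON list that the Executors page renders, provided the application is still running and the UI is enabled.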
Finally, the code of ExecutorThreadDumpPage:
private[ui] class ExecutorThreadDumpPage(
    parent: SparkUITab,
    sc: Option[SparkContext]) extends WebUIPage("threadDump") {

  // stripXSS is called first to remove suspicious characters used in XSS attacks
  def render(request: HttpServletRequest): Seq[Node] = {
    val executorId =
      Option(UIUtils.stripXSS(request.getParameter("executorId"))).map { executorId =>
        UIUtils.decodeURLParameter(executorId)
      }.getOrElse {
        throw new IllegalArgumentException(s"Missing executorId parameter")
      }
    val time = System.currentTimeMillis()
    val maybeThreadDump = sc.get.getExecutorThreadDump(executorId)

    val content = maybeThreadDump.map { threadDump =>
      val dumpRows = threadDump.map { thread =>
        val threadId = thread.threadId
        val blockedBy = thread.blockedByThreadId match {
          case Some(_) =>
            <div>
              Blocked by <a href={s"#${thread.blockedByThreadId}_td_id"}>
              Thread {thread.blockedByThreadId} {thread.blockedByLock}</a>
            </div>
          case None => Text("")
        }
        val heldLocks = thread.holdingLocks.mkString(", ")

        <tr id={s"thread_${threadId}_tr"} class="accordion-heading"
            onclick={s"toggleThreadStackTrace($threadId, false)"}
            onmouseover={s"onMouseOverAndOut($threadId)"}
            onmouseout={s"onMouseOverAndOut($threadId)"}>
          <td id={s"${threadId}_td_id"}>{threadId}</td>
          <td id={s"${threadId}_td_name"}>{thread.threadName}</td>
          <td id={s"${threadId}_td_state"}>{thread.threadState}</td>
          <td id={s"${threadId}_td_locking"}>{blockedBy}{heldLocks}</td>
          <td id={s"${threadId}_td_stacktrace"} class="hidden">{thread.stackTrace.html}</td>
        </tr>
      }

      <div class="row-fluid">
        <p>Updated at {UIUtils.formatDate(time)}</p>
        {
          // scalastyle:off
          <p><a class="expandbutton" onClick="expandAllThreadStackTrace(true)">
            Expand All
          </a></p>
          <p><a class="expandbutton hidden" onClick="collapseAllThreadStackTrace(true)">
            Collapse All
          </a></p>
          <div class="form-inline">
            <div class="bs-example" data-example-id="simple-form-inline">
              <div class="form-group">
                <div class="input-group">
                  Search: <input type="text" class="form-control" id="search" oninput="onSearchStringChange()"></input>
                </div>
              </div>
            </div>
          </div>
          <p></p>
          // scalastyle:on
        }
        <table class={UIUtils.TABLE_CLASS_STRIPED + " accordion-group" + " sortable"}>
          <thead>
            <th onClick="collapseAllThreadStackTrace(false)">Thread ID</th>
            <th onClick="collapseAllThreadStackTrace(false)">Thread Name</th>
            <th onClick="collapseAllThreadStackTrace(false)">Thread State</th>
            <th onClick="collapseAllThreadStackTrace(false)">Thread Locks</th>
          </thead>
          <tbody>{dumpRows}</tbody>
        </table>
      </div>
    }.getOrElse(Text("Error fetching thread dump"))
    UIUtils.headerSparkPage(request, s"Thread dump for executor $executorId", content, parent)
  }
}
This page mainly shows the state of the threads running in the selected executor.
