Spark2.1.0——內置Web框架詳解
任何系統都需要提供監控功能,否則在運行期間發生一些異常時,我們將會束手無策。也許有人說,可以增加日志來解決這個問題。日志只能解決你的程序邏輯在運行期的監控,進而發現Bug,以及提供對業務有幫助的調試信息。當你的JVM進程奔潰或者程序響應速度很慢時,這些日志將毫無用處。好在JVM提供了jstat、jstack、jinfo、jmap、jhat等工具幫助我們分析,更有VisualVM的可視化界面以更加直觀的方式對JVM運行期的狀況進行監控。此外,像Tomcat、Hadoop等服務都提供了基於Web的監控頁面,用瀏覽器能訪問具有樣式及布局,並提供豐富監控數據的頁面無疑是一種簡單、高效的方式。
Spark自然也提供了Web頁面來瀏覽監控數據,而且Master、Worker、Driver根據自身功能提供了不同內容的Web監控頁面。無論是Master、Worker,還是Driver,它們都使用了統一的Web框架WebUI。Master、Worker及Driver分別使用MasterWebUI、WorkerWebUI及SparkUI提供的Web界面服務,后三者都繼承自WebUI,並增加了個性化的功能。此外,在Yarn或Mesos模式下還有WebUI的另一個擴展實現HistoryServer。HistoryServer將會展現已經運行完成的應用程序信息。本章以SparkUI為例,並深入分析WebUI的框架體系。
SparkUI概述
在大型分布式系統中,采用事件監聽機制是最常見的。為什么要使用事件監聽機制?假如Spark UI采用Scala的函數調用方式,那么隨着整個集群規模的增加,對函數的調用會越來越多,最終會受到Driver所在JVM的線程數量限制而影響監控數據的更新,甚至出現監控數據無法及時顯示給用戶的情況。由於函數調用多數情況下是同步調用,這就導致線程被阻塞,在分布式環境中,還可能因為網絡問題,導致線程被長時間占用。將函數調用更換為發送事件,事件的處理是異步的,當前線程可以繼續執行后續邏輯進而被快速釋放。線程池中的線程還可以被重用,這樣整個系統的並發度會大大增加。發送的事件會存入緩存,由定時調度器取出后,分配給監聽此事件的監聽器對監控數據進行更新。Spark UI就是這樣的服務,它的構成如圖1所示。

圖1 SparkUI的組成
圖1展示了SparkUI中的各個組件,這里對這些組件作簡單介紹:
- SparkListenerEvent事件的來源:包括DAGScheduler、SparkContext、DriverEndpoint、BlockManagerMasterEndpoint以及LocalSchedulerBackend等,這些組件將會產生各種SparkListenerEvent,並發送到listenerBus的事件隊列中。DriverEndpoint是Driver在Standalone或local-cluster模式下與其他組件進行通信的組件,在《Spark內核設計的藝術》一書的第9.9.2節有詳細介紹。BlockManagerMasterEndpoint是Driver對分配給應用的所有Executor及其BlockManager進行統一管理的組件,在《Spark內核設計的藝術》一書的6.8節詳細介紹。LocalSchedulerBackend是local模式下的調度后端接口,用於給任務分配資源或對任務的狀態進行更新,在《Spark內核設計的藝術》一書的7.8.2節詳細介紹。
- 事件總線listenerBus。根據3.3節對事件總線的介紹,我們知道listenerBus通過定時器將SparkListenerEvent事件匹配到具體的SparkListener,進而改變各個SparkListener中的統計監控數據。
- Spark UI的界面。各個SparkListener內的統計監控數據將會被各種標簽頁和具體頁面展示到Web界面。標簽頁有StagesTab、JobsTab、ExecutorsTab、EnvironmentTab以及StorageTab。每個標簽頁中包含若干個頁面,例如StagesTab標簽頁中包含了AllStagesPage、StagePage及PoolPage三個頁面。
- 控制台的展示。細心的讀者會發現圖1中還有SparkStatusTracker(Spark狀態跟蹤器)和ConsoleProgressBar(控制台進度條)兩個組件。SparkStatusTracker負責對Job和Stage的監控,其實際也是使用了JobProgressListener中的監控數據,並額外進行了一些加工。ConsoleProgressBar負責將SparkStatusTracker提供的數據打印到控制台上。從最終展現的角度來看,SparkStatusTracker和ConsoleProgressBar不應該屬於SparkUI的組成部分,但是由於其實現與JobProgressListener密切相關,所以將它們也放在了SparkUI的內容中。
WebUI框架體系
Spark UI構建在WebUI的框架體系之上,因此應當首先了解WebUI。WebUI定義了一種Web界面展現的框架,並提供返回Json格式數據的Web服務。WebUI用於展示一組標簽頁,WebUITab定義了標簽頁的規范。每個標簽頁中包含着一組頁面,WebUIPage定義了頁面的規范。我們將首先了解WebUIPage和WebUITab,最后從整體來看WebUI。
WebUIPage的定義
任何的Web界面往往由多個頁面組成,每個頁面都將提供不同的內容展示。WebUIPage是WebUI框架體系的頁節點,定義了所有頁面應當遵循的規范。抽象類WebUIPage的定義見代碼清單1。
代碼清單1 WebUIPage的定義
private[spark] abstract class WebUIPage(var prefix: String) {
def render(request: HttpServletRequest): Seq[Node]
def renderJson(request: HttpServletRequest): JValue = JNothing
}
WebUIPage定義了兩個方法。
- render:渲染頁面;
- renderJson:生成JSON。
WebUIPage在WebUI框架體系中的上一級節點(也可以稱為父親)可以是WebUI或者WebUITab,其成員屬性prefix將與上級節點的路徑一起構成當前WebUIPage的訪問路徑。
WebUITab的定義
有時候Web界面需要將多個頁面作為一組內容放置在一起,這時候標簽頁是常見的展現形式。標簽頁WebUITab定義了所有標簽頁的規范,並用於展現一組WebUIPage。抽象類WebUITab的定義見代碼清單2。
代碼清單2 WebUITab的定義
private[spark] abstract class WebUITab(parent: WebUI, val prefix: String) {
val pages = ArrayBuffer[WebUIPage]()
val name = prefix.capitalize
def attachPage(page: WebUIPage) {
page.prefix = (prefix + "/" + page.prefix).stripSuffix("/")
pages += page
}
def headerTabs: Seq[WebUITab] = parent.getTabs
def basePath: String = parent.getBasePath
}
根據代碼清單2,可以看到WebUITab有四個成員屬性:
- parent:上一級節點,即父親。WebUITab的父親只能是WebUI。
- prefix:當前WebUITab的前綴。prefix將與上級節點的路徑一起構成當前WebUITab的訪問路徑。
- pages:當前WebUITab所包含的WebUIPage的緩沖數組。
- name:當前WebUITab的名稱。name實際是對prefix的首字母轉換成大寫字母后取得。
此外,WebUITab還有三個成員方法,下面介紹它們的作用:
- attachPage:首先將當前WebUITab的前綴與WebUIPage的前綴拼接,作為WebUIPage的訪問路徑。然后向pages中添加WebUIPage。
- headerTabs:獲取父親WebUI中的所有WebUITab。此方法實際通過調用父親WebUI的getTabs方法實現,getTabs方法請參閱下一小節——WebUI的定義。
- basePath:獲取父親WebUI的基本路徑。此方法實際通過調用父親WebUI的getBasePath方法實現,getBasePath方法請參閱下一小節——WebUI的定義。。
WebUI的定義
WebUI是Spark實現的用於提供Web界面展現的框架,凡是需要頁面展現的地方都可以繼承它來完成。WebUI定義了WebUI框架體系的規范。為便於理解,首先明確WebUI中各個成員屬性的含義:
- securityManager:SparkEnv中創建的安全管理器SecurityManager,5.2節對SecurityManager有詳細介紹。
- sslOptions:使用SecurityManager獲取spark.ssl.ui屬性指定的WebUI的SSL(Secure Sockets Layer 安全套接層)選項。
- port:WebUI對外服務的端口。可以使用spark.ui.port屬性進行配置。
- conf:即SparkConf。
- basePath:WebUI的基本路徑。basePath默認為空字符串。
- name:WebUI的名稱。Spark UI的name為SparkUI。
- tabs:WebUITab的緩沖數組。
- handlers:ServletContextHandler的緩沖數組。ServletContextHandler是Jetty提供的API,負責對ServletContext進行處理。ServletContextHandler的使用及Jetty的更多內容可以參閱附錄C。
- pageToHandlers:WebUIPage與ServletContextHandler緩沖數組之間的映射關系。由於WebUIPage的兩個方法render和renderJson分別需要由一個對應的ServletContextHandler處理。所以一個WebUIPage對應兩個ServletContextHandler。
- serverInfo:用於緩存ServerInfo,即WebUI的Jetty服務器信息。
- publicHostName:當前WebUI的Jetty服務的主機名。優先采用系統環境變量SPARK_PUBLIC_DNS指定的主機名,否則采用spark.driver.host屬性指定的host,在沒有前兩個配置的時候,將默認使用工具類Utils的localHostName方法(詳見附錄A)返回的主機名。
- className:過濾了$符號的當前類的簡單名稱。className 是通過Utils的getFormattedClassName方法得到的。getFormattedClassName方法的實現請看附錄A。
了解了WebUI的成員屬性,現在就可以理解其提供的各個方法了。WebUI提供的方法有:
- getBasePath:獲取basePath。
- getTabs:獲取tabs中的所有WebUITab,並以Scala的序列返回。
- getHandlers:獲取handlers中的所有ServletContextHandler,並以Scala的序列返回。
- getSecurityManager:獲取securityManager。
- attachHandler:給handlers緩存數組中添加ServletContextHandler,並且將此ServletContextHandler通過ServerInfo的addHandler方法添加到Jetty服務器中。attachHandler的實現見代碼清單3。ServerInfo的addHandler方法的請參閱附錄C。
代碼清單3 attachHandler的實現
def attachHandler(handler: ServletContextHandler) {
handlers += handler
serverInfo.foreach(_.addHandler(handler))
}
- detachHandler:從handlers緩存數組中移除ServletContextHandler,並且將此ServletContextHandler通過ServerInfo的removeHandler方法從Jetty服務器中移除。detachHandler的實現見代碼清單4。ServerInfo的removeHandler方法的請參閱附錄C。
代碼清單4 detachHandler的實現
def detachHandler(handler: ServletContextHandler) {
handlers -= handler
serverInfo.foreach(_.removeHandler(handler))
}
- attachPage:首先調用工具類JettyUtils[1]的createServletHandler方法給WebUIPage創建與render和renderJson兩個方法分別關聯的ServletContextHandler,然后通過attachHandler方法添加到handlers緩存數組與Jetty服務器中,最后把WebUIPage與這兩個ServletContextHandler的映射關系更新到pageToHandlers中。attachPage的實現見代碼清單5。
代碼清單5 attachPage的實現
def attachPage(page: WebUIPage) {
val pagePath = "/" + page.prefix
val renderHandler = createServletHandler(pagePath,
(request: HttpServletRequest) => page.render(request), securityManager, conf, basePath)
val renderJsonHandler = createServletHandler(pagePath.stripSuffix("/") + "/json",
(request: HttpServletRequest) => page.renderJson(request), securityManager, conf, basePath)
attachHandler(renderHandler)
attachHandler(renderJsonHandler)
val handlers = pageToHandlers.getOrElseUpdate(page, ArrayBuffer[ServletContextHandler]())
handlers += renderHandler
}
- detachPage:作用與attachPage相反。detachPage的實現見代碼清單6。
代碼清單6 detachPage的實現
def detachPage(page: WebUIPage) {
pageToHandlers.remove(page).foreach(_.foreach(detachHandler))
}
- attachTab:首先向tabs中添加WebUITab,然后給WebUITab中的每個WebUIPage施加attachPage方法。attachTab的實現見代碼清單7。
代碼清單7 attachTab的實現
def attachTab(tab: WebUITab) {
tab.pages.foreach(attachPage)
tabs += tab
}
- detachTab:作用與attachTab相反。detachTab的實現見代碼清單8。
代碼清單8 detachTab的實現
def detachTab(tab: WebUITab) {
tab.pages.foreach(detachPage)
tabs -= tab
}
- addStaticHandler:首先調用工具類JettyUtils的createStaticHandler方法創建靜態文件服務的ServletContextHandler,然后施加attachHandler方法。addStaticHandler的實現見代碼清單9。JettyUtils的createStaticHandler方法的實現見附錄C。
代碼清單9 addStaticHandler的實現
def addStaticHandler(resourceBase: String, path: String): Unit = {
attachHandler(JettyUtils.createStaticHandler(resourceBase, path))
}
- removeStaticHandler:作用與addStaticHandler相反。removeStaticHandler的實現見代碼清單10。
代碼清單10 removeStaticHandler的實現
def removeStaticHandler(path: String): Unit = {
handlers.find(_.getContextPath() == path).foreach(detachHandler)
}
- initialize:用於初始化WebUI服務中的所有組件。WebUI中此方法未實現,需要子類實現。
- bind:啟動與WebUI綁定的Jetty服務。bind方法的實現見代碼清單11。
代碼清單11 bind的實現
def bind() {
assert(!serverInfo.isDefined, s"Attempted to bind $className more than once!")
try {
val host = Option(conf.getenv("SPARK_LOCAL_IP")).getOrElse("0.0.0.0")
serverInfo = Some(startJettyServer(host, port, sslOptions, handlers, conf, name))
logInfo(s"Bound $className to $host, and started at $webUrl")
} catch {
case e: Exception =>
logError(s"Failed to bind $className", e)
System.exit(1)
}
}
- webUrl:獲取WebUI的Web界面的URL。webUrl的實現如下:
def webUrl: String = shttp://$publicHostName:$boundPort
- boundPort:獲取WebUI的Jetty服務的端口。boundPort的實現如下:
def boundPort: Int = serverInfo.map(_.boundPort).getOrElse(-1)
- stop:停止WebUI。實際是停止WebUI底層的Jetty服務。stop方法的實現見代碼清單12。
代碼清單12 stop方法的實現
def stop() {
assert(serverInfo.isDefined,
s"Attempted to stop $className before binding to a server!")
serverInfo.get.stop()
}
創建SparkUI
在SparkContext的初始化過程中,會創建SparkUI。有了對WebUI的總體認識,現在是時候了解SparkContext是如何構造SparkUI的了。SparkUI是WebUI框架的使用范例,了解了SparkUI的創建過程,讀者對MasterWebUI、WorkerWebUI及HistoryServer的創建過程也必然了然於心。創建SparkUI的代碼如下:
_statusTracker = new SparkStatusTracker(this)
_progressBar =
if (_conf.getBoolean("spark.ui.showConsoleProgress", true) && !log.isInfoEnabled) {
Some(new ConsoleProgressBar(this))
} else {
None
}
_ui =
if (conf.getBoolean("spark.ui.enabled", true)) {
Some(SparkUI.createLiveUI(this, _conf, listenerBus, _jobProgressListener,
_env.securityManager, appName, startTime = startTime))
} else {
// For tests, do not enable the UI
None
}
_ui.foreach(_.bind())
這段代碼的執行步驟如下。
1) 創建Spark狀態跟蹤器SparkStatusTracker。
2) 創建ConsoleProgressBar。可以配置spark.ui.showConsoleProgress屬性為false取消對ConsoleProgressBar的創建,此屬性默認為true。
3) 調用SparkUI的createLiveUI方法創建SparkUI。
4) 給SparkUI綁定端口。SparkUI繼承自WebUI,因此調用了代碼清單4-12中WebUI的bind方法啟動SparkUI底層的Jetty服務。
上述步驟中,第1)、2)、4)步都很簡單,所以着重來分析第3)步。SparkUI的createLiveUI的實現如下。
def createLiveUI(
sc: SparkContext,
conf: SparkConf,
listenerBus: SparkListenerBus,
jobProgressListener: JobProgressListener,
securityManager: SecurityManager,
appName: String,
startTime: Long): SparkUI = {
create(Some(sc), conf, listenerBus, securityManager, appName,
jobProgressListener = Some(jobProgressListener), startTime = startTime)
}
可以看到SparkUI的createLiveUI方法中調用了create方法。create的實現如下。
private def create(
sc: Option[SparkContext],
conf: SparkConf,
listenerBus: SparkListenerBus,
securityManager: SecurityManager,
appName: String,
basePath: String = "",
jobProgressListener: Option[JobProgressListener] = None,
startTime: Long): SparkUI = {
val _jobProgressListener: JobProgressListener = jobProgressListener.getOrElse {
val listener = new JobProgressListener(conf)
listenerBus.addListener(listener)
listener
}
val environmentListener = new EnvironmentListener
val storageStatusListener = new StorageStatusListener(conf)
val executorsListener = new ExecutorsListener(storageStatusListener, conf)
val storageListener = new StorageListener(storageStatusListener)
val operationGraphListener = new RDDOperationGraphListener(conf)
listenerBus.addListener(environmentListener)
listenerBus.addListener(storageStatusListener)
listenerBus.addListener(executorsListener)
listenerBus.addListener(storageListener)
listenerBus.addListener(operationGraphListener)
new SparkUI(sc, conf, securityManager, environmentListener, storageStatusListener,
executorsListener, _jobProgressListener, storageListener, operationGraphListener,
appName, basePath, startTime)
}
可以看到create方法里除了JobProgressListener是外部傳入的之外,又增加了一些SparkListener,例如用於對JVM參數、Spark屬性、Java系統屬性、classpath等進行監控的EnvironmentListener;用於維護Executor的存儲狀態的StorageStatusListener;用於准備將Executor的信息展示在ExecutorsTab的ExecutorsListener;用於准備將Executor相關存儲信息展示在BlockManagerUI的StorageListener;用於構建RDD的DAG(有向無關圖)的RDDOperationGraphListener等。這5個SparkListener的實現添加到listenerBus的監聽器列表中。最后使用SparkUI的構造器創建SparkUI。
SparkUI的初始化
調用SparkUI的構造器創建SparkUI,實際也是對SparkUI的初始化過程。在介紹初始化之前,先來看看SparkUI中的兩個成員屬性。
- killEnabled:標記當前SparkUI能否提供殺死Stage或者Job的鏈接。
- appId:當前應用的ID。
SparkUI的構造過程中會執行initialize方法,其實現見代碼清單13。
代碼清單13 SparkUI的初始化
def initialize() {
val jobsTab = new JobsTab(this)
attachTab(jobsTab)
val stagesTab = new StagesTab(this)
attachTab(stagesTab)
attachTab(new StorageTab(this))
attachTab(new EnvironmentTab(this))
attachTab(new ExecutorsTab(this))
attachHandler(createStaticHandler(SparkUI.STATIC_RESOURCE_DIR, "/static"))
attachHandler(createRedirectHandler("/", "/jobs/", basePath = basePath))
attachHandler(ApiRootResource.getServletHandler(this))
// These should be POST only, but, the YARN AM proxy won't proxy POSTs
attachHandler(createRedirectHandler(
"/jobs/job/kill", "/jobs/", jobsTab.handleKillRequest, httpMethods = Set("GET", "POST")))
attachHandler(createRedirectHandler(
"/stages/stage/kill", "/stages/", stagesTab.handleKillRequest,
httpMethods = Set("GET", "POST")))
}
initialize()
根據代碼清單13,SparkUI的初始化步驟如下。
1) 構建頁面布局並給每個WebUITab中的所有WebUIPage創建對應的ServletContextHandler。這一步使用了代碼清單4-8中展示的attachTab方法。
2) 調用JettyUtils的createStaticHandler方法創建對靜態目錄org/apache/spark/ui/static提供文件服務的ServletContextHandler,並使用attachHandler方法追加到SparkUI的服務中。
3) 調用JettyUtils的createRedirectHandler方法創建幾個將用戶對源路徑的請求重定向到目標路徑的ServletContextHandler。例如,將用戶對根路徑"/"的請求重定向到目標路徑"/jobs/"的ServletContextHandler。
SparkUI的頁面布局與展示
SparkUI究竟是如何實現頁面布局及展示的? 由於所有標簽頁都繼承了SparkUITab,所以我們先來看看SparkUITab的實現:
private[spark] abstract class SparkUITab(parent: SparkUI, prefix: String)
extends WebUITab(parent, prefix) {
def appName: String = parent.getAppName
}
根據上述代碼,我們知道SparkUITab繼承了WebUITab,並在實現中增加了一個用於獲取當前應用名稱的方法appName。EnvironmentTab是用於展示JVM、Spark屬性、系統屬性、類路徑等相關信息的標簽頁,由於其實現簡單且能說明問題,所以本節挑選EnvironmentTab作為示例解答本節一開始提出的問題。
EnvironmentTab的實現見代碼清單14。
代碼清單14 EnvironmentTab的實現
private[ui] class EnvironmentTab(parent: SparkUI) extends SparkUITab(parent, "environment") {
val listener = parent.environmentListener
attachPage(new EnvironmentPage(this))
}
根據代碼清單14,我們知道EnvironmentTab引用了SparkUI的environmentListener(類型為EnvironmentListener),並且包含EnvironmentPage這個頁面。EnvironmentTab通過調用attachPage方法將EnvironmentPage與Jetty服務關聯起來。根據代碼清單5中attachPage的實現,創建的renderHandler將采用偏函數(request: HttpServletRequest) => page.render(request) 處理請求,因而會調用EnvironmentPage的render方法。EnvironmentPage的render方法將會渲染頁面元素。EnvironmentPage的實現見代碼清單15。
代碼清單15 EnvironmentPage的實現
private[ui] class EnvironmentPage(parent: EnvironmentTab) extends WebUIPage("") {
private val listener = parent.listener
private def removePass(kv: (String, String)): (String, String) = {
if (kv._1.toLowerCase.contains("password") || kv._1.toLowerCase.contains("secret")) {
(kv._1, "******")
} else kv
}
def render(request: HttpServletRequest): Seq[Node] = {
// 調用UIUtils的listingTable方法生成JVM運行時信息、Spark屬性信息、系統屬性信息、類路徑信息的表格
val runtimeInformationTable = UIUtils.listingTable(
propertyHeader, jvmRow, listener.jvmInformation, fixedWidth = true)
val sparkPropertiesTable = UIUtils.listingTable(
propertyHeader, propertyRow, listener.sparkProperties.map(removePass), fixedWidth = true)
val systemPropertiesTable = UIUtils.listingTable(
propertyHeader, propertyRow, listener.systemProperties, fixedWidth = true)
val classpathEntriesTable = UIUtils.listingTable(
classPathHeaders, classPathRow, listener.classpathEntries, fixedWidth = true)
val content =
<span>
<h4>Runtime Information</h4> {runtimeInformationTable}
<h4>Spark Properties</h4> {sparkPropertiesTable}
<h4>System Properties</h4> {systemPropertiesTable}
<h4>Classpath Entries</h4> {classpathEntriesTable}
</span>
// 調用UIUtils的headerSparkPage方法封裝好css、js、header及頁面布局等
UIUtils.headerSparkPage("Environment", content, parent)
}
// 定義JVM運行時信息、Spark屬性信息、系統屬性信息的表格頭部propertyHeader和類路徑信息的表格頭部
// classPathHeaders
private def propertyHeader = Seq("Name", "Value")
private def classPathHeaders = Seq("Resource", "Source")
// 定義JVM運行時信息的表格中每行數據的生成方法jvmRow
private def jvmRow(kv: (String, String)) = <tr><td>{kv._1}</td><td>{kv._2}</td></tr>
private def propertyRow(kv: (String, String)) = <tr><td>{kv._1}</td><td>{kv._2}</td></tr>
private def classPathRow(data: (String, String)) = <tr><td>{data._1}</td><td>{data._2}</td></tr>
}
根據代碼清單15,EnvironmentPage的render方法利用從父節點EnvironmentTab中得到的EnvironmentListener中的統計監控數據生成JVM運行時、Spark屬性、系統屬性以及類路徑等狀態的摘要信息。以JVM運行時為例,頁面渲染的步驟如下:
1) 定義JVM運行時信息、Spark屬性信息、系統屬性信息的表格頭部propertyHeader和類路徑信息的表格頭部classPathHeaders。
2) 定義JVM運行時信息的表格中每行數據的生成方法jvmRow。
3) 調用UIUtils的listingTable方法生成JVM運行時信息、Spark屬性信息、系統屬性信息、類路徑信息的表格。
4) 調用UIUtils的headerSparkPage方法封裝好css、js、header及頁面布局等。
UIUtils工具類的實現細節留給感興趣的讀者自行查閱,本文不多贅述。
[1]本節內容用到JettyUtils中的很多方法,讀者可以在附錄C中找到相應的實現與說明。
關於《Spark內核設計的藝術 架構設計與實現》
經過近一年的准備,基於Spark2.1.0版本的《Spark內核設計的藝術 架構設計與實現》一書現已出版發行,圖書如圖:

紙質版售賣鏈接如下:
