歡迎轉載,轉載請注明出處,徽滬一郎.
概要
WEB UI和Metrics子系統為外部觀察監測Spark內部運行情況提供了必要的窗口,本文將簡略的過一下其內部代碼實現。
WEB UI
先上圖感受一下spark webui 假設當前已經在本機運行standalone cluster模式,輸入http://127.0.0.1:8080將會看到如下頁面
driver application默認會打開4040端口進行http監聽,可以看到application相關的詳細信息
顯示每個stage的詳細信息
啟動過程
本節要討論的重點是http server是如何啟動的,頁面中的數據是從哪里獲取到的?Spark中用到的http server是jetty, jetty采用java編寫,是非常輕巧的servlet engine和http server。能夠嵌入到用戶程序中執行,不用像tomcat或jboss那樣需要自己獨立的jvm進程。
SparkUI在SparkContext初始化的時候創建
// Initialize the Spark UI , registering all
associated listeners
private [spark] val ui = new SparkUI (this)
ui.bind ()
initialize的主要工作是注冊頁面處理句柄,WebUI的子類需要實現自己的initialize函數
bind將真正啟動jetty server.
def bind () {
assert (! serverInfo .isDefined , " Attempted to bind %
s more than once!". format ( className ))
try {
// 啟 動 JettyServer
serverInfo = Some( startJettyServer (" 0.0.0.0 ",
port , handlers , conf))
logInfo (" Started %s at http ://%s:%d". format (
className , publicHostName , boundPort ))
} catch {
case e: Exception =>
logError (" Failed to bind %s". format ( className )
, e)
System .exit (1)
}
}
在startJettyServer函數中將JettyServer運行起來的關鍵處理函數是connect
def connect(currentPort: Int): (Server, Int) = {
val server = new Server(new InetSocketAddress(hostName, currentPort))
val pool = new QueuedThreadPool
pool.setDaemon(true)
server.setThreadPool(pool)
server.setHandler(collection)
Try {
server.start()
} match {
case s: Success[_] =>
(server, server.getConnectors.head.getLocalPort)
case f: Failure[_] =>
val nextPort = (currentPort + 1) % 65536
server.stop()
pool.stop()
val msg = s"Failed to create UI on port $currentPort. Trying again on port $nextPort."
if (f.toString.contains("Address already in use")) {
logWarning(s"$msg - $f")
} else {
logError(msg, f.exception)
}
connect(nextPort)
}
}
val (server, boundPort) = connect(port)
ServerInfo(server, boundPort, collection)
}
數據獲取
頁面中的數據是如何獲取的呢,這就要歸功於SparkListener了,典型的觀察者設計模式。當有與stage及task相關的事件發生時,這些Listener都將收到通知,並進行數據更新。
需要指出的是,數據盡管得以自動更新,但頁面並沒有,還是需要手工刷新才能得到最新的數據。
上圖顯示的是SparkUI中注冊了哪些SparkListener子類。來看一看這些子類是在什么時候注冊進去的, 注意研究一下SparkUI.initialize函
def initialize() {
listenerBus.addListener(storageStatusListener)
val jobProgressTab = new JobProgressTab(this)
attachTab(jobProgressTab)
attachTab(new StorageTab(this))
attachTab(new EnvironmentTab(this))
attachTab(new ExecutorsTab(this))
attachHandler(createStaticHandler(SparkUI.STATIC_RESOURCE_DIR, "/static"))
attachHandler(createRedirectHandler("/", "/stages", basePath = basePath))
attachHandler(
createRedirectHandler("/stages/stage/kill", "/stages", jobProgressTab.handleKillRequest))
if (live) {
sc.env.metricsSystem.getServletHandlers.foreach(attachHandler)
}
}
舉一個實際例子來看看Notifier發送Event的時刻,比如有任務提交的時 resourceOffer->taskStarted->handleBeginEvent
private [ scheduler ] def handleBeginEvent (task: Task[_
], taskInfo : TaskInfo ) {
listenerBus .post( SparkListenerTaskStart (task.
stageId , taskInfo ))
submitWaitingStages ()
}
post其實是向listenerBus的消息隊列中添加一個消息,真正將消息發送 出去的時另一個處理線程listenerThread
override def run (): Unit = Utils.
logUncaughtExceptions {
while (true) {
eventLock . acquire ()
// Atomically remove and process this event
LiveListenerBus .this. synchronized {
val event = eventQueue .poll
if (event == SparkListenerShutdown ) {
// Get out of the while loop and shutdown
the daemon thread
return
}
Option (event). foreach ( postToAll )
}
}
}
Option(event).foreach(postToAll)負責將事件通知給各個Observer.postToAll的函數實現如下
def postToAll(event: SparkListenerEvent) {
event match {
case stageSubmitted: SparkListenerStageSubmitted =>
foreachListener(_.onStageSubmitted(stageSubmitted))
case stageCompleted: SparkListenerStageCompleted =>
foreachListener(_.onStageCompleted(stageCompleted))
case jobStart: SparkListenerJobStart =>
foreachListener(_.onJobStart(jobStart))
case jobEnd: SparkListenerJobEnd =>
foreachListener(_.onJobEnd(jobEnd))
case taskStart: SparkListenerTaskStart =>
foreachListener(_.onTaskStart(taskStart))
case taskGettingResult: SparkListenerTaskGettingResult =>
foreachListener(_.onTaskGettingResult(taskGettingResult))
case taskEnd: SparkListenerTaskEnd =>
foreachListener(_.onTaskEnd(taskEnd))
case environmentUpdate: SparkListenerEnvironmentUpdate =>
foreachListener(_.onEnvironmentUpdate(environmentUpdate))
case blockManagerAdded: SparkListenerBlockManagerAdded =>
foreachListener(_.onBlockManagerAdded(blockManagerAdded))
case blockManagerRemoved: SparkListenerBlockManagerRemoved =>
foreachListener(_.onBlockManagerRemoved(blockManagerRemoved))
case unpersistRDD: SparkListenerUnpersistRDD =>
foreachListener(_.onUnpersistRDD(unpersistRDD))
case applicationStart: SparkListenerApplicationStart =>
foreachListener(_.onApplicationStart(applicationStart))
case applicationEnd: SparkListenerApplicationEnd =>
foreachListener(_.onApplicationEnd(applicationEnd))
case SparkListenerShutdown =>
}
}
Metrics
在系統設計中,測量模塊是不可或缺的組成部分。通過這些測量數據來感知系統的運行情況。
在Spark中,測量模塊由MetricsSystem來擔任,MetricsSystem中有三個重要的概念,分述如下。
- instance 表示誰在使用metrics system, 目前已知的有master, worker, executor和client driver會創建metrics system用以測量
- source 表示數據源,從哪里獲取數據
- sinks 數據目的地,將從source獲取的數據發送到哪
Spark目前支持將測量數據保存或發送到如下目的地
- ConsoleSink 輸出到console
- CSVSink 定期保存成為CSV文件
- JmxSink 注冊到JMX,以通過JMXConsole來查看
- MetricsServlet 在SparkUI中添加MetricsServlet用以查看Task運行時的測量數據
- GraphiteSink 發送給Graphite以對整個系統(不僅僅包括spark)進行監控
下面從MetricsSystem的創建,數據源的添加,數據更新與發送幾個方面來跟蹤一下源碼。
初始化過程
MetricsSystem依賴於由codahale提供的第三方庫Metrics,可以在metrics.codahale.com找到更為詳細的介紹。
以Driver Application為例,driver application首先會初始化SparkContext,在SparkContext的初始化過程中就會創建MetricsSystem,具體調用關系如下。 SparkContext.init->SparkEnv.init->MetricsSystem.createMetricsSystem
注冊數據源,繼續以SparkContext為例
private val dagSchedulerSource = new DAGSchedulerSource(this.dagScheduler, this)
private val blockManagerSource = new BlockManagerSource(SparkEnv.get.blockManager, this)
private def initDriverMetrics() {
SparkEnv.get.metricsSystem.registerSource(dagSchedulerSource)
SparkEnv.get.metricsSystem.registerSource(blockManagerSource)
}
initDriverMetrics()
數據讀取
數據讀取由Sink來完成,在Spark中創建的Sink子類如下圖所示
讀取最新的數據,以CsvSink為例,最主要的就是創建CsvReporter,啟動之后會定期更新最近的數據到console。不同類型的Sink所使用的Reporter是不一樣的。
val reporter: CsvReporter = CsvReporter.forRegistry(registry)
.formatFor(Locale.US)
.convertDurationsTo(TimeUnit.MILLISECONDS)
.convertRatesTo(TimeUnit.SECONDS)
.build(new File(pollDir))
override def start() {
reporter.start(pollPeriod, pollUnit)
}
Spark中關於metrics子系統的配置文件詳見conf/metrics.properties. 默認的Sink是MetricsServlet,在任務提交執行之后,輸入http://127.0.0.1:4040/metrics/json會得到以json格式保存的metrics信息。
