Spark (47): Spark UI Data Visualization


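Introduction:

1) The Spark Web UI is built on Jetty, a widely used servlet container.

2) Before Spark 2.3, the Spark Web UI was the front end for run status, resource state, and monitoring metrics, and that data was collected by the metrics system (MetricsSystem).

3) From Spark 2.3 on, the data the Spark Web UI presents appears to be tied closely to the event bus and ElementTrackingStore, while MetricsSystem exists to expose metrics to external consumers.

The Spark UI storage changes can be traced through the following Spark issues: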

Key-value store abstraction and implementation for storing application data

Use key-value store to keep History Server application listing

Hook up Spark UI to the new key-value store backend

Implement listener for saving application status data in key-value store

Make Environment page use new app state store

Make Executors page use new app state store

Make the Storage page use new app state store

Make Jobs and Stages pages use the new app state store

The Spark UI can contain the following tabs: Jobs, Stages, Storage, Environment, Executors, SQL

 

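How is the Spark UI (HTTP server) started?

Next, let's go through the source code to see how the Spark UI (HTTP server) is started and where the data on its pages comes from.

The HTTP server used by the Spark UI is Jetty. Jetty is written in Java and is a solid servlet engine and HTTP server that can be embedded in a user program, rather than running in a separate JVM process the way Tomcat or JBoss typically do.

1) SparkUI is started during SparkContext initialization

The Spark UI (HTTP server) is created while SparkContext is being initialized: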

  ...

private var _listenerBus: LiveListenerBus = _
private var _statusStore: AppStatusStore = _

  ...
  private[spark] def ui: Option[SparkUI] = _ui

  ...
    _listenerBus = new LiveListenerBus(_conf)

    // Initialize the app status store and listener before SparkEnv is created so that it gets
    // all events.
    _statusStore = AppStatusStore.createLiveStore(conf)
    listenerBus.addToStatusQueue(_statusStore.listener.get)
  ...
    _ui =
      if (conf.getBoolean("spark.ui.enabled", true)) {
        Some(SparkUI.create(Some(this), _statusStore, _conf, _env.securityManager, appName, "",
          startTime))
      } else {
        // For tests, do not enable the UI
        None
      }
    // Bind the UI before starting the task scheduler to communicate
    // the bound port to the cluster manager properly
    _ui.foreach(_.bind())
  ...
    _ui.foreach(_.setAppId(_applicationId))
  ...

https://github.com/apache/spark/blob/branch-2.4/core/src/main/scala/org/apache/spark/SparkContext.scala

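Here _statusStore is an AppStatusStore instance, which wraps a KVStore and an AppStatusListener:

  • the KVStore stores the monitoring data;
  • the AppStatusListener is registered on the appStatus queue of the event bus.

_env.securityManager is the security manager initialized in SparkEnv.

SparkContext calls create() on the SparkUI companion object, which simply constructs a new SparkUI instance, and then calls bind() to bind the SparkUI to a Jetty server.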
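
The wiring described above (event bus, status listener, key-value store, UI reading from the store) can be pictured with a minimal sketch. This is not Spark's code: MiniListenerBus, MiniStatusListener, MiniKVStore, MiniStatusStore and the two event classes are hypothetical stand-ins, and this bus delivers events synchronously, which is a simplification assumed here.

```scala
import scala.collection.mutable

object StatusStoreSketch {
  // Hypothetical events, standing in for Spark's listener events.
  sealed trait Event
  final case class JobStart(jobId: Int) extends Event
  final case class JobEnd(jobId: Int, succeeded: Boolean) extends Event

  // Stand-in for KVStore: an in-memory map from job id to status.
  final class MiniKVStore {
    private val jobs = mutable.LinkedHashMap.empty[Int, String]
    def write(jobId: Int, status: String): Unit = jobs(jobId) = status
    def read(jobId: Int): Option[String] = jobs.get(jobId)
  }

  // Stand-in for AppStatusListener: turns events into store writes.
  final class MiniStatusListener(store: MiniKVStore) {
    def onEvent(event: Event): Unit = event match {
      case JobStart(id)      => store.write(id, "RUNNING")
      case JobEnd(id, true)  => store.write(id, "SUCCEEDED")
      case JobEnd(id, false) => store.write(id, "FAILED")
    }
  }

  // Stand-in for the event bus; synchronous delivery is an assumption of this sketch.
  final class MiniListenerBus {
    private val listeners = mutable.ArrayBuffer.empty[Event => Unit]
    def addListener(listener: Event => Unit): Unit = listeners += listener
    def post(event: Event): Unit = listeners.foreach(_(event))
  }

  // Stand-in for AppStatusStore: wraps the store and its listener.
  final class MiniStatusStore {
    private val kv = new MiniKVStore
    val listener = new MiniStatusListener(kv)
    def jobStatus(jobId: Int): Option[String] = kv.read(jobId)
  }

  def main(args: Array[String]): Unit = {
    val bus = new MiniListenerBus
    val statusStore = new MiniStatusStore
    // Register the listener before any events are posted, so none are missed.
    bus.addListener(statusStore.listener.onEvent)
    bus.post(JobStart(0))
    bus.post(JobEnd(0, succeeded = true))
    println(statusStore.jobStatus(0)) // prints Some(SUCCEEDED)
  }
}
```

The point of the design is that UI pages never talk to the scheduler directly; they only read what the listener has already written to the store.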

2) SparkUI object initialization

SparkUI.create() constructs a SparkUI object, and constructing that object invokes SparkUI's initialize() method:

private[spark] class SparkUI private (
    val store: AppStatusStore,
    val sc: Option[SparkContext],
    val conf: SparkConf,
    securityManager: SecurityManager,
    var appName: String,
    val basePath: String,
    val startTime: Long,
    val appSparkVersion: String)
  extends WebUI(securityManager, securityManager.getSSLOptions("ui"), SparkUI.getUIPort(conf),
    conf, basePath, "SparkUI")
  with Logging
  with UIRoot {

  val killEnabled = sc.map(_.conf.getBoolean("spark.ui.killEnabled", true)).getOrElse(false)

  var appId: String = _

  private var streamingJobProgressListener: Option[SparkListener] = None

  /** Initialize all components of the server. */
  def initialize(): Unit = {
    val jobsTab = new JobsTab(this, store)
    attachTab(jobsTab)
    val stagesTab = new StagesTab(this, store)
    attachTab(stagesTab)
    attachTab(new StorageTab(this, store))
    attachTab(new EnvironmentTab(this, store))
    attachTab(new ExecutorsTab(this))
    addStaticHandler(SparkUI.STATIC_RESOURCE_DIR)
    attachHandler(createRedirectHandler("/", "/jobs/", basePath = basePath))
    attachHandler(ApiRootResource.getServletHandler(this))

    // These should be POST only, but, the YARN AM proxy won't proxy POSTs
    attachHandler(createRedirectHandler(
      "/jobs/job/kill", "/jobs/", jobsTab.handleKillRequest, httpMethods = Set("GET", "POST")))
    attachHandler(createRedirectHandler(
      "/stages/stage/kill", "/stages/", stagesTab.handleKillRequest,
      httpMethods = Set("GET", "POST")))
  }

https://github.com/apache/spark/blob/branch-2.4/core/src/main/scala/org/apache/spark/ui/SparkUI.scala

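The SparkUI class has three member fields:

  • killEnabled is controlled by the spark.ui.killEnabled setting (and is false when there is no SparkContext). If true, the Spark Web UI shows a control for forcibly killing the application's jobs;
  • appId is the current application ID;
  • streamingJobProgressListener is the listener for Spark Streaming job progress.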

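In initialize():

  • First, five tabs are created (JobsTab, StagesTab, StorageTab, EnvironmentTab, ExecutorsTab) and registered with the Web UI through attachTab(). These are the tab pages of the Spark UI; see the figure above, where the names match one to one.
  • Then addStaticHandler() creates the ServletContextHandler for static resources, and createRedirectHandler() creates several redirecting ServletContextHandlers.
  • Finally, each of these is registered with the Web UI through attachHandler().

Note: ServletContextHandler is a full-featured Jetty handler that receives and processes HTTP requests and then passes them on to a servlet.

Besides its page-rendering classes, each of the tabs above (JobsTab, StagesTab, StorageTab, EnvironmentTab, ExecutorsTab) also relies on static resources: HTML, JS, CSS, and other files such as images (https://github.com/apache/spark/tree/branch-2.4/core/src/main/resources/org/apache/spark/ui/static).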

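3) Calling bind() to start the Jetty server

As shown above, SparkContext creates the SparkUI object during initialization and then calls bind() to bind it to a Jetty server. bind() is defined in WebUI, the parent class of SparkUI.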

WebUI member fields and getter methods

  protected val tabs = ArrayBuffer[WebUITab]()
  protected val handlers = ArrayBuffer[ServletContextHandler]()
  protected val pageToHandlers = new HashMap[WebUIPage, ArrayBuffer[ServletContextHandler]]
  protected var serverInfo: Option[ServerInfo] = None
  protected val publicHostName = Option(conf.getenv("SPARK_PUBLIC_DNS")).getOrElse(
    conf.get(DRIVER_HOST_ADDRESS))
  private val className = Utils.getFormattedClassName(this)

  def getBasePath: String = basePath
  def getTabs: Seq[WebUITab] = tabs
  def getHandlers: Seq[ServletContextHandler] = handlers
  def getSecurityManager: SecurityManager = securityManager

https://github.com/apache/spark/blob/branch-2.4/core/src/main/scala/org/apache/spark/ui/WebUI.scala

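WebUI has six member fields:

  • tabs: a buffer holding the WebUITab instances (the tab pages of the Web UI);
  • handlers: a buffer holding the Jetty ServletContextHandler instances;
  • pageToHandlers: the mapping from each WebUIPage (the component one level below WebUITab) to its ServletContextHandlers;
  • serverInfo: information about the Jetty server behind this Web UI;
  • publicHostName: the host name of the Jetty server behind this Web UI, taken from the SPARK_PUBLIC_DNS environment variable if it is set, otherwise from the spark.driver.host setting;
  • className: the name of the current class, formatted by Utils.getFormattedClassName().

There are four getter methods:

  • getTabs() and getHandlers() simply return the corresponding fields;
  • getBasePath() returns the Web UI base path given as a constructor parameter;
  • getSecurityManager() returns the security manager passed to the constructor.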

attach/detach methods provided by WebUI

These methods come in pairs, three pairs in all:

  • attachTab/detachTab: register and remove a WebUITab;
  • attachPage/detachPage: register and remove a WebUIPage;
  • attachHandler/detachHandler: register and remove a ServletContextHandler.

  /** Attaches a tab to this UI, along with all of its attached pages. */
  def attachTab(tab: WebUITab): Unit = {
    tab.pages.foreach(attachPage)
    tabs += tab
  }
  /** Detaches a tab from this UI, along with all of its attached pages. */
  def detachTab(tab: WebUITab): Unit = {
    tab.pages.foreach(detachPage)
    tabs -= tab
  }
  /** Detaches a page from this UI, along with all of its attached handlers. */
  def detachPage(page: WebUIPage): Unit = {
    pageToHandlers.remove(page).foreach(_.foreach(detachHandler))
  }

  /** Attaches a page to this UI. */
  def attachPage(page: WebUIPage): Unit = {
    val pagePath = "/" + page.prefix
    val renderHandler = createServletHandler(pagePath,
      (request: HttpServletRequest) => page.render(request), securityManager, conf, basePath)
    val renderJsonHandler = createServletHandler(pagePath.stripSuffix("/") + "/json",
      (request: HttpServletRequest) => page.renderJson(request), securityManager, conf, basePath)
    attachHandler(renderHandler)
    attachHandler(renderJsonHandler)
    val handlers = pageToHandlers.getOrElseUpdate(page, ArrayBuffer[ServletContextHandler]())
    handlers += renderHandler
  }

  /** Attaches a handler to this UI. */
  def attachHandler(handler: ServletContextHandler): Unit = {
    handlers += handler
    serverInfo.foreach(_.addHandler(handler))
  }

  /** Detaches a handler from this UI. */
  def detachHandler(handler: ServletContextHandler): Unit = {
    handlers -= handler
    serverInfo.foreach(_.removeHandler(handler))
  }

  def detachHandler(path: String): Unit = {
    handlers.find(_.getContextPath() == path).foreach(detachHandler)
  }

  def addStaticHandler(resourceBase: String, path: String = "/static"): Unit = {
    attachHandler(JettyUtils.createStaticHandler(resourceBase, path))
  }

https://github.com/apache/spark/blob/branch-2.4/core/src/main/scala/org/apache/spark/ui/WebUI.scala

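attachPage() flow:

  • 1) Call createServletHandler() from the Jetty helper class JettyUtils to create one ServletContextHandler for each of the WebUIPage's two rendering methods, render() and renderJson(), so a single WebUIPage corresponds to two handlers.
  • 2) Then call attachHandler(), shown above, to register the handlers with Jetty, and record the page-to-handler mapping in pageToHandlers.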
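
To make the bookkeeping concrete, here is a small model of the attach/detach pairs. It is only a sketch under simplifying assumptions: Handler, Page, Tab and MiniWebUI are hypothetical names, a handler is reduced to the path it serves, and no Jetty is involved.

```scala
import scala.collection.mutable

object AttachSketch {
  // Hypothetical stand-ins: a "handler" is just the path it would serve.
  final case class Handler(path: String)
  final class Page(val prefix: String)
  final class Tab(val pages: Seq[Page])

  final class MiniWebUI {
    val tabs = mutable.ArrayBuffer.empty[Tab]
    val handlers = mutable.ArrayBuffer.empty[Handler]
    private val pageToHandlers = mutable.HashMap.empty[Page, mutable.ArrayBuffer[Handler]]

    def attachHandler(handler: Handler): Unit = handlers += handler
    def detachHandler(handler: Handler): Unit = handlers -= handler

    /** One page yields two handlers: the HTML path and the same path plus "/json". */
    def attachPage(page: Page): Unit = {
      val pagePath = "/" + page.prefix
      val render = Handler(pagePath)
      val renderJson = Handler(pagePath.stripSuffix("/") + "/json")
      attachHandler(render)
      attachHandler(renderJson)
      // This sketch records both handlers for the page, so detachPage removes both.
      pageToHandlers.getOrElseUpdate(page, mutable.ArrayBuffer.empty[Handler]) ++=
        Seq(render, renderJson)
    }

    def detachPage(page: Page): Unit =
      pageToHandlers.remove(page).foreach(_.foreach(detachHandler))

    def attachTab(tab: Tab): Unit = { tab.pages.foreach(attachPage); tabs += tab }
    def detachTab(tab: Tab): Unit = { tab.pages.foreach(detachPage); tabs -= tab }
  }

  def main(args: Array[String]): Unit = {
    val ui = new MiniWebUI
    ui.attachTab(new Tab(Seq(new Page("executors"), new Page("executors/threadDump"))))
    ui.handlers.foreach(h => println(h.path))
  }
}
```

Attaching a tab with two pages therefore registers four handlers, and detaching the tab walks the same mapping in reverse.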

Binding WebUI to the Jetty server

  /** Binds to the HTTP server behind this web interface. */
  def bind(): Unit = {
    assert(serverInfo.isEmpty, s"Attempted to bind $className more than once!")
    try {
      val host = Option(conf.getenv("SPARK_LOCAL_IP")).getOrElse("0.0.0.0")
      serverInfo = Some(startJettyServer(host, port, sslOptions, handlers, conf, name))
      logInfo(s"Bound $className to $host, and started at $webUrl")
    } catch {
      case e: Exception =>
        logError(s"Failed to bind $className", e)
        System.exit(1)
    }
  }

https://github.com/apache/spark/blob/branch-2.4/core/src/main/scala/org/apache/spark/ui/WebUI.scala
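
The bind-once pattern in bind() can be tried out without Jetty. The sketch below swaps in the JDK's built-in com.sun.net.httpserver.HttpServer purely so that it runs with no extra dependencies; MiniWebUI, boundPort and stop are hypothetical names, and the real startJettyServer does considerably more (SSL, handler collections and so on).

```scala
import java.net.InetSocketAddress
import com.sun.net.httpserver.{HttpExchange, HttpHandler, HttpServer}

object BindSketch {
  final class MiniWebUI(host: String, port: Int) {
    // Plays the role of serverInfo: empty until bind() has succeeded.
    private var server: Option[HttpServer] = None

    /** Starts the embedded server; a second call fails, like the assert in bind(). */
    def bind(): Unit = {
      require(server.isEmpty, "Attempted to bind more than once!")
      val s = HttpServer.create(new InetSocketAddress(host, port), 0)
      s.createContext("/", new HttpHandler {
        override def handle(exchange: HttpExchange): Unit = {
          val body = "ok".getBytes("UTF-8")
          exchange.sendResponseHeaders(200, body.length.toLong)
          exchange.getResponseBody.write(body)
          exchange.close()
        }
      })
      s.start()
      server = Some(s)
    }

    /** The port actually bound; differs from the requested one when 0 was passed. */
    def boundPort: Option[Int] = server.map(_.getAddress.getPort)

    def stop(): Unit = {
      server.foreach(_.stop(0))
      server = None
    }
  }

  def main(args: Array[String]): Unit = {
    val ui = new MiniWebUI("127.0.0.1", 0) // port 0 lets the OS pick a free port
    ui.bind()
    println(s"bound to port ${ui.boundPort.get}")
    ui.stop()
  }
}
```

Keeping the server handle in an Option makes the "already bound" check and the later lookup of the bound port fall out of the same field.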

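Key points about bind():

a) It calls startJettyServer(...), a method defined in JettyUtils.scala, which again shows that SparkUI runs on Jetty.

For the implementation, see: https://github.com/apache/spark/blob/branch-2.4/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala

b) startJettyServer(...) is passed host and port, the IP address and port at which the Spark UI is reached, so it is worth knowing where these two parameters are configured.

√ Code that obtains host: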

val host = Option(conf.getenv("SPARK_LOCAL_IP")).getOrElse("0.0.0.0")

spark-env.sh includes the relevant variables (SPARK_LOCAL_IP, SPARK_PUBLIC_DNS):

# Options read when launching programs locally with
# ./bin/run-example or ./bin/spark-submit
# - HADOOP_CONF_DIR, to point Spark towards Hadoop configuration files
# - SPARK_LOCAL_IP, to set the IP address Spark binds to on this node
# - SPARK_PUBLIC_DNS, to set the public dns name of the driver program

# Options read by executors and drivers running inside the cluster
# - SPARK_LOCAL_IP, to set the IP address Spark binds to on this node
# - SPARK_PUBLIC_DNS, to set the public DNS name of the driver program
# - SPARK_LOCAL_DIRS, storage directories to use on this node for shuffle and RDD data
# - MESOS_NATIVE_JAVA_LIBRARY, to point to your libmesos.so if you use Mesos

https://github.com/apache/spark/blob/branch-2.4/conf/spark-env.sh.template

√ The code that obtains port is in the SparkUI companion object:

private[spark] object SparkUI {
  val DEFAULT_PORT = 4040
  val STATIC_RESOURCE_DIR = "org/apache/spark/ui/static"
  val DEFAULT_POOL_NAME = "default"

  def getUIPort(conf: SparkConf): Int = {
    conf.getInt("spark.ui.port", SparkUI.DEFAULT_PORT)
  }

  /**
   * Create a new UI backed by an AppStatusStore.
   */
  def create(
      sc: Option[SparkContext],
      store: AppStatusStore,
      conf: SparkConf,
      securityManager: SecurityManager,
      appName: String,
      basePath: String,
      startTime: Long,
      appSparkVersion: String = org.apache.spark.SPARK_VERSION): SparkUI = {

    new SparkUI(store, sc, conf, securityManager, appName, basePath, startTime, appSparkVersion)
  }

}
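
Putting the host and port lookups side by side, the resolution order looks roughly like this. The sketch uses plain Maps in place of the process environment and SparkConf, and EndpointSketch and its method names are hypothetical; only the keys and the defaults 0.0.0.0 and 4040 come from the excerpts above.

```scala
object EndpointSketch {
  val DefaultPort = 4040

  /** Address the server socket binds to: SPARK_LOCAL_IP, else all interfaces. */
  def bindHost(env: Map[String, String]): String =
    env.getOrElse("SPARK_LOCAL_IP", "0.0.0.0")

  /** Host name advertised to users: SPARK_PUBLIC_DNS first, then spark.driver.host. */
  def publicHostName(env: Map[String, String], conf: Map[String, String]): Option[String] =
    env.get("SPARK_PUBLIC_DNS").orElse(conf.get("spark.driver.host"))

  /** UI port: spark.ui.port if set, else the default. */
  def uiPort(conf: Map[String, String]): Int =
    conf.get("spark.ui.port").map(_.trim.toInt).getOrElse(DefaultPort)

  def main(args: Array[String]): Unit = {
    val env = Map("SPARK_LOCAL_IP" -> "10.0.0.5")
    val conf = Map("spark.ui.port" -> "4050", "spark.driver.host" -> "driver.example")
    println(s"${bindHost(env)}:${uiPort(conf)} advertised as ${publicHostName(env, conf)}")
  }
}
```

Note the two different roles: SPARK_LOCAL_IP decides where the socket listens, while SPARK_PUBLIC_DNS only affects the name shown in links.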

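Spark Web UI rendering

The Spark Web UI is in effect a three-level tree: the root is WebUI, the middle nodes are WebUITab, and the leaves are WebUIPage.

The UI is rendered mainly by WebUITab and WebUIPage. A tab (WebUITab) can contain one or more pages (WebUIPage), and the tab level is optional: a page can also be attached directly to the WebUI.

WebUI definition: covered above, so the code is not repeated here. https://github.com/apache/spark/blob/branch-2.4/core/src/main/scala/org/apache/spark/ui/WebUI.scala

WebUITab definition: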

/**
 * A tab that represents a collection of pages.
 * The prefix is appended to the parent address to form a full path, and must not contain slashes.
 */
private[spark] abstract class WebUITab(parent: WebUI, val prefix: String) {
  val pages = ArrayBuffer[WebUIPage]()
  val name = prefix.capitalize

  /** Attach a page to this tab. This prepends the page's prefix with the tab's own prefix. */
  def attachPage(page: WebUIPage) {
    page.prefix = (prefix + "/" + page.prefix).stripSuffix("/")
    pages += page
  }

  /** Get a list of header tabs from the parent UI. */
  def headerTabs: Seq[WebUITab] = parent.getTabs

  def basePath: String = parent.getBasePath
}

Defined in https://github.com/apache/spark/blob/branch-2.4/core/src/main/scala/org/apache/spark/ui/WebUI.scala

  • Because a tab (WebUITab) can contain several pages (WebUIPage), the field val pages = ArrayBuffer[WebUIPage]() buffers all the pages under that tab.
  • attachPage(...) joins the tab's path prefix with the page's path prefix and adds the page to pages.

WebUIPage definition:
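
The prefix arithmetic is easy to check in isolation. The helper below applies the same string expression as WebUITab.attachPage; the object and method names are hypothetical.

```scala
object PrefixSketch {
  /** Joins a tab prefix and a page prefix, dropping the trailing slash left by an empty page prefix. */
  def fullPrefix(tabPrefix: String, pagePrefix: String): String =
    (tabPrefix + "/" + pagePrefix).stripSuffix("/")

  def main(args: Array[String]): Unit = {
    println(fullPrefix("executors", ""))           // prints executors
    println(fullPrefix("executors", "threadDump")) // prints executors/threadDump
  }
}
```

This is why a page constructed with an empty prefix becomes the tab's landing page.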

/**
 * A page that represents the leaf node in the UI hierarchy.
 *
 * The direct parent of a WebUIPage is not specified as it can be either a WebUI or a WebUITab.
 * If the parent is a WebUI, the prefix is appended to the parent's address to form a full path.
 * Else, if the parent is a WebUITab, the prefix is appended to the super prefix of the parent
 * to form a relative path. The prefix must not contain slashes.
 */
private[spark] abstract class WebUIPage(var prefix: String) {
  def render(request: HttpServletRequest): Seq[Node]
  def renderJson(request: HttpServletRequest): JValue = JNothing
}

Also defined in https://github.com/apache/spark/blob/branch-2.4/core/src/main/scala/org/apache/spark/ui/WebUI.scala

  • render(...) renders the page;
  • renderJson(...) produces the JSON representation of the page.

WebUITab and WebUIPage each have a family of implementation classes; for one example, see: https://github.com/apache/spark/tree/branch-2.4/core/src/main/scala/org/apache/spark/ui/exec

Rendering a SparkUI page

Take the Executors tab as an example. It is representative because a single tab serves two pages (ExecutorsPage and ExecutorThreadDumpPage).

The pages under this tab in the Spark UI are:
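
The split between the two rendering methods can be illustrated with a stripped-down page type. In this sketch Strings replace Seq[Node] and JValue so that nothing beyond the standard library is needed; MiniPage, StatusPage and dispatch are hypothetical names, and the Option return stands in for the JNothing default.

```scala
object PageSketch {
  abstract class MiniPage(val prefix: String) {
    def render(): String
    // Pages that have no JSON view simply inherit this empty default.
    def renderJson(): Option[String] = None
  }

  final class StatusPage extends MiniPage("status") {
    override def render(): String = "<h1>Status</h1>"
    override def renderJson(): Option[String] = Some("""{"status":"ok"}""")
  }

  final class AboutPage extends MiniPage("about") {
    override def render(): String = "<h1>About</h1>"
  }

  /** Routes a path to render() or renderJson(), mirroring the two handlers per page. */
  def dispatch(page: MiniPage, path: String): Option[String] = {
    val htmlPath = "/" + page.prefix
    if (path == htmlPath) Some(page.render())
    else if (path == htmlPath + "/json") page.renderJson()
    else None
  }

  def main(args: Array[String]): Unit = {
    println(dispatch(new StatusPage, "/status/json"))
    println(dispatch(new AboutPage, "/about/json"))
  }
}
```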

executors->ExecutorsPage

http://ip:8088/proxy/application_1558494459870_0005/executors/threadDump/?executorId=1

executors->ExecutorThreadDumpPage

http://ip:8088/proxy/application_1558494459870_0005/executors/threadDump/?executorId=[executorId or driver]

First, the ExecutorsTab code:

private[ui] class ExecutorsTab(parent: SparkUI) extends SparkUITab(parent, "executors") {

  init()

  private def init(): Unit = {
    val threadDumpEnabled =
      parent.sc.isDefined && parent.conf.getBoolean("spark.ui.threadDumpsEnabled", true)

    attachPage(new ExecutorsPage(this, threadDumpEnabled))
    if (threadDumpEnabled) {
      attachPage(new ExecutorThreadDumpPage(this, parent.sc))
    }
  }
}

SparkUITab is a thin wrapper around WebUITab that adds the application name and Spark version. ExecutorsTab calls its init() method from the constructor; init() uses the attachPage(...) method it inherits through SparkUITab to add ExecutorsPage and, when threadDumpEnabled is true, ExecutorThreadDumpPage as well.

Next, the ExecutorsPage code:
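
The condition in init() combines two things: whether a live SparkContext exists and whether the setting allows thread dumps. A minimal sketch of that decision, with a plain Map in place of SparkConf and hypothetical names throughout:

```scala
object ExecutorsTabSketch {
  /** Mirrors the threadDumpEnabled expression: a live context AND the flag (default true). */
  def threadDumpEnabled(hasLiveContext: Boolean, conf: Map[String, String]): Boolean =
    hasLiveContext && conf.getOrElse("spark.ui.threadDumpsEnabled", "true").toBoolean

  /** Page prefixes the tab would end up with after init(). */
  def pagePrefixes(hasLiveContext: Boolean, conf: Map[String, String]): Seq[String] = {
    val base = Seq("executors")
    if (threadDumpEnabled(hasLiveContext, conf)) base :+ "executors/threadDump" else base
  }

  def main(args: Array[String]): Unit = {
    println(pagePrefixes(hasLiveContext = true, Map.empty))
    println(pagePrefixes(hasLiveContext = false, Map.empty))
  }
}
```

So a UI constructed without a SparkContext (sc is None) never exposes the thread dump page, whatever the setting says.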

private[ui] class ExecutorsPage(
    parent: SparkUITab,
    threadDumpEnabled: Boolean)
  extends WebUIPage("") {

  def render(request: HttpServletRequest): Seq[Node] = {
    val content =
      <div>
        {
          <div id="active-executors" class="row-fluid"></div> ++
          <script src={UIUtils.prependBaseUri(request, "/static/utils.js")}></script> ++
          <script src={UIUtils.prependBaseUri(request, "/static/executorspage.js")}></script> ++
          <script>setThreadDumpEnabled({threadDumpEnabled})</script>
        }
      </div>

    UIUtils.headerSparkPage(request, "Executors", content, parent, useDataTables = true)
  }
}

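render() renders the page content as follows:

1) build content;

2) call UIUtils.headerSparkPage() to wrap content in the standard page layout, which is what gets returned to the browser;

3) while loading the page, the browser runs executorspage.js, which uses the current application ID to request allexecutors and related data from the REST service and renders that data into the executors page using the executorspage-template.html template.

Spark REST service implementation: https://github.com/apache/spark/blob/branch-2.4/core/src/main/scala/org/apache/spark/status/api/v1/

The API resource class behind the request shown in the figure above: https://github.com/apache/spark/blob/branch-2.4/core/src/main/scala/org/apache/spark/status/api/v1/OneApplicationResource.scala

The method that returns the allexecutors data: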

  @GET
  @Path("allexecutors")
  def allExecutorList(): Seq[ExecutorSummary] = withUI(_.store.executorList(false))

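The store here is exactly SparkUI's store. The method shows that the data served by the Spark REST service is in fact held in the AppStatusStore object (store) created by SparkContext.

Finally, the ExecutorThreadDumpPage code: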
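
The same endpoint can be queried outside the browser. The sketch below builds the URL in the /api/v1/applications/[app-id]/allexecutors form used by Spark's monitoring REST API; RestUrlSketch and its methods are hypothetical helpers, the UI root shown in main is an assumption about a local driver, and fetch only works while an application is actually running.

```scala
import java.net.URLEncoder
import scala.io.Source

object RestUrlSketch {
  /** Builds the allexecutors URL for an application under the given UI root. */
  def allExecutorsUrl(uiRoot: String, appId: String): String = {
    val id = URLEncoder.encode(appId, "UTF-8")
    s"${uiRoot.stripSuffix("/")}/api/v1/applications/$id/allexecutors"
  }

  /** Fetches the raw JSON text; throws if the UI is not reachable. */
  def fetch(url: String): String = {
    val source = Source.fromURL(url, "UTF-8")
    try source.mkString finally source.close()
  }

  def main(args: Array[String]): Unit = {
    // Assumed values for illustration only.
    val url = allExecutorsUrl("http://localhost:4040", "application_1558494459870_0005")
    println(url)
    // println(fetch(url)) // uncomment while the application is running
  }
}
```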

private[ui] class ExecutorThreadDumpPage(
    parent: SparkUITab,
    sc: Option[SparkContext]) extends WebUIPage("threadDump") {

  // stripXSS is called first to remove suspicious characters used in XSS attacks
  def render(request: HttpServletRequest): Seq[Node] = {
    val executorId =
      Option(UIUtils.stripXSS(request.getParameter("executorId"))).map { executorId =>
      UIUtils.decodeURLParameter(executorId)
    }.getOrElse {
      throw new IllegalArgumentException(s"Missing executorId parameter")
    }
    val time = System.currentTimeMillis()
    val maybeThreadDump = sc.get.getExecutorThreadDump(executorId)

    val content = maybeThreadDump.map { threadDump =>
      val dumpRows = threadDump.map { thread =>
        val threadId = thread.threadId
        val blockedBy = thread.blockedByThreadId match {
          case Some(_) =>
            <div>
              Blocked by <a href={s"#${thread.blockedByThreadId}_td_id"}>
              Thread {thread.blockedByThreadId} {thread.blockedByLock}</a>
            </div>
          case None => Text("")
        }
        val heldLocks = thread.holdingLocks.mkString(", ")

        <tr id={s"thread_${threadId}_tr"} class="accordion-heading"
            onclick={s"toggleThreadStackTrace($threadId, false)"}
            onmouseover={s"onMouseOverAndOut($threadId)"}
            onmouseout={s"onMouseOverAndOut($threadId)"}>
          <td id={s"${threadId}_td_id"}>{threadId}</td>
          <td id={s"${threadId}_td_name"}>{thread.threadName}</td>
          <td id={s"${threadId}_td_state"}>{thread.threadState}</td>
          <td id={s"${threadId}_td_locking"}>{blockedBy}{heldLocks}</td>
          <td id={s"${threadId}_td_stacktrace"} class="hidden">{thread.stackTrace.html}</td>
        </tr>
      }

    <div class="row-fluid">
      <p>Updated at {UIUtils.formatDate(time)}</p>
      {
        // scalastyle:off
        <p><a class="expandbutton" onClick="expandAllThreadStackTrace(true)">
          Expand All
        </a></p>
        <p><a class="expandbutton hidden" onClick="collapseAllThreadStackTrace(true)">
          Collapse All
        </a></p>
        <div class="form-inline">
        <div class="bs-example" data-example-id="simple-form-inline">
          <div class="form-group">
            <div class="input-group">
              Search: <input type="text" class="form-control" id="search" oninput="onSearchStringChange()"></input>
            </div>
          </div>
        </div>
        </div>
        <p></p>
        // scalastyle:on
      }
      <table class={UIUtils.TABLE_CLASS_STRIPED + " accordion-group" + " sortable"}>
        <thead>
          <th onClick="collapseAllThreadStackTrace(false)">Thread ID</th>
          <th onClick="collapseAllThreadStackTrace(false)">Thread Name</th>
          <th onClick="collapseAllThreadStackTrace(false)">Thread State</th>
          <th onClick="collapseAllThreadStackTrace(false)">Thread Locks</th>
        </thead>
        <tbody>{dumpRows}</tbody>
      </table>
    </div>
    }.getOrElse(Text("Error fetching thread dump"))
    UIUtils.headerSparkPage(request, s"Thread dump for executor $executorId", content, parent)
  }
}

This page mainly shows the state of the threads running in the chosen executor.
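
To get a feel for the data behind such a page, a thread dump of the local JVM can be collected with the standard ThreadMXBean. This is a sketch of the general technique, not a claim about how getExecutorThreadDump is implemented; ThreadRow and dump are hypothetical names chosen to echo the table columns above.

```scala
import java.lang.management.ManagementFactory

object ThreadDumpSketch {
  final case class ThreadRow(id: Long, name: String, state: String, blockedBy: Option[Long])

  /** Snapshots all live threads of this JVM, sorted by thread id. */
  def dump(): Seq[ThreadRow] = {
    val infos = ManagementFactory.getThreadMXBean.dumpAllThreads(true, true)
    infos.toSeq
      .filter(_ != null) // entries can be null for threads that have just terminated
      .map { info =>
        // getLockOwnerId is -1 when the thread is not blocked on a lock owned by another thread.
        val owner = info.getLockOwnerId
        ThreadRow(
          info.getThreadId,
          info.getThreadName,
          info.getThreadState.toString,
          if (owner >= 0) Some(owner) else None)
      }
      .sortBy(_.id)
  }

  def main(args: Array[String]): Unit =
    dump().foreach(row => println(s"${row.id}\t${row.state}\t${row.name}"))
}
```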


 

