《深入理解Spark:核心思想與源碼分析》一書前言的內容請看鏈接《深入理解SPARK:核心思想與源碼分析》一書正式出版上市
《深入理解Spark:核心思想與源碼分析》一書第一章的內容請看鏈接《第1章 環境准備》
《深入理解Spark:核心思想與源碼分析》一書第二章的內容請看鏈接《第2章 SPARK設計理念與基本架構》
由於本書的第3章內容較多,所以打算分別開辟四篇隨筆分別展現。
《深入理解Spark:核心思想與源碼分析》一書第三章第一部分的內容請看鏈接《深入理解Spark:核心思想與源碼分析》——SparkContext的初始化(伯篇)》
《深入理解Spark:核心思想與源碼分析》一書第三章第二部分的內容請看鏈接《深入理解Spark:核心思想與源碼分析》——SparkContext的初始化(仲篇)》
《深入理解Spark:核心思想與源碼分析》一書第三章第二部分的內容請看鏈接《深入理解Spark:核心思想與源碼分析》——SparkContext的初始化(叔篇)》
本文展現第3章第三部分的內容:
3.9 啟動測量系統MetricsSystem
MetricsSystem使用codahale提供的第三方測量倉庫Metrics,有關Metrics的具體信息可以參考附錄D。MetricsSystem中有三個概念:
q Instance:指定了誰在使用測量系統;
q Source:指定了從哪里收集測量數據;
q Sink:指定了往哪里輸出測量數據。
Spark按照Instance的不同,區分為Master、Worker、Application、Driver和Executor。
Spark目前提供的Sink有ConsoleSink、CsvSink、JmxSink、MetricsServlet、GraphiteSink等。
Spark中使用MetricsServlet作為默認的Sink。
MetricsSystem的啟動代碼如下。
val metricsSystem = env.metricsSystem metricsSystem.start()
MetricsSystem的啟動過程包括以下步驟:
1) 注冊Sources;
2) 注冊Sinks;
3) 給Sinks增加Jetty的ServletContextHandler。
MetricsSystem啟動完畢后,會遍歷與Sinks有關的ServletContextHandler,並調用attachHandler將它們綁定到SparkUI上。
metricsSystem.getServletHandlers.foreach(handler => ui.foreach(_.attachHandler(handler)))
3.9.1 注冊Sources
registerSources方法用於注冊Sources,它的實現見代碼清單3-44。注冊Sources的過程分為以下步驟:
1) 從metricsConfig獲取Driver的Properties,默認為創建MetricsSystem的過程中解析的{sink.servlet.class=org.apache.spark.metrics.sink.MetricsServlet, sink.servlet.path=/metrics/json}。
2) 從Driver的Properties中用正則匹配以source.開頭的屬性。然后將屬性中的Source反射得到的實例,加入ArrayBuffer[Source]。
3) 將每個Source的metricRegistry(也是MetricSet的子類型)注冊到ConcurrentMap<String, Metric> metrics。這里的registerSource方法已在3.8.2節講解過。
代碼清單3-44 MetricsSystem
private def registerSources() { val instConfig = metricsConfig.getInstance(instance) val sourceConfigs = metricsConfig.subProperties(instConfig, MetricsSystem.SOURCE_REGEX) // Register all the sources related to instance sourceConfigs.foreach { kv => val classPath = kv._2.getProperty("class") try { val source = Class.forName(classPath).newInstance() registerSource(source.asInstanceOf[Source]) } catch { case e: Exception => logError("Source class " + classPath + " cannot be instantiated", e) } } }
3.9.2 注冊Sinks
registerSinks方法用於注冊Sinks,它的實現見代碼清單3-45。注冊Sinks的步驟如下:
1) 從Driver的Properties中用正則匹配以sink.開頭的屬性,如:{sink.servlet.class=org.apache.spark.metrics.sink.MetricsServlet, sink.servlet.path=/metrics/json}。將其轉換為Map(servlet -> {class=org.apache.spark.metrics.sink.MetricsServlet, path=/metrics/json})。
2) 將子屬性class對應的類metricsServlet反射得到MetricsServlet實例。如果屬性的key是servlet,將其設置為metricsServlet;如果是Sink,則加入到ArrayBuffer[Sink]中。
代碼清單3-45 MetricsSystem注冊Sinks的實現
private def registerSinks() { val instConfig = metricsConfig.getInstance(instance) val sinkConfigs = metricsConfig.subProperties(instConfig, MetricsSystem.SINK_REGEX)
sinkConfigs.foreach { kv => val classPath = kv._2.getProperty("class") if (null != classPath) { try { val sink = Class.forName(classPath) .getConstructor(classOf[Properties], classOf[MetricRegistry], classOf[SecurityManager]) .newInstance(kv._2, registry, securityMgr) if (kv._1 == "servlet") { metricsServlet = Some(sink.asInstanceOf[MetricsServlet]) } else { sinks += sink.asInstanceOf[Sink] } } catch { case e: Exception => logError("Sink class "+ classPath + " cannot be instantialized",e) } } } }
3.9.3給Sinks增加Jetty的ServletContextHandler
MetricsSystem的getServletHandlers方法,實現如下。
def getServletHandlers = { require(running, "Can only call getServletHandlers on a running MetricsSystem") metricsServlet.map(_.getHandlers).getOrElse(Array()) }
可以看到調用了metricsServlet的getHandlers,其實現如下。
def getHandlers = Array[ServletContextHandler]( createServletHandler(servletPath, new ServletParams(request => getMetricsSnapshot(request), "text/json"), securityMgr) )
最終生成處理/metrics/json請求的ServletContextHandler,而請求的真正處理由getMetricsSnapshot方法,利用fastjson解析。生成的ServletContextHandler通過SparkUI的attachHandler方法,也被綁定到SparkUI。createServletHandler與attachHandler方法都已經在3.4.4節詳細闡述。最終我們可以使用以下這些地址來訪問測量數據。
http://localhost:4040/metrics/applications/json
http://localhost:4040/metrics/json
http://localhost:4040/metrics/master/json
3.10 創建和啟動ExecutorAllocationManager
ExecutorAllocationManager用於動態分配executor,創建和啟動ExecutorAllocationManager的代碼如下。
private[spark] val executorAllocationManager: Option[ExecutorAllocationManager] = if (conf.getBoolean("spark.dynamicAllocation.enabled", false)) { Some(new ExecutorAllocationManager(this, listenerBus, conf)) } else { None } executorAllocationManager.foreach(_.start())
默認情況下不會創建ExecutorAllocationManager,可以修改屬性spark.dynamicAllocation.enabled為true來創建。ExecutorAllocationManager可以設置動態分配最小Executor數量、動態分配最大Executor數量、每個Executor可以運行的Task數量等配置信息,並對配置信息進行校驗。start方法將ExecutorAllocationListener加入到listenerBus中,ExecutorAllocationListener通過監聽listenerBus里的事件,動態添加刪除executor。並且通過Thread不斷的添加executor,並且遍歷executor,將超時的executor殺掉並且移除。ExecutorAllocationListener的實現與其他SparkListener類似,不再贅述。ExecutorAllocationManager的關鍵代碼見代碼清單3-46。
代碼清單3-46 ExecutorAllocationManagerr的關鍵代碼
private val intervalMillis: Long = 100 private var clock: Clock = new RealClock private val listener = new ExecutorAllocationListener def start(): Unit = { listenerBus.addListener(listener) startPolling() } private def startPolling(): Unit = { val t = new Thread { override def run(): Unit = { while (true) { try { schedule() } catch { case e: Exception => logError("Exception in dynamic executor allocation thread!", e) } Thread.sleep(intervalMillis) } } } t.setName("spark-dynamic-executor-allocation") t.setDaemon(true) t.start() }
根據3.4.1節的內容,我們知道listenerBus內置了線程listenerThread,此線程不斷從eventQueue中拉出事件對象,調用監聽器的監聽方法。要啟動此線程,需要調用listenerBus的start方法,代碼如下。
listenerBus.start()
3.11 ContextCleaner的創建與啟動
由於配置屬性spark.cleaner.referenceTracking默認是true,所以會構造並啟動ContextCleaner,代碼如下。
private[spark] val cleaner: Option[ContextCleaner] = { if (conf.getBoolean("spark.cleaner.referenceTracking", true)) { Some(new ContextCleaner(this)) } else { None } } cleaner.foreach(_.start())
ContextCleaner用於清理那些超出應用范圍的RDD、ShuffleDependency和Broadcast對象。ContextCleaner的組成如下:
q referenceQueue:緩存頂級的AnyRef引用;
q referenceBuffer:緩存AnyRef的虛引用;
q listeners:緩存清理工作的監聽器數組;
q cleaningThread:用於具體清理工作的線程。
ContextCleaner的工作原理和listenerBus一樣,也采用監聽器模式,由線程來處理,此線程實際只是調用keepCleaning方法。keepCleaning的實現見代碼清單3-47。
代碼清單3-47 ContextCleaner的實現
private def keepCleaning(): Unit = Utils.logUncaughtExceptions { while (!stopped) { try { val reference = Option(referenceQueue.remove(ContextCleaner.REF_QUEUE_POLL_TIMEOUT)) .map(_.asInstanceOf[CleanupTaskWeakReference]) // Synchronize here to avoid being interrupted on stop()
synchronized { reference.map(_.task).foreach { task => logDebug("Got cleaning task " + task) referenceBuffer -= reference.get
task match { case CleanRDD(rddId) => doCleanupRDD(rddId, blocking = blockOnCleanupTasks) case CleanShuffle(shuffleId) => doCleanupShuffle(shuffleId, blocking = blockOnShuffleCleanupTasks) case CleanBroadcast(broadcastId) => doCleanupBroadcast(broadcastId, blocking = blockOnCleanupTasks) } } } } catch { case ie: InterruptedException if stopped => // ignore case e: Exception => logError("Error in cleaning thread", e) } } }
3.12 Spark環境更新
在SparkContext的初始化過程中,可能對其環境造成影響,所以需要更新環境,代碼如下。
postEnvironmentUpdate() postApplicationStart()
SparkContext初始化過程中,如果設置了spark.jars屬性, spark.jars指定的jar包將由addJar方法加入到httpFileServer的jarDir變量指定的路徑下。spark.files指定的文件將由addFile方法加入到httpFileServer的fileDir變量指定的路徑下。見代碼清單3-48。
代碼清單3-48 依賴文件處理
val jars: Seq[String] = conf.getOption("spark.jars").map(_.split(",")).map(_.filter(_.size != 0)).toSeq.flatten val files: Seq[String] = conf.getOption("spark.files").map(_.split(",")).map(_.filter(_.size != 0)).toSeq.flatten // Add each JAR given through the constructor
if (jars != null) { jars.foreach(addJar) } if (files != null) { files.foreach(addFile) }
httpFileServer的addFile和addJar方法,見代碼清單3-49。
代碼清單3-49 HttpFileServer提供對依賴文件的訪問
def addFile(file: File) : String = { addFileToDir(file, fileDir) serverUri + "/files/" + file.getName } def addJar(file: File) : String = { addFileToDir(file, jarDir) serverUri + "/jars/" + file.getName } def addFileToDir(file: File, dir: File) : String = { if (file.isDirectory) { throw new IllegalArgumentException(s"$file cannot be a directory.") } Files.copy(file, new File(dir, file.getName)) dir + "/" + file.getName }
postEnvironmentUpdate的實現見代碼清單3-50,其處理步驟如下:
1) 通過調用SparkEnv的方法environmentDetails最終影響環境的JVM參數、Spark 屬性、系統屬性、classPath等,參見代碼清單3-51。
2) 生成事件SparkListenerEnvironmentUpdate,並post到listenerBus,此事件被EnvironmentListener監聽,最終影響EnvironmentPage頁面中的輸出內容。
代碼清單3-50 SparkContext環境更新
private def postEnvironmentUpdate() { if (taskScheduler != null) { val schedulingMode = getSchedulingMode.toString val addedJarPaths = addedJars.keys.toSeq val addedFilePaths = addedFiles.keys.toSeq val environmentDetails = SparkEnv.environmentDetails(conf, schedulingMode, addedJarPaths, addedFilePaths) val environmentUpdate = SparkListenerEnvironmentUpdate(environmentDetails) listenerBus.post(environmentUpdate) } }
代碼清單3-51 environmentDetails的實現
val jvmInformation = Seq( ("Java Version", s"$javaVersion ($javaVendor)"), ("Java Home", javaHome), ("Scala Version", versionString) ).sorted val schedulerMode = if (!conf.contains("spark.scheduler.mode")) { Seq(("spark.scheduler.mode", schedulingMode)) } else { Seq[(String, String)]() } val sparkProperties = (conf.getAll ++ schedulerMode).sorted // System properties that are not java classpaths
val systemProperties = Utils.getSystemProperties.toSeq val otherProperties = systemProperties.filter { case (k, _) => k != "java.class.path" && !k.startsWith("spark.") }.sorted // Class paths including all added jars and files val classPathEntries = javaClassPath .split(File.pathSeparator) .filterNot(_.isEmpty) .map((_, "System Classpath"))
val addedJarsAndFiles = (addedJars ++ addedFiles).map((_, "Added By User")) val classPaths = (addedJarsAndFiles ++ classPathEntries).sorted Map[String, Seq[(String, String)]]( "JVM Information" -> jvmInformation, "Spark Properties" -> sparkProperties, "System Properties" -> otherProperties, "Classpath Entries" -> classPaths) }
postApplicationStart方法很簡單,只是向listenerBus發送了SparkListenerApplicationStart事件,代碼如下。
listenerBus.post(SparkListenerApplicationStart(appName, Some(applicationId), startTime, sparkUser))
3.13 創建DAGSchedulerSource和BlockManagerSource
在創建DAGSchedulerSource、BlockManagerSource之前首先調用taskScheduler的postStartHook方法,其目的是為了等待backend就緒,見代碼清單3-52。postStartHook的實現見代碼清單3-53。
創建DAGSchedulerSource和BlockManagerSource的過程類似於ExecutorSource,只不過DAGSchedulerSource測量的信息是stage. failedStages、stage. runningStages、stage. waitingStages、stage. allJobs、stage. activeJobs,BlockManagerSource測量的信息是memory. maxMem_MB、memory. remainingMem_MB、memory. memUsed_MB、memory. diskSpaceUsed_MB。
代碼清單3-52 創建DAGSchedulerSource和BlockManagerSource
taskScheduler.postStartHook() private val dagSchedulerSource = new DAGSchedulerSource(this.dagScheduler) private val blockManagerSource = new BlockManagerSource(SparkEnv.get.blockManager) private def initDriverMetrics() { SparkEnv.get.metricsSystem.registerSource(dagSchedulerSource) SparkEnv.get.metricsSystem.registerSource(blockManagerSource) } initDriverMetrics()
代碼清單3-53 等待backend就緒的實現
override def postStartHook() { waitBackendReady() } private def waitBackendReady(): Unit = { if (backend.isReady) { return } while (!backend.isReady) { synchronized { this.wait(100) } } }
3.14 將SparkContext標記為激活
SparkContext初始化的最后將當前SparkContext的狀態從contextBeingConstructed(正在構建中)改為activeContext(已激活),代碼如下。
SparkContext.setActiveContext(this, allowMultipleContexts)
setActiveContext方法的實現如下。
private[spark] def setActiveContext( sc: SparkContext, allowMultipleContexts: Boolean): Unit = { SPARK_CONTEXT_CONSTRUCTOR_LOCK.synchronized { assertNoOtherContextIsRunning(sc, allowMultipleContexts) contextBeingConstructed = None activeContext = Some(sc) }
}
3.15 小結
回顧本章, Scala與Akka基於Actor的並發編程模型給人帶來深刻的印象,改變了我本人每當需要提升性能時就想到使用多線程的傳統觀念,Actor與事件模型有類似之處,通過異步處理,減少線程切換開銷,值得開發人員借鑒。listenerBus對於監聽器模式的經典應用將處理轉化為事件並交給統一的線程處理,減少了線程阻塞與切換,提升了性能,希望讀者朋友能應用到自己的產品開發中去。此外,使用Netty所提供的異步網絡框架構建的Block傳輸服務,基於Jetty構建的內嵌web服務、HTTP文件服務器和SparkUI,基於codahale提供的第三方測量倉庫創建的測量系統,Executor中的心跳實現等內容,都值得借鑒。
后記:自己犧牲了7個月的周末和下班空閑時間,通過研究Spark源碼和原理,總結整理的《深入理解Spark:核心思想與源碼分析》一書現在已經正式出版上市,目前亞馬遜、京東、當當、天貓等網站均有銷售,歡迎感興趣的同學購買。我開始研究源碼時的Spark版本是1.2.0,經過7個多月的研究和出版社近4個月的流程,Spark自身的版本迭代也很快,如今最新已經是1.6.0。目前市面上另外2本源碼研究的Spark書籍的版本分別是0.9.0版本和1.2.0版本,看來這些書的作者都與我一樣,遇到了這種問題。由於研究和出版都需要時間,所以不能及時跟上Spark的腳步,還請大家見諒。但是Spark核心部分的變化相對還是很少的,如果對版本不是過於追求,依然可以選擇本書。
京東(現有滿100減30活動):http://item.jd.com/11846120.html