Spark源碼剖析 - SparkContext的初始化(一)


 1. SparkContext概述

注意:SparkContext的初始化剖析是基於Spark2.1.0版本的

Spark Driver用於提交用戶應用程序,實際可以看作Spark的客戶端。了解Spark Driver的初始化,有助於讀者理解用戶應用程序在客戶端的處理過程。

Spark Driver的初始化始終圍繞着SparkContext的初始化。SparkContext可以算得上是所有Spark應用程序的發動機引擎,轎車要想跑起來,發動機首先要啟動。SparkContext初始化完畢,才能向Spark集群提交任務。在平坦的公路上,發動機只需以較低的轉速、較低的功率就可以游刃有余;在山區,你可能需要一台能夠提供大功率的發動機才能滿足你的需求。這些參數都是可以通過駕駛員操作油門、檔位等傳送給發動機的,而SparkContext的配置參數則由SparkConf負責,SparkConf就是你的操作面板。

SparkConf的構造很簡單,主要是通過ConcurrentHashMap來維護各種Spark的配置屬性。SparkConf代碼結構如下,Spark的配置屬性都是以“spark.”開頭的字符串。

現在開始介紹SparkContext。SparkContext的初始化步驟如下:

  1. 創建Spark執行環境SparkEnv;
  2. 創建並初始化Spark UI;
  3. Hadoop相關配置及Executor環境變量的設置;
  4. 創建任務調度TaskScheduler;
  5. 創建和啟動DAGScheduler;
  6. TaskScheduler的啟動;
  7. 初始化管理器BlockManager(BlockManager是存儲體系的主要組件之一)
  8. 啟動測量系統MetricsSystem;
  9. 創建和啟動Executor分配管理器ExecutorAllocationManager;
  10. ContextCleaner的創建與啟動;
  11. Spark環境更新;
  12. 創建DAGSchedulerSource和BlockManagerSource;
  13. 將SparkContext標記為激活;

SparkContext的主構造參數為SparkConf,其實現如下:

 上面代碼中的CallSite存儲了線程棧中最靠近棧頂的用戶類及最靠近棧底的Scala或者Spark核心類信息。SparkContext默認只有一個實例(由屬性spark.driver.allowMultipleContexts來控制,用戶需要多個SparkContext實例時,可以將其設置為true),方法markPartiallyConstructed用來確保實例的唯一性,並將當前SparkContext標記為正在構建中。

接下來會對SparkConf進行復制,然后對各種配置信息進行校驗,代碼如下:

從上面校驗的代碼看到必須指定屬性spark.master和spark.app.name,否則會拋出異常,結束初始化進程。spark.master用於設置部署模式,spark.app.name用於指定應用程序名稱。

2. 創建執行環境SparkEnv

內容詳情請看https://www.cnblogs.com/swordfall/p/9303348.html

3. 創建並初始化Spark UI

內容詳情請看https://www.cnblogs.com/swordfall/p/9303399.html

4. Hadoop相關配置及Executor環境變量的設置

 內容詳情請看https://www.cnblogs.com/swordfall/p/9306113.html

5. 創建任務調度器TaskScheduler

內容詳情請看:https://www.cnblogs.com/swordfall/p/9314949.html

 6. 創建和啟動DAGScheduler

內容詳情請看:https://www.cnblogs.com/swordfall/p/9314940.html

7. TaskScheduler的啟動

內容詳情請看:https://www.cnblogs.com/swordfall/p/9314930.html

8. 初始化管理器BlockManager

內容詳情請看:https://www.cnblogs.com/swordfall/p/9318900.html

9. 啟動測量系統MetricsSystem

內容詳情請看:https://www.cnblogs.com/swordfall/p/9317579.html

10. 創建和啟動ExecutorAllocationManager

ExecutorAllocationManager用於對已分配的Executor進行管理,創建和啟動ExecutorAllocationManager的代碼如下:

默認情況下不會創建ExecutorAllocationManager,可以修改屬性spark.dynamicAllocation.enabled為true來創建。ExecutorAllocationManager可以設置動態分配最小Executor數量、動態分配最大Executor數量、每個Executor可以運行的Task數量等配置信息,並對配置信息進行校驗。start方法將ExecutorAllocationListener加入listenerBus中,ExecutorAllocationListener通過監聽listenerBus里的事件,動態添加、刪除Executor。並且通過Thread不斷添加Executor,遍歷Executor,將超時的Executor殺掉並移除。ExecutorAllocationListener的實現與其他SparkListener類似,不再贅述。ExecutorAllocationManager的關鍵代碼如下:

注意:listenerBus內置了線程listenerThread,此線程不斷從eventQueue中拉出事件對象,調用監聽器的監聽方法。要啟動此線程,需要調用listenerBus的start()方法,代碼如下:

listenerBus.start()

11. ContextCleaner的創建與啟動

 ContextCleaner用於清理那些超出應用范圍的RDD、ShuffleDependency和Broadcast對象。由於配置屬性spark.cleaner.referenceTracking默認是true,所以會構造並啟動ContextCleaner,代碼如下:

ContextCleaner的組成如下:

  • referenceQueue:緩存頂級的AnyRef引用;
  • referenceBuffer:緩存AnyRef的虛引用;
  • listeners:緩存清理工作的監聽器數組;
  • cleaningThread:用於具體清理工作的線程。

ContextCleaner的工作原理和listenerBus一樣,也采用監聽器模式,由線程來處理,此線程實際只是調用keepCleaning方法。keepCleaning的實現見代碼:

12. Spark環境更新

內容詳情請看:https://www.cnblogs.com/swordfall/p/9318499.html

13. 創建DAGSchedulerSource和BlockManagerSource

在創建DAGSchedulerSource、BlockManagerSource之前首先調用taskScheduler的postStartHook方法,其目的是為了等待backend就緒,見代碼:

14. 將SparkContext標記為激活

 SparkContext初始化的最后將當前SparkContext的狀態從contextBeingConstructed(正在構建中)改為activeContext(已激活),代碼如下:

setActiveContext方法的實現如下:

15. 總結

listenerBus對於監聽器模式的經典應用看來並不復雜,希望讀者朋友能應用到自己的產品中去。此外,使用Netty所提供的異步網絡框架構建的Block傳輸服務,基於Jetty構建的內嵌web服務(HTTP文件服務器和SparkUI),基於codahale提供的第三方測量倉庫創建的測量系統,Executor中的心跳實現等內容,都值得借鑒。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM