1. SparkContext概述
注意:SparkContext的初始化剖析是基於Spark2.1.0版本的
Spark Driver用於提交用戶應用程序,實際可以看作Spark的客戶端。了解Spark Driver的初始化,有助於讀者理解用戶應用程序在客戶端的處理過程。
Spark Driver的初始化始終圍繞着SparkContext的初始化。SparkContext可以算得上是所有Spark應用程序的發動機引擎,轎車要想跑起來,發動機首先要啟動。SparkContext初始化完畢,才能向Spark集群提交任務。在平坦的公路上,發動機只需以較低的轉速、較低的功率就可以游刃有余;在山區,你可能需要一台能夠提供大功率的發動機才能滿足你的需求。這些參數都是可以通過駕駛員操作油門、檔位等傳送給發動機的,而SparkContext的配置參數則由SparkConf負責,SparkConf就是你的操作面板。
SparkConf的構造很簡單,主要是通過ConcurrentHashMap來維護各種Spark的配置屬性。SparkConf代碼結構如下,Spark的配置屬性都是以“spark.”開頭的字符串。
現在開始介紹SparkContext。SparkContext的初始化步驟如下:
- 創建Spark執行環境SparkEnv;
- 創建並初始化Spark UI;
- Hadoop相關配置及Executor環境變量的設置;
- 創建任務調度TaskScheduler;
- 創建和啟動DAGScheduler;
- TaskScheduler的啟動;
- 初始化管理器BlockManager(BlockManager是存儲體系的主要組件之一)
- 啟動測量系統MetricsSystem;
- 創建和啟動Executor分配管理器ExecutorAllocationManager;
- ContextCleaner的創建與啟動;
- Spark環境更新;
- 創建DAGSchedulerSource和BlockManagerSource;
- 將SparkContext標記為激活;
SparkContext的主構造參數為SparkConf,其實現如下:
上面代碼中的CallSite存儲了線程棧中最靠近棧頂的用戶類及最靠近棧底的Scala或者Spark核心類信息。SparkContext默認只有一個實例(由屬性spark.driver.allowMultipleContexts來控制,用戶需要多個SparkContext實例時,可以將其設置為true),方法markPartiallyConstructed用來確保實例的唯一性,並將當前SparkContext標記為正在構建中。
接下來會對SparkConf進行復制,然后對各種配置信息進行校驗,代碼如下:
從上面校驗的代碼看到必須指定屬性spark.master和spark.app.name,否則會拋出異常,結束初始化進程。spark.master用於設置部署模式,spark.app.name用於指定應用程序名稱。
2. 創建執行環境SparkEnv
內容詳情請看https://www.cnblogs.com/swordfall/p/9303348.html
3. 創建並初始化Spark UI
內容詳情請看https://www.cnblogs.com/swordfall/p/9303399.html
4. Hadoop相關配置及Executor環境變量的設置
內容詳情請看https://www.cnblogs.com/swordfall/p/9306113.html
5. 創建任務調度器TaskScheduler
內容詳情請看:https://www.cnblogs.com/swordfall/p/9314949.html
6. 創建和啟動DAGScheduler
內容詳情請看:https://www.cnblogs.com/swordfall/p/9314940.html
7. TaskScheduler的啟動
內容詳情請看:https://www.cnblogs.com/swordfall/p/9314930.html
8. 初始化管理器BlockManager
內容詳情請看:https://www.cnblogs.com/swordfall/p/9318900.html
9. 啟動測量系統MetricsSystem
內容詳情請看:https://www.cnblogs.com/swordfall/p/9317579.html
10. 創建和啟動ExecutorAllocationManager
ExecutorAllocationManager用於對已分配的Executor進行管理,創建和啟動ExecutorAllocationManager的代碼如下:
默認情況下不會創建ExecutorAllocationManager,可以修改屬性spark.dynamicAllocation.enabled為true來創建。ExecutorAllocationManager可以設置動態分配最小Executor數量、動態分配最大Executor數量、每個Executor可以運行的Task數量等配置信息,並對配置信息進行校驗。start方法將ExecutorAllocationListener加入listenerBus中,ExecutorAllocationListener通過監聽listenerBus里的事件,動態添加、刪除Executor。並且通過Thread不斷添加Executor,遍歷Executor,將超時的Executor殺掉並移除。ExecutorAllocationListener的實現與其他SparkListener類似,不再贅述。ExecutorAllocationManager的關鍵代碼如下:
注意:listenerBus內置了線程listenerThread,此線程不斷從eventQueue中拉出事件對象,調用監聽器的監聽方法。要啟動此線程,需要調用listenerBus的start()方法,代碼如下:
listenerBus.start()
11. ContextCleaner的創建與啟動
ContextCleaner用於清理那些超出應用范圍的RDD、ShuffleDependency和Broadcast對象。由於配置屬性spark.cleaner.referenceTracking默認是true,所以會構造並啟動ContextCleaner,代碼如下:
ContextCleaner的組成如下:
- referenceQueue:緩存頂級的AnyRef引用;
- referenceBuffer:緩存AnyRef的虛引用;
- listeners:緩存清理工作的監聽器數組;
- cleaningThread:用於具體清理工作的線程。
ContextCleaner的工作原理和listenerBus一樣,也采用監聽器模式,由線程來處理,此線程實際只是調用keepCleaning方法。keepCleaning的實現見代碼:
12. Spark環境更新
內容詳情請看:https://www.cnblogs.com/swordfall/p/9318499.html
13. 創建DAGSchedulerSource和BlockManagerSource
在創建DAGSchedulerSource、BlockManagerSource之前首先調用taskScheduler的postStartHook方法,其目的是為了等待backend就緒,見代碼:
14. 將SparkContext標記為激活
SparkContext初始化的最后將當前SparkContext的狀態從contextBeingConstructed(正在構建中)改為activeContext(已激活),代碼如下:
setActiveContext方法的實現如下:
15. 總結
listenerBus對於監聽器模式的經典應用看來並不復雜,希望讀者朋友能應用到自己的產品中去。此外,使用Netty所提供的異步網絡框架構建的Block傳輸服務,基於Jetty構建的內嵌web服務(HTTP文件服務器和SparkUI),基於codahale提供的第三方測量倉庫創建的測量系統,Executor中的心跳實現等內容,都值得借鑒。