spark[源碼]-sparkContext詳解[一]


 spark簡述

sparkContext在Spark應用程序的執行過程中起着主導作用,它負責與程序和spark集群進行交互,包括申請集群資源、創建RDD、accumulators及廣播變量等。sparkContext與集群資源管理器、work節點交互圖如下:

官網對圖下面幾點說明:
(1)不同的Spark應用程序對應該不同的Executor,這些Executor在整個應用程序執行期間都存在並且Executor中可以采用多線程的方式執行Task。這樣做的好處是,各個Spark應用程序的執行是相互隔離的。除Spark應用程序向外部存儲系統寫數據進行數據交互這種方式外,各Spark應用程序間無法進行數據共享。
(2)Spark對於其使用的集群資源管理器沒有感知能力,只要它能對Executor進行申請並通信即可。這意味着不管使用哪種資源管理器,其執行流程都是不變的。這樣Spark可以不同的資源管理器進行交互。
(3)Spark應用程序在整個執行過程中要與Executors進行來回通信。
(4)Driver端負責Spark應用程序任務的調度,因此最好Driver應該靠近Worker節點。

1.源碼鑒賞-綜述

在spark程序運行起來后,程序就會創建sparkContext,解析用戶的代碼,當遇到action算的時候開始執行程序,但是在執行之前還有很多前提工作要在sparkContext中做的,請記住你要了解了sparkContext,你就了解了spark。

  •  sparkContext構建的頂級三大核心:DAGScheduler,TaskScheduler,SchedulerBackend.
  1. DAGScheduler是面向Job的Stage的高層調度器。
  2. TaskScheduler是一個接口,是低層調度器,根據具體的ClusterManager的不同會有不同的實現。Standalone模式下具體實現的是TaskSchedulerlmpl。
  3. SchedulerBackend是一個接口,根據具體的ClusterManger的不同會有不同的實現,Standalone模式下具體的實現是SparkDeloySchedulerBackend。
  • 從整個程序運行的角度來講,sparkContext包含四大核心對象:DAGScheduler,TaskScheduler,SchedulerBackend,MapOutputTrackerMaster。
  • SparkDeploySchedulerBackend有三大核心功能:
  1. 負責接收Master接受注冊當前程序RegisterWithMaster。
  2. 接受集群中為當前應用程序而分配的計算資源Executor的注冊並管理Executor。
  3. 負責發送Task到具體的Executor執行。
  4. SparkDeploySchedulerBackend是被TaskSchedulerlmpl管理的。

sparkContext變量初始化

創建sparkContext的時候會做很多初始化事情,初始化很多變量。

事件監控總線:private[spark] val listenerBus = new LiveListenerBus

第一個重要的初始化出來了:這個地方是創建sparkEnv,就是創建actor,根據判斷創建dirver-actor

sparkContext的三大核心:這個只是一個定義getter和setter的方法,scala和java是有區別的,可以看看語法。但請時刻技術這三個核心。

從try開始了真正意義上的初始化操作了:396行。

_conf = config.clone():復制一個conf
_conf.validateSettings():檢查一些關鍵配置和是否存在,一些默認配置如果不存在,添加默認設置參數。
_conf.set("spark.executor.id", SparkContext.DRIVER_IDENTIFIER):請注意這個,其實在spark眼里沒有driver的概念,都是Executor,只是id標簽標記為了driver而已。

sparkEnv初始化:http://www.cnblogs.com/chushiyaoyue/p/7472904.html

下面是三大核心的創建:

 

創建createTaskScheduler:根據master的運行情況創建:

這個地方用到了正則匹配來判斷master的模式,我們以standalone的模式來講解:

 

根據模式匹配:TaskSchedulerImpl 創建,注意集群模式默認重試4次,本地模式只嘗試1次。

可以自己觀察一下其他模式的創建情況,但是會發現TaskSchedulerlmpl基本上是一樣。具體的TaskSchedulerImpl的實例創建和initialize()請參看另一篇文章。

http://www.cnblogs.com/chushiyaoyue/p/7475013.html

new TaskSchedulerImpl(sc):主要的是初始化一些變量。

scheduler.initialize(backend):創建資源配置池和資源調度算法,同時通過SchdulableBuilder.addTaskSetmanager:SchdulableBuilder會確定TaskSetManager的調度順序,然后按照TaskSetManager來確定每個Task具體運行在哪個ExecutorBackend中。

創建_dagScheduler = new DAGScheduler(this)

啟動taskScheduler

在這個方法中再調用 backend (SparkDeploySchedulerBackend) 的 start( ) 方法。

這個地方先啟動super.start()方法,在這個類CoarseGrainedSchedulerBackend里面。

這個方法主要是實例化DriverEndpoint,DriverEndpoint是整個集群內部和應用程序交互的關鍵。

時刻記住RpcEndpoint的聲明周期==constructor -> onStart -> receive* -> onStop

當實例化完成以后調用onStart方法

DriverEndpoint在實例化的時候根據SparkRPC的消息工作機制會調用生命周期方法onStart方法,在該方法執行時會執行Option(self).foreach(_.send(ReviveOffers))來周期性地發ReviveOffers消息給自己,ReviveOffers是個空的object,會觸發makeOffers‘Make fake resource offers on all executors’

開始創建的時候是發送的空的,這是在等待執行具體的task的時候用的。

注冊app到master

 通過SparkDeploySchedulerBackend 注冊到Master 的時候會將以上的 command 提交給 Master ,請注意org.apache.spark.executor.CoarseGrainedExecutorBackend,將來會通過這個啟動啟動執行的executor。

master發指令給worker去啟動Executor所有的進程的時候加載的main方法所在的入口類就是coommand中的CoarseGrainedExecutorBackend,當然你可以實現自己的ExecutorBackend,在CoarseGrainnedExecutorBackend中啟動Executor(Executor是先注冊在實例化),Executor通過線程值並發執行Task。

整體上的內容大概是這樣的啟動過程,其中存在很多具體的細節,在后續的文章中在詳細介紹吧。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM