Spark的任務調度


本文嘗試從源碼層面梳理Spark在任務調度與資源分配上的做法。

 

 

 

先從Executor和SchedulerBackend說起。Executor是真正執行任務的進程,本身擁有若干cpu和內存,可以執行以線程為單位的計算任務,它是資源管理系統能夠給予的最小單位。SchedulerBackend是spark提供的接口,定義了許多與Executor事件相關的處理,包括:新的executor注冊進來的時候記錄executor的信息,增加全局的資源量(核數),進行一次makeOffer;executor更新狀態,若任務完成的話,回收core,進行一次makeOffer;其他停止executor、remove executor等事件。下面由makeOffer展開。
 
makeOffer的目的是在有資源更新的情況下,通過調用scheduler的resourceOffers方法來觸發它對現有的任務進行一次分配,最終launch新的tasks。這里的全局 scheduler就是TaskScheduler,實現是TaskSchedulerImpl,它可以對接各種SchedulerBackend的實現,包括standalone的,yarn的,mesos的。SchedulerBackend在做makeOffer的時候,會把現有的 executor資源以WorkerOfffer列表的方式傳給scheduler,即以worker為單位,將worker信息及其內的資源交給 scheduler。scheduler拿到這一些集群的資源后,去遍歷已提交的tasks並根據locality決定如何launch tasks。
 
TaskScheduler里,resourceOffers方法會將已經提交的tasks進行一次優先級排序,這個排序算法目前是兩種:FIFO或FAIR。得到這一份待運行的tasks后,接下里就是要把schedulerBackend交過來的worker資源信息合理分配給這些tasks。分配前,為了避免每次都是前幾個worker被分到tasks,所以先對WorkerOffer列表進行一次隨機洗牌。接下來就是遍歷tasks,看workers的資源“夠不夠”,“符不符合”task,ok 的話task就被正式launch起來。注意,這里資源"夠不夠"是很好判斷的,在TaskScheduler里設置了每個task啟動需要的cpu個數,默認是1,所以只需要做核數的大小判斷和減1操作就可以遍歷分配下去。而"符不符合"這件事情,取決於每個tasks的locality設置。
 
task 的locality有五種,按優先級高低排:PROCESS_LOCAL,NODE_LOCAL,NO_PREF,RACK_LOCAL,ANY。也就是最好在同個進程里,次好是同個node(即機器)上,再次是同機架,或任意都行。task有自己的locality,如果本次資源里沒有想要的 locality資源,怎么辦呢?spark有一個spark.locality.wait參數,默認是3000ms。對於 process,node,rack,默認都使用這個時間作為locality資源的等待時間。所以一旦task需要locality,就可能會觸發delay scheduling。
 
到這里,對於任務的分配,資源的使用大致有個了解。實際上,TaskScheduler的resourceOffer里還觸發了 TaskSetManager的resourceOffer方法,TaskSetManager的resourceOffer是會檢查task的 locality並最終調用DAGScheduler去launch這個task。這些類的名字以及他們彼此的調用關系,看起來是比較亂的。我簡單梳理下。
 
這件事情要從Spark的DAG切割說起。Spark RDD通過其transaction和action操作,串起來形成了一個DAG。action的調用,觸發了DAG的提交和整個job的執行。觸發之后,由DAGScheduler這個全局唯一的面向stage的DAG調度器來切分DAG,根據是否 shuffle來切成多個小DAG,即stage。凡是RDD之間是窄依賴的,都歸到一個stage里,這里面的每個操作都對應成MapTask,並行度就是各自RDD的partition數目。凡是遇到寬依賴的操作,那么就把這一次操作切為一個stage,這里面的操作對應成ResultTask,結果 RDD的partition數就是並行度。MapTask和ResultTask分別可以簡單理解為傳統MR的Map和Reduce,切分他們的依據本質上就是shuffle。所以shuffle之前,大量的map是可以同partition內操作的。每個stage對應的是多個MapTask或多個 ResultTask,這一個stage內的task集合成一個TaskSet類,由TaskSetManager來管理這些task的運行狀態,locality處理(比如需要delay scheduling)。這個TaskSetManager是Spark層面上的,如何管理自己的tasks,即任務線程,這一層與底下資源管理是剝離的。我們上面提到的TaskSetManager的resourceOffer方法,是task與底下資源的交互,這個資源交互的協調人是 TaskScheduler,也是全局的,TaskScheduler對接的是不同的SchedulerBackend的實現(比如 mesos,yarn,standalone),如此來對接不同的資源管理系統。同時,對資源管理系統來說,他們要負責的是進程,是worker上起幾個進程,每個進程分配多少資源。所以這兩層很清楚,spark本身計算框架內管理線程級別的task,每個stage都有一個TaskSet,本身是個小DAG,可以丟到全局可用的資源池里跑;spark下半身的雙層資源管理部分掌控的是進程級別的executor,不關心task怎么擺放,也不關心task運行狀態,這是TaskSetManager管理的事情,兩者的協調者就是TaskScheduler及其內的SchedulerBackend實現。
 
SchedulerBackend 的實現,除去local模式的不說,分為細粒度和粗粒度兩種。細粒度只有Mesos(mesos有粗細兩種粒度的使用方式)實現了,粗粒度的實現者有 yarn,mesos,standalone。拿standalone模式來說粗粒度,每台物理機器是一個worker,worker一共可以使用多少 cpu和內存,啟動時候可以指定每個worker起幾個executor,即進程,每個executor的cpu和內存是多少。在我看來,粗粒度與細粒度的主要區別,就是粗粒度是進程long-running的,計算線程可以調到executor上跑,但executor的cpu和內存更容易浪費。細粒度的話,可以存在復用,可以實現搶占等等更加苛刻但促進資源利用率的事情。這倆概念還是AMPLab論文里最先提出來並在Mesos里實現的。AMPLab 在資源使用粒度甚至任務分配最優的這塊領域有不少論文,包括Mesos的DRF算法、Sparrow調度器等。所以standalone模式下,根據 RDD的partition數,以及每個task需要的cpu數,可以很容易計算每台物理機器的負載量、資源的消耗情況、甚至知道TaskSet要分幾批才能跑完一個stage。

來自: http://blog.csdn.net/pelick/article/details/41866845


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM