轉載請標明出處http://www.cnblogs.com/haozhengfei/p/0593214ae0a5395d1411395169eaabfa.html
Spark Core 資源調度與任務調度(standalone client 流程描述)
Spark集群啟動:
集群啟動后,Worker會向Master匯報資源情況(實際上將Worker的資源寫入到Master的HashSet數據機構中)
一個 Worker 默認給一個 Application 啟動 1 個 Executor,可以設置 --executor-cores num 來啟動多個。開機啟動時最好設置 spreadOut, 可以在集群中分散啟動 executor。
資源調度:
實際上我們的代碼會先在Driver這個進程(
我們寫的spark程序,打成jar包,用spark-submit來提交,
local模式當我們的程序提交到集群上時,會加載並執行我們的jar包,並找到jar包中的main函數執行一遍,執行main所啟動的這個進程名就是SparkSubmit,這個進程就是我們所說的Driver進程; cluster模式會在集群中找一台node,會啟動一個進程執行一遍我們提交的代碼,這個進程就是Driver,Driver啟動之后
將其信息注冊到Master中,存儲在ArrayBuffer中
)
中執行一遍,Driver這個進程中有SparkContext這個對象。代碼從main函數開始執行
,
new SparkConf()
設置我們運行Spark時的環境參數(還可以通過spark-submit -- 加上參數來設置),
newSparkContext()(在源碼的700多行左右),創建DAGScheduler和TaskScheduler,
TaskScheduler
另啟動一個線程將Application注冊到Master中(是放到
Master
中的
ArrayBuffer
的數據結構中,當
ArrayBuffer
中有信息之后,
Master
會調用自己的
schedule()
方法,
schedule
會為當前的
Application
申請資源,此時master會找一些空閑的
Worker
,並在
Worker
上啟動
Executor
進程,
Executor
啟動完成之后會
反向注冊
給
TaskScheduler
)
現在Application有了,有了資源后,開始進行任務的調度
任務
調度:
(此時代碼的第二行(new SparkContext(conf))已經執行完畢)
1.
Action類型的算子觸發job的執行。源碼中調用了SparkContext的runJob()方法,跟進源碼發現底層調用的是DAGScheduler的runJob()方法。
DAGScheduler會將我們的job按照寬窄依賴划分為一個個stage,每個stage中有一組並行計算的task,每一個task都可以看做是一個”pipeline”,,這個管道里面數據是一條一條被計算的,每經過一個RDD會經過一次處理,RDD是一個抽象的概念里面存儲的是一些
計算的邏輯
,每一條數據計算完成之后會在
shuffle write
過程中將數據落地寫入到我們的磁盤中。
2.stage划分完之后會以TaskSet的形式提交給我們的TaskScheduler。
源碼中TaskScheduler.submit.tasks(new TaskSet())只是一個調用方法的過程而已。我們口述說是發送到TaskScheduler。TaskScheduler接收到TaskSet之后會進行遍歷,每遍歷一條調用
launchTask()
方法,launchTask()根據
數據本地化的算法發送task到指定的Executor中執行。
task在發送到Executor之前首先進行序列化,Executor中有ThreadPool,ThreadPool中有很多線程,在這里面來具體執行我們的task。
3.TaskScheduler和Executor之間有通信(
Executor有一個郵箱(消息循環體CoresExecutorGraintedBackend)
),
Executor接收到task
Executor接收到task后首先將task反序列化,反序列化后將這個task變為
taskRunner(new taskRunner)
,並不是TaskScheduler直接向Executor發送了一個線程,這個線程是在Executor中變成的。然后這個線程就可以在Executor中的ThreadPool中執行了。
4.Executor接收到的task分為maptask 和 reducetask
map task 和 reduce task,比如這里有三個stage,先從stage1到stage2再到stage3,針對於stage2來說,stage1中的task就是map task ,stage2中的task就是reduce task,針對stage3來說...map task 是一個管道,管道的計算結果會在shuffle write階段數據落地,數據落地會根據我們的分區策略寫入到不同的磁盤小文件中,注意相同的key一定寫入到相同的磁盤小文件中),map端執行完成之后,會向Driver中的DAGScheduler對象里面的MapOutputTracker發送了一個map task的執行狀態(成功還是失敗還有每一個小文件的地址)。然后reduce task開始執行,reduce端的輸入數據就是map端的輸出數據。那么如何拿到map端的輸出數據呢?reduce task會先向Driver中MapOutPutTracker請求這一批磁盤小文件的地址,拿到地址后,由reduce task所在的Executor里面的BlockManager向Map task 所在的Executor先建立連接,連接是由ConnectionManager負責的,然后由BlockTransformService去拉取數據,拉取到的數據作為reduce task的輸入數據(如果使用到了廣播變量,reduce task 或者map task 它會先向它所在的Executor中的BlockManager要廣播變量,沒有的話,本地的BlockManager會去連接Driver中的BlockManagerMaster,連接完成之后由BlockTransformService將廣播變量拉取過來)Executor中有了廣播變量了,task就可以正常執行了。
【細節一:提交時的容錯能力】
TaskScheduler提交task如果發生了失敗,默認會重試三次,如果依然失敗,那么則認為這個task就失敗了,這時會進行stage重試,DAGScheduler會重新發送TaskSet給TaskScheduler,默認會重試四次,如果四次后依然失敗,則認為job失敗。因此一個task默認情況下重試3*4=12次
如果task失敗是由於shuffle file not find造成的,那么TaskScheduler是不負責重試的,直接進行stage重試。
【細節二:重試機制的問題】
前提:Task的邏輯是將處理的數據結果放入到數據庫中
如果一個Task提交到百分之七十五,然后失敗了,這時候會重試,那么有執行了一次task,這時候就會有臟數據的產生。
以上問題如何去解決?
1.關閉重試機制。
2.在數據庫中設置主鍵,這時候如果重復提交,那么會失敗,也就避免了臟數據的產生。