講說spark的資源調度和任務調度,基本的spark術語,這里不再多說,懂的人都懂了。。。
按照數字順序閱讀,逐漸深入理解:以下所有截圖均為個人上傳,不知道為什么總是顯示別人的QQ,好尷尬,無所謂啦,開始吧~~
1 寬窄依賴與Stage划分:
上熟悉的圖:
在 Spark 里每一個操作生成一個 RDD,RDD 之間連一條邊,最后這些 RDD 和他們之間的邊組成一個有向無環圖,這個就是 DAG,Spark 內核會在需要計算發生的時刻繪制一張關於計算路徑的有向無環圖,也就是 DAG。有了DAG 圖,Spark 內核下一步的任務就是根據 DAG 圖將計算划分成 Stage,
上圖:G 與 F 之間是寬依賴,所以把 G 和 F 分為兩個 Stage,而 C 、D 到 F,E 到 F 都是窄依賴,所以 CDEF 最終划分為一個 Stage2,A 與 B 之間是寬依賴,B 與 G 之間是窄依賴,所以最終,A 被划分為一個 Stage1,因為 BG 的 stage 依賴於 stage1 和 stage2,所以最終把整個DAG 划分為一個 stage3,所以說,寬窄依賴的作用就是切割 job,划分 stage。
Stage:由一組可以並行計算的 task 組成。
Stage 的並行度:就是其中的 task 的數量。
與互聯網業界的概念有些差異:在互聯網的概念中,並行度是指可同時開辟的線程數,並發數是指每個線程中可處理的最大數據量,比如: 4 個線程,每個線程可處理的數據為 100 萬條,那么並行度就是 4,並發量是 100 萬;而對於 stage 而言,即使其中的 task是分批進行執行的,也都算在並行度中,比如,stage 中有 100 個 task,而這 100 個 task 分4 批次才能執行完,那么該 stage 的並行度也為 100。Stage 的並行度是由最后一個 RDD 的分區決定的。
2 資源調度 (有人喜歡那standalone client方式舉例,那我這把cluster 模式和client模式都說下得了~)
Cluster模式:
源碼位置如圖master.scala:
a)Worker 啟動后向 Master 注冊
b) client 向 Master 發送一條消息,為當前的 Application 啟動一個 Driver 進程
啟動Driver源碼如下圖:
c)Driver 進程向 Master 發送消息:為當前的 Application 申請一批 Executor:
EXECUTOR創建的過程如圖:
上圖的過程,展示了Executor進程被啟動的過程。那么下面的是簡圖:
總結資源調度:
1) 在默認情況下(沒有使用--executor --cores 這個選項)時,每一個 Worker 節點為當前的 Application 只啟動一個 Executor,這個 Executor 會使用這個 Worker 管理的所有的cores。(原因:assignedCores(pos) += minCoresPerExecutor);
2) 默認情況下,每個 Executor 使用 1G 內存;
3) 如果想要在一個 Worker 節點啟動多個 Executor,需要使--executor --cores 這個選項;
4) spreadOutApps 這個參數可以決定 Executor 的啟動方式,默認輪詢方式啟動,這樣有利於數據的本地化。
Client模式(standalone):
實際上我們的代碼會先在Driver這個進程(我們寫的spark程序,打成jar包,用spark-submit來提交,local模式當我們的程序提交到集群上時,會加載並執行我們的jar包,並找到jar包中的main函數執行一遍,執行main所啟動的這個進程名就是SparkSubmit,這個進程就是我們所說的Driver進程; cluster模式會在集群中找一台node,會啟動一個進程執行一遍我們提交的代碼,這個進程就是Driver,Driver啟動之后將其信息注冊到Master中,存儲在ArrayBuffer中)中執行一遍,Driver這個進程中有SparkContext這個對象。代碼從main函數開始執行,new SparkConf()設置我們運行Spark時的環境參數(還可以通過spark-submit -- 加上參數來設置),new SparkContext()(在源碼的700多行左右),創建DAGScheduler和TaskScheduler,TaskScheduler另啟動一個線程將Application注冊到Master中【 是放到Master中的ArrayBuffer的數據結構中,當ArrayBuffer中有信息之后,Master會調用自己的schedule()方法,schedule會為當前的Application申請資源,此時master會找一些空閑的Worker,並在Worker上啟動Executor進程,Executor啟動完成之后會反向注冊給TaskScheduler(在driver)】 。
3 任務調度
Cluster模式:
源碼位置:core/src/main/scala/rdd/RDD.scala。
上圖是任務調度源碼,不在文字敘述,可以參考client模式的部分敘述。
注意:2點:
【a:提交時的容錯能力】
TaskScheduler提交task如果發生了失敗,默認會重試三次,如果依然失敗,那么則認為這個task就失敗了,這時會進行stage重試,DAGScheduler會重新發送TaskSet給TaskScheduler,默認會重試四次,如果四次后依然失敗,則認為job失敗。因此一個task默認情況下重試3*4=12次
如果task失敗是由於shuffle file not find造成的,那么TaskScheduler是不負責重試的,直接進行stage重試。
【b:重試機制的問題】
如果:Task的邏輯是將處理的數據結果放入到數據庫中,
如果一個Task提交到百分之七十五,然后失敗了,這時候會重試,那么有執行了一次task,這時候就會有臟數據的產生。
以上問題如何去解決?
【1.關閉重試機制。
2.在數據庫中設置主鍵,這時候如果重復提交,那么會失敗,也就避免了臟數據的產生。
】
Client 模式(接2 資源調度中的Client模式):
任務調度: (此時代碼的第二行(new SparkContext(conf))已經執行完畢)
a).Action類型的算子觸發job的執行。源碼中調用了SparkContext的runJob()方法,跟進源碼發現底層調用的是DAGScheduler的runJob()方法。
DAGScheduler會將我們的job按照寬窄依賴划分為一個個stage,每個stage中有一組並行計算的task,每一個task都可以看做是一個”pipeline”,,這個管道里面數據是一條一條被計算的,每經過一個RDD會經過一次處理,RDD是一個抽象的概念里面存儲的是一些計算的邏輯,每一條數據計算完成之后會在shuffle write過程中將數據落地寫入到我們的磁盤中。
b).stage划分完之后會以TaskSet的形式提交給我們的TaskScheduler。
源碼中TaskScheduler.submit.tasks(new TaskSet())只是一個調用方法的過程而已。我們口述說是發送到TaskScheduler。TaskScheduler接收到TaskSet之后會進行遍歷,每遍歷一條調用launchTask()方法,launchTask()根據數據本地化的算法發送task到指定的Executor中執行。task在發送到Executor之前首先進行序列化,Executor中有ThreadPool,ThreadPool中有很多線程,在這里面來具體執行我們的task。
c).TaskScheduler和Executor之間有通信(Executor有一個郵箱(消息循環體CoresExecutorGraintedBackend)),Executor接收到task
Executor接收到task后首先將task反序列化,反序列化后將這個task變為taskRunner(new taskRunner),並不是TaskScheduler直接向Executor發送了一個線程,這個線程是在Executor中變成的。然后這個線程就可以在Executor中的ThreadPool中執行了。
d).Executor接收到的task分為maptask 和 reducetask
map task 和 reduce task,比如這里有三個stage,先從stage1到stage2再到stage3,針對於stage2來說,stage1中的task就是map task ,stage2中的task就是reduce task,針對stage3來說...map task 是一個管道,管道的計算結果會在shuffle write階段數據落地,數據落地會根據我們的分區策略寫入到不同的磁盤小文件中,注意相同的key一定寫入到相同的磁盤小文件中),map端執行完成之后,會向Driver中的DAGScheduler對象里面的MapOutputTracker發送了一個map task的執行狀態(成功還是失敗還有每一個小文件的地址)。然后reduce task開始執行,reduce端的輸入數據就是map端的輸出數據。那么如何拿到map端的輸出數據呢?reduce task會先向Driver中MapOutPutTracker請求這一批磁盤小文件的地址,拿到地址后,由reduce task所在的Executor里面的BlockManager向Map task 所在的Executor先建立連接,連接是由ConnectionManager負責的,然后由BlockTransformService去拉取數據,拉取到的數據作為reduce task的輸入數據(如果使用到了廣播變量,reduce task 或者map task 它會先向它所在的Executor中的BlockManager要廣播變量,沒有的話,本地的BlockManager會去連接Driver中的BlockManagerMaster,連接完成之后由BlockTransformService將廣播變量拉取過來)Executor中有了廣播變量了,task就可以正常執行了。
只說實用,不舉理論!