Spark Application、Driver、Job、stage、task


1、Application

  application(應用)其實就是用spark-submit提交的程序。一個application通常包含三部分:從數據源(比方說HDFS)取數據形成RDD,通過RDD的transformation和action進行計算,將結果輸出到console或者外部存儲。

2、Driver

  Spark中的driver感覺其實和yarn中Application Master的功能相類似。主要完成任務的調度以及和executor和cluster manager進行協調。有client和cluster聯眾模式。client模式driver在任務提交的機器上運行,而cluster模式會隨機選擇機器中的一台機器啟動driver。通俗講,driver可以理解為用戶自己編寫的程序。我們使用spark-submit提交一個Spark作業之后,這個作業就會啟動一個對應的Driver進程.

  Driver進程本身會根據我們設置的參數,占有一定數量的內存和CPU core。而Driver進程要做的第一件事情,就是向集群管理器(常用的如 yarn)申請運行Spark作業需要使用的資源,這里的資源指的就是Executor進程。YARN集群管理器會根據我們為Spark作業設置的資源參數,在各個工作節點上,啟動一定數量的Executor進程,每個Executor進程都占有一定數量的內存和CPU core。

  在申請到了作業執行所需的資源之后,Driver進程就會開始調度和執行我們編寫的作業代碼了。Driver進程會將我們編寫的Spark作業代碼分拆為多個stage,每個stage執行一部分代碼片段,並為每個stage創建一批task,然后將這些task分配到各個Executor進程中執行。task是最小的計算單元,負責執行一模一樣的計算邏輯(也就是我們自己編寫的某個代碼片段),只是每個task處理的數據不同而已。一個stage的所有task都執行完畢之后,會在各個節點本地的磁盤文件中寫入計算中間結果,然后Driver就會調度運行下一個stage。下一個stage的task的輸入數據就是上一個stage輸出的中間結果。如此循環往復,直到將我們自己編寫的代碼邏輯全部執行完,並且計算完所有的數據,得到我們想要的結果止。  

  Spark是根據shuffle類算子來進行stage的划分。如果我們的代碼中執行了某個shuffle類算子(比如reduceByKey、join等),那么就會在該算子處,划分出一個stage界限來。可以大致理解為,shuffle算子執行之前的代碼會被划分為一個stage,shuffle算子執行以及之后的代碼會被划分為下一個stage。因此一個stage剛開始執行的時候,它的每個task可能都會從上一個stage的task所在的節點,去通過網絡傳輸拉取需要自己處理的所有key,然后對拉取到的所有相同的key使用我們自己編寫的算子函數執行聚合操作(比如reduceByKey()算子接收的函數)。這個過程就是shuffle。

  當我們在代碼中執行了cache/persist等持久化操作時,根據我們選擇的持久化級別的不同,每個task計算出來的數據也會保存到Executor進程的內存或者所在節點的磁盤文件中。因此Executor的內存主要分為三塊:第一塊是讓task執行我們自己編寫的代碼時使用,默認是占Executor總內存的20%;第二塊是讓task通過shuffle過程拉取了上一個stage的task的輸出后,進行聚合等操作時使用,默認也是占Executor總內存的20%;第三塊是讓RDD持久化時使用,默認占Executor總內存的60%。

3、Job

  Spark中的Job和MR中Job不一樣不一樣。MR中Job主要是Map或者Reduce Job。而Spark的Job其實很好區別,一個action算子就算一個Job,比方說count,first等。

4、Stage

  stage 是一個 job 的組成單位,就是說,一個 job 會被切分成 1 個或 1 個以上的 stage,然后各個 stage 會按照執行順序依次執行。Spark的Stage是分割RDD執行的各種transformation而來。分割Stage的規則,其實只有一個:從寬依賴處分割。

  RDD被設計為可以記錄依賴關系,關系可以分為兩類:窄依賴和寬依賴。窄依賴:表示父親 RDD 的一個分區最多被子 RDD 一個分區所依賴。寬依賴:表示父親 RDD 的一個分區可以被子 RDD 的多個子分區所依賴。如下圖,左邊是窄依賴,右邊是寬依賴:

  知道了這個分割規則,其實還是有一點疑惑,為什么這么分?
其實道理蠻明顯的,子RDD的partition會依賴父RDD中多個partition,這樣就可能會有一些partition沒有准備好,導致計算不能繼續,所以就分開了,直到准備好了父RDD中所有partition,再繼續進行將父RDD轉換為子RDD的計算。而窄依賴完全不會有這個顧慮,窄依賴是父RDD一個partition對應子RDD一個partition,那么直接計算就可以了。

4、Task

  一個Stage內,最終的RDD有多少個partition,就會產生多少個task。一般情況下,我們一個task運行的時候,使用一個cores。task的數量就是我們任務的最大的並行度。

  task的執行速度是跟每個Executor進程的CPU core數量有直接關系的。一個CPU core同一時間只能執行一個線程。而每個Executor進程上分配到的多個task,都是以每個task一條線程的方式,多線程並發運行的。如果CPU core數量比較充足,而且分配到的task數量比較合理,那么通常來說,可以比較快速和高效地執行完這些task線程。

  如果我們的task數量超過cores總數,則先執行cores個數量的task,然后等待cpu資源空閑后,繼續執行剩下的task。

 

參考:https://www.cnblogs.com/wzj4858/p/8204411.html

  https://blog.csdn.net/gaopu12345/article/details/79156675

  https://blog.csdn.net/mys_35088/article/details/80864092


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM