
Spark內部有若干術語(Executor、Job、Stage、Task、Driver、DAG等),需要理解並搞清其內部關系,因為這是性能調優的基石。
節點類型有:
1. Master 節點: 常駐master進程,負責管理全部worker節點。
2. Worker 節點: 常駐worker進程,負責管理executor 並與master節點通信。
Dirvier:官方解釋為: The process running the main() function of the application and creating the SparkContext。即理解為用戶自己編寫的應用程序
Executor:執行器:
在每個Worker節點上為某應用啟動的一個進程,該進程負責運行任務,並且負責將數據存在內存或者磁盤上,每個job都有各自獨立的Executor。
Executor是一個執行Task的容器。它的主要職責是:
1、初始化程序要執行的上下文SparkEnv,解決應用程序需要運行時的jar包的依賴,加載類。
2、同時還有一個ExecutorBackend向cluster manager匯報當前的任務狀態,有點類似hadoop的tasktracker和task。
也就是說,Executor是一個應用程序運行的監控和執行容器。Executor的數目可以在submit時,由 --num-executors (on yarn)指定.
Job:
包含很多task的並行計算,可以認為是Spark RDD 里面的action,每個action算子的執行會生成一個job。
用戶提交的Job會提交給DAGScheduler,Job會被分解成Stage和Task。
Stage:
一個Job會被拆分為多組Task,每組任務被稱為一個Stage就像Map Stage, Reduce Stage。
Stage的划分簡單來說是以shuffle和result這兩種類型來划分。
在Spark中有兩類task,一類是shuffleMapTask,一類是resultTask。第一類task的輸出是shuffle所需數據,第二類task的輸出是result,stage的划分也以此為依據:shuffle之前的所有變換是一個stage,shuffle之后的操作是另一個stage。例如:
1) rdd.parallize(1 to 10).foreach(println) 這個操作沒有shuffle,直接就輸出了,那么只有它的task是resultTask,stage也只有一個;
2) 如果是rdd.map(x => (x, 1)).reduceByKey(_ + _).foreach(println), 這個job因為有reduce,所以有一個shuffle過程,那么reduceByKey之前的是一個stage,執行shuffleMapTask,輸出shuffle所需的數據,reduceByKey到最后是一個stage,直接就輸出結果了。
如果job中有多次shuffle,那么每個shuffle之前都是一個stage。
spark中會引起shuffle的算子有:
去重distinct、聚合reduceByKey/groupByKey/xxByKey、排序sortByKey、表關聯join、重分區Repartition/Coalesce(shuffle=true)等。
Task:
stage 下的單個任務執行單元。
一個rdd有多少個partition,就會有多少個task,因為每一個 task 只是處理一個partition上的數據。所以有時為提高執行並行度,使用Repartition或Coalesce(shuffle=true),增多partition數量,從而增多task數量
