每一個 spark job 根據 shuffle 划分 stage,每個 stage 形成一個或者多個 taskSet,了解了每個 stage 需要運行多少個 task,有助於我們優化 spark 運行
task 數
首先需要了解以下概念:
RDD,彈性分布式數據集,多個 partition;
split,切片,HDFS 上文件為什么要切片,如何切片,參考我的博客 hadoop 的 Split;
textFlie 分區,textFile 如何對一個文件分區,參考我的博客 RDD認知與創建;
創建 RDD 的過程我們可以認為沒有 task 的概念,比如 讀取 HDFS 文件;
有了 RDD 后才有 task 的概念;
重點
一個 inputSplit 對應 RDD 的一個 partition;
RDD 的一個 partition 對應一個 task,也就是說 一個 inputSplit 對應一個 task;
通常情況下 一個 block 對應一個 inputSplit;
// 以 textFile 為例,每個 inputSplit 不能大於 blockSize,也就是說 可以把 block 切開,但不能把多個 block 組合起來,如果不指定分區,那么每個切片就是 block;
作如下實驗證明上述結論
import time from pyspark import SparkContext time.clock() sc = SparkContext(master='yarn') rdd = sc.textFile('/spark/gps/GPS3.csv', 2).repartition(100).map(lambda x: x).count() print(time.clock()) ##### GPS3.csv 315M,分為 3 個 block #### 不指定分區-100 runtime:0.64 ### 划分2個 stage, # 第一個 stage sc.textFile('/spark/gps/GPS3.csv').repartition(100) 共 3 個task, # 第二個 stage .map(lambda x: x).count() 共 100個task # 19/12/10 22:16:15 INFO cluster.YarnScheduler: Adding task set 0.0 with 3 tasks # 19/12/10 22:16:34 INFO scheduler.TaskSetManager: Starting task 0.0 in stage 0.0 (TID 0, hadoop13, executor 2, partition 0, NODE_LOCAL, 7899 bytes) # 19/12/10 22:17:07 INFO cluster.YarnScheduler: Adding task set 1.0 with 100 tasks #### 指定 5 個分區-100 runtime:0.54 ### 划分2個 stage, # 第一個 stage sc.textFile('/spark/gps/GPS3.csv').repartition(100) 共 5 個task, # 第二個 stage .map(lambda x: x).count() 共 100個task # 19/12/10 22:23:09 INFO cluster.YarnScheduler: Adding task set 0.0 with 5 tasks # 19/12/10 22:17:07 INFO cluster.YarnScheduler: Adding task set 1.0 with 100 tasks #### 指定 2 個分區-100 runtime:0.6 ### 划分2個 stage, # 第一個 stage sc.textFile('/spark/gps/GPS3.csv').repartition(100) 共 3 個task, # 第二個 stage .map(lambda x: x).count() 共 100個task # 19/12/10 22:23:09 INFO cluster.YarnScheduler: Adding task set 0.0 with 3 tasks # 19/12/10 22:17:07 INFO cluster.YarnScheduler: Adding task set 1.0 with 100 tasks
可以看到
task 並行度
首先明確一點,並行度與 task 數並無關系,並行度是由 spark-submit 提交的參數決定的
taskSet 被分發到多個 Executor 執行;
每個節點可以運行多個 Executor,一個 Executor 相當於一個進程;
一個 Executor 可以有多個 core,一個 core 執行一個 task,一個 core 相當於 Executor 進程里的一個線程;
task 的並發度 = Executor 數 x core 數 = 總 core 數;
對應到 yarn 模式的 spark-submit 參數
--num-executors
--executor-cores
--total-executor-cores 【這個參數官方解釋只能用於 Spark standalone and Mesos only 模式,不過我用在 yarn 模式沒報錯】
試想如果有 100 個任務,20 個 Executor,每個 Executor 5 個 core,那么資源利用率極高;
然而加入只有 10 個任務,還是 20 個 Executor,每個 Executor 5 個 core,那么資源有很大浪費,這是 spark 調優的一個方向
參考資料:
https://blog.csdn.net/u012965373/article/details/80847543
https://blog.csdn.net/abc_321a/article/details/82020974