spark內核篇-task數與並行度


每一個 spark job 根據 shuffle 划分 stage,每個 stage 形成一個或者多個 taskSet,了解了每個 stage 需要運行多少個 task,有助於我們優化 spark 運行

 

task 數 

首先需要了解以下概念:

RDD,彈性分布式數據集,多個 partition;

split,切片,HDFS 上文件為什么要切片,如何切片,參考我的博客 hadoop 的 Split

textFlie 分區,textFile 如何對一個文件分區,參考我的博客 RDD認知與創建

 

創建 RDD 的過程我們可以認為沒有 task 的概念,比如 讀取 HDFS 文件;

有了 RDD 后才有 task 的概念; 

 

重點

一個 inputSplit 對應 RDD 的一個 partition;

RDD 的一個 partition 對應一個 task,也就是說 一個 inputSplit 對應一個 task;

通常情況下 一個 block 對應一個 inputSplit;

  // 以 textFile 為例,每個 inputSplit 不能大於 blockSize,也就是說 可以把 block 切開,但不能把多個 block 組合起來,如果不指定分區,那么每個切片就是 block;

 

作如下實驗證明上述結論

import time
from pyspark import SparkContext

time.clock()
sc = SparkContext(master='yarn')
rdd = sc.textFile('/spark/gps/GPS3.csv', 2).repartition(100).map(lambda x: x).count()
print(time.clock())

##### GPS3.csv 315M,分為 3 個 block
#### 不指定分區-100 runtime:0.64
### 划分2個 stage,
# 第一個 stage sc.textFile('/spark/gps/GPS3.csv').repartition(100) 共 3 個task,
# 第二個 stage .map(lambda x: x).count() 共 100個task
# 19/12/10 22:16:15 INFO cluster.YarnScheduler: Adding task set 0.0 with 3 tasks
# 19/12/10 22:16:34 INFO scheduler.TaskSetManager: Starting task 0.0 in stage 0.0 (TID 0, hadoop13, executor 2, partition 0, NODE_LOCAL, 7899 bytes)
# 19/12/10 22:17:07 INFO cluster.YarnScheduler: Adding task set 1.0 with 100 tasks

#### 指定 5 個分區-100 runtime:0.54
### 划分2個 stage,
# 第一個 stage sc.textFile('/spark/gps/GPS3.csv').repartition(100) 共 5 個task,
# 第二個 stage .map(lambda x: x).count() 共 100個task
# 19/12/10 22:23:09 INFO cluster.YarnScheduler: Adding task set 0.0 with 5 tasks
# 19/12/10 22:17:07 INFO cluster.YarnScheduler: Adding task set 1.0 with 100 tasks

#### 指定 2 個分區-100 runtime:0.6
### 划分2個 stage,
# 第一個 stage sc.textFile('/spark/gps/GPS3.csv').repartition(100) 共 3 個task,
# 第二個 stage .map(lambda x: x).count() 共 100個task
# 19/12/10 22:23:09 INFO cluster.YarnScheduler: Adding task set 0.0 with 3 tasks
# 19/12/10 22:17:07 INFO cluster.YarnScheduler: Adding task set 1.0 with 100 tasks

可以看到

 

task 並行度

首先明確一點,並行度與 task 數並無關系,並行度是由 spark-submit 提交的參數決定的

 

taskSet 被分發到多個 Executor 執行;

每個節點可以運行多個 Executor,一個 Executor 相當於一個進程;

一個 Executor 可以有多個 core,一個 core 執行一個 task,一個 core 相當於 Executor 進程里的一個線程;

 

task 的並發度 = Executor 數 x core 數 = 總 core 數;

對應到 yarn 模式的 spark-submit 參數

--num-executors

--executor-cores

--total-executor-cores  【這個參數官方解釋只能用於 Spark standalone and Mesos only 模式,不過我用在 yarn 模式沒報錯】

 

試想如果有 100 個任務,20 個 Executor,每個 Executor 5 個 core,那么資源利用率極高;

然而加入只有 10 個任務,還是 20 個 Executor,每個 Executor 5 個 core,那么資源有很大浪費,這是 spark 調優的一個方向

 

 

 

參考資料:

https://blog.csdn.net/u012965373/article/details/80847543

https://blog.csdn.net/abc_321a/article/details/82020974


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM