Spark:partition、task、executor關系


spark中執行任務會顯示如下格式的進度:

[Stage 4:=========================>                              (12 + 11) / 24]
# 這是stage4階段:
## 共有24個task(一個partition對應一個task,所以有24個partition)
## 當前正在並行執行的task數量為11。
## 這里沒有executor數,因為一個executor里可以同時執行多個task(每個task至少要占用一個虛擬核vcore)
## 已經有12個task執行完成

觀察這個進度過程有利於看出是否存在數據傾斜:若其中1個task的完成時間明顯高於其他task,說明很可能這個task處理的數據量多於其他task。

executor和task關系:

一個executor可以並行執行多個task,實際上一個executor是一個進程,task是executor里的一個線程。
一個task至少要獨占executor里的一個虛擬核心vcore。
一個executor里的核心數由spark-submit的--executor-cores參數指定。
一個task要占用幾個核心,可以由.config("spark.task.cpus", 1)配置,默認是1即一個task占用一個vcore。

同時並行執行的task最大數量 = executor數目 * (每個executor核數 / 每個task占用核心數)

任務執行快結束可能會變成這樣:

[Stage 4:=============================================>          (22 + 2) / 24]

因為這時候還有2個task沒有完成,此時有些executor可能已經空閑下來了。


DataFrameReader讀取csv和json如果設了如下選項,會造成生成的DataFrame只有一個partition,也就是只有一個task:

.option("multiLine", true)  
//加入此行會造成生成的DataFrame只有一個partition

因為spark要考慮讀取多行解析文件數據,所以不能進行文件的隨意分割。

相反,如果是單行模式則可以以任意行結束符進行分割,就能並行讀取,生成的DataFrame就能有多個分區。
如果要讀取的文本文件在hdfs上,生成DataFrame的分區數等於原始文件的block數。如1345MB文件,block大小128MB,會生成11個partition(1345/128=10.5)。

查看Dataset分區數:

ds.rdd.getNumPartitions

改變分區數:

ds.repartition #能任意改變分區數,但是速度慢
ds.coalesce #只能減少分區數,對平衡數據傾斜有效,而且是窄依賴所以速度塊。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM