spark中執行任務會顯示如下格式的進度:
[Stage 4:=========================> (12 + 11) / 24]
# 這是stage4階段:
## 共有24個task(一個partition對應一個task,所以有24個partition)
## 當前正在並行執行的task數量為11。
## 這里沒有executor數,因為一個executor里可以同時執行多個task(每個task至少要占用一個虛擬核vcore)
## 已經有12個task執行完成
觀察這個進度過程有利於看出是否存在數據傾斜:若其中1個task的完成時間明顯高於其他task,說明很可能這個task處理的數據量多於其他task。
executor和task關系:
一個executor可以並行執行多個task,實際上一個executor是一個進程,task是executor里的一個線程。
一個task至少要獨占executor里的一個虛擬核心vcore。
一個executor里的核心數由spark-submit的--executor-cores
參數指定。
一個task要占用幾個核心,可以由.config("spark.task.cpus", 1)
配置,默認是1即一個task占用一個vcore。
同時並行執行的task最大數量 = executor數目 * (每個executor核數 / 每個task占用核心數)
任務執行快結束可能會變成這樣:
[Stage 4:=============================================> (22 + 2) / 24]
因為這時候還有2個task沒有完成,此時有些executor可能已經空閑下來了。
DataFrameReader讀取csv和json如果設了如下選項,會造成生成的DataFrame只有一個partition,也就是只有一個task:
.option("multiLine", true)
//加入此行會造成生成的DataFrame只有一個partition
因為spark要考慮讀取多行解析文件數據,所以不能進行文件的隨意分割。
相反,如果是單行模式則可以以任意行結束符進行分割,就能並行讀取,生成的DataFrame就能有多個分區。
如果要讀取的文本文件在hdfs上,生成DataFrame的分區數等於原始文件的block數。如1345MB文件,block大小128MB,會生成11個partition(1345/128=10.5)。
查看Dataset分區數:
ds.rdd.getNumPartitions
改變分區數:
ds.repartition #能任意改變分區數,但是速度慢
ds.coalesce #只能減少分區數,對平衡數據傾斜有效,而且是窄依賴所以速度塊。