本文主要說一下Spark中Task相關概念、RDD計算時Task的數量、Spark Streaming計算時Task的數量。
Task作為Spark作業執行的最小單位,Task的數量及運行快慢間接決定了作業運行的快慢。
開始
先說明一下Spark作業的幾個核心概念:
Job(作業):Spark根據行動操作觸發提交作業,以行動操作將我們的代碼切分為多個Job。
Stage(調度階段):每個Job中,又會根據寬依賴將Job划分為多個Stage(包括ShuffleMapStage和ResultStage)。
Task(任務):真正執行計算的部分。Stage相當於TaskSet,每個Stage內部包含了多個Task,將各個Task下發到各個Executor執行計算。
每個Task的處理邏輯完全一樣,不同的是對應處理的數據。即:移動計算而不是移動數據。
Partition(分區):這個是針對RDD而言的,RDD內部維護了分區列表,表示數據在集群中存放的不同位置。
Job、Stage、Task的對應關系如下:

Task是真正干活的,所以說是它間接決定了Spark程序的快慢也不過分。
再看看Spark任務提交時的幾個相關配置:
num-executors:配置執行任務的Executor的數量。
executor-cores:每個Executor的核的數量。此核非彼核,它不是機器的CPU核,可以理解為Executor的一個線程。
每個核同時只可以執行一個Task。
也就是說一個Spark應用同時執行的任務數 = 用於執行任務的Executor數 * 每個Executor的核數。
spark.executor.memory:每個Executor的內存大小。
spark.default.parallelism:RDD的默認分區數。
在我們沒有指定這個參數的前提下,如果是shuffle操作,這個值默認是父RDD中分區數較大的那個值;如果是普通操作,這個值的默認大小取決於集群管理器(YARN, Local這些)。
以YARN為例,如果我們沒有指定,它的大小就是所有用於執行任務的Executor核的總數。
spark.sql.shuffle.partitions:這個配置是針對於Spark SQL在shuffle時的默認分區數。默認值是200。只對Spark SQL起作用。
RDD計算時Task的數量
在基於RDD計算時,Task的數量 = RDD的分區數。
所以調整RDD分區的數量就可以變相的調整Task的數量。
所以當RDD計算跑的很慢時,可以通過適當的調整RDD分區數來實現提速。
看看Spark.parallelize生成RDD時的源碼實現:
def parallelize[T: ClassTag](
seq: Seq[T],
numSlices: Int = defaultParallelism): RDD[T] = withScope {
assertNotStopped()
new ParallelCollectionRDD[T](this, seq, numSlices, Map[Int, Seq[String]]())
}
// 這里的taskScheduler.defaultParallelism就是
// 取的配值中spark.default.parallelism的值。
def defaultParallelism: Int = {
assertNotStopped()
taskScheduler.defaultParallelism
}
可以發現通過Spark.parallelize創建的RDD分區,如果我們不指定分區數,那么分區數就是由配置的spark.default.parallelism來決定。
Spark讀Hive、HDFS時的Task數量
這塊之后補上來。。。
Spark Streaming流處理時的Task數量
Spark Streaming作為Spark中用於流處理的一員,它的原理就是運行一個接收器接收數據,然后將接收的數據按塊進行存儲,之后划分Job,執行Task處理數據。
ok,Spark Streaming最后也會轉換成Task進行數據的處理,也就是Task運行速度也會影響它處理數據的速度。
Spark Streaming中Task的數量是由用來存儲接收到數據的Block數來決定的。
那么只要控制Block的數量就可以控制Task的數量。
如下代碼所示,Block是由一個定時器定時生成的。
// 塊生成間隔時間
private val blockIntervalMs = conf.getTimeAsMs("spark.streaming.blockInterval", "200ms")
// 一個定時器,按塊生成間隔時間定時根據接收到的數據生成塊。
private val blockIntervalTimer = new RecurringTimer(clock, blockIntervalMs, updateCurrentBuffer, "BlockGenerator")
所以Block的數量 = 批處理間隔時間 / 塊生成間隔時間。
塊生成間隔時間是由配置spark.streaming.blockInterval決定的,默認是200ms,最小是50ms。
所以當Spark Streaming的Task數量成為性能的瓶頸時,可以通過調整參數來調整Task的數量。
總結
1、Task是Spark的最小執行單位,Executor每個核同時只能執行一個Task。
2、RDD計算時,Task數量與分區數對應;Spark Streaming中,Task數量由Block數決定。
3、根據分配的資源以及作業的運行情況,適當調整Task數量。
4、移動計算而不是移動數據。
end. 個人理解,如果偏差歡迎指正。

個人公眾號:碼農峰,定時推送行業資訊,持續發布原創技術文章,歡迎大家關注。
