Spark中Task數量的分析


本文主要說一下Spark中Task相關概念、RDD計算時Task的數量、Spark Streaming計算時Task的數量。

Task作為Spark作業執行的最小單位,Task的數量及運行快慢間接決定了作業運行的快慢。


開始

先說明一下Spark作業的幾個核心概念

Job(作業):Spark根據行動操作觸發提交作業,以行動操作將我們的代碼切分為多個Job。

Stage(調度階段):每個Job中,又會根據寬依賴將Job划分為多個Stage(包括ShuffleMapStage和ResultStage)。

Task(任務):真正執行計算的部分。Stage相當於TaskSet,每個Stage內部包含了多個Task,將各個Task下發到各個Executor執行計算。

每個Task的處理邏輯完全一樣,不同的是對應處理的數據。即:移動計算而不是移動數據。

Partition(分區):這個是針對RDD而言的,RDD內部維護了分區列表,表示數據在集群中存放的不同位置。

Job、Stage、Task的對應關系如下:
Job-Stage-Task對應圖

Task是真正干活的,所以說是它間接決定了Spark程序的快慢也不過分。


再看看Spark任務提交時的幾個相關配置

num-executors:配置執行任務的Executor的數量。

executor-cores:每個Executor的核的數量。此核非彼核,它不是機器的CPU核,可以理解為Executor的一個線程。

每個核同時只可以執行一個Task。

也就是說一個Spark應用同時執行的任務數 = 用於執行任務的Executor數 * 每個Executor的核數

spark.executor.memory:每個Executor的內存大小。

spark.default.parallelism:RDD的默認分區數。

在我們沒有指定這個參數的前提下,如果是shuffle操作,這個值默認是父RDD中分區數較大的那個值;如果是普通操作,這個值的默認大小取決於集群管理器(YARN, Local這些)。

以YARN為例,如果我們沒有指定,它的大小就是所有用於執行任務的Executor核的總數

spark.sql.shuffle.partitions:這個配置是針對於Spark SQL在shuffle時的默認分區數。默認值是200。只對Spark SQL起作用。

RDD計算時Task的數量

在基於RDD計算時,Task的數量 = RDD的分區數

所以調整RDD分區的數量就可以變相的調整Task的數量。

所以當RDD計算跑的很慢時,可以通過適當的調整RDD分區數來實現提速。

看看Spark.parallelize生成RDD時的源碼實現:

def parallelize[T: ClassTag](
    seq: Seq[T],
    numSlices: Int = defaultParallelism): RDD[T] = withScope {
    assertNotStopped()
        new ParallelCollectionRDD[T](this, seq, numSlices, Map[Int, Seq[String]]())
}

// 這里的taskScheduler.defaultParallelism就是
// 取的配值中spark.default.parallelism的值。
def defaultParallelism: Int = {
    assertNotStopped()
        taskScheduler.defaultParallelism
}

可以發現通過Spark.parallelize創建的RDD分區,如果我們不指定分區數,那么分區數就是由配置的spark.default.parallelism來決定。

Spark讀Hive、HDFS時的Task數量

這塊之后補上來。。。


Spark Streaming流處理時的Task數量

Spark Streaming作為Spark中用於流處理的一員,它的原理就是運行一個接收器接收數據,然后將接收的數據按塊進行存儲,之后划分Job,執行Task處理數據。

ok,Spark Streaming最后也會轉換成Task進行數據的處理,也就是Task運行速度也會影響它處理數據的速度。


Spark Streaming中Task的數量是由用來存儲接收到數據的Block數來決定的。

那么只要控制Block的數量就可以控制Task的數量。

如下代碼所示,Block是由一個定時器定時生成的。

// 塊生成間隔時間
private val blockIntervalMs = conf.getTimeAsMs("spark.streaming.blockInterval", "200ms")
// 一個定時器,按塊生成間隔時間定時根據接收到的數據生成塊。
private val blockIntervalTimer = new RecurringTimer(clock, blockIntervalMs, updateCurrentBuffer, "BlockGenerator")

所以Block的數量 = 批處理間隔時間 / 塊生成間隔時間

塊生成間隔時間是由配置spark.streaming.blockInterval決定的,默認是200ms,最小是50ms。

所以當Spark Streaming的Task數量成為性能的瓶頸時,可以通過調整參數來調整Task的數量。

總結

1、Task是Spark的最小執行單位,Executor每個核同時只能執行一個Task。

2、RDD計算時,Task數量與分區數對應;Spark Streaming中,Task數量由Block數決定。

3、根據分配的資源以及作業的運行情況,適當調整Task數量。

4、移動計算而不是移動數據。


end. 個人理解,如果偏差歡迎指正。



個人公眾號:碼農峰,定時推送行業資訊,持續發布原創技術文章,歡迎大家關注。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM