spark 源碼分析之二十一 -- Task的執行流程


引言

在上兩篇文章 spark 源碼分析之十九 -- DAG的生成和Stage的划分 和 spark 源碼分析之二十 -- Stage的提交 中剖析了Spark的DAG的生成,Stage的划分以及Stage轉換為TaskSet后的提交。

如下圖,我們在前兩篇文章中剖析了DAG的構建,Stage的划分以及Stage轉換為TaskSet后的提交,本篇文章主要剖析TaskSet被TaskScheduler提交之后的Task的整個執行流程,關於具體Task是如何執行的兩種stage對應的Task的執行有本質的區別,我們將在下一篇文章剖析。

我們先來剖析一下SchdulerBackend的子類實現。在yarn 模式下,它有兩個實現yarn-client 模式下的 org.apache.spark.scheduler.cluster.YarnClientSchedulerBackend實現 和 yarn-cluster 模式下的 org.apache.spark.scheduler.cluster.YarnClusterSchedulerBackend 實現,如下圖。

這兩個類在spark 項目的 resource-managers 目錄下的 yarn 目錄下定義實現。

下面簡單看一下這幾個類的定義和實現。

ExecutorAllocationClient

簡單說明一下,這個類主要是負責想Cluster Master請求或殺掉executor。核心方法如下,不做過多解釋,可以看源碼做進一步了解。

 

SchedulerBackend

接口定義

A backend interface for scheduling systems that allows plugging in different ones under TaskSchedulerImpl. We assume a Mesos-like model where the application gets resource offers as machines become available and can launch tasks on them.

主要方法

其定義的方法如下:

killTask:請求 executor 殺掉正在運行的task

applicationId:獲取job的applicationId

applicationAttemptId:獲取task的 attemptId

getDriverLogUrls:獲取驅動程序日志的URL。這些URL用於顯示驅動程序的UI Executors選項卡中的鏈接。

maxNumConcurrentTasks:當前task的最大並發數

下面我們來看一下它的子類。

 

CoarseGrainedSchedulerBackend

類聲明

A scheduler backend that waits for coarse-grained executors to connect. This backend holds onto each executor for the duration of the Spark job rather than relinquishing executors whenever a task is done and asking the scheduler to launch a new executor for each new task. Executors may be launched in a variety of ways, such as Mesos tasks for the coarse-grained Mesos mode or standalone processes for Spark's standalone deploy mode (spark.deploy.*).

調度程序后端,等待粗粒度執行程序進行連接。 此后端在Spark作業期間保留每個執行程序,而不是在任務完成時放棄執行程序並要求調度程序為每個新任務啟動新的執行程序。 執行程序可以以多種方式啟動,例如用於粗粒度Mesos模式的Mesos任務或用於Spark的獨立部署模式(spark.deploy。*)的獨立進程。

內部類DriverEndpoint

類說明

它是線程安全的。代表的是Driver的endpoint

類結構

如下,是它的類結構:

rpcEnv 是指的每個節點上的NettyRpcEnv

executorsPendingLossReason:記錄了已經丟失的並且不知道原因的executor

addressToExecutorId:記錄了每一個executor的id和executor地址的映射關系 

下面我們看一下Task以及其繼承關系。

 

Task

類說明

它是Task的基本單元。

類結構

即內部結構如下:
下面看一下其核心方法。

run:運行Task,被executor調用,源碼如下:

 

runTask 運行Task,被run方法調用,它是一個抽象方法,由子類來實現。

kill:殺死Task。源碼如下:

下面看一下其繼承關系。

繼承關系

Task的繼承關系如下:

A unit of execution. We have two kinds of Task's in Spark:

- org.apache.spark.scheduler.ShuffleMapTask

- org.apache.spark.scheduler.ResultTask

A Spark job consists of one or more stages.

The very last stage in a job consists of multiple ResultTasks, while earlier stages consist of ShuffleMapTasks. A ResultTask executes the task and sends the task output back to the driver application. A ShuffleMapTask executes the task and divides the task output to multiple buckets (based on the task's partitioner).

下面分別看一下兩個Task的實現,是如何定義 runTask 方法的?

ResultTask

類名:org.apache.spark.scheduler.ResultTask

其runTask方法如下:

 

ShuffleMapTask

類名:org.apache.spark.scheduler.ShuffleMapTask

其runTask方法如下:

Executor

全稱:org.apache.spark.executor.Executor

類說明

Executor對象是Spark Executor的抽象,它背后有一個線程池用來執行任務。其實從源碼可以看出,Spark的Executor這個術語,其實來自於Java線程池部分的Executors。

下面主要分析一下其內部的結構。

執行Task的線程池

線程池定義如下:

心跳機制

Executor會不斷地向driver發送心跳來匯報其健康狀況,如下:

EXECUTOR_HEARTBEAT_INTERVAL 值默認為 10s, 可以通過參數 spark.executor.heartbeatInterval 來進行調整。

startDriverHeartBeater方法如下:

其依賴方法 reportHeartBeat 方法源碼如下:

殺死任務機制--reaper機制

首先先來了解一下 TaskReaper。

TaskReaper

類說明:

Supervises the killing / cancellation of a task by sending the interrupted flag, optionally sending a Thread.interrupt(), and monitoring the task until it finishes. Spark's current task cancellation / task killing mechanism is "best effort" because some tasks may not be interruptable or may not respond to their "killed" flags being set. If a significant fraction of a cluster's task slots are occupied by tasks that have been marked as killed but remain running then this can lead to a situation where new jobs and tasks are starved of resources that are being used by these zombie tasks. The TaskReaper was introduced in SPARK-18761 as a mechanism to monitor and clean up zombie tasks. For backwards-compatibility / backportability this component is disabled by default and must be explicitly enabled by setting spark.task.reaper.enabled=true. A TaskReaper is created for a particular task when that task is killed / cancelled. Typically a task will have only one TaskReaper, but it's possible for a task to have up to two reapers in case kill is called twice with different values for the interrupt parameter. Once created, a TaskReaper will run until its supervised task has finished running. If the TaskReaper has not been configured to kill the JVM after a timeout (i.e. if spark.task.reaper.killTimeout < 0) then this implies that the TaskReaper may run indefinitely if the supervised task never exits. 

其源碼如下:

思路:發送kill信號,等待一定時間后,如果任務停止,則返回,否則yarn模式下拋出一場,對local模式沒有影響。

是否啟用reaper機制

reaper機制默認是不啟用的,可以通過參數 spark.task.reaper.enabled 來啟用。

taskReapter線程池

它也是一個daemon的支持多個worker同時工作的線程池,也就是說可以同時停止多個任務。

kill任務

當kill任務的時候,會調用kill Task方法,源碼如下:

 

driver端SchedulerBackend接受task請求

在上一篇文章spark 源碼分析之二十 -- Stage的提交中,提到SchedulerBackend接收到task請求后調用了 makeOffsers 方法,如下:

先調用TaskScheduler分配資源,並返回TaskDescription對象,然后拿着該對象去執行任務。

分配資源

過濾掉即將被回收的executor

executorDataMap的定義如下:

其中ExecutorData 是記錄着executor的信息。包括 executor的address,port,可用cpu核數,總cpu核數等信息。

executorIsAlive方法定義如下:

即該executor既不在即將被回收的集合中也不在丟失的executor集合中。

構造WorkOffer集合

WorkOffer對象代表着一個executor上的可用資源,類定義如下:

分配資源

org.apache.spark.scheduler.TaskSchedulerImpl#resourceOffers 方法如下:

思路:先過濾掉不可用的WorkOffser對象,然后給每一個TaskSet分配資源。如果taskSet是barrier的,需要初始化barrierCoordinator的rpc endpoint。

記錄映射關系

記錄hostname和executorId的映射關系,記錄executorId和taskId的映射關系,源碼如下:

1. 其中 executorAdded的源碼如下:

org.apache.spark.scheduler.DAGScheduler#executorAdded的映射關系如下:

經過eventProcessLoop異步消息隊列后,最終被如下分支處理:

最終處理邏輯如下,即把狀態健康的executor從失敗的epoch集合中移除。

 

2. 其中,獲取host的rack信息的方法沒有實現,返回None。

更新不可用executor集合

blacklistTrackerOpt 定義如下:

org.apache.spark.scheduler.BlacklistTracker#isBlacklistEnabled 方法如下:

即 BLACKLIST_ENABLED 可以通過設置參數 spark.blacklist.enabled 來設定是否使用blacklist,默認沒有設置。如果設定了spark.scheduler.executorTaskBlacklistTime參數值大於 0 ,也啟用 blacklist。

 

BlacklistTracker 主要就是用來追蹤有問題的executor和host信息的,其類說明如下:

BlacklistTracker is designed to track problematic executors and nodes. It supports blacklisting executors and nodes across an entire application (with a periodic expiry). TaskSetManagers add additional blacklisting of executors and nodes for individual tasks and stages which works in concert with the blacklisting here. The tracker needs to deal with a variety of workloads, eg.:

bad user code -- this may lead to many task failures, but that should not count against individual executors

many small stages -- this may prevent a bad executor for having many failures within one stage, but still many failures over the entire application 

"flaky" executors -- they don't fail every task, but are still faulty enough to merit blacklisting See the design doc on SPARK-8425 for a more in-depth discussion. 

過濾不可用WorkOffer

過濾掉host或executor在黑名單中的WorkOffer,對應源碼如下:

對TaskSetManager排序

對應源碼如下:

首先對WorkOffer集合隨機打亂順序,然后獲取其可用core,可用slot的信息,然后獲取排序后的TaskSetManager隊列。rootPool是Pool對象,源碼在 TaskScheduler提交TaskSet 中有描述,不再贅述。

CPUS_PER_TASK的核數默認是1,即一個task使用一個core,所以在spark算子中,盡量不要使用多線程,因為就一個core,提高不了多少的性能。可以通過spark.task.cpus參數進行調節。

org.apache.spark.scheduler.Pool#getSortedTaskSetQueue 源碼如下:

 其中TaskSetManager的 getSortedTaskSetManager的源碼如下:

重新計算本地性:

org.apache.spark.scheduler.TaskSetManager#executorAdded 的源碼如下:

org.apache.spark.scheduler.TaskSetManager#computeValidLocalityLevels 源碼如下:

在這里,可以很好的理解五種數據本地性級別。先加入數據本地性數組的優先考慮使用。

為每一個TaskSet分配資源

對應源碼如下:

如果slot資源夠用或者TaskSet不是barrier的,開始為TaskSet分配資源。

org.apache.spark.scheduler.TaskSchedulerImpl#resourceOfferSingleTaskSet 源碼如下:

思路:遍歷每一個shuffledOffers,如果其可用cpu核數不小於一個slot所用的核數,則分配資源,分配資源完畢后,記錄taskId和taskSetManager的映射關系、taskId和executorId的映射關系、executorId和task的映射關系。最后可用核數減一個slot所以的cpu核數。

其依賴方法 org.apache.spark.scheduler.TaskSetManager#resourceOffer 源碼如下,思路:先檢查該executor和該executor所在的host都不在黑名單中,若在則返回None,否則開始分配資源。

分配資源步驟:

1. 計算數據本地性。

2. 每一個task出隊並構建 TaskDescription 對象。

 其依賴方法 org.apache.spark.scheduler.TaskSetManager#getAllowedLocalityLevel 源碼如下,目的就是計算該task 的允許的最大數據本地性。

初始化BarrierCoordinator

如果任務資源分配成功並且TaskSet是barrier的,則初始化BarrierCoordinator,源碼如下:

依賴方法 org.apache.spark.scheduler.TaskSchedulerImpl#maybeInitBarrierCoordinator 如下:

運行Task

org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend.DriverEndpoint#makeOffers中,分配資源結束后,就可以運行task了,源碼如下:

序列化TaskDescription

其依賴方法 lauchTasks 源碼如下:

org.apache.spark.scheduler.TaskDescription#encode 方法是一個序列化的操作,將內存中的Java Function對象序列化為字節數組。源碼如下:

maxRpcMessageSize定義如下:

org.apache.spark.util.RpcUtils#maxMessageSizeBytes 源碼如下:

默認為128MB,可以通過參數 spark.rpc.message.maxSize 來調整。

executorData可用核數減去一個Slot所需的核數后,去調用executor運行task。

發送RPC請求executor運行任務

對應 lauchTasks 源碼如下:

經過底層RPC的傳輸,executorEndpoint的處理代碼receive方法處理分支為:

其主要有兩步,反序列化TaskDescription字節數據為Java對象。

調用executor來運行task。

下面詳細來看每一步。

executor反序列化TaskDescription

思路:將通過RPC傳輸過來的ByteBuffer對象中的字節數據內容反序列化為在內存中的Java對象,即TaskDescription對象。

executor運行task

Executor對象是Spark Executor的抽象,它背后有一個線程池用來執行任務。其實從源碼可以看出,Spark的Executor這個術語,其實來自於Java線程池部分的Executors。

launchTasks方法源碼如下:

TaskRunner是一個Runnable的實現,worker線程池中的worker會去執行其run方法。

下面來看一下TaskRunner類。

TaskRunner

類說明

它繼承了Runnable接口,worker線程池中的worker會去執行其run方法來執行任務,其主要方法如下:

kill任務

 

運行任務

run方法比較長,划分為四部分來說明。

准備環境

對應源碼如下:

初始化環境,修改task的運行狀態為RUNNING,初始化gc時間。

准備task配置

其源碼如下:

反序列化Task對象,並且設置Task的依賴。

運行task

記錄任務開始時間,開始使用cpu時間,運行task,最后釋放內存。

其依賴方法 org.apache.spark.util.Utils#tryWithSafeFinally 源碼如下:

從源碼可以看出,第一個方法是執行的方法,第二個方法是finally方法體中需要執行的方法。即釋放內存。

處理失敗任務

源碼如下:

 

更新metrics信息

關於metrics的相關內容,不做過多介紹。源碼如下:

序列化Task執行結果

思路:將返回值序列化為ByteBuffer對象。

將結果返回給driver

org.apache.spark.executor.CoarseGrainedExecutorBackend#statusUpdate 方法如下:

經過rpc后,driver端org.apache.spark.executor.CoarseGrainedExecutorBackend 的 receive 方法如下: 

 

思路:更新task的狀態,接着在同一個executor上分配資源,執行任務。

更新task狀態

org.apache.spark.scheduler.TaskSchedulerImpl#statusUpdate 方法如下:

處理失敗任務

源碼如下,不做再深入的剖析:

處理成功任務

源碼如下:

 

其依賴方法 org.apache.spark.scheduler.TaskSchedulerImpl#handleSuccessfulTask 源碼如下:

org.apache.spark.scheduler.TaskSetManager#handleSuccessfulTask 源碼如下:

org.apache.spark.scheduler.TaskSchedulerImpl#markPartitionCompletedInAllTaskSets 源碼如下:

org.apache.spark.scheduler.TaskSetManager#markPartitionCompleted 的源碼如下:

org.apache.spark.scheduler.TaskSetManager#maybeFinishTaskSet 源碼如下:

 

通知DAGScheduler任務已完成

在org.apache.spark.scheduler.TaskSetManager#handleSuccessfulTask 源碼中,最后調用了dagScheduler的taskEnded 方法,源碼如下:

即發送事件消息給eventProcessLoop隊列做異步處理:

在 org.apache.spark.scheduler.DAGSchedulerEventProcessLoop#doOnReceive 源碼中,處理該事件的分支為:

即會調用 org.apache.spark.scheduler.DAGScheduler#handleTaskCompletion,源碼中處理成功的返回值的代碼如下:

我們重點關注其返回值的處理,如果執行的是一個Action操作,則會進入第一個分支。如果執行的是shuffle操作,則會進入第二個分支。

Action作業的返回值處理

先來看第一個分支:

跟返回值有關的代碼如下:

org.apache.spark.scheduler.JobWaiter#taskSucceeded源碼如下:

思路:調用RDD定義的resultHandler方法,取出返回值,如果該 task執行完畢之后,所有task都已經執行完畢了,那么jobPromise可以標志為成功,driver就可以拿着action操作返回的值做進一步操作。

假設是collect方法,可以根據 org.apache.spark.SparkContext#submitJob 依賴方法推出resultHandler的定義,如下:

可以知道,resultHandler是在調用方法之前傳遞過來的方法參數。

我們從collect 方法正向推:

其調用的SparkContext的幾個重載的runJob方法如下:

即,上圖中標紅的就是resultHandler方法,collect方法是應用於整個RDD的分區的。

也就是說,org.apache.spark.scheduler.JobWaiter#taskSucceeded的第一個參數其實就是partition,第二個參數就是該action在RDD的該partition上計算后的返回值。

該resultHandler方法將返回值,直接賦值給result的特定分區。最終,將所有分區的數據都返回給driver。注意,現在的返回值是數組套數組的形式,即二維數組。

最終collect方法中也定義了二維數組flatten為一維數組的方法,如下:

這個方法內部是會生成一個ArrayBuilder對象的用來添加數組元素,最終構造新數組返回。這個方法是會內存溢出的,所以不建議使用這個方法獲取大量結果數據。

 

下面,我們來看第二個分支。

Shuffle作業的返回值處理

shuffle作業的返回值是 MapStatus 類型。

先來聊一下MapStatus類。

MapStatus

主要方法如下:

 

location表示 shuffle的output數據由哪個BlockManager管理着。

getSizeForBlock:獲取指定block的大小。

其繼承關系如下:

CompressedMapStatus 主要是實現了壓縮的MapStatus,即在網絡傳輸進行序列化的時候,可以對MapStatus進行壓縮。

HighlyCompressedMapStatus 主要實現了大block的存儲,以及保存了block的平均大小以及block是否為空的信息。

處理shuffle 作業返回值

我們只關注返回值的處理,org.apache.spark.scheduler.DAGScheduler#handleTaskCompletion方法中涉及值處理的源碼如下:

org.apache.spark.MapOutputTrackerMaster#registerMapOutput 的源碼如下,mapId就是partition的id:

其中,成員變量 shuffleStatuses 定義如下:

即shuffleStatuses在driver端保存了shuffleId和shuffleStatus的信息。便於后續stage可以調用 MapOutputTrackerMasterEndpoint ref 來獲取該stage返回的MapStatus信息。具體內容,我們將在下一節分析。

 

總結

本篇文章主要介紹了跟Spark內部Task運行的細節流程,關於Task的運行部分沒有具體涉及,Task按照ResultStage和ShuffleStage划分為兩種Task,ResultStage任務和ShuffleStage分別對應的Task的執行流程有本質的區別,將在下一篇文章進行更加詳細的剖析。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM