Hadoop 和 Spark 的關系
Spark 運算比 Hadoop 的 MapReduce 框架快的原因是因為 Hadoop 在一次 MapReduce 運算之后,會將數據的運算結果從內存寫入到磁盤中,第二次 Mapredue 運算時在從磁盤中讀取數據,所以其瓶頸在2次運算間的多余 IO 消耗. Spark 則是將數據一直緩存在內存中,直到計算得到最后的結果,再將結果寫入到磁盤,所以多次運算的情況下, Spark 是比較快的. 其優化了迭代式工作負載
Hadoop的局限 | Spark的改進 |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
Spark 的主要特點還包括:
-
- (1)提供 Cache 機制來支持需要反復迭代計算或者多次數據共享,減少數據讀取的 IO 開銷;
- (2)提供了一套支持 DAG 圖的分布式並行計算的編程框架,減少多次計算之間中間結果寫到 Hdfs 的開銷;
- (3)使用多線程池模型減少 Task 啟動開稍, shuffle 過程中避免不必要的 sort 操作並減少磁盤 IO 操作。(Hadoop 的 Map 和 reduce 之間的 shuffle 需要 sort)
Spark 系統架構
明確相關術語
- Application: Appliction都是指用戶編寫的Spark應用程序,其中包括一個Driver功能的代碼和分布在集群中多個節點上運行的Executor代碼
- Driver: Spark中的Driver即運行上述Application的main函數並創建SparkContext,創建SparkContext的目的是為了准備Spark應用程序的運行環境,在Spark中有SparkContext負責與ClusterManager通信,進行資源申請、任務的分配和監控等,當Executor部分運行完畢后,Driver同時負責將SparkContext關閉,通常用SparkContext代表Driver
- Executor: 某個Application運行在worker節點上的一個進程, 該進程負責運行某些Task, 並且負責將數據存到內存或磁盤上,每個Application都有各自獨立的一批Executor, 在Spark on Yarn模式下,其進程名稱為CoarseGrainedExecutor Backend。一個CoarseGrainedExecutor Backend有且僅有一個Executor對象, 負責將Task包裝成taskRunner,並從線程池中抽取一個空閑線程運行Task, 這個每一個oarseGrainedExecutor Backend能並行運行Task的數量取決與分配給它的cpu個數
- Cluter Manager:指的是在集群上獲取資源的外部服務。目前有三種類型
-
- Standalon : spark原生的資源管理,由Master負責資源的分配
- Apache Mesos:與hadoop MR兼容性良好的一種資源調度框架
- Hadoop Yarn: 主要是指Yarn中的ResourceManager
- Worker: 集群中任何可以運行Application代碼的節點,在Standalone模式中指的是通過slave文件配置的Worker節點,在Spark on Yarn模式下就是NoteManager節點
- Task: 被送到某個Executor上的工作單元,但hadoopMR中的MapTask和ReduceTask概念一樣,是運行Application的基本單位,多個Task組成一個Stage,而Task的調度和管理等是由TaskScheduler負責
- Job: 包含多個Task組成的並行計算,往往由Spark Action觸發生成, 一個Application中往往會產生多個Job
- Stage: 每個Job會被拆分成多組Task, 作為一個TaskSet, 其名稱為Stage,Stage的划分和調度是有DAGScheduler來負責的,Stage有非最終的Stage(Shuffle Map Stage)和最終的Stage(Result Stage)兩種,Stage的邊界就是發生shuffle的地方
- DAGScheduler: 根據Job構建基於Stage的DAG(Directed Acyclic Graph有向無環圖),並提交Stage給TASkScheduler。 其划分Stage的依據是RDD之間的依賴的關系找出開銷最小的調度方法,如下圖
- TASKSedulter: 將TaskSET提交給worker運行,每個Executor運行什么Task就是在此處分配的. TaskScheduler維護所有TaskSet,當Executor向Driver發生心跳時,TaskScheduler會根據資源剩余情況分配相應的Task。另外TaskScheduler還維護着所有Task的運行標簽,重試失敗的Task。下圖展示了TaskScheduler的作用
- 在不同運行模式中任務調度器具體為:
-
- Spark on Standalone模式為TaskScheduler
- YARN-Client模式為YarnClientClusterScheduler
- YARN-Cluster模式為YarnClusterScheduler
- 將這些術語串起來的運行層次圖如下:
- Job=多個stage,Stage=多個同種task, Task分為ShuffleMapTask和ResultTask,Dependency分為ShuffleDependency和NarrowDependency
整個 Spark 集群中,分為 Master 節點與 worker 節點,,其中 Master 節點負責將串行任務變成可並行執行的任務集Tasks, 同時還負責出錯問題處理等,而 Worker 節點負責執行任務.
Driver 的功能是創建 SparkContext, 負責執行用戶寫的 Application 的 main 函數進程,Application 就是用戶寫的程序.
不同的模式可能會將 Driver 調度到不同的節點上執行.集群管理模式里, local 一般用於本地調試.
每個 Worker 上存在一個或多個 Executor 進程,該對象擁有一個線程池,每個線程負責一個 Task 任務的執行.根據 Executor 上 CPU-core 的數量,其每個時間可以並行多個 跟 core 一樣數量的 Task.Task 任務即為具體執行的 Spark 程序的任務.
- spark運行流程圖如下:
- 構建Spark Application的運行環境,啟動SparkContext
- SparkContext向資源管理器(可以是Standalone,Mesos,Yarn)申請運行Executor資源,並啟動StandaloneExecutorbackend,
- Executor向SparkContext申請Task
- SparkContext將應用程序分發給Executor
- SparkContext構建成DAG圖,將DAG圖分解成Stage、將Taskset發送給Task Scheduler,最后由Task Scheduler將Task發送給Executor運行
- Task在Executor上運行,運行完釋放所有資源
Spark運行特點:
- 每個Application獲取專屬的executor進程,該進程在Application期間一直駐留,並以多線程方式運行Task。這種Application隔離機制是有優勢的,無論是從調度角度看(每個Driver調度他自己的任務),還是從運行角度看(來自不同Application的Task運行在不同JVM中),當然這樣意味着Spark Application不能跨應用程序共享數據,除非將數據寫入外部存儲系統
- Spark與資源管理器無關,只要能夠獲取executor進程,並能保持相互通信就可以了
- 提交SparkContext的Client應該靠近Worker節點(運行Executor的節點),最好是在同一個Rack里,因為Spark Application運行過程中SparkContext和Executor之間有大量的信息交換
- Task采用了數據本地性和推測執行的優化機制
Spark作業基本運行原理
詳細原理見上圖。
我們使用spark-submit提交一個Spark作業之后,這個作業就會啟動一個對應的Driver進程。根據你使用的部署模式(deploy-mode)不同,Driver進程可能在本地啟動,也可能在集群中某個工作節點上啟動。Driver進程本身會根據我們設置的參數,占有一定數量的內存和CPU core。而Driver進程要做的第一件事情,就是向集群管理器(YARN或者其他資源管理集群)申請運行Spark作業需要使用的資源,這里的資源指的就是Executor進程。YARN集群管理器會根據我們為Spark作業設置的資源參數,在各個工作節點上,啟動一定數量的Executor進程,每個Executor進程都占有一定數量的內存和CPU core。
在申請到了作業執行所需的資源之后,Driver進程就會開始調度和執行我們編寫的作業代碼了。Driver進程會將我們編寫的Spark作業代碼分拆為多個stage,每個stage執行一部分代碼片段,並為每個stage創建一批task,然后將這些task分配到各個Executor進程中執行。task是最小的計算單元,負責執行一模一樣的計算邏輯(也就是我們自己編寫的某個代碼片段),只是每個task處理的數據不同而已。一個stage的所有task都執行完畢之后,會在各個節點本地的磁盤文件中寫入計算中間結果,然后Driver就會調度運行下一個stage。下一個stage的task的輸入數據就是上一個stage輸出的中間結果。如此循環往復,直到將我們自己編寫的代碼邏輯全部執行完,並且計算完所有的數據,得到我們想要的結果為止。
Spark是根據shuffle類算子來進行stage的划分。如果我們的代碼中執行了某個shuffle類算子(比如reduceByKey、join等),那么就會在該算子處,划分出一個stage界限來。可以大致理解為,shuffle算子執行之前的代碼會被划分為一個stage,shuffle算子執行以及之后的代碼會被划分為下一個stage。因此一個stage剛開始執行的時候,它的每個task可能都會從上一個stage的task所在的節點,去通過網絡傳輸拉取需要自己處理的所有key,然后對拉取到的所有相同的key使用我們自己編寫的算子函數執行聚合操作(比如reduceByKey()算子接收的函數)。這個過程就是shuffle。
當我們在代碼中執行了cache/persist等持久化操作時,根據我們選擇的持久化級別的不同,每個task計算出來的數據也會保存到Executor進程的內存或者所在節點的磁盤文件中。
因此Executor的內存主要分為三塊:第一塊是讓task執行我們自己編寫的代碼時使用,默認是占Executor總內存的20%;第二塊是讓task通過shuffle過程拉取了上一個stage的task的輸出后,進行聚合等操作時使用,默認也是占Executor總內存的20%;第三塊是讓RDD持久化時使用,默認占Executor總內存的60%。
task的執行速度是跟每個Executor進程的CPU core數量有直接關系的。一個CPU core同一時間只能執行一個線程。而每個Executor進程上分配到的多個task,都是以每個task一條線程的方式,多線程並發運行的。如果CPU core數量比較充足,而且分配到的task數量比較合理,那么通常來說,可以比較快速和高效地執行完這些task線程。
以上就是Spark作業的基本運行原理的說明