Spark基本架構及原理


 Hadoop 和 Spark 的關系

Spark 運算比 Hadoop 的 MapReduce 框架快的原因是因為 Hadoop 在一次 MapReduce 運算之后,會將數據的運算結果從內存寫入到磁盤中,第二次 Mapredue 運算時在從磁盤中讀取數據,所以其瓶頸在2次運算間的多余 IO 消耗. Spark 則是將數據一直緩存在內存中,直到計算得到最后的結果,再將結果寫入到磁盤,所以多次運算的情況下, Spark 是比較快的. 其優化了迭代式工作負載

Hadoop的局限 Spark的改進
    • 抽象層次低,代碼編寫難以上手
    • 通過使用RDD的統一抽象,實現數據處理邏輯的代碼非常簡潔
    • 只提供了Map和Reduce兩個操作,欠缺表達力
    • 通過RDD提供了很多轉換和動作,實現了很多基本操作,如Sort, Join等
    • 一個Job只有Map和Reduce兩個階段,復雜的程序需要大量的Job來完成,且Job之間的依賴關系需要開發者自行管理
    • 一個Job可以包含RDD的多個轉換操作,在調度時可以生成多個階段(Stage),而且如果多個map操作的RDD的分區不變,是可以放在同一個Task中進行
    • 處理邏輯隱藏在代碼細節中,缺乏整體邏輯視圖
    • RDD的轉換支持流式API,提供處理邏輯的整體視圖
    • 對迭代式數據處理性能比較差,Reduce與下一步Map之間的中間結果只能存放在HDFS中
    •  通過內存緩存數據,可大大提高迭代式計算的性能,內存不足時可以溢出到本地磁盤,而不是HDFS
 
    • ReduceTask需要等待所有MapTask都完成后才可以開始
 
    • 分區相同的轉換構成流水線放在一個Task中運行,分區不同的轉換需要Shuffle,被划分到不同的Stage中,需要等待前面的Stage完成后才可以開始
    •  時延高,只適用Batch數據處理,對於交互式數據處理和實時數據處理的支持不夠
    •  通過將流拆成小的batch提供Discretized Stream處理流數據

 

Spark 的主要特點還包括:

    • (1)提供 Cache 機制來支持需要反復迭代計算或者多次數據共享,減少數據讀取的 IO 開銷;
    • (2)提供了一套支持 DAG 圖的分布式並行計算的編程框架,減少多次計算之間中間結果寫到 Hdfs 的開銷;
    • (3)使用多線程池模型減少 Task 啟動開稍, shuffle 過程中避免不必要的 sort 操作並減少磁盤 IO 操作。(Hadoop 的 Map 和 reduce 之間的 shuffle 需要 sort)

Spark 系統架構

明確相關術語

 

  • Application: Appliction都是指用戶編寫的Spark應用程序,其中包括一個Driver功能的代碼和分布在集群中多個節點上運行的Executor代碼
  • Driver:  Spark中的Driver即運行上述Application的main函數創建SparkContext,創建SparkContext的目的是為了准備Spark應用程序的運行環境,在Spark中有SparkContext負責與ClusterManager通信進行資源申請、任務的分配和監控等,當Executor部分運行完畢后,Driver同時負責將SparkContext關閉,通常用SparkContext代表Driver
  • Executor:  某個Application運行在worker節點上的一個進程,  該進程負責運行某些Task, 並且負責將數據存到內存或磁盤上,每個Application都有各自獨立的一批Executor, 在Spark on Yarn模式下,其進程名稱為CoarseGrainedExecutor Backend。一個CoarseGrainedExecutor Backend有且僅有一個Executor對象, 負責將Task包裝成taskRunner,並從線程池中抽取一個空閑線程運行Task, 這個每一個oarseGrainedExecutor Backend能並行運行Task的數量取決與分配給它的cpu個數
  • Cluter Manager:指的是在集群上獲取資源的外部服務。目前有三種類型

 

    1.  Standalon : spark原生的資源管理,由Master負責資源的分配
    2.  Apache Mesos:與hadoop MR兼容性良好的一種資源調度框架
    3.  Hadoop Yarn: 主要是指Yarn中的ResourceManager

 

  • Worker: 集群中任何可以運行Application代碼的節點,在Standalone模式中指的是通過slave文件配置的Worker節點,在Spark on Yarn模式下就是NoteManager節點
  • Task: 被送到某個Executor上的工作單元,但hadoopMR中的MapTask和ReduceTask概念一樣,是運行Application的基本單位多個Task組成一個Stage,而Task的調度和管理等是由TaskScheduler負責
  • Job: 包含多個Task組成的並行計算,往往由Spark Action觸發生成, 一個Application中往往會產生多個Job
  • Stage: 每個Job會被拆分成多組Task, 作為一個TaskSet, 其名稱為Stage,Stage的划分和調度是有DAGScheduler來負責的,Stage有非最終的Stage(Shuffle Map Stage)和最終的Stage(Result Stage)兩種,Stage的邊界就是發生shuffle的地方
  • DAGScheduler: 根據Job構建基於Stage的DAG(Directed Acyclic Graph有向無環圖),並提交Stage給TASkScheduler。 其划分Stage的依據是RDD之間的依賴的關系找出開銷最小的調度方法,如下圖
  • TASKSedulter: 將TaskSET提交給worker運行,每個Executor運行什么Task就是在此處分配的. TaskScheduler維護所有TaskSet,當Executor向Driver發生心跳時,TaskScheduler會根據資源剩余情況分配相應的Task。另外TaskScheduler還維護着所有Task的運行標簽,重試失敗的Task。下圖展示了TaskScheduler的作用
  • 在不同運行模式中任務調度器具體為:

 

    1.   Spark on Standalone模式為TaskScheduler
    2.   YARN-Client模式為YarnClientClusterScheduler
    3.   YARN-Cluster模式為YarnClusterScheduler

 

  • 將這些術語串起來的運行層次圖如下:
  • Job=多個stage,Stage=多個同種task, Task分為ShuffleMapTask和ResultTask,Dependency分為ShuffleDependency和NarrowDependency

 

  整個 Spark 集群中,分為 Master 節點與 worker 節點,,其中 Master 節點負責將串行任務變成可並行執行的任務集Tasks, 同時還負責出錯問題處理等,而 Worker 節點負責執行任務
  Driver 的功能是創建 SparkContext, 負責執行用戶寫的 Application 的 main 函數進程,Application 就是用戶寫的程序. 
  不同的模式可能會將 Driver 調度到不同的節點上執行.集群管理模式里, local 一般用於本地調試. 
  每個 Worker 上存在一個或多個 Executor 進程,該對象擁有一個線程池,每個線程負責一個 Task 任務的執行.根據 Executor 上 CPU-core 的數量,其每個時間可以並行多個 跟 core 一樣數量的 Task.Task 任務即為具體執行的 Spark 程序的任務. 

  • spark運行流程圖如下:
  1. 構建Spark Application的運行環境,啟動SparkContext
  2. SparkContext向資源管理器(可以是Standalone,Mesos,Yarn)申請運行Executor資源,並啟動StandaloneExecutorbackend,
  3. Executor向SparkContext申請Task
  4. SparkContext將應用程序分發給Executor
  5. SparkContext構建成DAG圖,將DAG圖分解成Stage、將Taskset發送給Task Scheduler,最后由Task Scheduler將Task發送給Executor運行
  6. Task在Executor上運行,運行完釋放所有資源

     Spark運行特點:

  1. 每個Application獲取專屬的executor進程,該進程在Application期間一直駐留,並以多線程方式運行Task。這種Application隔離機制是有優勢的,無論是從調度角度看(每個Driver調度他自己的任務),還是從運行角度看(來自不同Application的Task運行在不同JVM中),當然這樣意味着Spark Application不能跨應用程序共享數據,除非將數據寫入外部存儲系統
  2. Spark與資源管理器無關,只要能夠獲取executor進程,並能保持相互通信就可以了
  3. 提交SparkContext的Client應該靠近Worker節點(運行Executor的節點),最好是在同一個Rack里,因為Spark Application運行過程中SparkContext和Executor之間有大量的信息交換
  4. Task采用了數據本地性和推測執行的優化機制

Spark作業基本運行原理

 

 

詳細原理見上圖。

  我們使用spark-submit提交一個Spark作業之后,這個作業就會啟動一個對應的Driver進程。根據你使用的部署模式(deploy-mode)不同,Driver進程可能在本地啟動,也可能在集群中某個工作節點上啟動。Driver進程本身會根據我們設置的參數,占有一定數量的內存和CPU core。而Driver進程要做的第一件事情,就是向集群管理器(YARN或者其他資源管理集群)申請運行Spark作業需要使用的資源,這里的資源指的就是Executor進程。YARN集群管理器會根據我們為Spark作業設置的資源參數,在各個工作節點上,啟動一定數量的Executor進程,每個Executor進程都占有一定數量的內存和CPU core。

  在申請到了作業執行所需的資源之后,Driver進程就會開始調度和執行我們編寫的作業代碼了。Driver進程會將我們編寫的Spark作業代碼分拆為多個stage,每個stage執行一部分代碼片段,並為每個stage創建一批task,然后將這些task分配到各個Executor進程中執行。task是最小的計算單元,負責執行一模一樣的計算邏輯(也就是我們自己編寫的某個代碼片段),只是每個task處理的數據不同而已。一個stage的所有task都執行完畢之后,會在各個節點本地的磁盤文件中寫入計算中間結果,然后Driver就會調度運行下一個stage。下一個stage的task的輸入數據就是上一個stage輸出的中間結果。如此循環往復,直到將我們自己編寫的代碼邏輯全部執行完,並且計算完所有的數據,得到我們想要的結果為止。

  Spark是根據shuffle類算子來進行stage的划分。如果我們的代碼中執行了某個shuffle類算子(比如reduceByKey、join等),那么就會在該算子處,划分出一個stage界限來。可以大致理解為,shuffle算子執行之前的代碼會被划分為一個stage,shuffle算子執行以及之后的代碼會被划分為下一個stage。因此一個stage剛開始執行的時候,它的每個task可能都會從上一個stage的task所在的節點,去通過網絡傳輸拉取需要自己處理的所有key,然后對拉取到的所有相同的key使用我們自己編寫的算子函數執行聚合操作(比如reduceByKey()算子接收的函數)。這個過程就是shuffle。

  當我們在代碼中執行了cache/persist等持久化操作時,根據我們選擇的持久化級別的不同,每個task計算出來的數據也會保存到Executor進程的內存或者所在節點的磁盤文件中。

  因此Executor的內存主要分為三塊:第一塊是讓task執行我們自己編寫的代碼時使用,默認是占Executor總內存的20%;第二塊是讓task通過shuffle過程拉取了上一個stage的task的輸出后,進行聚合等操作時使用,默認也是占Executor總內存的20%;第三塊是讓RDD持久化時使用,默認占Executor總內存的60%。

  task的執行速度是跟每個Executor進程的CPU core數量有直接關系的。一個CPU core同一時間只能執行一個線程。而每個Executor進程上分配到的多個task,都是以每個task一條線程的方式,多線程並發運行的。如果CPU core數量比較充足,而且分配到的task數量比較合理,那么通常來說,可以比較快速和高效地執行完這些task線程。

以上就是Spark作業的基本運行原理的說明

Refer

Spark(一): 基本架構及原理

Spark 學習: spark 原理簡述與 shuffle 過程介紹


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM