Spark基本架構及原理


  轉自:http://www.cnblogs.com/tgzhu/p/5818374.html

        Apache Spark是一個圍繞速度、易用性和復雜分析構建的大數據處理框架,最初在2009年由加州大學伯克利分校的AMPLab開發,並於2010年成為Apache的開源項目之一,與Hadoop和Storm等其他大數據和MapReduce技術相比,Spark有如下優勢:

  • Spark提供了一個全面、統一的框架用於管理各種有着不同性質(文本數據、圖表數據等)的數據集和數據源(批量數據或實時的流數據)的大數據處理的需求。
  • 官方資料介紹Spark可以將Hadoop集群中的應用在內存中的運行速度提升100倍,甚至能夠將應用在磁盤上的運行速度提升10倍。

Spark相比Hadoop MapReduce的優勢:

  • 基於MapReduce的計算引擎通常會將中間結果輸出到磁盤上,進行存儲和容錯。出於任務管道承接的考慮,當一些查詢翻譯到MapReduce任務時,往往會產生多個stage,而這些串聯的stage又依賴於底層文件系統(如HDFS)來存儲每一個stage的輸出結果。spark將執行模型抽象為通用的有向無環圖執行計划(DAG),這可以將多stage的任務串聯或者並行執行,而無需將stage中間結果輸出到HDFS中。類似的引擎包括Dryad、Tez。
  • 數據格式和內存布局,由於MapReduce Schema on Read處理方式會引起較大的處理開銷。Spark抽象出分布式內存存儲結構彈性分布式數據集RDD,進行數據的存儲。RDD能支持粗粒度寫操作,但對於讀操作,RDD可以精確到每條記錄,這使得RDD可以用來作為分布式索引。Spark的特性是能夠控制數據在不同節點上的分區,用戶可以自定義分區策略,如Hash分區等。Shark和Spark SQL在Spark的基礎上實現了列存儲和列存儲壓縮。
  • 執行策略,MapReduce在數據Shuffle之前花費了大量的時間來排序,Spark則可減輕上述問題帶來的開銷。因為Spark任務在shuffle中不是所有場景都需要排序,所以支持基於Hash的分布式聚合,調度中采用更為通用的任務執行計划圖(DAG),每一輪的輸出結果都在內存緩存。
  • 任務調度的開銷,傳統的MapReduce系統,如Hadoop,是為了運行長達數小時的批量作業而設計的,在某些極端情況下,提交一個任務的延遲非常高。Spark采用了事件驅動的類庫AKKA來啟動任務,通過線程池復用線程來避免進程或線程啟動和切換開銷。

目標:

  • 架構及生態
  • Spark與Hadoop
  • 運行流程及特點
  • 常用術語
  • standalone模式
  • yarn集群
  • RDD運行流程

架構及生態:

  • 通常當需要處理的數據量超過了單機尺寸(比如我們的計算機有4GB的內存,而我們需要處理100GB以上的數據)這時我們可以選擇spark集群進行計算,有時我們可能需要處理的數據量並不大,但是計算很復雜,需要大量的時間,這是我們也可以選擇利用spark集群強大的計算資源,並行化地計算,其架構示意圖如下:

  • Spark Core:包含Spark的基本功能;尤其是定義RDD的API、操作以及這兩者上的動作。其他Spark的庫都是構建在RDD和Spark Core之上的。
  • Spark SQL:提供通過Apache Hive的SQL變體Hive查詢語言(HiveQL)與Spark進行交互的API。每個數據庫表被當做一個RDD,Spark SQL查詢被轉換為Spark操作。
  • Spark Streaming:對實時數據流進行處理和控制。Spark Streaming允許程序能夠像普通RDD一樣處理實時數據
  • MLib:一個常用機器學習的算法庫,算法被實現為對RDD的Spark操作。這個庫包含可擴展的學習算法,比如分類、回歸等需要對大量數據集進行迭代的操作
  • GraphX:控制圖、並行圖操作和計算的一組算法和工具的集合。GraphX擴展了RDD API,包含控制圖、創建子圖、訪問路徑上所有頂點的操作
  • Spark架構采用了分布式計算中的Master-Slave模型,Master是對應集群中的含有Master進程的節點,Slave是集群中含有Worker進程的節點。Master作為整個集群的控制器,負責整個集群的正常運行;Worker相當於是計算節點,接收主節點命令與進行狀態匯報;Executor負責任務的執行;Client作為用戶的客戶端負責提交應用,Driver負責控制一個應用的執行,組成圖如下:

  Spark集群部署后,需要在主節點和從節點分別啟動Master進程和Worker進程,對整個集群進行控制。在一個Spark應用的執行過程中,Driver和Worker是兩個重要角色。Driver程序是應用邏輯執行的起點,負責作業的調度,即Task任務的分發,而多個Worker用來管理計算節點和創建Executor並行處理任務。在執行階段,Driver會將Task和Task所依賴的file和jar序列化后傳遞給對應的Worker機器,同時Executor對相應數據分區的任務進行處理。

  Spark的架構中的基本組件:

  • Cluster Manager:在standalone模式中即為Master主節點,控制整個集群,監控worker。在YARN模式中為資源管理器
  • Worker:從節點,負責控制計算節點,啟動Executor或者Driver。在YARN模式中為NodeManager,負責計算節點的控制。
  • Driver:運行Application的main()函數並創建SparkContext。
  • Executor:執行器,在worker node上執行任務的組件、用於啟動線程池運行任務。每個Application擁有獨立的一組Executor。
  • SparkContext:整個應用的上下文,控制應用的生命周期。
  • RDD:Spark的基礎計算單元,一組RDD可形成執行的有向無環圖RDD Graph。
  • DAG Scheduler:根據作業(task)構建基於Stage的DAG,並提交Stage給TaskScheduler。
  • TaskScheduler:將任務(task)分發給Executor執行。
  • SparkEnv:線程級別的上下文, 存儲運行時的重要組件的引用。

Spark與Hadoop:

  • Hadoop有兩個核心模塊,分布式存儲模塊HDFS和分布式計算模塊MapReduce
  • Spark本身並沒有提供分布式文件系統,因此Spark的分析大多依賴於Hadoop的分布式文件系統HDFS
  • Hadoop的MapReduce與Spark都可以進行數據計算,而相比於MapReduce,Spark的速度更快並且提供的功能更加豐富
  • 關系如下圖:

 

運行流程及特點:

  • spark運行流程圖如下:

  1. 構建Spark Application的運行環境,啟動SparkContext
  2. SparkContext向資源管理器(可以是Standalone,Mesos,Yarn)申請運行Executor資源,並啟動StandaloneExecutorbackend
  3. Executor向SparkContext申請Task
  4. SparkContext將應用程序分發給Executor
  5. SparkContext構建成DAG圖,將DAG圖分解成Stage、將Taskset發送給Task Scheduler,最后由Task Scheduler將Task發送給Executor運行
  6. Task在Executor上運行,運行完釋放所有資源

Spark運行特點:

  1. 每個Application獲取專屬的executor進程,該進程在Application期間一直駐留,並以多線程方式運行Task。這種Application隔離機制是有優勢的,無論是從調度角度看(每個Driver調度它自己的任務),還是從運行角度看(來自不同Application的Task運行在不同JVM中),當然這樣意味着Spark Application不能跨應用程序共享數據,除非將數據寫入外部存儲系統
  2. Spark與資源管理器無關,只要能夠獲取Executor進程,並能保持互相通信就可以了
  3. 提交SparkContext的Client應該靠近Worker節點(運行Executor的節點),最好是在同一個Rack里,因為Spark Application運行過程中SparkContext和Executor之間有大量的信息互換
  4. Task采用了數據本地性和推測執行的優化機制

常用術語:

  • Application:Application都是指用戶編寫的Spark應用程序,其中包括一個Driver功能的代碼和分布在集群中多個節點上運行的Executor代碼
  • Driver:Spark中的Driver即運行上述Application的main函數並創建SparkContext,創建SparkContext的目的是為了准備Spark應用程序的運行環境,在Spark中有SparkContext負責與ClusterManager通信,進行資源申請、任務的分配和監控等,當Executor部分運行完畢后,Driver同時負責將SparkContext關閉,通常用SparkContext代表Driver
  • Executor:某個Application運行在worker節點上的一個進程,該進程負責運行某些Task,並且負責將數據存到內存或磁盤上,每個Application都有各自獨立的一批Executor,在Spark on Yarn模式下,其進程名稱為CoarseGrainedExecutor Backend。一個CoarseGrainedExecutor Backend有且僅有一個Executor對象,負責將Task包裝成taskRunner,並從線程池中抽取一個空閑線程運行Task,這個每一個CoarseGrainedExecutor Backend能並行運行Task的數量取決於分配給它的cup個數
  • Cluster Manager:指的是在集群上獲取資源的外部服務。目前有三種類型
  1.  standalone:spaark原生的資源管理,由Master負責資源的分配
  2. Apache Mesos:與hadoop MR兼容性良好的一種資源調度框架
  3. Hadoop Yarn:主要指Yarn中的ResourceManager
  • Worker:集群中任何可以運行Application代碼的節點,在Standalone模式中指的是通過slave文件配置的Worker節點,在Spark on Yarn模式下就是NoteManager節點
  • Task:被送到某個Executor上的工作單元,但HadoopMR中的MapTask和ReduceTask概念一樣,是運行Application的基本單位,多個Task組成一個Stage,而Task的調度和管理等是由TaskScheduler負責
  • Job:包含多個Task組成的並行計算,往往由Spark Action觸發生成,一個Application中往往會產生多個Job
  • Stage:每個Job會被拆分成多組Task,作為一個TaskSet,其名稱為Stage,Stage的划分和調度是有DAGScheduler來負責的,Stage有非最終的Stage(Shuffle Map Stage)和最終的Stage(Result Stage)兩種,Stage的邊界就是發生Shuffle的地方
  • DAGScheduler:根據Job構建基於Stage的DAG(Directed Acyclic Graph有向無環圖),並提交Stage給TASKScheduler。其划分Stage的根據是RDD之間的依賴的關系找出開銷最小的調度方法,如下圖

           

  • TASKScheduler:將TaskSET提交給Worker運行,每個Executor運行什么Task就是在此處分配的,TaskScheduler維護所有TaskSet,當Executor向Driver發生心跳時,TaskScheduler會根據資源剩余情況分配相應的Task。另外TaskScheduler還維護着所有Task的運行標簽,重試失敗的Task,下圖展示了TaskScheduler的作用

  • 在不同運行模式中任務調度器具體為:
  1. Spark on Standalone模式為TaskScheduler
  2. YARN-Client模式為YarnClientChusterScheduler
  3. YARN-Chuster模式為YarnClusterScheduler
  • 將這些術語串起來的運行層次圖如下:

  • Job=多個Stage,Stage-多個同種task,Task分為ShuffleMapTask和ResultTask,Dependency分為ShuffleDependency和NarrowDependency

 

Spark運行模式

  • Spark的運行模式多種多樣,靈活多變,部署在單機上時,既可以用本地模式運行,也可以用偽分布模式運行,而當以分布式集群方式部署時,也有眾多的運行模式可供選擇,這取決於集群的實際情況,底層的資源調度即可以依賴外部資源調度框架,也可以使用Spark內建的Standalone模式。
  • 對於外部資源調度框架的支持,目前的實現包括相對穩定的Mesos模式,以及hadoop YARN模式。
  • 本地模式:常用於本地開發測試,本地還分為local和local cluster

 

standalone:獨立集群運行模式

  • Standalone模式使用Spark自帶的資源調度框架
  • 采用Master/Slaver的典型架構,選用ZooKeeper來實現Master的HA

  • 該模式主要的節點有Client節點、Master節點和Worker節點。其中Driver既可以運行在Master節點上,也可以運行在本地Client端。當用spark-shell交互工具提交Spark的Job時,Driver在Master節點上運行;當使用spark-submit工具提交Job或者在Eclip、IDEA等開發平台上使用“new SparkConf.setManager("spark://master:7077")”方式運行Spark任務時,Driver是運行在本地Client端上的。
  • 運行過程如下圖:(參考至:http://blog.csdn.net/gamer_gyt/article/details/51833681

  1. SparkContext連接到Master,向Master注冊並申請資源(CPU Core和Memory)
  2. Master根據SparkContext的資源申請要求和Worker心跳周期內報告的信息決定在哪個Worker上分配資源,然后在該Worker上獲取資源,然后啟動StandaloneExecutorBackend;
  3. StandaloneExecutorBackend向SparkContext注冊;
  4. SparkContext將Application代碼發送給StandaloneExecutorBackend;並且SaprkContext解析Application代碼,構建DAG圖,並提交給DAG Scheduler分解成Stage(當碰到Action操作時,就會催生Job;每個Job中含有1個或多個Stage,Stage一般在獲取外部數據和shuffle之間產生),然后以Stage(或者稱為TaskSet)提交給Task Scheduler,Task Scheduler負責將Task分配到相應的Worker,最后提交給StandaloneExecutorBackend執行;
  5. StandaloneExecutorBackend會建立Executor線程池,開始執行Task,並向SparkContext報告,直至Task完成
  6. 所有Task完成后,SparkContext向Maxter注銷,釋放資源

 

yarn:(參考:http://blog.csdn.net/gamer_gyt/article/details/51833681

  •  Spark on YARN模式根據Driver在集群中的位置分為兩種模式:一種是YARN-Client模式,另一種是YARN-Cluster(或稱為YARN-Standalone模式)
  • Yarn-Client模式中,Driver在客戶端本地運行,這種模式可以使得Spark Application和客戶端進行交互,因為Driver在客戶端,所有可以通過webUI訪問Driver的狀態,默認是http://hadoop1:4040訪問,而YARN通過http://hadoop:8088訪問
  • YARN-client的工作流程步驟為:

 

  • Spark Yarn Client向YARN的ResourceManager申請啟動Application Master。同時在SparkContext初始化中將創建DAGScheduler和TASKScheduler等,由於我們選擇的是Yarn-Client模式,程序會選擇YarnClientClusterScheduler和YarnClientSchedulerBackend
  • ResourceManager收到請求后,在集群中選擇一個NodeManager,為該應用程序分配第一個Container,要求它在這個Container中啟動應用程序的Application,與YARN-Cluster區別的是在該Application不運行SparkContext,只與SparkContext進行聯系進行資源的分派
  • Client中的SparkContext初始化完畢后,與ApplicationMaster建立通訊,向ResourceManager注冊,根據任務信息向ResourceManager申請資源(Container)
  • 一旦ApplicationMaster申請到資源(也就是Container)后,便與對應的NodeManager通信,要求它在獲得的Container中啟動CoarseGrainedExecutorBackend,CoarseGrainedExecutorBackend啟動后會向Client中的SparkContext注冊並申請Task
  • client中的SparkContext分配Task給CoarseGrainedExecutorBackend執行,CoarseGrainedExecutorBackend運行Task並向Driver匯報運行的狀態和速度,以讓Client隨時掌握各個任務的運行狀態,從而可以在任務失敗時重新啟動任務
  • 應用程序運行完成后,Client的SparkContext向ResourceManager申請注銷並關閉自己

Spark Cluster模式:

  • 在YARN-Cluster模式中,當用戶向YARN中提交一個應用程序后,YARN將兩個階段運行應用程序:
  1. 第一個階段是把Spark的Driver作為一個ApplicationMaster在YARN集群中先啟動;
  2. 第二個階段是由ApplicationMaster創建應用程序,然后為它向ResourceManager申請資源,並啟動Executor來運行Task,同時監控它的整個運行流程,直到運行完成
  • YARN-cluster的工作流程分為以下幾個步驟

  • Spark Yarn Client向YARN中提交應用程序,包括ApplicationMaster程序,啟動ApplicationMaster的命令、需要在Executor中運行的程序等
  • ResourceManager收到請求后,在集群中選擇一個NodeManager,為該應用程序分配第一個Container,要求它在這個Container中啟動應用程序的ApplicationMaster,其中ApplicationMaster進行SparkContext等的初始化
  • ApplicationMaster向ResourceManager注冊,這樣用戶可以直接通過ResourceManager查看應用程序的運行狀態,然后它將采用輪詢的方式通過RPC協議為各個任務申請資源,並監控它們的運行狀態直到運行結束
  • 一旦ApplicationMaster申請到資源(也就是Container)后,便與對應的NodeManager通信,要求它在獲得的Container中啟動CoarseGrainedExecutorBackend,CoarseGrainedExecutorBackend啟動后會向ApplicationMaster中的SparkContext注冊並申請Task。這一點和Standalone模式一樣,只不過SparkContext在Spark Application中初始化時,使用CoarseGrainedSchedulerBackend配合YarnClusterScheduler進行過任務的調度,其中YarnClusterScheduler只是對TaskSchedulerImpl的一個簡單包裝,增加了對Executor的等待邏輯等
  • ApplicationMaster中的SparkContext分配Task給CoarseGrainedExecutorbackend執行,CoarseGrainedExecutorBackend運行Task並向ApplicationMaster匯報運行的狀態和速度,以讓ApplicationMaster隨時掌握各個任務的運行狀態,從而可以在任務失敗時重新啟動任務
  • 應用程序運行完成好,ApplicationMaster向ResourceManager申請注銷並關閉自己

Spark Client和Spark Cluster的區別

  •  理解YARN-Client和YARN-Cluster深層次的區別之前先清楚一個概念:Application Master。在YARN中,每個Application示例都有一個ApplicationMaster進程,它是Application啟動的第一個容器,它負責和ResourceManager打交道並請求資源,獲取資源之后告訴NodeManager為其啟動Container。從深層次的含義將YARN-Cluster和YARN-Client模式的區別其實就是ApplicationMaster進程的區別。
  • YARN-Cluster模式下,Driver運行在AM(Application Master)中,它負責向YARN申請資源,並監督作業的運行情況。當用戶提交了作業之后,就可以關掉Client,作業會繼續在YARN上運行,因而YARN-Cluster模式不適合交互類型的作業。
  • YARN-Client模式下,Application Master僅僅向YARN請求Executor,Client會和請求的Container通信來調度它們工作,也就是說Client不能離開

 

RDD運行流程

  •  RDD在Spark中運行大概分為以下三步:
  1. 創建RDD對象
  2. DAGScheduler模塊介入運算,計算RDD之間的依賴關系,RDD之間的依賴關系就形成了DAG
  3. 每一個Job被分為多個Stage。划分Stage的一個主要依據是當前計算因子的輸入是否是確定的,如果是則將其分在同一個Stage,避免多個Stage之間的消息傳遞開銷

示例圖如下:

  • 以下面一個按A-Z首字母分類,查找相同首字母下不同姓名總個數的例子來看一下RDD是如何運行起來的

  • 創建RDD上面的例子除去最后一個collect是個動作,不會創建RDD之外,前面四個轉換都會創建新的RDD。因此第一步就是創建好所有RDD(內部的五項信息)?
  • 創建執行計划Spark會盡可能的管道化,並基於是否要重新組織數據來划分階段(stage),例如本例中的groupBy()轉換就會將整個執行計划分成兩個階段執行。最終會產生一個DAG(directed Acyclic graph,有向無環圖)作為邏輯執行計划

  • 調度任務   將各個階段分成不同的任務(task),每個任務都是數據和計算的合體。在進行下一階段前,當前階段的所有任務都要執行完成。因為下一階段的第一個轉換一定是重新組織數據的,所有必須等當前階段所有結果數據都計算出來了才能繼續。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM