Spark運行架構


一 Spark與hadoop

Hadoop有兩個核心模塊,分布式存儲模塊HDFS和分布式計算模塊Mapreduce

Spark 支持多種編程語言,包括 Java、Python、R 和 Scala,同時 Spark 也支持 Hadoop 的底層存儲系統 HDFS,但 Spark 不依賴 Hadoop。

Hadoop的Mapreduce與spark都可以進行數據計算,而相比於Mapreduce,spark的速度更快並且提供的功能更加豐富

關系圖如下:

Hadoop、Hive、Spark 之間是什么關系、什么是Yarn

二 spark生態組成

通常當需要處理的數據量超過了單機尺度(比如我們的計算機有4GB的內存,而我們需要處理100GB以上的數據)這時我們可以選擇spark集群進行計算,有時我們可能需要處理的數據量並不大,但是計算很復雜,需要大量的時間,這時我們也可以選擇利用spark集群強大的計算資源,並行化地計算,

Spark 除了 Spark Core 外,還有其它由多個組件組成,目前主要有四個組件:Spark SQL、Spark Streaming、MLlib、GraphX。這四個組件加上 Spark Core 組成了 Spark 的生態。通常,我們在編寫一個 Spark 應用程序,需要用到 Spark
Core 和其余 4 個組件中的至少一個。Spark 的整體構架圖如下圖所示:

Spark SQL:提供通過Apache Hive的SQL變體Hive查詢語言(HiveQL)與Spark進行交互的API。每個數據庫表被當做一個RDD(彈性分布式數據集resilient distributed dataset),Spark SQL查詢被轉換為Spark操作。

Spark Streaming:這個模塊主要是對流數據的處理,支持流數據的可伸縮和容錯處理,Spark Streaming允許程序能夠像普通RDD一樣處理實時數據,可以與 Flume(針對數據日志進行優化的一個系統)和 Kafka(針對分布式消息傳遞進行優化的流處理平台)等已建立的數據源集成。Spark Streaming 的實現,也使用 RDD 抽象的概念,使得在為流數據(如批量歷史日志數據)編寫應用程序時,能夠更靈活,也更容易實現。

MLlib:主要用於機器學習領域,它實現了一系列常用的機器學習和統計算法,如分類、回歸、聚類、主成分分析等算法。

GraphX:這個模塊主要支持數據圖的分析和計算,並支持圖形處理的 Pregel API 版本。GraphX 包含了許多被廣泛理解的圖形算法,如 PageRank。

Spark Core:是 Spark 的核心,主要負責任務調度等管理功能。Spark
Core 的實現依賴於 RDDs(Resilient Distributed Datasets,
彈性分布式數據集)的程序抽象概念。

Spark 底層還支持多種數據源,能夠從其它文件系統讀取數據,如 HDFS、Amazon S3、Hypertable、HBase 等。Spark 對這些文件系統的支持,同時也豐富了整個 Spark 生態的運行環境。

 

spark架構

Cluster Manager:在standalone模式中即為Master主節點,控制整個集群,監控worker。在YARN模式中為資源管理器

Worker節點:從節點,負責控制計算節點,啟動Executor或者Driver。

Driver: 運行Application 的main()函數

Executor:執行器,是為某個Application運行在worker node上的一個進程

三 Spark的架構詳解

 

 

1、Application:Spark Application的概念和Hadoop MapReduce中的類似,指的是用戶編寫的Spark應用程序,包含了一個Driver 功能的代碼和分布在集群中多個節點上運行的Executor代碼;

 

2、Driver:負責運行上述Application的main()函數並且創建SparkContext,其中創建SparkContext的目的是為了准備Spark應用程序的運行環境。在Spark中由SparkContext負責和ClusterManager通信,進行資源的申請、任務的分配和監控等;當Executor部分運行完畢后,Driver負責將SparkContext關閉。通常用SparkContext代表Drive;

 

3、Executor:執行器,是為了執行Application而運行在Worker 節點上的一個進程,該進程負責運行Task,並且負責將數據存在內存或者磁盤上,每個Application都有各自獨立的一批Executor。在Spark on Yarn模式下,其進程名稱為CoarseGrainedExecutorBackend,類似於Hadoop MapReduce中的YarnChild。一個CoarseGrainedExecutorBackend進程有且僅有一個executor對象,它負責將Task包裝成taskRunner,並從線程池中抽取出一個空閑線程運行Task。每個CoarseGrainedExecutorBackend能並行運行Task的數量就取決於分配給它的CPU的個數了;

 

4、Cluster Manager:指的是在集群上獲取資源的外部服務,目前有:

Ø  Standalone:Spark原生的資源管理,由Master負責資源的分配,控制整個集群,監控worker;

Ø  Hadoop Yarn(目前最流行):由YARN中的ResourceManager負責資源的分配;

大數據生態中有很多亂七八糟的工具,都在同一個集群上運轉,大家需要互相尊重有序工作。所以另外一個重要組件是,調度系統:現在最流行的是Yarn。你可以把他看作中央管理,好比你媽在廚房監工,哎,你妹妹切菜切完了,你可以把刀拿去殺雞了。只要大家都服從你媽分配,那大家都能愉快滴燒菜。

你可以認為,大數據生態圈就是一個廚房工具生態圈。為了做不同的菜,中國菜,日本菜,法國菜,你需要各種不同的工具。而且客人的需求正在復雜化,你的廚具不斷被發明,也沒有一個萬用的廚具可以處理所有情況,因此它會變的越來越復雜。
Yarn介紹

 

5、Worker:集群中任何可以運行Application代碼的節點,類似於YARN中的NodeManager節點。在Standalone模式中指的就是通過Slave文件配置的Worker節點,在Spark on Yarn模式中指的就是NodeManager節點;

 

6、作業(Job):包含多個Task組成的並行計算,往往由Spark Action催生, 一個Application中往往會產生多個Job,一個JOB包含多個RDD及作用於相應RDD上的各種Operation;

7、階段(Stage):每個Job會被拆分很多組Task,作為一個TaskSet, 其名稱為Stage,Stage的划分和調度是有DAGScheduler來負責的,Stage有非最終的Stage(Shuffle Map Stage)和最終的Stage(Result Stage)兩種,Stage的邊界就是發生shuffle的地方

8、任務(Task):被送到某個Executor上的工作單元,但hadoopMR中的MapTask和ReduceTask概念一樣,是運行Application的基本單位,多個Task組成一個Stage,而Task的調度和管理等是由TaskScheduler負責

一圖說明job、stage、task三者的關系

 

9、DAGScheduler:根據Job構建基於Stage的DAGDirected Acyclic Graph有向無環圖),並提交Stage給TASkScheduler。 根據RDD和Stage之間的關系找出開銷最小的調度方法,然后把Stage以TaskSet的形式提交給TaskScheduler,下圖展示了DAGScheduler的作用:

DAGScheduler決定了運行Task的理想位置,並把這些信息傳遞給下層的TaskScheduler。此外,DAGScheduler還處理由於Shuffle數據丟失導致的失敗,這有可能需要重新提交運行之前的Stage(非Shuffle數據丟失導致的Task失敗由TaskScheduler處理)。

 

 

10、TASKSedulter: 將TaskSET提交給worker運行,每個Executor運行什么Task就是在此處分配的. TaskScheduler維護所有TaskSet,當Executor向Driver發生心跳時,TaskScheduler會根據資源剩余情況分配相應的Task。另外TaskScheduler還維護着所有Task的運行標簽,重試失敗的Task。下圖展示了TaskScheduler的作用

 此外須知,在不同運行模式中TASKSedulter任務調度器具體為:

(1)Spark on Standalone模式為TaskScheduler

(2)YARN-Client模式為YarnClientClusterScheduler

(3)YARN-Cluster模式為YarnClusterScheduler

 

將上述術語串起來的運行層次圖如下:

Job=多個stage,Stage=多個同種task, Task分為ShuffleMapTask和ResultTask,Dependency分為ShuffleDependency和NarrowDependency

四 Spark運行基本流程

1、不論 Spark 以何種模式進行部署,任務提交后,都會先啟動 Driver 進程,即啟動SparkContext,構建Spark Application的運行環境,隨后 Driver(SparkContext )進程向資源管理器(可以是Standalone,Mesos,Yarn)注冊應用程序、申請運行Executor資源

2、資源管理器分配Executor資源並啟動StandaloneExecutorBackend,Executor運行情況將隨着心跳發送到資源管理器上;

3、Worker 上的 Executor 啟動后會向 Driver 反向注冊,SparkContext知曉具體的執行者有哪些之后,會依此構建生成DAG圖,將DAG圖分解成Stage,並把Taskset發送給Task Scheduler。Executor向SparkContext申請Task,Task Scheduler將Task發放給Executor運行同時SparkContext將應用程序代碼發放給Executor。

4、Task在Executor上運行,在任務執行的過程中,Executor 也會不斷與 Driver 進行通信,報告任務運行情況。運行完釋放所有資源

詳細執行圖

 

 

總結spark的運行特點:

  1.每個Application獲取專屬的executor進程,該進程在Application期間一直駐留,並以多線程方式運行Task。這種Application隔離機制是有優勢的,無論是從調度角度看(每個Driver調度他自己的任務),還是從運行角度看(來自不同Application的Task運行在不同JVM中),當然這樣意味着Spark Application不能跨應用程序共享數據,除非將數據寫入外部存儲系統

2.Spark與資源管理器無關,只要能夠獲取executor進程,並能保持相互通信就可以了

3.提交SparkContext的Client應該靠近Worker節點(運行Executor的節點),最好是在同一個Rack里,因為Spark Application運行過程中SparkContext和Executor之間有大量的信息交換

4.Task采用了數據本地性和推測執行的優化機制

 

RDD運行原理

那么 RDD在Spark架構中是如何運行的呢?總高層次來看,主要分為三步:

1.創建 RDD 對象

2.DAGScheduler模塊介入運算,計算RDD之間的依賴關系。RDD之間的依賴關系就形成了DAG

3.每一個JOB被分為多個Stage,划分Stage的一個主要依據是當前計算因子的輸入是否是確定的,如果是則將其分在同一個Stage,避免多個Stage之間的消息傳遞開銷。

以下面一個按 A-Z 首字母分類,查找相同首字母下不同姓名總個數的例子來看一下 RDD 是如何運行起來的。

 

 

 

步驟 1 :創建 RDD  上面的例子除去最后一個 collect 是個動作,不會創建 RDD 之外,前面四個轉換都會創建出新的 RDD 。因此第一步就是創建好所有 RDD( 內部的五項信息 ) 。

步驟 2 :創建執行計划 Spark 會盡可能地管道化,並基於是否要重新組織數據來划分 階段 (stage) ,例如本例中的 groupBy() 轉換就會將整個執行計划划分成兩階段執行。最終會產生一個 DAG(directed acyclic graph ,有向無環圖 ) 作為邏輯執行計划。

 

步驟 3 :調度任務  將各階段划分成不同的 任務 (task) ,每個任務都是數據和計算的合體。在進行下一階段前,當前階段的所有任務都要執行完成。因為下一階段的第一個轉換一定是重新組織數據的,所以必須等當前階段所有結果數據都計算出來了才能繼續。

假設本例中的 hdfs://names 下有四個文件塊,那么 HadoopRDD 中 partitions 就會有四個分區對應這四個塊數據,同時 preferedLocations 會指明這四個塊的最佳位置。現在,就可以創建出四個任務,並調度到合適的集群結點上。

五 Spark運行模式

spark支持四種運行模式

1、本地運行模式(local模式),常用於本地開發測試

2、分布式集群模式之:獨立運行模式(Standalone模式),是Spark 自帶的一種集群管理模式,即獨立模式,自帶完整的服務,可單獨部署到一個集群中,無需依賴任何其他資源管理系統。它是 Spark 實現的資源調度框架,其主要的節點有 Driver 節點、Master 節點和 Worker 節點。Standalone模式也是最簡單最容易部署的一種模式。

3、分布式集群模式之:Spark on Mesos模式,,即 Spark 運行在Apache Mesos框架之上的一種模式。Apache Mesos是一個更強大的分布式資源管理框架,負責集群資源的分配,它允許多種不同的框架部署在其上,包括YARN。它被稱為是分布式系統的內核。

4、分布式集群模式之:Spark on YARN模式,即 Spark 運行在Hadoop YARN框架之上的一種模式。Hadoop YARN(Yet Another Resource
Negotiator,另一種資源協調者)是一種新的 Hadoop 資源管理器,它是一個通用資源管理系統,可為上層應用提供統一的資源管理和調度。

 

針對三種分布式集群模式,都采用了Master/Worker(Slave)的架構,Spark 分布式運行架構大致如下:

Spark客戶端直接連接Mesos,不需要額外構建Spark集群,國內用的少;

Spark客戶端直接連接Yarn,不需要額外構建Spark集群。國內生產上用的多。

而集群模式又根據Driver運行在哪又分為客Client模式和Cluster模式。用戶在提交任務給 Spark 處理時,以下兩個參數共同決定了 Spark 的運行方式。

· –master MASTER_URL :決定了 Spark 任務提交給哪種集群處理。
· –deploy-mode DEPLOY_MODE:決定了 Driver 的運行方式,可選值為 Client或者 Cluster。

5.1 standalone獨立集群運行模式

Standalone模式使用Spark自帶的資源調度框架,Standalone 集群有四個重要組成部分,分別是:

1、Driver:是一個進程,我們編寫的 Spark 應用程序就運行在 Driver 上,由Driver 進程執行;

Driver既可以運行在Master節點上中,也可以運行在本地Client端。
當用spark-shell交互式工具提交Spark的Job時,Driver在Master節點上運行;
當使用spark-submit工具提交Job或者在Eclips、IDEA等開發平台上使用
”new SparkConf.setManager(“spark://master:7077”)”方式運行Spark任務時,Driver是運行在本地Client端上的。

2、aster:是一個進程,主要負責資源的調度和分配,並進行集群的監控等職責;
3、Worker:是一個進程,一個 Worker 運行在集群中的一台服務器上,主要負責兩個職責,一個是用自己的內存存儲 RDD 的某個或某些 partition;另一個是啟動其他進程和線程(Executor),對 RDD 上的 partition 進行並行的處理和計算。
4、Executor:是一個進程,一個 Worker 上可以運行多個 Executor,Executor通過啟動多個線程(task)來執行對 RDD 的 partition 進行並行計算,也就是執行我們對 RDD 定義的例如 map、flatMap、reduce 等算子操作。

5.1.1 Standalone的client模式

 

解析:
在 Standalone Client 模式下,Driver 在任務提交的本地機器上運行,Driver 啟動后向 Master 注冊應用程序,Master 根據 submit 腳本的資源需求找到內部資源至少可以啟動一個 Executor 的所有 Worker,然后在這些 Worker 之間分配 Executor,Worker上的 Executor 啟動后會向 Driver 反向注冊,所有的 Executor 注冊完成后,Driver 開始執行 main 函數,之后執行到 Action 算子時,開始划分 stage,每個 stage 生成對應的 taskSet,之后將 task 分發到各個 Executor 上執行。
 

運行流程詳解

 

1.SparkContext連接到Master,向Master注冊並申請資源(CPU Core 和Memory);

2.Master根據SparkContext的資源申請要求和Worker心跳周期內報告的信息決定在哪個Worker上分配資源,然后在該Worker上獲取資源,然后啟動StandaloneExecutorBackend;

3.StandaloneExecutorBackend向SparkContext注冊;

4.SparkContext將Applicaiton代碼發送給StandaloneExecutorBackend;並且SparkContext解析Applicaiton代碼,構建DAG圖,並提交給DAG Scheduler分解成Stage(當碰到Action操作時,就會催生Job;每個Job中含有1個或多個Stage,Stage一般在獲取外部數據和shuffle之前產生),然后以Stage(或者稱為TaskSet)提交給Task Scheduler,Task Scheduler負責將Task分配到相應的Worker,最后提交給StandaloneExecutorBackend執行;

5.StandaloneExecutorBackend會建立Executor線程池,開始執行Task,並向SparkContext報告,直至Task完成。

6.所有Task完成后,SparkContext向Master注銷,釋放資源。

5.1.2 Standalone的cluster模式

解析
在 Standalone Cluster 模式下,任務提交后,Master 會找到一個 Worker 啟動 Driver進程, Driver 啟動后向 Master 注冊應用程序,Master 根據 submit 腳本的資源需求找到內部資源至少可以啟動一個 Executor 的所有 Worker,然后在這些 Worker 之間分配 Executor,Worker 上的 Executor 啟動后會向 Driver 反向注冊,所有的 Executor注冊完成后,Driver 開始執行 main 函數,之后執行到 Action 算子時,開始划分 stage,每個 stage 生成對應的 taskSet,之后將 task 分發到各個 Executor 上執行。注意,Standalone 的兩種模式下(client/Cluster),Master 在接到 Driver 注冊Spark 應用程序的請求后,會獲取其所管理的剩余資源能夠啟動一個 Executor 的所有 Worker,然后在這些 Worker 之間分發 Executor,此時的分發只考慮 Worker 上的資源是否足夠使用,直到當前應用程序所需的所有 Executor 都分配完畢,Executor反向注冊完畢后,Driver 開始執行 main 程序。

5.1.3 YARN框架原理

何框架與YARN的結合,都必須遵循YARN的開發模式。在分析Spark on YARN的實現細節之前,有必要先分析一下YARN框架的一些基本原理。

 Yarn框架的基本運行流程圖為:

 其中,ResourceManager負責將集群的資源分配給各個應用使用,而資源分配和調度的基本單位是Container,其中封裝了機器資源,如內存、CPU、磁盤和網絡等,每個任務會被分配一個Container,該任務只能在該Container中執行,並使用該Container封裝的資源。NodeManager是一個個的計算節點,主要負責啟動Application所需的Container,監控資源(內存、CPU、磁盤和網絡等)的使用情況並將之匯報給ResourceManager。ResourceManager與NodeManagers共同組成整個數據計算框架,ApplicationMaster與具體的Application相關,主要負責同ResourceManager協商以獲取合適的Container,並跟蹤這些Container的狀態和監控其進度。

5.1.4 YARN的client運行模式

Yarn-Client模式中,Driver在客戶端本地運行,這種模式可以使得Spark Application和客戶端進行交互,因為Driver在客戶端,所以可以通過webUI訪問Driver的狀態,默認是http://hadoop1:4040訪問,而YARN通過http:// hadoop1:8088訪問。

YARN-client的工作流程分為以下幾個步驟:

1.Spark Yarn Client向YARN的ResourceManager申請啟動Application Master。同時在SparkContent初始化中將創建DAGScheduler和TASKScheduler等,由於我們選擇的是Yarn-Client模式,程序會選擇YarnClientClusterScheduler和YarnClientSchedulerBackend;

2.ResourceManager收到請求后,在集群中選擇一個NodeManager,為該應用程序分配第一個Container,要求它在這個Container中啟動應用程序的ApplicationMaster,與YARN-Cluster區別的是在該ApplicationMaster不運行SparkContext,只與SparkContext進行聯系進行資源的分派;

3.Client中的SparkContext初始化完畢后,與ApplicationMaster建立通訊,向ResourceManager注冊,根據任務信息向ResourceManager申請資源(Container);

4.一旦ApplicationMaster申請到資源(也就是Container)后,便與對應的NodeManager通信,要求它在獲得的Container中啟動啟動CoarseGrainedExecutorBackend,CoarseGrainedExecutorBackend啟動后會向Client中的SparkContext注冊並申請Task;

5.Client中的SparkContext分配Task給CoarseGrainedExecutorBackend執行,CoarseGrainedExecutorBackend運行Task並向Driver匯報運行的狀態和進度,以讓Client隨時掌握各個任務的運行狀態,從而可以在任務失敗時重新啟動任務;

6.應用程序運行完成后,Client的SparkContext向ResourceManager申請注銷並關閉自己。

5.1.5 YARN的cluster運行模式

在YARN-Cluster模式中,當用戶向YARN中提交一個應用程序后,YARN將分兩個階段運行該應用程序:

    1.第一個階段是把Spark的Driver作為一個ApplicationMaster在YARN集群中先啟動;

    2.第二個階段是由ApplicationMaster創建應用程序,然后為它向ResourceManager申請資源,並啟動Executor來運行Task,同時監控它的整個運行過程,直到運行完成

 

1.   Spark Yarn Client向YARN中提交應用程序,包括ApplicationMaster程序、啟動ApplicationMaster的命令、需要在Executor中運行的程序等;

2.   ResourceManager收到請求后,在集群中選擇一個NodeManager,為該應用程序分配第一個Container,要求它在這個Container中啟動應用程序的ApplicationMaster,其中ApplicationMaster進行SparkContext等的初始化;

3.   ApplicationMaster向ResourceManager注冊,這樣用戶可以直接通過ResourceManage查看應用程序的運行狀態,然后它將采用輪詢的方式通過RPC協議為各個任務申請資源,並監控它們的運行狀態直到運行結束;

4.   一旦ApplicationMaster申請到資源(也就是Container)后,便與對應的NodeManager通信,要求它在獲得的Container中啟動啟動CoarseGrainedExecutorBackend,CoarseGrainedExecutorBackend啟動后會向ApplicationMaster中的SparkContext注冊並申請Task。這一點和Standalone模式一樣,只不過SparkContext在Spark Application中初始化時,使用CoarseGrainedSchedulerBackend配合YarnClusterScheduler進行任務的調度,其中YarnClusterScheduler只是對TaskSchedulerImpl的一個簡單包裝,增加了對Executor的等待邏輯等;

5.   ApplicationMaster中的SparkContext分配Task給CoarseGrainedExecutorBackend執行,CoarseGrainedExecutorBackend運行Task並向ApplicationMaster匯報運行的狀態和進度,以讓ApplicationMaster隨時掌握各個任務的運行狀態,從而可以在任務失敗時重新啟動任務;

6.   應用程序運行完成后,ApplicationMaster向ResourceManager申請注銷並關閉自己。

5.1.6 YARN-Client 與 YARN-Cluster 區別

理解YARN-Client和YARN-Cluster深層次的區別之前先清楚一個概念:Application Master。在YARN中,每個Application實例都有一個ApplicationMaster進程,它是Application啟動的第一個容器。它負責和ResourceManager打交道並請求資源,獲取資源之后告訴NodeManager為其啟動Container。從深層次的含義講YARN-Cluster和YARN-Client模式的區別其實就是ApplicationMaster進程的區別。

l  YARN-Cluster模式下,Driver運行在AM(Application Master)中,它負責向YARN申請資源,並監督作業的運行狀況。當用戶提交了作業之后,就可以關掉Client,作業會繼續在YARN上運行,因而YARN-Cluster模式不適合運行交互類型的作業;

l  YARN-Client模式下,Application Master僅僅向YARN請求Executor,Client會和請求的Container通信來調度他們工作,也就是說Client不能離開。

 

補充

因為在Spark作業運行過程中,一般情況下會有大量數據在Driver和集群中進行交互,
所以如果是基於yarn-client的模式,則會在程序運行過程中產生大量的網絡數據傳輸,
造成網卡流量激增;而基於yarn-cluster這種模式,因為driver本身就在集群內部,
所以數據的傳輸也是在集群內部來完成,那么網絡傳輸壓力相對要小;
所以在企業生產環境下多使用yarn-cluster這種模式,測試多用yarn-client這種模式。
但是帶來一個問題,就是不方便監控日志,yarn-cluster這種模式要想監控日志,
必須要到每一台機器上面去查看,但這都不是問題,因為我們有sparkUI,同時也有各種各樣的日志監控組件

 

六 spark使用與不同模式的部署

官方:https://spark-reference-doc-cn.readthedocs.io/zh_CN/latest/deploy-guide/spark-standalone.html

 

Standalone spark自帶的集群模式。需要構建一個由Master+Slave構成的Spark集群,選用ZooKeeper來實現Master的HA,框架結構圖如下:

 https://zhuanlan.zhihu.com/p/99398378

 

推薦閱讀:

https://www.cnblogs.com/shishanyuan/p/4721326.html

https://blog.csdn.net/shuimofengyang/article/details/100124601 

https://www.cnblogs.com/jinggangshan/p/8063970.html

https://blog.csdn.net/github_26054561/article/details/46344889

https://juejin.cn/post/6844904058612875277

https://zhuanlan.zhihu.com/p/99398378

 

 

 

 

 

 

 

 

 

 

 

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM