Spark原理小總結


1、spark是什么?

  快速,通用,可擴展的分布式計算引擎

2、彈性分布式數據集RDD

  RDDResilient Distributed Dataset)叫做分布式數據集,Spark中最基本的數據抽象,它代表一個不可變、可分區、里面的元素可並行計算的集合RDD具有數據流模型的特點:自動容錯、位置感知性調度和可伸縮性。RDD允許用戶在執行多個查詢時顯式地將工作集緩存在內存中,后續的查詢能夠重用工作集,這極大地提升了查詢速度。

RDD的屬性

 

1)一組分片(Partition,即數據集的基本組成單位。對於RDD來說,每個分片都會被一個計算任務處理,並決定並行計算的粒度用戶可以在創建RDD時指定RDD的分片個數,如果沒有指定,那么就會采用默認值。默認值就是程序所分配到的CPU Core的數目

2)一個計算每個分區的函數SparkRDD的計算是以分片為單位的,每個RDD都會實現compute函數以達到這個目的。compute函數會對迭代器進行復合,不需要保存每次計算的結果

3)RDD之間的依賴關系RDD的每次轉換都會生成一個新的RDD,所以RDD之間就會形成類似於流水線一樣的前后依賴關系。在部分分區數據丟失時,Spark可以通過這個依賴關系重新計算丟失的分區數據,而不是對RDD的所有分區進行重新計算。

4)一個PartitionerRDD的分片函數。當前Spark中實現了兩種類型的分片函數,一個是基於哈希的HashPartitioner,另外一個是基於范圍的RangePartitioner。只有對於於key-valueRDD,才會有Partitioner,非key-valueRDDParititioner的值是NonePartitioner函數不但決定了RDD本身的分片數量,也決定了parent RDD Shuffle輸出時的分片數量。

5)一個列表,存儲存取每個Partition的優先位置(preferred location)。對於一個HDFS文件來說,這個列表保存的就是每個Partition所在的塊的位置。按照“移動數據不如移動計算”的理念,Spark在進行任務調度的時候,會盡可能地將計算任務分配到其所要處理數據塊的存儲位置。

 

創建RDD的兩種方式

  1、由一個已經存在的Scala集合創建。

    val rdd1 = sc.parallelize(Array(1,2,3,4,5,6,7,8))

  2、由外部存儲系統的數據集創建,包括本地的文件系統,還有所有Hadoop支持的數據集,比如HDFSCassandraHBase

    val rdd2 = sc.textFile("hdfs://node1.itcast.cn:9000/words.txt")

 

3Spark的算子

  RDD中的所有轉換都是延遲加載的,也就是說,它們並不會直接計算結果。相反的,它們只是記住這些應用到基礎數據集(例如一個文件)上的轉換動作。只有當發生一個要求返回結果給Driver的動作時,這些轉換才會真正運行。這種設計讓Spark更加有效率地運行。

  1、Transformation

  2、Action

 

4RDD的依賴關系

  RDD和它依賴的父RDDs)的關系有兩種不同的類型,即窄依賴(narrow dependency)和寬依賴(wide dependency)。

   

 

RDD緩存

  Spark速度非常快的原因之一,就是在不同操作中可以在內存中持久化或緩存個數據集。當持久化某個RDD后,每一個節點都將把計算的分片結果保存在內存中

 

RDD緩存的方式

  RDD通過persist方法或cache方法可以將前面的計算結果緩存,但是並不是這兩個方法被調用時立即緩存,而是觸發后面的action時,該RDD將會被緩存在計算節點的內存中,並供后面重用。

  cache最終也是調用了persist方法,默認的存儲級別都是僅在內存存儲一份

 

Spark的存儲級別

 

緩存有可能丟失,或者存儲存儲於內存的數據由於內存不足而被刪除,RDD的緩存容錯機制保證了即使緩存丟失也能保證計算的正確執行。通過基於RDD的一系列轉換,丟失的數據會被重算,由於RDD的各個Partition是相對獨立的,因此只需要計算丟失的部分即可,並不需要重算全部Partition

 

5DAG的生成

DAG(Directed Acyclic Graph)叫做有向無環圖,原始的RDD通過一系列的轉換就就形成了DAG,根據RDD之間的依賴關系的不同將DAG划分成不同的Stage,對於窄依賴,partition的轉換處理在Stage中完成計算。對於寬依賴,由於有Shuffle的存在,只能在parent RDD處理完成后,才能開始接下來的計算,因此寬依賴是划分Stage的依據。

 

 

spark運行原理 

  1、通過ActorSystem創建MasterActor,啟動定時器,定時檢查與接收Worker節點的發送消息

  2Worker節點主動向Master發送注冊消息

  3Master接收Worker的注冊請求,然后將注冊信息保存起來,並向Worker返回一個注冊成功的消息

  4Worker接收到Master注冊成功的消息后,啟用定時器,定時向master發送心跳報活,Master接收到Worker發送來的心跳消息后,更新Worker上一次的心跳時間

  5DAGScheduler根據FinalRDD遞歸向上解析Lineager的依賴關系,並以寬依賴為切分一個新stage的依據並將多個task任務封裝到TaskSet,其中Task的數量由其父RDD的切片數量決定,最后使用遞歸優先提交父Stage(TaskSet)

  6、先創建TaskSchedulerTaskSchedulerImpl接着又創建SparkDeploySchedulerBackend對資源參數創建AppClientMaster注冊Application,並替每個TaskSet創建TaskManager負責監控此TaskSet中任務的執行情況

  7Master接收到ClientActor的任務描述之后,將任務描述信息保存起來,然后ClientActor返回消息,告知ClientActor任務注冊成功,接下來Master(打散|負載均衡|盡量集中)進行資源調度

  8MasterWorker通信,然后讓Worker啟動Executor

  9ExecutorDriver發送注冊消息,Driver接收到Executor注冊消息后,響應注冊成功的消息

  10Executor接收到Driver注冊成功的消息后,本進程中創建Executor的引用對象

  11DriverTaskSchedulerImpExecutor發送LaunchTask消息,Executor將創建一個線程池作為所提交的Task任務的容器

  12Task接收到launchTask消息后,准備運行文件初始化與反序列化,就緒后,調用Taskrun方法,其中每個Task所執行的函數是應用在RDD中的一個獨立分區上

  13Task運行完成,向TaskManager匯報情況,並且釋放線程資源

  14、所有Task運行結束之后,ExecutorWorker注銷自身,釋放資源。

 

 

Spark Standalone

Spark Standalone模式中,資源調度室Spark自行實現的,其節點類型分為masterworker

其中Driver運行在Master中,並且有長駐內存的Master進程守護,Worker節點上常駐Worker守護進程,負責與Master節點通信,通過ExecutorRunner來控制運行在當前節點上的CoarseGrainedExecutorBackend

每個Worker上存在一個或多個CoarseGrainedExecutorBackend進程,每個進程包含一個Executor對象,該對象持有一個線程池,每個線程池可以執行一個Task

 

 

節點類型:

1. master 節點: 常駐master進程,負責管理全部worker節點。

2. worker 節點: 常駐worker進程,負責管理executor 並與master節點通信。

dirvier:官方解釋為: The process running the main() function of the application and creating the SparkContext。即理解為用戶自己編寫的應用程序

Executor:執行器:

  在每個WorkerNode上為某應用啟動的一個進程,該進程負責運行任務,並且負責將數據存在內存或者磁盤上,每個任務都有各自獨立的Executor

  Executor是一個執行Task的容器。它的主要職責是:

  1、初始化程序要執行的上下文SparkEnv,解決應用程序需要運行時的jar包的依賴,加載類。

  2、同時還有一個ExecutorBackendcluster manager匯報當前的任務狀態,這一方面有點類似hadooptasktrackertask

總結:Executor是一個應用程序運行的監控和執行容器。Executor的數目可以在submit時,由 --num-executors (on yarn)指定.

Job

  包含很多task的並行計算,可以認為Spark RDD 里面的action,每個action的計算會生成一個job

用戶提交的Job會提交給DAGSchedulerJob會被分解成StageTask

 

Stage:

  一個Job會被拆分為多組Task,每組任務被稱為一個Stage就像Map StageReduce Stage

  Stage的划分在RDD的論文中有詳細的介紹,簡單的說是以shuffleresult這兩種類型來划分。在Spark中有兩類task,一類是shuffleMapTask,一類是resultTask,第一類task的輸出是shuffle所需數據,第二類task的輸出是resultstage的划分也以此為依據,shuffle之前的所有變換是一個stageshuffle之后的操作是另一個stage。比如 rdd.parallize(1 to 10).foreach(println) 這個操作沒有shuffle,直接就輸出了,那么只有它的taskresultTaskstage也只有一個;如果是rdd.map(x => (x, 1)).reduceByKey(_ + _).foreach(println), 這個job因為有reduce,所以有一個shuffle過程,那么reduceByKey之前的是一個stage,執行shuffleMapTask,輸出shuffle所需的數據,reduceByKey到最后是一個stage,直接就輸出結果了。如果job中有多次shuffle,那么每個shuffle之前都是一個stage

 

Task

  即 stage 下的一個任務執行單元,一般來說,一個 rdd 有多少個 partition,就會有多少個 task,因為每一個 task 只是處理一個 partition 上的數據.

  每個executor執行的task的數目, 可以由submit時,--num-executors(on yarn) 來指定。

 

小結:

  驅動程序就是執行了一個Spark Applicationmain函數和創建Spark Context的進程,它包含了這個application的全部代碼。

  Spark Application中的每個action會被Spark作為Job進行調度。

  每個Job是一個計算序列的最終結果,而這個序列中能夠產生中間結果的計算就是一個stage

  對於TransformationsActions是有着明確區分的。通常Action對應了Job,而Transformation對應了Stage

 

  一個Job被拆分成若干個Stage,每個Stage執行一些計算,產生一些中間結果。它們的目的是最終生成這個Job的計算結果。

  而每個Stage是一個task set,包含若干個taskTaskSpark中最小的工作單元,在一個executor上完成一個特定的事情。

 

  1driver program是用戶寫的帶main函數的代碼

  2、每個action算子的操作都會對應一個job,例如(ForeachRDD寫入外部系統的一個操作)

  3DAGScheduler會對Job進行拆分,拆分的依據:根據FinalRDD(在這里ForeachRDD)遞歸向上解析Lineager的依賴關系,以寬依賴為切分stage的依據,切分成若干個Stage,遞歸優先提交父Stage,每個Stage里面包含多個Task任務

  4、若干個Transformation的算子RDD組成Stage,所以一個RDD中有多少個partition,就有多少個Task,因為每一個Task只對一個partition數據做處理。

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM