Spark原理及關鍵技術點


Spark

Apache Spark 是專為大規模數據處理而設計的快速通用的計算引擎。Spark是UC Berkeley AMP lab (加州大學伯克利分校的AMP實驗室)所開源的類Hadoop MapReduce的通用並行框架,Spark,擁有Hadoop MapReduce所具有的優點;但不同於MapReduce的是——Job中間輸出結果可以保存在內存中,從而不再需要讀寫HDFS,因此Spark能更好地適用於數據挖掘與機器學習等需要迭代的MapReduce的算法。

RDD

RDD(Resilient Distributed Dataset)叫做分布式數據集,是Spark中最基本的數據抽象,它代表一個不可變、可分區、里面的元素可並行計算的集合。RDD具有數據流模型的特點:自動容錯、位置感知性調度和可伸縮性。RDD允許用戶在執行多個查詢時顯式地將工作集緩存在內存中,后續的查詢能夠重用工作集,這極大地提升了查詢速度。

RDD的屬性:

1)一組分片(Partition),即數據集的基本組成單位。對於RDD來說,每個分片都會被一個計算任務處理,並決定並行計算的粒度。用戶可以在創建RDD時指定RDD的分片個數,如果沒有指定,那么就會采用默認值。默認值就是程序所分配到的CPU Core的數目。

2)一個計算每個分區的函數。Spark中RDD的計算是以分片為單位的,每個RDD都會實現compute函數以達到這個目的。compute函數會對迭代器進行復合,不需要保存每次計算的結果。

3)RDD之間的依賴關系。RDD的每次轉換都會生成一個新的RDD,所以RDD之間就會形成類似於流水線一樣的前后依賴關系。在部分分區數據丟失時,Spark可以通過這個依賴關系重新計算丟失的分區數據,而不是對RDD的所有分區進行重新計算。

4)一個Partitioner,即RDD的分片函數。當前Spark中實現了兩種類型的分片函數,一個是基於哈希的HashPartitioner,另外一個是基於范圍的RangePartitioner。只有對於於key-value的RDD,才會有Partitioner,非key-value的RDD的Parititioner的值是None。Partitioner函數不但決定了RDD本身的分片數量,也決定了parent RDD Shuffle輸出時的分片數量。

5)一個列表,存儲存取每個Partition的優先位置(preferred location)。對於一個HDFS文件來說,這個列表保存的就是每個Partition所在的塊的位置。按照“移動數據不如移動計算”的理念,Spark在進行任務調度的時候,會盡可能地將計算任務分配到其所要處理數據塊的存儲位置。

創建RDD的兩種方式

  1、由一個已經存在的Scala集合創建。

    val rdd1 = sc.parallelize(Array(1,2,3,4,5,6,7,8))

  2、由外部存儲系統的數據集創建,包括本地的文件系統,還有所有Hadoop支持的數據集,比如HDFS、Cassandra、HBase等

    val rdd2 = sc.textFile("hdfs://node1.itcast.cn:9000/words.txt")

RDD的依賴關系

RDD和它依賴的父RDD(s)的關系有兩種不同的類型,即窄依賴(narrow dependency)和寬依賴(wide dependency)。
 
 
 image
 

RDD緩存

Spark速度非常快的原因之一,就是在不同操作中可以在內存中持久化或緩存個數據集。當持久化某個RDD后,每一個節點都將把計算的分片結果保存在內存中

RDD緩存的方式

RDD通過persist方法或cache方法可以將前面的計算結果緩存,但是並不是這兩個方法被調用時立即緩存,而是觸發后面的action時,該RDD將會被緩存在計算節點的內存中,並供后面重用。

cache最終也是調用了persist方法,默認的存儲級別都是僅在內存存儲一份

RDD & DataFrame && DataSet

  • RDD:全稱Resilient Distributed Dataset,彈性分布式數據集,Spark中最基礎的數據抽象,特點是RDD只包含數據本身,沒有數據結構。

  • DataFrame:也是一個分布式數據容器,除數據本身,還記錄了數據的結構信息,即schema;結構信息便於Spark知道該數據集中包含了哪些列,每一列的類型和數據是什么。

  • DataSet:Spark中最上層的數據抽象,不僅包含數據本身,記錄了數據的結構信息schema,還包含了數據集的類型,也就是真正把數據集做成了一個java對象的形式,需要先創建一個樣例類case class,把數據做成樣例類的格式,每一列就是樣例類里的屬性。

注:

(1)DataSet是面向對象的思想,把數據變成了對象的屬性。

(2)DataSet是強類型,比如可以有DataSet[Car],DataSet[Person](汽車對象數據集,人對象數據集);DataFrame=DataSet[Row],DataFrame是DataSet的特例。

(3)在后期的Spark版本中,DataSet會逐步取代RDD和DataFrame成為唯一的API接口。

(4)三者可以互相轉換

Spark的存儲級別

image

緩存有可能丟失,或者存儲於內存的數據由於內存不足而被刪除,RDD的緩存容錯機制保證了即使緩存丟失也能保證計算的正確執行。通過基於RDD的一系列轉換,丟失的數據會被重算,由於RDD的各個Partition是相對獨立的,因此只需要計算丟失的部分即可,並不需要重算全部Partition。

Spark的算子

RDD中的所有轉換都是延遲加載的,也就是說,它們並不會直接計算結果。相反的,它們只是記住這些應用到基礎數據集(例如一個文件)上的轉換動作。只有當發生一個要求返回結果給Driver的動作時,這些轉換才會真正運行。這種設計讓Spark更加有效率地運行。

 1、Transformation

 2、Action

DAG的生成

DAG(Directed Acyclic Graph)叫做有向無環圖,原始的RDD通過一系列的轉換就就形成了DAG,根據RDD之間的依賴關系的不同將DAG划分成不同的Stage,對於窄依賴,partition的轉換處理在Stage中完成計算。對於寬依賴,由於有Shuffle的存在,只能在parent RDD處理完成后,才能開始接下來的計算,因此寬依賴是划分Stage的依據。

image

spark運行原理

1、通過ActorSystem創建MasterActor,啟動定時器,定時檢查與接收Worker節點的發送消息

2、Worker節點主動向Master發送注冊消息
 
3、Master接收Worker的注冊請求,然后將注冊信息保存起來,並向Worker返回一個注冊成功的消息

4、Worker接收到Master注冊成功的消息后,啟用定時器,定時向master發送心跳報活,Master接收到Worker發送來的心跳消息后,更新Worker上一次的心跳時間

5、DAGScheduler根據FinalRDD遞歸向上解析Lineager的依賴關系,並以寬依賴為切分一個新stage的依據,並將多個task任務封裝到TaskSet,其中Task的數量由其父RDD的切片數量決定,最后使用遞歸優先提交父Stage(TaskSet)

6、先創建TaskScheduler即TaskSchedulerImpl接着又創建SparkDeploySchedulerBackend對資源參數創建AppClient與Master注冊Application,並替每個TaskSet創建TaskManager負責監控此TaskSet中任務的執行情況

7、Master接收到ClientActor的任務描述之后,將任務描述信息保存起來,然后向ClientActor返回消息,告知ClientActor任務注冊成功,接下來Master(打散|負載均衡|盡量集中)進行資源調度

8、Master跟Worker通信,然后讓Worker啟動Executor

9、Executor向Driver發送注冊消息,Driver接收到Executor注冊消息后,響應注冊成功的消息

10、Executor接收到Driver注冊成功的消息后,本進程中創建Executor的引用對象
 
11、Driver中TaskSchedulerImp向Executor發送LaunchTask消息,Executor將創建一個線程池作為所提交的Task任務的容器

12、Task接收到launchTask消息后,准備運行文件初始化與反序列化,就緒后,調用Task的run方法,其中每個Task所執行的函數是應用在RDD中的一個獨立分區上

13、Task運行完成,向TaskManager匯報情況,並且釋放線程資源

14、所有Task運行結束之后,Executor向Worker注銷自身,釋放資源。

節點類型:

spark 的部署圖:

image

  1. master 節點: 常駐master進程,負責管理全部worker節點。

  2. worker 節點: 常駐worker進程,負責管理executor 並與master節點通信。

dirvier:官方解釋為: The process running the main() function of the application and creating the SparkContext。即理解為用戶自己編寫的應用程序

Executor:執行器:

在每個WorkerNode上為某應用啟動的一個進程,該進程負責運行任務,並且負責將數據存在內存或者磁盤上,每個任務都有各自獨立的Executor。

Executor是一個執行Task的容器。它的主要職責是:

  1、初始化程序要執行的上下文SparkEnv,解決應用程序需要運行時的jar包的依賴,加載類。

  2、同時還有一個ExecutorBackend向cluster manager匯報當前的任務狀態,這一方面有點類似hadoop的tasktracker和task。

  總結:Executor是一個應用程序運行的監控和執行容器。Executor的數目可以在submit時,由 --num-executors (on yarn)指定.
  

Job:

包含很多task的並行計算,可以認為是Spark RDD 里面的action,每個action的計算會生成一個job。

用戶提交的Job會提交給DAGScheduler,Job會被分解成Stage和Task。
  

Stage:

一個Job會被拆分為多組Task,每組任務被稱為一個Stage就像Map Stage, Reduce Stage。

Stage的划分在RDD的論文中有詳細的介紹,簡單的說是以shuffle和result這兩種類型來划分。在Spark中有兩類task,一類是shuffleMapTask,一類是resultTask,第一類task的輸出是shuffle所需數據,第二類task的輸出是result,stage的划分也以此為依據,shuffle之前的所有變換是一個stage,shuffle之后的操作是另一個stage。比如 rdd.parallize(1 to 10).foreach(println) 這個操作沒有shuffle,直接就輸出了,那么只有它的task是resultTask,stage也只有一個;如果是rdd.map(x => (x, 1)).reduceByKey(_ + _).foreach(println), 這個job因為有reduce,所以有一個shuffle過程,那么reduceByKey之前的是一個stage,執行shuffleMapTask,輸出shuffle所需的數據,reduceByKey到最后是一個stage,直接就輸出結果了。如果job中有多次shuffle,那么每個shuffle之前都是一個stage。

Task

即 stage 下的一個任務執行單元,一般來說,一個 rdd 有多少個 partition,就會有多少個 task,因為每一個 task 只是處理一個 partition 上的數據.

總結

本文主要對Spark的關鍵技術及原理做了闡述,主要理解以下概念:

  • driver program是用戶寫的帶main函數的代碼

  • 每個action算子的操作都會對應一個job,例如(ForeachRDD寫入外部系統的一個操作)

  • DAGScheduler會對Job進行拆分,拆分的依據:根據FinalRDD(在這里ForeachRDD)遞歸向上解析Lineager的依賴關系,以寬依賴為切分stage的依據,切分成若干個Stage,遞歸優先提交父Stage,每個Stage里面包含多個Task任務

  • 若干個Transformation的算子RDD組成Stage,一個RDD中有多少個partition,就有多少個Task,因為每一個Task只對一個partition數據做處理


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM