Spark原理概述


原文來自我的個人網站:http://www.itrensheng.com/archives/Spark_basic_knowledge

一. Spark出現的背景

在Spark出現之前,大數據計算引擎主要是MapReduce。HDFS + MapReduce的組合幾乎可以實現所有的大數據應用場景。MR框架抽象程度比較高,需要我們編寫Map和Reduce兩個步驟(MapReduce 框架其實包含5 個步驟:Map、Sort、Combine、Shuffle以及Reduce)

image.png

每個Map和Reduce之間需要進行Shuffle(這步操作會涉及數量巨大的網絡傳輸,需要耗費大量的時間)。由於 MapReduce 的框架限制,一個 MapReduce 任務只能包含一次 Map 和一次 Reduce,計算完成之后,MapReduce會將運算中間結果寫回到磁盤中,供下次計算使用。

二.Spark簡介

Spark是由加州大學伯克利分校AMP實驗室開源的分布式大規模數據處理通用引擎,具有高吞吐、低延時、通用易擴展、高容錯等特點。Spark內部提供了豐富的開發庫,集成了數據分析引擎Spark SQL、圖計算框架GraphX、機器學習庫MLlib、流計算引擎Spark Streaming

image.png

相比於MapReduce的計算模型,Spark是將數據一直緩存在內存中,直到計算得到最后的結果,再將結果寫入到磁盤,所以多次運算的情況下,Spark省略了多次磁盤IO。

對比 MapReduce Spark
速度 處理數據需要連續的讀寫磁盤 是MapReduce的10到100倍
編碼難度 程序員來賦值每一步 RDD高可用,失敗重試
及時性 不適合做OLAP,只適合批處理 能兼顧批處理和OLAP
調度 使用外部的調度,如Oozie 自帶調度,也可使用外部調度
編程語言 Java Scala
SQL支持 本身不提供,需要外部查詢引擎,如Hive 自帶Spark SQL
可擴展性 最大支持14000個節點 最大8000節點
機器學習 外部依賴Mahout 自帶Spark MLlib
緩存 能不緩存到內存中 可以緩存到內存中
安全性 安全特性比Spark廣泛 不如MapReduce

三. Spark系統架構

image.png

Driver:

一個Spark job運行前會啟動一個Driver進程,也就是作業的主進程,負責解析和生成各個Stage,並調度Task到Executor上

SparkContext:

程序運行調度的核心,高層調度去DAGScheduler划分程序的每個階段,底層調度器TaskScheduler划分每個階段具體任務

Worker:

也就是WorkderNode,負責執行Master所發送的指令,來具體分配資源並執行任務

Executer:

負責執行作業。如圖中所以,Executer是分步在各個Worker Node上,接收來自Driver的命令並加載Task

DAGScheduler:

負責高層調度,划分stage並生產DAG有向無環圖

TaskScheduler:

負責具體stage內部的底層調度,具體task的調度和容錯

Job:

每次Action都會觸發一次Job,一個Job可能包含一個或多個stage

Stage:

用來計算中間結果的Tasksets。分為ShuffleMapStage和ResultStage,出了最后一個Stage是ResultStage外,其他都是ShuffleMapStage。ShuffleMapStage會產生中間結果,是以文件的方式保存在集群當中,以便能夠在不同stage種重用

Task:

任務執行的工作單位,每個Task會被發送到一個節點上,每個Task對應RDD的一個partition.

RDD:

是以partition分片的不可變,Lazy級別數據集合 算子

Transformation:

由DAGScheduler划分到pipeline中,是Lazy級別的,不會觸發任務的執行

Action:

會觸發Job來執行pipeline中的運算

四. Spark Job執行流程

image.png

    spark = SparkSession\
        .builder\
        .appName("PythonWordCount")\
        .getOrCreate()

    lines = spark.read.text(sys.argv[1]).rdd.map(lambda r: r[0])
    counts = lines.flatMap(lambda x: x.split(' ')) \
                  .map(lambda x: (x, 1)) \
                  .reduceByKey(add)
    output = counts.collect()
    for (word, count) in output:
        print("%s: %i" % (word, count))

    spark.stop()
  1. 使用spark-submit向集群提交一個job之后,就會啟動一個Driver進程。Driver進程會根據deploy-mode不同而不同,可能是本地啟動,也可能是集群中的節點
  2. Driver進程向資源管理器Resource Manager(可以是Standalone、Mesos或YARN)注冊並申請運行Executor資源,如YARN會根據spark-submit中申請的參數來為Spark作業設置對應的資源參數,並在集群中的各個節點上分配對應數量的Executor進程
  3. Driver進程會將整個Job拆分為多個Stage,一個Stage可能包含多個Task,並將這些Task分配到第二步中申請到的Executor進程中執行。Task是執行的最小Unit。當一個Stage所屬的所有Task都執行完成之后,會在各個節點的磁盤文件中記錄中間結果並繼續執行后續的Stage。

四. RDD

定義:

RDD 是 Spark 的計算模型。RDD(Resilient Distributed Dataset)叫做彈性的分布式數據集合,是 Spark中最基本的數據抽象,它代表一個不可變、只讀的,被分區的數據集。
可以將 RDD 理解為一個分布式對象集合,本質上是一個只讀的分區記錄集合。每個 RDD可以分成多個分區,每個分區就是一個數據集片段。一個 RDD 的不同分區可以保存到集群中的不同結點上,從而可以在集群中的不同結點上進行並行計算。

image.png

五大特性:

  1. 分區列表:RDD是分區的,且每一個分區都會被一個Task所處理,所以Job的並行執行能力取決於分區多少。默認情況下,RDD的分區數是集成自父RDD,這個值也可以在創建RDD的時候在代碼中指定

  2. 計算函數:每個分區都有一個計算函數,這個計算函數是以分片為基本單位的。如在RDD的寬依賴場景下,將寬依賴划分為Stage,而Stage使用BlockManager獲取分區數據並根據計算函數來split對應的Blockimage.png

  3. 存在依賴關系:RDD經過計算任務每次都會轉化為一個不可變的新的RDD。因為有依賴關系,所以當前一個RDD失敗的時候,Spark會根據依賴關系重新計算前一個失敗的RDD,而不是所有的RDD。

  4. KV數據類型分區器:控制分區策略和分區數,每個KV形式的RDD都有Partitioner屬性,來控制RDD如何分區。

5.優先位置列表:每個分區都有優先位置列表,用於存儲Partition的優先位置。如果是讀取HDFS,那就是每個Block的優先位置。

RDD的依賴關系

依賴關系分為寬依賴(Wide Dependency)和窄依賴(Narraw Dependency)。

  • 寬依賴:子RDD分區依賴父RDD的所有分區。如果子RDD部分分區甚至全部分區數據損壞或丟失,需要從所有父RDD重新計算,相對窄依賴而言付出的代價更高,所以應盡量避免寬依賴的使用

  • image.png

  • 窄依賴:父RDD的分區只對應一個子RDD的分區。如果子RDD只有部分分區數據損壞或者丟失,只需要從對應的父RDD重新計算恢復如果子RDD只有部分分區數據損壞或者丟失,只需要從對應的父RDD重新計算恢復image.png

類型

RDD可以分為2中類型:Transformation 和 Action
image.png

Transformation 操作不是馬上提交 Spark 集群執行的,Spark 在遇到 Transformation操作時只會記錄需要這樣的操作,並不會去執行,需要等到有 Action 操作的時候才會真正啟動計算過程進行計算.
針對每個 Action,Spark 會生成一個 Job,從數據的創建開始,經過 Transformation, 結尾是 Action 操作.這些操作對應形成一個有向無環圖(DAG),形成 DAG 的先決條件是最后的函數操作是一個Actionimage.png

五. 緩存

Spark 本身就是一個基於內存的迭代式計算,當某個RDD的計算結果會被多次重復使用的時候,緩存就很有必要(尤其是對於整個血統很長的計算任務)。如果程序從頭到尾只有一個 Action 操作且子RDD只依賴於一個父RDD 的話,就不需要使用 cache 這個機制。
Spark 可以使用 persist 和 cache 方法將任意 RDD 緩存到內存、磁盤文件系統中。緩存是容錯的,如果一個 RDD 分片丟失,則可以通過構建它的轉換來自動重構。被緩存的 RDD 被使用時,存取速度會被大大加速。一般情況下,Executor 內存的 60% 會分配給 cache,剩下的 40% 用來執行任務

  • MEMORY_ONLY: 使用未序列化的Java對象格式,將數據保存在內存中。如果內存不夠存放所有的數據,則某些分區的數據就不會進行持久化。那么下次對這個RDD執行算子操作時,那些沒有被持久化的數據,需要從源頭處重新計算一遍。這是默認的持久化策略,使用cache()方法時,實際就是使用的這種持久化策略。

  • MEMORY_ONLY_SER: 基本含義同MEMORY_ONLY。唯一的區別是,會將RDD中的數據進行序列化,RDD的每個partition會被序列化成一個字節數組。這種方式更加節省內存,從而可以避免持久化的數據占用過多內存導致頻繁GC。

  • MYMORY_AND_DISK: 使用未序列化的Java對象格式,優先嘗試將數據保存在內存中。如果內存不夠存放所有的數據,會將數據寫入磁盤文件中,下次對這個RDD執行算子時,持久化在磁盤文件中的數據會被讀取出來使用。

  • MEMORY_AND_DISK_SER: 基本含義同MEMORY_AND_DISK。唯一的區別是,會將RDD中的數據進行序列化,RDD的每個partition會被序列化成一個字節數組。這種方式更加節省內存,從而可以避免持久化的數據占用過多內存導致頻繁GC。

  • DISK_ONLY: 使用未序列化的Java對象格式,將數據全部寫入磁盤文件中。

  • MEMORY_ONLY_2/MEMORY_AND_DISK_2: 對於上述任意一種持久化策略,如果加上后綴_2,代表的是將每個持久化的數據,都復制一份副本,並將副本保存到其他節點上。這種基於副本的持久化機制主要用於進行容錯。假如某個節點掛掉,節點的內存或磁盤中的持久化數據丟失了,那么后續對RDD計算時還可以使用該數據在其他節點上的副本。如果沒有副本的話,就只能將這些數據從源頭處重新計算一遍了。

  • OFF_HEAP(experimental) : RDD的數據序例化之后存儲至Tachyon。相比於MEMORY_ONLY_SER,OFF_HEAP能夠減少垃圾回收開銷、使得Spark Executor更“小”更“輕”的同時可以共享內存;而且數據存儲於Tachyon中,Spark集群節點故障並不會造成數據丟失,因此這種方式在“大”內存或多並發應用的場景下是很有吸引力的。需要注意的是,Tachyon並不直接包含於Spark的體系之內,需要選擇合適的版本進行部署;它的數據是以“塊”為單位進行管理的,這些塊可以根據一定的算法被丟棄,且不會被重建。

Randy
微信掃描二維碼,關注我的公眾號
我的個人網站: http://www.itrensheng.com/


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM