原文來自我的個人網站:http://www.itrensheng.com/archives/Spark_basic_knowledge
一. Spark出現的背景
在Spark出現之前,大數據計算引擎主要是MapReduce。HDFS + MapReduce的組合幾乎可以實現所有的大數據應用場景。MR框架抽象程度比較高,需要我們編寫Map和Reduce兩個步驟(MapReduce 框架其實包含5 個步驟:Map、Sort、Combine、Shuffle以及Reduce)
每個Map和Reduce之間需要進行Shuffle(這步操作會涉及數量巨大的網絡傳輸,需要耗費大量的時間)。由於 MapReduce 的框架限制,一個 MapReduce 任務只能包含一次 Map 和一次 Reduce,計算完成之后,MapReduce會將運算中間結果寫回到磁盤中,供下次計算使用。
二.Spark簡介
Spark是由加州大學伯克利分校AMP實驗室開源的分布式大規模數據處理通用引擎,具有高吞吐、低延時、通用易擴展、高容錯等特點。Spark內部提供了豐富的開發庫,集成了數據分析引擎Spark SQL、圖計算框架GraphX、機器學習庫MLlib、流計算引擎Spark Streaming
相比於MapReduce的計算模型,Spark是將數據一直緩存在內存中,直到計算得到最后的結果,再將結果寫入到磁盤,所以多次運算的情況下,Spark省略了多次磁盤IO。
對比 | MapReduce | Spark |
---|---|---|
速度 | 處理數據需要連續的讀寫磁盤 | 是MapReduce的10到100倍 |
編碼難度 | 程序員來賦值每一步 | RDD高可用,失敗重試 |
及時性 | 不適合做OLAP,只適合批處理 | 能兼顧批處理和OLAP |
調度 | 使用外部的調度,如Oozie | 自帶調度,也可使用外部調度 |
編程語言 | Java | Scala |
SQL支持 | 本身不提供,需要外部查詢引擎,如Hive | 自帶Spark SQL |
可擴展性 | 最大支持14000個節點 | 最大8000節點 |
機器學習 | 外部依賴Mahout | 自帶Spark MLlib |
緩存 | 能不緩存到內存中 | 可以緩存到內存中 |
安全性 | 安全特性比Spark廣泛 | 不如MapReduce |
三. Spark系統架構
Driver:
一個Spark job運行前會啟動一個Driver進程,也就是作業的主進程,負責解析和生成各個Stage,並調度Task到Executor上
SparkContext:
程序運行調度的核心,高層調度去DAGScheduler划分程序的每個階段,底層調度器TaskScheduler划分每個階段具體任務
Worker:
也就是WorkderNode,負責執行Master所發送的指令,來具體分配資源並執行任務
Executer:
負責執行作業。如圖中所以,Executer是分步在各個Worker Node上,接收來自Driver的命令並加載Task
DAGScheduler:
負責高層調度,划分stage並生產DAG有向無環圖
TaskScheduler:
負責具體stage內部的底層調度,具體task的調度和容錯
Job:
每次Action都會觸發一次Job,一個Job可能包含一個或多個stage
Stage:
用來計算中間結果的Tasksets。分為ShuffleMapStage和ResultStage,出了最后一個Stage是ResultStage外,其他都是ShuffleMapStage。ShuffleMapStage會產生中間結果,是以文件的方式保存在集群當中,以便能夠在不同stage種重用
Task:
任務執行的工作單位,每個Task會被發送到一個節點上,每個Task對應RDD的一個partition.
RDD:
是以partition分片的不可變,Lazy級別數據集合 算子
Transformation:
由DAGScheduler划分到pipeline中,是Lazy級別的,不會觸發任務的執行
Action:
會觸發Job來執行pipeline中的運算
四. Spark Job執行流程
spark = SparkSession\ .builder\ .appName("PythonWordCount")\ .getOrCreate() lines = spark.read.text(sys.argv[1]).rdd.map(lambda r: r[0]) counts = lines.flatMap(lambda x: x.split(' ')) \ .map(lambda x: (x, 1)) \ .reduceByKey(add) output = counts.collect() for (word, count) in output: print("%s: %i" % (word, count)) spark.stop()
- 使用spark-submit向集群提交一個job之后,就會啟動一個Driver進程。Driver進程會根據deploy-mode不同而不同,可能是本地啟動,也可能是集群中的節點
- Driver進程向資源管理器Resource Manager(可以是Standalone、Mesos或YARN)注冊並申請運行Executor資源,如YARN會根據spark-submit中申請的參數來為Spark作業設置對應的資源參數,並在集群中的各個節點上分配對應數量的Executor進程
- Driver進程會將整個Job拆分為多個Stage,一個Stage可能包含多個Task,並將這些Task分配到第二步中申請到的Executor進程中執行。Task是執行的最小Unit。當一個Stage所屬的所有Task都執行完成之后,會在各個節點的磁盤文件中記錄中間結果並繼續執行后續的Stage。
四. RDD
定義:
RDD 是 Spark 的計算模型。RDD(Resilient Distributed Dataset)叫做彈性的分布式數據集合,是 Spark中最基本的數據抽象,它代表一個不可變、只讀的,被分區的數據集。
可以將 RDD 理解為一個分布式對象集合,本質上是一個只讀的分區記錄集合。每個 RDD可以分成多個分區,每個分區就是一個數據集片段。一個 RDD 的不同分區可以保存到集群中的不同結點上,從而可以在集群中的不同結點上進行並行計算。
五大特性:
-
分區列表:RDD是分區的,且每一個分區都會被一個Task所處理,所以Job的並行執行能力取決於分區多少。默認情況下,RDD的分區數是集成自父RDD,這個值也可以在創建RDD的時候在代碼中指定
-
計算函數:每個分區都有一個計算函數,這個計算函數是以分片為基本單位的。如在RDD的寬依賴場景下,將寬依賴划分為Stage,而Stage使用BlockManager獲取分區數據並根據計算函數來split對應的Block
-
存在依賴關系:RDD經過計算任務每次都會轉化為一個不可變的新的RDD。因為有依賴關系,所以當前一個RDD失敗的時候,Spark會根據依賴關系重新計算前一個失敗的RDD,而不是所有的RDD。
-
KV數據類型分區器:控制分區策略和分區數,每個KV形式的RDD都有Partitioner屬性,來控制RDD如何分區。
5.優先位置列表:每個分區都有優先位置列表,用於存儲Partition的優先位置。如果是讀取HDFS,那就是每個Block的優先位置。
RDD的依賴關系
依賴關系分為寬依賴(Wide Dependency)和窄依賴(Narraw Dependency)。
-
寬依賴:子RDD分區依賴父RDD的所有分區。如果子RDD部分分區甚至全部分區數據損壞或丟失,需要從所有父RDD重新計算,相對窄依賴而言付出的代價更高,所以應盡量避免寬依賴的使用
-
窄依賴:父RDD的分區只對應一個子RDD的分區。如果子RDD只有部分分區數據損壞或者丟失,只需要從對應的父RDD重新計算恢復如果子RDD只有部分分區數據損壞或者丟失,只需要從對應的父RDD重新計算恢復
類型
RDD可以分為2中類型:Transformation 和 Action
Transformation 操作不是馬上提交 Spark 集群執行的,Spark 在遇到 Transformation操作時只會記錄需要這樣的操作,並不會去執行,需要等到有 Action 操作的時候才會真正啟動計算過程進行計算.
針對每個 Action,Spark 會生成一個 Job,從數據的創建開始,經過 Transformation, 結尾是 Action 操作.這些操作對應形成一個有向無環圖(DAG),形成 DAG 的先決條件是最后的函數操作是一個Action
五. 緩存
Spark 本身就是一個基於內存的迭代式計算,當某個RDD的計算結果會被多次重復使用的時候,緩存就很有必要(尤其是對於整個血統很長的計算任務)。如果程序從頭到尾只有一個 Action 操作且子RDD只依賴於一個父RDD 的話,就不需要使用 cache 這個機制。
Spark 可以使用 persist 和 cache 方法將任意 RDD 緩存到內存、磁盤文件系統中。緩存是容錯的,如果一個 RDD 分片丟失,則可以通過構建它的轉換來自動重構。被緩存的 RDD 被使用時,存取速度會被大大加速。一般情況下,Executor 內存的 60% 會分配給 cache,剩下的 40% 用來執行任務
-
MEMORY_ONLY: 使用未序列化的Java對象格式,將數據保存在內存中。如果內存不夠存放所有的數據,則某些分區的數據就不會進行持久化。那么下次對這個RDD執行算子操作時,那些沒有被持久化的數據,需要從源頭處重新計算一遍。這是默認的持久化策略,使用cache()方法時,實際就是使用的這種持久化策略。
-
MEMORY_ONLY_SER: 基本含義同MEMORY_ONLY。唯一的區別是,會將RDD中的數據進行序列化,RDD的每個partition會被序列化成一個字節數組。這種方式更加節省內存,從而可以避免持久化的數據占用過多內存導致頻繁GC。
-
MYMORY_AND_DISK: 使用未序列化的Java對象格式,優先嘗試將數據保存在內存中。如果內存不夠存放所有的數據,會將數據寫入磁盤文件中,下次對這個RDD執行算子時,持久化在磁盤文件中的數據會被讀取出來使用。
-
MEMORY_AND_DISK_SER: 基本含義同MEMORY_AND_DISK。唯一的區別是,會將RDD中的數據進行序列化,RDD的每個partition會被序列化成一個字節數組。這種方式更加節省內存,從而可以避免持久化的數據占用過多內存導致頻繁GC。
-
DISK_ONLY: 使用未序列化的Java對象格式,將數據全部寫入磁盤文件中。
-
MEMORY_ONLY_2/MEMORY_AND_DISK_2: 對於上述任意一種持久化策略,如果加上后綴_2,代表的是將每個持久化的數據,都復制一份副本,並將副本保存到其他節點上。這種基於副本的持久化機制主要用於進行容錯。假如某個節點掛掉,節點的內存或磁盤中的持久化數據丟失了,那么后續對RDD計算時還可以使用該數據在其他節點上的副本。如果沒有副本的話,就只能將這些數據從源頭處重新計算一遍了。
-
OFF_HEAP(experimental) : RDD的數據序例化之后存儲至Tachyon。相比於MEMORY_ONLY_SER,OFF_HEAP能夠減少垃圾回收開銷、使得Spark Executor更“小”更“輕”的同時可以共享內存;而且數據存儲於Tachyon中,Spark集群節點故障並不會造成數據丟失,因此這種方式在“大”內存或多並發應用的場景下是很有吸引力的。需要注意的是,Tachyon並不直接包含於Spark的體系之內,需要選擇合適的版本進行部署;它的數據是以“塊”為單位進行管理的,這些塊可以根據一定的算法被丟棄,且不會被重建。