Spark基礎全解析


我的個人博客:https://www.luozhiyun.com/

為什么需要Spark?

MapReduce的缺陷

第一,MapReduce模型的抽象層次低,大量的底層邏輯都需要開發者手工完成。
第二,只提供Map和Reduce兩個操作。
舉個例子,兩個數據集的Join是很基本而且常用的功能,但是在MapReduce的世界中,需要對這兩個數據集 做一次Map和Reduce才能得到結果。
第三,在Hadoop中,每一個Job的計算結果都會存儲在HDFS文件存儲系統中,所以每一步計算都要進行硬 盤的讀取和寫入,大大增加了系統的延遲。
第四,只支持批數據處理,欠缺對流數據處理的支持。

Spark的優勢

Spark最基本的數據抽象叫作彈性分布式數據集(Resilient Distributed Dataset, RDD),它代表一個可以被 分區(partition)的只讀數據集,它內部可以有很多分區,每個分區又有大量的數據記錄(record)。

RDD是Spark最基本的數據結構。Spark提供了很多對RDD的操作,如Map、Filter、flatMap、groupByKey和Union等等,極大地提升了對各 種復雜場景的支持。開發者既不用再絞盡腦汁挖掘MapReduce模型的潛力,也不用維護復雜的MapReduce 狀態機。

相對於Hadoop的MapReduce會將中間數據存放到硬盤中,Spark會把中間數據緩存在內存中,從而減少了 很多由於硬盤讀寫而導致的延遲。

在任務(task)級別上,Spark的並行機制是多線程模型,而MapReduce是多進程模型。

多進程模型便於細粒度控制每個任務占用的資源,但會消耗較多的啟動時間。

而Spark同一節點上的任務以多線程的方式運行在一個JVM進程中,可以帶來更快的啟動速度、更高的CPU 利用率,以及更好的內存共享。

彈性分布式數據集(Resilient Distributed Dataset, RDD)

RDD表示已被分區、不可變的,並能夠被並行操作的數據集合。

分區

分區代表同一個RDD包含的數據被存儲在系統的不同節點中。邏輯上,我們可以認為RDD是一個大的數組。數組中的每個元素代表一個分區(Partition)。

在物理存儲中,每個分區指向一個存放在內存或者硬盤中的數據塊(Block),而這些數據塊是獨立的,它 們可以被存放在系統中的不同節點。

RDD中的每個分區存有它在該RDD中的index。通過RDD的ID和分區的index可以唯一確定對應數據塊的編 號,從而通過底層存儲層的接口中提取到數據進行處理。

不可變性

不可變性代表每一個RDD都是只讀的,它所包含的分區信息不可以被改變。

我們只可以對現有的RDD進行轉換轉換(Transformation)操作,得到新的RDD作為中間計算的結果。

如:

lines = sc.textFile("data.txt") lineLengths = lines.map(lambda s: len(s)) totalLength = lineLengths.reduce(lambda a, b: a + b)

我們首先讀入文本文件data.txt,創建了第一個RDD lines,它的每一個元素是一行文 本。然后調用map函數去映射產生第二個RDD lineLengths,每個元素代表每一行簡單文本的字數。最后調 用reduce函數去得到第三個RDD totalLength,它只有一個元素,代表整個文本的總字數。

對於代表中間結果的RDD,我們需要記錄它是通過哪個RDD進行哪些轉 換操作得來,即依賴關系依賴關系,而不用立刻去具體存儲計算出的數據本身。

在一個有N步的計算模型中,如果記載第N步輸出RDD的節點發生故障,數據丟失,我們可以從第N-1 步的RDD出發,再次計算,而無需重復整個N步計算過程。

並行操作

Spark不需要將每個中間計算結果進行數據復制以防數據丟失,因為每一步產生的RDD里都會存儲它的依賴關系。

所以並行操作的前提是不同的RDD之間有着怎樣的依賴關系。

例如在一個有N步的計算模型中,第N-1 步的RDD就是第N步RDD的父RDD,相反則是子RDD。

Spark支持兩種依賴關系:窄依賴(Narrow Dependency)和寬依賴(Wide Dependency)。

窄依賴就是父RDD的分區可以一一對應到子RDD的分區,寬依賴就是父RDD的每個分區可以被多個子RDD的 分區使用。

顯然,窄依賴允許子RDD的每個分區可以被並行處理產生,而寬依賴則必須等父RDD的所有分區都被計算好 之后才能開始處理。

所以需要考慮以下兩點:

  • 窄依賴可以支持在同一個節點上鏈式執行多條命令,例如在執行了 map 后,緊接着執行filter。相反,寬依賴需要所有的父分區都是可用的,可能還需要調用類似MapReduce 之類的操作進行跨節點傳遞。
  • 從失敗恢復的角度考慮,窄依賴的失敗恢復更有效,因為它只需要重新計算丟失的父分區即可,而寬依賴牽涉到RDD各級的多個父分區。

檢查點(Checkpoint)

在計算過程中,對於一些計算過程比較耗時的RDD,我們可以將它緩存至硬盤或HDFS中,標記這個RDD有 被檢查點處理過,並且清空它的所有依賴關系。同時,給它新建一個依賴於CheckpointRDD的依賴關系,CheckpointRDD可以用來從硬盤中讀取RDD和生成新的分區信息。

當某個子RDD需要錯誤恢復時,回溯至該RDD,發現它被檢查點記錄過,就可以直接去硬盤中讀取這 個RDD,而無需再向前回溯計算。

存儲級別(Storage Level)

用來記錄RDD持久化時的存儲級別,常用的有以下幾個:

  • MEMORY_ONLY:只緩存在內存中,如果內存空間不夠則不緩存多出來的部分。這是RDD存儲級別的默認 值。
  • MEMORY_AND_DISK:緩存在內存中,如果空間不夠則緩存在硬盤中。
  • DISK_ONLY:只緩存在硬盤中。
  • MEMORY_ONLY_2和MEMORY_AND_DISK_2等:與上面的級別功能相同,只不過每個分區在集群中兩個節 點上建立副本。

RDD的數據操作

RDD的數據操作分為兩種:轉換(Transformation)和動作(Action)。

轉換(Transformation)

轉換是用來把一個RDD轉換成另一個RDD

Map
它把一個RDD中的所有數據通過一個函數,映射成一個新的RDD,任何原 RDD中的元素在新RDD中都有且只有一個元素與之對應。

rdd = sc.parallelize(["b", "a", "c"]) rdd2 = rdd.map(lambda x: (x, 1)) // [('b', 1), ('a', 1), ('c', 1)]

Filter
filter這個操作,是選擇原RDD里所有數據中滿足某個特定條件的數據,去返回一個新的RDD。

動作(Action)

動作則是通過計算返回一個結果

Reduce
它會把RDD中的元素根據一個輸入函數聚合起來。

from operator import add sc.parallelize([1, 2, 3, 4, 5]).reduce(add)// 15

Count
Count會返回RDD中元素的個數。

sc.parallelize([2, 3, 4]).count() // 3

Spark在每次轉換操作的時候,使用了新產生的 RDD 來記錄計算邏輯,這樣就把作用在 RDD 上的所有計算 邏輯串起來,形成了一個鏈條。當對 RDD 進行動作時,Spark 會從計算鏈的最后一個RDD開始,依次從上 一個RDD獲取數據並執行計算邏輯,最后輸出結果。

RDD的持久化(緩存)

每當我們對RDD調用一個新的action操作時,整個RDD都會從頭開始運算。因此,我們應該對多次使用的RDD進行一個持久化操作。

Spark的persist()和cache()方法支持將RDD的數據緩存至內存或硬盤中。

rdd = sc.parallelize([1, 2, 3, 4, 5]) 
rdd1 = rdd.map(lambda x: x+5) 
rdd2 = rdd1.filter(lambda x: x % 2 == 0) 
rdd2.persist() 
count = rdd2.count() // 3 
first = rdd2.first() // 6 
rdd2.unpersist()

在緩存RDD的時候,它所有的依賴關系也會被一並存下來。所以持久化的RDD有自動的容錯機制。如果RDD 的任一分區丟失了,通過使用原先創建它的轉換操作,它將會被自動重算。

持久化可以選擇不同的存儲級別。正如我們講RDD的結構時提到的一樣,有MEMORY_ONLY,MEMORY_AND_DISK,DISK_ONLY等。cache()方法會默認取MEMORY_ONLY這一級別。

Spark SQL

如上圖所示,Spark SQL提供類似於SQL的操作接口,允許數據倉庫應用程序直接獲取數據,允許使用者通過命令行 操作來交互地查詢數據,還提供兩個API:DataFrame API和DataSet API。

DataSet API

DataSet也是不可變分布式的數據單元,它既有與RDD類似的各種轉換和動作函 數定義,而且還享受Spark SQL優化過的執行引擎,使得數據搜索效率更高。

DataSet支持的轉換和動作也和RDD類似,比如map、filter、select、count、show及把數據寫入文件系統 中。

DataSet上的轉換操作也不會被立刻執行,只是先生成新的DataSet,只有當遇到動作操作,才會把 之前的轉換操作一並執行,生成結果。

當動作操作執行時,Spark SQL的查詢優化器會優化這個邏輯計划,並生成一個可以分布式執行的、包含分 區信息的物理計划。

DataSet所描述的數據都被組織到有名字的列中。

如上圖所示,左側的RDD雖然以People為類型參數,但Spark框架本身不了解People類的內部結構。所有的 操作都以People為單位執行。

而右側的DataSet卻提供了詳細的結構信息與每列的數據類型

其次,由於DataSet存儲了每列的數據類型。所以,在程序編譯時可以執行類型檢測。

DataFrame API

DataFrame可以被看作是一種特殊的DataSet。它也是關系型數據庫中表一樣的結構化存儲機制,也是分布 式不可變的數據結構。

DataFrame每一行的類型固定為 Row,他可以被當作DataSet[Row]來處理,我們必須要通過解析才能獲取各列的值。

RDD API、DataFrame API、DataSet API對比

在性能方面,DataFrame和DataSet的性能要比RDD更好。

Spark程序運行時,Spark SQL中的查詢優化器會對語句進行分析,並生成優化過的RDD在底層執行。

對於錯誤檢測而言,RDD和DataSet都是類型安全的,而DataFrame並不是類型安全的。這是因為它不存儲每一列的信息如名字 和類型。

Spark Streaming

無論是DataFrame API還是DataSet API,都是基於批處理模式對靜態數據進行處理的。比如,在每天 某個特定的時間對一天的日志進行處理分析。

而Spark Streaming就是針對流處理的組件。

Spark Streaming的原理

Spark Streaming會像微積分一樣用時間片拆分了無限的數據流,然后對每一個數據片用類似於批處理的方法進行處理,輸 出的數據也是一塊一塊的。

Spark Streaming提供一個對於流數據的抽象DStream。DStream可以由來自Apache Kafka、Flume或者 HDFS的流數據生成,也可以由別的DStream經過各種轉換操作得來。

底層DStream也是由很多個序列化的RDD構成,按時間片(比如一秒)切分成的每個數據單位都是一 個RDD。然后,Spark核心引擎將對DStream的Transformation操作變為針對Spark中對 RDD的 Transformation操作,將RDD經過操作變成中間結果保存在內存中。

DStream

下圖就是DStream的內部形式,即一個連續的RDD序列,每一個RDD代表一個時間窗口的輸入數據流。

對DStream的轉換操作,意味着對它包含的每一個RDD進行同樣的轉換操作。比如下邊的例子。

sc = SparkContext(master, appName) ssc = StreamingContext(sc, 1) lines = sc.socketTextStream("localhost", 9999) words = lines.flatMap(lambda line: line.split(" "))

上面的操作本質上,對一個DStream進行flatMap操作,就是對它里邊的每一個RDD進行flatMap操作,生成了一系列新 的RDD,構成了一個新的代表詞語的DStream。

滑動窗口操作

任何Spark Streaming的程序都要首先創建一個StreamingContext的對象,它是所有Streaming操作的入口。

StreamingContext中最重要的參數是批處理的時間間隔,即把流數據細分成數據塊的粒度。

這個時間間隔決定了流處理的延遲性,所以,需要我們根據需求和資源來權衡間隔的長度。

滑動窗口操作有兩個基本參數:

  • 窗口長度(window length):每次統計的數據的時間跨度。
  • 滑動間隔(sliding interval):每次統計的時間間隔。
    由於Spark Streaming流處理的最小時間單位就是StreamingContext的時間間隔,所以這兩個參數一 定是它的整數倍。

比如,對熱點搜索詞語進行統計,每隔10秒鍾輸出過去60秒內排名前十位的熱點詞。
統計窗口長度就是60s,滑動間隔就是10s。

Spark Streaming的優缺點

優點

  • 借助RDD,能夠支持RDD所有操作,比如map、flatMap、filter、 union等。
  • 借助RDD,能實現數據容錯性。
  • 可以和Spark的核心引擎、Spark SQL、MLlib等無 縫銜接。
    缺點
  • 實時計算延遲較高,一般在秒的級別

Structured Streaming

2016年,Spark在其2.0版本中推出了結構化流數據處理的模塊Structured Streaming。

Structured Streaming是基於Spark SQL引擎實現的,依靠Structured Streaming,在開發者眼里,流數據和 靜態數據沒有區別。我們完全可以像批處理靜態數據那樣去處理流數據。

Structured Streaming模型

Spark Streaming就是把流數據按一定的時間間隔分割成許多個小的數據塊進行批處理。

而在Structured Streaming的模型中,我們要把數據看成一個無邊界的關系型的數據表。每一個數據都是表中的一行,不斷會有新的數據行被添加到表里來。

Structured Streaming的三種輸出模式:

  1. 完全模式(Complete Mode):整個更新過的輸出表都被寫入外部存儲;
  2. 附加模式(Append Mode):上一次觸發之后新增加的行才會被寫入外部存儲。如果老數據有改動則不 適合這個模式;
  3. 更新模式(Update Mode):上一次觸發之后被更新的行才會被寫入外部存儲。

需要注意的是,Structured Streaming並不會完全存儲輸入數據。每個時間間隔它都會讀取最新的輸入,進 行處理,更新輸出表,然后把這次的輸入刪除。Structured Streaming只會存儲更新輸出表所需要的信息。

Structured Streaming與Spark Streaming對比

簡易度和性能
Spark Streaming提供的DStream API與RDD API很類似,相對比較低level。

而Structured Streaming提供的DataFrame API就是這么一個相對高level的API,大部分開發者都很熟悉關系型 數據庫和SQL。這樣的數據抽象可以讓他們用一套統一的方案去處理批處理和流處理,不用去關心具體的執 行細節。

而且,DataFrame API是在Spark SQL的引擎上執行的,Spark SQL有非常多的優化功能。

實時性
Structured Streaming它更像是實時處理,能做到用更小的時間間 隔,最小延遲在100毫秒左右。

而且在Spark 2.3版本中,Structured Streaming引入了連續處理的模式,可以做到真正的毫秒級延遲。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM