Spark 對數據的核心抽象——彈性分布式數據集(Resilient Distributed Dataset,簡稱 RDD)。RDD 其實就是分布式的元素集合。在 Spark 中,對數據的所有操作不外乎創建 RDD、轉化已有 RDD 以及調用 RDD 操作進行求值。而在這一切背后,Spark 會自動將RDD 中的數據分發到集群上,並將操作並行化執行。
一、RDD基礎
Spark 中的 RDD 就是一個不可變的分布式對象集合。每個 RDD 都被分為多個分區,這些分區運行在集群中的不同節點上。RDD 可以包含 Python、Java、Scala 中任意類型的對象,甚至可以包含用戶自定義的對象。用戶可以使用兩種方法創建 RDD:讀取一個外部數據集,或在驅動器程序里分發驅動器程序中的對象集合(比如 list 和 set)。
創建出來后,RDD 支持兩種類型的操作:轉化操作(transformation)和行動操作(action)。轉化操作會由一個 RDD 生成一個新的 RDD。另一方面,行動操作會對 RDD 計算出一個結果,並把結果返回到驅動器程序中,或把結果存儲到外部存儲系統(如 HDFS)中。
轉化操作和行動操作的區別在於 Spark 計算 RDD 的方式不同。雖然你可以在任何時候定義新的 RDD,但 Spark 只會惰性計算這些 RDD。它們只有第一次在一個行動操作中用到時,才會真正計算。這種策略剛開始看起來可能會顯得有些奇怪,不過在大數據領域是很有道理的。例如 Spark 在我們運行 lines = sc.textFile(...) 時就把文件中所有的行都讀取並存儲起來,就會消耗很多存儲空間,而我們馬上就要篩選掉其中的很多數據。相反, 一旦 Spark 了解了完整的轉化操作鏈之后,它就可以只計算求結果時真正需要的數據。事實上,在行動操作 first() 中,Spark 只需要掃描文件直到找到第一個匹配的行為止,而不需要讀取整個文件。
最后,默認情況下,Spark 的 RDD 會在你每次對它們進行行動操作時重新計算。如果想在多個行動操作中重用同一個 RDD,可以使用 RDD.persist() 讓 Spark 把這個 RDD 緩存下來。我們可以讓 Spark 把數據持久化到許多不同的地方。在第一次對持久化的 RDD 計算之后,Spark 會把 RDD 的內容保存到內存中(以分區方式存儲到集群中的各機器上),這樣在之后的行動操作中,就可以重用這些數據了。
在任何時候都能進行重算是我們為什么把 RDD 描述為“彈性”的原因。當保存 RDD 數據的一台機器失敗時,Spark 還可以使用這種特性來重算出丟掉的分區,這一過程對用戶是完全透明的。
總的來說,每個 Spark 程序或 shell 會話都按如下方式工作。
(1) 從外部數據創建出輸入 RDD。
(2) 使用諸如 filter() 這樣的轉化操作對 RDD 進行轉化,以定義新的 RDD。
(3) 告訴 Spark 對需要被重用的中間結果 RDD 執行 persist() 操作。
(4) 使用行動操作(例如 count() 和 first() 等)來觸發一次並行計算,Spark 會對計算進行優化后再執行。
二、創建RDD
Spark 提供了兩種創建 RDD 的方式:讀取外部數據集,以及在驅動器程序中對一個集合進行並行化。更常用的方式是從外部存儲中讀取數據來創建 RDD。
// 初始化Spark
// local 集群url
val conf = new SparkConf().setMaster("local").setAppName("My App")
val sc = new SparkContext(conf)
// 第一種方式創建RDD
// val lines = sc.textFile("words.txt")
// 第二種方式創建RDD
val lines = sc.parallelize(List("pandas","i like pandas"))
println(lines.count());
println(lines.first())

三、RDD操作
RDD 支持兩種操作:轉化操作和行動操作。RDD 的轉化操作是返回一個新的 RDD 的操作,比如 map() 和 filter() ,而行動操作則是向驅動器程序返回結果或把結果寫入外部系統的操作,會觸發實際的計算,比如 count() 和 first() 。Spark 對待轉化操作和行動操作的方式很不一樣,因此理解你正在進行的操作的類型是很重要的。如果對於一個特定的函數是屬於轉化操作還是行動操作感到困惑,你可以看看它的返回值類型:轉化操作返回的是 RDD,而行動操作返回的是其他的數據類型。
1、轉化操作
RDD 的轉化操作是返回新 RDD 的操作。我們會在 上面講到,轉化出來的 RDD 是惰性求值的,只有在行動操作中用到這些 RDD 時才會被計算。許多轉化操作都是針對各個元素的,也就是說,這些轉化操作每次只會操作 RDD 中的一個元素。不過並不是所有的轉化操作都是這樣的。舉個例子,假定我們有一個日志文件 log.txt,內含有若干消息,希望選出其中的錯誤消息。我們可以使用前面說過的轉化操作 filter() 。

val conf = new SparkConf().setAppName("Test").setMaster("local")
val sc = new SparkContext(conf)
// val lines = sc.parallelize(List("pandas","i like pandas"))
// *****RDD轉化操作*********
val inputRDD = sc.textFile("log.txt")
val errorsRDD = inputRDD.filter(line => line.contains("ERROR"))
val warningRDD = inputRDD.filter(line => line.contains("WARN"))
val badLinesRDD = errorsRDD.union(warningRDD)
// *****RDD行動操作***********
val x = badLinesRDD.count()
println("Input had "+badLinesRDD.count()+" concerning lines")
println("Here are x examples:")
badLinesRDD.take(10).foreach(println)


通過轉化操作,你從已有的 RDD 中派生出新的 RDD,Spark 會使用譜系圖(lineage graph)來記錄這些不同 RDD 之間的依賴關系。Spark 需要用這些信息來按需計算每個 RDD,也可以依靠譜系圖在持久化的 RDD 丟失部分數據時恢復所丟失的數據。下面展示上面代碼中創建出的RDD譜系圖:

2、行動操作
我們已經看到了如何通過轉化操作從已有的 RDD 創建出新的 RDD,不過有時,我們希望對數據集進行實際的計算。行動操作是第二種類型的 RDD 操作,它們會把最終求得的結果返回到驅動器程序,或者寫入外部存儲系統中。由於行動操作需要生成實際的輸出,它
們會強制執行那些求值必須用到的 RDD 的轉化操作。我們可能想輸出關於 badLinesRDD 的一些信息。為此,需要使用兩個行動操作來實現:用 count() 來返回計數結果,用 take() 來收集RDD 中的一些元素。這些代碼在上面一節中的示例代碼已經體現。
除此之外,只有當你的整個數據集能在單台機器的內存中放得下時,才能使用 collect() ,因此, collect() 不能用在大規模數據集上。在大多數情況下,RDD 不能通過 collect() 收集到驅動器進程中,因為它們一般都很大。此時,我們通常要把數據寫到諸如 HDFS 或 Amazon S3 這樣的分布式的存儲系統中。你可以使用 saveAsTextFile() 、 saveAsSequenceFile() ,或者任意的其他行動操作來把 RDD 的數據內容以各種自帶的格式保存起來。
3、惰性求值
RDD 的轉化操作都是惰性求值的。這意味着在被調用行動操作之前 Spark 不會開始計算。這對新用戶來說可能與直覺有些相違背之處,但是對於那些使用過諸如 Haskell等函數式語言或者類似 LINQ 這樣的數據處理框架的人來說,會有些似曾相識。
惰性求值意味着當我們對 RDD 調用轉化操作(例如調用 map() )時,操作不會立即執行。相反,Spark 會在內部記錄下所要求執行的操作的相關信息。我們不應該把 RDD 看作存放着特定數據的數據集,而最好把每個 RDD 當作我們通過轉化操作構建出來的、記錄如何計算數據的指令列表。把數據讀取到 RDD 的操作也同樣是惰性的。因此,當我們調用sc.textFile() 時,數據並沒有讀取進來,而是在必要時才會讀取。和轉化操作一樣的是,讀取數據的操作也有可能會多次執行。
雖然轉化操作是惰性求值的,但還是可以隨時通過運行一個行動操作來強制Spark 執行 RDD 的轉化操作,比如使用 count() 。這是一種對你所寫的程序進行部分測試的簡單方法。
Spark 使用惰性求值,這樣就可以把一些操作合並到一起來減少計算數據的步驟。在類似Hadoop MapReduce 的系統中,開發者常常花費大量時間考慮如何把操作組合到一起,以減少 MapReduce 的周期數。而在 Spark 中,寫出一個非常復雜的映射並不見得能比使用很多簡單的連續操作獲得好很多的性能。因此,用戶可以用更小的操作來組織他們的程序,這樣也使這些操作更容易管理。
四、常見的轉化操作和行動操作
對一個數據為{1, 2, 3, 3}的RDD進行基本的RDD轉化操作

對數據分別為{1, 2, 3}和{3, 4, 5}的RDD進行針對兩個RDD的轉化操作

對一個數據為{1, 2, 3, 3}的RDD進行基本的RDD行動操作

五、持久化(緩存)
Spark RDD 是惰性求值的,而有時我們希望能多次使用同一個 RDD。如果簡單地對 RDD 調用行動操作,Spark 每次都會重算 RDD 以及它的所有依賴。這在迭代算法中消耗格外大,因為迭代算法常常會多次使用同一組數據。下面 就是先對 RDD 作一次計數、再把該 RDD 輸出的一個小例子。
val conf = new SparkConf().setAppName("wordcount").setMaster("local")
val sc = new SparkContext(conf)
val input = sc.parallelize(List(1,2,3,4))
val result = input.map(x=>x*x)
// 兩次執行RDD轉化操作
println(result.count())
println(result.collect().mkString(","))
為了避免多次計算同一個 RDD,可以讓 Spark 對數據進行持久化。當我們讓 Spark 持久化存儲一個 RDD 時,計算出 RDD 的節點會分別保存它們所求出的分區數據。如果一個有持久化數據的節點發生故障,Spark 會在需要用到緩存的數據時重算丟失的數據分區。如果希望節點故障的情況不會拖累我們的執行速度,也可以把數據備份到多個節點上。出於不同的目的,我們可以為 RDD 選擇不同的持久化級別。在 Scala 中,默認情況下 persist() 會把數據以序列化的形式緩存在 JVM 的堆空間中。cache() 與使用默認存儲級別調用 persist() 是一樣的。
org.apache.spark.storage.StorageLevel 中的持久化級別;如有必要,可以通過在存儲級別的末尾加上“_2”來把持久化數據存為兩份

// 在Scala中使用persist()
val conf = new SparkConf().setAppName("wordcount").setMaster("local")
val sc = new SparkContext(conf)
val input = sc.parallelize(List(1,2,3,4))
val result = input.map(x=>x*x)
result.persist(StorageLevel.DISK_ONLY)
println(result.count()) // 輸出 4
println(result.collect().mkString(",")) // 輸出1,4,9,16
我們在第一次對這個 RDD 調用行動操作前就調用了 persist() 方法。 persist() 調用本身不會觸發強制求值。
如果要緩存的數據太多,內存中放不下,Spark 會自動利用最近最少使用(LRU)的緩存策略把最老的分區從內存中移除。對於僅把數據存放在內存中的緩存級別,下一次要用到已經被移除的分區時,這些分區就需要重新計算。但是對於使用內存與磁盤的緩存級別的
分區來說,被移除的分區都會寫入磁盤。不論哪一種情況,都不必擔心你的作業因為緩存了太多數據而被打斷。不過,緩存不必要的數據會導致有用的數據被移出內存,帶來更多重算的時間開銷。
最后,RDD 還有一個方法叫作 unpersist() ,調用該方法可以手動把持久化的 RDD 從緩存中移除。
這篇博文主要來自《Spark快速大數據分析》這本書里面的第三章,內容有刪減,還有本書的一些代碼的實驗結果。
