1.RDDs是什么
Resilient distributed datasets(彈性分布式數據集) 。RDDs並行的分布在整個集群中,是Spark分發數據和計算的基礎抽象類,一個RDD是一個不可改變的分布式集合對象,Spark中,所有的計算都是通過RDDs的創建,轉換操作完成的,一個RDD內部由許多partitions(分片)組成。
分片:每個分片包括一部分數據,partitions可在集群不同節點上計算;分片是Spark並行處理的單元,Spark順序的,並行的處理分片。
2.RDDs的創建
(1) 把一個存在的集合傳給SparkContext的parallelize()方法,測試用
val rdd=sc.parallelize(Array(1,2,3,4),4)
第一個參數:待並行化處理的集合;第二個參數:分片個數
(2) 加載外部數據集
val rddText=sc.textFile("hellospark.txt")
3.RDD基本操作之Transformation(轉換)
從之前的RDD構建一個新的RDD,像map()和filter()
(1)逐元素Transformation
map(): 接收函數,把函數應用到RDD的每一個元素,返回新RDD。
val lines=sc.parallelize(Array("hello","spark","hello","world","!")) val lines2=lines.map(word=>(word,1)) lines2.foreach(println) //結果: (hello,1) (spark,1) (hello,1) (world,1) (!,1)
filter(): 接收函數,返回只包含滿足filter()函數的元素的新RDD。
val lines=sc.parallelize(Array("hello","spark","hello","world","!")) val lines3=lines.filter(word=>word.contains("hello")) lines3.foreach(println) //結果: hello hello
flatMap(): 對每個輸入元素,輸出多個輸出元素。flat壓扁的意思,將RDD中元素壓扁后返回一個新的RDD。
val inputs=sc.textFile("/home/lucy/hellospark.txt") val lines=inputs.flatMap(line=>line.split(" ")) lines.foreach(println) //結果 hello spark hello world hello ! //文件內容/home/lucy/hellospark.txt hello spark hello world hello !
(2)集合運算
RDDs支持數學集合的計算,例如並集,交集計算
val rdd1=sc.parallelize(Array("red","red","blue","black","white")) val rdd2=sc.parallelize(Array("red","grey","yellow")) //去重: val rdd_distinct=rdd1.distinct() //去重結果: white blue red black //並集: val rdd_union=rdd1.union(rdd2) //並集結果: red blue black white red grey yellow //交集: val rdd_inter=rdd1.intersection(rdd2) //交集結果: red //包含: val rdd_sub=rdd1.subtract(rdd2) //包含結果: blue white black
4.RDD基本操作之Action
在RDD上計算出來一個結果。把結果返回給driver program或保存在文件系統,count(),save。
函數名 功能 例子 結果
collect() 返回RDD的所有元素 rdd.collect() {1,2,3,3}
count() 計數 rdd.count() 4
countByValue() 返回一個map表示唯一元素出現的個數 rdd.countByValue() {(1,1),(2,1),(3,2)}
take(num) 返回幾個元素 rdd.take(2) {1,2}
top(num) 返回前幾個元素 rdd.top(2) {3,3}
takeOrdered 返回基於提供的排序算法的前幾個元素 rdd.takeOrdered(2)(myOrdering) {3,3}
(num)(ordering)
takeSample 取樣例 rdd.takeSample(false,1) 不確定
(withReplacement,num,[seed])
reduce(func) 合並RDD中元素 rdd.reduce((x,y)=>x+y) 9
fold(zero)(func) 與reduce()相似提供zero value rdd.fold(0)((x,y)=>x+y) 9
foreach(func) 對RDD的每個元素作用函數,什么也不返回 rdd.foreach(func) 無
5.RDDs的特性
1.血統關系圖:
Spark維護着RDDs之間的依賴關系和創建關系,叫做血統關系圖,Spark使用血統關系圖來計算每個RDD的需求和恢復丟失的數據
2.延遲計算
Spark對RDDs的計算是,他們第一次使用action操作的時候。Spark內部記錄metadata表明transformations操作已經被響應了。加載數據也是延遲計算,數據只有在必要的時候,才會被加載進去。
3.RDD持久化
Spark最重要的一個功能是它可以通過各種操作持久化(或者緩存)一個集合到內存中。當持久化一個RDD的時候,每一個節點都將參與計算的所有分區數據存儲到內存中,並且這些數據可以被這個集合(以及這個集合衍生的其他集合)的動作(action)重復利用。這個能力使后續的動作速度更快(通常快10倍以上)。對迭代算法和快速的交互使用來說,緩存是一個關鍵的工具。
可以通過persist()
或者cache()
方法持久化一個rdd。首先,在action中計算得到rdd;然后,將其保存在每個節點的內存中。Spark的緩存是一個容錯的技術-如果RDD的任何一個分區丟失,它可以通過原有的轉換(transformations)操作自動的重復計算並且創建出這個分區。
此外,可以利用不同的存儲級別存儲每一個被持久化的RDD。例如,它允許我們持久化集合到磁盤上、將集合作為序列化的Java對象持久化到內存中、在節點間復制集合或者存儲集合到Tachyon中。我們可以通過傳遞一個StorageLevel
對象給persist()
方法設置這些存儲級別。cache()
方法使用了默認的存儲級別—StorageLevel.MEMORY_ONLY
。完整的存儲級別如下:
Storage Level | Meaning |
---|---|
DISK_ONLY,&_2 | 僅僅將RDD分區存儲到磁盤中 |
MEMORY_ONLY,&_2 | 將RDD作為非序列化的Java對象存儲在jvm中。如果內存裝不下原始文件那么大的數據,一些分區將不會被緩存,從而在每次需要這些分區時都需重新計算它們。這是系統默認的存儲級別。 |
MEMORY_ONLY_SER,&_2 | 將RDD作為序列化的Java對象存儲(每個分區一個byte數組)。這種方式比非序列化方式更節省空間,特別是用到快速的序列化工具時,但是會更耗費cpu資源—密集的讀操作。 |
MEMORY_AND_DISK,&_2 | 將RDD作為非序列化的Java對象存儲在jvm中。如果內存裝不下原始文件那么大的數據,將這些不適合存在內存中的分區存儲在磁盤中,每次需要時讀出它們。 |
MEMORY_AND_DISK_SER,&_2 | 和MEMORY_ONLY_SER類似,但不是在每次需要時重復計算這些不適合存儲到內存中的分區,而是將這些分區存儲到磁盤中。 |
OFF_HEAP (experimental) | 以序列化的格式存儲RDD到Tachyon中。相對於MEMORY_ONLY_SER,OFF_HEAP減少了垃圾回收的花費,允許更小的執行者共享內存池。這使其在擁有大量內存的環境下或者多並發應用程序的環境中具有更強的吸引力。 |
NOTE:在python中,存儲的對象都是通過Pickle庫序列化了的,所以是否選擇序列化等級並不重要。
Spark也會自動持久化一些shuffle操作(如reduceByKey
)中的中間數據,即使用戶沒有調用persist
方法。這樣的好處是避免了在shuffle出錯情況下,需要重復計算整個輸入。
如何選擇存儲級別?
1.如果你的RDD適合默認的存儲級別(MEMORY_ONLY),就選擇默認的存儲級別。因為這是cpu利用率最高的選項,會使RDD上的操作盡可能的快。
2.如果不適合用默認的級別,選擇MEMORY_ONLY_SER。選擇一個更快的序列化庫提高對象的空間使用率,但是仍能夠相當快的訪問。
3.除非函數計算RDD的花費較大或者它們需要過濾大量的數據,不要將RDD存儲到磁盤上,否則,重復計算一個分區就會和重磁盤上讀取數據一樣慢。
4.如果你希望更快的錯誤恢復,可以利用重復(replicated)存儲級別。所有的存儲級別都可以通過重復計算丟失的數據來支持完整的容錯,但是重復的數據能夠使你在RDD上繼續運行任務,而不需要重復計算丟失的數據。
Spark自動的監控每個節點緩存的使用情況,利用最近最少使用原則刪除老舊的數據。如果你想手動的刪除RDD,可以使用RDD.unpersist()
方法
6.KeyValue對RDDs
創建KeyValue對RDDs:
val rdd3=sc.parallelize(Array((1,2),(3,4),(3,6)))
KeyValue對RDDs的Transformation(轉換):
(1)reduceByKey(func) 把相同key的結合
val rdd4=rdd3.reduceByKey((x,y)=>x+y)
//結果
(1,2)
(3,10)
(2)groupByKey 把相同的key的values分組
val rdd5=rdd3.groupByKey()
//結果
(1,CompactBuffer(2))
(3,CompactBuffer(4, 6))
(3)mapValues() 函數作用於pairRDD的每個元素,key不變
val rdd6=rdd3.mapValues(x=>x+1)
//結果
(1,3)
(3,5)
(3,7)
(4)keys/values
rdd3.keys.foreach(println)
1
3
3
rdd3.values.foreach(println)
2
4
6
(5)sortByKey
val rdd7=rdd3.sortByKey()
//結果
(1,2)
(3,4)
(3,6)
(6)combineByKey(): (createCombiner,mergeValue,mergeCombiners,partitioner)
最常用的基於key的聚合函數,返回的類型可以與輸入類型不一樣。許多基於key的聚合函數都用到了它,像groupByKey()
原理:遍歷partition中的元素,元素的key,要么之前見過的,要么不是。如果是新元素,使用我們提供的createCombiner()函數,如果是這個partition中已經存在的key,就會使用mergeValue()函數,合計每個partition的結果的時候,使用mergeCombiner()函數
例子:求平均值
val score=sc.parallelize(Array(("Tom",80.0),("Tom",90.0),("Tom",85.0),("Ben",85.0),("Ben",92.0),("Ben",90.0))) val score2=score.combineByKey(score=>(1,score),(c1:(Int,Double),newScore)=>(c1._1+1,c1._2+newScore),(c1:(Int,Double),c2:(Int,Double))=>(c1._1+c2._1,c1._2+c2._2)) //結果 (Ben,(3,267.0)) (Tom,(3,255.0))
val average=score2.map{case(name,(num,score))=>(name,score/num)} //結果 (Ben,89.0) (Tom,85.0)
7.RDD依賴
Spark中RDD的高效與DAG圖有着莫大的關系,在DAG調度中需要對計算過程划分stage,而划分依據就是RDD之間的依賴關系。針對不同的轉換函數,RDD之間的依賴關系分類窄依賴(narrow dependency)和寬依賴(wide dependency, 也稱 shuffle dependency).
窄依賴是指父RDD的每個分區只被子RDD的一個分區所使用;寬依賴是指父RDD的每個分區都可能被多個子RDD分區所使用:
寬依賴和窄依賴如下圖所示:
這種划分有兩個用處。首先,窄依賴支持在一個結點上管道化執行。例如基於一對一的關系,可以在filter之后執行map。其次,窄依賴支持更高效的故障還原。因為對於窄依賴,只有丟失的父RDD的分區需要重新計算。
對於寬依賴,一個結點的故障可能導致來自所有父RDD的分區丟失,因此就需要完全重新執行。因此對於寬依賴,Spark會在持有各個父分區的結點上,將中間數據持久化來簡化故障還原,就像MapReduce會持久化map的輸出一樣。