1.1 Action操作
前邊提到的first() 、collect() 都是Action操作。常用的有:
collect():把數據返回驅動器程序中最簡單、最常見的操作, 通常在單元測試中使用,數據量不能太大,因為放在內存中,數據量大會內存溢出。
reduce():類似sum() ,如:val sum = rdd.reduce((x, y) => x + y) ,結果同sum
fold():和reduce() 類似,多一個“初始值”,當初始值為零時效果同reduce(). fold(0) = reduce()
take(n) :返回RDD 中的n 個元素,並且嘗試只訪問盡量少的分區。
top(n) :從RDD 中獲取前幾個元素
count() :用來返回元素的個數
countByValue() :返回一個從各值到值對應的計數的映射表
sum():返回匯總

啟動spark-shell

scala> val rdd=sc.parallelize(1 to 5) rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:24 scala> rdd.count() res0: Long = 5 scala> rdd.first() res1: Int = 1 scala> rdd.take(2) res2: Array[Int] = Array(1, 2) scala> rdd.top(3) res3: Array[Int] = Array(5, 4, 3) scala> rdd.sum() res4: Double = 15.0 scala> rdd.fold(0)((x,y)=>x+y) res6: Int = 15 scala> rdd.fold(1)((x,y)=>x+y) res7: Int = 17
fold(n) 的執行原理:
每個分區里進行這樣的計算:初始值+sum(元素)
最后進行:初始值+sum(分區sum值)
初始值累加次數為分區數+1次
scala> val rdd = sc.parallelize(1 to 5,2) scala> rdd.fold(1)((x,y)=>x+y) res8: Int = 18 scala> rdd.fold(2)((x,y)=>x+y) res9: Int = 21
1.2 持久化函數persist()
RDD 是惰性(Lazy)求值的,當我們希望能多次使用同一個RDD時,RDD 調用行動操作,Spark 每次都會重算RDD 以及它的所有依賴, 這在迭代算法中消耗格外大。
如:
val rdd1 = rdd.map(x => x+1)
println(rdd1.first())
println(rdd1.count())
println(rdd1.sum())
println(rdd1.collect().mkString(","))
如果不做處理的話,每個Action函數執行時都會執行一遍rdd.map(x => x+1) ,消耗很大。
Spark提供rdd的persist()函數來解決這個重復計算的問題,persist()把需要重復使用的rdd存起來,這樣僅第一個Action操作才會計算,其他Action操作不需要再計算。
當我們執行rdd的persist()時,計算出RDD 的節點會分別保存它們所求出的分區數據。如果一個有持久化數據的節點發生故障,Spark 會在需要用到緩存的數據時重算丟失的數據分區。
rdd的persist()有5種持久化級別,分別是:


來自org.apache.spark.storage.StorageLevel 的定義。
默認情況下persist() 會把數據以序列化的形式緩存在JVM 的堆空間中。
這樣上方案例可以優化為:
val rdd1 = rdd.map(x => x+1)
rdd1.persist(StorageLevel.DISK_ONLY)
println(rdd1.first())
println(rdd1.count())
println(rdd1.sum())
println(rdd1.collect().mkString(","))
rdd1.unpersist() //釋放緩存,必須手工釋放
如果覺得數據過於重要,怕存一份有風險,則可以存2份:
rdd1.persist(StorageLevel.MEMORY_ONLY_2)
值得注意:
如果要緩存的數據太多,內存中放不下,Spark 會自動利用最近最少使用(LRU)的緩存
策略把最老的分區從內存中移除。但是對於僅把數據存放在內存中的緩存級別,下一次要用到
已經被移除的分區時,這些分區就需要重新計算。
不必擔心你的作業因為緩存了太多數據而被打斷。
