5.RDD的Action操作和持久化persist()


1.1 Action操作

前邊提到的first() 、collect() 都是Action操作。常用的有:

collect():把數據返回驅動器程序中最簡單、最常見的操作, 通常在單元測試中使用,數據量不能太大,因為放在內存中,數據量大會內存溢出。

reduce():類似sum() ,如:val sum = rdd.reduce((x, y) => x + y) ,結果同sum

fold():和reduce() 類似,多一個“初始值”,當初始值為零時效果同reduce().    fold(0) = reduce()

take(n) :返回RDD 中的n 個元素,並且嘗試只訪問盡量少的分區。

top(n) :從RDD 中獲取前幾個元素

count() :用來返回元素的個數

countByValue() :返回一個從各值到值對應的計數的映射表

sum():返回匯總

 

 

 

 

 

 

 

啟動spark-shell

 

 

 

 

 

 

scala> val rdd=sc.parallelize(1 to 5) rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:24 scala> rdd.count() res0: Long = 5 scala> rdd.first() res1: Int = 1 scala> rdd.take(2) res2: Array[Int] = Array(1, 2) scala> rdd.top(3) res3: Array[Int] = Array(5, 4, 3) scala> rdd.sum() res4: Double = 15.0 scala> rdd.fold(0)((x,y)=>x+y) res6: Int = 15 scala> rdd.fold(1)((x,y)=>x+y) res7: Int = 17

 

 

fold(n) 的執行原理:

每個分區里進行這樣的計算:初始值+sum(元素)

最后進行:初始值+sum(分區sum值)

初始值累加次數為分區數+1次

 

scala> val rdd = sc.parallelize(1 to 5,2)

scala> rdd.fold(1)((x,y)=>x+y)

res8: Int = 18

scala> rdd.fold(2)((x,y)=>x+y)

res9: Int = 21

 

 

 

1.2 持久化函數persist()

RDD 是惰性(Lazy)求值的,當我們希望能多次使用同一個RDD時,RDD 調用行動操作,Spark 每次都會重算RDD 以及它的所有依賴, 這在迭代算法中消耗格外大。

如:

val rdd1 = rdd.map(x => x+1)

println(rdd1.first())

println(rdd1.count())

println(rdd1.sum())

println(rdd1.collect().mkString(","))

 

如果不做處理的話,每個Action函數執行時都會執行一遍rdd.map(x => x+1) ,消耗很大。

 

Spark提供rdd的persist()函數來解決這個重復計算的問題,persist()把需要重復使用的rdd存起來,這樣僅第一個Action操作才會計算,其他Action操作不需要再計算。

 

當我們執行rdd的persist()時,計算出RDD 的節點會分別保存它們所求出的分區數據。如果一個有持久化數據的節點發生故障,Spark 會在需要用到緩存的數據時重算丟失的數據分區。

rdd的persist()有5種持久化級別,分別是:

 

 

 

 

來自org.apache.spark.storage.StorageLevel 的定義。

默認情況下persist() 會把數據以序列化的形式緩存在JVM 的堆空間中。

 

這樣上方案例可以優化為:

val rdd1 = rdd.map(x => x+1)

rdd1.persist(StorageLevel.DISK_ONLY)

println(rdd1.first())

println(rdd1.count())

println(rdd1.sum())

println(rdd1.collect().mkString(","))

rdd1.unpersist()    //釋放緩存,必須手工釋放

 

 

如果覺得數據過於重要,怕存一份有風險,則可以存2份:

rdd1.persist(StorageLevel.MEMORY_ONLY_2)

 

 

 

值得注意:

如果要緩存的數據太多,內存中放不下,Spark 會自動利用最近最少使用(LRU)的緩存

策略把最老的分區從內存中移除。但是對於僅把數據存放在內存中的緩存級別,下一次要用到

已經被移除的分區時,這些分區就需要重新計算。

不必擔心你的作業因為緩存了太多數據而被打斷。

 

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM