Spark RDD持久化
RDD持久化工作原理
Spark非常重要的一個功能特性就是可以將RDD持久化在內存中。當對RDD執行持久化操作時,每個節點都會將自己操作的RDD的partition持久化到內存中,並且在之后對該RDD的反復使用中,直接使用內存緩存的partition。這樣的話,對於針對一個RDD反復執行多個操作的場景,就只要對RDD計算一次即可,后面直接使用該RDD,而不需要反復計算多次該RDD。
巧妙使用RDD持久化,甚至在某些場景下,可以將spark應用程序的性能提升10倍。對於迭代式算法和快速交互式應用來說,RDD持久化,是非常重要的。
要持久化一個RDD,只要調用其cache()或者persist()方法即可。在該RDD第一次被計算出來時,就會直接緩存在每個節點中。而且Spark的持久化機制還是自動容錯的,如果持久化的RDD的任何partition丟失了,那么Spark會自動通過其源RDD,使用transformation操作重新計算該partition。
cache()和persist()的區別在於,cache()是persist()的一種簡化方式,cache()的底層就是調用的persist()的無參版本,同時就是調用persist(MEMORY_ONLY),將數據持久化到內存中。如果需要從內存中去除緩存,那么可以使用unpersist()方法。
RDD持久化使用場景
1、第一次加載大量的數據到RDD中
2、頻繁的動態更新RDD Cache數據,不適合使用Spark Cache、Spark lineage
RDD持久化策略
持久化策略的選擇
默認情況下,性能最高的當然是MEMORY_ONLY,但前提是你的內存必須足夠足夠大,可以綽綽有余地存放下整個RDD的所有數據。因為不進行序列化與反序列化操作,就避免了這部分的性能開銷;對這個RDD的后續算子操作,都是基於純內存中的數據的操作,不需要從磁盤文件中讀取數據,性能也很高;而且不需要復制一份數據副本,並遠程傳送到其他節點上。但是這里必須要注意的是,在實際的生產環境中,恐怕能夠直接用這種策略的場景還是有限的,如果RDD中數據比較多時(比如幾十億),直接用這種持久化級別,會導致JVM的OOM內存溢出異常。
如果使用MEMORY_ONLY級別時發生了內存溢出,那么建議嘗試使用MEMORY_ONLY_SER級別。該級別會將RDD數據序列化后再保存在內存中,此時每個partition僅僅是一個字節數組而已,大大減少了對象數量,並降低了內存占用。這種級別比MEMORY_ONLY多出來的性能開銷,主要就是序列化與反序列化的開銷。但是后續算子可以基於純內存進行操作,因此性能總體還是比較高的。此外,可能發生的問題同上,如果RDD中的數據量過多的話,還是可能會導致OOM內存溢出的異常。
如果純內存的級別都無法使用,那么建議使用MEMORY_AND_DISK_SER策略,而不是MEMORY_AND_DISK策略。因為既然到了這一步,就說明RDD的數據量很大,內存無法完全放下。序列化后的數據比較少,可以節省內存和磁盤的空間開銷。同時該策略會優先盡量嘗試將數據緩存在內存中,內存緩存不下才會寫入磁盤。
通常不建議使用DISK_ONLY和后綴為_2的級別:因為完全基於磁盤文件進行數據的讀寫,會導致性能急劇降低,有時還不如重新計算一次所有RDD。后綴為_2的級別,必須將所有數據都復制一份副本,並發送到其他節點上,數據復制以及網絡傳輸會導致較大的性能開銷,除非是要求作業的高可用性,否則不建議使用。
測試案例
測試代碼如下:
package cn.xpleaf.bigdata.spark.scala.core.p3
import org.apache.log4j.{Level, Logger} import org.apache.spark.storage.StorageLevel import org.apache.spark.{SparkConf, SparkContext} /** * Spark RDD的持久化 */ object _01SparkPersistOps { def main(args: Array[String]): Unit = { val conf = new SparkConf().setMaster("local[2]").setAppName(_01SparkPersistOps.getClass.getSimpleName()) val sc = new SparkContext(conf) Logger.getLogger("org.apache.spark").setLevel(Level.OFF) Logger.getLogger("org.apache.hadoop").setLevel(Level.OFF) var start = System.currentTimeMillis() val linesRDD = sc.textFile("D:/data/spark/sequences.txt") // linesRDD.cache() // linesRDD.persist(StorageLevel.MEMORY_ONLY) // 執行第一次RDD的計算 val retRDD = linesRDD.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _) // retRDD.cache() // retRDD.persist(StorageLevel.DISK_ONLY) retRDD.count() println("第一次計算消耗的時間:" + (System.currentTimeMillis() - start) + "ms") // 執行第二次RDD的計算 start = System.currentTimeMillis() // linesRDD.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _).count() retRDD.count() println("第二次計算消耗的時間:" + (System.currentTimeMillis() - start) + "ms") // 持久化使用結束之后,要想卸載數據 // linesRDD.unpersist() sc.stop() } }
設置相關的持久化策略,再觀察執行時間就可以有一個較為直觀的理解。
共享變量
提供了兩種有限類型的共享變量,廣播變量和累加器。
介紹之前,先直接看下面一個例子:
package cn.xpleaf.bigdata.spark.scala.core.p3
import org.apache.log4j.{Level, Logger} import org.apache.spark.{SparkConf, SparkContext} /** * 共享變量 * 我們在dirver中聲明的這些局部變量或者成員變量,可以直接在transformation中使用, * 但是經過transformation操作之后,是不會將最終的結果重新賦值給dirver中的對應的變量。 * 因為通過action,觸發了transformation的操作,transformation的操作,都是通過 * DAGScheduler將代碼打包 序列化 交由TaskScheduler傳送到各個Worker節點中的Executor去執行, * 在transformation中執行的這些變量,是自己節點上的變量,不是dirver上最初的變量,我們只不過是將 * driver上的對應的變量拷貝了一份而已。 * * * 這個案例也反映出,我們需要有一些操作對應的變量,在driver和executor上面共享 * * spark給我們提供了兩種解決方案——兩種共享變量 * 廣播變量 * 累加器 */ object _02SparkShareVariableOps { def main(args: Array[String]): Unit = { val conf = new SparkConf().setMaster("local[2]").setAppName(_01SparkPersistOps.getClass.getSimpleName()) val sc = new SparkContext(conf) Logger.getLogger("org.apache.spark").setLevel(Level.OFF) Logger.getLogger("org.apache.hadoop").setLevel(Level.OFF) val linesRDD = sc.textFile("D:/data/spark/hello.txt") val wordsRDD = linesRDD.flatMap(_.split(" ")) var num = 0 val parisRDD = wordsRDD.map(word => { num += 1 println("map--->num = " + num) (word, 1) }) val retRDD = parisRDD.reduceByKey(_ + _) println("num = " + num) retRDD.foreach(println) println("num = " + num) sc.stop() } }
輸出結果如下:
num = 0
map--->num = 1 map--->num = 1 map--->num = 2 map--->num = 2 map--->num = 3 map--->num = 4 (hello,3) (you,1) (me,1) (he,1) num = 0
廣播變量
Spark的另一種共享變量是廣播變量。通常情況下,當一個RDD的很多操作都需要使用driver中定義的變量時,每次操作,driver都要把變量發送給worker節點一次,如果這個變量中的數據很大的話,會產生很高的傳輸負載,導致執行效率降低。使用廣播變量可以使程序高效地將一個很大的只讀數據發送給多個worker節點,而且對每個worker節點只需要傳輸一次,每次操作時executor可以直接獲取本地保存的數據副本,不需要多次傳輸。
這樣理解, 一個worker中的executor,有5個task運行,假如5個task都需要這從份共享數據,就需要向5個task都傳遞這一份數據,那就十分浪費網絡資源和內存資源了。使用了廣播變量后,只需要向該worker傳遞一次就可以了。
創建並使用廣播變量的過程如下:
在一個類型T的對象obj上使用SparkContext.brodcast(obj)方法,創建一個Broadcast[T]類型的廣播變量,obj必須滿足Serializable。 通過廣播變量的.value()方法訪問其值。 另外,廣播過程可能由於變量的序列化時間過程或者序列化變量的傳輸過程過程而成為瓶頸,而Spark Scala中使用的默認的Java序列化方法通常是低效的,因此可以通過spark.serializer屬性為不同的數據類型實現特定的序列化方法(如Kryo)來優化這一過程。
測試代碼如下:
package cn.xpleaf.bigdata.spark.scala.core.p3
import org.apache.log4j.{Level, Logger} import org.apache.spark.broadcast.Broadcast import org.apache.spark.{SparkConf, SparkContext} /** * 使用Spark廣播變量 * * 需求: * 用戶表: * id name age gender(0|1) * * 要求,輸出用戶信息,gender必須為男或者女,不能為0,1 */ object _03SparkBroadcastOps { def main(args: Array[String]): Unit = { val conf = new SparkConf().setMaster("local[2]").setAppName(_01SparkPersistOps.getClass.getSimpleName()) val sc = new SparkContext(conf) Logger.getLogger("org.apache.spark").setLevel(Level.OFF) Logger.getLogger("org.apache.hadoop").setLevel(Level.OFF) val userList = List( "001,劉向前,18,0", "002,馮 劍,28,1", "003,李志傑,38,0", "004,郭 鵬,48,2" ) val genderMap = Map("0" -> "女", "1" -> "男") val genderMapBC:Broadcast[Map[String, String]] = sc.broadcast(genderMap) val userRDD = sc.parallelize(userList) val retRDD = userRDD.map(info => { val prefix = info.substring(0, info.lastIndexOf(",")) // "001,劉向前,18" val gender = info.substring(info.lastIndexOf(",") + 1) val genderMapValue = genderMapBC.value val newGender = genderMapValue.getOrElse(gender, "男") prefix + "," + newGender }) retRDD.foreach(println) sc.stop() } }
輸出結果如下:
001,劉向前,18,女
003,李志傑,38,女
002,馮 劍,28,男
004,郭 鵬,48,男
當然這個案例只是演示一下代碼的使用,並不能看出其運行的機制。
累加器
Spark提供的Accumulator,主要用於多個節點對一個變量進行共享性的操作。Accumulator只提供了累加的功能。但是確給我們提供了多個task對一個變量並行操作的功能。但是task只能對Accumulator進行累加操作,不能讀取它的值。只有Driver程序可以讀取Accumulator的值。
非常類似於在MR中的一個Counter計數器,主要用於統計各個程序片段被調用的次數,和整體進行比較,來對數據進行一個評估。
測試代碼如下:
package cn.xpleaf.bigdata.spark.scala.core.p3
import org.apache.log4j.{Level, Logger} import org.apache.spark.rdd.RDD import org.apache.spark.{SparkConf, SparkContext} /** * Spark共享變量之累加器Accumulator * * 需要注意的是,累加器的執行必須需要Action觸發 */ object _04SparkAccumulatorOps { def main(args: Array[String]): Unit = { val conf = new SparkConf().setMaster("local[2]").setAppName(_01SparkPersistOps.getClass.getSimpleName()) val sc = new SparkContext(conf) Logger.getLogger("org.apache.spark").setLevel(Level.OFF) Logger.getLogger("org.apache.hadoop").setLevel(Level.OFF) // 要對這些變量都*7,同時統計能夠被3整除的數字的個數 val list = List(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13) val listRDD:RDD[Int] = sc.parallelize(list) var counter = 0 val counterAcc = sc.accumulator[Int](0) val mapRDD = listRDD.map(num => { counter += 1 if(num % 3 == 0) { counterAcc.add(1) } num * 7 }) // 下面這種操作又執行了一次RDD計算,所以可以考慮上面的方案,減少一次RDD的計算 // val ret = mapRDD.filter(num => num % 3 == 0).count() mapRDD.foreach(println) println("counter===" + counter) println("counterAcc===" + counterAcc.value) sc.stop() } }
輸出結果如下:
49
56
7
63
14
70
21
77
28
84
35
91
42
counter===0
counterAcc===4
原文鏈接:http://blog.51cto.com/xpleaf/2108614