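Spark persistence: cache, persist, checkpoint
I. Persisting with cache
cache is a shorthand for persist. It is lazy: nothing is cached until an action operator runs. cache returns the RDD, so assign the return value to a variable and have the following jobs operate on that variable.
Example of cache: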
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

public class Persist_Cache {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf()
                .setMaster("local")
                .setAppName("Persist");
        JavaSparkContext sc = new JavaSparkContext(conf);
        JavaRDD<String> stringJavaRDD = sc.textFile("E:/2018_cnic/learn/wordcount.txt");
        // cache() is lazy: it only marks the RDD, nothing is stored yet
        JavaRDD<String> cache = stringJavaRDD.cache();

        // First action: reads the file and fills the cache
        long startTime = System.currentTimeMillis();
        long count = cache.count();
        long endTime = System.currentTimeMillis();
        System.out.println("no cahce duration:" + (endTime - startTime));

        // Second action: reuses the cached partitions
        long startTime1 = System.currentTimeMillis();
        long count1 = cache.count();
        long endTime1 = System.currentTimeMillis();
        System.out.println("cahce duration:" + (endTime1 - startTime1));
    }
}
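Output (the second job logs "Found block rdd_1_0 locally", which means it read the cached partition instead of the file):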
19/04/29 13:57:40 INFO DAGScheduler: Job 0 finished: count at Persist_Cache.java:16, took 0.202060 s
no cahce duration:248
19/04/29 13:57:40 INFO SparkContext: Starting job: count at Persist_Cache.java:21
19/04/29 13:57:40 INFO DAGScheduler: Got job 1 (count at Persist_Cache.java:21) with 1 output partitions
19/04/29 13:57:40 INFO DAGScheduler: Final stage: ResultStage 1 (count at Persist_Cache.java:21)
19/04/29 13:57:40 INFO DAGScheduler: Parents of final stage: List()
19/04/29 13:57:40 INFO DAGScheduler: Missing parents: List()
19/04/29 13:57:40 INFO DAGScheduler: Submitting ResultStage 1 (E:/2018_cnic/learn/wordcount.txt MapPartitionsRDD[1] at textFile at Persist_Cache.java:13), which has no missing parents
19/04/29 13:57:40 INFO MemoryStore: Block broadcast_2 stored as values in memory (estimated size 3.1 KB, free 413.7 MB)
19/04/29 13:57:40 INFO MemoryStore: Block broadcast_2_piece0 stored as bytes in memory (estimated size 1925.0 B, free 413.7 MB)
19/04/29 13:57:40 INFO BlockManagerInfo: Added broadcast_2_piece0 in memory on hadoop:13416 (size: 1925.0 B, free: 413.9 MB)
19/04/29 13:57:40 INFO SparkContext: Created broadcast 2 from broadcast at DAGScheduler.scala:1039
19/04/29 13:57:40 INFO DAGScheduler: Submitting 1 missing tasks from ResultStage 1 (E:/2018_cnic/learn/wordcount.txt MapPartitionsRDD[1] at textFile at Persist_Cache.java:13) (first 15 tasks are for partitions Vector(0))
19/04/29 13:57:40 INFO TaskSchedulerImpl: Adding task set 1.0 with 1 tasks
19/04/29 13:57:40 INFO TaskSetManager: Starting task 0.0 in stage 1.0 (TID 1, localhost, executor driver, partition 0, PROCESS_LOCAL, 7885 bytes)
19/04/29 13:57:40 INFO Executor: Running task 0.0 in stage 1.0 (TID 1)
19/04/29 13:57:40 INFO BlockManager: Found block rdd_1_0 locally
19/04/29 13:57:40 INFO Executor: Finished task 0.0 in stage 1.0 (TID 1). 746 bytes result sent to driver
19/04/29 13:57:40 INFO TaskSetManager: Finished task 0.0 in stage 1.0 (TID 1) in 12 ms on localhost (executor driver) (1/1)
19/04/29 13:57:40 INFO TaskSchedulerImpl: Removed TaskSet 1.0, whose tasks have all completed, from pool
19/04/29 13:57:40 INFO DAGScheduler: ResultStage 1 (count at Persist_Cache.java:21) finished in 0.027 s
19/04/29 13:57:40 INFO DAGScheduler: Job 1 finished: count at Persist_Cache.java:21, took 0.028863 s
cahce duration:31
19/04/29 13:57:40 INFO SparkContext: Invoking stop() from shutdown hook
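The lazy behaviour described in this section can be illustrated without a Spark installation. The following is a toy model in plain Java, not Spark code: the class LazyCacheSketch and its methods are hypothetical names invented for this sketch. It only shows the idea that cache() records a request, the first action does the real work, and later actions reuse the stored result.

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.Supplier;

/** Toy model (not Spark code) of a lazily cached dataset. */
public class LazyCacheSketch {
    private final Supplier<List<String>> source; // stands in for reading the input file
    private List<String> cached;                 // filled by the first action only
    private boolean cacheRequested = false;
    private int sourceReads = 0;                 // how often the source was really read

    public LazyCacheSketch(Supplier<List<String>> source) {
        this.source = source;
    }

    /** Like cache(): records the request and returns this; nothing is computed. */
    public LazyCacheSketch cache() {
        cacheRequested = true;
        return this;
    }

    /** Like an action: computes the data and keeps it if cache() was requested. */
    public long count() {
        if (cached != null) {
            return cached.size();                // served from the cache
        }
        sourceReads++;
        List<String> data = source.get();
        if (cacheRequested) {
            cached = data;
        }
        return data.size();
    }

    public int sourceReads() {
        return sourceReads;
    }

    public static void main(String[] args) {
        LazyCacheSketch ds =
                new LazyCacheSketch(() -> Arrays.asList("a", "b", "c")).cache();
        System.out.println(ds.sourceReads()); // prints 0: cache() alone reads nothing
        ds.count();
        ds.count();
        System.out.println(ds.sourceReads()); // prints 1: the second count used the cache
    }
}
```

Real Spark caches per partition on the executors and may evict blocks, so this sketch captures only the laziness and the reuse.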
II. Persisting with persist
package SparkStreaming;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.storage.StorageLevel;

public class Persist_Cache {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf()
                .setMaster("local")
                .setAppName("Persist");
        JavaSparkContext sc = new JavaSparkContext(conf);
        JavaRDD<String> stringJavaRDD = sc.textFile("E:/2018_cnic/learn/wordcount.txt");
        // StorageLevel.NONE: the RDD is not persisted anywhere
        JavaRDD<String> persist = stringJavaRDD.persist(StorageLevel.NONE());

        long startTime = System.currentTimeMillis();
        long count = persist.count();
        long endTime = System.currentTimeMillis();
        System.out.println("no cahce duration:" + (endTime - startTime));

        long startTime1 = System.currentTimeMillis();
        long count1 = persist.count();
        long endTime1 = System.currentTimeMillis();
        System.out.println("cahce duration:" + (endTime1 - startTime1));
    }
}
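Output: with StorageLevel.NONE nothing is persisted, and the second job reads the input file again (note the "HadoopRDD: Input split" line). The second run is still faster, but that comes from Spark's internal optimizations, not from persistence.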
19/04/29 14:17:28 INFO DAGScheduler: Job 0 finished: count at Persist_Cache.java:17, took 0.228634 s
no cahce duration:291
19/04/29 14:17:28 INFO SparkContext: Starting job: count at Persist_Cache.java:22
19/04/29 14:17:28 INFO DAGScheduler: Got job 1 (count at Persist_Cache.java:22) with 1 output partitions
19/04/29 14:17:28 INFO DAGScheduler: Final stage: ResultStage 1 (count at Persist_Cache.java:22)
19/04/29 14:17:28 INFO DAGScheduler: Parents of final stage: List()
19/04/29 14:17:28 INFO DAGScheduler: Missing parents: List()
19/04/29 14:17:28 INFO DAGScheduler: Submitting ResultStage 1 (E:/2018_cnic/learn/wordcount.txt MapPartitionsRDD[1] at textFile at Persist_Cache.java:14), which has no missing parents
19/04/29 14:17:28 INFO MemoryStore: Block broadcast_2 stored as values in memory (estimated size 3.1 KB, free 413.7 MB)
19/04/29 14:17:28 INFO MemoryStore: Block broadcast_2_piece0 stored as bytes in memory (estimated size 1919.0 B, free 413.7 MB)
19/04/29 14:17:28 INFO BlockManagerInfo: Added broadcast_2_piece0 in memory on hadoop:13983 (size: 1919.0 B, free: 413.9 MB)
19/04/29 14:17:28 INFO SparkContext: Created broadcast 2 from broadcast at DAGScheduler.scala:1039
19/04/29 14:17:28 INFO DAGScheduler: Submitting 1 missing tasks from ResultStage 1 (E:/2018_cnic/learn/wordcount.txt MapPartitionsRDD[1] at textFile at Persist_Cache.java:14) (first 15 tasks are for partitions Vector(0))
19/04/29 14:17:28 INFO TaskSchedulerImpl: Adding task set 1.0 with 1 tasks
19/04/29 14:17:28 INFO TaskSetManager: Starting task 0.0 in stage 1.0 (TID 1, localhost, executor driver, partition 0, PROCESS_LOCAL, 7885 bytes)
19/04/29 14:17:28 INFO Executor: Running task 0.0 in stage 1.0 (TID 1)
19/04/29 14:17:28 INFO HadoopRDD: Input split: file:/E:/2018_cnic/learn/wordcount.txt:0+2482
19/04/29 14:17:28 INFO Executor: Finished task 0.0 in stage 1.0 (TID 1). 789 bytes result sent to driver
19/04/29 14:17:28 INFO TaskSetManager: Finished task 0.0 in stage 1.0 (TID 1) in 12 ms on localhost (executor driver) (1/1)
19/04/29 14:17:28 INFO TaskSchedulerImpl: Removed TaskSet 1.0, whose tasks have all completed, from pool
19/04/29 14:17:28 INFO DAGScheduler: ResultStage 1 (count at Persist_Cache.java:22) finished in 0.023 s
19/04/29 14:17:28 INFO DAGScheduler: Job 1 finished: count at Persist_Cache.java:22, took 0.025025 s
cahce duration:28
19/04/29 14:17:28 INFO SparkContext: Invoking stop() from shutdown hook
III. Source analysis of persist:
class StorageLevel private(
    private var _useDisk: Boolean,
    private var _useMemory: Boolean,
    private var _useOffHeap: Boolean,   // whether to use off-heap memory
    private var _deserialized: Boolean, // true = stored as deserialized objects (not serialized)
    private var _replication: Int = 1
)

val NONE = new StorageLevel(false, false, false, false)
val DISK_ONLY = new StorageLevel(true, false, false, false)
val DISK_ONLY_2 = new StorageLevel(true, false, false, false, 2)
val MEMORY_ONLY = new StorageLevel(false, true, false, true)           // not serialized
val MEMORY_ONLY_2 = new StorageLevel(false, true, false, true, 2)
val MEMORY_ONLY_SER = new StorageLevel(false, true, false, false)
val MEMORY_ONLY_SER_2 = new StorageLevel(false, true, false, false, 2)
val MEMORY_AND_DISK = new StorageLevel(true, true, false, true)        // what does not fit in memory goes to disk
val MEMORY_AND_DISK_2 = new StorageLevel(true, true, false, true, 2)
val MEMORY_AND_DISK_SER = new StorageLevel(true, true, false, false)
val MEMORY_AND_DISK_SER_2 = new StorageLevel(true, true, false, false, 2)
val OFF_HEAP = new StorageLevel(true, true, true, false, 1)            // uses off-heap memory
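To make the five constructor arguments easier to read, here is a small plain-Java helper that turns one flag combination into a sentence. It is only an illustration: StorageLevelFlagsSketch and describe are hypothetical names made up for this sketch and are not part of Spark's API; the argument order simply follows the constructor shown above.

```java
/** Toy helper (not part of Spark) that spells out one StorageLevel flag combination. */
public class StorageLevelFlagsSketch {

    /** Arguments are in the same order as the StorageLevel constructor. */
    public static String describe(boolean useDisk, boolean useMemory,
                                  boolean useOffHeap, boolean deserialized,
                                  int replication) {
        if (!useDisk && !useMemory) {
            return "not persisted";
        }
        StringBuilder sb = new StringBuilder();
        if (useMemory) {
            sb.append(useOffHeap ? "off-heap memory" : "memory");
        }
        if (useDisk) {
            sb.append(sb.length() > 0 ? " + disk" : "disk");
        }
        sb.append(deserialized ? ", deserialized objects" : ", serialized bytes");
        sb.append(", ").append(replication)
          .append(replication == 1 ? " copy" : " copies");
        return sb.toString();
    }

    public static void main(String[] args) {
        // Flags of NONE
        System.out.println(describe(false, false, false, false, 1));
        // Flags of MEMORY_ONLY
        System.out.println(describe(false, true, false, true, 1));
        // Flags of MEMORY_AND_DISK_2
        System.out.println(describe(true, true, false, true, 2));
    }
}
```

Reading the constants this way shows that the _SER variants differ from their plain counterparts only in the deserialized flag, and the _2 variants only in the replication count.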
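The unit of persistence is the partition.
For example, if an RDD has 3 partitions and the storage level is MEMORY_AND_DISK, as many whole partitions as fit are kept in memory. A partition is never stored half in memory, and the partitions that do not fit are written to disk.
MEMORY_AND_DISK_2: on how many nodes is the data stored? That cannot be determined, because it is not fixed which node holds each partition, nor which node holds its replica.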
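The whole-partition rule for MEMORY_AND_DISK can be sketched as follows. This is a deliberately simplified model in plain Java, not Spark's memory manager: the class PartitionPlacementSketch, the method place, and the abstract size units are assumptions made for this illustration, and real Spark also handles eviction and shared execution memory.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model (not Spark code) of whole-partition placement for MEMORY_AND_DISK. */
public class PartitionPlacementSketch {

    /**
     * Decides, partition by partition, whether it goes to memory or to disk.
     * A partition is either stored completely in memory or completely on disk.
     */
    public static List<String> place(long[] partitionSizes, long memoryBudget) {
        List<String> placement = new ArrayList<>();
        long free = memoryBudget;
        for (long size : partitionSizes) {
            if (size <= free) {
                placement.add("MEMORY");
                free -= size;
            } else {
                placement.add("DISK"); // never split: the whole partition spills
            }
        }
        return placement;
    }

    public static void main(String[] args) {
        // Three partitions of 40 units each, memory for 100 units
        System.out.println(place(new long[] {40, 40, 40}, 100));
        // prints [MEMORY, MEMORY, DISK]
    }
}
```

In the example, 20 units of memory stay unused after two partitions are stored, yet the third partition goes to disk in full rather than being split.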
IV. Source analysis of cache: cache calls persist, whose default storage level is StorageLevel.MEMORY_ONLY
/**
 * Persist this RDD with the default storage level (`MEMORY_ONLY`).
 */
def persist(): this.type = persist(StorageLevel.MEMORY_ONLY)

/**
 * Persist this RDD with the default storage level (`MEMORY_ONLY`).
 */
def cache(): this.type = persist()
V. The checkpoint operation
checkpoint writes the RDD's data to HDFS. This is relatively safe, because HDFS keeps replicas of the data.
checkpoint is also lazy. How is it used?
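sc.setCheckpointDir("path")   // set the checkpoint directory on the SparkContext first
rdd3.checkpoint()             // then mark the RDD for checkpointing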
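Note: after the RDD's job has finished (that is, after an action operator has been triggered):
1. Spark traces back from the final RDD (the last RDD) looking for an RDD on which checkpoint was called and marks it. It then starts a new job that recomputes the marked RDD and writes the result to the corresponding HDFS directory.
2. At the same time, the dependencies of the checkpointed RDD are cut, and its parent is forcibly replaced by a CheckpointRDD.
Optimizing a checkpoint call:
A checkpointed RDD is computed twice. To avoid that, cache RDD3 before calling checkpoint. The extra job that starts after the RDD's job has finished then only has to move the data from memory to HDFS, and the recomputation is skipped.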
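The double computation, and the way caching first avoids it, can be modelled in a few lines of plain Java. This is a toy model and not Spark code: CheckpointSketch, computationsFor and the in-memory list that stands in for the HDFS directory are all assumptions made for this sketch.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Supplier;

/** Toy model (not Spark code) of checkpointing with and without a prior cache(). */
public class CheckpointSketch {
    private final Supplier<List<Integer>> lineage; // how to recompute the data
    private boolean cacheRequested = false;
    private boolean checkpointRequested = false;
    private List<Integer> cached;                  // in-memory copy
    private List<Integer> checkpointed;            // stands in for files on HDFS
    private int computations = 0;                  // how often the lineage was run

    public CheckpointSketch(Supplier<List<Integer>> lineage) {
        this.lineage = lineage;
    }

    public void cache() { cacheRequested = true; }           // lazy, like Spark
    public void checkpoint() { checkpointRequested = true; } // lazy, only a mark
    public int computations() { return computations; }

    private List<Integer> materialize() {
        if (checkpointed != null) return checkpointed; // lineage already cut
        if (cached != null) return cached;             // reuse the cached data
        computations++;
        List<Integer> data = lineage.get();
        if (cacheRequested) cached = data;
        return data;
    }

    /** An action: runs the job, then the extra checkpoint job if one was requested. */
    public long count() {
        long n = materialize().size();
        if (checkpointRequested && checkpointed == null) {
            // Extra job: recomputes unless the data is cached
            checkpointed = new ArrayList<>(materialize());
        }
        return n;
    }

    /** Runs one action on a checkpointed dataset and reports the lineage runs. */
    public static int computationsFor(boolean cacheFirst) {
        CheckpointSketch rdd = new CheckpointSketch(() -> Arrays.asList(1, 2, 3));
        if (cacheFirst) rdd.cache();
        rdd.checkpoint();
        rdd.count();
        return rdd.computations();
    }

    public static void main(String[] args) {
        System.out.println("without cache: " + computationsFor(false)); // prints "without cache: 2"
        System.out.println("with cache: " + computationsFor(true));     // prints "with cache: 1"
    }
}
```

After the checkpoint exists, materialize returns the stored copy without touching the lineage, which mirrors the dependency cut described in step 2.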
