Spark Persistence


Spark persistence: cache, persist, and checkpoint

1. Cache persistence

cache is actually a shorthand for persist. It is lazily executed: nothing is materialized until an action operator is triggered. The return value of cache should be assigned to a variable, so that the next job can operate on that variable directly.

The cache operation:

package SparkStreaming;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

public class Persist_Cache {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf()
                .setMaster("local")
                .setAppName("Persist");
        JavaSparkContext sc = new JavaSparkContext(conf);
        JavaRDD<String> stringJavaRDD = sc.textFile("E:/2018_cnic/learn/wordcount.txt");
        JavaRDD<String> cache = stringJavaRDD.cache();

        // First count: the cache is not yet populated, so the file is read from disk.
        long startTime = System.currentTimeMillis();
        long count = cache.count();
        long endTime = System.currentTimeMillis();
        System.out.println("no cache duration:" + (endTime - startTime));

        // Second count: the data is served from the in-memory cache.
        long startTime1 = System.currentTimeMillis();
        long count1 = cache.count();
        long endTime1 = System.currentTimeMillis();
        System.out.println("cache duration:" + (endTime1 - startTime1));
    }
}

Output:

19/04/29 13:57:40 INFO DAGScheduler: Job 0 finished: count at Persist_Cache.java:16, took 0.202060 s
no cache duration:248
19/04/29 13:57:40 INFO SparkContext: Starting job: count at Persist_Cache.java:21
19/04/29 13:57:40 INFO DAGScheduler: Got job 1 (count at Persist_Cache.java:21) with 1 output partitions
19/04/29 13:57:40 INFO DAGScheduler: Final stage: ResultStage 1 (count at Persist_Cache.java:21)
19/04/29 13:57:40 INFO DAGScheduler: Parents of final stage: List()
19/04/29 13:57:40 INFO DAGScheduler: Missing parents: List()
19/04/29 13:57:40 INFO DAGScheduler: Submitting ResultStage 1 (E:/2018_cnic/learn/wordcount.txt MapPartitionsRDD[1] at textFile at Persist_Cache.java:13), which has no missing parents
19/04/29 13:57:40 INFO MemoryStore: Block broadcast_2 stored as values in memory (estimated size 3.1 KB, free 413.7 MB)
19/04/29 13:57:40 INFO MemoryStore: Block broadcast_2_piece0 stored as bytes in memory (estimated size 1925.0 B, free 413.7 MB)
19/04/29 13:57:40 INFO BlockManagerInfo: Added broadcast_2_piece0 in memory on hadoop:13416 (size: 1925.0 B, free: 413.9 MB)
19/04/29 13:57:40 INFO SparkContext: Created broadcast 2 from broadcast at DAGScheduler.scala:1039
19/04/29 13:57:40 INFO DAGScheduler: Submitting 1 missing tasks from ResultStage 1 (E:/2018_cnic/learn/wordcount.txt MapPartitionsRDD[1] at textFile at Persist_Cache.java:13) (first 15 tasks are for partitions Vector(0))
19/04/29 13:57:40 INFO TaskSchedulerImpl: Adding task set 1.0 with 1 tasks
19/04/29 13:57:40 INFO TaskSetManager: Starting task 0.0 in stage 1.0 (TID 1, localhost, executor driver, partition 0, PROCESS_LOCAL, 7885 bytes)
19/04/29 13:57:40 INFO Executor: Running task 0.0 in stage 1.0 (TID 1)
19/04/29 13:57:40 INFO BlockManager: Found block rdd_1_0 locally
19/04/29 13:57:40 INFO Executor: Finished task 0.0 in stage 1.0 (TID 1). 746 bytes result sent to driver
19/04/29 13:57:40 INFO TaskSetManager: Finished task 0.0 in stage 1.0 (TID 1) in 12 ms on localhost (executor driver) (1/1)
19/04/29 13:57:40 INFO TaskSchedulerImpl: Removed TaskSet 1.0, whose tasks have all completed, from pool
19/04/29 13:57:40 INFO DAGScheduler: ResultStage 1 (count at Persist_Cache.java:21) finished in 0.027 s
19/04/29 13:57:40 INFO DAGScheduler: Job 1 finished: count at Persist_Cache.java:21, took 0.028863 s
cache duration:31

Note the line "BlockManager: Found block rdd_1_0 locally": the second count reads the cached block instead of the input file.
19/04/29 13:57:40 INFO SparkContext: Invoking stop() from shutdown hook

2. Spark persist

package SparkStreaming;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.storage.StorageLevel;

public class Persist_Cache {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf()
                .setMaster("local")
                .setAppName("Persist");
        JavaSparkContext sc = new JavaSparkContext(conf);
        JavaRDD<String> stringJavaRDD = sc.textFile("E:/2018_cnic/learn/wordcount.txt");
        // StorageLevel.NONE stores nothing, so this persist has no effect.
        JavaRDD<String> persist = stringJavaRDD.persist(StorageLevel.NONE());

        long startTime = System.currentTimeMillis();
        long count = persist.count();
        long endTime = System.currentTimeMillis();
        System.out.println("no cache duration:" + (endTime - startTime));

        long startTime1 = System.currentTimeMillis();
        long count1 = persist.count();
        long endTime1 = System.currentTimeMillis();
        System.out.println("cache duration:" + (endTime1 - startTime1));
    }
}

Output: the second count is faster because of internal optimizations, not because of persistence — StorageLevel.NONE persists nothing.

19/04/29 14:17:28 INFO DAGScheduler: Job 0 finished: count at Persist_Cache.java:17, took 0.228634 s
no cache duration:291
19/04/29 14:17:28 INFO SparkContext: Starting job: count at Persist_Cache.java:22
19/04/29 14:17:28 INFO DAGScheduler: Got job 1 (count at Persist_Cache.java:22) with 1 output partitions
19/04/29 14:17:28 INFO DAGScheduler: Final stage: ResultStage 1 (count at Persist_Cache.java:22)
19/04/29 14:17:28 INFO DAGScheduler: Parents of final stage: List()
19/04/29 14:17:28 INFO DAGScheduler: Missing parents: List()
19/04/29 14:17:28 INFO DAGScheduler: Submitting ResultStage 1 (E:/2018_cnic/learn/wordcount.txt MapPartitionsRDD[1] at textFile at Persist_Cache.java:14), which has no missing parents
19/04/29 14:17:28 INFO MemoryStore: Block broadcast_2 stored as values in memory (estimated size 3.1 KB, free 413.7 MB)
19/04/29 14:17:28 INFO MemoryStore: Block broadcast_2_piece0 stored as bytes in memory (estimated size 1919.0 B, free 413.7 MB)
19/04/29 14:17:28 INFO BlockManagerInfo: Added broadcast_2_piece0 in memory on hadoop:13983 (size: 1919.0 B, free: 413.9 MB)
19/04/29 14:17:28 INFO SparkContext: Created broadcast 2 from broadcast at DAGScheduler.scala:1039
19/04/29 14:17:28 INFO DAGScheduler: Submitting 1 missing tasks from ResultStage 1 (E:/2018_cnic/learn/wordcount.txt MapPartitionsRDD[1] at textFile at Persist_Cache.java:14) (first 15 tasks are for partitions Vector(0))
19/04/29 14:17:28 INFO TaskSchedulerImpl: Adding task set 1.0 with 1 tasks
19/04/29 14:17:28 INFO TaskSetManager: Starting task 0.0 in stage 1.0 (TID 1, localhost, executor driver, partition 0, PROCESS_LOCAL, 7885 bytes)
19/04/29 14:17:28 INFO Executor: Running task 0.0 in stage 1.0 (TID 1)
19/04/29 14:17:28 INFO HadoopRDD: Input split: file:/E:/2018_cnic/learn/wordcount.txt:0+2482
19/04/29 14:17:28 INFO Executor: Finished task 0.0 in stage 1.0 (TID 1). 789 bytes result sent to driver
19/04/29 14:17:28 INFO TaskSetManager: Finished task 0.0 in stage 1.0 (TID 1) in 12 ms on localhost (executor driver) (1/1)
19/04/29 14:17:28 INFO TaskSchedulerImpl: Removed TaskSet 1.0, whose tasks have all completed, from pool
19/04/29 14:17:28 INFO DAGScheduler: ResultStage 1 (count at Persist_Cache.java:22) finished in 0.023 s
19/04/29 14:17:28 INFO DAGScheduler: Job 1 finished: count at Persist_Cache.java:22, took 0.025025 s
cache duration:28

Unlike the cache example, the log shows "HadoopRDD: Input split", meaning the second count re-reads the input file rather than hitting a cached block.
19/04/29 14:17:28 INFO SparkContext: Invoking stop() from shutdown hook

3. Spark persist source code analysis:

class StorageLevel private(
    private var _useDisk: Boolean,
    private var _useMemory: Boolean,
    private var _useOffHeap: Boolean,   // whether to use off-heap memory
    private var _deserialized: Boolean, // whether to keep the data deserialized
    private var _replication: Int = 1
)

val NONE                  = new StorageLevel(false, false, false, false)
val DISK_ONLY             = new StorageLevel(true, false, false, false)
val DISK_ONLY_2           = new StorageLevel(true, false, false, false, 2)
val MEMORY_ONLY           = new StorageLevel(false, true, false, true)     // not serialized
val MEMORY_ONLY_2         = new StorageLevel(false, true, false, true, 2)
val MEMORY_ONLY_SER       = new StorageLevel(false, true, false, false)
val MEMORY_ONLY_SER_2     = new StorageLevel(false, true, false, false, 2)
val MEMORY_AND_DISK       = new StorageLevel(true, true, false, true)      // what does not fit in memory spills to disk
val MEMORY_AND_DISK_2     = new StorageLevel(true, true, false, true, 2)
val MEMORY_AND_DISK_SER   = new StorageLevel(true, true, false, false)
val MEMORY_AND_DISK_SER_2 = new StorageLevel(true, true, false, false, 2)
val OFF_HEAP              = new StorageLevel(true, true, true, false, 1)   // uses off-heap memory

The unit of persistence is the partition.

For example, if an RDD has 3 partitions and the persistence level is MEMORY_AND_DISK, as many whole partitions as fit are kept in memory — a partition is never split in half — and the remaining partitions are stored on disk.

MEMORY_AND_DISK_2: on how many nodes is the data stored? This cannot be determined in advance: which node each partition lands on is not fixed, and neither is the location of its replica.
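As a sketch of how an explicit storage level is chosen through the Java API (the file path and class name here are placeholders following the examples above; this needs the Spark dependency on the classpath to run):

```java
package SparkStreaming;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.storage.StorageLevel;

public class Persist_Level {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setMaster("local").setAppName("PersistLevel");
        JavaSparkContext sc = new JavaSparkContext(conf);
        JavaRDD<String> lines = sc.textFile("E:/2018_cnic/learn/wordcount.txt");

        // Whole partitions that fit stay in memory; the rest spill to disk.
        lines.persist(StorageLevel.MEMORY_AND_DISK());

        lines.count();     // action triggers materialization of the persisted partitions
        lines.unpersist(); // release the cached blocks when they are no longer needed
        sc.stop();
    }
}
```

Note that persist only marks the RDD; as with cache, nothing is stored until the first action runs.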

4. Spark cache source code analysis: cache simply calls persist, with the default storage level StorageLevel.MEMORY_ONLY

/**
 * Persist this RDD with the default storage level (`MEMORY_ONLY`).
 */
def persist(): this.type = persist(StorageLevel.MEMORY_ONLY)

/**
 * Persist this RDD with the default storage level (`MEMORY_ONLY`).
 */
def cache(): this.type = persist()

5. The checkpoint operation

checkpoint writes the RDD's data to HDFS, which offers higher reliability because HDFS keeps replicas of each block.

checkpoint is also lazily executed. How is it used?

sc.setCheckpointDir("path")
rdd3.checkpoint()

Note: after the RDD's job completes (i.e. an action operator has been triggered):

1. Spark traces back from the finalRDD (the last RDD), looking for the RDDs on which checkpoint was called and marking them. It then launches a new job to recompute each marked RDD and writes the result to the corresponding HDFS directory.

2. At the same time, the lineage of the checkpointed RDD is truncated: its dependencies are forcibly replaced with a CheckpointRDD.

An optimization when calling checkpoint:

Because a checkpointed RDD is otherwise computed twice, you can cache the RDD (rdd3 in the example above) before the action runs. Then the extra job launched after the main job finishes only has to move the data from memory to HDFS, skipping the recomputation.
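The optimization above can be sketched in the Java API as follows. The checkpoint directory, file path, and class name are placeholders; in production the checkpoint directory would typically be an HDFS path, and the Spark dependency must be on the classpath:

```java
package SparkStreaming;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

public class Checkpoint_Demo {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setMaster("local").setAppName("Checkpoint");
        JavaSparkContext sc = new JavaSparkContext(conf);
        // Placeholder directory; usually an HDFS path such as hdfs://namenode:9000/checkpoint.
        sc.setCheckpointDir("E:/2018_cnic/learn/checkpoint");

        JavaRDD<String> rdd3 = sc.textFile("E:/2018_cnic/learn/wordcount.txt");
        rdd3.cache();      // cache first, so the extra checkpoint job reads from memory
        rdd3.checkpoint(); // lazy: only marks the RDD, nothing is written yet

        rdd3.count();      // action runs the main job, then a second job writes the checkpoint
        sc.stop();
    }
}
```

Without the cache() call, the second job would recompute rdd3 from the input file before writing it out.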

