Spark-RDD持久化


多次对某个RDD进行transformation或者action,如果没有做RDD持久化,那么每次都要重新计算一个RDD,会消耗大量时间,降低Spark性能。

Spark非常重要的一个功能特性就是可以将RDD持久化在内存中。当对RDD执行持久化操作时,每个节点都会将自己操作的RDD的partition持久化到内存中,并且在之后对该RDD的反复使用中,直接使用内存缓存的partition。这样的话,对于针对一个RDD反复执行多个操作的场景,就只要对RDD计算一次即可,后面直接使用该RDD,而不需要反复计算多次该RDD。

巧妙使用RDD持久化,甚至在某些场景下,可以将spark应用程序的性能提升10倍。对于迭代式算法和快速交互式应用来说,RDD持久化,是非常重要的。

要持久化一个RDD,只要调用其cache()或者persist()方法即可。在该RDD第一次被计算出来时,就会直接缓存在每个节点中。而且Spark的持久化机制还是自动容错的,如果持久化的RDD的任何partition丢失了,那么Spark会自动通过其源RDD,使用transformation操作重新计算该partition。

cache()和persist()的区别在于,cache()是persist()的一种简化方式,cache()的底层就是调用的persist()的无参版本,同时就是调用persist(MEMORY_ONLY),将数据持久化到内存中。如果需要从内存中清楚缓存,那么可以使用unpersist()方法。

Spark自己也会在shuffle操作时,进行数据的持久化,比如写入磁盘,主要是为了在节点失败时,避免需要重新计算整个过程。

RDD持久化是可以手动选择不同的策略的。比如可以将RDD持久化在内存中、持久化到磁盘上、使用序列化的方式持久化,多持久化的数据进行多路复用。只要在调用persist()时传入对应的StorageLevel即可。

 

持久化级别 含义
MEMORY_ONLY 以非序列化的Java对象的方式持久化在JVM内存中。如果内存无法完全存储RDD所有的partition,那么那些没有持久化的partition就会在下一次需要使用它的时候,重新被计算

 

MEMORY_AND_DISK

同上,但是当某些partition无法存储在内存中时,会持久化到磁盘中。下次需要使用这些partition时,需要从磁盘上读取
MEMORY_ONLY_SER 同MEMORY_ONLY,但是会使用Java序列化方式,将Java对象序列化后进行持久化。可以减少内存开销,但是需要进行反序列化,因此会加大CPU开销。
MEMORY_AND_DSK_SER 同MEMORY_AND_DSK。但是使用序列化方式持久化Java对象
DISK_ONLY 使用非序列化Java对象的方式持久化,完全存储到磁盘上

MEMORY_ONLY_2
MEMORY_AND_DISK_2
等等

如果是尾部加了2的持久化级别,表示会将持久化数据复用一份,保存到其他节点,从而在数据丢失时,不需要再次计算,只需要使用备份数据即可。

 

 

 

 

 

 

 

 

 

如何选择RDD持久化策略?
Spark提供的多种持久化级别,主要是为了在CPU和内存消耗之间进行取舍。下面是一些通用的持久化级别的选择建议:

1、优先使用MEMORY_ONLY,如果可以缓存所有数据的话,那么就使用这种策略。因为纯内存速度最快,而且没有序列化,不需要消耗CPU进行反序列化操作。
2、如果MEMORY_ONLY策略,无法存储的下所有数据的话,那么使用MEMORY_ONLY_SER,将数据进行序列化进行存储,纯内存操作还是非常快,只是要消耗CPU进行反序列化。
3、如果需要进行快速的失败恢复,那么就选择带后缀为_2的策略,进行数据的备份,这样在失败时,就不需要重新计算了。
4、能不使用DISK相关的策略,就不用使用,有的时候,从磁盘读取数据,还不如重新计算一次。

 

不使用RDD持久化

使用RDD持久化

package cn.spark.study.core;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.api.java.function.VoidFunction;
import scala.Tuple2;

import java.util.Arrays;

/**
 * @author: yangchun
 * @description:
 * @date: Created in 2020-05-07 13:28
 */
public class PersistOperation {
    public static void main(String[] args) {
        //第一步:创建SparkConf对象,设置Spark应用的配置信息
        //setMaster()可以设置Spark应用程序要连接的机器的master机器,设置为local表示本地运行
        SparkConf conf = new SparkConf().setAppName("WordCountLocal")
                .setMaster("local");
        //第二步:创建JavaSparkContext对象
        /*在Spark,SparkContext是所有Spark所有功能的一个入口,你无论是java,scala,还是python编写的
        都必须有一个SparkContext,它的主要作用,包括初始化Spark应用程序所需的一些核心组件,包括调度器
        (DAGSchedule,TaskSchedule),还会去Spark Master节点进行注册,等等。Spark Context是Spark中
        最重要的一个对象。不同类型的Spark应用程序,SparkContext不同
        Java的SparkContext,就是JavaSparkContext
        Spark SQL程序,SQLContext,HiveContext
        Spark Streaming SparkContext
        Scala 就是SparkContext
         */
        JavaSparkContext sc = new JavaSparkContext(conf);
        /*
        第三步,针对输入源创建R初始RDD,输入源中的数据会打散,分配到RDD的每个partition中,形成一个分布式数据集
        SparkContext根据本地文件创建RDD的方法叫做textFile(),Java中,创建的普通RDD,都叫做JavaRDD。RDD中有元素的
        概念,如果hdfs和本地文件,创建的RDD每一个元素相当于文件里面的一行
         */
        JavaRDD<String> lines = sc.textFile("E:\\spark\\spark.txt").cache();
        long start = System.currentTimeMillis();
        Long num  = lines.count();
        long end = System.currentTimeMillis();
        System.out.println("first "+(end - start));
        long start1 = System.currentTimeMillis();
        Long num1  = lines.count();
        long end1= System.currentTimeMillis();
        System.out.println("second "+(end1 - start1));
        sc.close();
    }
}

 


免责声明!

本站转载的文章为个人学习借鉴使用,本站对版权不负任何法律责任。如果侵犯了您的隐私权益,请联系本站邮箱yoyou2525@163.com删除。



 
粤ICP备18138465号  © 2018-2025 CODEPRJ.COM