[Original] Uncle's Troubleshooting Notes (12): Why does Spark write text-format files (text, csv, json, etc.) to HDFS in compressed form?


Reproducing the problem

df.repartition(1).write.csv(outPath)

After the write completes, the output files turn out to be compressed.
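
For reference, a minimal reproduction sketch (the sample data and output path are placeholders); with the cluster defaults shown later in this post (compression enabled, DefaultCodec), the part files carry the codec's extension:

val df = spark.range(5).toDF("id")            // placeholder data
df.repartition(1).write.csv("/tmp/csv_out")   // placeholder path
// With DefaultCodec the output is named e.g.:
//   part-00000-<uuid>-c000.csv.deflate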

 

During write, Spark first obtains a hadoopConf and then reads from it whether the output should be compressed and which compression codec to use:

org.apache.spark.sql.execution.datasources.DataSource

  def write(

org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand

    val hadoopConf = sparkSession.sessionState.newHadoopConfWithOptions(options)

org.apache.hadoop.mapreduce.lib.output.TextOutputFormat

    public RecordWriter<K, V> getRecordWriter(TaskAttemptContext job) throws IOException, InterruptedException {

        Configuration conf = job.getConfiguration();

        boolean isCompressed = getCompressOutput(job);

        String keyValueSeparator = conf.get(SEPERATOR, "\t");

        CompressionCodec codec = null;

        String extension = "";

        if (isCompressed) {

            Class<? extends CompressionCodec> codecClass = getOutputCompressorClass(job, GzipCodec.class);

            codec = (CompressionCodec)ReflectionUtils.newInstance(codecClass, conf);

            extension = codec.getDefaultExtension();

        }

isCompressed is read from mapreduce.output.fileoutputformat.compress, and codecClass from mapreduce.output.fileoutputformat.compress.codec.
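
For illustration, a minimal sketch (assuming the Hadoop client jars and the cluster's config files are on the classpath) of what those two lookups boil down to:

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.io.compress.{CompressionCodec, GzipCodec}
import org.apache.hadoop.util.ReflectionUtils

val conf = new Configuration()  // picks up *-site.xml from the classpath
// Same keys that getCompressOutput / getOutputCompressorClass read:
val isCompressed = conf.getBoolean("mapreduce.output.fileoutputformat.compress", false)
val codecClass = conf.getClass(
  "mapreduce.output.fileoutputformat.compress.codec",
  classOf[GzipCodec], classOf[CompressionCodec])
if (isCompressed) {
  val codec: CompressionCodec = ReflectionUtils.newInstance(codecClass, conf)
  println(s"output compressed, extension: ${codec.getDefaultExtension}")
}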

 

The hadoopConf is initialized as follows:

org.apache.spark.sql.internal.SessionState

  def newHadoopConf(): Configuration = {

    val hadoopConf = new Configuration(sparkSession.sparkContext.hadoopConfiguration)

org.apache.spark.SparkContext

  _hadoopConfiguration = SparkHadoopUtil.get.newConfiguration(_conf)

 

  def newConfiguration(conf: SparkConf): Configuration = {

    val hadoopConf = new Configuration()

    appendS3AndSparkHadoopConfigurations(conf, hadoopConf)

    hadoopConf

  }

 

  def appendS3AndSparkHadoopConfigurations(conf: SparkConf, hadoopConf: Configuration): Unit = {

  ...

      conf.getAll.foreach { case (key, value) =>

        if (key.startsWith("spark.hadoop.")) {

          hadoopConf.set(key.substring("spark.hadoop.".length), value)

        }

      }
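
In other words, every key carrying the spark.hadoop. prefix is copied into the Hadoop Configuration with the prefix stripped. A standalone sketch of the same copying logic:

import org.apache.hadoop.conf.Configuration
import org.apache.spark.SparkConf

val sparkConf = new SparkConf(false)
  .set("spark.hadoop.mapreduce.output.fileoutputformat.compress", "false")
val hadoopConf = new Configuration()
// Strip the spark.hadoop. prefix and copy, as appendS3AndSparkHadoopConfigurations does
sparkConf.getAll.foreach { case (key, value) =>
  if (key.startsWith("spark.hadoop.")) {
    hadoopConf.set(key.substring("spark.hadoop.".length), value)
  }
}
println(hadoopConf.get("mapreduce.output.fileoutputformat.compress"))  // prints false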

 

 

By default, the Hadoop Configuration loads all Hadoop-related configuration files found on the classpath, which is easy to verify in spark-shell:

scala> val hc = spark.sparkContext.hadoopConfiguration

hc: org.apache.hadoop.conf.Configuration = Configuration: core-default.xml, core-site.xml, mapred-default.xml, mapred-site.xml, yarn-default.xml, yarn-site.xml, hdfs-default.xml, hdfs-site.xml

scala> println(hc.get("mapreduce.output.fileoutputformat.compress"))

true

scala> println(hc.get("mapreduce.output.fileoutputformat.compress.codec"))

org.apache.hadoop.io.compress.DefaultCodec

 

In summary, these defaults come from the Hadoop config files on the classpath (typically mapred-site.xml), so to get uncompressed output it is enough to set spark.hadoop.mapreduce.output.fileoutputformat.compress=false when creating the SparkConf:

val sparkConf = new SparkConf().set("spark.hadoop.mapreduce.output.fileoutputformat.compress", "false")
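
A minimal end-to-end sketch of this approach (the sample DataFrame and output path are placeholders):

import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession

// Disable Hadoop output compression for the whole application
val sparkConf = new SparkConf()
  .set("spark.hadoop.mapreduce.output.fileoutputformat.compress", "false")
val spark = SparkSession.builder().config(sparkConf).getOrCreate()

val df = spark.range(10).toDF("id")             // placeholder data
df.repartition(1).write.csv("/tmp/plain_csv")   // placeholder path; parts end in plain .csv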

Alternatively, compression can be controlled per write through the writer's option:

df.repartition(1).write.option("compression", "none").csv(outPath)
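
The same option exists on the json and text writers as well; besides none, codec short names such as gzip, snappy, bzip2, deflate, or lz4 can be passed (assuming the codec is available on the cluster), for example:

df.write.option("compression", "none").json(outPath)   // uncompressed JSON
df.write.option("compression", "gzip").csv(outPath)    // explicitly gzip-compressed CSV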

 

