Optimizing Spark Reads of Small HDFS Files


When Spark reads an HDFS directory containing a large number of small files, each file produces its own task. With that many tasks, job performance can suffer. Replacing TextInputFormat with CombineTextInputFormat mitigates this by packing multiple small files into a single split; it should be used together with the Hadoop parameter mapreduce.input.fileinputformat.split.maxsize, which caps the size of each combined split.
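The split ceiling used in the code below is passed as a raw byte count. As a quick sanity check of that constant (plain Java, no Spark required):

```java
public class SplitSizeCheck {
    // 64 MB expressed in bytes: the value passed to
    // mapreduce.input.fileinputformat.split.maxsize in the job below
    public static long maxSplitBytes() {
        return 64L * 1024 * 1024;
    }

    public static void main(String[] args) {
        System.out.println(maxSplitBytes()); // prints 67108864
    }
}
```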


import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.lib.CombineTextInputFormat;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.util.LongAccumulator;

public class HDFSFileCompare {
    public static void main(String[] args) {

        SparkSession session = SparkSession.builder().getOrCreate();
        JavaSparkContext sc = JavaSparkContext.fromSparkContext(session.sparkContext());
        LongAccumulator basFileTotalCount = sc.sc().longAccumulator();

        String path = args[0];

        // Baseline for comparison: TextInputFormat generates one task per small file.
        // JavaRDD<String> basRDD = sc.hadoopFile(path, org.apache.hadoop.mapred.TextInputFormat.class,
        //         LongWritable.class, Text.class).map(p ->
        //         new String(p._2.getBytes(), 0, p._2.getLength(), "gbk"));

        // Cap each combined split at 64 MB; without this setting,
        // CombineTextInputFormat may pack everything into a single task.
        sc.hadoopConfiguration().set("mapreduce.input.fileinputformat.split.maxsize", "67108864");
        JavaRDD<String> basRDD = sc.hadoopFile(path, CombineTextInputFormat.class,
                LongWritable.class, Text.class).map(p ->
                // Copy out of the reused Text buffer, decoding as GBK
                new String(p._2.getBytes(), 0, p._2.getLength(), "gbk"));

        // Count records via an accumulator
        basRDD.foreach(r -> basFileTotalCount.add(1));
        System.out.println("record count: " + basFileTotalCount.value());

        session.stop();
    }
}
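As a rough illustration of why combining helps (hypothetical numbers, not measured from this job): with plain TextInputFormat every small file becomes its own task, while CombineTextInputFormat packs files into splits up to the configured maximum, so the task count is driven by total data size rather than file count.

```java
public class TaskCountEstimate {
    // Tasks with TextInputFormat: one per file
    // (assuming no single file exceeds an HDFS block)
    public static long tasksPerFile(long numFiles) {
        return numFiles;
    }

    // Tasks with CombineTextInputFormat: ceil(totalBytes / maxSplitBytes)
    public static long tasksCombined(long totalBytes, long maxSplitBytes) {
        return (totalBytes + maxSplitBytes - 1) / maxSplitBytes;
    }

    public static void main(String[] args) {
        long numFiles = 10_000;            // hypothetical: 10,000 small files
        long fileBytes = 1024L * 1024;     // hypothetical: 1 MB each
        long maxSplit = 64L * 1024 * 1024; // the 64 MB ceiling set in the job
        System.out.println(tasksPerFile(numFiles));                        // 10000 tasks
        System.out.println(tasksCombined(numFiles * fileBytes, maxSplit)); // 157 tasks
    }
}
```

The estimate ignores locality grouping (combined splits are normally built per node/rack), but it shows the order-of-magnitude reduction in task count.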

