Optimizing Spark Reads of Small Files on HDFS


When Spark reads an HDFS directory that contains a large number of small files, each file produces its own task, and with that many tasks the job can run into performance problems. Replacing TextInputFormat with CombineTextInputFormat packs multiple files into each input split; use it together with the Hadoop parameter mapreduce.input.fileinputformat.split.maxsize, which caps the size of each combined split.


import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.mapred.lib.CombineTextInputFormat;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.util.LongAccumulator;

import java.io.IOException;

public class HDFSFileCompare {
    public static void main(String[] args) throws IOException {

        SparkSession session = SparkSession.builder().getOrCreate();
        JavaSparkContext sc = JavaSparkContext.fromSparkContext(session.sparkContext());
        // Accumulator for counting the total number of lines read across all files
        LongAccumulator basFileTotalCount = sc.sc().longAccumulator();

        
        String path = args[0]; // HDFS directory containing many small files
        // Original approach: TextInputFormat creates at least one task per small file
//        JavaRDD<String> basRDD = sc.hadoopFile(path, TextInputFormat.class, LongWritable.class, Text.class).map(p -> {
//            return new String(p._2.getBytes(), 0, p._2.getLength(), "gbk");
//        });


        // Cap each combined split at 64 MB; without a max size, CombineTextInputFormat
        // would pack all files into a single split and generate only one task
        sc.hadoopConfiguration().set("mapreduce.input.fileinputformat.split.maxsize", "67108864");
        // CombineTextInputFormat packs multiple small files into each split; lines are decoded as GBK
        JavaRDD<String> basRDD = sc.hadoopFile(path, CombineTextInputFormat.class, LongWritable.class, Text.class).map(p -> {
            return new String(p._2.getBytes(), 0, p._2.getLength(), "gbk");
        });
        //System.out.println("helllllloll\n" +basRDD.count());
        basRDD.foreach(r -> {
            basFileTotalCount.add(1);
        });
        System.out.println("helllllloll\n" +basFileTotalCount.value());

    }
}
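
To make the effect visible, a quick check is to compare how many partitions (and therefore tasks) each input format produces for the same directory. The class below is a minimal sketch of that comparison, not part of the original code: the class name SplitCountCompare and the assumption that args[0] points at a directory full of small files are illustrative.

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.mapred.lib.CombineTextInputFormat;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.SparkSession;

public class SplitCountCompare {
    public static void main(String[] args) {
        SparkSession session = SparkSession.builder().getOrCreate();
        JavaSparkContext sc = JavaSparkContext.fromSparkContext(session.sparkContext());
        String path = args[0]; // hypothetical HDFS directory containing many small files

        // Default behaviour: TextInputFormat yields at least one partition (one task) per file
        int perFile = sc.hadoopFile(path, TextInputFormat.class, LongWritable.class, Text.class)
                .getNumPartitions();

        // Combined behaviour: small files are packed into splits of at most 64 MB
        sc.hadoopConfiguration().set("mapreduce.input.fileinputformat.split.maxsize", "67108864");
        int combined = sc.hadoopFile(path, CombineTextInputFormat.class, LongWritable.class, Text.class)
                .getNumPartitions();

        System.out.println("TextInputFormat partitions: " + perFile);
        System.out.println("CombineTextInputFormat partitions: " + combined);

        session.stop();
    }
}

On a directory with thousands of small files, the first number should be roughly the file count, while the second should drop to roughly the total data size divided by 64 MB.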

