MapReduce-CombineTextInputFormat 切片機制


MapReduce 框架默認的 TextInputFormat 切片機制是對任務按文件規划切片,如果有大量小文件,就會產生大量的 MapTask,處理小文件效率非常低。

CombineTextInputFormat:用於小文件過多的場景,它可以將多個小文件從邏輯上規划到一個切片中,這樣,多個小文件就可以交給一個 MapTask 處理。

CombineTextInputFormat 切片機制過程包括:虛擬存儲過程和切片過程二部分

假設 setMaxInputSplitSize 值為 4M,有如下四個文件
a.txt 1.7M
b.txt 5.1M
c.txt 3.4M
d.txt 6.8M

(1)虛擬存儲過程
(1.1)將輸入目錄下所有文件大小,依次和設置的 setMaxInputSplitSize 值比較,如果不大於設置的最大值,邏輯上划分一個塊。
(1.2)如果輸入文件大於設置的最大值且大於兩倍,那么以最大值切割一塊,當剩余數據大小超過設置的最大值且不大於最大值2倍,此時將文件均分成2個虛擬存儲塊(防止出現太小切片)。 1.7M < 4M 划分一塊 5.1M > 4M 但是小於 2*4M 划分二塊:塊1=2.55M,塊2=2.55M 3.4M < 4M 划分一塊 6.8M > 4M 但是小於 2*4M 划分二塊:塊1=3.4M,塊2=3.4M 最終存儲的文件: 1.7M 2.55M,2.55M 3.4M 3.4M,3.4M (2)切片過程 (2.1)判斷虛擬存儲的文件大小是否大於 setlMaxIputSplitSize 值,大於等於則單獨形成一個切片。 (2.2)如果不大於則跟下一個虛擬存儲文件進行合並,共同形成一個切片。 最終會形成3個切片: (1.7+2.55)M,(2.55+3.4)M,(34+3.4)M

 

測試讀取數據的方式

控制台日志

可以看到讀取方式與 TextInputFormat  一樣,k 為偏移量,v 為一行的值,按行讀取

 

以 WordCount 為例進行測試,測試切片數

測試數據

測試代碼

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.CombineTextInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.log4j.BasicConfigurator;

import java.io.IOException;
import java.util.StringTokenizer;

public class WordCount {

    static {
        try {
            // 設置 HADOOP_HOME 環境變量
            System.setProperty("hadoop.home.dir", "D:/DevelopTools/hadoop-2.9.2/");
            // 日志初始化
            BasicConfigurator.configure();
            // 加載庫文件
            System.load("D:/DevelopTools/hadoop-2.9.2/bin/hadoop.dll");
        } catch (UnsatisfiedLinkError e) {
            System.err.println("Native code library failed to load.\n" + e);
            System.exit(1);
        }
    }

    public static void main(String[] args) throws Exception {
        args = new String[]{"D:\\tmp\\input", "D:\\tmp\\456"};
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "word count");
        job.setJarByClass(WordCount.class);

        job.setMapperClass(TokenizerMapper.class);
        job.setCombinerClass(IntSumReducer.class);
        job.setReducerClass(IntSumReducer.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        // 設置 InputFormat,默認為 TextInputFormat.class
        job.setInputFormatClass(CombineTextInputFormat.class);
        // 設置最大值即可 128M
        CombineTextInputFormat.setMaxInputSplitSize(job, 1024 * 1024 * 128);

        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }

    public static class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable> {
        private final static IntWritable one = new IntWritable(1);
        private Text word = new Text();

        @Override
        public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
            // 查看 k-v
            // System.out.println(key + "\t" + value);
            StringTokenizer itr = new StringTokenizer(value.toString());
            while (itr.hasMoreTokens()) {
                word.set(itr.nextToken());
                context.write(word, one);
            }
        }
    }

    public static class IntSumReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
        private IntWritable result = new IntWritable();

        @Override
        public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
            int sum = 0;
            for (IntWritable val : values) {
                sum += val.get();
            }
            result.set(sum);
            context.write(key, result);
        }
    }
}

由於所有文件加起來大小都沒有 128M,所以切片數為 1

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM