MapReduce 框架默認的 TextInputFormat 切片機制是對任務按文件規划切片,如果有大量小文件,就會產生大量的 MapTask,處理小文件效率非常低。
CombineTextInputFormat:用於小文件過多的場景,它可以將多個小文件從邏輯上規划到一個切片中,這樣,多個小文件就可以交給一個 MapTask 處理。
CombineTextInputFormat 切片機制過程包括:虛擬存儲過程和切片過程二部分 假設 setMaxInputSplitSize 值為 4M,有如下四個文件 a.txt 1.7M b.txt 5.1M c.txt 3.4M d.txt 6.8M (1)虛擬存儲過程 (1.1)將輸入目錄下所有文件大小,依次和設置的 setMaxInputSplitSize 值比較,如果不大於設置的最大值,邏輯上划分一個塊。
(1.2)如果輸入文件大於設置的最大值且大於兩倍,那么以最大值切割一塊,當剩余數據大小超過設置的最大值且不大於最大值2倍,此時將文件均分成2個虛擬存儲塊(防止出現太小切片)。 1.7M < 4M 划分一塊 5.1M > 4M 但是小於 2*4M 划分二塊:塊1=2.55M,塊2=2.55M 3.4M < 4M 划分一塊 6.8M > 4M 但是小於 2*4M 划分二塊:塊1=3.4M,塊2=3.4M 最終存儲的文件: 1.7M 2.55M,2.55M 3.4M 3.4M,3.4M (2)切片過程 (2.1)判斷虛擬存儲的文件大小是否大於 setlMaxIputSplitSize 值,大於等於則單獨形成一個切片。 (2.2)如果不大於則跟下一個虛擬存儲文件進行合並,共同形成一個切片。 最終會形成3個切片: (1.7+2.55)M,(2.55+3.4)M,(34+3.4)M
測試讀取數據的方式
控制台日志
可以看到讀取方式與 TextInputFormat 一樣,k 為偏移量,v 為一行的值,按行讀取
以 WordCount 為例進行測試,測試切片數
測試數據
測試代碼
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.CombineTextInputFormat; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.log4j.BasicConfigurator; import java.io.IOException; import java.util.StringTokenizer; public class WordCount { static { try { // 設置 HADOOP_HOME 環境變量 System.setProperty("hadoop.home.dir", "D:/DevelopTools/hadoop-2.9.2/"); // 日志初始化 BasicConfigurator.configure(); // 加載庫文件 System.load("D:/DevelopTools/hadoop-2.9.2/bin/hadoop.dll"); } catch (UnsatisfiedLinkError e) { System.err.println("Native code library failed to load.\n" + e); System.exit(1); } } public static void main(String[] args) throws Exception { args = new String[]{"D:\\tmp\\input", "D:\\tmp\\456"}; Configuration conf = new Configuration(); Job job = Job.getInstance(conf, "word count"); job.setJarByClass(WordCount.class); job.setMapperClass(TokenizerMapper.class); job.setCombinerClass(IntSumReducer.class); job.setReducerClass(IntSumReducer.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); // 設置 InputFormat,默認為 TextInputFormat.class job.setInputFormatClass(CombineTextInputFormat.class); // 設置最大值即可 128M CombineTextInputFormat.setMaxInputSplitSize(job, 1024 * 1024 * 128); FileInputFormat.addInputPath(job, new Path(args[0])); FileOutputFormat.setOutputPath(job, new Path(args[1])); System.exit(job.waitForCompletion(true) ? 0 : 1); } public static class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable> { private final static IntWritable one = new IntWritable(1); private Text word = new Text(); @Override public void map(Object key, Text value, Context context) throws IOException, InterruptedException { // 查看 k-v // System.out.println(key + "\t" + value); StringTokenizer itr = new StringTokenizer(value.toString()); while (itr.hasMoreTokens()) { word.set(itr.nextToken()); context.write(word, one); } } } public static class IntSumReducer extends Reducer<Text, IntWritable, Text, IntWritable> { private IntWritable result = new IntWritable(); @Override public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException { int sum = 0; for (IntWritable val : values) { sum += val.get(); } result.set(sum); context.write(key, result); } } }
由於所有文件加起來大小都沒有 128M,所以切片數為 1