引言:
無論HDFS還是MapReduce,在處理小文件時效率都非常低,但又難免面臨處理大量小文件的場景,此時,就需要有相應解決方案。可以自定義InputFormat實現小文件的合並。(對外是一個整文件,對內仍是原先的小文件,節省MapTask)
需求如下:
將多個小文件合並成一個SequenceFile文件(SequenceFile文件是Hadoop用來存儲二進制形式的key-value對的文件格式),SequenceFile里面存儲着多個文件,存儲的形式為文件路徑+名稱為key,文件內容為value。
(1)輸入數據

(2)期望輸出文件格式

步驟:

程序實現
(1)自定義InputFromat
package cn.mark.mrInputFormat; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.BytesWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.InputSplit; import org.apache.hadoop.mapreduce.JobContext; import org.apache.hadoop.mapreduce.RecordReader; import org.apache.hadoop.mapreduce.TaskAttemptContext; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import java.io.IOException; //存儲的形式為文件路徑+名稱為key,文件內容為value。讀全部文件用到流,byte //故 輸入Key類型為Text,輸入Value類型為BytesWritable public class WholeFileInputformat extends FileInputFormat<Text, BytesWritable> { // 定義類繼承FileInputFormat @Override protected boolean isSplitable(JobContext context, Path filename) { return false;//單個文件不允許再切片 } @Override public RecordReader<Text, BytesWritable> createRecordReader(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException { WholeRecordReader recordReader = new WholeRecordReader(); recordReader.initialize(split,context); return recordReader; } }
(2)自定義RecordReader類(核心)
package cn.mark.mrInputFormat; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.BytesWritable; import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.InputSplit; import org.apache.hadoop.mapreduce.RecordReader; import org.apache.hadoop.mapreduce.TaskAttemptContext; import org.apache.hadoop.mapreduce.lib.input.FileSplit; import java.io.IOException; //RecordReader<Text, BytesWritable> 固有的輸入KV格式 public class WholeRecordReader extends RecordReader<Text, BytesWritable> { // 主要針對缺什么補什么 FileSplit split; Configuration configuration; Text k = new Text(); BytesWritable v = new BytesWritable(); // 標記位 boolean isProgress = true; //************************************ @Override public void initialize(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException { // 初始化 this.split = (FileSplit) split; configuration = context.getConfiguration(); } @Override public boolean nextKeyValue() throws IOException, InterruptedException { // 核心業務邏輯處理 對key和value進行封裝 if (isProgress){ /** The number of bytes in the file to process. 獲取文件字節的全部數量 public long getLength() { return fs.getLength(); } */ byte[] buf = new byte[(int) split.getLength()]; // 1.獲取fs對象 Path path = split.getPath(); FileSystem fs = path.getFileSystem(configuration); // 2.獲取輸入流 FSDataInputStream fis = fs.open(path); // 3.拷貝 // readFully(InputStream in, byte buf[], int off, int len) // 4各參數: 1.要讀的流 2.目的地 3.讀的大小的起始位置 4.讀的長度 /** * Reads len bytes in a loop. * @param in InputStream to read from * @param buf The buffer to fill * @param off offset from the buffer :緩沖區的偏移量,即開始位置 * @param len the length of bytes to read 先開辟一段相應長度的字節緩沖區,再讀內容進去 */ IOUtils.readFully(fis,buf,0,buf.length); // 4.封裝v v是文件的內容 /**Set the value to a copy of the given byte range * @param newData the new values to copy in * @param offset the offset in newData to start at * @param length the number of bytes to copy public void set(byte[] newData, int offset, int length) 再將之前的緩沖區內容通過set方法,設置成v的值 */ v.set(buf,0,buf.length); // 5.封裝K ,k本身就是路徑名 path.toString():既有路徑又有文件名稱 k.set(path.toString()); // 6.關閉資源 IOUtils.closeStream(fis); // 能進來說明能讀到數據,而且每次調用nextKeyValue函數時候是說明已經新讀一個文件, // 本WholeRecordReader類會重新創建對象,重新初始化,isProgress都會重新設為true isProgress = false;//說明本文件已經讀完! return true; //只有return true才會執行下面的函數 /** public void run(Context context) throws IOException, InterruptedException { setup(context); try { while (context.nextKeyValue()) { //<往常只讀一行,有數據則true> map(context.getCurrentKey(), context.getCurrentValue(), context); } } finally { cleanup(context); } } 下如果只寫ture 則會無限循環,如果只寫false則會不進循環,不進行讀寫操作, 故需要一個標記*/ } return false; } @Override public Text getCurrentKey() throws IOException, InterruptedException { return k; } @Override public BytesWritable getCurrentValue() throws IOException, InterruptedException { return v; } //*********************************** @Override public float getProgress() throws IOException, InterruptedException { return 0; } @Override public void close() throws IOException { } }
(3)編寫SequenceFileMapper類處理流程
package cn.mark.mrInputFormat; import org.apache.hadoop.io.BytesWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; import java.io.IOException; public class SequenceFileMapper extends Mapper<Text, BytesWritable,Text,BytesWritable> { @Override protected void map(Text key, BytesWritable value, Context context) throws IOException, InterruptedException { // 不是一次讀取一行,是一次讀取一整個文件 context.write(key,value); } }
(4)編寫SequenceFileReducer類處理流程
package cn.mark.mrInputFormat; import org.apache.hadoop.io.BytesWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Reducer; import java.io.IOException; // 要知道傳過來的是什么數據及其類型 // 傳過來a.txt , b.txt 輸出 : <路徑名文件名,文件內容> public class SequenceFileReducer extends Reducer<Text, BytesWritable,Text, BytesWritable> { @Override protected void reduce(Text key, Iterable<BytesWritable> values, Context context) throws IOException, InterruptedException { // 循環寫出 每次都是一個文件的全部內容 for (BytesWritable value : values) { context.write(key,value); } } }
(5)編寫SequenceFileDriver類處理流程
package cn.mark.mrInputFormat; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.BytesWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat; import java.io.IOException; public class SequenceFileDriver { public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException { // 輸入輸出路徑需要根據自己電腦上實際的輸入輸出路徑設置 args = new String[] { "C:\\Users\\Administrator\\Downloads\\input\\123", "C:\\Users\\Administrator\\Downloads\\input\\output" }; // 1 獲取job對象 Configuration conf = new Configuration(); Job job = Job.getInstance(conf); // 2 設置jar包存儲位置、關聯自定義的mapper和reducer job.setJarByClass(SequenceFileDriver.class); job.setMapperClass(SequenceFileMapper.class); job.setReducerClass(SequenceFileReducer.class); // 7設置輸入的inputFormat job.setInputFormatClass(WholeFileInputformat.class); // 8設置輸出的outputFormat 默認是Text.class job.setOutputFormatClass(SequenceFileOutputFormat.class); // 3 設置map輸出端的kv類型 job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(BytesWritable.class); // 4 設置最終輸出端的kv類型 job.setOutputKeyClass(Text.class); job.setOutputValueClass(BytesWritable.class); // 5 設置輸入輸出路徑 FileInputFormat.setInputPaths(job, new Path(args[0])); FileOutputFormat.setOutputPath(job, new Path(args[1])); // 6 提交job job.waitForCompletion(true); } }

ok!
