回顧:
在上一篇https://www.cnblogs.com/superlsj/p/11857691.html詳細介紹了InputFormat的原理和常見的實現類。總結來說,InputFormat是將文件切片----->再轉化為<key--value>對轉交給Mapper處理。
所以我們看到在InputFormat類中只有兩個方法,一個負責切片,一個返回能將切片信息轉化為相應的鍵值對的對象:
public abstract class InputFormat<K, V> { public InputFormat() { } public abstract List<InputSplit> getSplits(JobContext var1) throws IOException, InterruptedException; public abstract RecordReader<K, V> createRecordReader(InputSplit var1, TaskAttemptContext var2) throws IOException, InterruptedException; }
以KeyValueInputFormat為例:
@Stable public class KeyValueTextInputFormat extends FileInputFormat<Text, Text> { public KeyValueTextInputFormat() { } protected boolean isSplitable(JobContext context, Path file) { CompressionCodec codec = (new CompressionCodecFactory(context.getConfiguration())).getCodec(file); return null == codec ? true : codec instanceof SplittableCompressionCodec; } public RecordReader<Text, Text> createRecordReader(InputSplit genericSplit, TaskAttemptContext context) throws IOException { context.setStatus(genericSplit.toString()); return new KeyValueLineRecordReader(context.getConfiguration()); } }
我們知道:當使用KeyValueInputFormat並設置分隔符后,Mapper以分隔符前的內容作為Key來接收,以分隔符后面的內容作為Value來接收。那么在數據提交到Mapper之前,數據就必須被格式化為滿足Mapper接收的格式,這個工作就是由InputFormat來完成的,而InputFormat實際上並不能完成這項工作,而是創建一個RecordReader來完成這項轉換工作。順帶一提:isSplitable方法返回文件是否可以切片,當返回false時,表示在格式化輸入文件時,不對文件進行切片,直接進行文本數據至鍵值對的轉化。
一、設計自己的InputFormat:
現有的InputFormat肯定是無法滿足現實中花里胡哨的需求,所以自定義InputFormat是一項不可避免的工作。下面以將三個小文件合並處理,以文件名作為Key,文件內容的byte數組作為value,轉換成一個SquenceFile文件的案例來演示自定義InputFormat的流程。SuquenceFile文件是Hadoop用來存儲二進制形式的key-value對的文件格式,相比於純文本文件,格式更加緊湊。
1、自定義InputFormat
public class WholeFileInputFormat extends FileInputFormat<Text, BytesWritable> { public RecordReader createRecordReader(InputSplit inputSplit, TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException { return new WholeFileRecordReader(); } @Override protected boolean isSplitable(JobContext context, Path filename) { return false; } }
2、自定義RecordReader
public class WholeFileRecordReader extends RecordReader<Text, BytesWritable> { private boolean notRead = true; private Text key = new Text(); private BytesWritable value = new BytesWritable(); private FSDataInputStream inputStream; private FileSplit fs; /** * 初始化方法,框架會在讀取切片數據之前調用此方法,因此,一些在RecordReader工作時需要使用的資源可以在此方法中進行初始化(這些資源必須是可以在inputSplit和taskAttemptContext中可以獲取到的)。
* InputSplit inputSplit:當前RecordReader正在處理的切片
* TaskAttemptContext taskAttemptContext:當前Job的上下文,可以通過此對象獲取job的配置對象
*/
public void initialize(InputSplit inputSplit, TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException {
//轉換切片類型到文件切片
fs = (FileSplit)inputSplit;
//通過切片獲得路徑
Path path = fs.getPath();
//通過路徑獲取文件系統
FileSystem fileSystem = path.getFileSystem(taskAttemptContext.getConfiguration());
//開流
inputStream = fileSystem.open(path);
}
/**
* 此方法用於讀取下一組key-value,類似於迭代器,如果讀到數據返回true
* 此方法會被Mapper的run()調用:
* run(){
* setUp();
* while(RecorderReader.nextKeyValue()){
* // 如果讀取到下一組key-value對,就交給map方法處理
* map(recordReader.getCurrentKey(),recordReader.getCurrentValue(),context);
* }
* }
* 因為將路徑+文件名作為key,文件內容作為value,所以一個文件只會讀取一次,要么沒讀過,要么讀過
*/
public boolean nextKeyValue() throws IOException, InterruptedException {
if(notRead){
//具體讀文件的操作
//讀Key
key.set(fs.getPath().toString());
//讀Value
byte[] bytes = new byte[(int)fs.getLength()];
inputStream.read(bytes);
value.set(bytes,0,bytes.length);
notRead = true; return true;
}else{
return false;
}
}
/**
* 獲取當前讀到的Key-value對並返回
*/
public Text getCurrentKey() throws IOException, InterruptedException {
return key;
}
public BytesWritable getCurrentValue() throws IOException, InterruptedException {
return value;
}
/**
* 返回當前數據的讀取進度:0.0~1.0
* 由於本案例中以路徑+整個文件名作為Key,只存在一個K-V對,
* 所以讀取進度只存在兩種情況:要么0沒讀,要么1讀完了。
*/
public float getProgress() throws IOException, InterruptedException {
return notRead ? 0 : 1;
}
/**
* 常用於關閉資源
*/
public void close() throws IOException {
IOUtils.closeStream(inputStream);
}
}
3、測試,本案例中Mapper和Redu啥也不用干,所以不用寫,用默認提供的就行,是需要寫一個Driver。
public class WholeFileDriver { public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException { Job job = Job.getInstance(new Configuration()); job.setJarByClass(WholeFileDriver.class); job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(BytesWritable.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(BytesWritable.class); job.setInputFormatClass(WholeFileInputFormat.class); job.setOutputFormatClass(SequenceFileOutputFormat.class);//【注意】 FileInputFormat.setInputPaths(job, new Path("d:\\input")); FileOutputFormat.setOutputPath(job, new Path("d:\\output")); boolean b = job.waitForCompletion(true); System.out.println(b ? 0:1); } }