自定義InputFormat


回顧:

  在上一篇https://www.cnblogs.com/superlsj/p/11857691.html詳細介紹了InputFormat的原理和常見的實現類。總結來說,InputFormat是將文件切片----->再轉化為<key--value>對轉交給Mapper處理。

  所以我們看到在InputFormat類中只有兩個方法,一個負責切片,一個返回能將切片信息轉化為相應的鍵值對的對象:

public abstract class InputFormat<K, V> {
    public InputFormat() {
    }

    public abstract List<InputSplit> getSplits(JobContext var1) throws IOException, InterruptedException;

    public abstract RecordReader<K, V> createRecordReader(InputSplit var1, TaskAttemptContext var2) throws IOException, InterruptedException;
}

  以KeyValueInputFormat為例:

@Stable
public class KeyValueTextInputFormat extends FileInputFormat<Text, Text> {
    public KeyValueTextInputFormat() {
    }

    protected boolean isSplitable(JobContext context, Path file) {
        CompressionCodec codec = (new CompressionCodecFactory(context.getConfiguration())).getCodec(file);
        return null == codec ? true : codec instanceof SplittableCompressionCodec;
    }

    public RecordReader<Text, Text> createRecordReader(InputSplit genericSplit, TaskAttemptContext context) throws IOException {
        context.setStatus(genericSplit.toString());
        return new KeyValueLineRecordReader(context.getConfiguration());
    }
}

  我們知道:當使用KeyValueInputFormat並設置分隔符后,Mapper以分隔符前的內容作為Key來接收,以分隔符后面的內容作為Value來接收。那么在數據提交到Mapper之前,數據就必須被格式化為滿足Mapper接收的格式,這個工作就是由InputFormat來完成的,而InputFormat實際上並不能完成這項工作,而是創建一個RecordReader來完成這項轉換工作。順帶一提:isSplitable方法返回文件是否可以切片,當返回false時,表示在格式化輸入文件時,不對文件進行切片,直接進行文本數據至鍵值對的轉化。

一、設計自己的InputFormat:

  現有的InputFormat肯定是無法滿足現實中花里胡哨的需求,所以自定義InputFormat是一項不可避免的工作。下面以將三個小文件合並處理,以文件名作為Key,文件內容的byte數組作為value,轉換成一個SquenceFile文件的案例來演示自定義InputFormat的流程。SuquenceFile文件是Hadoop用來存儲二進制形式的key-value對的文件格式,相比於純文本文件,格式更加緊湊。

  1、自定義InputFormat

public class WholeFileInputFormat extends FileInputFormat<Text, BytesWritable> {
    public RecordReader createRecordReader(InputSplit inputSplit, TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException {
        return new WholeFileRecordReader();
    }

    @Override
    protected boolean isSplitable(JobContext context, Path filename) {
        return false;
    }
}

  2、自定義RecordReader

public class WholeFileRecordReader extends RecordReader<Text, BytesWritable> {
    private boolean notRead = true;
    private Text key = new Text();
    private BytesWritable value = new BytesWritable();
    private FSDataInputStream inputStream;
    private FileSplit fs;
    /**
     * 初始化方法,框架會在讀取切片數據之前調用此方法,因此,一些在RecordReader工作時需要使用的資源可以在此方法中進行初始化(這些資源必須是可以在inputSplit和taskAttemptContext中可以獲取到的)。
* InputSplit inputSplit:當前RecordReader正在處理的切片
   * TaskAttemptContext taskAttemptContext:當前Job的上下文,可以通過此對象獲取job的配置對象
*/
  public void initialize(InputSplit inputSplit, TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException {   
    //
轉換切片類型到文件切片
  
    fs = (FileSplit)inputSplit;
    //通過切片獲得路徑 
    Path path = fs.getPath();
    //通過路徑獲取文件系統
    FileSystem fileSystem = path.getFileSystem(taskAttemptContext.getConfiguration());

    //開流
    inputStream = fileSystem.open(path);
  }

  /**
  * 此方法用於讀取下一組key-value,類似於迭代器,如果讀到數據返回true
  * 此方法會被Mapper的run()調用:
  * run(){
  *   setUp();
  *   while(RecorderReader.nextKeyValue()){
  *     // 如果讀取到下一組key-value對,就交給map方法處理
  *     map(recordReader.getCurrentKey(),recordReader.getCurrentValue(),context);
  *   } 
  * }
  * 因為將路徑+文件名作為key,文件內容作為value,所以一個文件只會讀取一次,要么沒讀過,要么讀過
  */
  public boolean nextKeyValue() throws IOException, InterruptedException {
    if(notRead){
      //具體讀文件的操作
      //讀Key
      key.set(fs.getPath().toString());
      //讀Value
      byte[] bytes = new byte[(int)fs.getLength()];
      inputStream.read(bytes);
      value.set(bytes,
0,bytes.length);
      notRead
= true; return true;
    }
else{
      return false;
    }
  }

  /**
  * 獲取當前讀到的Key-value對並返回
  */
  public Text getCurrentKey() throws IOException, InterruptedException {
    return key;
  }

  public BytesWritable getCurrentValue() throws IOException, InterruptedException {
    return value;
  }

  /**
  * 返回當前數據的讀取進度:0.0~1.0
  * 由於本案例中以路徑+整個文件名作為Key,只存在一個K-V對,
  * 所以讀取進度只存在兩種情況:要么0沒讀,要么1讀完了。
  */
  public float getProgress() throws IOException, InterruptedException {
    return notRead ? 0 : 1;
  }

  /**
  * 常用於關閉資源
  */
  public void close() throws IOException {
    IOUtils.closeStream(inputStream);
  }
}

  3、測試,本案例中Mapper和Redu啥也不用干,所以不用寫,用默認提供的就行,是需要寫一個Driver。

public class WholeFileDriver {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Job job = Job.getInstance(new Configuration());

        job.setJarByClass(WholeFileDriver.class);

        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(BytesWritable.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(BytesWritable.class);

        job.setInputFormatClass(WholeFileInputFormat.class);
        job.setOutputFormatClass(SequenceFileOutputFormat.class);//【注意】

        FileInputFormat.setInputPaths(job, new Path("d:\\input"));
        FileOutputFormat.setOutputPath(job, new Path("d:\\output"));

        boolean b = job.waitForCompletion(true);
        System.out.println(b ? 0:1);
    }
}

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM