輸入的InputFormat----SequenceFileInputFormat


繼承關系:SequenceFileInputFormat  extends FileInputFormat  implements InputFormat  。

 

SequenceFileInputFormat 代碼如下(其實很簡單):

  /**
   * 覆蓋了FileInputFormat的這個方法,FileInputFormat通過這個方法得到的FileStatus[]
   * 長度就是將要運行的map的長度,每個FileStatus對應一個文件
   */
  @Override
  protected FileStatus[] listStatus(JobConf job) throws IOException {
    FileStatus[] files = super.listStatus(job);
    /*調用父類的listStatus方法,然后進行了自己的處理,將得到的FileStatus[]遍歷一遍,
            遇到文件夾時候,看其是否為MapFile,是的話去除其中也是SequenceFile的data文件,否則將文件夾過濾掉。
     */
    for (int i = 0; i < files.length; i++) {
      FileStatus file = files[i];
      if (file.isDir()) {     // it's a MapFile
        Path dataFile = new Path(file.getPath(), MapFile.DATA_FILE_NAME);
        FileSystem fs = file.getPath().getFileSystem(job);
        // use the data file
        files[i] = fs.getFileStatus(dataFile);
      }
    }
    return files;
  }

 

下面看看FileInputFormat的listStatus(JobConf job)方法:

 protected FileStatus[] listStatus(JobConf job) throws IOException {
      //得到job配置中的所有輸入路徑,路徑中是以 ,號隔開的 。
      Path[] dirs = getInputPaths(job);
      if (dirs.length == 0) {
          throw new IOException("No input paths specified in job");
      }

    // get tokens for all the required FileSystems..
    TokenCache.obtainTokensForNamenodes(job.getCredentials(), dirs, job);
    
    List<FileStatus> result = new ArrayList<FileStatus>();
    List<IOException> errors = new ArrayList<IOException>();
    
    // creates a MultiPathFilter with the hiddenFileFilter and the
    // user provided one (if any).
    //處理一個過路文件的filter,可以將輸入文件夾中的一些文件過濾掉
    List<PathFilter> filters = new ArrayList<PathFilter>();
    filters.add(hiddenFileFilter);
    PathFilter jobFilter = getInputPathFilter(job);
    if (jobFilter != null) {
      filters.add(jobFilter);
    }
    PathFilter inputFilter = new MultiPathFilter(filters);
    //對於每個輸入文件夾進行遍歷
    for (Path p: dirs) {
      FileSystem fs = p.getFileSystem(job); 
      //得到這個輸入文件下下的所有文件(夾)
      FileStatus[] matches = fs.globStatus(p, inputFilter);
      if (matches == null) {
        errors.add(new IOException("Input path does not exist: " + p));
      } else if (matches.length == 0) {
        errors.add(new IOException("Input Pattern " + p + " matches 0 files"));
      } else {
        //遍歷輸入問價夾下的每個文件(夾)
        for (FileStatus globStat: matches) {
          if (globStat.isDir()) {
            //文件夾的話,將該文件夾下的所有文件和文件夾添加到結果中
            //****注意此處沒有再往下層遍歷,而是將文件和文件夾都返回到結果中 。
            for(FileStatus stat: fs.listStatus(globStat.getPath(),
                inputFilter)) {
              result.add(stat);
            }          
          } else {
            //文件的話直接添加到result中,其實沒有任何判斷該文件是否是輸入需要的格式等等
            result.add(globStat);
          }
        }
      }
    }

    if (!errors.isEmpty()) {
      throw new InvalidInputException(errors);
    }
    LOG.info("Total input paths to process : " + result.size()); 
    return result.toArray(new FileStatus[result.size()]);
  }

 

是以總結SequenceFileInputFormat中輸出文件的規律(假設輸入文件夾是/input ):

1、輸入文件夾中的文件 ,即滿足:/input/***文件 。

2、輸入文件夾中子文件夾中的文件 ,/input/***/***文件 。

3、輸入文件夾中的子文件夾的子文件夾中的data文件 ,/input/***/***/data文件 ,該主要是針對MapFile的  。

 

得到一個個文件后,怎么將文件映射到InputSplit(有可能一個file映射1個InputSplit,也可能映射幾個InputSplit),代碼見下:

 

/** Splits files returned by {@link #listStatus(JobConf)} when
   * they're too big.*/ 
  @SuppressWarnings("deprecation")
  public InputSplit[] getSplits(JobConf job, int numSplits)
    throws IOException {
    FileStatus[] files = listStatus(job);
    
    // Save the number of input files in the job-conf
    job.setLong(NUM_INPUT_FILES, files.length);
    long totalSize = 0;                           // compute total size
    for (FileStatus file: files) {                // check we have valid files
      if (file.isDir()) {
        throw new IOException("Not a file: "+ file.getPath());
      }
      totalSize += file.getLen();
    }

    long goalSize = totalSize / (numSplits == 0 ? 1 : numSplits);
    long minSize = Math.max(job.getLong("mapred.min.split.size", 1),
                            minSplitSize);

    // generate splits
    ArrayList<FileSplit> splits = new ArrayList<FileSplit>(numSplits);
    NetworkTopology clusterMap = new NetworkTopology();
    //對於每個文件該分割成多少InputSplit的處理
    for (FileStatus file: files) {
      Path path = file.getPath();
      FileSystem fs = path.getFileSystem(job);
      long length = file.getLen();
      BlockLocation[] blkLocations = fs.getFileBlockLocations(file, 0, length);
      //允許切割文件時候,即允許將一個file切割成多個InputSplit 。
      if ((length != 0) && isSplitable(fs, path)) { 
        long blockSize = file.getBlockSize();
        //分塊的大小,一般是以快為單位,即會選擇blockSize ,其實會將按照block來分塊,這樣比較合適
        long splitSize = computeSplitSize(goalSize, minSize, blockSize);

        long bytesRemaining = length;
        //按照分割設置(每塊大小),將文件從從offset=0 到offset=length 分割成 (length/splitSize+1) 個(其實也並不是這些個啦 ~ ~ ,最后一塊的大小可以是splitSize*SPLIT_SLOP) 
        while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) {
          String[] splitHosts = getSplitHosts(blkLocations, 
              length-bytesRemaining, splitSize, clusterMap);
          splits.add(new FileSplit(path, length-bytesRemaining, splitSize, 
              splitHosts));
          bytesRemaining -= splitSize;
        }
        //返回splits
        if (bytesRemaining != 0) {
          splits.add(new FileSplit(path, length-bytesRemaining, bytesRemaining, 
                     blkLocations[blkLocations.length-1].getHosts()));
        }
      } else if (length != 0) {
        String[] splitHosts = getSplitHosts(blkLocations,0,length,clusterMap);
        splits.add(new FileSplit(path, 0, length, splitHosts));
      } else { 
        //Create empty hosts array for zero length files
        splits.add(new FileSplit(path, 0, length, new String[0]));
      }
    }
    LOG.debug("Total # of splits: " + splits.size());
    return splits.toArray(new FileSplit[splits.size()]);
  }

ok ,一切都完了 ,但是你看上面的代碼可能會產生一個疑問:SequenceFile是存儲的一個個的key-value值,這樣分割文件的話,會不會破壞原有的數據結構,即 要是某一個key-value被分到了兩個FileSplit腫么辦 ?

見文章:http://www.cnblogs.com/serendipity/articles/2112613.html

 

 

個人感覺mapreduce的這個工具InputFormat比較亂,往往不看源代碼,你永遠也無法知道到底哪些文件被選上了  。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM