hadoop2.7作業提交詳解之文件分片


在前面一篇文章中(hadoop2.7之作業提交詳解(上))中涉及到文件的分片。

JobSubmitter.submitJobInternal方法中調用了
int maps = writeSplits(job, submitJobDir); //設置map的數量,而map的數量是根據文件的大小和分片的大小,以及文件的數量決定的

接下來我們看一下JobSubmitter.writeSplits方法:

private int writeSplits(org.apache.hadoop.mapreduce.JobContext job,
    Path jobSubmitDir) throws IOException,
    InterruptedException, ClassNotFoundException {
  JobConf jConf = (JobConf)job.getConfiguration();
  int maps;
  if (jConf.getUseNewMapper()) {
    maps = writeNewSplits(job, jobSubmitDir); //這里我們使用新的方式
  } else {
    maps = writeOldSplits(jConf, jobSubmitDir);
  }
  return maps;
}

接下來繼續看JobSubmitter.writeNewSplits方法:

private <T extends InputSplit>
int writeNewSplits(JobContext job, Path jobSubmitDir) throws IOException,
    InterruptedException, ClassNotFoundException {
  Configuration conf = job.getConfiguration();
  InputFormat<?, ?> input =
    ReflectionUtils.newInstance(job.getInputFormatClass(), conf);  //輸入對象,InputFormat是個抽象類  

  List<InputSplit> splits = input.getSplits(job); //調用InputFormat實現類的getSplits方法
  T[] array = (T[]) splits.toArray(new InputSplit[splits.size()]);

  // sort the splits into order based on size, so that the biggest
  // go first
  Arrays.sort(array, new SplitComparator()); //對切片的大小進行排序,最大的放最前面
  JobSplitWriter.createSplitFiles(jobSubmitDir, conf, 
      jobSubmitDir.getFileSystem(conf), array);//創建Split文件 
  return array.length;
}

接下來看一下InputFormat這個抽象類:

public abstract class InputFormat<K, V> {
    //用來返回分片結果
    public abstract 
    List<InputSplit> getSplits(JobContext context
                               ) throws IOException, InterruptedException;
    //RecordReader是用來從一個輸入分片中讀取一個一個的K-V對的抽象類,我們可以將其看作是在InputSplit上的迭代器。
    //最主要的方法就是nextKeyvalue()方法,由它獲取分片上的下一個K-V 對。
    public abstract 
    RecordReader<K,V> createRecordReader(InputSplit split,
                                         TaskAttemptContext context
                                        ) throws IOException, 
                                                 InterruptedException;

}

接下來我們繼續看這個抽象類的實現類:

public class TextInputFormat extends FileInputFormat;
public abstract class FileInputFormat<K, V> extends InputFormat;
public abstract class InputFormat。

由於TextInputFormat從抽象類FileInputFormat中繼承,所以大部分的方法都來自於FileInputFormat類,TextInputFormat類只重寫了兩個方法:如下:

public class TextInputFormat extends FileInputFormat<LongWritable, Text> {

  @Override
  public RecordReader<LongWritable, Text> 
    createRecordReader(InputSplit split,
                       TaskAttemptContext context) {
    String delimiter = context.getConfiguration().get(
        "textinputformat.record.delimiter");
    byte[] recordDelimiterBytes = null;
    if (null != delimiter)
      recordDelimiterBytes = delimiter.getBytes(Charsets.UTF_8);
      //LineRecordReader由一個FileSplit構造出來,start是這個FileSplit的起始位置,pos是當前讀取分片的位置,
      //end是分片結束位置,in是打開的一個讀取這個分片的輸入流,它是使用這個FileSplit對應的文件名來打開的。
      //key和value則分別是每次讀取的K-V對。然后我們還看到可以利用getProgress()來跟蹤讀取分片的進度,
      //這個函數就是根據已經讀取的K-V對占總K-V對的比例來顯示進度的
    return new LineRecordReader(recordDelimiterBytes);
  }

  @Override
  protected boolean isSplitable(JobContext context, Path file) {
 //如果是壓縮文件就不切分,非壓縮文件就切分。
    final CompressionCodec codec =
      new CompressionCodecFactory(context.getConfiguration()).getCodec(file);
    if (null == codec) {
      return true;
    }
    return codec instanceof SplittableCompressionCodec;
  }
}

我們在返回到JobSubmitter.writeNewSplits方法中,有List<InputSplit> splits = input.getSplits(job);主要是調用了TextInputFormat.getSplits()方法,而TextInputFormat繼承了FileInputFormat類,所以調用的就是FileInputFormat.getSplits()方法:

public List<InputSplit> getSplits(JobContext job) throws IOException {
  StopWatch sw = new StopWatch().start();//用來計算納秒級別的時間
  long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job)); //最小值默認為1
  long maxSize = getMaxSplitSize(job); //最大值為long的最大值,默認為0x7fffffffffffffffL

  // generate splits
  List<InputSplit> splits = new ArrayList<InputSplit>();
  List<FileStatus> files = listStatus(job); //獲得所有的輸入文件
  for (FileStatus file: files) {
    Path path = file.getPath(); //文件路徑
    long length = file.getLen(); //文件大小
    if (length != 0) {
      BlockLocation[] blkLocations;
      if (file instanceof LocatedFileStatus) {//如果是個含有數據塊位置信息的文件 
        blkLocations = ((LocatedFileStatus) file).getBlockLocations();
      } else { //一般文件 
        FileSystem fs = path.getFileSystem(job.getConfiguration());
        blkLocations = fs.getFileBlockLocations(file, 0, length);
      }
      if (isSplitable(job, path)) { //判斷是否可以分片
        long blockSize = file.getBlockSize(); //128M
        long splitSize = computeSplitSize(blockSize, minSize, maxSize); //計算分片的大小,默認為128M 

        long bytesRemaining = length;
        while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) { //判斷剩余文件大小是否大於128M*1.1 
          int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);//f返回每個分片起始位置
          splits.add(makeSplit(path, length-bytesRemaining, splitSize,
                      blkLocations[blkIndex].getHosts(),
                      blkLocations[blkIndex].getCachedHosts()));
          bytesRemaining -= splitSize; // 依次減去分片的大小,對剩余長度再次分片
        }
// 多次分片后,最后的數據長度仍不為0但又不足一個分片大小
        if (bytesRemaining != 0) {
          int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);
          splits.add(makeSplit(path, length-bytesRemaining, bytesRemaining,
                     blkLocations[blkIndex].getHosts(),
                     blkLocations[blkIndex].getCachedHosts()));
        }
//不可分,則把整個文件作為一個分片
      } else { // not splitable
        splits.add(makeSplit(path, 0, length, blkLocations[0].getHosts(),
                    blkLocations[0].getCachedHosts()));
      }
    } else { 
//創建空的分片
      //Create empty hosts array for zero length files
      splits.add(makeSplit(path, 0, length, new String[0]));
    }
  }
  // Save the number of input files for metrics/loadgen
  job.getConfiguration().setLong(NUM_INPUT_FILES, files.size()); //設置參數NUM_INPUT_FILES
  sw.stop();
  if (LOG.isDebugEnabled()) {
    LOG.debug("Total # of splits generated by getSplits: " + splits.size()
        + ", TimeTaken: " + sw.now(TimeUnit.MILLISECONDS));
  }
  return splits;
}
//public class FileSplit extends InputSplit implements Writable {
//  private Path file;//輸入文件路徑 
//  private long start;//分片在文件中的位置(起點)
//  private long length;//分片長度
//  private String[] hosts;//這個分片所在數據塊的多個復份所在節點
//  private SplitLocationInfo[] hostInfos;//每個數據塊復份所在節點,以及是否緩存 
//}
//makeSplit方法存放的分片格式
protected FileSplit makeSplit(Path file, long start, long length, 
                              String[] hosts, String[] inMemoryHosts) {
  return new FileSplit(file, start, length, hosts, inMemoryHosts);
}

//計算分片的大小
protected long computeSplitSize(long blockSize, long minSize,
                                long maxSize) {
  return Math.max(minSize, Math.min(maxSize, blockSize));
}

通過FileInputFormat.getSplits(),可以返回一個存放分片的ArraryList,接下繼續回到JobSubmitter.writeNewSplits方法中:

接下來將ArrayList轉換為數組,並根據分片的大小排序。然后調用JobSplitWriter.createSplitFiles()方法創建split文件。最后返回數組的長度,也就是map的個數。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM