Hadoop2.6.0的FileInputFormat的任務切分原理分析(即如何控制FileInputFormat的map任務數量)


前言

  首先確保已經搭建好Hadoop集群環境,可以參考《Linux下Hadoop集群環境的搭建》一文的內容。我在測試mapreduce任務時,發現相比於使用Job.setNumReduceTasks(int)控制reduce任務數量而言,控制map任務數量一直是一個困擾我的問題。好在經過很多摸索與實驗,終於梳理出來,希望對在工作中進行Hadoop進行性能調優的新人們有個借鑒。本文只針對FileInputFormat的任務划分進行分析,其它類型的InputFormat的划分方式又各有不同。雖然如此,都可以按照本文類似的方法進行分析和總結。
為了簡便起見,本文以Hadoop2.6.0自帶的word count例子為例,進行展開。

wordcount

  我們首先准備好wordcount所需的數據,一共有兩份文件,都位於hdfs的/wordcount/input目錄下:

這兩個文件的內容分別為:

On the top of the Crumpretty Tree
The Quangle Wangle sat,
But his face you could not see,
On account of his Beaver Hat.

But his face you could not see,
On account of his Beaver Hat.

有關如何操作hdfs並准備好數據的細節,本文不作贅述。
現在我們不作任何性能優化(不增加任何配置參數),然后執行hadoop-mapreduce-examples子項目(有關此項目介紹,可以閱讀《Hadoop2.6.0子項目hadoop-mapreduce-examples的簡單介紹》一文)中自帶的wordcount例子:

hadoop jar share/hadoop/mapreduce/hadoop-mapreduce-examples-2.6.0.jar wordcount /wordcount/input /wordcount/output/result1

當然也可以使用朴素的方式運行wordcount例子:

hadoop org.apache.hadoop.examples.WordCount -D mapreduce.input.fileinputformat.split.maxsize=1 /wordcount/input /wordcount/output/result1

最后執行的結果在hdfs的/wordcount/output/result1目錄下:

執行結果可以查看/wordcount/output/result1/part-r-00000的內容:

第一次優化

  wordcount例子,查看運行結果不是本文的目的。在執行wordcount例子時,在任務運行信息中可以看到創建的map及reduce任務的數量:

可以看到FileInputFormat的輸入文件有2個,JobSubmitter任務划分的數量是2,最后產生的map任務數量也是2,看到這我們可以猜想由於我們提供了兩個輸入文件,所以會有2個map任務。我們此處姑且不論這種猜測正確與否,現在我們打算改變map任務的數量。通過查看文檔,很多人知道使用mapreduce.job.maps參數可以快速修改map任務的數量,事實果真如此?讓我們先來實驗一番,輸入以下命令:

hadoop jar share/hadoop/mapreduce/hadoop-mapreduce-examples-2.6.0.jar wordcount -D mapreduce.job.maps=1 /wordcount/input /wordcount/output/result2

執行以上命令后,觀察輸出的信息,與之前未添加mapreduce.job.maps參數的輸出信息幾乎沒有變化。難道Hadoop的實現人員開了一個玩笑,亦或者這是一個bug?我們先給這個問題在我們的大腦中設置一個檢查點,最后再來看看究竟是怎么回事。

第二次優化

  用mapreduce.job.maps調整map任務數量沒有見效,我們翻翻文檔,發現還有mapreduce.input.fileinputformat.split.minsize參數,它可以控制map任務輸入划分的最小字節數。這個參數和mapreduce.input.fileinputformat.split.maxsize通常配合使用,后者控制map任務輸入划分的最大字節數。我們目前只調整mapreduce.input.fileinputformat.split.minsize的大小,划分最小的尺寸變小是否預示着任務划分數量變多?來看看會發生什么?輸入以下命令:

hadoop jar share/hadoop/mapreduce/hadoop-mapreduce-examples-2.6.0.jar wordcount -D mapreduce.input.fileinputformat.split.minsize=1 /wordcount/input /wordcount/output/result3

執行以上命令后,觀察輸出信息,依然未發生改變。好吧,弟弟不給力,我們用它的兄弟參數mapreduce.input.fileinputformat.split.maxsize來控制。如果我們將mapreduce.input.fileinputformat.split.maxsize改得很小,會怎么樣?輸入以下命令:

hadoop jar share/hadoop/mapreduce/hadoop-mapreduce-examples-2.6.0.jar wordcount -D mapreduce.input.fileinputformat.split.maxsize=1 /wordcount/input /wordcount/output/result4

這是的信息有了改變,我們似乎取得了想要的結果:

呵呵,任務划分成了177個,想想也是,我們把最大的划分字節數僅僅設置為1字節。接着往下看確實執行了177個map任務:

我們還可以通過Web UI觀察map任務所分配的Container。首先查看Slave1節點上分配的Container情況:

再來看看Slave2節點上分配的Container情況:

確實說明最多有15個Container分配給當前作業執行map任務。由於在YARN中yarn.nodemanager.resource.cpu-vcores參數的默認值是8,所以Slave1和Slave2兩台機器上的虛擬cpu總數是16,由於ResourceManager會為mapreduce任務分配一個Container給ApplicationMaster(即MrAppMaster),所以整個集群只剩余了15個Container用於ApplicationMaster向NodeManager申請和運行map任務。

第三次優化

  閱讀文檔我們知道dfs.blocksize可以控制塊的大小,看看這個參數能否發揮作用。為便於測試,我們首先需要修改hdfs-site.xml中dfs.blocksize的大小為10m(最小就只能這么小,Hadoop限制了參數單位至少是10m)。

<property>
  <name>dfs.blocksize</name>
  <value>10m</value>
</property>

然后,將此配置復制到集群的所有NameNode和DataNode上。為了使此配置在不重啟的情況下生效,在NameNode節點上執行以下命令:

hadoop dfsadmin -refreshNodes
yarn rmadmin -refreshNodes

我們使用以下命令查看下系統內的文件所占用的blocksize大小:

hadoop dfs -stat "%b %n %o %r %y" /wordcount/input/quangle*

輸出結果如下:

可以看到雖然quangle.txt和quangle2.txt的字節數分別是121字節和56字節,但是在hdfs中這兩個文件的blockSize已經是10m了。現在我們試試以下命令:

hadoop jar share/hadoop/mapreduce/hadoop-mapreduce-examples-2.6.0.jar wordcount /wordcount/input /wordcount/output/result5

觀察輸出信息,發現沒有任何效果。

源碼分析

  經過以上3次不同實驗,發現只有mapreduce.input.fileinputformat.split.maxsize參數確實影響了map任務的數量。現在我們通過源碼分析,來一探究竟吧。
首先我們看看WordCount例子的源碼,其中和任務划分有關的代碼如下:

for (int i = 0; i < otherArgs.length - 1; ++i) {
  FileInputFormat.addInputPath(job, new Path(otherArgs[i]));
}
FileOutputFormat.setOutputPath(job,
new Path(otherArgs[otherArgs.length - 1]));
System.exit(job.waitForCompletion(true) ? 0 : 1);

我們看到使用的InputFormat是FileOutputFormat,任務執行調用了Job的waitForCompletion方法。waitForCompletion方法中真正提交job的代碼如下:

public boolean waitForCompletion(boolean verbose) throws IOException, InterruptedException, ClassNotFoundException {
 if (state == JobState.DEFINE) {
    submit();
  }
  // 省略本文不關心的代碼
  return isSuccessful();
}

這里的submit方法的實現如下:

public void submit() throws IOException, InterruptedException, ClassNotFoundException {
  // 省略本文不關心的代碼</span>
  final JobSubmitter submitter = 
    getJobSubmitter(cluster.getFileSystem(), cluster.getClient());
  status = ugi.doAs(new PrivilegedExceptionAction<JobStatus>() {
    public JobStatus run() throws IOException, InterruptedException, 
    ClassNotFoundException {
      return submitter.submitJobInternal(Job.this, cluster);
    }
  });
  state = JobState.RUNNING;
  LOG.info("The url to track the job: " + getTrackingURL());
}

submit方法首先創建了JobSubmitter實例,然后異步調用了JobSubmitter的submitJobInternal方法。JobSubmitter的submitJobInternal方法有關划分任務的代碼如下:

// Create the splits for the job
LOG.debug("Creating splits at " + jtFs.makeQualified(submitJobDir));
int maps = writeSplits(job, submitJobDir);
conf.setInt(MRJobConfig.NUM_MAPS, maps);
LOG.info("number of splits:" + maps);

writeSplits方法的實現如下:

private int writeSplits(org.apache.hadoop.mapreduce.JobContext job,
  Path jobSubmitDir) throws IOException,
  InterruptedException, ClassNotFoundException {
  JobConf jConf = (JobConf)job.getConfiguration();
  int maps;
  if (jConf.getUseNewMapper()) {
    maps = writeNewSplits(job, jobSubmitDir);
  } else {
    maps = writeOldSplits(jConf, jobSubmitDir);
  }
  return maps;
}

由於WordCount使用的是新的mapreduce API,所以最終會調用writeNewSplits方法。writeNewSplits的實現如下:

private <T extends InputSplit>
int writeNewSplits(JobContext job, Path jobSubmitDir) throws IOException,
InterruptedException, ClassNotFoundException {
  Configuration conf = job.getConfiguration();
  InputFormat<?, ?> input =
  ReflectionUtils.newInstance(job.getInputFormatClass(), conf);

  List<InputSplit> splits = input.getSplits(job);
  T[] array = (T[]) splits.toArray(new InputSplit[splits.size()]);

  // sort the splits into order based on size, so that the biggest
  // go first
  Arrays.sort(array, new SplitComparator());
  JobSplitWriter.createSplitFiles(jobSubmitDir, conf, 
  jobSubmitDir.getFileSystem(conf), array);
  return array.length;
}

writeNewSplits方法中,划分任務數量最關鍵的代碼即為InputFormat的getSplits方法(提示:大家可以直接通過此處的調用,查看不同InputFormat的划分任務實現)。根據前面的分析我們知道此時的InputFormat即為FileOutputFormat,其getSplits方法的實現如下:

public List<InputSplit> getSplits(JobContext job) throws IOException {
  Stopwatch sw = new Stopwatch().start();
  long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job));
  long maxSize = getMaxSplitSize(job);

  // generate splits
  List<InputSplit> splits = new ArrayList<InputSplit>();
  List<FileStatus> files = listStatus(job);
  for (FileStatus file: files) {
    Path path = file.getPath();
    long length = file.getLen();
    if (length != 0) {
      BlockLocation[] blkLocations;
      if (file instanceof LocatedFileStatus) {
        blkLocations = ((LocatedFileStatus) file).getBlockLocations();
      } else {
        FileSystem fs = path.getFileSystem(job.getConfiguration());
        blkLocations = fs.getFileBlockLocations(file, 0, length);
      }
      if (isSplitable(job, path)) {
        long blockSize = file.getBlockSize();
        long splitSize = computeSplitSize(blockSize, minSize, maxSize);

        long bytesRemaining = length;
        while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) {
          int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);
          splits.add(makeSplit(path, length-bytesRemaining, splitSize,
            blkLocations[blkIndex].getHosts(),
            blkLocations[blkIndex].getCachedHosts()));
          bytesRemaining -= splitSize;
        }

        if (bytesRemaining != 0) {
          int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);
            splits.add(makeSplit(path, length-bytesRemaining, bytesRemaining,
            blkLocations[blkIndex].getHosts(),
            blkLocations[blkIndex].getCachedHosts()));
        }
      } else { // not splitable
        splits.add(makeSplit(path, 0, length, blkLocations[0].getHosts(),
        blkLocations[0].getCachedHosts()));
      }
    } else { 
      //Create empty hosts array for zero length files
      splits.add(makeSplit(path, 0, length, new String[0]));
    }
  }
  // Save the number of input files for metrics/loadgen
  job.getConfiguration().setLong(NUM_INPUT_FILES, files.size());
  sw.stop();
  if (LOG.isDebugEnabled()) {
    LOG.debug("Total # of splits generated by getSplits: " + splits.size()
      + ", TimeTaken: " + sw.elapsedMillis());
  }
  return splits;
}

getFormatMinSplitSize方法固定返回1,getMinSplitSize方法實際就是mapreduce.input.fileinputformat.split.minsize參數的值(默認為1),那么變量minSize的大小為mapreduce.input.fileinputformat.split.minsize與1之間的最大值。
getMaxSplitSize方法實際是mapreduce.input.fileinputformat.split.maxsize參數的值,那么maxSize即為mapreduce.input.fileinputformat.split.maxsize參數的值。
由於我的試驗中有兩個輸入源文件,所以List<FileStatus> files = listStatus(job);方法返回的files列表的大小為2。
在遍歷files列表的過程中,會獲取每個文件的blockSize,最終調用computeSplitSize方法計算每個輸入文件應當划分的任務數。computeSplitSize方法的實現如下:

protected long computeSplitSize(long blockSize, long minSize,
long maxSize) {
  return Math.max(minSize, Math.min(maxSize, blockSize));
}

因此我們知道每個輸入文件被划分的公式如下:
map任務要划分的大小(splitSize )=(maxSize與blockSize之間的最小值)與minSize之間的最大值
bytesRemaining 是單個輸入源文件未划分的字節數
根據getSplits方法,我們知道map任務划分的數量=輸入源文件數目 * (bytesRemaining / splitSize個划分任務+bytesRemaining不能被splitSize 整除的剩余大小單獨划分一個任務 )
總結
根據源碼分析得到的計算方法和之前的優化結果,我們最后總結一下:
對於第一次優化,由於FileOutputFormat壓根沒有采用mapreduce.job.maps參數指定的值,所以它當然不會有任何作用。
對於第二次優化,minSize幾乎由mapreduce.input.fileinputformat.split.minsize控制;mapreduce.input.fileinputformat.split.maxsize默認的大小是Long.MAX_VALUE,所以blockSize即為maxSize與blockSize之間的最小值;blockSize的默認大小是128m,所以blockSize與值為1的mapreduce.input.fileinputformat.split.minsize之間的最大值為blockSize,即map任務要划分的大小的大小與blockSize相同。
對於第三次優化,雖然我們將blockSize設置為10m(最小也只能這么小了,hdfs對於block大小的最低限制),根據以上公式maxSize與blockSize之間的最小值必然是blockSize,而blockSize與minSize之間的最大值也必然是blockSize。說明blockSize實際上已經發揮了作用,它決定了splitSize的大小就是blockSize。由於blockSize大於bytesRemaining,所以並沒有對map任務數量產生影響。
針對以上分析,我們用更加容易理解的方式列出這些配置參數的關系:
當mapreduce.input.fileinputformat.split.maxsize > mapreduce.input.fileinputformat.split.minsize > dfs.blockSize的情況下,此時的splitSize 將由mapreduce.input.fileinputformat.split.minsize參數決定。
當mapreduce.input.fileinputformat.split.maxsize > dfs.blockSize > mapreduce.input.fileinputformat.split.minsize的情況下,此時的splitSize 將由dfs.blockSize配置決定。(第二次優化符合此種情況)
當dfs.blockSize > mapreduce.input.fileinputformat.split.maxsize > mapreduce.input.fileinputformat.split.minsize的情況下,此時的splitSize 將由mapreduce.input.fileinputformat.split.maxsize參數決定。

鳴謝

  我在試驗的過程中,遇到很多問題。但是很多問題在網絡上都能找到,特此感謝在互聯網上分享經驗的同仁們。

 

后記:個人總結整理的《深入理解Spark:核心思想與源碼分析》一書現在已經正式出版上市,目前京東、當當、天貓等網站均有銷售,歡迎感興趣的同學購買。

京東(現有滿150減50活動)):http://item.jd.com/11846120.html
當當:http://product.dangdang.com/23838168.html


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM