前言
首先確保已經搭建好Hadoop集群環境,可以參考《Linux下Hadoop集群環境的搭建》一文的內容。我在測試mapreduce任務時,發現相比於使用Job.setNumReduceTasks(int)控制reduce任務數量而言,控制map任務數量一直是一個困擾我的問題。好在經過很多摸索與實驗,終於梳理出來,希望對在工作中進行Hadoop進行性能調優的新人們有個借鑒。本文只針對FileInputFormat的任務划分進行分析,其它類型的InputFormat的划分方式又各有不同。雖然如此,都可以按照本文類似的方法進行分析和總結。
為了簡便起見,本文以Hadoop2.6.0自帶的word count例子為例,進行展開。
wordcount
我們首先准備好wordcount所需的數據,一共有兩份文件,都位於hdfs的/wordcount/input目錄下:
這兩個文件的內容分別為:
On the top of the Crumpretty Tree The Quangle Wangle sat, But his face you could not see, On account of his Beaver Hat.
和
But his face you could not see, On account of his Beaver Hat.
有關如何操作hdfs並准備好數據的細節,本文不作贅述。
現在我們不作任何性能優化(不增加任何配置參數),然后執行hadoop-mapreduce-examples子項目(有關此項目介紹,可以閱讀《Hadoop2.6.0子項目hadoop-mapreduce-examples的簡單介紹》一文)中自帶的wordcount例子:
hadoop jar share/hadoop/mapreduce/hadoop-mapreduce-examples-2.6.0.jar wordcount /wordcount/input /wordcount/output/result1
當然也可以使用朴素的方式運行wordcount例子:
hadoop org.apache.hadoop.examples.WordCount -D mapreduce.input.fileinputformat.split.maxsize=1 /wordcount/input /wordcount/output/result1
最后執行的結果在hdfs的/wordcount/output/result1目錄下:
執行結果可以查看/wordcount/output/result1/part-r-00000的內容:
第一次優化
wordcount例子,查看運行結果不是本文的目的。在執行wordcount例子時,在任務運行信息中可以看到創建的map及reduce任務的數量:
可以看到FileInputFormat的輸入文件有2個,JobSubmitter任務划分的數量是2,最后產生的map任務數量也是2,看到這我們可以猜想由於我們提供了兩個輸入文件,所以會有2個map任務。我們此處姑且不論這種猜測正確與否,現在我們打算改變map任務的數量。通過查看文檔,很多人知道使用mapreduce.job.maps參數可以快速修改map任務的數量,事實果真如此?讓我們先來實驗一番,輸入以下命令:
hadoop jar share/hadoop/mapreduce/hadoop-mapreduce-examples-2.6.0.jar wordcount -D mapreduce.job.maps=1 /wordcount/input /wordcount/output/result2
執行以上命令后,觀察輸出的信息,與之前未添加mapreduce.job.maps參數的輸出信息幾乎沒有變化。難道Hadoop的實現人員開了一個玩笑,亦或者這是一個bug?我們先給這個問題在我們的大腦中設置一個檢查點,最后再來看看究竟是怎么回事。
第二次優化
用mapreduce.job.maps調整map任務數量沒有見效,我們翻翻文檔,發現還有mapreduce.input.fileinputformat.split.minsize參數,它可以控制map任務輸入划分的最小字節數。這個參數和mapreduce.input.fileinputformat.split.maxsize通常配合使用,后者控制map任務輸入划分的最大字節數。我們目前只調整mapreduce.input.fileinputformat.split.minsize的大小,划分最小的尺寸變小是否預示着任務划分數量變多?來看看會發生什么?輸入以下命令:
hadoop jar share/hadoop/mapreduce/hadoop-mapreduce-examples-2.6.0.jar wordcount -D mapreduce.input.fileinputformat.split.minsize=1 /wordcount/input /wordcount/output/result3
執行以上命令后,觀察輸出信息,依然未發生改變。好吧,弟弟不給力,我們用它的兄弟參數mapreduce.input.fileinputformat.split.maxsize來控制。如果我們將mapreduce.input.fileinputformat.split.maxsize改得很小,會怎么樣?輸入以下命令:
hadoop jar share/hadoop/mapreduce/hadoop-mapreduce-examples-2.6.0.jar wordcount -D mapreduce.input.fileinputformat.split.maxsize=1 /wordcount/input /wordcount/output/result4
這是的信息有了改變,我們似乎取得了想要的結果:
呵呵,任務划分成了177個,想想也是,我們把最大的划分字節數僅僅設置為1字節。接着往下看確實執行了177個map任務:
我們還可以通過Web UI觀察map任務所分配的Container。首先查看Slave1節點上分配的Container情況:
再來看看Slave2節點上分配的Container情況:
確實說明最多有15個Container分配給當前作業執行map任務。由於在YARN中yarn.nodemanager.resource.cpu-vcores參數的默認值是8,所以Slave1和Slave2兩台機器上的虛擬cpu總數是16,由於ResourceManager會為mapreduce任務分配一個Container給ApplicationMaster(即MrAppMaster),所以整個集群只剩余了15個Container用於ApplicationMaster向NodeManager申請和運行map任務。
第三次優化
閱讀文檔我們知道dfs.blocksize可以控制塊的大小,看看這個參數能否發揮作用。為便於測試,我們首先需要修改hdfs-site.xml中dfs.blocksize的大小為10m(最小就只能這么小,Hadoop限制了參數單位至少是10m)。
<property> <name>dfs.blocksize</name> <value>10m</value> </property>
然后,將此配置復制到集群的所有NameNode和DataNode上。為了使此配置在不重啟的情況下生效,在NameNode節點上執行以下命令:
hadoop dfsadmin -refreshNodes
yarn rmadmin -refreshNodes
我們使用以下命令查看下系統內的文件所占用的blocksize大小:
hadoop dfs -stat "%b %n %o %r %y" /wordcount/input/quangle*
輸出結果如下:
可以看到雖然quangle.txt和quangle2.txt的字節數分別是121字節和56字節,但是在hdfs中這兩個文件的blockSize已經是10m了。現在我們試試以下命令:
hadoop jar share/hadoop/mapreduce/hadoop-mapreduce-examples-2.6.0.jar wordcount /wordcount/input /wordcount/output/result5
觀察輸出信息,發現沒有任何效果。
源碼分析
經過以上3次不同實驗,發現只有mapreduce.input.fileinputformat.split.maxsize參數確實影響了map任務的數量。現在我們通過源碼分析,來一探究竟吧。
首先我們看看WordCount例子的源碼,其中和任務划分有關的代碼如下:
for (int i = 0; i < otherArgs.length - 1; ++i) { FileInputFormat.addInputPath(job, new Path(otherArgs[i])); } FileOutputFormat.setOutputPath(job, new Path(otherArgs[otherArgs.length - 1])); System.exit(job.waitForCompletion(true) ? 0 : 1);
我們看到使用的InputFormat是FileOutputFormat,任務執行調用了Job的waitForCompletion方法。waitForCompletion方法中真正提交job的代碼如下:
public boolean waitForCompletion(boolean verbose) throws IOException, InterruptedException, ClassNotFoundException { if (state == JobState.DEFINE) { submit(); } // 省略本文不關心的代碼 return isSuccessful(); }
這里的submit方法的實現如下:
public void submit() throws IOException, InterruptedException, ClassNotFoundException { // 省略本文不關心的代碼</span> final JobSubmitter submitter = getJobSubmitter(cluster.getFileSystem(), cluster.getClient()); status = ugi.doAs(new PrivilegedExceptionAction<JobStatus>() { public JobStatus run() throws IOException, InterruptedException, ClassNotFoundException { return submitter.submitJobInternal(Job.this, cluster); } }); state = JobState.RUNNING; LOG.info("The url to track the job: " + getTrackingURL()); }
submit方法首先創建了JobSubmitter實例,然后異步調用了JobSubmitter的submitJobInternal方法。JobSubmitter的submitJobInternal方法有關划分任務的代碼如下:
// Create the splits for the job LOG.debug("Creating splits at " + jtFs.makeQualified(submitJobDir)); int maps = writeSplits(job, submitJobDir); conf.setInt(MRJobConfig.NUM_MAPS, maps); LOG.info("number of splits:" + maps);
writeSplits方法的實現如下:
private int writeSplits(org.apache.hadoop.mapreduce.JobContext job, Path jobSubmitDir) throws IOException, InterruptedException, ClassNotFoundException { JobConf jConf = (JobConf)job.getConfiguration(); int maps; if (jConf.getUseNewMapper()) { maps = writeNewSplits(job, jobSubmitDir); } else { maps = writeOldSplits(jConf, jobSubmitDir); } return maps; }
由於WordCount使用的是新的mapreduce API,所以最終會調用writeNewSplits方法。writeNewSplits的實現如下:
private <T extends InputSplit> int writeNewSplits(JobContext job, Path jobSubmitDir) throws IOException, InterruptedException, ClassNotFoundException { Configuration conf = job.getConfiguration(); InputFormat<?, ?> input = ReflectionUtils.newInstance(job.getInputFormatClass(), conf); List<InputSplit> splits = input.getSplits(job); T[] array = (T[]) splits.toArray(new InputSplit[splits.size()]); // sort the splits into order based on size, so that the biggest // go first Arrays.sort(array, new SplitComparator()); JobSplitWriter.createSplitFiles(jobSubmitDir, conf, jobSubmitDir.getFileSystem(conf), array); return array.length; }
writeNewSplits方法中,划分任務數量最關鍵的代碼即為InputFormat的getSplits方法(提示:大家可以直接通過此處的調用,查看不同InputFormat的划分任務實現)。根據前面的分析我們知道此時的InputFormat即為FileOutputFormat,其getSplits方法的實現如下:
public List<InputSplit> getSplits(JobContext job) throws IOException { Stopwatch sw = new Stopwatch().start(); long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job)); long maxSize = getMaxSplitSize(job); // generate splits List<InputSplit> splits = new ArrayList<InputSplit>(); List<FileStatus> files = listStatus(job); for (FileStatus file: files) { Path path = file.getPath(); long length = file.getLen(); if (length != 0) { BlockLocation[] blkLocations; if (file instanceof LocatedFileStatus) { blkLocations = ((LocatedFileStatus) file).getBlockLocations(); } else { FileSystem fs = path.getFileSystem(job.getConfiguration()); blkLocations = fs.getFileBlockLocations(file, 0, length); } if (isSplitable(job, path)) { long blockSize = file.getBlockSize(); long splitSize = computeSplitSize(blockSize, minSize, maxSize); long bytesRemaining = length; while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) { int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining); splits.add(makeSplit(path, length-bytesRemaining, splitSize, blkLocations[blkIndex].getHosts(), blkLocations[blkIndex].getCachedHosts())); bytesRemaining -= splitSize; } if (bytesRemaining != 0) { int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining); splits.add(makeSplit(path, length-bytesRemaining, bytesRemaining, blkLocations[blkIndex].getHosts(), blkLocations[blkIndex].getCachedHosts())); } } else { // not splitable splits.add(makeSplit(path, 0, length, blkLocations[0].getHosts(), blkLocations[0].getCachedHosts())); } } else { //Create empty hosts array for zero length files splits.add(makeSplit(path, 0, length, new String[0])); } } // Save the number of input files for metrics/loadgen job.getConfiguration().setLong(NUM_INPUT_FILES, files.size()); sw.stop(); if (LOG.isDebugEnabled()) { LOG.debug("Total # of splits generated by getSplits: " + splits.size() + ", TimeTaken: " + sw.elapsedMillis()); } return splits; }
getFormatMinSplitSize方法固定返回1,getMinSplitSize方法實際就是mapreduce.input.fileinputformat.split.minsize參數的值(默認為1),那么變量minSize的大小為mapreduce.input.fileinputformat.split.minsize與1之間的最大值。
getMaxSplitSize方法實際是mapreduce.input.fileinputformat.split.maxsize參數的值,那么maxSize即為mapreduce.input.fileinputformat.split.maxsize參數的值。
由於我的試驗中有兩個輸入源文件,所以List<FileStatus> files = listStatus(job);方法返回的files列表的大小為2。
在遍歷files列表的過程中,會獲取每個文件的blockSize,最終調用computeSplitSize方法計算每個輸入文件應當划分的任務數。computeSplitSize方法的實現如下:
protected long computeSplitSize(long blockSize, long minSize, long maxSize) { return Math.max(minSize, Math.min(maxSize, blockSize)); }
因此我們知道每個輸入文件被划分的公式如下:
map任務要划分的大小(splitSize )=(maxSize與blockSize之間的最小值)與minSize之間的最大值
bytesRemaining 是單個輸入源文件未划分的字節數
根據getSplits方法,我們知道map任務划分的數量=輸入源文件數目 * (bytesRemaining / splitSize個划分任務+bytesRemaining不能被splitSize 整除的剩余大小單獨划分一個任務 )
總結
根據源碼分析得到的計算方法和之前的優化結果,我們最后總結一下:
對於第一次優化,由於FileOutputFormat壓根沒有采用mapreduce.job.maps參數指定的值,所以它當然不會有任何作用。
對於第二次優化,minSize幾乎由mapreduce.input.fileinputformat.split.minsize控制;mapreduce.input.fileinputformat.split.maxsize默認的大小是Long.MAX_VALUE,所以blockSize即為maxSize與blockSize之間的最小值;blockSize的默認大小是128m,所以blockSize與值為1的mapreduce.input.fileinputformat.split.minsize之間的最大值為blockSize,即map任務要划分的大小的大小與blockSize相同。
對於第三次優化,雖然我們將blockSize設置為10m(最小也只能這么小了,hdfs對於block大小的最低限制),根據以上公式maxSize與blockSize之間的最小值必然是blockSize,而blockSize與minSize之間的最大值也必然是blockSize。說明blockSize實際上已經發揮了作用,它決定了splitSize的大小就是blockSize。由於blockSize大於bytesRemaining,所以並沒有對map任務數量產生影響。
針對以上分析,我們用更加容易理解的方式列出這些配置參數的關系:
當mapreduce.input.fileinputformat.split.maxsize > mapreduce.input.fileinputformat.split.minsize > dfs.blockSize的情況下,此時的splitSize 將由mapreduce.input.fileinputformat.split.minsize參數決定。
當mapreduce.input.fileinputformat.split.maxsize > dfs.blockSize > mapreduce.input.fileinputformat.split.minsize的情況下,此時的splitSize 將由dfs.blockSize配置決定。(第二次優化符合此種情況)
當dfs.blockSize > mapreduce.input.fileinputformat.split.maxsize > mapreduce.input.fileinputformat.split.minsize的情況下,此時的splitSize 將由mapreduce.input.fileinputformat.split.maxsize參數決定。
鳴謝
我在試驗的過程中,遇到很多問題。但是很多問題在網絡上都能找到,特此感謝在互聯網上分享經驗的同仁們。
后記:個人總結整理的《深入理解Spark:核心思想與源碼分析》一書現在已經正式出版上市,目前京東、當當、天貓等網站均有銷售,歡迎感興趣的同學購買。
京東(現有滿150減50活動)):http://item.jd.com/11846120.html
當當:http://product.dangdang.com/23838168.html