1 概述
該瞅瞅MapReduce的內部運行原理了,以前只知道個皮毛,再不搞搞,不然怎么死的都不曉得。下文會以2.4版本中的WordCount這個經典例子作為分析的切入點,一步步來看里面到底是個什么情況。
2 為什么要使用MapReduce
Map/Reduce,是一種模式,適合解決並行計算的問題,比如TopN、貝葉斯分類等。注意,是並行計算,而非迭代計算,像涉及到層次聚類的問題就不太適合了。
從名字可以看出,這種模式有兩個步驟,Map和Reduce。Map即數據的映射,用於把一組鍵值對映射成另一組新的鍵值對,而Reduce這個東東,以Map階段的輸出結果作為輸入,對數據做化簡、合並等操作。
而MapReduce是Hadoop生態系統中基於底層HDFS的一個計算框架,它的上層又可以是Hive、Pig等數據倉庫框架,也可以是Mahout這樣的數據挖掘工具。由於MapReduce依賴於HDFS,其運算過程中的數據等會保存到HDFS上,把對數據集的計算分發給各個節點,並將結果進行匯總,再加上各種狀態匯報、心跳匯報等,其只適合做離線計算。和實時計算框架Storm、Spark等相比,速度上沒有優勢。舊的Hadoop生態幾乎是以MapReduce為核心的,但是慢慢的發展,其擴展性差、資源利用率低、可靠性等問題都越來越讓人覺得不爽,於是才產生了Yarn這個新的東東,並且二代版的Hadoop生態都是以Yarn為核心。Storm、Spark等都可以基於Yarn使用。
3 怎么運行MapReduce
明白了哪些地方可以使用這個牛叉的MapReduce框架,那該怎么用呢?Hadoop的MapReduce源碼給我們提供了范例,在其hadoop-mapreduce-examples子工程中包含了MapReduce的Java版例子。在寫完類似的代碼后,打包成jar,在HDFS的客戶端運行:
bin/hadoop jar mapreduce_examples.jar mainClass args
即可。當然,也可以在IDE(如Eclipse)中,進行遠程運行、調試程序。
至於,HadoopStreaming方式,網上有很多。我們這里只討論Java的實現。
4 如何編寫MapReduce程序
如前文所說,MapReduce中有Map和Reduce,在實現MapReduce的過程中,主要分為這兩個階段,分別以兩類函數進行展現,一個是map函數,一個是reduce函數。map函數的參數是一個<key,value>鍵值對,其輸出結果也是鍵值對,reduce函數以map的輸出作為輸入進行處理。
4.1 代碼構成
實際的代碼中,需要三個元素,分別是Map、Reduce、運行任務的代碼。這里的Map類是繼承了org.apache.hadoop.mapreduce.Mapper,並實現其中的map方法;而Reduce類是繼承了org.apache.hadoop.mapreduce.Reducer,實現其中的reduce方法。至於運行任務的代碼,就是我們程序的入口。
下面是Hadoop提供的WordCount源碼。

1 /** 2 * Licensed to the Apache Software Foundation (ASF) under one 3 * or more contributor license agreements. See the NOTICE file 4 * distributed with this work for additional information 5 * regarding copyright ownership. The ASF licenses this file 6 * to you under the Apache License, Version 2.0 (the 7 * "License"); you may not use this file except in compliance 8 * with the License. You may obtain a copy of the License at 9 * 10 * http://www.apache.org/licenses/LICENSE-2.0 11 * 12 * Unless required by applicable law or agreed to in writing, software 13 * distributed under the License is distributed on an "AS IS" BASIS, 14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 15 * See the License for the specific language governing permissions and 16 * limitations under the License. 17 */ 18 package org.apache.hadoop.examples; 19 20 import java.io.IOException; 21 import java.util.StringTokenizer; 22 23 import org.apache.hadoop.conf.Configuration; 24 import org.apache.hadoop.fs.Path; 25 import org.apache.hadoop.io.IntWritable; 26 import org.apache.hadoop.io.Text; 27 import org.apache.hadoop.mapreduce.Job; 28 import org.apache.hadoop.mapreduce.Mapper; 29 import org.apache.hadoop.mapreduce.Reducer; 30 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; 31 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; 32 import org.apache.hadoop.util.GenericOptionsParser; 33 34 public class WordCount { 35 36 public static class TokenizerMapper 37 extends Mapper<Object, Text, Text, IntWritable>{ 38 39 private final static IntWritable one = new IntWritable(1); 40 private Text word = new Text(); 41 42 public void map(Object key, Text value, Context context 43 ) throws IOException, InterruptedException { 44 StringTokenizer itr = new StringTokenizer(value.toString()); 45 while (itr.hasMoreTokens()) { 46 word.set(itr.nextToken()); 47 context.write(word, one); 48 } 49 } 50 } 51 52 public static class IntSumReducer 53 extends Reducer<Text,IntWritable,Text,IntWritable> { 54 private IntWritable result = new IntWritable(); 55 56 public void reduce(Text key, Iterable<IntWritable> values, 57 Context context 58 ) throws IOException, InterruptedException { 59 int sum = 0; 60 for (IntWritable val : values) { 61 sum += val.get(); 62 } 63 result.set(sum); 64 context.write(key, result); 65 } 66 } 67 68 public static void main(String[] args) throws Exception { 69 Configuration conf = new Configuration(); 70 String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs(); 71 if (otherArgs.length != 2) { 72 System.err.println("Usage: wordcount <in> <out>"); 73 System.exit(2); 74 } 75 Job job = new Job(conf, "word count"); 76 job.setJarByClass(WordCount.class); 77 job.setMapperClass(TokenizerMapper.class); 78 job.setCombinerClass(IntSumReducer.class); 79 job.setReducerClass(IntSumReducer.class); 80 job.setOutputKeyClass(Text.class); 81 job.setOutputValueClass(IntWritable.class); 82 FileInputFormat.addInputPath(job, new Path(otherArgs[0])); 83 FileOutputFormat.setOutputPath(job, new Path(otherArgs[1])); 84 System.exit(job.waitForCompletion(true) ? 0 : 1); 85 } 86 }
4.2 入口類
4.2.1 參數獲取
首先定義配置文件類Configuration,此類是Hadoop各個模塊的公共使用類,用於加載類路徑下的各種配置文件,讀寫其中的配置選項。
第二步中,用到了GenericOptionsParser類,其目的是將命令行中參數自動設置到變量conf中。
GenericOptionsParser的構造方法進去之后,會進行到parseGeneralOptions,對傳入的參數進行解析:
1 private void parseGeneralOptions(Options opts, Configuration conf, 2 3 String[] args) throws IOException { 4 5 opts = buildGeneralOptions(opts); 6 7 CommandLineParser parser = new GnuParser(); 8 9 try { 10 11 commandLine = parser.parse(opts, preProcessForWindows(args), true); 12 13 processGeneralOptions(conf, commandLine); 14 15 } catch(ParseException e) { 16 17 LOG.warn("options parsing failed: "+e.getMessage()); 18 19 20 21 HelpFormatter formatter = new HelpFormatter(); 22 23 formatter.printHelp("general options are: ", opts); 24 25 } 26 27 }
而getRemainingArgs方法會獲得傳入的參數,接着在main方法中會進行判斷參數的個數,由於此處是WordCount計算,只需要傳入文件的輸入路徑和輸出路徑即可,因此參數的個數為2,否則將退出:
1 if (otherArgs.length != 2) { 2 3 System.err.println("Usage: wordcount <in> <out>"); 4 5 System.exit(2); 6 7 }
如果在代碼運行的時候傳入其他的參數,比如指定reduce的個數,可以根據GenericOptionsParser的命令行格式這么寫:
bin/hadoop jar MyJob.jar com.xxx.MyJobDriver -Dmapred.reduce.tasks=5
其規則是-D加MapReduce的配置選項,當然還支持-fs等其他參數傳入。當然,默認情況下Reduce的數目為1,Map的數目也為1。
4.2.2 Job定義
定義Job對象,其構造方法為:
1 public Job(Configuration conf, String jobName) throws IOException { 2 3 this(conf); 4 5 setJobName(jobName); 6 7 }
可見,傳入的"word count"就是Job的名字。而conf被傳遞給了JobConf進行環境變量的獲取:
1 public JobConf(Configuration conf) { 2 3 super(conf); 6 7 if (conf instanceof JobConf) { 8 9 JobConf that = (JobConf)conf; 10 11 credentials = that.credentials; 12 13 } 14 checkAndWarnDeprecation(); 19 }
Job已經實例化了,下面就得給這個Job加點佐料才能讓它按照我們的要求運行。於是依次給Job添加啟動Jar包、設置Mapper類、設置合並類、設置Reducer類、設置輸出鍵類型、設置輸出值的類型。
這里有必要說下設置Jar包的這個方法setJarByClass:
1 public void setJarByClass(Class<?> cls) { 2 3 ensureState(JobState.DEFINE); 4 5 conf.setJarByClass(cls); 6 7 }
它會首先判斷當前Job的狀態是否是運行中,接着通過class找到其所屬的jar文件,將jar路徑賦值給mapreduce.job.jar屬性。至於尋找jar文件的方法,則是通過classloader獲取類路徑下的資源文件,進行循環遍歷。具體實現見ClassUtil類中的findContainingJar方法。
搞完了上面的東西,緊接着就會給mapreduce.input.fileinputformat.inputdir參數賦值,這是Job的輸入路徑,還有mapreduce.input.fileinputformat.inputdir,這是Job的輸出路徑。具體的位置,就是我們前面main中傳入的Args。
4.2.3 Job提交
萬事俱備,那就運行吧。
這里調用的方法如下:
1 public boolean waitForCompletion(boolean verbose 2 3 ) throws IOException, InterruptedException, 4 5 ClassNotFoundException { 6 7 if (state == JobState.DEFINE) { 8 9 submit(); 10 11 } 12 13 if (verbose) { 14 15 monitorAndPrintJob(); 16 17 } else { 18 19 // get the completion poll interval from the client. 20 21 int completionPollIntervalMillis = 22 23 Job.getCompletionPollInterval(cluster.getConf()); 24 25 while (!isComplete()) { 26 27 try { 28 29 Thread.sleep(completionPollIntervalMillis); 30 31 } catch (InterruptedException ie) { 32 33 } 34 35 } 36 37 } 38 39 return isSuccessful(); 40 41 }
至於方法的參數verbose,如果想在控制台打印當前的進度,則設置為true。
至於submit方法,如果當前在HDFS的配置文件中配置了mapreduce.framework.name屬性為“yarn”的話,會創建一個YARNRunner對象來進行任務的提交。其構造方法如下:
1 public YARNRunner(Configuration conf, ResourceMgrDelegate resMgrDelegate, 2 3 ClientCache clientCache) { 4 5 this.conf = conf; 6 7 try { 8 9 this.resMgrDelegate = resMgrDelegate; 10 11 this.clientCache = clientCache; 12 13 this.defaultFileContext = FileContext.getFileContext(this.conf); 14 15 } catch (UnsupportedFileSystemException ufe) { 16 17 throw new RuntimeException("Error in instantiating YarnClient", ufe); 18 19 } 20 21 }
其中,ResourceMgrDelegate實際上ResourceManager的代理類,其實現了YarnClient接口,通過ApplicationClientProtocol代理直接向RM提交Job,殺死Job,查看Job運行狀態等操作。同時,在ResourceMgrDelegate類中會通過YarnConfiguration來讀取yarn-site.xml、core-site.xml等配置文件中的配置屬性。
下面就到了客戶端最關鍵的時刻了,提交Job到集群運行。具體實現類是JobSubmitter類中的submitJobInternal方法。這個牛氣哄哄的方法寫了100多行,還不算其幾十行的注釋。我們看它干了點啥。
Step1:
檢查job的輸出路徑是否存在,如果存在則拋出異常。
Step2:
初始化用於存放Job相關資源的路徑。注意此路徑的構造方式為:
1 conf.get(MRJobConfig.MR_AM_STAGING_DIR, 2 3 MRJobConfig.DEFAULT_MR_AM_STAGING_DIR) 4 5 + Path.SEPARATOR + user 6 7 + Path.SEPARATOR + STAGING_CONSTANT
其中,MRJobConfig.DEFAULT_MR_AM_STAGING_DIR為“/tmp/hadoop-yarn/staging”,STAGING_CONSTANT為".staging"。
Step3:
設置客戶端的host屬性:mapreduce.job.submithostname和mapreduce.job.submithostaddress。
Step4:
通過RPC,向Yarn的ResourceManager申請JobID對象。
Step5:
從HDFS的NameNode獲取驗證用的Token,並將其放入緩存。
Step6:
將作業文件上傳到HDFS,這里如果我們前面沒有對Job命名的話,默認的名稱就會在這里設置成jar的名字。並且,作業默認的副本數是10,如果屬性mapreduce.client.submit.file.replication沒有被設置的話。
Step7:
文件上傳到HDFS之后,還要被DistributedCache進行緩存起來。這是因為計算節點收到該作業的第一個任務后,就會有DistributedCache自動將作業文件Cache到節點本地目錄下,並且會對壓縮文件進行解壓,如:.zip,.jar,.tar等等,然后開始任務。
最后,對於同一個計算節點接下來收到的任務,DistributedCache不會重復去下載作業文件,而是直接運行任務。如果一個作業的任務數很多,這種設計避免了在同一個節點上對用一個job的文件會下載多次,大大提高了任務運行的效率。
Step8:
對每個輸入文件進行split划分。注意這只是個邏輯的划分,不是物理的。因為此處是輸入文件,因此執行的是FileInputFormat類中的getSplits方法。只有非壓縮的文件和幾種特定壓縮方式壓縮后的文件才分片。分片的大小由如下幾個參數決定:mapreduce.input.fileinputformat.split.maxsize、mapreduce.input.fileinputformat.split.minsize、文件的塊大小。
具體計算方式為:
Math.max(minSize, Math.min(maxSize, blockSize))
分片的大小有可能比默認塊大小64M要大,當然也有可能小於它,默認情況下分片大小為當前HDFS的塊大小,64M。
接下來就該正兒八經的獲取分片詳情了。代碼如下:
1 long bytesRemaining = length; 2 3 while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) { 4 5 int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining); 6 7 splits.add(makeSplit(path, length-bytesRemaining, splitSize, 9 blkLocations[blkIndex].getHosts())); 10 11 bytesRemaining -= splitSize; 13 } 15 16 if (bytesRemaining != 0) { 18 int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining); 19 20 splits.add(makeSplit(path, length-bytesRemaining, bytesRemaining, 22 blkLocations[blkIndex].getHosts())); 23 24 }
Step8.1:
將bytesRemaining(剩余未分片字節數)設置為整個文件的長度。
Step8.2:
如果bytesRemaining超過分片大小splitSize一定量才會將文件分成多個InputSplit,SPLIT_SLOP(默認1.1)。接着就會執行如下方法獲取block的索引,其中第二個參數是這個block在整個文件中的偏移量,在循環中會從0越來越大:
1 protected int getBlockIndex(BlockLocation[] blkLocations, long offset) { 4 for (int i = 0 ; i < blkLocations.length; i++) { 5 // is the offset inside this block? 6 if ((blkLocations[i].getOffset() <= offset) && 7 (offset < blkLocations[i].getOffset() + blkLocations[i].getLength())){ 8 return i; 9 } 10 } 11 12 BlockLocation last = blkLocations[blkLocations.length -1]; 13 long fileLength = last.getOffset() + last.getLength() -1; 14 throw new IllegalArgumentException("Offset " + offset + " is outside of file (0.." + fileLength + ")"); 17 }
將符合條件的塊的索引對應的block信息的主機節點以及文件的路徑名、開始的偏移量、分片大小splitSize封裝到一個InputSplit中加入List<InputSplit> splits。
Step8.3:
bytesRemaining -= splitSize修改剩余字節大小。剩余如果bytesRemaining還不為0,表示還有未分配的數據,將剩余的數據及最后一個block加入splits。
Step8.4
如果不允許分割isSplitable==false,則將第一個block、文件目錄、開始位置為0,長度為整個文件的長度封裝到一個InputSplit,加入splits中;如果文件的長度==0,則splits.add(new FileSplit(path, 0, length, new String[0]))沒有block,並且初始和長度都為0;
Step8.5
將輸入目錄下文件的個數賦值給 "mapreduce.input.num.files",方便以后校對,返回分片信息splits。
這就是getSplits獲取分片的過程。當使用基於FileInputFormat實現InputFormat時,為了提高MapTask的數據本地性,應盡量使InputSplit大小與block大小相同。
如果分片大小超過bolck大小,但是InputSplit中的封裝了單個block的所在主機信息啊,這樣能讀取多個bolck數據嗎?
比如當前文件很大,1G,我們設置的最小分片是100M,最大是200M,當前塊大小為64M,經過計算后的實際分片大小是100M,這個時候第二個分片中存放的也只是一個block的host信息。需要注意的是split是邏輯分片,不是物理分片,當Map任務需要的數據本地性發揮作用時,會從本機的block開始讀取,超過這個block的部分可能不在本機,這就需要從別的DataNode拉數據過來,因為實際獲取數據是一個輸入流,這個輸入流面向的是整個文件,不受split的影響,split的大小越大可能需要從別的節點拉的數據越多,從從而效率也會越慢,拉數據的多少是由getSplits方法中的splitSize決定的。所以為了更有效率,分片的大小盡量保持在一個block大小吧。
Step9:
將split信息和SplitMetaInfo都寫入HDFS中。使用方法:
1 JobSplitWriter.createSplitFiles(jobSubmitDir, conf, jobSubmitDir.getFileSystem(conf), array);
Step10:
對Map數目設置,上面獲得到的split的個數就是實際的Map任務的數目。
Step11:
相關配置寫入到job.xml中:
1 jobCopy.writeXml(out);
Step12:
通過如下代碼正式提交Job到Yarn:
1 status = submitClient.submitJob(jobId, submitJobDir.toString(), job.getCredentials());
這里就涉及到YarnClient和RresourceManager的RPC通信了。包括獲取applicationId、進行狀態檢查、網絡通信等。
Step13:
上面通過RPC的調用,最后會返回一個JobStatus對象,它的toString方法可以在JobClient端打印運行的相關日志信息。
4.2.4 另一種運行方式
提交MapReduce任務的方式除了上述源碼中給出的之外,還可以使用ToolRunner方式。具體方式為:
1 ToolRunner.run(new Configuration(),new WordCount(), args);
至此,我們的MapReduce的啟動類要做的事情已經分析完了。
-------------------------------------------------------------------------------
如果您看了本篇博客,覺得對您有所收獲,請點擊右下角的 [推薦]
如果您想轉載本博客,請注明出處
如果您對本文有意見或者建議,歡迎留言
感謝您的閱讀,請關注我的后續博客