This article walks through MapReduce in Hadoop 2.2.0 in detail. Please pay attention to the version, because the source code can differ between Hadoop versions.
Here is the outline of this article:
1. Getting the source code
2. WordCount case study
3. Client source code analysis
4. Recap
5. Mapper in detail
5.1 Map input
5.2 Map output
5.3 Map recap
6. Reduce in detail
7. Summary
If anything here is incorrect, please bear with me, and feel free to point it out.
Please respect the author's work; when reposting, include the blog address:
https://www.cnblogs.com/hongten/p/hongten_hadoop_mapreduce.html
1. Getting the source code
You can download HBase:
HBase: hbase-0.98.9-hadoop2-bin.tar.gz
The tarball contains the Hadoop 2.2.0 jar files as well as their source code.
2. WordCount case study
Before going into the details, let's look at an example: a file with the following content
hello hongten 1
hello hongten 2
hello hongten 3
hello hongten 4
hello hongten 5
......
......
Every line of the file contains one 'hello', one 'hongten' and, at the end of the line, a number that keeps increasing.
We want to count how many times each word appears in this file (you can find many similar examples online).
First we need to generate the file; the following Java code can be used to do so.
import java.io.BufferedWriter;
import java.io.File;
import java.io.FileWriter;

/**
 * @author Hongten
 * @created 11 Nov 2018
 */
public class GenerateWord {

    public static void main(String[] args) throws Exception {

        double num = 12000000;

        StringBuilder sb = new StringBuilder();
        for (int i = 1; i < num; i++) {
            sb.append("hello").append(" ").append("hongten").append(" ").append(i).append("\n");
        }

        File writename = new File("/root/word.txt");
        writename.createNewFile();
        BufferedWriter out = new BufferedWriter(new FileWriter(writename));
        out.write(sb.toString());
        out.flush();
        out.close();
        System.out.println("done.");
    }
}
Go into the Linux system and compile GenerateWord.java
javac GenerateWord.java
Once compilation succeeds, a GenerateWord.class file is produced; then run
java GenerateWord
After waiting a while... the file is generated (roughly 252MB).
Next, let's write the map, the reduce and the client implementation for the word count.
Project structure
There are three Java files in total.
The client
First we define a Configuration and a Job and call the job's various setters; only the final job.waitForCompletion() call actually triggers the submission.
In other words, the client holds the configuration needed to run the job in a distributed fashion, plus the final submit action.
package com.b510.hongten.hadoop;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

/**
 * @author Hongten
 * @created 11 Nov 2018
 */
public class WordCount {

    public static void main(String[] args) throws Exception {
        // load the configuration
        Configuration conf = new Configuration();
        // create the job
        Job job = Job.getInstance(conf);

        // Create a new Job
        job.setJarByClass(WordCount.class);

        // Specify various job-specific parameters
        job.setJobName("wordcount");

        job.setMapperClass(MyMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);

        job.setReducerClass(MyReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        // job.setInputPath(new Path("/usr/input/wordcount"));
        // job.setOutputPath(new Path("/usr/output/wordcount"));

        FileInputFormat.addInputPath(job, new Path("/usr/input/wordcount1"));

        Path output = new Path("/usr/output/wordcount");
        if (output.getFileSystem(conf).exists(output)) {
            output.getFileSystem(conf).delete(output, true);
        }

        FileOutputFormat.setOutputPath(job, output);

        // Submit the job, then poll for progress until the job is complete
        job.waitForCompletion(true);
    }
}
The custom Mapper
package com.b510.hongten.hadoop;

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

/**
 * @author Hongten
 * @created 11 Nov 2018
 */
public class MyMapper extends Mapper<Object, Text, Text, IntWritable> {

    private final static IntWritable one = new IntWritable(1);
    private Text word = new Text();

    public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
        StringTokenizer itr = new StringTokenizer(value.toString());
        while (itr.hasMoreTokens()) {
            word.set(itr.nextToken());
            context.write(word, one);
        }
    }
}
The custom Reducer
package com.b510.hongten.hadoop;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

/**
 * @author Hongten
 * @created 11 Nov 2018
 */
public class MyReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

    private IntWritable result = new IntWritable();

    public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
        int sum = 0;
        for (IntWritable val : values) {
            sum += val.get();
        }
        result.set(sum);
        context.write(key, result);
    }
}
Running the job and checking the result
cd /home/hadoop-2.5/bin/

--create the test directory
./hdfs dfs -mkdir -p /usr/input/wordcount1

--put the test file into the test directory
./hdfs dfs -put /root/word.txt /usr/input/wordcount1

--run the test
./hadoop jar /root/wordcount.jar com.b510.hongten.hadoop.WordCount

--download the output files from HDFS
./hdfs dfs -get /usr/output/wordcount/* ~/

--show the last 5 lines of the output file
tail -n5 /root/part-r-00000
The result of the run
The YARN web UI shows how long the job ran:
it started at 11:47:46 and finished at 11:56:48, 9min 2s in total (this was run in a virtual machine on my own computer; on a real cluster it should be much faster).
Number of records: 12000000 - 1 = 11,999,999
3. Client source code analysis
After configuring the distributed job on the client, we finally execute
// Submit the job, then poll for progress until the job is complete
job.waitForCompletion(true);
So what happens inside the waitForCompletion() method?
// we pass verbose=true
public boolean waitForCompletion(boolean verbose
                                 ) throws IOException, InterruptedException,
                                          ClassNotFoundException {
    if (state == JobState.DEFINE) {
        // the submit action
        submit();
    }
    // verbose=true
    if (verbose) {
        // monitor the job and print its progress;
        // this is why we see so much output on the client while the job runs,
        // with verbose=false we would not see any of it
        monitorAndPrintJob();
    } else {
        // get the completion poll interval from the client.
        int completionPollIntervalMillis =
            Job.getCompletionPollInterval(cluster.getConf());
        while (!isComplete()) {
            try {
                Thread.sleep(completionPollIntervalMillis);
            } catch (InterruptedException ie) {
            }
        }
    }
    // return the job status
    return isSuccessful();
}
The most important call in this method is submit(), which submits the distributed job, so we need to step into submit().
public void submit() throws IOException, InterruptedException, ClassNotFoundException {
    ensureState(JobState.DEFINE);
    // use the new API; we are on the Hadoop 2.2.0 API, as opposed to the old one
    setUseNewAPI();
    // connect to the cluster; the cluster responds and a job ID is allocated
    connect();
    final JobSubmitter submitter =
        getJobSubmitter(cluster.getFileSystem(), cluster.getClient());
    status = ugi.doAs(new PrivilegedExceptionAction<JobStatus>() {
        public JobStatus run() throws IOException, InterruptedException, ClassNotFoundException {
            // submit the job
            /*
             Internal method for submitting jobs to the system.
             The job submission process involves:
             1. Checking the input and output specifications of the job.
             2. Computing the InputSplits for the job.
             3. Setup the requisite accounting information for the DistributedCache of the job, if necessary.
             4. Copying the job's jar and configuration to the map-reduce system directory on the distributed file-system.
             5. Submitting the job to the JobTracker and optionally monitoring it's status.
            */
            // this method does five things:
            // 1. check the input and output
            // 2. compute the input splits for the job
            // 3./4. ship the resource files
            // 5. submit the job and monitor its status
            // note that in 2.x there is no JobTracker any more:
            // "JobTracker is no longer used since M/R 2.x.
            //  This is a dummy JobTracker class, which is used to be compatible with M/R 1.x applications."
            return submitter.submitJobInternal(Job.this, cluster);
        }
    });
    state = JobState.RUNNING;
    LOG.info("The url to track the job: " + getTrackingURL());
}
So we need to step into submitter.submitJobInternal() and look at its implementation.
// this method does five things:
// 1. check the input and output
// 2. compute the input splits for the job
// 3./4. ship the resource files
// 5. submit the job and monitor its status
// note that in 2.x there is no JobTracker any more
JobStatus submitJobInternal(Job job, Cluster cluster)
        throws ClassNotFoundException, InterruptedException, IOException {

    //validate the jobs output specs
    checkSpecs(job);

    Path jobStagingArea = JobSubmissionFiles.getStagingDir(cluster,
                                                           job.getConfiguration());
    //configure the command line options correctly on the submitting dfs
    Configuration conf = job.getConfiguration();
    InetAddress ip = InetAddress.getLocalHost();
    if (ip != null) {
        submitHostAddress = ip.getHostAddress();
        submitHostName = ip.getHostName();
        conf.set(MRJobConfig.JOB_SUBMITHOST, submitHostName);
        conf.set(MRJobConfig.JOB_SUBMITHOSTADDR, submitHostAddress);
    }
    JobID jobId = submitClient.getNewJobID();
    // set the job ID
    job.setJobID(jobId);
    Path submitJobDir = new Path(jobStagingArea, jobId.toString());
    JobStatus status = null;
    try {
        conf.set(MRJobConfig.USER_NAME,
                 UserGroupInformation.getCurrentUser().getShortUserName());
        conf.set("hadoop.http.filter.initializers",
                 "org.apache.hadoop.yarn.server.webproxy.amfilter.AmFilterInitializer");
        conf.set(MRJobConfig.MAPREDUCE_JOB_DIR, submitJobDir.toString());
        LOG.debug("Configuring job " + jobId + " with " + submitJobDir
                  + " as the submit dir");
        // get delegation token for the dir
        TokenCache.obtainTokensForNamenodes(job.getCredentials(),
                                            new Path[] { submitJobDir }, conf);

        populateTokenCache(conf, job.getCredentials());

        // generate a secret to authenticate shuffle transfers
        if (TokenCache.getShuffleSecretKey(job.getCredentials()) == null) {
            KeyGenerator keyGen;
            try {
                keyGen = KeyGenerator.getInstance(SHUFFLE_KEYGEN_ALGORITHM);
                keyGen.init(SHUFFLE_KEY_LENGTH);
            } catch (NoSuchAlgorithmException e) {
                throw new IOException("Error generating shuffle secret key", e);
            }
            SecretKey shuffleKey = keyGen.generateKey();
            TokenCache.setShuffleSecretKey(shuffleKey.getEncoded(),
                                           job.getCredentials());
        }

        copyAndConfigureFiles(job, submitJobDir);
        Path submitJobFile = JobSubmissionFiles.getJobConfPath(submitJobDir);

        // Create the splits for the job
        LOG.debug("Creating splits at " + jtFs.makeQualified(submitJobDir));
        // write the split information; this is the method we care about :))
        int maps = writeSplits(job, submitJobDir);
        conf.setInt(MRJobConfig.NUM_MAPS, maps);
        LOG.info("number of splits:" + maps);

        // write "queue admins of the queue to which job is being submitted"
        // to job file.
        String queue = conf.get(MRJobConfig.QUEUE_NAME,
                                JobConf.DEFAULT_QUEUE_NAME);
        AccessControlList acl = submitClient.getQueueAdmins(queue);
        conf.set(toFullPropertyName(queue,
                                    QueueACL.ADMINISTER_JOBS.getAclName()), acl.getAclString());

        // removing jobtoken referrals before copying the jobconf to HDFS
        // as the tasks don't need this setting, actually they may break
        // because of it if present as the referral will point to a
        // different job.
        TokenCache.cleanUpTokenReferral(conf);

        if (conf.getBoolean(
                MRJobConfig.JOB_TOKEN_TRACKING_IDS_ENABLED,
                MRJobConfig.DEFAULT_JOB_TOKEN_TRACKING_IDS_ENABLED)) {
            // Add HDFS tracking ids
            ArrayList<String> trackingIds = new ArrayList<String>();
            for (Token<? extends TokenIdentifier> t :
                    job.getCredentials().getAllTokens()) {
                trackingIds.add(t.decodeIdentifier().getTrackingId());
            }
            conf.setStrings(MRJobConfig.JOB_TOKEN_TRACKING_IDS,
                            trackingIds.toArray(new String[trackingIds.size()]));
        }

        // Write job file to submit dir
        writeConf(conf, submitJobFile);

        //
        // Now, actually submit the job (using the submit name)
        //
        // only here is the job really submitted
        printTokens(jobId, job.getCredentials());
        status = submitClient.submitJob(
            jobId, submitJobDir.toString(), job.getCredentials());
        if (status != null) {
            return status;
        } else {
            throw new IOException("Could not launch job");
        }
    } finally {
        if (status == null) {
            LOG.info("Cleaning up the staging area " + submitJobDir);
            if (jtFs != null && submitJobDir != null)
                jtFs.delete(submitJobDir, true);
        }
    }
}
What we care about here is
int maps = writeSplits(job, submitJobDir);
Step into the writeSplits() method
private int writeSplits(org.apache.hadoop.mapreduce.JobContext job,
                        Path jobSubmitDir) throws IOException,
        InterruptedException, ClassNotFoundException {
    // the configuration can be obtained from the job
    JobConf jConf = (JobConf) job.getConfiguration();
    int maps;
    if (jConf.getUseNewMapper()) {
        // the new split method; since we are on Hadoop 2.x,
        // this is the branch that is taken
        maps = writeNewSplits(job, jobSubmitDir);
    } else {
        // the old split method
        maps = writeOldSplits(jConf, jobSubmitDir);
    }
    return maps;
}
We are using version 2.x, so the writeNewSplits() method is used.
@SuppressWarnings("unchecked")
private <T extends InputSplit>
int writeNewSplits(JobContext job, Path jobSubmitDir) throws IOException,
        InterruptedException, ClassNotFoundException {
    // the configuration can be obtained from the job
    Configuration conf = job.getConfiguration();
    // obtain the input format via reflection;
    // this returns TextInputFormat, i.e. the default
    InputFormat<?, ?> input =
        ReflectionUtils.newInstance(job.getInputFormatClass(), conf);  // == 1 ==

    // the input format computes the splits
    List<InputSplit> splits = input.getSplits(job);  // == 2 ==
    T[] array = (T[]) splits.toArray(new InputSplit[splits.size()]);

    // sort the splits into order based on size, so that the biggest
    // go first
    Arrays.sort(array, new SplitComparator());
    JobSplitWriter.createSplitFiles(jobSubmitDir, conf,
                                    jobSubmitDir.getFileSystem(conf), array);
    return array.length;
}
Look at '== 1 ==': this is where the input format is obtained. Step into the job.getInputFormatClass() method
@SuppressWarnings("unchecked")
public Class<? extends InputFormat<?,?>> getInputFormatClass()
        throws ClassNotFoundException {
    // if INPUT_FORMAT_CLASS_ATTR (mapreduce.job.inputformat.class) is not set
    // in the configuration, TextInputFormat is returned;
    // if it is set, the configured class is returned.
    // in other words, the default is TextInputFormat
    return (Class<? extends InputFormat<?,?>>)
        conf.getClass(INPUT_FORMAT_CLASS_ATTR, TextInputFormat.class);
}
We can see that the system's default input format is TextInputFormat.
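As an aside, if the default does not fit, the input format can be swapped on the client. The snippet below is a minimal sketch (not from the original WordCount client) showing how a job could switch to KeyValueTextInputFormat; setInputFormatClass() and KeyValueTextInputFormat are standard Hadoop 2.x API, while the tiny driver class around them is just an illustration.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;

public class InputFormatConfigSketch {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        // without this call, getInputFormatClass() falls back to TextInputFormat;
        // with it, mapreduce.job.inputformat.class points at our choice
        job.setInputFormatClass(KeyValueTextInputFormat.class);
        System.out.println(job.getInputFormatClass().getName());
    }
}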
Look at '== 2 ==': this is where the input format computes the splits. Step into the getSplits() method
public List<InputSplit> getSplits(JobContext job) throws IOException {
    // minSize = Math.max(1, 1L) = 1
    long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job));  // == A ==
    // maxSize = Long.MAX_VALUE
    long maxSize = getMaxSplitSize(job);  // == B ==

    // generate splits
    List<InputSplit> splits = new ArrayList<InputSplit>();
    // list the input files
    List<FileStatus> files = listStatus(job);
    // iterate over the file list
    for (FileStatus file: files) {
        // files are handled one at a time,
        // and the splits are computed per file
        Path path = file.getPath();
        // file length
        long length = file.getLen();
        if (length != 0) {
            BlockLocation[] blkLocations;
            if (file instanceof LocatedFileStatus) {
                blkLocations = ((LocatedFileStatus) file).getBlockLocations();
            } else {
                // obtain the FileSystem from the path
                FileSystem fs = path.getFileSystem(job.getConfiguration());
                // obtain all block locations of the file
                blkLocations = fs.getFileBlockLocations(file, 0, length);
            }
            // can the file be split?
            if (isSplitable(job, path)) {
                // it can be split
                // block size of the file
                long blockSize = file.getBlockSize();
                // split size: splitSize = blockSize,
                // i.e. by default the split size equals the block size
                long splitSize = computeSplitSize(blockSize, minSize, maxSize);  // == C ==

                long bytesRemaining = length;
                while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) {
                    // block index
                    int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);  // == D ==
                    // the split record
                    splits.add(makeSplit(path, length-bytesRemaining, splitSize,
                                         blkLocations[blkIndex].getHosts()));
                    bytesRemaining -= splitSize;
                }

                if (bytesRemaining != 0) {
                    int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);
                    splits.add(makeSplit(path, length-bytesRemaining, bytesRemaining,
                                         blkLocations[blkIndex].getHosts()));
                }
            } else { // not splitable
                // cannot be split
                splits.add(makeSplit(path, 0, length, blkLocations[0].getHosts()));
            }
        } else {
            //Create empty hosts array for zero length files
            splits.add(makeSplit(path, 0, length, new String[0]));
        }
    }
    // Save the number of input files for metrics/loadgen
    job.getConfiguration().setLong(NUM_INPUT_FILES, files.size());
    LOG.debug("Total # of splits: " + splits.size());
    return splits;
}
Look at '== A ==': getFormatMinSplitSize() returns 1 and getMinSplitSize() returns 1L.
protected long getFormatMinSplitSize() {
    return 1;
}

public static long getMinSplitSize(JobContext job) {
    // if SPLIT_MINSIZE (mapreduce.input.fileinputformat.split.minsize) is set
    // in the configuration, that value is used; otherwise the default 1L is returned.
    // we did not configure it, so 1L is returned
    return job.getConfiguration().getLong(SPLIT_MINSIZE, 1L);
}
Look at '== B ==': getMaxSplitSize() returns Long.MAX_VALUE (we did not configure SPLIT_MAXSIZE).
public static long getMaxSplitSize(JobContext context) {
    // if SPLIT_MAXSIZE (mapreduce.input.fileinputformat.split.maxsize) is set
    // in the configuration, that value is used; otherwise the default Long.MAX_VALUE is returned.
    // we did not configure it, so Long.MAX_VALUE is returned
    return context.getConfiguration().getLong(SPLIT_MAXSIZE,
                                              Long.MAX_VALUE);
}
Look at '== C ==': since we did not configure anything, the split size equals the block size.
// minSize = 1
// maxSize = Long.MAX_VALUE
protected long computeSplitSize(long blockSize, long minSize,
                                long maxSize) {
    // Math.min(maxSize, blockSize) -> Math.min(Long.MAX_VALUE, blockSize) -> blockSize
    // Math.max(minSize, blockSize) -> Math.max(1, blockSize)              -> blockSize
    return Math.max(minSize, Math.min(maxSize, blockSize));
}
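To make the formula concrete, here is a small sketch of my own (not Hadoop source) that replays Math.max(minSize, Math.min(maxSize, blockSize)) with the default values and with forced limits; FileInputFormat.setMaxInputSplitSize() and setMinInputSplitSize() are the standard 2.x helpers that write mapreduce.input.fileinputformat.split.maxsize and .minsize, and the concrete sizes are only example numbers.

public class SplitSizeSketch {

    // same formula as FileInputFormat.computeSplitSize()
    static long computeSplitSize(long blockSize, long minSize, long maxSize) {
        return Math.max(minSize, Math.min(maxSize, blockSize));
    }

    public static void main(String[] args) {
        long blockSize = 128L * 1024 * 1024;   // 128MB HDFS block

        // defaults: minSize = 1, maxSize = Long.MAX_VALUE -> split = block = 128MB
        System.out.println(computeSplitSize(blockSize, 1L, Long.MAX_VALUE));

        // forcing maxSize = 64MB (e.g. FileInputFormat.setMaxInputSplitSize(job, 64L * 1024 * 1024))
        // halves the split size and doubles the number of map tasks
        System.out.println(computeSplitSize(blockSize, 1L, 64L * 1024 * 1024));

        // forcing minSize = 256MB (FileInputFormat.setMinInputSplitSize(job, 256L * 1024 * 1024))
        // makes each split span two blocks
        System.out.println(computeSplitSize(blockSize, 256L * 1024 * 1024, Long.MAX_VALUE));
    }
}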
Look at '== D ==': the block index is found from the offset.
protected int getBlockIndex(BlockLocation[] blkLocations,
                            long offset) {
    // find the block index from the offset
    for (int i = 0; i < blkLocations.length; i++) {
        // is the offset inside this block?
        if ((blkLocations[i].getOffset() <= offset) &&
            (offset < blkLocations[i].getOffset() + blkLocations[i].getLength())) {
            return i;
        }
    }
    BlockLocation last = blkLocations[blkLocations.length - 1];
    long fileLength = last.getOffset() + last.getLength() - 1;
    throw new IllegalArgumentException("Offset " + offset +
                                       " is outside of file (0.." +
                                       fileLength + ")");
}
4. Recap
Describing the above in plain language, the following illustrates it:
The default block size is 128MB, and as long as we do not configure anything else, the block size equals the split size.
Type 1: the block is 45MB, smaller than the system default of 128MB.
Split record: path, 0, 45, [3, 8, 10]
Split record: the file location path, offset 0, split size 45, block locations [3, 8, 10], meaning this file (block) lives on datanode3, datanode8 and datanode10 of the HDFS cluster.
Type 2: the block is 128MB, exactly the system default of 128MB; it is not split into two blocks and behaves like Type 1.
Type 3: the file is 414MB, larger than the system default of 128MB, so when we upload it to HDFS the system cuts it into 128MB blocks, one after another, until only the last 30MB remains, which becomes a block of its own. Each split record then consists of the file location path, an offset, the split size and the block locations. We call this list of records the file's split list.
Once the system has a file's split list, it hands the list over to the distributed framework, which then processes the individual splits.
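Applying this to the roughly 252MB word.txt from section 2, the following sketch (my own estimate, assuming the default 128MB block size and the SPLIT_SLOP factor of 1.1 used in getSplits()) shows how many splits, and therefore map tasks, the job would get; the helper class and its numbers are illustrative only.

public class SplitCountSketch {

    // mirrors the loop in FileInputFormat.getSplits(): keep cutting full splits
    // while the remainder is more than SPLIT_SLOP (1.1) times the split size,
    // then emit whatever is left as the final split
    static int countSplits(long fileLength, long splitSize) {
        final double SPLIT_SLOP = 1.1;
        int splits = 0;
        long bytesRemaining = fileLength;
        while (((double) bytesRemaining) / splitSize > SPLIT_SLOP) {
            splits++;
            bytesRemaining -= splitSize;
        }
        if (bytesRemaining != 0) {
            splits++;
        }
        return splits;
    }

    public static void main(String[] args) {
        long mb = 1024L * 1024;
        // ~252MB word.txt with 128MB splits -> 2 splits, i.e. 2 map tasks
        System.out.println(countSplits(252 * mb, 128 * mb));
        // the 414MB file from Type 3 -> 128 + 128 + 128 + 30 -> 4 splits
        System.out.println(countSplits(414 * mb, 128 * mb));
    }
}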
5. Mapper in detail
5.1 Map input
The map task opens an input stream on HDFS and seeks to the position of its split; every split except the first one skips its first line and starts reading from the second line.
org.apache.hadoop.mapred.MapTask contains the run() method
//org.apache.hadoop.mapred.MapTask
public void run(final JobConf job, final TaskUmbilicalProtocol umbilical)
        throws IOException, ClassNotFoundException, InterruptedException {
    this.umbilical = umbilical;

    if (isMapTask()) {
        // If there are no reducers then there won't be any sort. Hence the map
        // phase will govern the entire attempt's progress.
        // on the client we can set the number of reducers, e.g.
        //   job.setNumReduceTasks(10);
        // if there is no reduce and only a map phase,
        if (conf.getNumReduceTasks() == 0) {
            // then this branch is taken
            mapPhase = getProgress().addPhase("map", 1.0f);
        } else {
            // If there are reducers then the entire attempt's progress will be
            // split between the map phase (67%) and the sort phase (33%).
            // as soon as there is a reduce phase,
            mapPhase = getProgress().addPhase("map", 0.667f);
            // a sort phase is added as well
            sortPhase = getProgress().addPhase("sort", 0.333f);
        }
    }
    TaskReporter reporter = startReporter(umbilical);

    boolean useNewApi = job.getUseNewMapper();
    initialize(job, getJobID(), reporter, useNewApi);

    // check if it is a cleanupJobTask
    if (jobCleanup) {
        runJobCleanupTask(umbilical, reporter);
        return;
    }
    if (jobSetup) {
        runJobSetupTask(umbilical, reporter);
        return;
    }
    if (taskCleanup) {
        runTaskCleanupTask(umbilical, reporter);
        return;
    }

    // use the new API or not?
    if (useNewApi) {
        // we are using the new mapper
        runNewMapper(job, splitMetaInfo, umbilical, reporter);
    } else {
        runOldMapper(job, splitMetaInfo, umbilical, reporter);
    }
    done(umbilical, reporter);
}
Stepping into runNewMapper(), we can see the map task's overall flow:
1. initialize the input
2. call org.apache.hadoop.mapreduce.Mapper.run()
3. update the status
4. close the input
5. close the output
@SuppressWarnings("unchecked")
private <INKEY,INVALUE,OUTKEY,OUTVALUE>
void runNewMapper(final JobConf job,
                  final TaskSplitIndex splitIndex,
                  final TaskUmbilicalProtocol umbilical,
                  TaskReporter reporter
                  ) throws IOException, ClassNotFoundException,
                           InterruptedException {
    // make a task context so we can get the classes
    // obtain the task context
    org.apache.hadoop.mapreduce.TaskAttemptContext taskContext =
        new org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl(job,
                                                                    getTaskID(),
                                                                    reporter);
    // make a mapper
    // construct the mapper via reflection;
    // this gives us the mapper class we wrote
    org.apache.hadoop.mapreduce.Mapper<INKEY,INVALUE,OUTKEY,OUTVALUE> mapper =
        (org.apache.hadoop.mapreduce.Mapper<INKEY,INVALUE,OUTKEY,OUTVALUE>)
            ReflectionUtils.newInstance(taskContext.getMapperClass(), job);  // == AA ==
    // make the input format
    // obtain the input format via reflection;
    // through the input format we can get at the file's split list
    org.apache.hadoop.mapreduce.InputFormat<INKEY,INVALUE> inputFormat =
        (org.apache.hadoop.mapreduce.InputFormat<INKEY,INVALUE>)
            ReflectionUtils.newInstance(taskContext.getInputFormatClass(), job);  // == BB ==
    // rebuild the input split
    // one map corresponds to one split, i.e. one split corresponds to one map
    org.apache.hadoop.mapreduce.InputSplit split = null;
    split = getSplitDetails(new Path(splitIndex.getSplitLocation()),
                            splitIndex.getStartOffset());
    LOG.info("Processing split: " + split);

    // a NewTrackingRecordReader is created here
    org.apache.hadoop.mapreduce.RecordReader<INKEY,INVALUE> input =
        new NewTrackingRecordReader<INKEY,INVALUE>
            (split, inputFormat, reporter, taskContext);  // == CC ==

    job.setBoolean(JobContext.SKIP_RECORDS, isSkipping());
    org.apache.hadoop.mapreduce.RecordWriter output = null;

    // get an output object
    if (job.getNumReduceTasks() == 0) {
        output =
            new NewDirectOutputCollector(taskContext, job, umbilical, reporter);
    } else {
        output = new NewOutputCollector(taskContext, job, umbilical, reporter);
    }

    // create a map context object, passing in the input object.
    // what is the relationship between MapContext, NewTrackingRecordReader and LineRecordReader?
    // all three classes contain nextKeyValue(), getCurrentKey() and getCurrentValue() methods.
    // when we call nextKeyValue() on the MapContext, it calls nextKeyValue() on the
    // NewTrackingRecordReader, which in the end calls nextKeyValue() on the LineRecordReader,
    // i.e. the LineRecordReader is what actually does the work
    org.apache.hadoop.mapreduce.MapContext<INKEY, INVALUE, OUTKEY, OUTVALUE>
        mapContext =
            new MapContextImpl<INKEY, INVALUE, OUTKEY, OUTVALUE>(job, getTaskID(),
                                                                 input, output,
                                                                 committer,
                                                                 reporter, split);  // == EE ==

    org.apache.hadoop.mapreduce.Mapper<INKEY,INVALUE,OUTKEY,OUTVALUE>.Context
        mapperContext =
            new WrappedMapper<INKEY, INVALUE, OUTKEY, OUTVALUE>().getMapContext(
                mapContext);

    try {
        //=============================
        // the mapper's overall flow:
        // 1. initialize the input
        // 2. call org.apache.hadoop.mapreduce.Mapper.run()
        // 3. update the status
        // 4. close the input
        // 5. close the output
        //=============================
        // initialize the input
        input.initialize(split, mapperContext);  // == FF ==
        // then call the mapper's run() method, i.e. org.apache.hadoop.mapreduce.Mapper.run()
        mapper.run(mapperContext);  // == GG ==
        // the map is done
        mapPhase.complete();
        setPhase(TaskStatus.Phase.SORT);
        statusUpdate(umbilical);
        // close the input
        input.close();
        input = null;
        // close the output
        output.close(mapperContext);
        output = null;
    } finally {
        closeQuietly(input);
        closeQuietly(output, mapperContext);
    }
}
Look at '== AA ==': since we set our own mapper on the client, the system returns the mapper class we defined.
// on the client we set our own mapper class via
//   job.setMapperClass(MyMapper.class);
// so the mapper we wrote is returned here
@SuppressWarnings("unchecked")
public Class<? extends Mapper<?,?,?,?>> getMapperClass()
        throws ClassNotFoundException {
    return (Class<? extends Mapper<?,?,?,?>>)
        conf.getClass(MAP_CLASS_ATTR, Mapper.class);
}
Look at '== BB ==': as mentioned above, the system's default input format is TextInputFormat.
// the default is TextInputFormat
@SuppressWarnings("unchecked")
public Class<? extends InputFormat<?,?>> getInputFormatClass()
        throws ClassNotFoundException {
    return (Class<? extends InputFormat<?,?>>)
        conf.getClass(INPUT_FORMAT_CLASS_ATTR, TextInputFormat.class);
}
Look at '== CC ==': a RecordReader object is returned here.
NewTrackingRecordReader(org.apache.hadoop.mapreduce.InputSplit split,
                        org.apache.hadoop.mapreduce.InputFormat<K, V> inputFormat,
                        TaskReporter reporter,
                        org.apache.hadoop.mapreduce.TaskAttemptContext taskContext)
        throws InterruptedException, IOException {
    this.reporter = reporter;
    this.inputRecordCounter = reporter
        .getCounter(TaskCounter.MAP_INPUT_RECORDS);
    this.fileInputByteCounter = reporter
        .getCounter(FileInputFormatCounter.BYTES_READ);

    List<Statistics> matchedStats = null;
    if (split instanceof org.apache.hadoop.mapreduce.lib.input.FileSplit) {
        matchedStats = getFsStatistics(((org.apache.hadoop.mapreduce.lib.input.FileSplit) split)
            .getPath(), taskContext.getConfiguration());
    }
    fsStats = matchedStats;

    long bytesInPrev = getInputBytes(fsStats);
    // on the client the input format computes the splits;
    // on the map side the input format creates a
    // org.apache.hadoop.mapreduce.RecordReader<KEYIN, VALUEIN>
    this.real = inputFormat.createRecordReader(split, taskContext);  // == DD ==
    long bytesInCurr = getInputBytes(fsStats);
    fileInputByteCounter.increment(bytesInCurr - bytesInPrev);
}
Look at '== DD ==': a LineRecordReader, a line reader, is created directly here. We will come back to it, because the line reader is what actually does the work.
//org.apache.hadoop.mapreduce.lib.input.TextInputFormat
@Override
public RecordReader<LongWritable, Text>
createRecordReader(InputSplit split,
                   TaskAttemptContext context) {
    String delimiter = context.getConfiguration().get(
        "textinputformat.record.delimiter");
    byte[] recordDelimiterBytes = null;
    if (null != delimiter)
        recordDelimiterBytes = delimiter.getBytes(Charsets.UTF_8);
    // a line reader is created here
    return new LineRecordReader(recordDelimiterBytes);
}
Look at '== EE ==': the map context is created.
// the reader here is the org.apache.hadoop.mapreduce.RecordReader
public MapContextImpl(Configuration conf, TaskAttemptID taskid,
                      RecordReader<KEYIN,VALUEIN> reader,
                      RecordWriter<KEYOUT,VALUEOUT> writer,
                      OutputCommitter committer,
                      StatusReporter reporter,
                      InputSplit split) {
    super(conf, taskid, writer, committer, reporter);
    this.reader = reader;
    this.split = split;
}
Having seen this, what exactly is the relationship between MapContext, NewTrackingRecordReader and LineRecordReader?
It comes down to the methods these three classes share:
nextKeyValue()
getCurrentKey()
getCurrentValue()
When we call nextKeyValue() on the MapContext, it calls nextKeyValue() on the NewTrackingRecordReader, which in the end calls nextKeyValue() on the LineRecordReader.
In other words, the LineRecordReader is what actually does the work.
Look at '== FF ==': initializing the input
// input initialization
public void initialize(InputSplit genericSplit,
                       TaskAttemptContext context) throws IOException {
    FileSplit split = (FileSplit) genericSplit;
    Configuration job = context.getConfiguration();
    this.maxLineLength = job.getInt(MAX_LINE_LENGTH, Integer.MAX_VALUE);
    // start offset
    start = split.getStart();
    // end offset
    end = start + split.getLength();
    // file location
    final Path file = split.getPath();

    // open the file and seek to the start of the split
    // open the HDFS file
    final FileSystem fs = file.getFileSystem(job);
    fileIn = fs.open(file);

    CompressionCodec codec = new CompressionCodecFactory(job).getCodec(file);
    if (null != codec) {
        isCompressedInput = true;
        decompressor = CodecPool.getDecompressor(codec);
        if (codec instanceof SplittableCompressionCodec) {
            final SplitCompressionInputStream cIn =
                ((SplittableCompressionCodec)codec).createInputStream(
                    fileIn, decompressor, start, end,
                    SplittableCompressionCodec.READ_MODE.BYBLOCK);
            if (null == this.recordDelimiterBytes){
                in = new LineReader(cIn, job);
            } else {
                in = new LineReader(cIn, job, this.recordDelimiterBytes);
            }
            start = cIn.getAdjustedStart();
            end = cIn.getAdjustedEnd();
            filePosition = cIn;
        } else {
            if (null == this.recordDelimiterBytes) {
                in = new LineReader(codec.createInputStream(fileIn, decompressor), job);
            } else {
                in = new LineReader(codec.createInputStream(fileIn, decompressor), job,
                                    this.recordDelimiterBytes);
            }
            filePosition = fileIn;
        }
    } else {
        fileIn.seek(start);
        if (null == this.recordDelimiterBytes){
            in = new LineReader(fileIn, job);
        } else {
            in = new LineReader(fileIn, job, this.recordDelimiterBytes);
        }
        filePosition = fileIn;
    }
    // If this is not the first split, we always throw away first record
    // because we always (except the last split) read one extra line in
    // next() method.
    // i.e. from the second split onwards, the first line is normally not read;
    // reading starts from the second line
    if (start != 0) {
        start += in.readLine(new Text(), 0, maxBytesToConsume(start));
    }
    this.pos = start;
}
How should we understand the following code?
if (start != 0) {
    start += in.readLine(new Text(), 0, maxBytesToConsume(start));
}
The following illustration makes this clear.
After a file is uploaded to HDFS it is cut into many blocks, and every block has a fixed size, so when the file is cut, the last line of a block can end up spread across two blocks.
e.g. the last line of Block1 should have been 'hello hongten 5',
but because of the block size limit that text was split into two parts, 'hello hong' and 'ten 5'.
Now that there is more than one split, Block2 starts reading from its second line, i.e. from 'hello hongten 6', while Block1, when reading its content, also reads the first line of Block2, i.e. 'ten 5'.
This is how data integrity is preserved.
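If this one-line overlap between splits is ever undesirable, for example when records must never be cut across splits, a common trick is to make the file non-splittable so that a single map task reads it end to end. The sketch below is my own illustration on top of the standard TextInputFormat API, not something from the original post; the class name is made up.

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;

// isSplitable(...) is consulted inside FileInputFormat.getSplits();
// returning false makes the whole file one split, so no line can be
// broken across split boundaries (at the cost of losing parallelism)
public class NonSplittableTextInputFormat extends TextInputFormat {

    @Override
    protected boolean isSplitable(JobContext context, Path file) {
        return false;
    }
}

// on the client: job.setInputFormatClass(NonSplittableTextInputFormat.class);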
Look at '== GG ==': org.apache.hadoop.mapreduce.Mapper.run() is called.
public void run(Context context) throws IOException, InterruptedException {
    setup(context);
    try {
        // this ends up calling LineRecordReader.nextKeyValue();
        // the data is read line by line, i.e. one line is read
        // and then map() is called for it
        while (context.nextKeyValue()) {
            // these end up calling LineRecordReader.getCurrentKey()
            // and LineRecordReader.getCurrentValue()
            map(context.getCurrentKey(), context.getCurrentValue(), context);
        }
    } finally {
        cleanup(context);
    }
}
So what does nextKeyValue() in LineRecordReader do?
public boolean nextKeyValue() throws IOException {
    if (key == null) {
        // the key is the offset, a LongWritable by default
        key = new LongWritable();
    }
    // assign the key
    key.set(pos);
    if (value == null) {
        // the value is a Text by default
        value = new Text();
    }
    int newSize = 0;
    // We always read one extra line, which lies outside the upper
    // split limit i.e. (end - 1)
    // one extra line is always read here; now we know why
    while (getFilePosition() <= end) {
        // assign the value
        newSize = in.readLine(value, maxLineLength,
                              Math.max(maxBytesToConsume(pos), maxLineLength));
        pos += newSize;
        if (newSize < maxLineLength) {
            break;
        }

        // line too long. try again
        LOG.info("Skipped line of size " + newSize + " at pos " +
                 (pos - newSize));
    }
    if (newSize == 0) {
        key = null;
        value = null;
        return false;
    } else {
        return true;
    }
}

@Override
public LongWritable getCurrentKey() {
    // already assigned in nextKeyValue(), so simply return it
    return key;
}

@Override
public Text getCurrentValue() {
    // already assigned in nextKeyValue(), so simply return it
    return value;
}
5.2 Map output
@SuppressWarnings("unchecked")
private <INKEY,INVALUE,OUTKEY,OUTVALUE>
void runNewMapper(final JobConf job,
                  final TaskSplitIndex splitIndex,
                  final TaskUmbilicalProtocol umbilical,
                  TaskReporter reporter
                  ) throws IOException, ClassNotFoundException,
                           InterruptedException {
    // ..... other code omitted

    org.apache.hadoop.mapreduce.RecordWriter output = null;

    // get an output object
    // if there is no reduce
    if (job.getNumReduceTasks() == 0) {
        output =
            new NewDirectOutputCollector(taskContext, job, umbilical, reporter);
    } else {
        // we defined one reduce on the client
        output = new NewOutputCollector(taskContext, job, umbilical, reporter);
    }

    // ..... other code omitted
}
So what happens inside NewOutputCollector?
@SuppressWarnings("unchecked")
NewOutputCollector(org.apache.hadoop.mapreduce.JobContext jobContext,
                   JobConf job,
                   TaskUmbilicalProtocol umbilical,
                   TaskReporter reporter
                   ) throws IOException, ClassNotFoundException {
    // create the collector
    collector = createSortingCollector(job, reporter);  // == OO1 ==
    // number of partitions = number of reduce tasks
    partitions = jobContext.getNumReduceTasks();
    if (partitions > 1) {
        // more than one partition
        partitioner = (org.apache.hadoop.mapreduce.Partitioner<K,V>)
            ReflectionUtils.newInstance(jobContext.getPartitionerClass(), job);  // == OO2 ==
    } else {
        // a single partition: this partitioner always returns partition 0 (partitions - 1 = 0)
        partitioner = new org.apache.hadoop.mapreduce.Partitioner<K,V>() {
            @Override
            public int getPartition(K key, V value, int numPartitions) {
                return partitions - 1;
            }
        };
    }
}
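The partition count is therefore fixed on the client. The tiny sketch below (my own, with 4 as an arbitrary example value) is just to make that link explicit: setNumReduceTasks() is what makes partitions here greater than 1, so the real partitioner is looked up instead of the single-partition shortcut.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;

public class ReduceCountSketch {
    public static void main(String[] args) throws Exception {
        Job job = Job.getInstance(new Configuration());
        // four reduce tasks -> partitions = 4 in NewOutputCollector, so
        // getPartitionerClass() (HashPartitioner by default) decides which
        // of the four partitions each map output record belongs to
        job.setNumReduceTasks(4);
        System.out.println(job.getNumReduceTasks());
    }
}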
Look at '== OO1 ==': createSortingCollector() is called to create the collector.
@SuppressWarnings("unchecked")
private <KEY, VALUE> MapOutputCollector<KEY, VALUE>
createSortingCollector(JobConf job, TaskReporter reporter)
        throws IOException, ClassNotFoundException {
    MapOutputCollector<KEY, VALUE> collector =
        (MapOutputCollector<KEY, VALUE>)
            ReflectionUtils.newInstance(
                job.getClass(JobContext.MAP_OUTPUT_COLLECTOR_CLASS_ATTR,
                             MapOutputBuffer.class, MapOutputCollector.class), job);
    LOG.info("Map output collector class = " + collector.getClass().getName());
    MapOutputCollector.Context context =
        new MapOutputCollector.Context(this, job, reporter);
    // initialize the collector
    collector.init(context);
    // return the collector
    return collector;
}
The init() method is then called; inside it, the following things are mainly done:
1. set up the in-memory buffer
2. set up the sorter
3. set up the comparator
4. set up the combiner
5. set up the spill thread
public void init(MapOutputCollector.Context context
                 ) throws IOException, ClassNotFoundException {
    job = context.getJobConf();
    reporter = context.getReporter();
    mapTask = context.getMapTask();
    mapOutputFile = mapTask.getMapOutputFile();
    sortPhase = mapTask.getSortPhase();
    spilledRecordsCounter = reporter.getCounter(TaskCounter.SPILLED_RECORDS);
    partitions = job.getNumReduceTasks();
    rfs = ((LocalFileSystem)FileSystem.getLocal(job)).getRaw();

    //sanity checks
    // while processing data, the map puts it into an in-memory buffer.
    // the 100 below is the default buffer size, i.e. 100MB;
    // it can be tuned through IO_SORT_MB (mapreduce.task.io.sort.mb).
    // 0.8 is the buffer threshold: once 80% of the buffer is in use,
    // that 80% of the data can be spilled to disk;
    // it can be tuned through MAP_SORT_SPILL_PERCENT (mapreduce.map.sort.spill.percent).
    final float spillper =
        job.getFloat(JobContext.MAP_SORT_SPILL_PERCENT, (float)0.8);
    final int sortmb = job.getInt(JobContext.IO_SORT_MB, 100);
    indexCacheMemoryLimit = job.getInt(JobContext.INDEX_CACHE_MEMORY_LIMIT,
                                       INDEX_CACHE_MEMORY_LIMIT_DEFAULT);
    if (spillper > (float)1.0 || spillper <= (float)0.0) {
        throw new IOException("Invalid \"" + JobContext.MAP_SORT_SPILL_PERCENT +
                              "\": " + spillper);
    }
    if ((sortmb & 0x7FF) != sortmb) {
        throw new IOException(
            "Invalid \"" + JobContext.IO_SORT_MB + "\": " + sortmb);
    }
    // the sorter, QuickSort by default:
    // the unordered data in the map buffer is sorted with quick sort,
    // so the in-memory data becomes ordered before it is spilled to disk
    sorter = ReflectionUtils.newInstance(job.getClass("map.sort.class",
                                                      QuickSort.class, IndexedSorter.class), job);
    // buffers and accounting
    int maxMemUsage = sortmb << 20;
    maxMemUsage -= maxMemUsage % METASIZE;
    kvbuffer = new byte[maxMemUsage];
    bufvoid = kvbuffer.length;
    kvmeta = ByteBuffer.wrap(kvbuffer)
        .order(ByteOrder.nativeOrder())
        .asIntBuffer();
    setEquator(0);
    bufstart = bufend = bufindex = equator;
    kvstart = kvend = kvindex;

    maxRec = kvmeta.capacity() / NMETA;
    softLimit = (int)(kvbuffer.length * spillper);
    bufferRemaining = softLimit;
    if (LOG.isInfoEnabled()) {
        LOG.info(JobContext.IO_SORT_MB + ": " + sortmb);
        LOG.info("soft limit at " + softLimit);
        LOG.info("bufstart = " + bufstart + "; bufvoid = " + bufvoid);
        LOG.info("kvstart = " + kvstart + "; length = " + maxRec);
    }

    // k/v serialization
    // the comparator
    comparator = job.getOutputKeyComparator();  // == OO3 ==
    keyClass = (Class<K>)job.getMapOutputKeyClass();
    valClass = (Class<V>)job.getMapOutputValueClass();
    serializationFactory = new SerializationFactory(job);
    keySerializer = serializationFactory.getSerializer(keyClass);
    keySerializer.open(bb);
    valSerializer = serializationFactory.getSerializer(valClass);
    valSerializer.open(bb);

    // output counters
    mapOutputByteCounter = reporter.getCounter(TaskCounter.MAP_OUTPUT_BYTES);
    mapOutputRecordCounter =
        reporter.getCounter(TaskCounter.MAP_OUTPUT_RECORDS);
    fileOutputByteCounter = reporter
        .getCounter(TaskCounter.MAP_OUTPUT_MATERIALIZED_BYTES);

    // compression
    if (job.getCompressMapOutput()) {
        Class<? extends CompressionCodec> codecClass =
            job.getMapOutputCompressorClass(DefaultCodec.class);
        codec = ReflectionUtils.newInstance(codecClass, job);
    } else {
        codec = null;
    }

    // combiner
    // the combiner
    final Counters.Counter combineInputCounter =
        reporter.getCounter(TaskCounter.COMBINE_INPUT_RECORDS);
    combinerRunner = CombinerRunner.create(job, getTaskID(),
                                           combineInputCounter,
                                           reporter, null);
    if (combinerRunner != null) {
        final Counters.Counter combineOutputCounter =
            reporter.getCounter(TaskCounter.COMBINE_OUTPUT_RECORDS);
        combineCollector = new CombineOutputCollector<K,V>(combineOutputCounter, reporter, job);
    } else {
        combineCollector = null;
    }
    spillInProgress = false;
    // minimum number of spills for the combiner, 3 by default.
    // when a combiner is defined:
    // 1. while spilling from memory to disk, the data is combined in memory before the spill;
    // 2. while merging the spill files, if the number of files is > 3,
    //    the combiner is triggered again to combine the files.
    minSpillsForCombine = job.getInt(JobContext.MAP_COMBINE_MIN_SPILLS, 3);
    // the spill thread
    spillThread.setDaemon(true);
    spillThread.setName("SpillThread");
    spillLock.lock();
    try {
        spillThread.start();
        while (!spillThreadRunning) {
            spillDone.await();
        }
    } catch (InterruptedException e) {
        throw new IOException("Spill thread failed to initialize", e);
    } finally {
        spillLock.unlock();
    }
    if (sortSpillException != null) {
        throw new IOException("Spill thread failed to initialize",
                              sortSpillException);
    }
}
The circular (ring) buffer comes into play here:
Look at '== OO3 ==': obtaining the comparator
public RawComparator getOutputKeyComparator() {
    // 1. if the user configured a comparator, use it
    // 2. if not, use the key's own comparator
    Class<? extends RawComparator> theClass = getClass(
        JobContext.KEY_COMPARATOR, null, RawComparator.class);
    if (theClass != null)
        return ReflectionUtils.newInstance(theClass, this);
    return WritableComparator.get(getMapOutputKeyClass().asSubclass(WritableComparable.class));
}
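In other words, JobContext.KEY_COMPARATOR (mapreduce.job.output.key.comparator.class) wins if it is set. Below is a minimal sketch of my own showing how it could be overridden for the Text keys of our WordCount, sorting keys in descending order; the comparator class is an example, registered on the client with job.setSortComparatorClass(...).

import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

// reverses the natural ordering of Text keys, so the map-side sort (and the
// merge on the reduce side) produces keys in descending order
public class DescendingTextComparator extends WritableComparator {

    public DescendingTextComparator() {
        super(Text.class, true);
    }

    @Override
    @SuppressWarnings("rawtypes")
    public int compare(WritableComparable a, WritableComparable b) {
        return -super.compare(a, b);
    }
}

// on the client: job.setSortComparatorClass(DescendingTextComparator.class);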
Look at '== OO2 ==': obtaining the partitioner
@SuppressWarnings("unchecked")
public Class<? extends Partitioner<?,?>> getPartitionerClass()
        throws ClassNotFoundException {
    // the default is HashPartitioner
    return (Class<? extends Partitioner<?,?>>)
        conf.getClass(PARTITIONER_CLASS_ATTR, HashPartitioner.class);
}

// HashPartitioner contains the getPartition() method
public int getPartition(K key, V value,
                        int numReduceTasks) {
    // the partition
    return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
}
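If the hash-by-key default does not distribute the data the way we want, we can plug in our own partitioner. The class below is a hypothetical sketch for the Text/IntWritable types used by MyMapper, with a purely illustrative routing rule; it would be registered on the client with job.setPartitionerClass(...).

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

// sends words starting with 'h' (like "hello"/"hongten") to partition 0 and
// hashes everything else across the remaining partitions
public class MyPartitioner extends Partitioner<Text, IntWritable> {

    @Override
    public int getPartition(Text key, IntWritable value, int numReduceTasks) {
        if (numReduceTasks == 1) {
            return 0;
        }
        if (key.toString().startsWith("h")) {
            return 0;
        }
        return 1 + (key.hashCode() & Integer.MAX_VALUE) % (numReduceTasks - 1);
    }
}

// on the client:
//   job.setNumReduceTasks(4);
//   job.setPartitionerClass(MyPartitioner.class);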
When the map in our client code calls write(key, value), the framework internally calls collector.collect() with the key, the value and the partition, i.e. k, v, p.
public void write(K key, V value) throws IOException, InterruptedException {
    collector.collect(key, value,
                      partitioner.getPartition(key, value, partitions));
}
Finally close() is called to close the output.
@Override
public void close(TaskAttemptContext context
                  ) throws IOException, InterruptedException {
    try {
        collector.flush();
    } catch (ClassNotFoundException cnf) {
        throw new IOException("can't find class ", cnf);
    }
    collector.close();
}
5.3 Map recap
On the map input side: each map task processes the data of one split; it has to seek() to the split, give up the first line and start reading from the second line (when there is more than one split).
On the map output side: the map emits key and value, but after the map computation we actually have key, value, partition. In other words, the moment a record leaves the map it is already known which reduce task, that is which partition, it belongs to.
After that comes an in-memory buffer ('buffer in memory'), which is a circular buffer.
Its default size is 100MB. So that spilling does not block the map, the default threshold is 80%: as soon as roughly 80MB is in use, a spill is triggered that writes the buffered data to disk. Before the data is written it is quick-sorted; this is the only place in the whole framework where data goes from unordered to ordered, and every later sort is a merge sort of already sorted data.
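Both numbers are plain configuration knobs. The snippet below is a sketch of raising them on the client; the property names match the constants shown in init() above, while the concrete values are only an assumption, since the right ones depend on the task's heap size.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;

public class SortBufferConfigSketch {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();

        // enlarge the circular buffer from the default 100MB to 200MB
        conf.setInt("mapreduce.task.io.sort.mb", 200);

        // start spilling at 90% instead of 80% of the buffer
        conf.setFloat("mapreduce.map.sort.spill.percent", 0.9f);

        Job job = Job.getInstance(conf);
        // ... the rest of the job setup is the same as in the WordCount client
        System.out.println(job.getConfiguration().get("mapreduce.task.io.sort.mb"));
    }
}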
While sorting there is one more decision to make: we may have defined a combiner to compress the data a little.
In big data, the biggest bottleneck is I/O; disk I/O and network I/O are both slow.
So before any I/O, sort in memory whenever you can and compress whenever you can: the less data each I/O call has to write, the faster it goes.
When spilling (partition, sort and spill to disk), the data is sorted by partition first and then by key within each partition, because the map result is key, value, partition; only that way is each spill file internally ordered. Finally the many small spill files are merged into one big file, which again is ordered by partition and by key within each partition.
- If we defined a combiner, the framework's default threshold when merging into the big file is 3 spill files;
- as long as the number of spill files reaches the configured value (mapreduce.map.combine.minspills, 3 by default),
the combiner is triggered again to compress the data. This reduces the network I/O of fetching data during the shuffle phase and, once the data has been fetched, leaves the reduce side with less data to process, speeding up the computation. The core goal of all the work on the map side is therefore to make the reduce side run faster and faster. A concrete way to enable this for WordCount is sketched right after this list.
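For WordCount the reducer is a plain sum, so it can double as the combiner. The sketch below assumes it sits in the same package as MyReducer from section 2, and the minspills value of 5 is only an example; whether a combiner pays off depends on the data, so treat this as an assumption to verify.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;

public class CombinerConfigSketch {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();

        // only re-run the combiner at merge time once there are at least
        // this many spill files (the default, as seen in init(), is 3)
        conf.setInt("mapreduce.map.combine.minspills", 5);

        Job job = Job.getInstance(conf);
        // reuse the word-count reducer as a map-side combiner;
        // it pre-sums counts before the data is spilled and merged
        job.setCombinerClass(MyReducer.class);
    }
}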
6. Reduce in detail
The reduce side has to fetch the map side's output and use it as the reduce input.
@Override
@SuppressWarnings("unchecked")
public void run(JobConf job, final TaskUmbilicalProtocol umbilical)
        throws IOException, InterruptedException, ClassNotFoundException {
    job.setBoolean(JobContext.SKIP_RECORDS, isSkipping());

    //================= shuffle phase: fetch the data from the map side -- start =================
    if (isMapOrReduce()) {
        copyPhase = getProgress().addPhase("copy");
        sortPhase = getProgress().addPhase("sort");
        reducePhase = getProgress().addPhase("reduce");
    }
    // start thread that will handle communication with parent
    TaskReporter reporter = startReporter(umbilical);

    boolean useNewApi = job.getUseNewReducer();
    initialize(job, getJobID(), reporter, useNewApi);

    // check if it is a cleanupJobTask
    if (jobCleanup) {
        runJobCleanupTask(umbilical, reporter);
        return;
    }
    if (jobSetup) {
        runJobSetupTask(umbilical, reporter);
        return;
    }
    if (taskCleanup) {
        runTaskCleanupTask(umbilical, reporter);
        return;
    }

    // Initialize the codec
    codec = initCodec();
    RawKeyValueIterator rIter = null;
    ShuffleConsumerPlugin shuffleConsumerPlugin = null;

    boolean isLocal = false;
    // local if
    // 1) framework == local or
    // 2) framework == null and job tracker address == local
    String framework = job.get(MRConfig.FRAMEWORK_NAME);
    String masterAddr = job.get(MRConfig.MASTER_ADDRESS, "local");
    if ((framework == null && masterAddr.equals("local"))
        || (framework != null && framework.equals(MRConfig.LOCAL_FRAMEWORK_NAME))) {
        isLocal = true;
    }

    if (!isLocal) {
        Class combinerClass = conf.getCombinerClass();
        CombineOutputCollector combineCollector =
            (null != combinerClass) ?
                new CombineOutputCollector(reduceCombineOutputCounter, reporter, conf) : null;

        Class<? extends ShuffleConsumerPlugin> clazz =
            job.getClass(MRConfig.SHUFFLE_CONSUMER_PLUGIN, Shuffle.class, ShuffleConsumerPlugin.class);

        shuffleConsumerPlugin = ReflectionUtils.newInstance(clazz, job);
        LOG.info("Using ShuffleConsumerPlugin: " + shuffleConsumerPlugin);

        ShuffleConsumerPlugin.Context shuffleContext =
            new ShuffleConsumerPlugin.Context(getTaskID(), job, FileSystem.getLocal(job), umbilical,
                                              super.lDirAlloc, reporter, codec,
                                              combinerClass, combineCollector,
                                              spilledRecordsCounter, reduceCombineInputCounter,
                                              shuffledMapsCounter,
                                              reduceShuffleBytes, failedShuffleCounter,
                                              mergedMapOutputsCounter,
                                              taskStatus, copyPhase, sortPhase, this,
                                              mapOutputFile);
        shuffleConsumerPlugin.init(shuffleContext);
        // the rIter iterator holds the data fetched from the map side,
        // i.e. the input source for the reduce that follows
        rIter = shuffleConsumerPlugin.run();
    } else {
        // local job runner doesn't have a copy phase
        copyPhase.complete();
        final FileSystem rfs = FileSystem.getLocal(job).getRaw();
        rIter = Merger.merge(job, rfs, job.getMapOutputKeyClass(),
                             job.getMapOutputValueClass(), codec,
                             getMapFiles(rfs, true),
                             !conf.getKeepFailedTaskFiles(),
                             job.getInt(JobContext.IO_SORT_FACTOR, 100),
                             new Path(getTaskID().toString()),
                             job.getOutputKeyComparator(),
                             reporter, spilledRecordsCounter, null, null);
    }
    // free up the data structures
    mapOutputFilesOnDisk.clear();
    //================= shuffle phase: fetch the data from the map side -- end =================

    sortPhase.complete();                         // sort is complete
    setPhase(TaskStatus.Phase.REDUCE);
    statusUpdate(umbilical);
    Class keyClass = job.getMapOutputKeyClass();
    Class valueClass = job.getMapOutputValueClass();
    // the grouping comparator
    RawComparator comparator = job.getOutputValueGroupingComparator();  // === RR0 ==

    if (useNewApi) {
        // use the new API
        runNewReducer(job, umbilical, reporter, rIter, comparator,
                      keyClass, valueClass);  // === RR1 ==
    } else {
        runOldReducer(job, umbilical, reporter, rIter, comparator,
                      keyClass, valueClass);
    }

    if (shuffleConsumerPlugin != null) {
        shuffleConsumerPlugin.close();
    }
    done(umbilical, reporter);
}
Look at '=== RR0 ==': the grouping comparator.
The logic can be read straight from the code:
1. If the user set a grouping comparator, it is used.
2. If the user did not set a grouping comparator, the system checks whether a sort comparator was set; if so, that is used.
3. If the user set neither a grouping comparator nor a sort comparator, the key's own comparator is used.
// 1. did the user set a grouping comparator (GROUP_COMPARATOR_CLASS)?
// 2. did the user set a sort comparator (KEY_COMPARATOR)?
// 3. if neither is set, the key's own comparator is used
public RawComparator getOutputValueGroupingComparator() {
    // obtain the grouping comparator via reflection;
    // the user can define it via GROUP_COMPARATOR_CLASS
    // (mapreduce.job.output.group.comparator.class)
    Class<? extends RawComparator> theClass = getClass(
        JobContext.GROUP_COMPARATOR_CLASS, null, RawComparator.class);
    if (theClass == null) {
        return getOutputKeyComparator();
    }

    return ReflectionUtils.newInstance(theClass, this);
}

public RawComparator getOutputKeyComparator() {
    // did the user set a sort comparator (KEY_COMPARATOR)?
    // if not, the key's own comparator is used
    Class<? extends RawComparator> theClass = getClass(
        JobContext.KEY_COMPARATOR, null, RawComparator.class);
    if (theClass != null)
        return ReflectionUtils.newInstance(theClass, this);
    return WritableComparator.get(getMapOutputKeyClass().asSubclass(WritableComparable.class));
}

// the key's own class
public Class<?> getMapOutputKeyClass() {
    Class<?> retv = getClass(JobContext.MAP_OUTPUT_KEY_CLASS, null, Object.class);
    if (retv == null) {
        retv = getOutputKeyClass();
    }
    return retv;
}
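A minimal sketch of supplying our own grouping comparator follows; it is my own example, registered with job.setGroupingComparatorClass(...), and the "group by first letter" rule is purely illustrative. With it, one reduce() call would receive the values of every key that starts with the same character.

import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

// groups Text keys by their first character only, so keys such as
// "hello" and "hongten" are fed to a single reduce() call
public class FirstLetterGroupingComparator extends WritableComparator {

    public FirstLetterGroupingComparator() {
        super(Text.class, true);
    }

    @Override
    @SuppressWarnings("rawtypes")
    public int compare(WritableComparable a, WritableComparable b) {
        String left = ((Text) a).toString();
        String right = ((Text) b).toString();
        char l = left.isEmpty() ? 0 : left.charAt(0);
        char r = right.isEmpty() ? 0 : right.charAt(0);
        return Character.compare(l, r);
    }
}

// on the client: job.setGroupingComparatorClass(FirstLetterGroupingComparator.class);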
Now let's look at '=== RR1 ==':
private <INKEY,INVALUE,OUTKEY,OUTVALUE>
void runNewReducer(JobConf job,
                   final TaskUmbilicalProtocol umbilical,
                   final TaskReporter reporter,
                   RawKeyValueIterator rIter,
                   RawComparator<INKEY> comparator,
                   Class<INKEY> keyClass,
                   Class<INVALUE> valueClass
                   ) throws IOException, InterruptedException,
                            ClassNotFoundException {
    // wrap value iterator to report progress.
    final RawKeyValueIterator rawIter = rIter;
    rIter = new RawKeyValueIterator() {
        public void close() throws IOException {
            rawIter.close();
        }
        public DataInputBuffer getKey() throws IOException {
            return rawIter.getKey();
        }
        public Progress getProgress() {
            return rawIter.getProgress();
        }
        public DataInputBuffer getValue() throws IOException {
            return rawIter.getValue();
        }
        public boolean next() throws IOException {
            boolean ret = rawIter.next();
            reporter.setProgress(rawIter.getProgress().getProgress());
            return ret;
        }
    };
    // make a task context so we can get the classes
    org.apache.hadoop.mapreduce.TaskAttemptContext taskContext =
        new org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl(job,
                                                                    getTaskID(), reporter);
    // make a reducer
    org.apache.hadoop.mapreduce.Reducer<INKEY,INVALUE,OUTKEY,OUTVALUE> reducer =
        (org.apache.hadoop.mapreduce.Reducer<INKEY,INVALUE,OUTKEY,OUTVALUE>)
            ReflectionUtils.newInstance(taskContext.getReducerClass(), job);
    org.apache.hadoop.mapreduce.RecordWriter<OUTKEY,OUTVALUE> trackedRW =
        new NewTrackingRecordWriter<OUTKEY, OUTVALUE>(this, taskContext);
    job.setBoolean("mapred.skip.on", isSkipping());
    job.setBoolean(JobContext.SKIP_RECORDS, isSkipping());
    // create the reduce context
    org.apache.hadoop.mapreduce.Reducer.Context reducerContext =
        createReduceContext(reducer, job, getTaskID(),
                            rIter, reduceInputKeyCounter,
                            reduceInputValueCounter,
                            trackedRW,
                            committer,
                            reporter, comparator, keyClass,
                            valueClass);
    try {
        // call org.apache.hadoop.mapreduce.Reducer.run()
        reducer.run(reducerContext);
    } finally {
        trackedRW.close(reducerContext);
    }
}
Step into the createReduceContext() method
protected static <INKEY,INVALUE,OUTKEY,OUTVALUE>
org.apache.hadoop.mapreduce.Reducer<INKEY,INVALUE,OUTKEY,OUTVALUE>.Context
createReduceContext(org.apache.hadoop.mapreduce.Reducer
                        <INKEY,INVALUE,OUTKEY,OUTVALUE> reducer,
                    Configuration job,
                    org.apache.hadoop.mapreduce.TaskAttemptID taskId,
                    RawKeyValueIterator rIter,
                    org.apache.hadoop.mapreduce.Counter inputKeyCounter,
                    org.apache.hadoop.mapreduce.Counter inputValueCounter,
                    org.apache.hadoop.mapreduce.RecordWriter<OUTKEY,OUTVALUE> output,
                    org.apache.hadoop.mapreduce.OutputCommitter committer,
                    org.apache.hadoop.mapreduce.StatusReporter reporter,
                    RawComparator<INKEY> comparator,
                    Class<INKEY> keyClass, Class<INVALUE> valueClass
                    ) throws IOException, InterruptedException {
    org.apache.hadoop.mapreduce.ReduceContext<INKEY, INVALUE, OUTKEY, OUTVALUE>
        reduceContext =
            // create the ReduceContextImpl instance
            new ReduceContextImpl<INKEY, INVALUE, OUTKEY, OUTVALUE>(job, taskId,
                                                                    rIter,
                                                                    inputKeyCounter,
                                                                    inputValueCounter,
                                                                    output,
                                                                    committer,
                                                                    reporter,
                                                                    comparator,
                                                                    keyClass,
                                                                    valueClass);

    org.apache.hadoop.mapreduce.Reducer<INKEY,INVALUE,OUTKEY,OUTVALUE>.Context
        reducerContext =
            new WrappedReducer<INKEY, INVALUE, OUTKEY, OUTVALUE>().getReducerContext(
                reduceContext);

    return reducerContext;
}
Step into the ReduceContextImpl() constructor
public ReduceContextImpl(Configuration conf, TaskAttemptID taskid,
                         RawKeyValueIterator input,
                         Counter inputKeyCounter,
                         Counter inputValueCounter,
                         RecordWriter<KEYOUT,VALUEOUT> output,
                         OutputCommitter committer,
                         StatusReporter reporter,
                         RawComparator<KEYIN> comparator,
                         Class<KEYIN> keyClass,
                         Class<VALUEIN> valueClass
                         ) throws InterruptedException, IOException {
    super(conf, taskid, output, committer, reporter);
    this.input = input;
    this.inputKeyCounter = inputKeyCounter;
    this.inputValueCounter = inputValueCounter;
    this.comparator = comparator;
    this.serializationFactory = new SerializationFactory(conf);
    this.keyDeserializer = serializationFactory.getDeserializer(keyClass);
    this.keyDeserializer.open(buffer);
    this.valueDeserializer = serializationFactory.getDeserializer(valueClass);
    this.valueDeserializer.open(buffer);
    hasMore = input.next();
    this.keyClass = keyClass;
    this.valueClass = valueClass;
    this.conf = conf;
    this.taskid = taskid;
}
In the end, the map side's output is passed in here as the reduce side's input.
public void run(Context context) throws IOException, InterruptedException {
    setup(context);
    try {
        // loop over every key
        while (context.nextKey()) {
            // call reduce(); since we overrode it, our own reduce method
            // is invoked on every call
            reduce(context.getCurrentKey(), context.getValues(), context);
            // If a back up store is used, reset it
            Iterator<VALUEIN> iter = context.getValues().iterator();
            if (iter instanceof ReduceContext.ValueIterator) {
                ((ReduceContext.ValueIterator<VALUEIN>)iter).resetBackupStore();
            }
        }
    } finally {
        cleanup(context);
    }
}
7. Summary
========================================================
More reading, and English is important.
I'm Hongten
E | hongtenzone@foxmail.com B | http://www.cnblogs.com/hongten
========================================================