OutputFormat輸出過程的學習


轉自:http://blog.csdn.net/androidlushangderen/article/details/41278351

 

花了大約1周的時間,終於把MapReduce的5大階段的源碼學習結束掉了,收獲不少,就算本人對Hadoop學習的一個里程碑式的紀念吧。今天花了一點點的時間,把MapReduce的最后一個階段,輸出OutputFormat給做了分析,這個過程跟InputFormat剛剛好是對着干的,二者極具對稱性。為什么這么說呢,待我一一分析。

            OutputFormat過程的作用就是定義數據key-value的輸出格式,給你處理好后的數據,究竟以什么樣的形式輸出呢,才能讓下次別人拿到這個文件的時候能准確的提取出里面的數據。這里,我們撇開這個話題,僅僅我知道的一些定義的數據格式的方法,比如在Redis中會有這樣的設計:

[key-length][key][value-length][value][key-length][key][value-length][value]...

或者說不一定非要省空間,直接搞過分隔符

[key]   [value]\n

[key]   [value]\n

[key]   [value]\n

.....

這樣逐行讀取,再以空格隔開,取出里面的鍵值對,這么做簡單是簡單,就是不緊湊,空間浪費得有點多。在MapReduce的OutputFormat的有種格式用的就是這種方式。

        首先必須得了解OutputFormat里面到底有什么東西:

 

[java]  view plain copy print ?
 
  1. public interface OutputFormat<K, V> {  
  2.   
  3.   /**  
  4.    * Get the {@link RecordWriter} for the given job. 
  5.    * 獲取輸出記錄鍵值記錄 
  6.    * 
  7.    * @param ignored 
  8.    * @param job configuration for the job whose output is being written. 
  9.    * @param name the unique name for this part of the output. 
  10.    * @param progress mechanism for reporting progress while writing to file. 
  11.    * @return a {@link RecordWriter} to write the output for the job. 
  12.    * @throws IOException 
  13.    */  
  14.   RecordWriter<K, V> getRecordWriter(FileSystem ignored, JobConf job,  
  15.                                      String name, Progressable progress)  
  16.   throws IOException;  
  17.   
  18.   /**  
  19.    * Check for validity of the output-specification for the job. 
  20.    *   
  21.    * <p>This is to validate the output specification for the job when it is 
  22.    * a job is submitted.  Typically checks that it does not already exist, 
  23.    * throwing an exception when it already exists, so that output is not 
  24.    * overwritten.</p> 
  25.    * 作業運行之前進行的檢測工作,例如配置的輸出目錄是否存在等 
  26.    * 
  27.    * @param ignored 
  28.    * @param job job configuration. 
  29.    * @throws IOException when output should not be attempted 
  30.    */  
  31.   void checkOutputSpecs(FileSystem ignored, JobConf job) throws IOException;  
  32. }  

很簡單的2個方法,RecordWriter比較重要,后面的key-value的寫入操作都是根據他來完成的。但是他是一個接口,在MapReduce中,我們用的最多的他的子類是FileOutputFormat:

 

 

[java]  view plain copy print ?
 
  1. /** A base class for {@link OutputFormat}. */  
  2. public abstract class FileOutputFormat<K, V> implements OutputFormat<K, V> {  

他是一個抽象類,但是實現了接口中的第二個方法checkOutputSpecs()方法:

 

 

[java]  view plain copy print ?
 
  1. public void checkOutputSpecs(FileSystem ignored, JobConf job)   
  2.     throws FileAlreadyExistsException,   
  3.            InvalidJobConfException, IOException {  
  4.     // Ensure that the output directory is set and not already there  
  5.     Path outDir = getOutputPath(job);  
  6.     if (outDir == null && job.getNumReduceTasks() != 0) {  
  7.       throw new InvalidJobConfException("Output directory not set in JobConf.");  
  8.     }  
  9.     if (outDir != null) {  
  10.       FileSystem fs = outDir.getFileSystem(job);  
  11.       // normalize the output directory  
  12.       outDir = fs.makeQualified(outDir);  
  13.       setOutputPath(job, outDir);  
  14.         
  15.       // get delegation token for the outDir's file system  
  16.       TokenCache.obtainTokensForNamenodes(job.getCredentials(),   
  17.                                           new Path[] {outDir}, job);  
  18.         
  19.       // check its existence  
  20.       if (fs.exists(outDir)) {  
  21.         //如果輸出目錄以及存在,則拋異常  
  22.         throw new FileAlreadyExistsException("Output directory " + outDir +   
  23.                                              " already exists");  
  24.       }  
  25.     }  
  26.   }  

就是檢查輸出目錄在不在的操作。在這個類里還出現了一個輔助類:

 

 

[java]  view plain copy print ?
 
  1. public static Path getTaskOutputPath(JobConf conf, String name)   
  2.   throws IOException {  
  3.     // ${mapred.out.dir}  
  4.     Path outputPath = getOutputPath(conf);  
  5.     if (outputPath == null) {  
  6.       throw new IOException("Undefined job output-path");  
  7.     }  
  8.       
  9.     //根據OutputCommitter獲取輸出路徑  
  10.     OutputCommitter committer = conf.getOutputCommitter();  
  11.     Path workPath = outputPath;  
  12.     TaskAttemptContext context = new TaskAttemptContext(conf,  
  13.                 TaskAttemptID.forName(conf.get("mapred.task.id")));  
  14.     if (committer instanceof FileOutputCommitter) {  
  15.       workPath = ((FileOutputCommitter)committer).getWorkPath(context,  
  16.                                                               outputPath);  
  17.     }  
  18.       
  19.     // ${mapred.out.dir}/_temporary/_${taskid}/${name}  
  20.     return new Path(workPath, name);  
  21.   }  

就是上面OutputCommiter,里面定義了很多和Task,job作業相關的方法。很多時候都會與OutputFormat合作的形式出現。他也有自己的子類實現FileOutputCommiter:

 

 

[java]  view plain copy print ?
 
  1. public class FileOutputCommitter extends OutputCommitter {  
  2.   
  3.   public static final Log LOG = LogFactory.getLog(  
  4.       "org.apache.hadoop.mapred.FileOutputCommitter");  
  5. /** 
  6.    * Temporary directory name  
  7.    */  
  8.   public static final String TEMP_DIR_NAME = "_temporary";  
  9.   public static final String SUCCEEDED_FILE_NAME = "_SUCCESS";  
  10.   static final String SUCCESSFUL_JOB_OUTPUT_DIR_MARKER =  
  11.     "mapreduce.fileoutputcommitter.marksuccessfuljobs";  
  12.   
  13.   public void setupJob(JobContext context) throws IOException {  
  14.     JobConf conf = context.getJobConf();  
  15.     Path outputPath = FileOutputFormat.getOutputPath(conf);  
  16.     if (outputPath != null) {  
  17.       Path tmpDir = new Path(outputPath, FileOutputCommitter.TEMP_DIR_NAME);  
  18.       FileSystem fileSys = tmpDir.getFileSystem(conf);  
  19.       if (!fileSys.mkdirs(tmpDir)) {  
  20.         LOG.error("Mkdirs failed to create " + tmpDir.toString());  
  21.       }  
  22.     }  
  23.   }  
  24.   ....  

在Reduce階段的后面的寫階段,FileOutputFormat是默認的輸出的類型:

 

 

[java]  view plain copy print ?
 
  1. //獲取輸出的key,value  
  2.     final RecordWriter<OUTKEY, OUTVALUE> out = new OldTrackingRecordWriter<OUTKEY, OUTVALUE>(  
  3.         reduceOutputCounter, job, reporter, finalName);  
  4.       
  5.     OutputCollector<OUTKEY,OUTVALUE> collector =   
  6.       new OutputCollector<OUTKEY,OUTVALUE>() {  
  7.         public void collect(OUTKEY key, OUTVALUE value)  
  8.           throws IOException {  
  9.           //將處理后的key,value寫入輸出流中,最后寫入HDFS作為最終結果  
  10.           out.write(key, value);  
  11.           // indicate that progress update needs to be sent  
  12.           reporter.progress();  
  13.         }  
  14.       };  

out就是直接發揮作用的類,但是是哪個Formtat的返回的呢,我們進入OldTrackingRecordWriter繼續看:

 

 

[java]  view plain copy print ?
 
  1. public OldTrackingRecordWriter(  
  2.         org.apache.hadoop.mapred.Counters.Counter outputRecordCounter,  
  3.         JobConf job, TaskReporter reporter, String finalName)  
  4.         throws IOException {  
  5.       this.outputRecordCounter = outputRecordCounter;  
  6.       //默認是FileOutputFormat文件輸出方式  
  7.       this.fileOutputByteCounter = reporter  
  8.           .getCounter(FileOutputFormat.Counter.BYTES_WRITTEN);  
  9.       Statistics matchedStats = null;  
  10.       if (job.getOutputFormat() instanceof FileOutputFormat) {  
  11.         matchedStats = getFsStatistics(FileOutputFormat.getOutputPath(job), job);  
  12.       }  
  13.       fsStats = matchedStats;  
  14.   
  15.       FileSystem fs = FileSystem.get(job);  
  16.       long bytesOutPrev = getOutputBytes(fsStats);  
  17.       //從配置中獲取作業的輸出方式  
  18.       this.real = job.getOutputFormat().getRecordWriter(fs, job, finalName,  
  19.           reporter);  
  20.       long bytesOutCurr = getOutputBytes(fsStats);  
  21.       fileOutputByteCounter.increment(bytesOutCurr - bytesOutPrev);  
  22.     }  

果然是我們所想的那樣,FileOutputFormat,但是不要忘了它的getRecordWriter()是抽象方法,調用它還必須由它的子類來實現:

 

 

[java]  view plain copy print ?
 
  1. public abstract RecordWriter<K, V> getRecordWriter(FileSystem ignored,  
  2.                                               JobConf job, String name,  
  3.                                               Progressable progress)  
  4.    throws IOException;  

在這里我們舉出其中在InputFormat舉過的例子,TextOutputFormat,SequenceFileOutputFormat,與TextInputFormat,SequenceFileInputFormat對應。也就說說上面2個子類定義了2種截然不同的輸出格式,也就返回了不一樣的RecordWriter實現類.在TextOutputFormat中,他定義了一個叫LineRecordWriter的定義:

 

 

[java]  view plain copy print ?
 
  1. public RecordWriter<K, V> getRecordWriter(FileSystem ignored,  
  2.                                                  JobConf job,  
  3.                                                  String name,  
  4.                                                  Progressable progress)  
  5.    throws IOException {  
  6. //從配置判斷輸出是否要壓縮  
  7.    boolean isCompressed = getCompressOutput(job);  
  8.    //配置中獲取加在key-value的分隔符  
  9.    String keyValueSeparator = job.get("mapred.textoutputformat.separator",   
  10.                                       "\t");  
  11.    //根據是否壓縮獲取相應的LineRecordWriter  
  12.    if (!isCompressed) {  
  13.      Path file = FileOutputFormat.getTaskOutputPath(job, name);  
  14.      FileSystem fs = file.getFileSystem(job);  
  15.      FSDataOutputStream fileOut = fs.create(file, progress);  
  16.      return new LineRecordWriter<K, V>(fileOut, keyValueSeparator);  
  17.    } else {  
  18.      Class<? extends CompressionCodec> codecClass =  
  19.        getOutputCompressorClass(job, GzipCodec.class);  
  20.      // create the named codec  
  21.      CompressionCodec codec = ReflectionUtils.newInstance(codecClass, job);  
  22.      // build the filename including the extension  
  23.      Path file =   
  24.        FileOutputFormat.getTaskOutputPath(job,   
  25.                                           name + codec.getDefaultExtension());  
  26.      FileSystem fs = file.getFileSystem(job);  
  27.      FSDataOutputStream fileOut = fs.create(file, progress);  
  28.      return new LineRecordWriter<K, V>(new DataOutputStream  
  29.                                        (codec.createOutputStream(fileOut)),  
  30.                                        keyValueSeparator);  
  31.    }  

他以一個內部類的形式存在於TextOutputFormat。而在SequenceFileOutputFormat中,他的形式是怎樣的呢:

 

 

[java]  view plain copy print ?
 
  1. public RecordWriter<K, V> getRecordWriter(  
  2.                                         FileSystem ignored, JobConf job,  
  3.                                         String name, Progressable progress)  
  4.   throws IOException {  
  5.   // get the path of the temporary output file   
  6.   Path file = FileOutputFormat.getTaskOutputPath(job, name);  
  7.     
  8.   FileSystem fs = file.getFileSystem(job);  
  9.   CompressionCodec codec = null;  
  10.   CompressionType compressionType = CompressionType.NONE;  
  11.   if (getCompressOutput(job)) {  
  12.     // find the kind of compression to do  
  13.     compressionType = getOutputCompressionType(job);  
  14.   
  15.     // find the right codec  
  16.     Class<? extends CompressionCodec> codecClass = getOutputCompressorClass(job,  
  17.  DefaultCodec.class);  
  18.     codec = ReflectionUtils.newInstance(codecClass, job);  
  19.   }  
  20.   final SequenceFile.Writer out =   
  21.     SequenceFile.createWriter(fs, job, file,  
  22.                               job.getOutputKeyClass(),  
  23.                               job.getOutputValueClass(),  
  24.                               compressionType,  
  25.                               codec,  
  26.                               progress);  
  27.   
  28.   return new RecordWriter<K, V>() {  
  29.   
  30.       public void write(K key, V value)  
  31.         throws IOException {  
  32.   
  33.         out.append(key, value);  
  34.       }  
  35.   
  36.       public void close(Reporter reporter) throws IOException { out.close();}  
  37.     };  
  38. }  

關鍵的操作都在於SequenceFile.Writer中。有不同的RecordWriter就會有不同的寫入數據的方式,這里我們舉LineRecordWriter的例子。看看他的寫入方法:

 

 

[java]  view plain copy print ?
 
  1. //往輸出流中寫入key-value  
  2.     public synchronized void write(K key, V value)  
  3.       throws IOException {  
  4.   
  5.       //判斷鍵值對是否為空  
  6.       boolean nullKey = key == null || key instanceof NullWritable;  
  7.       boolean nullValue = value == null || value instanceof NullWritable;  
  8.         
  9.       //如果k-v都為空,則操作失敗,不寫入直接返回  
  10.       if (nullKey && nullValue) {  
  11.         return;  
  12.       }  
  13.         
  14.       //如果key不空,則寫入key  
  15.       if (!nullKey) {  
  16.         writeObject(key);  
  17.       }  
  18.         
  19.       //如果key,value都不為空,則中間寫入k-v分隔符,在這里為\t空格符  
  20.       if (!(nullKey || nullValue)) {  
  21.         out.write(keyValueSeparator);  
  22.       }  
  23.         
  24.       //最后寫入value  
  25.       if (!nullValue) {  
  26.         writeObject(value);  
  27.       }  

在這個方法里,我們就能看出他的存儲形式就是我剛剛在上面講的第二種存儲方式。這個方法將會在下面的代碼中被執行:

 

 

[java]  view plain copy print ?
 
  1. OutputCollector<OUTKEY,OUTVALUE> collector =   
  2.       new OutputCollector<OUTKEY,OUTVALUE>() {  
  3.         public void collect(OUTKEY key, OUTVALUE value)  
  4.           throws IOException {  
  5.           //將處理后的key,value寫入輸出流中,最后寫入HDFS作為最終結果  
  6.           out.write(key, value);  
  7.           // indicate that progress update needs to be sent  
  8.           reporter.progress();  
  9.         }  
  10.       };  

過程可以這么理解:

 

collector.collect()------->out.write(key, value)------->LineRecordWriter.write(key, value)------->DataOutputStream.write(key, value).

DataOutputStream是內置於LineRecordWriter的作為里面的變量存在的。這樣從Reduce末尾階段到Output的過程也完全打通了。下面可以看看這上面涉及的完整的類目關系。

      下一階段的學習,可能或偏向於Task,Job階段的過程分析,更加宏觀過程上的一個分析。也可能會分析某個功能塊的實現過程,比如Hadoop的IPC過程,據說用了很多JAVA NIO的東西。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM