Reposted from: http://blog.csdn.net/androidlushangderen/article/details/41278351
It took me about a week to work through the source code of the five major stages of MapReduce, and I learned a lot; consider it a milestone in my study of Hadoop. Today I spent a little time analyzing the last stage of MapReduce, the output stage, OutputFormat. This process is the exact counterpart of InputFormat; the two are highly symmetric. Why do I say that? Let me walk through it step by step.
The job of the OutputFormat stage is to define the output format of the key-value data: given the processed data, in what form should it be written out so that whoever picks up the file later can extract the data accurately? Setting Hadoop aside for a moment, there are a few ways I know of to define a data format. For example, Redis uses a design along these lines:
[key-length][key][value-length][value][key-length][key][value-length][value]...
Or, if saving space is not a priority, simply use a delimiter:
[key] [value]\n
[key] [value]\n
[key] [value]\n
.....
Read the file line by line, split on the whitespace, and pull out the key-value pair. This is simple, but not compact; it wastes quite a bit of space. One of the formats in MapReduce's OutputFormat uses exactly this approach.
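To make the two layouts concrete, here is a minimal standalone sketch (plain Java rather than Hadoop code; the class and method names are my own) that encodes one record in the length-prefixed binary form and in the delimiter-plus-newline text form:

import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;

public class LayoutSketch {
  // [key-length][key][value-length][value]... : compact, binary
  static byte[] lengthPrefixed(String key, String value) throws IOException {
    ByteArrayOutputStream buf = new ByteArrayOutputStream();
    DataOutputStream out = new DataOutputStream(buf);
    byte[] k = key.getBytes(StandardCharsets.UTF_8);
    byte[] v = value.getBytes(StandardCharsets.UTF_8);
    out.writeInt(k.length); out.write(k);
    out.writeInt(v.length); out.write(v);
    out.flush();
    return buf.toByteArray();
  }

  // [key]\t[value]\n : human readable, costs a separator plus a newline per record
  static String delimited(String key, String value) {
    return key + "\t" + value + "\n";
  }

  public static void main(String[] args) throws IOException {
    System.out.println(lengthPrefixed("hello", "3").length + " bytes in binary form");
    System.out.print(delimited("hello", "3"));
  }
}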
First, we have to know what is actually inside OutputFormat:
public interface OutputFormat<K, V> {

  /**
   * Get the {@link RecordWriter} for the given job.
   * (Gets the writer used to emit the output key-value records.)
   *
   * @param ignored
   * @param job configuration for the job whose output is being written.
   * @param name the unique name for this part of the output.
   * @param progress mechanism for reporting progress while writing to file.
   * @return a {@link RecordWriter} to write the output for the job.
   * @throws IOException
   */
  RecordWriter<K, V> getRecordWriter(FileSystem ignored, JobConf job,
                                     String name, Progressable progress)
    throws IOException;

  /**
   * Check for validity of the output-specification for the job.
   *
   * <p>This is to validate the output specification for the job when it is
   * a job is submitted. Typically checks that it does not already exist,
   * throwing an exception when it already exists, so that output is not
   * overwritten.</p>
   * (Pre-flight checks done before the job runs, e.g. whether the configured
   * output directory already exists.)
   *
   * @param ignored
   * @param job job configuration.
   * @throws IOException when output should not be attempted
   */
  void checkOutputSpecs(FileSystem ignored, JobConf job) throws IOException;
}
Just two simple methods. RecordWriter is the important one: all of the later key-value writing is done through it. OutputFormat itself is only an interface, though; the implementation we use most in MapReduce is FileOutputFormat:
/** A base class for {@link OutputFormat}. */
public abstract class FileOutputFormat<K, V> implements OutputFormat<K, V> {
It is an abstract class, but it does implement the second method of the interface, checkOutputSpecs():
public void checkOutputSpecs(FileSystem ignored, JobConf job)
    throws FileAlreadyExistsException,
           InvalidJobConfException, IOException {
  // Ensure that the output directory is set and not already there
  Path outDir = getOutputPath(job);
  if (outDir == null && job.getNumReduceTasks() != 0) {
    throw new InvalidJobConfException("Output directory not set in JobConf.");
  }
  if (outDir != null) {
    FileSystem fs = outDir.getFileSystem(job);
    // normalize the output directory
    outDir = fs.makeQualified(outDir);
    setOutputPath(job, outDir);
    // get delegation token for the outDir's file system
    TokenCache.obtainTokensForNamenodes(job.getCredentials(),
        new Path[] {outDir}, job);
    // check its existence
    if (fs.exists(outDir)) {
      // if the output directory already exists, throw an exception
      throw new FileAlreadyExistsException("Output directory " + outDir +
          " already exists");
    }
  }
}
So it is simply the check for whether the output directory already exists. A helper class also makes an appearance in this class:
public static Path getTaskOutputPath(JobConf conf, String name)
    throws IOException {
  // ${mapred.out.dir}
  Path outputPath = getOutputPath(conf);
  if (outputPath == null) {
    throw new IOException("Undefined job output-path");
  }
  // obtain the task's work path from the OutputCommitter
  OutputCommitter committer = conf.getOutputCommitter();
  Path workPath = outputPath;
  TaskAttemptContext context = new TaskAttemptContext(conf,
      TaskAttemptID.forName(conf.get("mapred.task.id")));
  if (committer instanceof FileOutputCommitter) {
    workPath = ((FileOutputCommitter)committer).getWorkPath(context,
        outputPath);
  }
  // ${mapred.out.dir}/_temporary/_${taskid}/${name}
  return new Path(workPath, name);
}
That helper is the OutputCommitter above, which defines many methods related to tasks and jobs; it very often shows up working hand in hand with OutputFormat. It too has its own subclass implementation, FileOutputCommitter:
public class FileOutputCommitter extends OutputCommitter {

  public static final Log LOG = LogFactory.getLog(
      "org.apache.hadoop.mapred.FileOutputCommitter");

  /**
   * Temporary directory name
   */
  public static final String TEMP_DIR_NAME = "_temporary";
  public static final String SUCCEEDED_FILE_NAME = "_SUCCESS";
  static final String SUCCESSFUL_JOB_OUTPUT_DIR_MARKER =
      "mapreduce.fileoutputcommitter.marksuccessfuljobs";

  public void setupJob(JobContext context) throws IOException {
    JobConf conf = context.getJobConf();
    Path outputPath = FileOutputFormat.getOutputPath(conf);
    if (outputPath != null) {
      Path tmpDir = new Path(outputPath, FileOutputCommitter.TEMP_DIR_NAME);
      FileSystem fileSys = tmpDir.getFileSystem(conf);
      if (!fileSys.mkdirs(tmpDir)) {
        LOG.error("Mkdirs failed to create " + tmpDir.toString());
      }
    }
  }
  ...
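The setupJob() above only creates the ${outputDir}/_temporary staging directory; each task then writes its files under _temporary/_${taskAttemptId}, and on commit those files are promoted into the final output directory. The following is only a simplified illustration of that pattern, not FileOutputCommitter's actual code:

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class CommitSketch {
  // Promote everything a task wrote under its temporary work path
  // into the job's final output directory, then drop the staging dir.
  static void commitTask(Configuration conf, Path outputDir, String taskAttemptId)
      throws IOException {
    FileSystem fs = outputDir.getFileSystem(conf);
    Path workPath = new Path(new Path(outputDir, "_temporary"), "_" + taskAttemptId);
    if (fs.exists(workPath)) {
      for (FileStatus stat : fs.listStatus(workPath)) {
        // e.g. _temporary/_attempt_.../part-00000 -> ${outputDir}/part-00000
        fs.rename(stat.getPath(), new Path(outputDir, stat.getPath().getName()));
      }
      fs.delete(workPath, true);
    }
  }
}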
In the write phase at the tail end of Reduce, FileOutputFormat is the default output type:
// obtain the writer for the output key-value pairs
final RecordWriter<OUTKEY, OUTVALUE> out = new OldTrackingRecordWriter<OUTKEY, OUTVALUE>(
    reduceOutputCounter, job, reporter, finalName);

OutputCollector<OUTKEY,OUTVALUE> collector =
  new OutputCollector<OUTKEY,OUTVALUE>() {
    public void collect(OUTKEY key, OUTVALUE value)
        throws IOException {
      // write the processed key-value pair into the output stream;
      // it eventually lands in HDFS as the final result
      out.write(key, value);
      // indicate that progress update needs to be sent
      reporter.progress();
    }
  };
out is the object that actually does the work, but which OutputFormat produced it? Let's step into OldTrackingRecordWriter and keep looking:
public OldTrackingRecordWriter(
    org.apache.hadoop.mapred.Counters.Counter outputRecordCounter,
    JobConf job, TaskReporter reporter, String finalName)
    throws IOException {
  this.outputRecordCounter = outputRecordCounter;
  // the default is file output, i.e. FileOutputFormat
  this.fileOutputByteCounter = reporter
      .getCounter(FileOutputFormat.Counter.BYTES_WRITTEN);
  Statistics matchedStats = null;
  if (job.getOutputFormat() instanceof FileOutputFormat) {
    matchedStats = getFsStatistics(FileOutputFormat.getOutputPath(job), job);
  }
  fsStats = matchedStats;

  FileSystem fs = FileSystem.get(job);
  long bytesOutPrev = getOutputBytes(fsStats);
  // fetch the job's configured output format and ask it for a RecordWriter
  this.real = job.getOutputFormat().getRecordWriter(fs, job, finalName,
      reporter);
  long bytesOutCurr = getOutputBytes(fsStats);
  fileOutputByteCounter.increment(bytesOutCurr - bytesOutPrev);
}
Just as we expected: FileOutputFormat. But don't forget that its getRecordWriter() is abstract, so the call must be served by one of its subclasses:
public abstract RecordWriter<K, V> getRecordWriter(FileSystem ignored,
                                                   JobConf job, String name,
                                                   Progressable progress)
  throws IOException;
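Since getRecordWriter() is abstract, any concrete output format has to supply its own RecordWriter. As a hedged sketch of what such a subclass can look like with the old API (the class name and the ':' separator here are my own, not anything in Hadoop):

import java.io.IOException;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordWriter;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.util.Progressable;

// A toy FileOutputFormat subclass that writes each record as "key:value\n".
public class ColonTextOutputFormat<K, V> extends FileOutputFormat<K, V> {

  public RecordWriter<K, V> getRecordWriter(FileSystem ignored, JobConf job,
                                            String name, Progressable progress)
      throws IOException {
    // write into the task's temporary output path, just like TextOutputFormat does
    Path file = FileOutputFormat.getTaskOutputPath(job, name);
    FileSystem fs = file.getFileSystem(job);
    final FSDataOutputStream out = fs.create(file, progress);
    return new RecordWriter<K, V>() {
      public void write(K key, V value) throws IOException {
        out.writeBytes(key.toString() + ":" + value.toString() + "\n");
      }
      public void close(Reporter reporter) throws IOException {
        out.close();
      }
    };
  }
}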
Here let's take the examples we already used for InputFormat: TextOutputFormat and SequenceFileOutputFormat, the counterparts of TextInputFormat and SequenceFileInputFormat. These two subclasses define two completely different output formats, and so return different RecordWriter implementations. In TextOutputFormat the writer is one called LineRecordWriter, and getRecordWriter() constructs it:
public RecordWriter<K, V> getRecordWriter(FileSystem ignored,
                                          JobConf job,
                                          String name,
                                          Progressable progress)
    throws IOException {
  // check the configuration to see whether the output should be compressed
  boolean isCompressed = getCompressOutput(job);
  // read the key-value separator from the configuration, "\t" by default
  String keyValueSeparator = job.get("mapred.textoutputformat.separator",
                                     "\t");
  // return the appropriate LineRecordWriter depending on compression
  if (!isCompressed) {
    Path file = FileOutputFormat.getTaskOutputPath(job, name);
    FileSystem fs = file.getFileSystem(job);
    FSDataOutputStream fileOut = fs.create(file, progress);
    return new LineRecordWriter<K, V>(fileOut, keyValueSeparator);
  } else {
    Class<? extends CompressionCodec> codecClass =
      getOutputCompressorClass(job, GzipCodec.class);
    // create the named codec
    CompressionCodec codec = ReflectionUtils.newInstance(codecClass, job);
    // build the filename including the extension
    Path file =
      FileOutputFormat.getTaskOutputPath(job,
                                         name + codec.getDefaultExtension());
    FileSystem fs = file.getFileSystem(job);
    FSDataOutputStream fileOut = fs.create(file, progress);
    return new LineRecordWriter<K, V>(new DataOutputStream
                                      (codec.createOutputStream(fileOut)),
                                      keyValueSeparator);
  }
}
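The compressed branch above is driven purely by configuration. A hedged sketch of how a job could turn it on with the old API (the helper class name is mine; the static setters are FileOutputFormat's):

import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobConf;

public class EnableCompressedTextOutput {
  public static void configure(JobConf job) {
    // makes getCompressOutput(job) return true, so the gzip branch above is taken
    FileOutputFormat.setCompressOutput(job, true);
    FileOutputFormat.setOutputCompressorClass(job, GzipCodec.class);
  }
}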
LineRecordWriter lives inside TextOutputFormat as an inner class. And what does the equivalent look like in SequenceFileOutputFormat:
public RecordWriter<K, V> getRecordWriter(
    FileSystem ignored, JobConf job,
    String name, Progressable progress)
    throws IOException {
  // get the path of the temporary output file
  Path file = FileOutputFormat.getTaskOutputPath(job, name);

  FileSystem fs = file.getFileSystem(job);
  CompressionCodec codec = null;
  CompressionType compressionType = CompressionType.NONE;
  if (getCompressOutput(job)) {
    // find the kind of compression to do
    compressionType = getOutputCompressionType(job);

    // find the right codec
    Class<? extends CompressionCodec> codecClass = getOutputCompressorClass(job,
        DefaultCodec.class);
    codec = ReflectionUtils.newInstance(codecClass, job);
  }
  final SequenceFile.Writer out =
    SequenceFile.createWriter(fs, job, file,
                              job.getOutputKeyClass(),
                              job.getOutputValueClass(),
                              compressionType,
                              codec,
                              progress);

  return new RecordWriter<K, V>() {

    public void write(K key, V value)
        throws IOException {
      out.append(key, value);
    }

    public void close(Reporter reporter) throws IOException { out.close();}
  };
}
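The whole point of a fixed output format is that the file can be read back accurately later; a SequenceFile written this way is read back with SequenceFile.Reader. A minimal sketch (the path and the key/value classes are assumptions for illustration):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;

public class ReadSeqFile {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    FileSystem fs = FileSystem.get(conf);
    // assumed output file of a job that used SequenceFileOutputFormat
    Path file = new Path("/output/part-00000");
    SequenceFile.Reader reader = new SequenceFile.Reader(fs, file, conf);
    try {
      Text key = new Text();                  // assumed job output key class
      IntWritable value = new IntWritable();  // assumed job output value class
      while (reader.next(key, value)) {
        System.out.println(key + "\t" + value);
      }
    } finally {
      reader.close();
    }
  }
}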
Back on the writing side, the key operations all live in SequenceFile.Writer. Different RecordWriters mean different ways of writing the data; here let's take LineRecordWriter as the example and look at its write method:
// write a key-value pair into the output stream
public synchronized void write(K key, V value)
    throws IOException {
  // check whether the key or the value is null
  boolean nullKey = key == null || key instanceof NullWritable;
  boolean nullValue = value == null || value instanceof NullWritable;
  // if both key and value are null there is nothing to do; return directly
  if (nullKey && nullValue) {
    return;
  }
  // if the key is not null, write the key
  if (!nullKey) {
    writeObject(key);
  }
  // if neither is null, write the key-value separator in between ("\t" by default)
  if (!(nullKey || nullValue)) {
    out.write(keyValueSeparator);
  }
  // finally write the value
  if (!nullValue) {
    writeObject(value);
  }
  // and terminate the record with a newline
  out.write(newline);
}
From this method we can see that the storage format is exactly the second one I described at the beginning. The method gets invoked from the code below:
OutputCollector<OUTKEY,OUTVALUE> collector =
  new OutputCollector<OUTKEY,OUTVALUE>() {
    public void collect(OUTKEY key, OUTVALUE value)
        throws IOException {
      // write the processed key-value pair into the output stream;
      // it eventually lands in HDFS as the final result
      out.write(key, value);
      // indicate that progress update needs to be sent
      reporter.progress();
    }
  };
The flow can be understood like this:
collector.collect(key, value) --> out.write(key, value) --> LineRecordWriter.write(key, value) --> DataOutputStream.write(...)
The DataOutputStream is a member variable held inside LineRecordWriter. With that, the path from the tail of the Reduce phase through to the output is completely connected. The complete diagram of the classes involved can be found in the original post.
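To close the loop, here is a hedged sketch of a driver configuring the output side with the old API (the paths, key/value classes and separator value are placeholders, and the mapper/reducer setup is omitted):

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.TextOutputFormat;

public class OutputSideDriver {
  public static void main(String[] args) throws Exception {
    JobConf conf = new JobConf(OutputSideDriver.class);
    // mapper/reducer configuration omitted in this sketch
    FileInputFormat.setInputPaths(conf, new Path("/input"));

    // choose the output format; TextOutputFormat is the default anyway
    conf.setOutputFormat(TextOutputFormat.class);
    conf.setOutputKeyClass(Text.class);
    conf.setOutputValueClass(IntWritable.class);
    // override the default "\t" key-value separator used by LineRecordWriter
    conf.set("mapred.textoutputformat.separator", ",");
    // must not exist yet, or checkOutputSpecs() throws FileAlreadyExistsException
    FileOutputFormat.setOutputPath(conf, new Path("/output"));

    JobClient.runJob(conf);
  }
}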
The next stage of study will probably lean towards the Task and Job processes, a more macro-level analysis; or perhaps the implementation of a specific functional block, for example Hadoop's IPC mechanism, which reportedly uses a lot of Java NIO.