(一)MapReduce篇之InputFormat,InputSplit,RecordReader(轉)


    平時我們寫MapReduce程序的時候,在設置輸入格式的時候,總會調用形如job.setInputFormatClass(KeyValueTextInputFormat.class);來保證輸入文件按照我們想要的格式被讀取。所有的輸入格式都繼承於InputFormat,這是一個抽象類,其子類有專門用於讀取普通文件的FileInputFormat,用來讀取數據庫的DBInputFormat等等。

 

 

  不同的InputFormat都會按自己的實現來讀取輸入數據並產生輸入分片,一個輸入分片會被單獨的map task作為數據源。下面我們先看看這些輸入分片(inputSplit)是什么樣的。

InputSplit:

  我們知道Mappers的輸入是一個一個的輸入分片,稱InputSplit。InputSplit是一個抽象類,它在邏輯上包含了提供給處理這個InputSplit的Mapper的所有K-V對。

 

[html]  view plain copy print ?
  1. public abstract class InputSplit {  
  2.   public abstract long getLength() throws IOException, InterruptedException;  
  3.   
  4.   public abstract   
  5.     String[] getLocations() throws IOException, InterruptedException;  
  6. }  

  getLength()用來獲取InputSplit的大小,以支持對InputSplits進行排序,而getLocations()則用來獲取存儲分片的位置列表。
  我們來看一個簡單InputSplit子類:FileSplit。

[html]  view plain copy print ?
  1. public class FileSplit extends InputSplit implements Writable {  
  2.   private Path file;  
  3.   private long start;  
  4.   private long length;  
  5.   private String[] hosts;  
  6.   
  7.   FileSplit() {}  
  8.   
  9.   public FileSplit(Path file, long start, long length, String[] hosts) {  
  10.     this.file = file;  
  11.     this.start = start;  
  12.     this.length = length;  
  13.     this.hosts = hosts;  
  14.   }  
  15.  //序列化、反序列化方法,獲得hosts等等……  
  16. }  


  從上面的源碼我們可以看到,一個FileSplit是由文件路徑,分片開始位置,分片大小和存儲分片數據的hosts列表組成,由這些信息我們就可以從輸入文件中切分出提供給單個Mapper的輸入數據。這些屬性會在Constructor設置,我們在后面會看到這會在InputFormat的getSplits()中構造這些分片。

 

  我們再看CombineFileSplit:

[html]  view plain copy print ?
  1. public class CombineFileSplit extends InputSplit implements Writable {  
  2.   
  3.   private Path[] paths;  
  4.   private long[] startoffset;  
  5.   private long[] lengths;  
  6.   private String[] locations;  
  7.   private long totLength;  
  8.   
  9.   public CombineFileSplit() {}  
  10.   public CombineFileSplit(Path[] files, long[] start,   
  11.                           long[] lengths, String[] locations) {  
  12.     initSplit(files, start, lengths, locations);  
  13.   }  
  14.   
  15.   public CombineFileSplit(Path[] files, long[] lengths) {  
  16.     long[] startoffset = new long[files.length];  
  17.     for (int i = 0; i < startoffset.length; i++) {  
  18.       startoffset[i] = 0;  
  19.     }  
  20.     String[] locations = new String[files.length];  
  21.     for (int i = 0; i < locations.length; i++) {  
  22.       locations[i] = "";  
  23.     }  
  24.     initSplit(files, startoffset, lengths, locations);  
  25.   }  
  26.     
  27.   private void initSplit(Path[] files, long[] start,   
  28.                          long[] lengths, String[] locations) {  
  29.     this.startoffset = start;  
  30.     this.lengths = lengths;  
  31.     this.paths = files;  
  32.     this.totLength = 0;  
  33.     this.locations = locations;  
  34.     for(long length : lengths) {  
  35.       totLength += length;  
  36.     }  
  37.   }  
  38.   //一些getter和setter方法,和序列化方法  
  39. }  


  與FileSplit類似,CombineFileSplit同樣包含文件路徑,分片起始位置,分片大小和存儲分片數據的host列表,由於CombineFileSplit是針對小文件的,它把很多小文件包在一個InputSplit內,這樣一個Mapper就可以處理很多小文件。要知道我們上面的FileSplit是對應一個輸入文件的,也就是說如果用FileSplit對應的FileInputFormat來作為輸入格式,那么即使文件特別小,也是單獨計算成一個輸入分片來處理的。當我們的輸入是由大量小文件組成的,就會導致有同樣大量的InputSplit,從而需要同樣大量的Mapper來處理,這將很慢,想想有一堆map task要運行!!這是不符合Hadoop的設計理念的,Hadoop是為處理大文件優化的。

 

  最后介紹TagInputSplit,這個類就是封裝了一個InputSplit,然后加了一些tags在里面滿足我們需要這些tags數據的情況,我們從下面就可以一目了然。

[html]  view plain copy print ?
  1. class TaggedInputSplit extends InputSplit implements Configurable, Writable {  
  2.   
  3.   private Class<? extends InputSplit> inputSplitClass;  
  4.   
  5.   private InputSplit inputSplit;  
  6.   
  7.   @SuppressWarnings("unchecked")  
  8.   private Class<? extends InputFormat> inputFormatClass;  
  9.   
  10.   @SuppressWarnings("unchecked")  
  11.   private Class<? extends Mapper> mapperClass;  
  12.   
  13.   private Configuration conf;  
  14.   //getters and setters,序列化方法,getLocations()、getLength()等  
  15. }  

     現在我們對InputSplit的概念有了一些了解,我們繼續看它是怎么被使用和計算出來的。

InputFormat:

  通過使用InputFormat,MapReduce框架可以做到:

  1、驗證作業的輸入的正確性

  2、將輸入文件切分成邏輯的InputSplits,一個InputSplit將被分配給一個單獨的Mapper task

  3、提供RecordReader的實現,這個RecordReader會從InputSplit中正確讀出一條一條的K-V對供Mapper使用。

 

[html]  view plain copy print ?
  1. public abstract class InputFormat<K, V> {  
  2.   
  3.   public abstract   
  4.     List<InputSplit> getSplits(JobContext context  
  5.                                ) throws IOException, InterruptedException;  
  6.     
  7.   public abstract   
  8.     RecordReader<K,V> createRecordReader(InputSplit split,  
  9.                                          TaskAttemptContext context  
  10.                                         ) throws IOException,   
  11.                                                  InterruptedException;  
  12.   
  13. }  

 

  上面是InputFormat的源碼,getSplits用來獲取由輸入文件計算出來的InputSplits,我們在后面會看到計算InputSplits的時候會考慮到輸入文件是否可分割、文件存儲時分塊的大小和文件大小等因素;而createRecordReader()提供了前面第三點所說的RecordReader的實現,以將K-V對從InputSplit中正確讀出來,比如LineRecordReader就以偏移值為key,一行的數據為value,這就使得所有其createRecordReader()返回了LineRecordReader的InputFormat都是以偏移值為key,一行數據為value的形式讀取輸入分片的。

 

FileInputFormat:

  PathFilter被用來進行文件篩選,這樣我們就可以控制哪些文件要作為輸入,哪些不作為輸入。PathFilter有一個accept(Path)方法,當接收的Path要被包含進來,就返回true,否則返回false。可以通過設置mapred.input.pathFilter.class來設置用戶自定義的PathFilter。

 

[html]  view plain copy print ?
  1. public interface PathFilter {  
  2.   boolean accept(Path path);  
  3. }  

 

  FileInputFormat是InputFormat的子類,它包含了一個MultiPathFilter,這個MultiPathFilter由一個過濾隱藏文件(名字前綴為'-'或'.')的PathFilter和一些可能存在的用戶自定義的PathFilters組成,MultiPathFilter會在listStatus()方法中使用,而listStatus()方法又被getSplits()方法用來獲取輸入文件,也就是說實現了在獲取輸入分片前先進行文件過濾。

[html]  view plain copy print ?
  1. private static class MultiPathFilter implements PathFilter {  
  2.   private List<PathFilter> filters;  
  3.   
  4.   public MultiPathFilter(List<PathFilter> filters) {  
  5.     this.filters = filters;  
  6.   }  
  7.   
  8.   public boolean accept(Path path) {  
  9.     for (PathFilter filter : filters) {  
  10.       if (!filter.accept(path)) {  
  11.         return false;  
  12.       }  
  13.     }  
  14.     return true;  
  15.   }  
  16. }  

   這些PathFilter會在listStatus()方法中用到,listStatus()是用來獲取輸入數據列表的。

  下面是FileInputFormat的getSplits()方法,它首先得到分片的最小值minSize和最大值maxSize,它們會被用來計算分片大小。可以通過設置mapred.min.split.size和mapred.max.split.size來設置。splits鏈表用來存儲計算得到的輸入分片,files則存儲作為由listStatus()獲取的輸入文件列表。然后對於每個輸入文件,判斷是否可以分割,通過computeSplitSize計算出分片大小splitSize,計算方法是:Math.max(minSize, Math.min(maxSize, blockSize));也就是保證在minSize和maxSize之間,且如果minSize<=blockSize<=maxSize,則設為blockSize。然后我們根據這個splitSize計算出每個文件的inputSplits集合,然后加入分片列表splits中。注意到我們生成InputSplit的時候按上面說的使用文件路徑,分片起始位置,分片大小和存放這個文件的hosts列表來創建。最后我們還設置了輸入文件數量:mapreduce.input.num.files。

[plain]  view plain copy print ?
  1. public List<InputSplit> getSplits(JobContext job  
  2.                                   ) throws IOException {  
  3.   long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job));  
  4.   long maxSize = getMaxSplitSize(job);  
  5.   
  6.   // generate splits  
  7.   List<InputSplit> splits = new ArrayList<InputSplit>();  
  8.   List<FileStatus>files = listStatus(job);  
  9.   for (FileStatus file: files) {  
  10.     Path path = file.getPath();  
  11.     FileSystem fs = path.getFileSystem(job.getConfiguration());  
  12.     long length = file.getLen();  
  13.     BlockLocation[] blkLocations = fs.getFileBlockLocations(file, 0, length);  
  14.     if ((length != 0) && isSplitable(job, path)) {   
  15.       long blockSize = file.getBlockSize();  
  16.       long splitSize = computeSplitSize(blockSize, minSize, maxSize);  
  17.   
  18.       long bytesRemaining = length;  
  19.       while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) {  
  20.         int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);  
  21.         splits.add(new FileSplit(path, length-bytesRemaining, splitSize,   
  22.                                  blkLocations[blkIndex].getHosts()));  
  23.         bytesRemaining -= splitSize;  
  24.       }  
  25.         
  26.       if (bytesRemaining != 0) {  
  27.         splits.add(new FileSplit(path, length-bytesRemaining, bytesRemaining,   
  28.                    blkLocations[blkLocations.length-1].getHosts()));  
  29.       }  
  30.     } else if (length != 0) {  
  31.       splits.add(new FileSplit(path, 0, length, blkLocations[0].getHosts()));  
  32.     } else {   
  33.       //Create empty hosts array for zero length files  
  34.       splits.add(new FileSplit(path, 0, length, new String[0]));  
  35.     }  
  36.   }  
  37.     
  38.   // Save the number of input files in the job-conf  
  39.   job.getConfiguration().setLong(NUM_INPUT_FILES, files.size());  
  40.   
  41.   LOG.debug("Total # of splits: " + splits.size());  
  42.   return splits;  
  43. }  
  44. //……setters and getters  


  就這樣,利用FileInputFormat 的getSplits方法,我們就計算出了我們的作業的所有輸入分片了。

 

  那這些計算出來的分片是怎么被map讀取出來的呢?就是InputFormat中的另一個方法createRecordReader(),FileInputFormat並沒有對這個方法做具體的要求,而是交給子類自行去實現它。
RecordReader
  RecordReader是用來從一個輸入分片中讀取一個一個的K -V 對的抽象類,我們可以將其看作是在InputSplit上的迭代器。我們從類圖中可以看到它的一些方法,最主要的方法就是nextKeyvalue()方法,由它獲取分片上的下一個K-V 對。

  我們再深入看看上面提到的RecordReader的一個子類:LineRecordReader。

  LineRecordReader由一個FileSplit構造出來,start是這個FileSplit的起始位置,pos是當前讀取分片的位置,end是分片結束位置,in是打開的一個讀取這個分片的輸入流,它是使用這個FileSplit對應的文件名來打開的。key和value則分別是每次讀取的K-V對。然后我們還看到可以利用getProgress()來跟蹤讀取分片的進度,這個函數就是根據已經讀取的K-V對占總K-V對的比例來顯示進度的。

 

[html]  view plain copy print ?
  1. public class LineRecordReader extends RecordReader<LongWritable, Text> {  
  2.   private static final Log LOG = LogFactory.getLog(LineRecordReader.class);  
  3.   
  4.   private CompressionCodecFactory compressionCodecs = null;  
  5.   private long start;  
  6.   private long pos;  
  7.   private long end;  
  8.   private LineReader in;  
  9.   private int maxLineLength;  
  10.   private LongWritable key = null;  
  11.   private Text value = null;  
  12.   
  13.   //我們知道LineRecordReader是讀取一個InputSplit的,它從InputSplit中不斷以其定義的格式讀取K-V對  
  14.   //initialize函數主要是計算分片的始末位置,以及打開想要的輸入流以供讀取K-V對,輸入流另外處理分片經過壓縮的情況  
  15.   public void initialize(InputSplit genericSplit,  
  16.                          TaskAttemptContext context) throws IOException {  
  17.     FileSplit split = (FileSplit) genericSplit;  
  18.     Configuration job = context.getConfiguration();  
  19.     this.maxLineLength = job.getInt("mapred.linerecordreader.maxlength",  
  20.                                     Integer.MAX_VALUE);  
  21.     start = split.getStart();  
  22.     end = start + split.getLength();  
  23.     final Path file = split.getPath();  
  24.     compressionCodecs = new CompressionCodecFactory(job);  
  25.     final CompressionCodec codec = compressionCodecs.getCodec(file);  
  26.   
  27.     // open the file and seek to the start of the split  
  28.     FileSystem fs = file.getFileSystem(job);  
  29.     FSDataInputStream fileIn = fs.open(split.getPath());  
  30.     boolean skipFirstLine = false;  
  31.     if (codec != null) {  
  32.       in = new LineReader(codec.createInputStream(fileIn), job);  
  33.       end = Long.MAX_VALUE;  
  34.     } else {  
  35.       if (start != 0) {  
  36.         skipFirstLine = true;  
  37.         --start;  
  38.         fileIn.seek(start);  
  39.       }  
  40.       in = new LineReader(fileIn, job);  
  41.     }  
  42.     if (skipFirstLine) {  // skip first line and re-establish "start".  
  43.       start += in.readLine(new Text(), 0,  
  44.                            (int)Math.min((long)Integer.MAX_VALUE, end - start));  
  45.     }  
  46.     this.pos = start;  
  47.   }  
  48.     
  49.   public boolean nextKeyValue() throws IOException {  
  50.     if (key == null) {  
  51.       key = new LongWritable();  
  52.     }  
  53.     key.set(pos); //對於LineRecordReader來說,它以偏移值為key,以一行為value  
  54.     if (value == null) {  
  55.       value = new Text();  
  56.     }  
  57.     int newSize = 0;  
  58.     while (pos < end) {  
  59.       newSize = in.readLine(value, maxLineLength,  
  60.                             Math.max((int)Math.min(Integer.MAX_VALUE, end-pos),  
  61.                                      maxLineLength));  
  62.       if (newSize == 0) {  
  63.         break;  
  64.       }  
  65.       pos += newSize;  
  66.       if (newSize < maxLineLength) {  
  67.         break;  
  68.       }  
  69.   
  70.       // line too long. try again  
  71.       LOG.info("Skipped line of size " + newSize + " at pos " +   
  72.                (pos - newSize));  
  73.     }  
  74.     if (newSize == 0) {  
  75.       key = null;  
  76.       value = null;  
  77.       return false;  
  78.     } else {  
  79.       return true;  
  80.     }  
  81.   }  
  82.   
  83.   @Override  
  84.   public LongWritable getCurrentKey() {  
  85.     return key;  
  86.   }  
  87.   
  88.   @Override  
  89.   public Text getCurrentValue() {  
  90.     return value;  
  91.   }  
  92.   
  93.   /**  
  94.    * Get the progress within the split  
  95.    */  
  96.   public float getProgress() {  
  97.     if (start == end) {  
  98.       return 0.0f;  
  99.     } else {  
  100.       return Math.min(1.0f, (pos - start) / (float)(end - start));//讀取進度由已讀取InputSplit大小比總InputSplit大小  
  101.     }  
  102.   }  
  103.     
  104.   public synchronized void close() throws IOException {  
  105.     if (in != null) {  
  106.       in.close();   
  107.     }  
  108.   }  
  109. }  

其它的一些RecordReader如SequenceFileRecordReaderCombineFileRecordReader.java等則對應不同的InputFormat。

 

  下面繼續看看這些RecordReader是如何被MapReduce框架使用的。

  我們先看看Mapper.class是什么樣的:

 

[html]  view plain copy print ?
  1. public class Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT> {  
  2.   
  3.   public class Context   
  4.     extends MapContext<KEYIN,VALUEIN,KEYOUT,VALUEOUT> {  
  5.     public Context(Configuration conf, TaskAttemptID taskid,  
  6.                    RecordReader<KEYIN,VALUEIN> reader,  
  7.                    RecordWriter<KEYOUT,VALUEOUT> writer,  
  8.                    OutputCommitter committer,  
  9.                    StatusReporter reporter,  
  10.                    InputSplit split) throws IOException, InterruptedException {  
  11.       super(conf, taskid, reader, writer, committer, reporter, split);  
  12.     }  
  13.   }  
  14.     
  15.   /**  
  16.    * Called once at the beginning of the task.  
  17.    */  
  18.   protected void setup(Context context  
  19.                        ) throws IOException, InterruptedException {  
  20.     // NOTHING  
  21.   }  
  22.   
  23.   /**  
  24.    * Called once for each key/value pair in the input split. Most applications  
  25.    * should override this, but the default is the identity function.  
  26.    */  
  27.   @SuppressWarnings("unchecked")  
  28.   protected void map(KEYIN key, VALUEIN value,   
  29.                      Context context) throws IOException, InterruptedException {  
  30.     context.write((KEYOUT) key, (VALUEOUT) value);  
  31.   }  
  32.   
  33.   /**  
  34.    * Called once at the end of the task.  
  35.    */  
  36.   protected void cleanup(Context context  
  37.                          ) throws IOException, InterruptedException {  
  38.     // NOTHING  
  39.   }  
  40.     
  41.   /**  
  42.    * Expert users can override this method for more complete control over the  
  43.    * execution of the Mapper.  
  44.    * @param context  
  45.    * @throws IOException  
  46.    */  
  47.   public void run(Context context) throws IOException, InterruptedException {  
  48.     setup(context);  
  49.     while (context.nextKeyValue()) {  
  50.       map(context.getCurrentKey(), context.getCurrentValue(), context);  
  51.     }  
  52.     cleanup(context);  
  53.   }  


  我們寫MapReduce程序的時候,我們寫的mapper都要繼承這個Mapper.class,通常我們會重寫map()方法,map()每次接受一個K-V對,然后我們對這個K-V對進行處理,再分發出處理后的數據。我們也可能重寫setup()以對這個map task進行一些預處理,比如創建一個List之類的;我們也可能重寫cleanup()方法對做一些處理后的工作,當然我們也可能在cleanup()中寫出K-V對。舉個例子就是:InputSplit的數據是一些整數,然后我們要在mapper中算出它們的和。我們就可以在先設置個sum屬性,然后map()函數處理一個K-V對就是將其加到sum上,最后在cleanup()函數中調用context.write(key,value);

 

    最后我們看看Mapper.class中的run()方法,它相當於map task的驅動,我們可以看到run()方法首先調用setup()進行初始操作,然后對每個context.nextKeyValue()獲取的K-V對,就調用map()函數進行處理,最后調用cleanup()做最后的處理。事實上,從text他.nextKeyValue()就是使用了相應的RecordReader來獲取K-V對的。

   我們看看Mapper.class中的Context類,它繼承與MapContext,使用了一個RecordReader進行構造。下面我們再看這個MapContext

[html]  view plain copy print ?
  1. public class MapContext<KEYIN,VALUEIN,KEYOUT,VALUEOUT>   
  2.   extends TaskInputOutputContext<KEYIN,VALUEIN,KEYOUT,VALUEOUT> {  
  3.   private RecordReader<KEYIN,VALUEIN> reader;  
  4.   private InputSplit split;  
  5.   
  6.   public MapContext(Configuration conf, TaskAttemptID taskid,  
  7.                     RecordReader<KEYIN,VALUEIN> reader,  
  8.                     RecordWriter<KEYOUT,VALUEOUT> writer,  
  9.                     OutputCommitter committer,  
  10.                     StatusReporter reporter,  
  11.                     InputSplit split) {  
  12.     super(conf, taskid, writer, committer, reporter);  
  13.     this.reader = reader;  
  14.     this.split = split;  
  15.   }  
  16.   
  17.   /**  
  18.    * Get the input split for this map.  
  19.    */  
  20.   public InputSplit getInputSplit() {  
  21.     return split;  
  22.   }  
  23.   
  24.   @Override  
  25.   public KEYIN getCurrentKey() throws IOException, InterruptedException {  
  26.     return reader.getCurrentKey();  
  27.   }  
  28.   
  29.   @Override  
  30.   public VALUEIN getCurrentValue() throws IOException, InterruptedException {  
  31.     return reader.getCurrentValue();  
  32.   }  
  33.   
  34.   @Override  
  35.   public boolean nextKeyValue() throws IOException, InterruptedException {  
  36.     return reader.nextKeyValue();  
  37.   }  
  38.   
  39. }  
  40.       


  我們可以看到MapContext直接是使用傳入的RecordReader來進行K-V對的讀取了。

 到現在,我們已經知道輸入文件是如何被讀取、過濾、分片、讀出K-V對,然后交給我們的Mapper類來處理的了。

 最后,我們來看看FileInputFormat的幾個子類。

TextInputFormat:

  TextInputFormat是FileInputFormat的子類,其createRecordReader()方法返回的就是LineRecordReader。

 

[html]  view plain copy print ?
  1. public class TextInputFormat extends FileInputFormat<LongWritable, Text> {  
  2.   
  3.   @Override  
  4.   public RecordReader<LongWritable, Text>   
  5.     createRecordReader(InputSplit split,  
  6.                        TaskAttemptContext context) {  
  7.     return new LineRecordReader();  
  8.   }  
  9.   
  10.   @Override  
  11.   protected boolean isSplitable(JobContext context, Path file) {  
  12.     CompressionCodec codec =   
  13.       new CompressionCodecFactory(context.getConfiguration()).getCodec(file);  
  14.     return codec == null;  
  15.   }  
  16. }  


  我們還看到isSplitable()方法,當文件使用壓縮的形式,這個文件就不可分割,否則就讀取不到正確的數據了。這從某種程度上將影響分片的計算。有時我們希望一個文件只被一個Mapper處理的時候,我們就可以重寫isSplitable()方法,告訴MapReduce框架,我哪些文件可以分割,哪些文件不能分割而只能作為一個分片。

  

NLineInputFormat;

  NLineInputFormat也是FileInputFormat的子類,與名字一致,它是根據行數來划分InputSplits而不是像TextInputFormat那樣依賴分片大小和行的長度的。也就是說,TextInputFormat當一行很長或分片比較小時,獲取的分片可能只包含很少的K-V對,這樣一個map task處理的K-V對就很少,這可能很不理想。因此我們可以使用NLineInputFormat來控制一個map task處理的K-V對,這是通過分割InputSplits時按行數分割的方法來實現的,這我們在代碼中可以看出來。我們可以設置mapreduce.input.lineinputformat.linespermap來設置這個行數。
[html]  view plain copy print ?
  1. public class NLineInputFormat extends FileInputFormat<LongWritable, Text> {   
  2.   public static final String LINES_PER_MAP =   
  3.     "mapreduce.input.lineinputformat.linespermap";  
  4.   
  5.   public RecordReader<LongWritable, Text> createRecordReader(  
  6.       InputSplit genericSplit, TaskAttemptContext context)   
  7.       throws IOException {  
  8.     context.setStatus(genericSplit.toString());  
  9.     return new LineRecordReader();  
  10.   }  
  11.   
  12.   /**   
  13.    * Logically splits the set of input files for the job, splits N lines  
  14.    * of the input as one split.  
  15.    *   
  16.    * @see FileInputFormat#getSplits(JobContext)  
  17.    */  
  18.   public List<InputSplit> getSplits(JobContext job)  
  19.   throws IOException {  
  20.     List<InputSplit> splits = new ArrayList<InputSplit>();  
  21.     int numLinesPerSplit = getNumLinesPerSplit(job);  
  22.     for (FileStatus status : listStatus(job)) {  
  23.       splits.addAll(getSplitsForFile(status,  
  24.         job.getConfiguration(), numLinesPerSplit));  
  25.     }  
  26.     return splits;  
  27.   }  
  28.     
  29.   public static List<FileSplit> getSplitsForFile(FileStatus status,  
  30.       Configuration conf, int numLinesPerSplit) throws IOException {  
  31.     List<FileSplit> splits = new ArrayList<FileSplit> ();  
  32.     Path fileName = status.getPath();  
  33.     if (status.isDir()) {  
  34.       throw new IOException("Not a file: " + fileName);  
  35.     }  
  36.     FileSystem  fs = fileName.getFileSystem(conf);  
  37.     LineReader lr = null;  
  38.     try {  
  39.       FSDataInputStream in  = fs.open(fileName);  
  40.       lr = new LineReader(in, conf);  
  41.       Text line = new Text();  
  42.       int numLines = 0;  
  43.       long begin = 0;  
  44.       long length = 0;  
  45.       int num = -1;  
  46.       while ((num = lr.readLine(line)) > 0) {  
  47.         numLines++;  
  48.         length += num;  
  49.         if (numLines == numLinesPerSplit) {  
  50.           // NLineInputFormat uses LineRecordReader, which always reads  
  51.           // (and consumes) at least one character out of its upper split  
  52.           // boundary. So to make sure that each mapper gets N lines, we  
  53.           // move back the upper split limits of each split   
  54.           // by one character here.  
  55.           if (begin == 0) {  
  56.             splits.add(new FileSplit(fileName, begin, length - 1,  
  57.               new String[] {}));  
  58.           } else {  
  59.             splits.add(new FileSplit(fileName, begin - 1, length,  
  60.               new String[] {}));  
  61.           }  
  62.           begin += length;  
  63.           length = 0;  
  64.           numLines = 0;  
  65.         }  
  66.       }  
  67.       if (numLines != 0) {  
  68.         splits.add(new FileSplit(fileName, begin, length, new String[]{}));  
  69.       }  
  70.     } finally {  
  71.       if (lr != null) {  
  72.         lr.close();  
  73.       }  
  74.     }  
  75.     return splits;   
  76.   }  
  77.     
  78.   /**  
  79.    * Set the number of lines per split  
  80.    * @param job the job to modify  
  81.    * @param numLines the number of lines per split  
  82.    */  
  83.   public static void setNumLinesPerSplit(Job job, int numLines) {  
  84.     job.getConfiguration().setInt(LINES_PER_MAP, numLines);  
  85.   }  
  86.   
  87.   /**  
  88.    * Get the number of lines per split  
  89.    * @param job the job  
  90.    * @return the number of lines per split  
  91.    */  
  92.   public static int getNumLinesPerSplit(JobContext job) {  
  93.     return job.getConfiguration().getInt(LINES_PER_MAP, 1);  
  94.   }  


  現在,我們對Hadoop的輸入格式和其在MapReduce中如何被使用有了具體的了解了。
 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM