平時我們寫MapReduce程序的時候,在設置輸入格式的時候,總會調用形如job.setInputFormatClass(KeyValueTextInputFormat.class);來保證輸入文件按照我們想要的格式被讀取。所有的輸入格式都繼承於InputFormat,這是一個抽象類,其子類有專門用於讀取普通文件的FileInputFormat,用來讀取數據庫的DBInputFormat等等。

不同的InputFormat都會按自己的實現來讀取輸入數據並產生輸入分片,一個輸入分片會被單獨的map task作為數據源。下面我們先看看這些輸入分片(inputSplit)是什么樣的。
InputSplit:
我們知道Mappers的輸入是一個一個的輸入分片,稱InputSplit。InputSplit是一個抽象類,它在邏輯上包含了提供給處理這個InputSplit的Mapper的所有K-V對。
- public abstract class InputSplit {
- public abstract long getLength() throws IOException, InterruptedException;
- public abstract
- String[] getLocations() throws IOException, InterruptedException;
- }
getLength()用來獲取InputSplit的大小,以支持對InputSplits進行排序,而getLocations()則用來獲取存儲分片的位置列表。
我們來看一個簡單InputSplit子類:FileSplit。
- public class FileSplit extends InputSplit implements Writable {
- private Path file;
- private long start;
- private long length;
- private String[] hosts;
- FileSplit() {}
- public FileSplit(Path file, long start, long length, String[] hosts) {
- this.file = file;
- this.start = start;
- this.length = length;
- this.hosts = hosts;
- }
- //序列化、反序列化方法,獲得hosts等等……
- }
從上面的源碼我們可以看到,一個FileSplit是由文件路徑,分片開始位置,分片大小和存儲分片數據的hosts列表組成,由這些信息我們就可以從輸入文件中切分出提供給單個Mapper的輸入數據。這些屬性會在Constructor設置,我們在后面會看到這會在InputFormat的getSplits()中構造這些分片。
我們再看CombineFileSplit:
- public class CombineFileSplit extends InputSplit implements Writable {
- private Path[] paths;
- private long[] startoffset;
- private long[] lengths;
- private String[] locations;
- private long totLength;
- public CombineFileSplit() {}
- public CombineFileSplit(Path[] files, long[] start,
- long[] lengths, String[] locations) {
- initSplit(files, start, lengths, locations);
- }
- public CombineFileSplit(Path[] files, long[] lengths) {
- long[] startoffset = new long[files.length];
- for (int i = 0; i < startoffset.length; i++) {
- startoffset[i] = 0;
- }
- String[] locations = new String[files.length];
- for (int i = 0; i < locations.length; i++) {
- locations[i] = "";
- }
- initSplit(files, startoffset, lengths, locations);
- }
- private void initSplit(Path[] files, long[] start,
- long[] lengths, String[] locations) {
- this.startoffset = start;
- this.lengths = lengths;
- this.paths = files;
- this.totLength = 0;
- this.locations = locations;
- for(long length : lengths) {
- totLength += length;
- }
- }
- //一些getter和setter方法,和序列化方法
- }
與FileSplit類似,CombineFileSplit同樣包含文件路徑,分片起始位置,分片大小和存儲分片數據的host列表,由於CombineFileSplit是針對小文件的,它把很多小文件包在一個InputSplit內,這樣一個Mapper就可以處理很多小文件。要知道我們上面的FileSplit是對應一個輸入文件的,也就是說如果用FileSplit對應的FileInputFormat來作為輸入格式,那么即使文件特別小,也是單獨計算成一個輸入分片來處理的。當我們的輸入是由大量小文件組成的,就會導致有同樣大量的InputSplit,從而需要同樣大量的Mapper來處理,這將很慢,想想有一堆map task要運行!!這是不符合Hadoop的設計理念的,Hadoop是為處理大文件優化的。
最后介紹TagInputSplit,這個類就是封裝了一個InputSplit,然后加了一些tags在里面滿足我們需要這些tags數據的情況,我們從下面就可以一目了然。
- class TaggedInputSplit extends InputSplit implements Configurable, Writable {
- private Class<? extends InputSplit> inputSplitClass;
- private InputSplit inputSplit;
- @SuppressWarnings("unchecked")
- private Class<? extends InputFormat> inputFormatClass;
- @SuppressWarnings("unchecked")
- private Class<? extends Mapper> mapperClass;
- private Configuration conf;
- //getters and setters,序列化方法,getLocations()、getLength()等
- }
現在我們對InputSplit的概念有了一些了解,我們繼續看它是怎么被使用和計算出來的。
InputFormat:
通過使用InputFormat,MapReduce框架可以做到:
1、驗證作業的輸入的正確性
2、將輸入文件切分成邏輯的InputSplits,一個InputSplit將被分配給一個單獨的Mapper task
3、提供RecordReader的實現,這個RecordReader會從InputSplit中正確讀出一條一條的K-V對供Mapper使用。
- public abstract class InputFormat<K, V> {
- public abstract
- List<InputSplit> getSplits(JobContext context
- ) throws IOException, InterruptedException;
- public abstract
- RecordReader<K,V> createRecordReader(InputSplit split,
- TaskAttemptContext context
- ) throws IOException,
- InterruptedException;
- }
上面是InputFormat的源碼,getSplits用來獲取由輸入文件計算出來的InputSplits,我們在后面會看到計算InputSplits的時候會考慮到輸入文件是否可分割、文件存儲時分塊的大小和文件大小等因素;而createRecordReader()提供了前面第三點所說的RecordReader的實現,以將K-V對從InputSplit中正確讀出來,比如LineRecordReader就以偏移值為key,一行的數據為value,這就使得所有其createRecordReader()返回了LineRecordReader的InputFormat都是以偏移值為key,一行數據為value的形式讀取輸入分片的。
FileInputFormat:
PathFilter被用來進行文件篩選,這樣我們就可以控制哪些文件要作為輸入,哪些不作為輸入。PathFilter有一個accept(Path)方法,當接收的Path要被包含進來,就返回true,否則返回false。可以通過設置mapred.input.pathFilter.class來設置用戶自定義的PathFilter。
- public interface PathFilter {
- boolean accept(Path path);
- }
FileInputFormat是InputFormat的子類,它包含了一個MultiPathFilter,這個MultiPathFilter由一個過濾隱藏文件(名字前綴為'-'或'.')的PathFilter和一些可能存在的用戶自定義的PathFilters組成,MultiPathFilter會在listStatus()方法中使用,而listStatus()方法又被getSplits()方法用來獲取輸入文件,也就是說實現了在獲取輸入分片前先進行文件過濾。
- private static class MultiPathFilter implements PathFilter {
- private List<PathFilter> filters;
- public MultiPathFilter(List<PathFilter> filters) {
- this.filters = filters;
- }
- public boolean accept(Path path) {
- for (PathFilter filter : filters) {
- if (!filter.accept(path)) {
- return false;
- }
- }
- return true;
- }
- }
這些PathFilter會在listStatus()方法中用到,listStatus()是用來獲取輸入數據列表的。
下面是FileInputFormat的getSplits()方法,它首先得到分片的最小值minSize和最大值maxSize,它們會被用來計算分片大小。可以通過設置mapred.min.split.size和mapred.max.split.size來設置。splits鏈表用來存儲計算得到的輸入分片,files則存儲作為由listStatus()獲取的輸入文件列表。然后對於每個輸入文件,判斷是否可以分割,通過computeSplitSize計算出分片大小splitSize,計算方法是:Math.max(minSize, Math.min(maxSize, blockSize));也就是保證在minSize和maxSize之間,且如果minSize<=blockSize<=maxSize,則設為blockSize。然后我們根據這個splitSize計算出每個文件的inputSplits集合,然后加入分片列表splits中。注意到我們生成InputSplit的時候按上面說的使用文件路徑,分片起始位置,分片大小和存放這個文件的hosts列表來創建。最后我們還設置了輸入文件數量:mapreduce.input.num.files。
- public List<InputSplit> getSplits(JobContext job
- ) throws IOException {
- long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job));
- long maxSize = getMaxSplitSize(job);
- // generate splits
- List<InputSplit> splits = new ArrayList<InputSplit>();
- List<FileStatus>files = listStatus(job);
- for (FileStatus file: files) {
- Path path = file.getPath();
- FileSystem fs = path.getFileSystem(job.getConfiguration());
- long length = file.getLen();
- BlockLocation[] blkLocations = fs.getFileBlockLocations(file, 0, length);
- if ((length != 0) && isSplitable(job, path)) {
- long blockSize = file.getBlockSize();
- long splitSize = computeSplitSize(blockSize, minSize, maxSize);
- long bytesRemaining = length;
- while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) {
- int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);
- splits.add(new FileSplit(path, length-bytesRemaining, splitSize,
- blkLocations[blkIndex].getHosts()));
- bytesRemaining -= splitSize;
- }
- if (bytesRemaining != 0) {
- splits.add(new FileSplit(path, length-bytesRemaining, bytesRemaining,
- blkLocations[blkLocations.length-1].getHosts()));
- }
- } else if (length != 0) {
- splits.add(new FileSplit(path, 0, length, blkLocations[0].getHosts()));
- } else {
- //Create empty hosts array for zero length files
- splits.add(new FileSplit(path, 0, length, new String[0]));
- }
- }
- // Save the number of input files in the job-conf
- job.getConfiguration().setLong(NUM_INPUT_FILES, files.size());
- LOG.debug("Total # of splits: " + splits.size());
- return splits;
- }
- //……setters and getters
就這樣,利用FileInputFormat 的getSplits方法,我們就計算出了我們的作業的所有輸入分片了。
那這些計算出來的分片是怎么被map讀取出來的呢?就是InputFormat中的另一個方法createRecordReader(),FileInputFormat並沒有對這個方法做具體的要求,而是交給子類自行去實現它。
RecordReader:
RecordReader是用來從一個輸入分片中讀取一個一個的K -V 對的抽象類,我們可以將其看作是在InputSplit上的迭代器。我們從類圖中可以看到它的一些方法,最主要的方法就是nextKeyvalue()方法,由它獲取分片上的下一個K-V 對。
我們再深入看看上面提到的RecordReader的一個子類:LineRecordReader。
LineRecordReader由一個FileSplit構造出來,start是這個FileSplit的起始位置,pos是當前讀取分片的位置,end是分片結束位置,in是打開的一個讀取這個分片的輸入流,它是使用這個FileSplit對應的文件名來打開的。key和value則分別是每次讀取的K-V對。然后我們還看到可以利用getProgress()來跟蹤讀取分片的進度,這個函數就是根據已經讀取的K-V對占總K-V對的比例來顯示進度的。
- public class LineRecordReader extends RecordReader<LongWritable, Text> {
- private static final Log LOG = LogFactory.getLog(LineRecordReader.class);
- private CompressionCodecFactory compressionCodecs = null;
- private long start;
- private long pos;
- private long end;
- private LineReader in;
- private int maxLineLength;
- private LongWritable key = null;
- private Text value = null;
- //我們知道LineRecordReader是讀取一個InputSplit的,它從InputSplit中不斷以其定義的格式讀取K-V對
- //initialize函數主要是計算分片的始末位置,以及打開想要的輸入流以供讀取K-V對,輸入流另外處理分片經過壓縮的情況
- public void initialize(InputSplit genericSplit,
- TaskAttemptContext context) throws IOException {
- FileSplit split = (FileSplit) genericSplit;
- Configuration job = context.getConfiguration();
- this.maxLineLength = job.getInt("mapred.linerecordreader.maxlength",
- Integer.MAX_VALUE);
- start = split.getStart();
- end = start + split.getLength();
- final Path file = split.getPath();
- compressionCodecs = new CompressionCodecFactory(job);
- final CompressionCodec codec = compressionCodecs.getCodec(file);
- // open the file and seek to the start of the split
- FileSystem fs = file.getFileSystem(job);
- FSDataInputStream fileIn = fs.open(split.getPath());
- boolean skipFirstLine = false;
- if (codec != null) {
- in = new LineReader(codec.createInputStream(fileIn), job);
- end = Long.MAX_VALUE;
- } else {
- if (start != 0) {
- skipFirstLine = true;
- --start;
- fileIn.seek(start);
- }
- in = new LineReader(fileIn, job);
- }
- if (skipFirstLine) { // skip first line and re-establish "start".
- start += in.readLine(new Text(), 0,
- (int)Math.min((long)Integer.MAX_VALUE, end - start));
- }
- this.pos = start;
- }
- public boolean nextKeyValue() throws IOException {
- if (key == null) {
- key = new LongWritable();
- }
- key.set(pos); //對於LineRecordReader來說,它以偏移值為key,以一行為value
- if (value == null) {
- value = new Text();
- }
- int newSize = 0;
- while (pos < end) {
- newSize = in.readLine(value, maxLineLength,
- Math.max((int)Math.min(Integer.MAX_VALUE, end-pos),
- maxLineLength));
- if (newSize == 0) {
- break;
- }
- pos += newSize;
- if (newSize < maxLineLength) {
- break;
- }
- // line too long. try again
- LOG.info("Skipped line of size " + newSize + " at pos " +
- (pos - newSize));
- }
- if (newSize == 0) {
- key = null;
- value = null;
- return false;
- } else {
- return true;
- }
- }
- @Override
- public LongWritable getCurrentKey() {
- return key;
- }
- @Override
- public Text getCurrentValue() {
- return value;
- }
- /**
- * Get the progress within the split
- */
- public float getProgress() {
- if (start == end) {
- return 0.0f;
- } else {
- return Math.min(1.0f, (pos - start) / (float)(end - start));//讀取進度由已讀取InputSplit大小比總InputSplit大小
- }
- }
- public synchronized void close() throws IOException {
- if (in != null) {
- in.close();
- }
- }
- }
其它的一些RecordReader如SequenceFileRecordReader,CombineFileRecordReader.java等則對應不同的InputFormat。
下面繼續看看這些RecordReader是如何被MapReduce框架使用的。
我們先看看Mapper.class是什么樣的:
- public class Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT> {
- public class Context
- extends MapContext<KEYIN,VALUEIN,KEYOUT,VALUEOUT> {
- public Context(Configuration conf, TaskAttemptID taskid,
- RecordReader<KEYIN,VALUEIN> reader,
- RecordWriter<KEYOUT,VALUEOUT> writer,
- OutputCommitter committer,
- StatusReporter reporter,
- InputSplit split) throws IOException, InterruptedException {
- super(conf, taskid, reader, writer, committer, reporter, split);
- }
- }
- /**
- * Called once at the beginning of the task.
- */
- protected void setup(Context context
- ) throws IOException, InterruptedException {
- // NOTHING
- }
- /**
- * Called once for each key/value pair in the input split. Most applications
- * should override this, but the default is the identity function.
- */
- @SuppressWarnings("unchecked")
- protected void map(KEYIN key, VALUEIN value,
- Context context) throws IOException, InterruptedException {
- context.write((KEYOUT) key, (VALUEOUT) value);
- }
- /**
- * Called once at the end of the task.
- */
- protected void cleanup(Context context
- ) throws IOException, InterruptedException {
- // NOTHING
- }
- /**
- * Expert users can override this method for more complete control over the
- * execution of the Mapper.
- * @param context
- * @throws IOException
- */
- public void run(Context context) throws IOException, InterruptedException {
- setup(context);
- while (context.nextKeyValue()) {
- map(context.getCurrentKey(), context.getCurrentValue(), context);
- }
- cleanup(context);
- }
我們寫MapReduce程序的時候,我們寫的mapper都要繼承這個Mapper.class,通常我們會重寫map()方法,map()每次接受一個K-V對,然后我們對這個K-V對進行處理,再分發出處理后的數據。我們也可能重寫setup()以對這個map task進行一些預處理,比如創建一個List之類的;我們也可能重寫cleanup()方法對做一些處理后的工作,當然我們也可能在cleanup()中寫出K-V對。舉個例子就是:InputSplit的數據是一些整數,然后我們要在mapper中算出它們的和。我們就可以在先設置個sum屬性,然后map()函數處理一個K-V對就是將其加到sum上,最后在cleanup()函數中調用context.write(key,value);
最后我們看看Mapper.class中的run()方法,它相當於map task的驅動,我們可以看到run()方法首先調用setup()進行初始操作,然后對每個context.nextKeyValue()獲取的K-V對,就調用map()函數進行處理,最后調用cleanup()做最后的處理。事實上,從text他.nextKeyValue()就是使用了相應的RecordReader來獲取K-V對的。
我們看看Mapper.class中的Context類,它繼承與MapContext,使用了一個RecordReader進行構造。下面我們再看這個MapContext。
- public class MapContext<KEYIN,VALUEIN,KEYOUT,VALUEOUT>
- extends TaskInputOutputContext<KEYIN,VALUEIN,KEYOUT,VALUEOUT> {
- private RecordReader<KEYIN,VALUEIN> reader;
- private InputSplit split;
- public MapContext(Configuration conf, TaskAttemptID taskid,
- RecordReader<KEYIN,VALUEIN> reader,
- RecordWriter<KEYOUT,VALUEOUT> writer,
- OutputCommitter committer,
- StatusReporter reporter,
- InputSplit split) {
- super(conf, taskid, writer, committer, reporter);
- this.reader = reader;
- this.split = split;
- }
- /**
- * Get the input split for this map.
- */
- public InputSplit getInputSplit() {
- return split;
- }
- @Override
- public KEYIN getCurrentKey() throws IOException, InterruptedException {
- return reader.getCurrentKey();
- }
- @Override
- public VALUEIN getCurrentValue() throws IOException, InterruptedException {
- return reader.getCurrentValue();
- }
- @Override
- public boolean nextKeyValue() throws IOException, InterruptedException {
- return reader.nextKeyValue();
- }
- }
我們可以看到MapContext直接是使用傳入的RecordReader來進行K-V對的讀取了。
到現在,我們已經知道輸入文件是如何被讀取、過濾、分片、讀出K-V對,然后交給我們的Mapper類來處理的了。
最后,我們來看看FileInputFormat的幾個子類。
TextInputFormat:
TextInputFormat是FileInputFormat的子類,其createRecordReader()方法返回的就是LineRecordReader。
- public class TextInputFormat extends FileInputFormat<LongWritable, Text> {
- @Override
- public RecordReader<LongWritable, Text>
- createRecordReader(InputSplit split,
- TaskAttemptContext context) {
- return new LineRecordReader();
- }
- @Override
- protected boolean isSplitable(JobContext context, Path file) {
- CompressionCodec codec =
- new CompressionCodecFactory(context.getConfiguration()).getCodec(file);
- return codec == null;
- }
- }
我們還看到isSplitable()方法,當文件使用壓縮的形式,這個文件就不可分割,否則就讀取不到正確的數據了。這從某種程度上將影響分片的計算。有時我們希望一個文件只被一個Mapper處理的時候,我們就可以重寫isSplitable()方法,告訴MapReduce框架,我哪些文件可以分割,哪些文件不能分割而只能作為一個分片。
NLineInputFormat;
- public class NLineInputFormat extends FileInputFormat<LongWritable, Text> {
- public static final String LINES_PER_MAP =
- "mapreduce.input.lineinputformat.linespermap";
- public RecordReader<LongWritable, Text> createRecordReader(
- InputSplit genericSplit, TaskAttemptContext context)
- throws IOException {
- context.setStatus(genericSplit.toString());
- return new LineRecordReader();
- }
- /**
- * Logically splits the set of input files for the job, splits N lines
- * of the input as one split.
- *
- * @see FileInputFormat#getSplits(JobContext)
- */
- public List<InputSplit> getSplits(JobContext job)
- throws IOException {
- List<InputSplit> splits = new ArrayList<InputSplit>();
- int numLinesPerSplit = getNumLinesPerSplit(job);
- for (FileStatus status : listStatus(job)) {
- splits.addAll(getSplitsForFile(status,
- job.getConfiguration(), numLinesPerSplit));
- }
- return splits;
- }
- public static List<FileSplit> getSplitsForFile(FileStatus status,
- Configuration conf, int numLinesPerSplit) throws IOException {
- List<FileSplit> splits = new ArrayList<FileSplit> ();
- Path fileName = status.getPath();
- if (status.isDir()) {
- throw new IOException("Not a file: " + fileName);
- }
- FileSystem fs = fileName.getFileSystem(conf);
- LineReader lr = null;
- try {
- FSDataInputStream in = fs.open(fileName);
- lr = new LineReader(in, conf);
- Text line = new Text();
- int numLines = 0;
- long begin = 0;
- long length = 0;
- int num = -1;
- while ((num = lr.readLine(line)) > 0) {
- numLines++;
- length += num;
- if (numLines == numLinesPerSplit) {
- // NLineInputFormat uses LineRecordReader, which always reads
- // (and consumes) at least one character out of its upper split
- // boundary. So to make sure that each mapper gets N lines, we
- // move back the upper split limits of each split
- // by one character here.
- if (begin == 0) {
- splits.add(new FileSplit(fileName, begin, length - 1,
- new String[] {}));
- } else {
- splits.add(new FileSplit(fileName, begin - 1, length,
- new String[] {}));
- }
- begin += length;
- length = 0;
- numLines = 0;
- }
- }
- if (numLines != 0) {
- splits.add(new FileSplit(fileName, begin, length, new String[]{}));
- }
- } finally {
- if (lr != null) {
- lr.close();
- }
- }
- return splits;
- }
- /**
- * Set the number of lines per split
- * @param job the job to modify
- * @param numLines the number of lines per split
- */
- public static void setNumLinesPerSplit(Job job, int numLines) {
- job.getConfiguration().setInt(LINES_PER_MAP, numLines);
- }
- /**
- * Get the number of lines per split
- * @param job the job
- * @return the number of lines per split
- */
- public static int getNumLinesPerSplit(JobContext job) {
- return job.getConfiguration().getInt(LINES_PER_MAP, 1);
- }


