[Hadoop源碼詳解]之一MapReduce篇之InputFormat


個人小站,正在持續整理中,歡迎訪問:http://shitouer.cn

小站博文地址:[Hadoop源碼詳解]之一MapReduce篇之InputFormat

 

1. 概述

我們在設置MapReduce輸入格式的時候,會調用這樣一條語句:

job.setInputFormatClass(KeyValueTextInputFormat.class);

這條語句保證了輸入文件會按照我們預設的格式被讀取。KeyValueTextInputFormat即為我們設定的數據讀取格式。

所有的輸入格式類都繼承自InputFormat,這是一個抽象類。其子類有例如專門用於讀取普通文件的FileInputFormat,還有用來讀取數據庫的DBInputFormat等等。相關類圖簡單畫出如下(推薦新標簽中打開圖片查看):

2. InputFormat

從InputFormat類圖看,InputFormat抽象類僅有兩個抽象方法:

  • List<InputSplit> getSplits(), 獲取由輸入文件計算出輸入分片(InputSplit),解決數據或文件分割成片問題。
  • RecordReader<K,V> createRecordReader(),創建RecordReader,從InputSplit中讀取數據,解決讀取分片中數據問題。

在后面說到InputSplits的時候,會介紹在getSplits()時需要驗證輸入文件是否可分割、文件存儲時分塊的大小和文件大小等因素,所以總體來說,通過InputFormat,Mapreduce框架可以做到:

  • 驗證作業輸入的正確性
  • 將輸入文件切割成邏輯分片(InputSplit),一個InputSplit將會被分配給一個獨立的MapTask
  • 提供RecordReader實現,讀取InputSplit中的“K-V對”供Mapper使用

InputFormat抽象類源碼也很簡單,如下供參考(文章格式考慮,刪除了部分注釋,添加了部分中文注釋):

public abstract class InputFormat<K, V> {

	/**
	 * 僅僅是邏輯分片,並沒有物理分片,所以每一個分片類似於這樣一個元組 <input-file-path, start, offset>
	 */
	public abstract List<InputSplit> getSplits(JobContext context)
			throws IOException, InterruptedException;

	/**
	 * Create a record reader for a given split.
	 */
	public abstract RecordReader<K, V> createRecordReader(InputSplit split,
			TaskAttemptContext context) throws IOException,
			InterruptedException;

}

不同的InputFormat會各自實現不同的文件讀取方式以及分片方式,每個輸入分片會被單獨的map task作為數據源。下面詳細介紹輸入分片(inputSplit)是什么。

 3.InputSplit

Mappers的輸入是一個一個的輸入分片,稱InputSplit。看源碼可知,InputSplit也是一個抽象類,它在邏輯上包含了提供給處理這個InputSplit的Mapper的所有K-V對。

public abstract class InputSplit {
	  /**
	   * 獲取Split的大小,支持根據size對InputSplit排序.
	   */
	  public abstract long getLength() throws IOException, InterruptedException;

	  /**
	   * 獲取存儲該分片的數據所在的節點位置.
	   */
	  public abstract 
	    String[] getLocations() throws IOException, InterruptedException;
}

下面深入看一個InputSplit的子類:FileSplit類

public class FileSplit extends InputSplit implements Writable {
	private Path file;
	private long start;
	private long length;
	private String[] hosts;

	/**
	 * Constructs a split with host information
	 * 
	 * @param file
	 *            the file name
	 * @param start
	 *            the position of the first byte in the file to process
	 * @param length
	 *            the number of bytes in the file to process
	 * @param hosts
	 *            the list of hosts containing the block, possibly null
	 */
	public FileSplit(Path file, long start, long length, String[] hosts) {
		this.file = file;
		this.start = start;
		this.length = length;
		this.hosts = hosts;
	}

	/** The number of bytes in the file to process. */
	@Override
	public long getLength() {
		return length;
	}

	@Override
	public String[] getLocations() throws IOException {
		if (this.hosts == null) {
			return new String[] {};
		} else {
			return this.hosts;
		}
	}

	// 略掉部分方法
}

從源碼中可以看出,FileSplit有四個屬性:文件路徑,分片起始位置,分片長度和存儲分片的hosts。用這四項數據,就可以計算出提供給每個Mapper的分片數據。在InputFormat的getSplit()方法中構造分片,分片的四個屬性會通過調用FileSplit的Constructor設置。

再看一個InputSplit的子類:CombineFileSplit。源碼如下:

public class CombineFileSplit extends InputSplit implements Writable {

	private Path[] paths;
	private long[] startoffset;
	private long[] lengths;
	private String[] locations;
	private long totLength;

	public CombineFileSplit(Path[] files, long[] start, long[] lengths,
			String[] locations) {
		initSplit(files, start, lengths, locations);
	}

	private void initSplit(Path[] files, long[] start, long[] lengths,
			String[] locations) {
		this.startoffset = start;
		this.lengths = lengths;
		this.paths = files;
		this.totLength = 0;
		this.locations = locations;
		for (long length : lengths) {
			totLength += length;
		}
	}

	public long getLength() {
		return totLength;
	}

	/** Returns all the Paths where this input-split resides */
	public String[] getLocations() throws IOException {
		return locations;
	}

	//省略了部分構造函數和方法,深入學習請閱讀源文件
}

為什么介紹該類呢,因為接下來要學習《Hadoop學習(五) – 小文件處理》,深入理解該類,將有助於該節學習。

上面我們介紹的FileSplit對應的是一個輸入文件,也就是說,如果用FileSplit對應的FileInputFormat作為輸入格式,那么即使文件特別小,也是作為一個單獨的InputSplit來處理,而每一個InputSplit將會由一個獨立的Mapper Task來處理。在輸入數據是由大量小文件組成的情形下,就會有同樣大量的InputSplit,從而需要同樣大量的Mapper來處理,大量的Mapper Task創建銷毀開銷將是巨大的,甚至對集群來說,是災難性的!

CombineFileSplit是針對小文件的分片,它將一系列小文件封裝在一個InputSplit內,這樣一個Mapper就可以處理多個小文件。可以有效的降低進程開銷。與FileSplit類似,CombineFileSplit同樣包含文件路徑,分片起始位置,分片大小和分片數據所在的host列表四個屬性,只不過這些屬性不再是一個值,而是一個列表。

需要注意的一點是,CombineFileSplit的getLength()方法,返回的是這一系列數據的數據的總長度。

現在,我們已深入的了解了InputSplit的概念,看了其源碼,知道了其屬性。我們知道數據分片是在InputFormat中實現的,接下來,我們就深入InputFormat的一個子類,FileInputFormat看看分片是如何進行的。

4. FileInputFormat

FileInputFormat中,分片方法代碼及詳細注釋如下,就不再詳細解釋該方法:

public List<InputSplit> getSplits(JobContext job) throws IOException {
	// 首先計算分片的最大和最小值。這兩個值將會用來計算分片的大小。
	// 由源碼可知,這兩個值可以通過mapred.min.split.size和mapred.max.split.size來設置
	long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job));
	long maxSize = getMaxSplitSize(job);

	// splits鏈表用來存儲計算得到的輸入分片結果
	List<InputSplit> splits = new ArrayList<InputSplit>();
	// files鏈表存儲由listStatus()獲取的輸入文件列表,listStatus比較特殊,我們在下面詳細研究
	List<FileStatus> files = listStatus(job);
	for (FileStatus file : files) {
		Path path = file.getPath();
		FileSystem fs = path.getFileSystem(job.getConfiguration());
		long length = file.getLen();
		// 獲取該文件所有的block信息列表[hostname, offset, length]
		BlockLocation[] blkLocations = fs.getFileBlockLocations(file, 0,
				length);
		// 判斷文件是否可分割,通常是可分割的,但如果文件是壓縮的,將不可分割
		// 是否分割可以自行重寫FileInputFormat的isSplitable來控制
		if ((length != 0) && isSplitable(job, path)) {
			long blockSize = file.getBlockSize();
			// 計算分片大小
			// 即 Math.max(minSize, Math.min(maxSize, blockSize));
			// 也就是保證在minSize和maxSize之間,且如果minSize<=blockSize<=maxSize,則設為blockSize
			long splitSize = computeSplitSize(blockSize, minSize, maxSize);

			long bytesRemaining = length;
			// 循環分片。
			// 當剩余數據與分片大小比值大於Split_Slop時,繼續分片, 小於等於時,停止分片
			while (((double) bytesRemaining) / splitSize > SPLIT_SLOP) {
				int blkIndex = getBlockIndex(blkLocations, length
						- bytesRemaining);
				splits.add(new FileSplit(path, length - bytesRemaining,
						splitSize, blkLocations[blkIndex].getHosts()));
				bytesRemaining -= splitSize;
			}
			// 處理余下的數據
			if (bytesRemaining != 0) {
				splits.add(new FileSplit(path, length - bytesRemaining,
						bytesRemaining,
						blkLocations[blkLocations.length - 1].getHosts()));
			}
		} else if (length != 0) {
			// 不可split,整塊返回
			splits.add(new FileSplit(path, 0, length, blkLocations[0]
					.getHosts()));
		} else {
			// 對於長度為0的文件,創建空Hosts列表,返回
			splits.add(new FileSplit(path, 0, length, new String[0]));
		}
	}

	// 設置輸入文件數量
	job.getConfiguration().setLong(NUM_INPUT_FILES, files.size());
	return splits;
}

在getSplits()方法中,我們提到了一個方法,listStatus(),我們先來看一下這個方法:

protected List<FileStatus> listStatus(JobContext job) throws IOException {

	// 省略部分代碼...

	List<PathFilter> filters = new ArrayList<PathFilter>();
	filters.add(hiddenFileFilter);
	PathFilter jobFilter = getInputPathFilter(job);
	if (jobFilter != null) {
		filters.add(jobFilter);
	}
	// 創建了一個MultiPathFilter,其內部包含了兩個PathFilter
	// 一個為過濾隱藏文件的Filter,一個為用戶自定義Filter(如果制定了)
	PathFilter inputFilter = new MultiPathFilter(filters);

	for (int i = 0; i < dirs.length; ++i) {
		Path p = dirs[i];
		FileSystem fs = p.getFileSystem(job.getConfiguration());
		FileStatus[] matches = fs.globStatus(p, inputFilter);
		if (matches == null) {
			errors.add(new IOException("Input path does not exist: " + p));
		} else if (matches.length == 0) {
			errors.add(new IOException("Input Pattern " + p
					+ " matches 0 files"));
		} else {
			for (FileStatus globStat : matches) {
				if (globStat.isDir()) {
					for (FileStatus stat : fs.listStatus(
							globStat.getPath(), inputFilter)) {
						result.add(stat);
					}
				} else {
					result.add(globStat);
				}
			}
		}
	}

	// 省略部分代碼
}
NLineInputFormat是一個很有意思的FileInputFormat的子類,有時間可以了解一下。

 5. PathFilter

PathFilter文件篩選器接口,使用它我們可以控制哪些文件要作為輸入,哪些不作為輸入。PathFilter有一個accept(Path)方法,當接收的Path要被包含進來,就返回true,否則返回false。可以通過設置mapred.input.pathFilter.class來設置用戶自定義的PathFilter。

public interface PathFilter {
  /**
   * Tests whether or not the specified abstract pathname should be
   * included in a pathname list.
   *
   * @param  path  The abstract pathname to be tested
   * @return  <code>true</code> if and only if <code>pathname</code>
   *          should be included
   */
  boolean accept(Path path);
}

FileInputFormat類有hiddenFileFilter屬性:

private static final PathFilter hiddenFileFilter = new PathFilter() {
	public boolean accept(Path p) {
		String name = p.getName();
		return !name.startsWith("_") && !name.startsWith(".");
	}
};

hiddenFileFilter過濾掉隱藏文件。

FileInputFormat類還有一個內部類:

private static class MultiPathFilter implements PathFilter {
	private List<PathFilter> filters;

	public MultiPathFilter(List<PathFilter> filters) {
		this.filters = filters;
	}

	public boolean accept(Path path) {
		for (PathFilter filter : filters) {
			if (!filter.accept(path)) {
				return false;
			}
		}
		return true;
	}
}

MultiPathFilter類類似於一個PathFilter代理,其內部有一個PathFilter list屬性,只有符合其內部所有filter的路徑,才被作為輸入。在FileInputFormat類中,它被listStatus()方法調用,而listStatus()又被getSplits()方法調用來獲取輸入文件,也即實現了在獲取輸入分片前進行文件過濾。

至此,我們已經利用PathFilter過濾了文件,利用FileInputFormat 的getSplits方法,計算出了Mapreduce的Map的InputSplit。作業的輸入分片有了,而這些分片,是怎么被Map讀取的呢?

這由InputFormat中的另一個方法createRecordReader()來負責。FileInputFormat沒有對於這個方法的實現,而是交給子類自行去實現它。

 6. RecordReader

RecordReader將讀入到Map的數據拆分成<key, value>對。RecordReader也是一個抽象類,下面我們通過源碼看一下,RecordReader主要做哪些工作:

public abstract class RecordReader<KEYIN, VALUEIN> implements Closeable {

	/**
	 * 由一個InputSplit初始化
	 */
	public abstract void initialize(InputSplit split, TaskAttemptContext context)
			throws IOException, InterruptedException;

	/**
	 * 顧名思義,讀取分片下一個<key, value>對
	 */
	public abstract boolean nextKeyValue() throws IOException,
			InterruptedException;

	/**
	 * Get the current key
	 */
	public abstract KEYIN getCurrentKey() throws IOException,
			InterruptedException;

	/**
	 * Get the current value.
	 */
	public abstract VALUEIN getCurrentValue() throws IOException,
			InterruptedException;

	/**
	 * 跟蹤讀取分片的進度
	 */
	public abstract float getProgress() throws IOException,
			InterruptedException;

	/**
	 * Close the record reader.
	 */
	public abstract void close() throws IOException;
}

從源碼可以看出,一個RecordReader主要來完成這幾項功能。接下來,通過一個具體的RecordReader實現類,來詳細了解一下各功能的具體操作。

public class LineRecordReader extends RecordReader<LongWritable, Text> {
	private CompressionCodecFactory compressionCodecs = null;
	private long start;
	private long pos;
	private long end;
	private LineReader in;
	private int maxLineLength;
	private LongWritable key = null;
	private Text value = null;

	// initialize函數即對LineRecordReader的一個初始化
	// 主要是計算分片的始末位置,打開輸入流以供讀取K-V對,處理分片經過壓縮的情況等
	public void initialize(InputSplit genericSplit, TaskAttemptContext context)
			throws IOException {
		FileSplit split = (FileSplit) genericSplit;
		Configuration job = context.getConfiguration();
		this.maxLineLength = job.getInt("mapred.linerecordreader.maxlength",
				Integer.MAX_VALUE);
		start = split.getStart();
		end = start + split.getLength();
		final Path file = split.getPath();
		compressionCodecs = new CompressionCodecFactory(job);
		final CompressionCodec codec = compressionCodecs.getCodec(file);

		// 打開文件,並定位到分片讀取的起始位置
		FileSystem fs = file.getFileSystem(job);
		FSDataInputStream fileIn = fs.open(split.getPath());
		boolean skipFirstLine = false;
		if (codec != null) {
			// 文件是壓縮文件的話,直接打開文件
			in = new LineReader(codec.createInputStream(fileIn), job);
			end = Long.MAX_VALUE;
		} else {
			//
			if (start != 0) {
				skipFirstLine = true;
				--start;
				// 定位到偏移位置,下次讀取就會從便宜位置開始
				fileIn.seek(start);
			}
			in = new LineReader(fileIn, job);
		}
		if (skipFirstLine) { // skip first line and re-establish "start".
			start += in.readLine(new Text(), 0,
					(int) Math.min((long) Integer.MAX_VALUE, end - start));
		}
		this.pos = start;
	}

	public boolean nextKeyValue() throws IOException {
		if (key == null) {
			key = new LongWritable();
		}
		key.set(pos);// key即為偏移量
		if (value == null) {
			value = new Text();
		}
		int newSize = 0;
		while (pos < end) {
			newSize = in.readLine(value, maxLineLength,
					Math.max((int) Math.min(Integer.MAX_VALUE, end - pos),
							maxLineLength));
			// 讀取的數據長度為0,則說明已讀完
			if (newSize == 0) {
				break;
			}
			pos += newSize;
			// 讀取的數據長度小於最大行長度,也說明已讀取完畢
			if (newSize < maxLineLength) {
				break;
			}
			// 執行到此處,說明該行數據沒讀完,繼續讀入
		}
		if (newSize == 0) {
			key = null;
			value = null;
			return false;
		} else {
			return true;
		}
	}
	// 省略了部分方法
}

數據從InputSplit分片中讀出已經解決,但是RecordReader是如何被Mapreduce框架利用的呢?我們先看一下Mapper類

 7. Mapper

 

public class Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT> {

	public class Context extends MapContext<KEYIN, VALUEIN, KEYOUT, VALUEOUT> {
		public Context(Configuration conf, TaskAttemptID taskid,
				RecordReader<KEYIN, VALUEIN> reader,
				RecordWriter<KEYOUT, VALUEOUT> writer,
				OutputCommitter committer, StatusReporter reporter,
				InputSplit split) throws IOException, InterruptedException {
			super(conf, taskid, reader, writer, committer, reporter, split);
		}
	}

	/**
	 * 預處理,僅在map task啟動時運行一次
	 */
	protected void setup(Context context) throws IOException,
			InterruptedException {
	}

	/**
	 * 對於InputSplit中的每一對<key, value>都會運行一次
	 */
	@SuppressWarnings("unchecked")
	protected void map(KEYIN key, VALUEIN value, Context context)
			throws IOException, InterruptedException {
		context.write((KEYOUT) key, (VALUEOUT) value);
	}

	/**
	 * 掃尾工作,比如關閉流等
	 */
	protected void cleanup(Context context) throws IOException,
			InterruptedException {
	}

	/**
	 * map task的驅動器
	 */
	public void run(Context context) throws IOException, InterruptedException {
		setup(context);
		while (context.nextKeyValue()) {
			map(context.getCurrentKey(), context.getCurrentValue(), context);
		}
		cleanup(context);
	}
}

 

重點看一下Mapper.class中的run()方法,它相當於map task的驅動。

  • run()方法首先調用setup()進行初始操作
  • 然后循環對每個從context.nextKeyValue()獲取的“K-V對”調用map()函數進行處理
  • 最后調用cleanup()做最后的處理

事實上,content.nextKeyValue()就是使用了相應的RecordReader來獲取“K-V對”。Mapper.class中的Context類,它繼承自MapContext類,使用一個RecordReader進行構造。下面我們再看這個MapContext。

public class MapContext<KEYIN, VALUEIN, KEYOUT, VALUEOUT> extends
		TaskInputOutputContext<KEYIN, VALUEIN, KEYOUT, VALUEOUT> {
	private RecordReader<KEYIN, VALUEIN> reader;
	private InputSplit split;

	public MapContext(Configuration conf, TaskAttemptID taskid,
			RecordReader<KEYIN, VALUEIN> reader,
			RecordWriter<KEYOUT, VALUEOUT> writer, OutputCommitter committer,
			StatusReporter reporter, InputSplit split) {
		super(conf, taskid, writer, committer, reporter);
		this.reader = reader;
		this.split = split;
	}

	/**
	 * Get the input split for this map.
	 */
	public InputSplit getInputSplit() {
		return split;
	}

	@Override
	public KEYIN getCurrentKey() throws IOException, InterruptedException {
		return reader.getCurrentKey();
	}

	@Override
	public VALUEIN getCurrentValue() throws IOException, InterruptedException {
		return reader.getCurrentValue();
	}

	@Override
	public boolean nextKeyValue() throws IOException, InterruptedException {
		return reader.nextKeyValue();
	}

}

從MapContent類中的方法可見,content.getCurrentKey(),content.getCurrentValue()以及nextKeyValue(),其實都是對RecordReader方法的封裝,即MapContext是直接使用傳入的RecordReader來對InputSplit進行“K-V對”讀取的。

至此,我們已經清楚的知道Mapreduce的輸入文件是如何被過濾、讀取、分片、讀出“K-V對”,然后交給Mapper類來處理的。

 

原創作品,允許轉載,轉載時請務必以超鏈接形式標明文章 原始出處 、作者信息和 本聲明。否則將追究法律責任。http://shitouer.cn/2013/02/hadoop-source-code-analyse-mapreduce-inputformat/


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM