個人小站,正在持續整理中,歡迎訪問:http://shitouer.cn
小站博文地址:[Hadoop源碼詳解]之一MapReduce篇之InputFormat
1. 概述
我們在設置MapReduce輸入格式的時候,會調用這樣一條語句:
job.setInputFormatClass(KeyValueTextInputFormat.class);
這條語句保證了輸入文件會按照我們預設的格式被讀取。KeyValueTextInputFormat即為我們設定的數據讀取格式。
所有的輸入格式類都繼承自InputFormat,這是一個抽象類。其子類有例如專門用於讀取普通文件的FileInputFormat,還有用來讀取數據庫的DBInputFormat等等。相關類圖簡單畫出如下(推薦新標簽中打開圖片查看):

2. InputFormat
從InputFormat類圖看,InputFormat抽象類僅有兩個抽象方法:
- List<InputSplit> getSplits(), 獲取由輸入文件計算出輸入分片(InputSplit),解決數據或文件分割成片問題。
- RecordReader<K,V> createRecordReader(),創建RecordReader,從InputSplit中讀取數據,解決讀取分片中數據問題。
在后面說到InputSplits的時候,會介紹在getSplits()時需要驗證輸入文件是否可分割、文件存儲時分塊的大小和文件大小等因素,所以總體來說,通過InputFormat,Mapreduce框架可以做到:
- 驗證作業輸入的正確性
- 將輸入文件切割成邏輯分片(InputSplit),一個InputSplit將會被分配給一個獨立的MapTask
- 提供RecordReader實現,讀取InputSplit中的“K-V對”供Mapper使用
InputFormat抽象類源碼也很簡單,如下供參考(文章格式考慮,刪除了部分注釋,添加了部分中文注釋):
public abstract class InputFormat<K, V> {
/**
* 僅僅是邏輯分片,並沒有物理分片,所以每一個分片類似於這樣一個元組 <input-file-path, start, offset>
*/
public abstract List<InputSplit> getSplits(JobContext context)
throws IOException, InterruptedException;
/**
* Create a record reader for a given split.
*/
public abstract RecordReader<K, V> createRecordReader(InputSplit split,
TaskAttemptContext context) throws IOException,
InterruptedException;
}
不同的InputFormat會各自實現不同的文件讀取方式以及分片方式,每個輸入分片會被單獨的map task作為數據源。下面詳細介紹輸入分片(inputSplit)是什么。
3.InputSplit
Mappers的輸入是一個一個的輸入分片,稱InputSplit。看源碼可知,InputSplit也是一個抽象類,它在邏輯上包含了提供給處理這個InputSplit的Mapper的所有K-V對。
public abstract class InputSplit {
/**
* 獲取Split的大小,支持根據size對InputSplit排序.
*/
public abstract long getLength() throws IOException, InterruptedException;
/**
* 獲取存儲該分片的數據所在的節點位置.
*/
public abstract
String[] getLocations() throws IOException, InterruptedException;
}
下面深入看一個InputSplit的子類:FileSplit類
public class FileSplit extends InputSplit implements Writable {
private Path file;
private long start;
private long length;
private String[] hosts;
/**
* Constructs a split with host information
*
* @param file
* the file name
* @param start
* the position of the first byte in the file to process
* @param length
* the number of bytes in the file to process
* @param hosts
* the list of hosts containing the block, possibly null
*/
public FileSplit(Path file, long start, long length, String[] hosts) {
this.file = file;
this.start = start;
this.length = length;
this.hosts = hosts;
}
/** The number of bytes in the file to process. */
@Override
public long getLength() {
return length;
}
@Override
public String[] getLocations() throws IOException {
if (this.hosts == null) {
return new String[] {};
} else {
return this.hosts;
}
}
// 略掉部分方法
}
從源碼中可以看出,FileSplit有四個屬性:文件路徑,分片起始位置,分片長度和存儲分片的hosts。用這四項數據,就可以計算出提供給每個Mapper的分片數據。在InputFormat的getSplit()方法中構造分片,分片的四個屬性會通過調用FileSplit的Constructor設置。
再看一個InputSplit的子類:CombineFileSplit。源碼如下:
public class CombineFileSplit extends InputSplit implements Writable {
private Path[] paths;
private long[] startoffset;
private long[] lengths;
private String[] locations;
private long totLength;
public CombineFileSplit(Path[] files, long[] start, long[] lengths,
String[] locations) {
initSplit(files, start, lengths, locations);
}
private void initSplit(Path[] files, long[] start, long[] lengths,
String[] locations) {
this.startoffset = start;
this.lengths = lengths;
this.paths = files;
this.totLength = 0;
this.locations = locations;
for (long length : lengths) {
totLength += length;
}
}
public long getLength() {
return totLength;
}
/** Returns all the Paths where this input-split resides */
public String[] getLocations() throws IOException {
return locations;
}
//省略了部分構造函數和方法,深入學習請閱讀源文件
}
為什么介紹該類呢,因為接下來要學習《Hadoop學習(五) – 小文件處理》,深入理解該類,將有助於該節學習。
上面我們介紹的FileSplit對應的是一個輸入文件,也就是說,如果用FileSplit對應的FileInputFormat作為輸入格式,那么即使文件特別小,也是作為一個單獨的InputSplit來處理,而每一個InputSplit將會由一個獨立的Mapper Task來處理。在輸入數據是由大量小文件組成的情形下,就會有同樣大量的InputSplit,從而需要同樣大量的Mapper來處理,大量的Mapper Task創建銷毀開銷將是巨大的,甚至對集群來說,是災難性的!
CombineFileSplit是針對小文件的分片,它將一系列小文件封裝在一個InputSplit內,這樣一個Mapper就可以處理多個小文件。可以有效的降低進程開銷。與FileSplit類似,CombineFileSplit同樣包含文件路徑,分片起始位置,分片大小和分片數據所在的host列表四個屬性,只不過這些屬性不再是一個值,而是一個列表。
需要注意的一點是,CombineFileSplit的getLength()方法,返回的是這一系列數據的數據的總長度。
現在,我們已深入的了解了InputSplit的概念,看了其源碼,知道了其屬性。我們知道數據分片是在InputFormat中實現的,接下來,我們就深入InputFormat的一個子類,FileInputFormat看看分片是如何進行的。
4. FileInputFormat
FileInputFormat中,分片方法代碼及詳細注釋如下,就不再詳細解釋該方法:
public List<InputSplit> getSplits(JobContext job) throws IOException {
// 首先計算分片的最大和最小值。這兩個值將會用來計算分片的大小。
// 由源碼可知,這兩個值可以通過mapred.min.split.size和mapred.max.split.size來設置
long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job));
long maxSize = getMaxSplitSize(job);
// splits鏈表用來存儲計算得到的輸入分片結果
List<InputSplit> splits = new ArrayList<InputSplit>();
// files鏈表存儲由listStatus()獲取的輸入文件列表,listStatus比較特殊,我們在下面詳細研究
List<FileStatus> files = listStatus(job);
for (FileStatus file : files) {
Path path = file.getPath();
FileSystem fs = path.getFileSystem(job.getConfiguration());
long length = file.getLen();
// 獲取該文件所有的block信息列表[hostname, offset, length]
BlockLocation[] blkLocations = fs.getFileBlockLocations(file, 0,
length);
// 判斷文件是否可分割,通常是可分割的,但如果文件是壓縮的,將不可分割
// 是否分割可以自行重寫FileInputFormat的isSplitable來控制
if ((length != 0) && isSplitable(job, path)) {
long blockSize = file.getBlockSize();
// 計算分片大小
// 即 Math.max(minSize, Math.min(maxSize, blockSize));
// 也就是保證在minSize和maxSize之間,且如果minSize<=blockSize<=maxSize,則設為blockSize
long splitSize = computeSplitSize(blockSize, minSize, maxSize);
long bytesRemaining = length;
// 循環分片。
// 當剩余數據與分片大小比值大於Split_Slop時,繼續分片, 小於等於時,停止分片
while (((double) bytesRemaining) / splitSize > SPLIT_SLOP) {
int blkIndex = getBlockIndex(blkLocations, length
- bytesRemaining);
splits.add(new FileSplit(path, length - bytesRemaining,
splitSize, blkLocations[blkIndex].getHosts()));
bytesRemaining -= splitSize;
}
// 處理余下的數據
if (bytesRemaining != 0) {
splits.add(new FileSplit(path, length - bytesRemaining,
bytesRemaining,
blkLocations[blkLocations.length - 1].getHosts()));
}
} else if (length != 0) {
// 不可split,整塊返回
splits.add(new FileSplit(path, 0, length, blkLocations[0]
.getHosts()));
} else {
// 對於長度為0的文件,創建空Hosts列表,返回
splits.add(new FileSplit(path, 0, length, new String[0]));
}
}
// 設置輸入文件數量
job.getConfiguration().setLong(NUM_INPUT_FILES, files.size());
return splits;
}
在getSplits()方法中,我們提到了一個方法,listStatus(),我們先來看一下這個方法:
protected List<FileStatus> listStatus(JobContext job) throws IOException {
// 省略部分代碼...
List<PathFilter> filters = new ArrayList<PathFilter>();
filters.add(hiddenFileFilter);
PathFilter jobFilter = getInputPathFilter(job);
if (jobFilter != null) {
filters.add(jobFilter);
}
// 創建了一個MultiPathFilter,其內部包含了兩個PathFilter
// 一個為過濾隱藏文件的Filter,一個為用戶自定義Filter(如果制定了)
PathFilter inputFilter = new MultiPathFilter(filters);
for (int i = 0; i < dirs.length; ++i) {
Path p = dirs[i];
FileSystem fs = p.getFileSystem(job.getConfiguration());
FileStatus[] matches = fs.globStatus(p, inputFilter);
if (matches == null) {
errors.add(new IOException("Input path does not exist: " + p));
} else if (matches.length == 0) {
errors.add(new IOException("Input Pattern " + p
+ " matches 0 files"));
} else {
for (FileStatus globStat : matches) {
if (globStat.isDir()) {
for (FileStatus stat : fs.listStatus(
globStat.getPath(), inputFilter)) {
result.add(stat);
}
} else {
result.add(globStat);
}
}
}
}
// 省略部分代碼
}
NLineInputFormat是一個很有意思的FileInputFormat的子類,有時間可以了解一下。
5. PathFilter
PathFilter文件篩選器接口,使用它我們可以控制哪些文件要作為輸入,哪些不作為輸入。PathFilter有一個accept(Path)方法,當接收的Path要被包含進來,就返回true,否則返回false。可以通過設置mapred.input.pathFilter.class來設置用戶自定義的PathFilter。
public interface PathFilter {
/**
* Tests whether or not the specified abstract pathname should be
* included in a pathname list.
*
* @param path The abstract pathname to be tested
* @return <code>true</code> if and only if <code>pathname</code>
* should be included
*/
boolean accept(Path path);
}
FileInputFormat類有hiddenFileFilter屬性:
private static final PathFilter hiddenFileFilter = new PathFilter() {
public boolean accept(Path p) {
String name = p.getName();
return !name.startsWith("_") && !name.startsWith(".");
}
};
hiddenFileFilter過濾掉隱藏文件。
FileInputFormat類還有一個內部類:
private static class MultiPathFilter implements PathFilter {
private List<PathFilter> filters;
public MultiPathFilter(List<PathFilter> filters) {
this.filters = filters;
}
public boolean accept(Path path) {
for (PathFilter filter : filters) {
if (!filter.accept(path)) {
return false;
}
}
return true;
}
}
MultiPathFilter類類似於一個PathFilter代理,其內部有一個PathFilter list屬性,只有符合其內部所有filter的路徑,才被作為輸入。在FileInputFormat類中,它被listStatus()方法調用,而listStatus()又被getSplits()方法調用來獲取輸入文件,也即實現了在獲取輸入分片前進行文件過濾。
至此,我們已經利用PathFilter過濾了文件,利用FileInputFormat 的getSplits方法,計算出了Mapreduce的Map的InputSplit。作業的輸入分片有了,而這些分片,是怎么被Map讀取的呢?
這由InputFormat中的另一個方法createRecordReader()來負責。FileInputFormat沒有對於這個方法的實現,而是交給子類自行去實現它。
6. RecordReader
RecordReader將讀入到Map的數據拆分成<key, value>對。RecordReader也是一個抽象類,下面我們通過源碼看一下,RecordReader主要做哪些工作:
public abstract class RecordReader<KEYIN, VALUEIN> implements Closeable {
/**
* 由一個InputSplit初始化
*/
public abstract void initialize(InputSplit split, TaskAttemptContext context)
throws IOException, InterruptedException;
/**
* 顧名思義,讀取分片下一個<key, value>對
*/
public abstract boolean nextKeyValue() throws IOException,
InterruptedException;
/**
* Get the current key
*/
public abstract KEYIN getCurrentKey() throws IOException,
InterruptedException;
/**
* Get the current value.
*/
public abstract VALUEIN getCurrentValue() throws IOException,
InterruptedException;
/**
* 跟蹤讀取分片的進度
*/
public abstract float getProgress() throws IOException,
InterruptedException;
/**
* Close the record reader.
*/
public abstract void close() throws IOException;
}
從源碼可以看出,一個RecordReader主要來完成這幾項功能。接下來,通過一個具體的RecordReader實現類,來詳細了解一下各功能的具體操作。
public class LineRecordReader extends RecordReader<LongWritable, Text> {
private CompressionCodecFactory compressionCodecs = null;
private long start;
private long pos;
private long end;
private LineReader in;
private int maxLineLength;
private LongWritable key = null;
private Text value = null;
// initialize函數即對LineRecordReader的一個初始化
// 主要是計算分片的始末位置,打開輸入流以供讀取K-V對,處理分片經過壓縮的情況等
public void initialize(InputSplit genericSplit, TaskAttemptContext context)
throws IOException {
FileSplit split = (FileSplit) genericSplit;
Configuration job = context.getConfiguration();
this.maxLineLength = job.getInt("mapred.linerecordreader.maxlength",
Integer.MAX_VALUE);
start = split.getStart();
end = start + split.getLength();
final Path file = split.getPath();
compressionCodecs = new CompressionCodecFactory(job);
final CompressionCodec codec = compressionCodecs.getCodec(file);
// 打開文件,並定位到分片讀取的起始位置
FileSystem fs = file.getFileSystem(job);
FSDataInputStream fileIn = fs.open(split.getPath());
boolean skipFirstLine = false;
if (codec != null) {
// 文件是壓縮文件的話,直接打開文件
in = new LineReader(codec.createInputStream(fileIn), job);
end = Long.MAX_VALUE;
} else {
//
if (start != 0) {
skipFirstLine = true;
--start;
// 定位到偏移位置,下次讀取就會從便宜位置開始
fileIn.seek(start);
}
in = new LineReader(fileIn, job);
}
if (skipFirstLine) { // skip first line and re-establish "start".
start += in.readLine(new Text(), 0,
(int) Math.min((long) Integer.MAX_VALUE, end - start));
}
this.pos = start;
}
public boolean nextKeyValue() throws IOException {
if (key == null) {
key = new LongWritable();
}
key.set(pos);// key即為偏移量
if (value == null) {
value = new Text();
}
int newSize = 0;
while (pos < end) {
newSize = in.readLine(value, maxLineLength,
Math.max((int) Math.min(Integer.MAX_VALUE, end - pos),
maxLineLength));
// 讀取的數據長度為0,則說明已讀完
if (newSize == 0) {
break;
}
pos += newSize;
// 讀取的數據長度小於最大行長度,也說明已讀取完畢
if (newSize < maxLineLength) {
break;
}
// 執行到此處,說明該行數據沒讀完,繼續讀入
}
if (newSize == 0) {
key = null;
value = null;
return false;
} else {
return true;
}
}
// 省略了部分方法
}
數據從InputSplit分片中讀出已經解決,但是RecordReader是如何被Mapreduce框架利用的呢?我們先看一下Mapper類
7. Mapper
public class Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT> {
public class Context extends MapContext<KEYIN, VALUEIN, KEYOUT, VALUEOUT> {
public Context(Configuration conf, TaskAttemptID taskid,
RecordReader<KEYIN, VALUEIN> reader,
RecordWriter<KEYOUT, VALUEOUT> writer,
OutputCommitter committer, StatusReporter reporter,
InputSplit split) throws IOException, InterruptedException {
super(conf, taskid, reader, writer, committer, reporter, split);
}
}
/**
* 預處理,僅在map task啟動時運行一次
*/
protected void setup(Context context) throws IOException,
InterruptedException {
}
/**
* 對於InputSplit中的每一對<key, value>都會運行一次
*/
@SuppressWarnings("unchecked")
protected void map(KEYIN key, VALUEIN value, Context context)
throws IOException, InterruptedException {
context.write((KEYOUT) key, (VALUEOUT) value);
}
/**
* 掃尾工作,比如關閉流等
*/
protected void cleanup(Context context) throws IOException,
InterruptedException {
}
/**
* map task的驅動器
*/
public void run(Context context) throws IOException, InterruptedException {
setup(context);
while (context.nextKeyValue()) {
map(context.getCurrentKey(), context.getCurrentValue(), context);
}
cleanup(context);
}
}
重點看一下Mapper.class中的run()方法,它相當於map task的驅動。
- run()方法首先調用setup()進行初始操作
- 然后循環對每個從context.nextKeyValue()獲取的“K-V對”調用map()函數進行處理
- 最后調用cleanup()做最后的處理
事實上,content.nextKeyValue()就是使用了相應的RecordReader來獲取“K-V對”。Mapper.class中的Context類,它繼承自MapContext類,使用一個RecordReader進行構造。下面我們再看這個MapContext。
public class MapContext<KEYIN, VALUEIN, KEYOUT, VALUEOUT> extends
TaskInputOutputContext<KEYIN, VALUEIN, KEYOUT, VALUEOUT> {
private RecordReader<KEYIN, VALUEIN> reader;
private InputSplit split;
public MapContext(Configuration conf, TaskAttemptID taskid,
RecordReader<KEYIN, VALUEIN> reader,
RecordWriter<KEYOUT, VALUEOUT> writer, OutputCommitter committer,
StatusReporter reporter, InputSplit split) {
super(conf, taskid, writer, committer, reporter);
this.reader = reader;
this.split = split;
}
/**
* Get the input split for this map.
*/
public InputSplit getInputSplit() {
return split;
}
@Override
public KEYIN getCurrentKey() throws IOException, InterruptedException {
return reader.getCurrentKey();
}
@Override
public VALUEIN getCurrentValue() throws IOException, InterruptedException {
return reader.getCurrentValue();
}
@Override
public boolean nextKeyValue() throws IOException, InterruptedException {
return reader.nextKeyValue();
}
}
從MapContent類中的方法可見,content.getCurrentKey(),content.getCurrentValue()以及nextKeyValue(),其實都是對RecordReader方法的封裝,即MapContext是直接使用傳入的RecordReader來對InputSplit進行“K-V對”讀取的。
至此,我們已經清楚的知道Mapreduce的輸入文件是如何被過濾、讀取、分片、讀出“K-V對”,然后交給Mapper類來處理的。
