個人小站,正在持續整理中,歡迎訪問:http://shitouer.cn
小站博文地址:[Hadoop源碼詳解]之一MapReduce篇之InputFormat
1. 概述
我們在設置MapReduce輸入格式的時候,會調用這樣一條語句:
job.setInputFormatClass(KeyValueTextInputFormat.class);
這條語句保證了輸入文件會按照我們預設的格式被讀取。KeyValueTextInputFormat即為我們設定的數據讀取格式。
所有的輸入格式類都繼承自InputFormat,這是一個抽象類。其子類有例如專門用於讀取普通文件的FileInputFormat,還有用來讀取數據庫的DBInputFormat等等。相關類圖簡單畫出如下(推薦新標簽中打開圖片查看):
2. InputFormat
從InputFormat類圖看,InputFormat抽象類僅有兩個抽象方法:
- List<InputSplit> getSplits(), 獲取由輸入文件計算出輸入分片(InputSplit),解決數據或文件分割成片問題。
- RecordReader<K,V> createRecordReader(),創建RecordReader,從InputSplit中讀取數據,解決讀取分片中數據問題。
在后面說到InputSplits的時候,會介紹在getSplits()時需要驗證輸入文件是否可分割、文件存儲時分塊的大小和文件大小等因素,所以總體來說,通過InputFormat,Mapreduce框架可以做到:
- 驗證作業輸入的正確性
- 將輸入文件切割成邏輯分片(InputSplit),一個InputSplit將會被分配給一個獨立的MapTask
- 提供RecordReader實現,讀取InputSplit中的“K-V對”供Mapper使用
InputFormat抽象類源碼也很簡單,如下供參考(文章格式考慮,刪除了部分注釋,添加了部分中文注釋):
public abstract class InputFormat<K, V> { /** * 僅僅是邏輯分片,並沒有物理分片,所以每一個分片類似於這樣一個元組 <input-file-path, start, offset> */ public abstract List<InputSplit> getSplits(JobContext context) throws IOException, InterruptedException; /** * Create a record reader for a given split. */ public abstract RecordReader<K, V> createRecordReader(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException; }
不同的InputFormat會各自實現不同的文件讀取方式以及分片方式,每個輸入分片會被單獨的map task作為數據源。下面詳細介紹輸入分片(inputSplit)是什么。
3.InputSplit
Mappers的輸入是一個一個的輸入分片,稱InputSplit。看源碼可知,InputSplit也是一個抽象類,它在邏輯上包含了提供給處理這個InputSplit的Mapper的所有K-V對。
public abstract class InputSplit { /** * 獲取Split的大小,支持根據size對InputSplit排序. */ public abstract long getLength() throws IOException, InterruptedException; /** * 獲取存儲該分片的數據所在的節點位置. */ public abstract String[] getLocations() throws IOException, InterruptedException; }
下面深入看一個InputSplit的子類:FileSplit類
public class FileSplit extends InputSplit implements Writable { private Path file; private long start; private long length; private String[] hosts; /** * Constructs a split with host information * * @param file * the file name * @param start * the position of the first byte in the file to process * @param length * the number of bytes in the file to process * @param hosts * the list of hosts containing the block, possibly null */ public FileSplit(Path file, long start, long length, String[] hosts) { this.file = file; this.start = start; this.length = length; this.hosts = hosts; } /** The number of bytes in the file to process. */ @Override public long getLength() { return length; } @Override public String[] getLocations() throws IOException { if (this.hosts == null) { return new String[] {}; } else { return this.hosts; } } // 略掉部分方法 }
從源碼中可以看出,FileSplit有四個屬性:文件路徑,分片起始位置,分片長度和存儲分片的hosts。用這四項數據,就可以計算出提供給每個Mapper的分片數據。在InputFormat的getSplit()方法中構造分片,分片的四個屬性會通過調用FileSplit的Constructor設置。
再看一個InputSplit的子類:CombineFileSplit。源碼如下:
public class CombineFileSplit extends InputSplit implements Writable { private Path[] paths; private long[] startoffset; private long[] lengths; private String[] locations; private long totLength; public CombineFileSplit(Path[] files, long[] start, long[] lengths, String[] locations) { initSplit(files, start, lengths, locations); } private void initSplit(Path[] files, long[] start, long[] lengths, String[] locations) { this.startoffset = start; this.lengths = lengths; this.paths = files; this.totLength = 0; this.locations = locations; for (long length : lengths) { totLength += length; } } public long getLength() { return totLength; } /** Returns all the Paths where this input-split resides */ public String[] getLocations() throws IOException { return locations; } //省略了部分構造函數和方法,深入學習請閱讀源文件 }
為什么介紹該類呢,因為接下來要學習《Hadoop學習(五) – 小文件處理》,深入理解該類,將有助於該節學習。
上面我們介紹的FileSplit對應的是一個輸入文件,也就是說,如果用FileSplit對應的FileInputFormat作為輸入格式,那么即使文件特別小,也是作為一個單獨的InputSplit來處理,而每一個InputSplit將會由一個獨立的Mapper Task來處理。在輸入數據是由大量小文件組成的情形下,就會有同樣大量的InputSplit,從而需要同樣大量的Mapper來處理,大量的Mapper Task創建銷毀開銷將是巨大的,甚至對集群來說,是災難性的!
CombineFileSplit是針對小文件的分片,它將一系列小文件封裝在一個InputSplit內,這樣一個Mapper就可以處理多個小文件。可以有效的降低進程開銷。與FileSplit類似,CombineFileSplit同樣包含文件路徑,分片起始位置,分片大小和分片數據所在的host列表四個屬性,只不過這些屬性不再是一個值,而是一個列表。
需要注意的一點是,CombineFileSplit的getLength()方法,返回的是這一系列數據的數據的總長度。
現在,我們已深入的了解了InputSplit的概念,看了其源碼,知道了其屬性。我們知道數據分片是在InputFormat中實現的,接下來,我們就深入InputFormat的一個子類,FileInputFormat看看分片是如何進行的。
4. FileInputFormat
FileInputFormat中,分片方法代碼及詳細注釋如下,就不再詳細解釋該方法:
public List<InputSplit> getSplits(JobContext job) throws IOException { // 首先計算分片的最大和最小值。這兩個值將會用來計算分片的大小。 // 由源碼可知,這兩個值可以通過mapred.min.split.size和mapred.max.split.size來設置 long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job)); long maxSize = getMaxSplitSize(job); // splits鏈表用來存儲計算得到的輸入分片結果 List<InputSplit> splits = new ArrayList<InputSplit>(); // files鏈表存儲由listStatus()獲取的輸入文件列表,listStatus比較特殊,我們在下面詳細研究 List<FileStatus> files = listStatus(job); for (FileStatus file : files) { Path path = file.getPath(); FileSystem fs = path.getFileSystem(job.getConfiguration()); long length = file.getLen(); // 獲取該文件所有的block信息列表[hostname, offset, length] BlockLocation[] blkLocations = fs.getFileBlockLocations(file, 0, length); // 判斷文件是否可分割,通常是可分割的,但如果文件是壓縮的,將不可分割 // 是否分割可以自行重寫FileInputFormat的isSplitable來控制 if ((length != 0) && isSplitable(job, path)) { long blockSize = file.getBlockSize(); // 計算分片大小 // 即 Math.max(minSize, Math.min(maxSize, blockSize)); // 也就是保證在minSize和maxSize之間,且如果minSize<=blockSize<=maxSize,則設為blockSize long splitSize = computeSplitSize(blockSize, minSize, maxSize); long bytesRemaining = length; // 循環分片。 // 當剩余數據與分片大小比值大於Split_Slop時,繼續分片, 小於等於時,停止分片 while (((double) bytesRemaining) / splitSize > SPLIT_SLOP) { int blkIndex = getBlockIndex(blkLocations, length - bytesRemaining); splits.add(new FileSplit(path, length - bytesRemaining, splitSize, blkLocations[blkIndex].getHosts())); bytesRemaining -= splitSize; } // 處理余下的數據 if (bytesRemaining != 0) { splits.add(new FileSplit(path, length - bytesRemaining, bytesRemaining, blkLocations[blkLocations.length - 1].getHosts())); } } else if (length != 0) { // 不可split,整塊返回 splits.add(new FileSplit(path, 0, length, blkLocations[0] .getHosts())); } else { // 對於長度為0的文件,創建空Hosts列表,返回 splits.add(new FileSplit(path, 0, length, new String[0])); } } // 設置輸入文件數量 job.getConfiguration().setLong(NUM_INPUT_FILES, files.size()); return splits; }
在getSplits()方法中,我們提到了一個方法,listStatus(),我們先來看一下這個方法:
protected List<FileStatus> listStatus(JobContext job) throws IOException { // 省略部分代碼... List<PathFilter> filters = new ArrayList<PathFilter>(); filters.add(hiddenFileFilter); PathFilter jobFilter = getInputPathFilter(job); if (jobFilter != null) { filters.add(jobFilter); } // 創建了一個MultiPathFilter,其內部包含了兩個PathFilter // 一個為過濾隱藏文件的Filter,一個為用戶自定義Filter(如果制定了) PathFilter inputFilter = new MultiPathFilter(filters); for (int i = 0; i < dirs.length; ++i) { Path p = dirs[i]; FileSystem fs = p.getFileSystem(job.getConfiguration()); FileStatus[] matches = fs.globStatus(p, inputFilter); if (matches == null) { errors.add(new IOException("Input path does not exist: " + p)); } else if (matches.length == 0) { errors.add(new IOException("Input Pattern " + p + " matches 0 files")); } else { for (FileStatus globStat : matches) { if (globStat.isDir()) { for (FileStatus stat : fs.listStatus( globStat.getPath(), inputFilter)) { result.add(stat); } } else { result.add(globStat); } } } } // 省略部分代碼 } NLineInputFormat是一個很有意思的FileInputFormat的子類,有時間可以了解一下。
5. PathFilter
PathFilter文件篩選器接口,使用它我們可以控制哪些文件要作為輸入,哪些不作為輸入。PathFilter有一個accept(Path)方法,當接收的Path要被包含進來,就返回true,否則返回false。可以通過設置mapred.input.pathFilter.class來設置用戶自定義的PathFilter。
public interface PathFilter { /** * Tests whether or not the specified abstract pathname should be * included in a pathname list. * * @param path The abstract pathname to be tested * @return <code>true</code> if and only if <code>pathname</code> * should be included */ boolean accept(Path path); }
FileInputFormat類有hiddenFileFilter屬性:
private static final PathFilter hiddenFileFilter = new PathFilter() { public boolean accept(Path p) { String name = p.getName(); return !name.startsWith("_") && !name.startsWith("."); } };
hiddenFileFilter過濾掉隱藏文件。
FileInputFormat類還有一個內部類:
private static class MultiPathFilter implements PathFilter { private List<PathFilter> filters; public MultiPathFilter(List<PathFilter> filters) { this.filters = filters; } public boolean accept(Path path) { for (PathFilter filter : filters) { if (!filter.accept(path)) { return false; } } return true; } }
MultiPathFilter類類似於一個PathFilter代理,其內部有一個PathFilter list屬性,只有符合其內部所有filter的路徑,才被作為輸入。在FileInputFormat類中,它被listStatus()方法調用,而listStatus()又被getSplits()方法調用來獲取輸入文件,也即實現了在獲取輸入分片前進行文件過濾。
至此,我們已經利用PathFilter過濾了文件,利用FileInputFormat 的getSplits方法,計算出了Mapreduce的Map的InputSplit。作業的輸入分片有了,而這些分片,是怎么被Map讀取的呢?
這由InputFormat中的另一個方法createRecordReader()來負責。FileInputFormat沒有對於這個方法的實現,而是交給子類自行去實現它。
6. RecordReader
RecordReader將讀入到Map的數據拆分成<key, value>對。RecordReader也是一個抽象類,下面我們通過源碼看一下,RecordReader主要做哪些工作:
public abstract class RecordReader<KEYIN, VALUEIN> implements Closeable { /** * 由一個InputSplit初始化 */ public abstract void initialize(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException; /** * 顧名思義,讀取分片下一個<key, value>對 */ public abstract boolean nextKeyValue() throws IOException, InterruptedException; /** * Get the current key */ public abstract KEYIN getCurrentKey() throws IOException, InterruptedException; /** * Get the current value. */ public abstract VALUEIN getCurrentValue() throws IOException, InterruptedException; /** * 跟蹤讀取分片的進度 */ public abstract float getProgress() throws IOException, InterruptedException; /** * Close the record reader. */ public abstract void close() throws IOException; }
從源碼可以看出,一個RecordReader主要來完成這幾項功能。接下來,通過一個具體的RecordReader實現類,來詳細了解一下各功能的具體操作。
public class LineRecordReader extends RecordReader<LongWritable, Text> { private CompressionCodecFactory compressionCodecs = null; private long start; private long pos; private long end; private LineReader in; private int maxLineLength; private LongWritable key = null; private Text value = null; // initialize函數即對LineRecordReader的一個初始化 // 主要是計算分片的始末位置,打開輸入流以供讀取K-V對,處理分片經過壓縮的情況等 public void initialize(InputSplit genericSplit, TaskAttemptContext context) throws IOException { FileSplit split = (FileSplit) genericSplit; Configuration job = context.getConfiguration(); this.maxLineLength = job.getInt("mapred.linerecordreader.maxlength", Integer.MAX_VALUE); start = split.getStart(); end = start + split.getLength(); final Path file = split.getPath(); compressionCodecs = new CompressionCodecFactory(job); final CompressionCodec codec = compressionCodecs.getCodec(file); // 打開文件,並定位到分片讀取的起始位置 FileSystem fs = file.getFileSystem(job); FSDataInputStream fileIn = fs.open(split.getPath()); boolean skipFirstLine = false; if (codec != null) { // 文件是壓縮文件的話,直接打開文件 in = new LineReader(codec.createInputStream(fileIn), job); end = Long.MAX_VALUE; } else { // if (start != 0) { skipFirstLine = true; --start; // 定位到偏移位置,下次讀取就會從便宜位置開始 fileIn.seek(start); } in = new LineReader(fileIn, job); } if (skipFirstLine) { // skip first line and re-establish "start". start += in.readLine(new Text(), 0, (int) Math.min((long) Integer.MAX_VALUE, end - start)); } this.pos = start; } public boolean nextKeyValue() throws IOException { if (key == null) { key = new LongWritable(); } key.set(pos);// key即為偏移量 if (value == null) { value = new Text(); } int newSize = 0; while (pos < end) { newSize = in.readLine(value, maxLineLength, Math.max((int) Math.min(Integer.MAX_VALUE, end - pos), maxLineLength)); // 讀取的數據長度為0,則說明已讀完 if (newSize == 0) { break; } pos += newSize; // 讀取的數據長度小於最大行長度,也說明已讀取完畢 if (newSize < maxLineLength) { break; } // 執行到此處,說明該行數據沒讀完,繼續讀入 } if (newSize == 0) { key = null; value = null; return false; } else { return true; } } // 省略了部分方法 }
數據從InputSplit分片中讀出已經解決,但是RecordReader是如何被Mapreduce框架利用的呢?我們先看一下Mapper類
7. Mapper
public class Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT> { public class Context extends MapContext<KEYIN, VALUEIN, KEYOUT, VALUEOUT> { public Context(Configuration conf, TaskAttemptID taskid, RecordReader<KEYIN, VALUEIN> reader, RecordWriter<KEYOUT, VALUEOUT> writer, OutputCommitter committer, StatusReporter reporter, InputSplit split) throws IOException, InterruptedException { super(conf, taskid, reader, writer, committer, reporter, split); } } /** * 預處理,僅在map task啟動時運行一次 */ protected void setup(Context context) throws IOException, InterruptedException { } /** * 對於InputSplit中的每一對<key, value>都會運行一次 */ @SuppressWarnings("unchecked") protected void map(KEYIN key, VALUEIN value, Context context) throws IOException, InterruptedException { context.write((KEYOUT) key, (VALUEOUT) value); } /** * 掃尾工作,比如關閉流等 */ protected void cleanup(Context context) throws IOException, InterruptedException { } /** * map task的驅動器 */ public void run(Context context) throws IOException, InterruptedException { setup(context); while (context.nextKeyValue()) { map(context.getCurrentKey(), context.getCurrentValue(), context); } cleanup(context); } }
重點看一下Mapper.class中的run()方法,它相當於map task的驅動。
- run()方法首先調用setup()進行初始操作
- 然后循環對每個從context.nextKeyValue()獲取的“K-V對”調用map()函數進行處理
- 最后調用cleanup()做最后的處理
事實上,content.nextKeyValue()就是使用了相應的RecordReader來獲取“K-V對”。Mapper.class中的Context類,它繼承自MapContext類,使用一個RecordReader進行構造。下面我們再看這個MapContext。
public class MapContext<KEYIN, VALUEIN, KEYOUT, VALUEOUT> extends TaskInputOutputContext<KEYIN, VALUEIN, KEYOUT, VALUEOUT> { private RecordReader<KEYIN, VALUEIN> reader; private InputSplit split; public MapContext(Configuration conf, TaskAttemptID taskid, RecordReader<KEYIN, VALUEIN> reader, RecordWriter<KEYOUT, VALUEOUT> writer, OutputCommitter committer, StatusReporter reporter, InputSplit split) { super(conf, taskid, writer, committer, reporter); this.reader = reader; this.split = split; } /** * Get the input split for this map. */ public InputSplit getInputSplit() { return split; } @Override public KEYIN getCurrentKey() throws IOException, InterruptedException { return reader.getCurrentKey(); } @Override public VALUEIN getCurrentValue() throws IOException, InterruptedException { return reader.getCurrentValue(); } @Override public boolean nextKeyValue() throws IOException, InterruptedException { return reader.nextKeyValue(); } }
從MapContent類中的方法可見,content.getCurrentKey(),content.getCurrentValue()以及nextKeyValue(),其實都是對RecordReader方法的封裝,即MapContext是直接使用傳入的RecordReader來對InputSplit進行“K-V對”讀取的。
至此,我們已經清楚的知道Mapreduce的輸入文件是如何被過濾、讀取、分片、讀出“K-V對”,然后交給Mapper類來處理的。