1. Mapper 與 Reducer 數量
對於一個默認的MapReduce Job 來說,map任務的數量等於輸入文件被划分成的分塊數,這個取決於輸入文件的大小以及文件塊的大小(如果此文件在 HDFS中)。但是對於 reduce的任務,並不會自動決定reducer數目的大小,若未指定,則默認為1。例如:
但單個reducer任務執行效率不盡人意,在實際場景中會將它設置為一個較大的數值。此時,決定Key條目被送往哪個reducer由方法 setPartitionerClass() 指定:job.setPartitionerClass(HashPartitioner.class);
默認為HashPartitioner,它會將每條Key做Hash,然后與最大的整型值做一次按位與操作,以得到一個非負整數。然后對分區數做取模(mod)操作,將key分配到其中一個分區。這里的分區數即為reducer數目。HashPartitioner 源碼如下:
public class HashPartitioner<K, V> extends Partitioner<K, V> {
public HashPartitioner() {
}
public int getPartition(K key, V value, int numReduceTasks) {
return (key.hashCode() & 2147483647) % numReduceTasks;
}
}
若是為reducer數目設置為默認值1,則所有的中間數據都會被放入到一個reducer中,作業處理效率會非常低效。若是設置了過大的值,則每個reducer都會輸出一個文件,會導致過多的小文件。
在為一個任務選擇多少個reducer個數時,應遵循的原則為:目標reducer保持在每個運行5分鍾左右,且產生至少一個HDFS塊的輸出比較合適。
記錄在發送給 reducer 之前,會被MapReduce系統進行排序。因此輸入文件中的行會被交叉放入一個合並后的輸出文件。
2. 輸入格式
我們已經了解到map的輸入是分片(split),一個分片對應一個mapper,且僅被一個mapper處理。分片里面是多條記錄(item)。“輸入分片”在Hadoop中以InputSplit 接口的方式提供:
public interface InputSplit extends Writable {
long getLength() throws IOException;
String[] getLocations() throws IOException;
}
它包含兩個方法,分別為getLength() 與 getLocations()。其中getLength() 用於獲取數據的長度(以字節為單位);getLocations() 用於獲取一組存儲位置(也就是一組主機名)。其中getLocations()的返回值由mapreduce系統獲取后,實現data locality,也就是盡量將map任務放在離數據節點近的地方。而getLength() 的返回值用於排序分片,將最大的分片優先處理,以最小化整個作業運行的時間。
InputSplit(mapreduce中的分片)由InputFormat創建,它負責創建輸入分片,並將它們分成一條條記錄(item)。首先簡單看一下InputFormat 抽象類:
public abstract class InputFormat<K, V> {
public InputFormat() {
}
public abstract List<InputSplit> getSplits(JobContext var1) throws IOException, InterruptedException;
public abstract RecordReader<K, V> createRecordReader(InputSplit var1, TaskAttemptContext var2) throws IOException, InterruptedException;
}
這里 getSplits() 方法計算分片,然后將計算得到的List 結果發給 application master。Application master 根據其分片所在節點信息,調度map任務到離分片數據最近的節點。在map任務端,會把輸入分片傳給 InputFormat 的 createRecordReader() 方法,此方法會返回一個 RecordReader 對象,用於迭代讀取這個分片上的記錄(item),並生成記錄的鍵值對,之后傳遞給 map函數。通過查看 Mapper 類中的 run() 方法,更好的了解此過程:
public void run(Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT>.Context context) throws IOException, InterruptedException {
this.setup(context);
try {
while(context.nextKeyValue()) {
this.map(context.getCurrentKey(), context.getCurrentValue(), context);
}
} finally {
this.cleanup(context);
}
}
這里,先運行setup() 操作,然后從 context 不斷迭代獲取分片的內容,並傳給map() 方法,並由map() 方法進一步對 key-value 對進行處理。
3. FileInputFormat類
在Hadoop 中,數據源一般為文件,而 FileInputFormat 類就是用於處理數據源為文件的一個(繼承於)InputFormat 類:
public abstract class FileInputFormat<K, V> implements InputFormat<K, V> {
可以看到它是一個抽象類,它的實現類有CombineFileInputFormat、TextInputFormat、KeyValueTextInputFormat、NLineInputFormat以及SequenceFileInputFormat。
FileInputFormat類提供兩個功能:1. 指出作業的輸入文件位置;2. 為輸入文件生成分片的代碼實現。
在FileInputFormat中,作業的輸入可以是一個文件、一個目錄,也可以是目錄與文件的集合。它提供了四種方法用於設置Job的輸入路徑:
public static void addInputPath(JobConf conf, Path path)
public static void addInputPaths(JobConf conf, String commaSeparatedPaths)
public static void setInputPaths(JobConf conf, Path... inputPaths)
public static void setInputPaths(JobConf conf, String commaSeparatedPaths)
其中addInputPath() 和 addInputPaths() 用於添加路徑,以構成路徑列表。而setInputPath() 用於一次性設置完整的路徑列表(會替換前面所有路徑設置)。
在設置路徑后,也可以指定需要排除的特定文件,此功能由 setInputPathFilter() 實現:
public static void setInputPathFilter(JobConf conf, Class<? extends PathFilter> filter) {
conf.setClass("mapreduce.input.pathFilter.class", filter, PathFilter.class);
}
它可以設置一個過濾器PathFilter,默認的實現是過濾掉隱藏文件(以 . 和 _ 開頭的文件)。如果通過setInputPathFilter() 設置過濾器,它會在默認過濾器的基礎上進行過濾,也就是說,僅會在非隱藏文件中再次進行過濾。
輸入路徑的設置可以通過屬性與選項進行配置,在屬性配置中相關配置為:
mapreduce.input.fileinputformat.inputdir (逗號分隔屬性,無默認值)
mapreduce.input.pathFilter.class (PathFilter 類名,無默認值)
4. FileInputFormat 類處理輸入分片
在設置了一組文件后,FileInputFormat會將文件轉換為輸入分片。這里需要注意的是:在HDFS中,一個文件可以占用(分布到)多個block,但是不會存在一個block中存多個文件。對於小文件(小於一個HDFS 塊大小的文件)來說,一個文件就是占用一個block,但是不會占據整個block的空間。例如,當一個1MB的文件存儲在一個128MB 的塊中時,文件只使用 1MB 的磁盤空間,而不是128MB)。
FileInputFormat 只分割大文件,也就是文件超過HDFS塊的大小。在FileInputFormat中,控制分片大小的屬性有:
mapreduce.input.fileinputformat.split.minsize 一個文件分片最小的有效字節數(int類型),默認值為1(字節)
mapreduce.input.fileinputformat.split.maxsize 一個文件分片中最大的有效字節數(long 類型),默認值為Long.MAX_VALUE,即9223372036854775807
dfs.blocksize HDFS中的塊大小(按字節),默認為 128MB(即 134217728)
最小分片通常為1個字節,用戶可以設置最小分片的大小超過HDFS 塊大小,這樣會強制分片比HDFS塊大。但是如果數據存儲在 HDFS 上,則這樣對data locality 來說,並不友好,以至於延長任務執行時間。
最大分片默認是 Java Long 類型的最大值,只有把它的值設置為小於 HDFS Block 大小才有效,此時會強制分片比塊小。
在 FileInputFormat中,分片的大小由以下公式計算:
protected long computeSplitSize(long blockSize, long minSize, long maxSize) {
return Math.max(minSize, Math.min(maxSize, blockSize));
}
其中參數部分為:
long minSize = Math.max(this.getFormatMinSplitSize(), getMinSplitSize(job));
long maxSize = getMaxSplitSize(job);
protected long getFormatMinSplitSize() {
return 1L;
}
public static long getMinSplitSize(JobContext job) {
return job.getConfiguration().getLong("mapreduce.input.fileinputformat.split.minsize", 1L);
}
public static long getMaxSplitSize(JobContext context) {
return context.getConfiguration().getLong("mapreduce.input.fileinputformat.split.maxsize", 9223372036854775807L);
}
minSize 若未指定,則默認為 1。MaxSize默認為Java Long類型最大值。再計算時,先取 maxSize 和 blockSize 的最小值,然后再取結果與 minSize的最大值。
在默認情況下:minSize < blockSize < maxSize
所以分片的大小就是 blockSize大小。
5. 小文件與 CombineFileInputFormat
相對於大量的小文件,Hadoop更適合處理少量的大文件。其中一個原因是:對於每個小文件(遠小於 HDFS塊大小),FileInputFormat 都會生成一個分片(生成的分片要么是文件的整個內容,要么是文件的部分內容),這樣會產生大量的 map 任務,並且每個map任務僅處理一小部分數據,這樣會導致任務執行效率低下,時間過長。
CombineFileInputFormat 可以緩解此問題,它針對小文件而設計。FileInputFormat 為每個小文件產生一個分片,而CombineFileInpurtFormat 把多個文件打包到一個分片中,以便於每個 mapper 可以處理更多的數據。更重要的是:CombineFileInputFormat在分配多個block到同一個 split時,會考慮到node locality 以及 rack locality。所以它的速度在一個典型的 mr 任務中,處理輸入的速度並不會下降。
不過盡可能要避免小文件過多的情況,原因有:
1. 處理小文件會增加運行作業而必須的尋址次數
2. 浪費namenode的內存
可以嘗試使用順序文件(sequence file)將這些小文件合並成一個或多個大文件:例如將文件名作為key,文件內容作為 value。但是如果集群里已經有了大量小文件,可以嘗試一下CombineFileInputFormat 方法。
CombinedFileInputFormat不僅處理小文件有好處,處理大文件時也有益處。例如,如果mapper在處理一個block時僅花費很少的時間,則可以考慮使用CombineFileInputFormat,並將maximum split size 設置為 HDFS block 大小的幾倍(參數為mapred.max.split.size)。這樣每個mapper會處理多個block,使得整個處理時間下降。
6. 避免分片
有時候可能需要計算整個文件里的順序關系,這種任務無法分布式處理,所以只能讓文件由一個mapper處理,此時需要避免文件被分片。
有兩種方式可以避免文件被分片,而是當作一個單獨分片處理:
1. 設置最小分片大小split.minsize 為Java Long類型最大值(long.MAX_VALUE)
2. 使用FileInputFormat 具體子類時,重寫isSplitable() 方法,把返回值設置為 false
使用第二種方法時,以 TextInputFormat類為例:
public class TextInputFormat extends FileInputFormat<LongWritable, Text> implements JobConfigurable {
private CompressionCodecFactory compressionCodecs = null;
public TextInputFormat() {
}
public void configure(JobConf conf) {
this.compressionCodecs = new CompressionCodecFactory(conf);
}
protected boolean isSplitable(FileSystem fs, Path file) {
CompressionCodec codec = this.compressionCodecs.getCodec(file);
return null == codec ? true : codec instanceof SplittableCompressionCodec;
}
….
}
默認會根據 CompressionCodec 類型判斷是否切分,也可以直接指定return 為 false,使得輸入文件不可切分。
References:Hadoop權威指南第4版