MapReduce原理及源碼解讀
一、分片
靈魂拷問:為什么要分片?
- 分而治之:MapReduce(MR)的核心思想就是分而治之;何時分,如何分就要從原理和源碼來入手。做為碼農大家都知道,不管一個程序多么復雜,在寫代碼和學習代碼之前最重要的就是搞懂輸入和輸出,而MR的輸入其實就是一個目錄。而所謂的分而治之其實也是在把大文件分成小文件,然后一個機器處理一個小文件,最后再合並。所以MR的第一步就是對輸入的文件進行分片。
1.1 對誰分片
-
對每個文件分片:分片是對輸入目錄中的每一個文件進行分片。后面的分片都是針對單個文件分片。
-
源碼解讀(對誰分片):
// 分片的源碼位置
package org.apache.hadoop.mapreduce.lib.input;
abstract class FileInputFormat.java;
// 下面代碼所在方法
method getSplits();
// InputStatus表示一個切片類
List<InputSplit> splits = new ArrayList<InputSplit>();
// 得到所有輸入文件
List<FileStatus> files = listStatus(job);
// 遍歷每個文件。 根據每個文件來切片,而不是整個文件夾
for (FileStatus file : files) {
// 分片1
}
1.2 長度是否為0
- 文件長度:當文件長度不為0時才會進行下面的分片操作;如果文件長度為0,則會向分片列表中添加一個空的hosts文件數組和空長度的文件。也就是說,空文件也會創建一個空的分片。
- 源碼解讀(長度是否為0):
for (FileStatus file : files) {
Path path = file.getPath();
// 獲取文件大小
long length = file.getLen();
if (length != 0) {
// 分片2
} else {// 如果文大小為空,默認就創建一個空的hosts文件數組和空長度的文件
//Create empty hosts array for zero length files
splits.add(makeSplit(path, 0, length, new String[0]));
}
}
1.3 是否可以分片
- 壓縮格式:並不是所有的文件都可以分片,有一些壓縮格式的文件是不可以分片的。因此只會對可以分片的文件進行分片,而不可以分片的文件即使再大也會作為一個整體來處理,相當於一個片。
- 源碼解讀(是否可以分片):
// 如果可以分片
if (isSplitable(job, path)) {
// 分片3
} else { // not splitable
splits.add(makeSplit(path, 0, length, blkLocations[0].getHosts(),
blkLocations[0].getCachedHosts()));
}
// 判斷一個文件是否可以切片
// FileInputFormat抽象類中默認返回true,子類TextInputFormat中實現如下
@Override
protected boolean isSplitable(JobContext context, Path file) {
final CompressionCodec codec =
new CompressionCodecFactory(context.getConfiguration()).getCodec(file);
if (null == codec) {// 如果一個文件的壓縮編碼為null,那么表示可以切片
return true;
}// 如果一個文件的壓縮編碼是SplittableCompressionCodec的子類,那么表示當前文件也可以切片
return codec instanceof SplittableCompressionCodec;
}
1.4 分片的大小
- 分片大小:分片太大就失去了分片的意義;如果分片很小,則管理和構建map任務的時間就會增多,效率變低。並且如果分片跨越兩個數據塊,那么分片的部分數據需要通過網絡傳輸到map任務運行的節點上,效率會更低。所以分片的最佳大小應該和HDFS的分塊大小一致。Hadoop2默認128M。
- 源碼解讀(分片大小):
// FormatMinSplitSize是 1, MinSplitSize如果沒配置默認是 1
long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job));
// 如果沒配置,則默認是 Long類型的最大值
long maxSize = getMaxSplitSize(job);
// 塊大小,Hadoop2是128M,本地模式為32M
long blockSize = file.getBlockSize();
// 分片大小計算公式。默認就是blockSize的大小
long splitSize=Math.max(minSize, Math.min(maxSize, blockSize));
- 自定義分片大小:由上面的公式可知,默認的分片大小就是blockSize的大小。如果要自定義大於blockSize,比如改為200M,就把minSize改為200;小於blockSize,比如20M,就把maxSize改為20
- 1.1倍:最常見的問題就是:一個大小為130M的文件,在分片大小為128M的集群上會分成幾片?答案是1片;因為 128*1.1>130,准確來說應該是130 / 128 < 1.1 (源碼的公式)。也就是說,如果剩下的文件大小在分片大小的1.1倍以內,就不會再分片了。要這個1.1倍,是為了優化性能;試想如果不這樣,當還剩下130M大小的時候,就會分成一塊128M,一塊2M,后面還要為這個2M的塊單獨開一個map任務,不划算。至於為什么是1.1,這個1.1是專家們通過反復試驗得出來的結果。
- 源碼解讀(1.1倍):
// 當剩余文件的大小,大於分片大小的1.1倍時,才會分片
private static final double SPLIT_SLOP = 1.1; // 10% slop
// bytesRemaining為文件剩余大小,splitSize為上面計算出的分片大小
while (((double) bytesRemaining) / splitSize > SPLIT_SLOP) {
// 分片4
}
1.5 開始分片
- 終於分片了:經過上面的層層條件,下面就是// 分片4中的分片代碼。與HDFS的物理分塊不同的是,MapReduce的分片只是邏輯上的分片,即按照偏移量分片。
// 封裝一個分片信息(包含文件的路徑,分片的起始偏移量,要處理的大小,分片包含的塊的信息,分片中包含的塊存在哪兒些機器上)
int blkIndex = getBlockIndex(blkLocations, length - bytesRemaining);
// makeSplit進行切片操作,返回值是一個切片,並且加入到切片列表中
splits.add(makeSplit(path, length - bytesRemaining, splitSize,
blkLocations[blkIndex].getHosts(),
blkLocations[blkIndex].getCachedHosts()));
// 剩余文件大小
bytesRemaining -= splitSize;
1.6 分片后讀取會不會斷行
- 不會:由於分片時是按照長度進行分片的,那就有很大可能會把一行數據分在兩個片里面,所以分片的時候確實會斷行。如果讀取並處理斷行的數據,就會導致結果不正確,那是肯定不行的。所以LineRecordReader類就充當了讀取記錄的角色,保證讀取不斷行;其中nextKeyValue()方法里是真正給Mapper中的key賦值的地方,並且調用了父類LineReader類中的readLine()方法來給value賦值。
- 源碼解讀(讀取時不斷行):
public class TextInputFormat extends FileInputFormat<LongWritable, Text> {
@Override
public RecordReader<LongWritable, Text>
createRecordReader(InputSplit split,
TaskAttemptContext context) {
String delimiter = context.getConfiguration().get(
"textinputformat.record.delimiter");
// 行分隔符
byte[] recordDelimiterBytes = null;
if (null != delimiter)
recordDelimiterBytes = delimiter.getBytes(Charsets.UTF_8);
// 返回LineRecordReader對象
return new LineRecordReader(recordDelimiterBytes);
}
}
// 行記錄讀取類,提供讀取片中數據的功能,並且保證不斷行
public class LineRecordReader extends RecordReader<LongWritable, Text> {
// ......其他代碼
public void initialize(InputSplit genericSplit,
TaskAttemptContext context) throws IOException {
// ......
// 如果不是第一個分片,則開始位置退到下一行記錄的開始位置
// 因為為了保證讀取時不斷行,每個塊都會向后多讀一行(最后一個除外)
if (start != 0) {
start += in.readLine(new Text(), 0, maxBytesToConsume(start));
}
}
public boolean nextKeyValue() throws IOException {
// 給Mapper中輸入的key賦值
key.set(pos);
// 實例化Mapper中輸入的value
if (value == null) {
value = new Text();
}
// 注意是<=end,在等於end時還會執行一次,多讀了一行,所以不會斷行
while (getFilePosition() <= end || in.needAdditionalRecordAfterSplit()) {
if (pos == 0) {
newSize = skipUtfByteOrderMark();
} else {
// 給Mapper中輸入的value賦值。
// readLine方法會根據是否自定義行分隔符來調用不同的方法。
newSize = in.readLine(value, maxLineLength, maxBytesToConsume(pos));
pos += newSize;
}
}
}
}
二、Map階段
2.1 實例化Mapper
-
各種實例化:上面費了很大的勁來編寫分片TextInputFormat,和讀取類LineRecordReader;而這一切都是為了把輸入數據很好的傳給map()方法來運算,所以首先就要實例化我們自定義的Mapper類。
-
源碼解讀(各種實例化):
package org.apache.hadoop.mapred;
class MapTask.java;
method runNewMapper();
// 通過反射來獲取Mapper。在Job中設置的Mapper,也就是自己定義的繼承自Mapper的類
org.apache.hadoop.mapreduce.Mapper<INKEY,INVALUE,OUTKEY,OUTVALUE> mapper =
(org.apache.hadoop.mapreduce.Mapper<INKEY,INVALUE,OUTKEY,OUTVALUE>)
ReflectionUtils.newInstance(taskContext.getMapperClass(), job);
// 通過反射來得到 InputFormat。默認是TextInputFormat
org.apache.hadoop.mapreduce.InputFormat<INKEY,INVALUE> inputFormat =
(org.apache.hadoop.mapreduce.InputFormat<INKEY,INVALUE>)
ReflectionUtils.newInstance(taskContext.getInputFormatClass(), job);
// 獲得當前MapTask要處理的split
org.apache.hadoop.mapreduce.InputSplit split = null;
split = getSplitDetails(new Path(splitIndex.getSplitLocation()),
splitIndex.getStartOffset());
LOG.info("Processing split: " + split);
// 根據InputFormat對象創建RecordReader對象。默認是LineRecordReader
org.apache.hadoop.mapreduce.RecordReader<INKEY,INVALUE> input =
new NewTrackingRecordReader<INKEY,INVALUE>
(split, inputFormat, reporter, taskContext);
// 初始化。用來打開文件,並且調整文件的頭指針
input.initialize(split, mapperContext);
// MapTask中調用Mapper的run()方法
mapper.run(mapperContext);
2.2 調用map()方法
- 每行數據調用一次:從上面的代碼中我們知道,MapTask中會調用Mapper類的run()方法;而run()方法會在while循環中調用map()方法,由退出條件可知,是每一行數據調用一次map()方法。
- 源碼解讀(怎么調用map()方法):
public void run(Context context) throws IOException, InterruptedException {
// 在所有map執行之前初始化,也可以根據業務需要來重寫此方法
setup(context);
try {
// context.nextKeyValue()其實就是LineRecordReader中的nextKeyValue()方法;
// 在run方法中遍歷所有的key,每行數據都執行一次自定義map方法;
while (context.nextKeyValue()) {
map(context.getCurrentKey(), context.getCurrentValue(), context);
}
} finally {
// 父類Mapper中的setup()和cleanup()方法中什么都沒做;
// 只執行一次,可以根據業務需要來重寫此方法;
cleanup(context);
}
}
三、Shuffle階段
靈魂拷問:哪來的Shuffle?
- 理論與實現:看過源碼的都知道,其實源碼中根本就沒有什么shuffle;shuffle只是一個過程,確切的來說是連貫Map階段和reduce階段的一個理論過程,而它的實現主要在MapTask和ReduceTask類中。shuffle階段可以說是MapReduce中最核心的一個階段。
3.1 shuffle的概念
- 作用:shuffle這個單詞的本意是洗牌、打亂的意思,而在這里則是:將map端的無規則輸出按照指定的規則“打亂”成具有一定規則的數據,以便reduce端接收和處理。
- 流程:shuffle的范圍是map輸出后到reduce輸入前。它的流程主要包括Map端shuffle和reduce端shuffle。
- MapReduce大致流程:
3.2 Map端Shuffle
- 作用:Map端的shuffle過程是對Map的結果進行分區、排序、溢寫、合並分區,最后寫入磁盤;最終會得到一個分區有序的文件,即先按分區排序,再按key排序。
- Map端shuffle大致流程:
3.2.1 分區(partition)
- 概念:對於map的每一個輸出的鍵值對,都會根據key來生成partition再一起寫入環形緩沖區。每一個reduceTask會處理一個partition(第0個reduceTask處理partition為0的分區,以此類推)。
- 如何分區:默認情況下,分區號是key的hash值對numReduceTask數量取模的結果。也可以自定義分區。
- 源碼解讀(如何分區):
// 當設置的reduceTask數大於實際分區數時,可以正常執行,多出的分區為空文件;
// 當設置的reduceTask數小於實際分區數時,會報錯。
job.setNumReduceTasks(4);
// 如果設置的 numReduceTasks大於 1,而又沒有設置自定義的 PartitionerClass
// 則會調用系統默認的 HashPartitioner實現類來計算分區。
job.setPartitionerClass(WordCountPartitioner.class);
// 自定義分區
public class WordCountPartitioner extends Partitioner<Text, IntWritable> {
private static HashMap<String, Integer> map = new HashMap<>();
static {
map.put("0734", 0);
map.put("0561", 1);
map.put("0428", 2);
}
// 當 Mapper的輸出要寫入環形緩沖區時,會調用此方法來計算當前<K,V>的分區號
@Override
public int getPartition(Text text, IntWritable intWritable, int numPartitions) {
String strText = text.toString();
return map.getOrDefault(strText.substring(0, 4), 3);
}
}
// MapTask.java$NewOutputCollector
public void write(K key, V value) throws IOException, InterruptedException {
// 把 K,V以及分區號寫入環形緩沖區
collector.collect(key, value,
partitioner.getPartition(key, value, partitions));
}
3.2.2 寫入環形緩沖區
- 概念:環形緩沖區是在內存中的一個字節數組kvbuffer。kvbuffer不僅存放map輸出的<k, v>,還在另一端存放了<k, v>的索引(元數據) kvmeta,每個kvmeta包括value的起始位置、key的起始位置、partition值、value的長度,占用4個int長度。上圖中的bufindex和kvindex分別表示kvbuffer和kvmeta的指針。環形緩沖區的默認大小是100M,當寫入數據大小超過80%(80M)就會觸發Spill,溢寫到磁盤。
- 源碼解讀(Spill):
// SpillThread線程在MapTask$MapOutputBuffer類中初始化,在init()方法中啟動。
// 它會一直監視環形緩沖區,當大小超過80%的時候,就會調用sortAndSpill()方法。
protected class SpillThread extends Thread {
@Override
public void run() {
// ....
// run方法中調用排序並溢寫方法
while (true) {
// ....
sortAndSpill();
}
//....
}
}
3.2.3 排序並溢寫(sortAndSpill):
- 排序:觸發溢寫后,會先排序,再溢寫。排序是根據partition和key的升序排序,移動的只是索引數據,排序的結果是將kvmeta中到的數據按照partition聚合在一起,同一個partition內再根據key排序。
- 溢寫:Spill線程根據排序后的kvmeta文件,將一個個partition輸出到文件,在這次溢寫過程中,會將環形緩沖區中已計算的數據(80M)寫入到一個文件spill.out,所以引入了索引文件spill.index,它記錄了partition在spill.out中的位置。
3.2.4 合並(merge):
- 概念:如果Map的數據很大,那么就會觸發多次Spill,spill.out和spill.index文件也會很多。所以最后就要把這些文件合並,方便Reduce讀取。
- 合並過程:合並過程中,首先會根據spill.index文件,將spill.out文件中的partition使用歸並排序分別寫入到相應的segment中,然后再把所有的segment寫入到一個file.out文件中,並用file.out.index來記錄partition的索引。由於合並時可能有相同的key,所以如果設置了combine,那么在寫入文件之前還會調用自定義的combine方法。
3.3 Reduce端Shuffle
3.3.1 拉取(Copy)
- 前期工作:Reduce任務會通過HTTP向各個Map任務拉取它所需的partition數據。當Map任務成功完成之后會通知 TaskTracker狀態已跟新,TaskTracker進而通知JobTracker(都是通過心跳機制實現),所以JobTracker中記錄了Map輸出和TaskTracker的映射關系。
- 何時拉取:Reduce會定期向JobTracker獲取Map的輸出位置,一旦拿到輸出位置,Reduce任務就會立即從此輸出對應的TaskTracker上復制相應的partition數據到本地,而不是等到所有Map任務結束。
3.3.2 排序合並(Merge Sort)
- 合並:copy過來的數據會先放入內存緩沖區中(大小是 JVM的heap size的70%),如果緩沖區放得下就直接把數據寫入內存,即內存到內存merge。如果緩沖區中的Map數據達到一定大小(緩沖區的66%)的時候,就會開啟內存merge,並將merge后的數據寫入磁盤,即內存到磁盤merge。當屬於該Reduce任務的map輸出全部拉取完成,則會在reduce任務的磁盤上生成多個文件(如果所有map輸出的大小沒有超過緩沖區大小,則數據只存在於內存中),這時開始最后的合並操作,即磁盤到磁盤merge。如果設置了combine,合並時也會執行。
- 排序:由於map輸出的數據已經是有序的,所以reduce在合並時的排序是歸並排序,並且reduce端的copy和sort是同時進行的,最終會得到一個整體有序的數據。
3.3.3 歸並分組(reduce)
-
歸並分組(reduce):當reduce任務執行完拉取和排序合並后,就會對相同的key進行分組。默認情況下是根據key對象中重寫的compareTo()方法來分組,如果設置了GroupingComparator,則會調用它的compare()方法來分組。reduce會把compareTo(或compare)方法計算返回為 0 的key分為一組,最終會得到一個組<key, Iterable<value,>>,其中組的key是這一組的第一個數據的key,Iterable<value,>則是相同key的value迭代器。最后再對每一個組調用Reducer的reduce()方法。
-
源碼解讀(分組):
// org.apache.hadoop.mapreduce.Reducer中的run()方法
while (context.nextKey()) {
// 調用自定義 reduce方法
reduce(context.getCurrentKey(), context.getValues(), context);
// .....
}
// org.apache.hadoop.mapreduce.task.ReduceContextImpl中的方法
public boolean nextKey() throws IOException,InterruptedException {
// 如果當前key與下一個key相同,則繼續往下走;
// 這一步就是把相同的key放到一組, 他們的value放到一個迭代器中;當下一個key不同時再調用reduce方法
while (hasMore && nextKeyIsSame) {
nextKeyValue();
}
if (hasMore) {
if (inputKeyCounter != null) {
// 計數器
inputKeyCounter.increment(1);
}
// 當nextKeyIsSame為false時,會再調用一次nextKeyValue(),而它的返回值必為true;
return nextKeyValue();
} else {
return false;
}
}
@Override
public boolean nextKeyValue() throws IOException, InterruptedException {
if (hasMore) {
nextKey = input.getKey();
// 在執行reduce方法之前調用ReduceContext中定義的GroupComparator
// 如果key的compareTo方法返回0則 nextKeyIsSame為true,也就會分到一組
nextKeyIsSame = comparator.compare(currentRawKey.getBytes(), 0,
currentRawKey.getLength(),
nextKey.getData(),
nextKey.getPosition(),
nextKey.getLength() - nextKey.getPosition()
) == 0;
} else {
nextKeyIsSame = false;
}
inputValueCounter.increment(1);
return true;
}
四、Reduce階段
4.1 執行reduce()方法
- 歸並:上面的Shuffle階段已經將數據分組成了<key, Iteralble<value,>>格式的數據,所以對於相同的key只會調用一次reduce()方法。
- 注意事項:在reduce()方法中,一定要重新創建key對象,不要直接使用參數中的key。
4.2 輸出最終結果
- 完結:整個MapReduce的輸出和輸入有點類似。輸出是實例化TextOutputFormat和LineRecordWrite對象。並由LineRecordWrite判斷是不是NullWriteable,最后輸出到文件