(一)MapReduce介紹
1、MapReduce簡介
MapReduce是Hadoop生態系統的一個重要組成部分,與分布式文件系統HDFS、分布式數據庫HBase一起合稱為傳統Hadoop的三駕馬車,一起構成了一個面向海量數據的分布式系統的基礎架構。
MapReduce是一個用於大規模數據(大於1TB)處理的分布式計算模型、編程模型,它最初是由Google設計並實現的,在Google提出時,給它的定義是:Map/Reduce是一個編程模型(programming model),是一個用於處理和生成大規模數據集(processing and generating large data sets)的相關的實現。
MapReduce的主要思想“Map(映射)”和“Reduce(規約)”都來自於函數式編程語言。MapReduce極大地方便了編程人員在不會分布式並行編程的情況下,將自己的程序運行在分布式系統之上。用戶只需要定義一個map函數來處理一個key/value對以生成一批中間的key/value對,再定義一個reduce函數將所有這些中間的有着相同key的values合並起來。很多現實世界中的任務都可用這個模型來表達,具有較強的實用價值。
具體來看,MapReuce應當是包含了以下三層含義:
(1)MapReduce是一個基於集群的高性能並行計算平台。通過MapReduce可以將市場上普通的商用服務器構成一個包含數十、數百甚至數千個節點的分布和並行計算集群。
(2)MapReduce是一個並行計算與運行軟件框架。它提供了一個龐大但設計精良的並行計算軟件框架,能自動完成計算任務的並行化處理,自動划分計算數據和計算任務,在集群節點上自動分配和執行任務以及收集計算結果,將數據分布存儲、數據通信、容錯處理等並行計算涉及到的很多系統底層的復雜細節交由系統負責處理,大大減少了軟件開發人員的負擔。
(3)MapReduce是一個並行程序設計模型與方法。它借助於函數式程序設計語言Lisp的設計思想,提供了一種簡便的並行程序設計方法,用Map和Reduce兩個函數編程實現基本的並行計算任務,提供了抽象的操作和並行編程接口,以簡單方便地完成大規模數據的編程和計算處理。
MapReduce的數據處理模型非常簡單:map函數和reduce函數的輸入和輸出都遵循<key,value>鍵值對的格式,簡單的用符號表示就是:
Map:(K1,V1)——> list(K2,V2)
Reduce:(K2,list< V2>)——> list<K3,V3>
Map-Reduce框架的運作完全基於<key,value>對,即數據的輸入是一批<key,value>對,生成的結果也是一批<key,value>對,只是有時候它們的類型不一樣而已。Key和value的類由於需要支持被序列化(serialize)操作,所以它們必須要實現Writable接口,而且key的類還必須實現WritableComparable接口,使得可以讓框架對數據集的執行排序操作,后面我們通過具體的實例來展示它的用法。
2、MapReduce版本演化
MapReduce是Hadoop生態系統的一員,是一個完全開源的分布式計算系統。MapReduce從第一次提出到今天,並不是一成不變的,雖然其主流思想和計算模型沒有大的改變,但是整個系統也是在不斷的完善和演變的。
首先經典版本的MapReduce框架,也就是第一版成熟的商用框架,屬於Hadoop的V1.0版本,這個版本的主要特點是簡單易用,其思路也比較清晰,各個Client提交Job給一個統一的Job Tracker,然后Job Tracker將Job拆分成N個Task,然后進行分發到各個節點(Node)進行並行協同運行,然后再將各自的運行結果反饋至Job Tracker,進而輸出結果。

雖然實現簡單,但是這個1.0版本存在着其固有的局限性,其中最主要的一點就是:單點故障問題。所有的Job的完成都得益於JobTracker的調度和分配,一旦此節點宕機就意味着整個平台的癱瘓,當然,在實際中大部分通過一個備用機來解決。但是,在一個以分布式運算為特性的框架中,將這種核心的計算集中與一台機器不是一個最優的方案。其次,這個設計擴展性不強,容易造成資源的浪費。
因此,為了減輕單個JobTracker的職責,mapreduce的2.0版本開始引入了YARN作為集群的資源管理器,JobTracker的職責分為兩大部分:集群資源管理和任務協調,YARN作為資源管理器,專注於負責整個平台的資源管理,而任務的調度和協調交給下屬的任務節點來完成。其主要的運行機制后面具體解析。
目前為止,Hadoop已經發展到了3.0版本,3.0和2.0版本在編程模型和運行機制上沒有太大的變化,仍然使用YARN作為其資源管理器,但是在穩定性、存儲開銷和兼容性等方面有所優化。
(二)MapReduce實例(WordCount)
下面通過實例來對MapReduce的過程進行說明。
WordCount是Hadoop自帶的一個例子,目標是統計文本文件中單詞的個數。假設有如下的兩個文本文件來運行WorkCount程序:
第一個文件內容:Hello World Bye World
第二個文件內容:Hello Hadoop GoodBye Hadoop
(1)Map數據的輸入
MapReduce針對文本文件缺省使用LineRecordReader類來實現讀取,一行一個key/value對,key取偏移量,value為行內容。因此,對於給出的文件,假設每個文件正好是一個分片,那么會有兩個Map任務,MapReduce會將其映射為如下所示的鍵值對作為Map過程的輸入。
Map任務 | key | value |
---|---|---|
map1 | 0(偏移量) | Hello World Bye World |
map2 | 0(偏移量) | Hello Hadoop GoodBye Hadoop |
用戶通過定義map函數對輸入的鍵值對進行處理,目標是統計每個單詞的個數,這相當於一個數據預處理的過程,經過處理后,會輸出一系列的鍵值對,鍵是每個單詞,而值是個數(也就是1)。
Map任務 | key | value |
---|---|---|
map1 | Hello | 1 |
map1 | World | 1 |
map1 | Bye | 1 |
map1 | World | 1 |
map2 | Hello | 1 |
map2 | Hadoop | 1 |
map2 | GoodBye | 1 |
map2 | Hadoop | 1 |
(3)Reduce的輸入
Map或者Combiner的輸出(如果有的話)會經歷一個shuffle的過程,這個過程將key相同的數據進行合並,並按照字符順序進行排序。
如這里Combiner輸出進行shuffle之后會得到:
Key | Value |
---|---|
Bye | [1] |
GoodBye | [1] |
Hadoop | [2] |
Hello | [1,1] |
World | [2] |
最后,Reducer實現將相同key的值合並起來,得到最后的結果。
Key | Value |
---|---|
Bye | 1 |
GoodBye | 1 |
Hadoop | 2 |
Hello | 2 |
World | 2 |
如圖所示:

如下圖:

這就是一個MapReduce應用的實例,其編程實現也非常簡單,用戶只需要定義map函數和reduce函數,然后寫一個驅動程序來運行作業即可。
public static class Map extends Mapper<LongWritable, Text, Text, IntWritable> {
private final static IntWritable one = new IntWritable(1);
private Text word = new Text(); //輸出的key的類型,可以理解為String
public void map(LongWritable key, Text value, Context context) {
String line = value.toString(); //每行句子
StringTokenizer tokenizer = new StringTokenizer(line);
while (tokenizer.hasMoreTokens()) {
word.set(tokenizer.nextToken());
context.write(word, one); //輸出
}
}
}
public static class Reduce extends Reducer<Text, IntWritable, Text, IntWritable> {
//在這里,reduce步的輸入相當於<單詞,valuelist>,如<Hello,<1,1>>
public void reduce(Text key, Iterable<IntWritable> values,Context context) {
int sum = 0;
for (IntWritable val : values)
sum += val.get();
context.write(key, new IntWritable(sum));
}
}
以上就是map函數和reduce函數的實現,邏輯都很簡單,最后我們只需要寫一個主函數,設置一個Job作業,進行相對設置,就可以運行,比如如下的Job作業設置了處理該作業的類、作業名字、輸入輸出數據的路徑、map和reduce對應的類、輸出結果類型,最后調用執行命令進行執行即可。
Job job = new Job(); // 創建一個作業對象
job.setJarByClass(WordCount.class); // 設置運行/處理該作業的類
job.setJobName("WordCount");
FileInputFormat.addInputPath(job, new Path(args[0])); //設置這個作業輸入數據的路徑
FileOutputFormat.setOutputPath(job, new Path(args[1])); //設置這個作業輸出結果的路徑
job.setMapperClass(Map.class); //設置實現了Map步的類
job.setReducerClass(Reduce.class); //設置實現了Reduce步的類
job.setOutputKeyClass(Text.class); //設置輸出結果key的類型
job.setOutputValueClass(IntWritable.class); //設置輸出結果value的類型
System.exit(job.waitForCompletion(true) ? 0 : 1); //執行作業