MapReduce基礎
目錄
一、關於MapReduce
1.1 為什么要MapReduce
- 單機資源有限:由於單台計算機的資源有限,計算能力不足以處理海量數據;所以需要多台計算機組成分布式集群來處理海量數據。
- 分布式計算較復雜:在分布式計算中,計算任務的分發,各個主機之間的協作;程序的啟動以及運行過程中的監控、容錯、重試等都會變得很復雜。所以引入了MapReduce框架,框架解決了分布式開發中的復雜性,開發人員只需要將大部分工作集中在業務邏輯的開發上,從而極大的提高了工作效率。
1.2 MapReduce的定義
- MapReduce是一個分布式運算程序的編程框架,用於大規模數據集(大於1TB)的並行計算;Map(映射)和reduce(歸約)是它的主要思想;它極大地方便了編程人員在不會分布式並行編程的情況下,將自己的程序運行在分布式系統上。
二、MapReduce的優缺點
2.1 優點:
- 易於編程:只需要實現一些接口,就可以完成一個分布式程序的編寫;跟編寫一個串行程序一樣;
- 良好的擴展性:當計算資源不足時,只需要簡單的增加機器來擴展它的計算能力;
- 高容錯性:當一個機器掛了之后,會自動把上面的計算任務轉移到另一個節點上運行,無需人工干預;
- 海量:適合PB級海量數據的離線處理。
2.2 缺點:
- 不適合實時計算:MapReduce由於過程較為復雜,IO次數較多,所以無法做到毫秒或秒級響應;
- 不適合流式計算:流式計算的輸入是動態的,可以不斷添加,而MapReduce的輸入是靜態的;
- 不適合DAG(有向圖)計算:對於多個程序之間有依賴關系,即后一個程序的輸入是前一個程序的輸出;雖然MapReduce也可以完成,但都是通過磁盤來傳遞中間數據,造成大量的磁盤IO,性能極低。
三、MapReduce的執行階段
3.1 執行的兩個階段
-
Map階段:若干個maptask並發實例,完全並行運行,互不相干。
-
Reduce階段:若干個reducetask並發實例,完全並行運行,但是他們的數據依賴於Map階段的輸出。
-
注意:MapReduce模型只能包含一個map階段和一個reduce階段;如果業務邏輯非常復雜,就只能使用多個MapReduce程序,串行運行。
四、編寫MapReduce程序
-
用戶需要編寫的三個部分:Mapper、Reducer、Driver(提交MR程序)。
4.1 以WordCount為例:
1. 編寫Mapper
// 注意:hadoop1.0版本中是mapred下包,hadoop2.0是mapreduce下的包
import org.apache.hadoop.mapreduce.Mapper;
// 繼承Mapper父類,泛型為輸入和輸出的<K, V>;並重寫父類的map方法
public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
/**
* 每行文本都會執行一次map方法.
*
* @param key 文本偏移量.
* @param value 一行文本.
* @param context 上下文對象.
* @throws IOException .
* @throws InterruptedException 當阻塞方法收到中斷請求時拋出.
*/
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
String line = value.toString();
String[] words = line.split("\\s+"); // 拆分一行中的單詞
for (String word : words) {
context.write(new Text(word), new IntWritable(1)); // 輸出一個<K, V>
}
}
}
2. 編寫Reducer
// 繼承Reducer類,輸入的<K, V>類型為map端輸出<K, V>類型
public class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
/**
* 相同的key只會執行一次reduce方法
*
* @param key map端輸出的key
* @param values 相同key的value集合
* @param context 上下文對象
* @throws IOException .
* @throws InterruptedException .
*/
@Override
protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
// 當前的 key出現了多少次
int count = 0;
// values中的數據是反序列化過來的,最好不要直接使用values中的bean
for (IntWritable value : values) {
count += value.get();
}
context.write(key, new IntWritable(count)); // 輸出
}
}
3. 編寫Driver
// Driver的作用是將這個Mapper和Reducer程序打包成一個Job,並提交該Job
public class WordCountDriver {
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
// 不需要為 conf設置HDFS等參數,因為conf會調用系統默認的配置文件,
// 所以這個mr程序在哪里運行就會調用哪里的配置文件,在集群上運行就會使用集群的設置文件。
Configuration conf = new Configuration();
// 刪除輸出文件,或者手動刪除
// FileHelper.deleteDir(args[1], conf);
// 根據配置文件實例化一個 Job,並取個名字
Job job = Job.getInstance(conf, "MyWordCount");
// 設置 Jar的位置
job.setJarByClass(WordCountDriver.class);
// 設置 Mapper運行類,以及輸出的key和value的類型
job.setMapperClass(WordCountMapper.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);
// 設置 Reducer的運行類,以及輸出的key和value的類型
job.setReducerClass(WordCountReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
// 設置分區(可以不用設置)
// 當設置的分區數大於實際分區數時,可以正常執行,多出的分區為空文件;
// 當設置的分區數小於實際分區數時,會報錯。
job.setNumReduceTasks(4);
// 如果設置的 numReduceTasks大於 1,而又沒有設置自定義的 PartitionerClass
// 則會調用系統默認的 HashPartitioner實現類來計算分區。
job.setPartitionerClass(WordCountPartitioner.class);
// 設置combine
job.setCombinerClass(WordCountCombiner.class);
// 設置輸入和輸出文件的位置
FileInputFormat.setInputPaths(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
// 提交任務,等待執行結果,參數為 true表示打印信息
boolean result = job.waitForCompletion(true);
// 根據 job的返回值自定義退出
System.exit(result?0:1);
}
}
4. 運行
- 如果在Hadoop集群上運行還需要將這個project打包成jar包,所以一般是先在windows上運行調試。
- 由於要從命令行輸入input和output參數,所以這里配置一下輸入和輸出的位置。
五、MapReduce的主要執行流程
- job.waitForCompletion(true):將這個MapReduce任務(Job)提交,默認是提交到本地運行;部署到集群時,是提交給YARN運行。
- map():在父類Mapper的run()方法中會調用子類重寫的map()方法。輸入文件的每一行都會調用一次map()方法,map()方法的參數中:key為當前輸入行的偏移量,LongWritable類型;value為當前輸入行的數據,Text類型;context為上下文對象。父類Mapper是一個泛型類,泛型的類型表示map()方法輸入和輸出的<K, V>類型,子類在繼承時要傳入實際輸入輸出的<K, V>類型。map()使用context.write(k, v)來輸出數據到shuffle階段的環形緩沖區。
- shuffle階段簡述:shuffle階段起到承上啟下的作用;從接收map()方法的輸出,到執行reduce()方法之前都屬於shuffle階段。shuffle接收map()輸出<K,V>並通過K計算出分區號,然后與元數據一起寫入環形緩存區;環形緩沖區溢寫時會將數據排序並寫入小文件,然后歸並成一個大的分區文件。一個ReducerTask主機會到所有MapTask主機上拉取對應的分區文件,歸並所有分區文件后會對相同的key進行合並,再執行reduce方法。
- reduce():在父類Reducer的run()方法中會調用子類重寫的reduce()方法。相同的key只會調用一次reduce()方法,reduce()方法的參數中:key為相同key合並后的第一個key,與map()的輸出key類型相同;values為相同key的value列表,類型是Iterable<map()的輸出value類型>。與Mapper類類似,子類在繼承Reducer時輸入的<K, V>類型是Mapper輸出的<K, V>類型、Reducer輸出的<K, V>類型是context.write(K, V)中<K, V>的類型。reduce中的context.write(K, V)最終會寫入到輸出文件中,就是這次MapReduce的結果。