1.概述
前面我們已經對Hadoop有了一個初步認識,接下來我們開始學習Hadoop的一些核心的功能,其中包含mapreduce,fs,hdfs,ipc,io,yarn,今天為大家分享的是mapreduce部分,其內容目錄如下所示:
- MapReduce V1
- MapReduce V2
- MR V1和MR V2的區別
- MR V2的重構思路
本篇文章的源碼是基於hadoop-2.6.0-src.tar.gz來完成的。代碼下載地址,請參考《Hadoop2源碼分析-准備篇》。
2.MapReduce V1
下面我們給出第一代的MapReduce的架構圖,如下所示:
上圖描述了第一代MapReduce框架的流程以及設計思路,下面為大家解釋下這張圖的具體含義:
- 當我們編寫完MR作業后,需要通過JobClient來提交一個job,提交的信息會發送到JobTracker模塊,這個模塊是第一代MapReduce計算框架的核心之一,它負責與集群中的其他節點維持心跳,為提交的作業分配資源,管理提交的作業的正常運作(失敗,重啟等)。
- 第一代MapReduce的另一個核心的功能是TaskTracker,在各個TaskTracker安裝節點上,它的主要功能是監控自己所在節點的資源使用情況。
- TaskTracker監控當前節點的Tasks的運行情況,其中包含Map Task和Reduce Task,最后由Reduce Task到Reduce階段,將結果輸送到HDFS的文件系統中;其中的具體流程如圖中描述的1-7步驟。TaskTracker在監控期間,需要把這些信息通過心跳機制發送給JobTracker,JobTracker收集到這些信息后,給新提交的作業分配其他的資源,避免重復資源分配。
可以看出,第一代的MapReduce架構簡單清晰,在剛面世的那幾年,也曾獲得總多企業的支持和認可。但隨着分布式集群的規模和企業業務的增長,第一代框架的問題也逐漸暴露出來,主要有以下問題:
- JobTracker是第一代MapReduce的入口點,若是JobTracker服務宕機,整個服務將會癱瘓,存在單點問題。
- JobTracker負責的事情太多,完成來太多的任務,占用過多的資源,當Job數非常多的時候,會消耗很多內存,容易出現性能瓶頸。
- 對TaskTracker而言,Task擔當的角色過於簡單,沒有考慮到CPU及內存的使用情況,若存在多個大內存的Task被集中調度,容易出現內存溢出。
- 另外,TaskTracker把資源強制分為map task slot和reduce task slot,若是MR任務中只存在其中一個(map或是reduce),會出現資源浪費的情況,資源利用率低。
- 從開發人員的角度來說,源碼分析的時候,閱讀性不夠友好,代碼量大,任務不清晰,給開發人員在修復BUG和維護的時候增大了難度。
3.MapReduce V2
在Hadoop V2中,加入了YARN的概念,所以MapReduce V2的架構和MapReduce V1的架構有些許的變化,如下圖所示:
從上圖中,我們可以清晰的看出,架構重構的基本思想在於將JobTracker的兩個核心的功能單獨分離成獨立的組件了。分離后的組件分別為資源管理(Applications Manager)和任務調度器(Resource Scheduler)。新的資源管理器(Resource Manager)管理整個系統的資源分配,而每一個Node Manager下的App Master(Application Master)負責對應的調度和協調工作,而在實際中,App Master從Resource Manager上獲得資源,讓Node Manager來協同工作和任務監控。
從圖中我們可以看出,Resource Manager是支持隊列分層的,這些隊列可以從集群中獲取一定比例的資源,也就是說Resource Manager可以算得上是一個調度器,它在執行的過程當中本身不負責對應用的監控和狀態的定位跟蹤。
Resource Manager在內存,CPU,IO等方面是動態分配的,相比第一代MapReduce計算框架,在資源使用上大大的加強了資源使用的靈活性。上圖中的Node Manager是一個代理框架,負責應用程序的執行,監控應用程序的資源利用率,並將信息上報給資源管理器。另外,App Master所擔當的角色職責包含:在運行任務是,向任務調度器動態的申請資源,對應用程序的狀態進行監控,處理異常情況,如若出現問題,會在其他節點進行重啟。
4.MR V1和MR V2的區別
在和大家分析完 MR V1 和 MR V2 的架構后,我們來看看二者有哪些變化。在MR V2版本中,大部分的API接口都是兼容的保留下來,MR V1中的JobTracker和TaskTracker被替換成相應的Resource Manager,Node Manager。對比於MR V1中的Task的監控,重啟等內熱都交由App Master來處理,Resource Manager提供中心服務,負責資源的分配與調度。Node Manager負責維護Container的狀態,並將收集的信息上報給Resource Manager,以及負責和Resource Manager維持心跳。
MR V2中加入Yarn的概念后,體現以下設計優點:
- 減少來資源消耗,讓監控每一個作業更加分布式了。
- 能夠支持更多的變成模型,如:Spark,Storm,以及其他待開發的編程模型。
- 將資源以內存量的概念來描述,比MR V1中的slot更加合理。
另外,在工程目錄結構也有了些許的變化,如下表所示:
改變目錄 | MR V1 | MR V2 | 描述 |
配置文件 | ${HADOOP_HOME}/conf | ${HADOOP_HOME}/etc/hadoop | MR V2中的配置文件路徑修改為etc/hadoop目錄下 |
腳本 | ${HADOOP_HOME}/bin | ${HADOOP_HOME}/sbin和${HADOOP_HOME}/bin | 在MR V2中啟動,停止等命令都位於sbin目錄下,操作hdfs的命令存放在bin目錄下 |
JAVA_HOME | ${HADOOP_HOME}/conf/hadoop-env.sh | ${HADOOP_HOME}/etc/hadoop/hadoop-env.sh和${HADOOP_HOME}/etc/hadoop/yarn-env.sh | 在MR V2中需要同時在hadoop-env.sh和yarn-env.sh中配置JDK的路徑 |
由於添加Yarn特性,與第一代MR的框架變化較大,第一代的核心配置文件許多項也在新框架中摒棄了,具體新框架的核心配置文件信息,請參考《配置高可用的Hadoop平台》。
5.MR V2的重構思路
在V2中的MapReduce重構的思路主要有以下幾點:
- 層次化的管理:分層級對資源的調度和分配進行管理。
- 資源管理方式:由第一代的slot作為資源單位元,調整為更加細粒的內存單位元。
- 編程模型拓展:V2版的設計支持除MapReduce以外的編程模型。
MapReduce:WordCount V2,代碼如下:
package cn.hdfs.mapreduce.example; import java.io.BufferedReader; import java.io.FileReader; import java.io.IOException; import java.net.URI; import java.util.ArrayList; import java.util.HashSet; import java.util.List; import java.util.Set; import java.util.StringTokenizer; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.mapreduce.Counter; import org.apache.hadoop.util.GenericOptionsParser; import org.apache.hadoop.util.StringUtils; /** * @date Apr 17, 2015 * * @author dengjie */ public class WordCount2 { public static class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable>{ static enum CountersEnum { INPUT_WORDS } private final static IntWritable one = new IntWritable(1); private Text word = new Text(); private boolean caseSensitive; private Set<String> patternsToSkip = new HashSet<String>(); private Configuration conf; private BufferedReader fis; @Override public void setup(Context context) throws IOException, InterruptedException { conf = context.getConfiguration(); caseSensitive = conf.getBoolean("wordcount.case.sensitive", true); if (conf.getBoolean("wordcount.skip.patterns", true)) { URI[] patternsURIs = Job.getInstance(conf).getCacheFiles(); for (URI patternsURI : patternsURIs) { Path patternsPath = new Path(patternsURI.getPath()); String patternsFileName = patternsPath.getName().toString(); parseSkipFile(patternsFileName); } } } private void parseSkipFile(String fileName) { try { fis = new BufferedReader(new FileReader(fileName)); String pattern = null; while ((pattern = fis.readLine()) != null) { patternsToSkip.add(pattern); } } catch (IOException ioe) { System.err.println("Caught exception while parsing the cached file '" + StringUtils.stringifyException(ioe)); } } @Override public void map(Object key, Text value, Context context ) throws IOException, InterruptedException { String line = (caseSensitive) ? value.toString() : value.toString().toLowerCase(); for (String pattern : patternsToSkip) { line = line.replaceAll(pattern, ""); } StringTokenizer itr = new StringTokenizer(line); while (itr.hasMoreTokens()) { word.set(itr.nextToken()); context.write(word, one); Counter counter = context.getCounter(CountersEnum.class.getName(), CountersEnum.INPUT_WORDS.toString()); counter.increment(1); } } } public static class IntSumReducer extends Reducer<Text,IntWritable,Text,IntWritable> { private IntWritable result = new IntWritable(); public void reduce(Text key, Iterable<IntWritable> values, Context context ) throws IOException, InterruptedException { int sum = 0; for (IntWritable val : values) { sum += val.get(); } result.set(sum); context.write(key, result); } } public static void main(String[] args) throws Exception { Configuration conf = new Configuration(); GenericOptionsParser optionParser = new GenericOptionsParser(conf, args); String[] remainingArgs = optionParser.getRemainingArgs(); if (!(remainingArgs.length != 2 || remainingArgs.length != 4)) { System.err.println("Usage: wordcount <in> <out> [-skip skipPatternFile]"); System.exit(2); } Job job = Job.getInstance(conf, "word count"); job.setJarByClass(WordCount2.class); job.setMapperClass(TokenizerMapper.class); job.setCombinerClass(IntSumReducer.class); job.setReducerClass(IntSumReducer.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); List<String> otherArgs = new ArrayList<String>(); for (int i=0; i < remainingArgs.length; ++i) { if ("-skip".equals(remainingArgs[i])) { job.addCacheFile(new Path(remainingArgs[++i]).toUri()); job.getConfiguration().setBoolean("wordcount.skip.patterns", true); } else { otherArgs.add(remainingArgs[i]); } } FileInputFormat.addInputPath(job, new Path(otherArgs.get(0))); FileOutputFormat.setOutputPath(job, new Path(otherArgs.get(1))); System.exit(job.waitForCompletion(true) ? 0 : 1); } }
Spark:WordCount,代碼如下:
package com.hdfs.spark.example /** * @date Apr 17, 2015 * * @author dengjie */ import org.apache.spark.SparkConf import org.apache.spark.SparkContext import org.apache.spark.SparkContext._ /** * 統計字符出現次數 */ object WordCount { def main(args: Array[String]) { if (args.length < 1) { System.err.println("Usage: <file>") System.exit(1) } val conf = new SparkConf() val sc = new SparkContext(conf) val line = sc.textFile(args(0)) line.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_+_).collect().foreach(println) sc.stop() } }
6.結束語
這篇文章就和大家分享到這里,如果大家在研究和學習的過程中有什么疑問,可以加群進行討論或發送郵件給我,我會盡我所能為您解答,與君共勉!