一:MapReduce模型簡介
MapReduce將復雜的、運行於大規模集群上的並行計算過程高度地抽象到了兩個函數:Map和Reduce。它采用“分而治之”策略,一個存儲在分布式文件系統中的大規模數據集,會被切分成許多獨立的分片(split),這些分片可以被多個Map任務並行處理
1.Map和Reduce函數
Map和Reduce
2.MapReduce體系結構
MapReduce體系結構主要由四個部分組成,分別是:Client、JobTracker、TaskTracker以及Task
1)Client
用戶編寫的MapReduce程序通過Client提交到JobTracker端 用戶可通過Client提供的一些接口查看作業運行狀態
2)JobTracker
JobTracker負責資源監控和作業調度 JobTracker 監控所有TaskTracker與Job的健康狀況,一旦發現失敗,就將相應的任務轉移到其他節點 JobTracker 會跟蹤任務的執行進度、資源使用量等信息,並將這些信息告訴任務調度器(TaskScheduler),而調度器會在資源出現空閑時,
選擇合適的任務去使用這些資源
3)TaskTracker
TaskTracker 會周期性地通過“心跳”將本節點上資源的使用情況和任務的運行進度匯報給JobTracker,同時接收JobTracker 發送過來的命令並執行相應的操作(如啟動新任務、殺死任務等) TaskTracker 使用“slot”等量划分本節點上的資源量(CPU、內存等)。一個Task 獲取到
一個slot 后才有機會運行,而Hadoop調度器的作用就是將各個TaskTracker上的空閑slot分配給Task使用。slot 分為Map slot 和Reduce slot 兩種,分別供MapTask 和Reduce Task 使用
4)Task
Task 分為Map Task 和Reduce Task 兩種,均由TaskTracker 啟動
3.MapReduce工作流程
1) 工作流程概述
不同的Map任務之間不會進行通信
不同的Reduce任務之間也不會發生任何信息交換
用戶不能顯式地從一台機器向另一台機器發送消息
所有的數據交換都是通過MapReduce框架自身去實現的
2) MapReduce各個執行階段
4.MapReduce應用程序執行過程
二 :WordCount運行實例
工作流程是Input從HDFS里面並行讀取文本中的內容,經過MapReduce模型,最終把分析出來的結果用Output封裝,持久化到HDFS中
<一>WordCount的Map過程
1、使用三個Map任務並行讀取三行文件中的內容,對讀取的單詞進行map操作,每個單詞都以<key, value>形式生成
2.Map端源碼
public class WordMapper extends Mapper<Object, Text, Text, IntWritable> { private final static IntWritable one = new IntWritable(1); private Text word = new Text(); public void map(Object key, Text value, Context context) throws IOException, InterruptedException { String line = value.toString(); StringTokenizer itr = new StringTokenizer(line); while (itr.hasMoreTokens()) { word.set(itr.nextToken().toLowerCase()); context.write(word, one); } } }
<二>、WordCount的Reduce過程
1、Reduce操作是對Map的結果進行排序、合並等操作最后得出詞頻
2、Reduce端源碼
public class WordReducer extends Reducer<Text, IntWritable, Text, IntWritable> { private IntWritable result = new IntWritable(); public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException { int sum = 0; for (IntWritable val : values) { sum += val.get(); } result.set(sum); context.write(key, new IntWritable(sum)); } }
三:WordCount源碼
import java.io.IOException; import java.util.StringTokenizer; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.util.GenericOptionsParser; public class WordCount { public static class WordMapper extends Mapper<Object, Text, Text, IntWritable> { private final static IntWritable one = new IntWritable(1); private Text word = new Text(); public void map(Object key, Text value, Context context) throws IOException, InterruptedException { String line = value.toString(); StringTokenizer itr = new StringTokenizer(line); while (itr.hasMoreTokens()) { word.set(itr.nextToken().toLowerCase()); context.write(word, one); } } } public static class WordReducer extends Reducer<Text, IntWritable, Text, IntWritable> { private IntWritable result = new IntWritable(); public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException { int sum = 0; for (IntWritable val : values) { sum += val.get(); } result.set(sum); context.write(key, new IntWritable(sum)); } } public static void main(String[] args) throws Exception { Configuration conf = new Configuration(); String[] otherArgs = new GenericOptionsParser(conf, args) .getRemainingArgs(); if (otherArgs.length != 2) { System.err.println("Usage: wordcount <in> <out>"); System.exit(2); } Job job = new Job(conf, "word count"); job.setJarByClass(WordCount.class); job.setMapperClass(WordMapper.class); job.setCombinerClass(WordReducer.class); job.setReducerClass(WordReducer.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); FileInputFormat.addInputPath(job, new Path(otherArgs[0])); FileOutputFormat.setOutputPath(job, new Path(otherArgs[1])); System.exit(job.waitForCompletion(true) ? 0 : 1); } }