MapReduce簡介
- MapReduce是一種分布式計算模型,是Google提出的,主要用於搜索領域,解決海量數據的計算問題。
- MR有兩個階段組成:Map和Reduce,用戶只需實現map()和reduce()兩個函數,即可實現分布式計算。
MapReduce執行流程

MapReduce原理

MapReduce的執行步驟:
1、Map任務處理
1.1 讀取HDFS中的文件。每一行解析成一個<k,v>。每一個鍵值對調用一次map函數。 <0,hello you> <10,hello me>
1.2 覆蓋map(),接收1.1產生的<k,v>,進行處理,轉換為新的<k,v>輸出。 <hello,1> <you,1> <hello,1> <me,1>
1.3 對1.2輸出的<k,v>進行分區。默認分為一個區。詳見《Partitioner》
1.4 對不同分區中的數據進行排序(按照k)、分組。分組指的是相同key的value放到一個集合中。 排序后:<hello,1> <hello,1> <me,1> <you,1> 分組后:<hello,{1,1}><me,{1}><you,{1}>
1.5 (可選)對分組后的數據進行歸約。詳見《Combiner》
2、Reduce任務處理
2.1 多個map任務的輸出,按照不同的分區,通過網絡copy到不同的reduce節點上。(shuffle)詳見《shuffle過程分析》
2.2 對多個map的輸出進行合並、排序。覆蓋reduce函數,接收的是分組后的數據,實現自己的業務邏輯, <hello,2> <me,1> <you,1>
處理后,產生新的<k,v>輸出。
2.3 對reduce輸出的<k,v>寫到HDFS中。
Java代碼實現
注:要導入org.apache.hadoop.fs.FileUtil.java。
1、先創建一個hello文件,上傳到HDFS中

2、然后再編寫代碼,實現文件中的單詞個數統計(代碼中被注釋掉的代碼,是可以省略的,不省略也行)
1 package mapreduce;
2
3 import java.net.URI;
4 import org.apache.hadoop.conf.Configuration;
5 import org.apache.hadoop.fs.FileSystem;
6 import org.apache.hadoop.fs.Path;
7 import org.apache.hadoop.io.LongWritable;
8 import org.apache.hadoop.io.Text;
9 import org.apache.hadoop.mapreduce.Job;
10 import org.apache.hadoop.mapreduce.Mapper;
11 import org.apache.hadoop.mapreduce.Reducer;
12 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
13 import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
14 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
15 import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
16
17 public class WordCountApp {
18 static final String INPUT_PATH = "hdfs://chaoren:9000/hello";
19 static final String OUT_PATH = "hdfs://chaoren:9000/out";
20
21 public static void main(String[] args) throws Exception {
22 Configuration conf = new Configuration();
23 FileSystem fileSystem = FileSystem.get(new URI(INPUT_PATH), conf);
24 Path outPath = new Path(OUT_PATH);
25 if (fileSystem.exists(outPath)) {
26 fileSystem.delete(outPath, true);
27 }
28
29 Job job = new Job(conf, WordCountApp.class.getSimpleName());
30
31 // 1.1指定讀取的文件位於哪里
32 FileInputFormat.setInputPaths(job, INPUT_PATH);
33 // 指定如何對輸入的文件進行格式化,把輸入文件每一行解析成鍵值對
34 //job.setInputFormatClass(TextInputFormat.class);
35
36 // 1.2指定自定義的map類
37 job.setMapperClass(MyMapper.class);
38 // map輸出的<k,v>類型。如果<k3,v3>的類型與<k2,v2>類型一致,則可以省略
39 //job.setOutputKeyClass(Text.class);
40 //job.setOutputValueClass(LongWritable.class);
41
42 // 1.3分區
43 //job.setPartitionerClass(org.apache.hadoop.mapreduce.lib.partition.HashPartitioner.class);
44 // 有一個reduce任務運行
45 //job.setNumReduceTasks(1);
46
47 // 1.4排序、分組
48
49 // 1.5歸約
50
51 // 2.2指定自定義reduce類
52 job.setReducerClass(MyReducer.class);
53 // 指定reduce的輸出類型
54 job.setOutputKeyClass(Text.class);
55 job.setOutputValueClass(LongWritable.class);
56
57 // 2.3指定寫出到哪里
58 FileOutputFormat.setOutputPath(job, outPath);
59 // 指定輸出文件的格式化類
60 //job.setOutputFormatClass(TextOutputFormat.class);
61
62 // 把job提交給jobtracker運行
63 job.waitForCompletion(true);
64 }
65
66 /**
67 *
68 * KEYIN 即K1 表示行的偏移量
69 * VALUEIN 即V1 表示行文本內容
70 * KEYOUT 即K2 表示行中出現的單詞
71 * VALUEOUT 即V2 表示行中出現的單詞的次數,固定值1
72 *
73 */
74 static class MyMapper extends
75 Mapper<LongWritable, Text, Text, LongWritable> {
76 protected void map(LongWritable k1, Text v1, Context context)
77 throws java.io.IOException, InterruptedException {
78 String[] splited = v1.toString().split("\t");
79 for (String word : splited) {
80 context.write(new Text(word), new LongWritable(1));
81 }
82 };
83 }
84
85 /**
86 * KEYIN 即K2 表示行中出現的單詞
87 * VALUEIN 即V2 表示出現的單詞的次數
88 * KEYOUT 即K3 表示行中出現的不同單詞
89 * VALUEOUT 即V3 表示行中出現的不同單詞的總次數
90 */
91 static class MyReducer extends
92 Reducer<Text, LongWritable, Text, LongWritable> {
93 protected void reduce(Text k2, java.lang.Iterable<LongWritable> v2s,
94 Context ctx) throws java.io.IOException,
95 InterruptedException {
96 long times = 0L;
97 for (LongWritable count : v2s) {
98 times += count.get();
99 }
100 ctx.write(k2, new LongWritable(times));
101 };
102 }
103 }
3、運行成功后,可以在Linux中查看操作的結果

轉載自:https://www.cnblogs.com/ahu-lichang/p/6645074.html

