MapReduce原理深入理解(一)


1.MapReduce概念

1)MapReduce是一種分布式計算模型,由Google提出,主要用於搜索領域,解決海量數據的計算問題.

2)MapReduce是分布式運行的,由兩個階段組成:Map和Reduce,Map階段是一個獨立的程序,有很多個節點同時運行,每個節點處理一部分數據。Reduce階段是一個獨立的程序,有很多個節點同時運行,每個節點處理一部分數據【在這先把reduce理解為一個單獨的聚合程序即可】。

3)MapReduce框架都有默認實現,用戶只需要覆蓋map()和reduce()兩個函數,即可實現分布式計算,非常簡單。

4)兩個函數的形參和返回值都是<key、value>,使用的時候一定要注意構造<k,v>。

2.MapReduce核心思想

 

 

 

(1)分布式的運算程序往往需要分成至少2個階段。

 

(2)第一個階段的MapTask並發實例,完全並行運行,互不相干。

 

(3)第二個階段的ReduceTask並發實例互不相干,但是他們的數據依賴於上一個階段的所有MapTask並發實例的輸出。

 

(4)MapReduce編程模型只能包含一個Map階段和一個Reduce階段,如果用戶的業務邏輯非常復雜,那就只能多個MapReduce程序,串行運行。

 

總結:分析WordCount數據流走向深入理解MapReduce核心思想。

 

 

 

 

 

 3. MapReduce 中的shuffle

 

 4.Mapreduce代碼

import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import java.io.IOException; public class WordCount { //分割任務 // 第一對kv,是決定數據輸入的格式 // 第二隊kv 是決定數據輸出的格式
    public static class MyMapper extends Mapper<LongWritable, Text, Text, LongWritable> { /*map階段數據是一行一行過來的 每一行數據都需要執行代碼*/ @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { LongWritable longWritable = new LongWritable(1); String s = value.toString(); context.write(new Text(s), longWritable); } } //接收Map端數據
    public static class MyReducer extends Reducer<Text, LongWritable, Text, LongWritable> { /* reduce 聚合程序 每一個k都會調用一次 * 默認是一個節點 * key:每一個單詞 * values:map端 當前k所對應的所有的v */ @Override protected void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException { //設置統計的初始值為0
            long sum = 0l; for (LongWritable value : values) { sum += value.get(); } context.write(key, new LongWritable(sum)); } } /** * 是當前mapreduce程序入口 * 用來構建mapreduce程序 */
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException { //創建一個job任務
        Job job=Job.getInstance(); //指定job名稱
        job.setJobName("第一個mr程序"); //構建mr //指定當前main所在類名(識別具體的類)
        job.setJarByClass(WordCount.class); //指定map端類 // 指定map輸出的kv類型
        job.setMapperClass(MyMapper.class); job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(LongWritable.class); //指定reduce端類 //指定reduce端輸出的kv類型
        job.setReducerClass(MyReducer.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(LongWritable.class); // 指定輸入路徑
        Path in = new Path("/word"); FileInputFormat.addInputPath(job,in); //輸出路徑指定
        Path out = new Path("/output"); FileSystem fs = FileSystem.get(new Configuration()); //如果文件存在
        if(fs.exists(out)){ fs.delete(out,true); } //存在
 FileOutputFormat.setOutputPath(job,out); //啟動
        job.waitForCompletion(true); System.out.println("MapReduce正在執行"); } }

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM