MapReduce基本原理及應用


一:MapReduce模型簡介

  MapReduce將復雜的、運行於大規模集群上的並行計算過程高度地抽象到了兩個函數:Map和Reduce。它采用“分而治之”策略,一個存儲在分布式文件系統中的大規模數據集,會被切分成許多獨立的分片(split),這些分片可以被多個Map任務並行處理

  1.Map和Reduce函數

Map和Reduce

 

  2.MapReduce體系結構

  MapReduce體系結構主要由四個部分組成,分別是:Client、JobTracker、TaskTracker以及Task

  1)Client

  用戶編寫的MapReduce程序通過Client提交到JobTracker端 用戶可通過Client提供的一些接口查看作業運行狀態

  2)JobTracker

  JobTracker負責資源監控和作業調度 JobTracker 監控所有TaskTracker與Job的健康狀況,一旦發現失敗,就將相應的任務轉移到其他節點 JobTracker 會跟蹤任務的執行進度、資源使用量等信息,並將這些信息告訴任務調度器(TaskScheduler),而調度器會在資源出現空閑時,

  選擇合適的任務去使用這些資源

  3)TaskTracker

  TaskTracker 會周期性地通過“心跳”將本節點上資源的使用情況和任務的運行進度匯報給JobTracker,同時接收JobTracker 發送過來的命令並執行相應的操作(如啟動新任務、殺死任務等) TaskTracker 使用“slot”等量划分本節點上的資源量(CPU、內存等)。一個Task 獲取到

  一個slot 后才有機會運行,而Hadoop調度器的作用就是將各個TaskTracker上的空閑slot分配給Task使用。slot 分為Map slot 和Reduce slot 兩種,分別供MapTask 和Reduce Task 使用

  4)Task

  Task 分為Map Task 和Reduce Task 兩種,均由TaskTracker 啟動

  

  3.MapReduce工作流程

  1) 工作流程概述

  

不同的Map任務之間不會進行通信

不同的Reduce任務之間也不會發生任何信息交換

用戶不能顯式地從一台機器向另一台機器發送消息

所有的數據交換都是通過MapReduce框架自身去實現的

  2) MapReduce各個執行階段

  

  4.MapReduce應用程序執行過程

  

 

二 :WordCount運行實例

  工作流程是Input從HDFS里面並行讀取文本中的內容,經過MapReduce模型,最終把分析出來的結果用Output封裝,持久化到HDFS中

  <一>WordCount的Map過程

  1、使用三個Map任務並行讀取三行文件中的內容,對讀取的單詞進行map操作,每個單詞都以<key, value>形式生成

  

 

   2.Map端源碼

  

public class WordMapper extends  
            Mapper<Object, Text, Text, IntWritable> {  
  
        private final static IntWritable one = new IntWritable(1);  
        private Text word = new Text();  
  
        public void map(Object key, Text value, Context context)  
                throws IOException, InterruptedException {  
            String line = value.toString();  
            StringTokenizer itr = new StringTokenizer(line);  
            while (itr.hasMoreTokens()) {  
                word.set(itr.nextToken().toLowerCase());  
                context.write(word, one);  
            }  
        }  
    }

  <二>、WordCount的Reduce過程

  1、Reduce操作是對Map的結果進行排序、合並等操作最后得出詞頻

  

  2、Reduce端源碼

public class WordReducer extends  
            Reducer<Text, IntWritable, Text, IntWritable> {  
        private IntWritable result = new IntWritable();  
  
        public void reduce(Text key, Iterable<IntWritable> values,  
                Context context) throws IOException, InterruptedException {  
            int sum = 0;  
            for (IntWritable val : values) {  
                sum += val.get();  
            }  
            result.set(sum);  
            context.write(key, new IntWritable(sum));  
        }  
    }  

三:WordCount源碼

import java.io.IOException;  
import java.util.StringTokenizer;  
  
import org.apache.hadoop.conf.Configuration;  
import org.apache.hadoop.fs.Path;  
import org.apache.hadoop.io.IntWritable;  
import org.apache.hadoop.io.Text;  
import org.apache.hadoop.mapreduce.Job;  
import org.apache.hadoop.mapreduce.Mapper;  
import org.apache.hadoop.mapreduce.Reducer;  
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;  
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;  
import org.apache.hadoop.util.GenericOptionsParser;  
  
public class WordCount {  
  
    public static class WordMapper extends  
            Mapper<Object, Text, Text, IntWritable> {  
  
        private final static IntWritable one = new IntWritable(1);  
        private Text word = new Text();  
  
        public void map(Object key, Text value, Context context)  
                throws IOException, InterruptedException {  
            String line = value.toString();  
            StringTokenizer itr = new StringTokenizer(line);  
            while (itr.hasMoreTokens()) {  
                word.set(itr.nextToken().toLowerCase());  
                context.write(word, one);  
            }  
        }  
    }  
  
    public static class WordReducer extends  
            Reducer<Text, IntWritable, Text, IntWritable> {  
        private IntWritable result = new IntWritable();  
  
        public void reduce(Text key, Iterable<IntWritable> values,  
                Context context) throws IOException, InterruptedException {  
            int sum = 0;  
            for (IntWritable val : values) {  
                sum += val.get();  
            }  
            result.set(sum);  
            context.write(key, new IntWritable(sum));  
        }  
    }  
  
    public static void main(String[] args) throws Exception {  
        Configuration conf = new Configuration();  
        String[] otherArgs = new GenericOptionsParser(conf, args)  
                .getRemainingArgs();  
        if (otherArgs.length != 2) {  
            System.err.println("Usage: wordcount <in> <out>");  
            System.exit(2);  
        }  
        Job job = new Job(conf, "word count");  
        job.setJarByClass(WordCount.class);  
        job.setMapperClass(WordMapper.class);  
        job.setCombinerClass(WordReducer.class);  
        job.setReducerClass(WordReducer.class);  
        job.setOutputKeyClass(Text.class);  
        job.setOutputValueClass(IntWritable.class);  
        FileInputFormat.addInputPath(job, new Path(otherArgs[0]));  
        FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));  
        System.exit(job.waitForCompletion(true) ? 0 : 1);  
    }  
}  

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM