Hadoop編程實例之MapReduce


MapReduce原理圖:

MapReduce具體執行過程圖:

首先是客戶端要編寫好mapreduce程序,配置好mapreduce的作業也就是job,接下來就是提交job了,提交job是提交到JobTracker上的,這個時候JobTracker就會構建這個job,具體就是分配一個新的job任務的ID值,接下來它會做檢查操作,這個檢查就是確定輸出目錄是否存在,如果存在那么job就不能正常運行下去,JobTracker會拋出錯誤給客戶端,接下來還要檢查輸入目錄是否存在,如果不存在同樣拋出錯誤,如果存在JobTracker會根據輸入計算輸入分片(Input Split),如果分片計算不出來也會拋出錯誤,至於輸入分片我后面會做講解的,這些都做好了JobTracker就會配置Job需要的資源了。分配好資源后,JobTracker就會初始化作業,初始化主要做的是將Job放入一個內部的隊列,讓配置好的作業調度器能調度到這個作業,作業調度器會初始化這個job,初始化就是創建一個正在運行的job對象(封裝任務和記錄信息),以便JobTracker跟蹤job的狀態和進程。初始化完畢后,作業調度器會獲取輸入分片信息(input split),每個分片創建一個map任務。接下來就是任務分配了,這個時候tasktracker會運行一個簡單的循環機制定期發送心跳給jobtracker,心跳間隔是5秒,程序員可以配置這個時間,心跳就是jobtracker和tasktracker溝通的橋梁,通過心跳,jobtracker可以監控tasktracker是否存活,也可以獲取tasktracker處理的狀態和問題,同時tasktracker也可以通過心跳里的返回值獲取jobtracker給它的操作指令。任務分配好后就是執行任務了。在任務執行時候jobtracker可以通過心跳機制監控tasktracker的狀態和進度,同時也能計算出整個job的狀態和進度,而tasktracker也可以本地監控自己的狀態和進度。當jobtracker獲得了最后一個完成指定任務的tasktracker操作成功的通知時候,jobtracker會把整個job狀態置為成功,然后當客戶端查詢job運行狀態時候(注意:這個是異步操作),客戶端會查到job完成的通知的。如果job中途失敗,mapreduce也會有相應機制處理,一般而言如果不是程序員程序本身有bug,mapreduce錯誤處理機制都能保證提交的job能正常完成。

 

下面,我們來實現一個mapreduce的demo,wordcount

public class WordCountMapReduce extends Configured implements Tool {

    // Mapper Class
    public static class MyMapper extends
            Mapper<LongWritable, Text, Text, IntWritable> {
        /*
         * key是偏移量,value是一行一行的值 首先分割單詞,組成key/value對進行輸出
         */
        private Text mapOutputKey = new Text();
        private final static IntWritable mapOutputValue = new IntWritable(1);

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            // todo
            String line = value.toString().trim();
            
            //segment
            StringTokenizer strToken = new StringTokenizer(line);
            
            while(strToken.hasMoreTokens()){
                String word = strToken.nextToken();
                mapOutputKey.set(word);
                context.write(mapOutputKey, mapOutputValue);
            }
            

        }
    }

    // Reducer
    public static class MyReducer extends
            Reducer<Text, IntWritable, Text, IntWritable> {

        private IntWritable reduceOutputValue = new IntWritable();

        @Override
        protected void reduce(Text key, Iterable<IntWritable> values,
                Context context) throws IOException, InterruptedException {
            // todo

            int sum = 0;
            //reduce
            for(IntWritable value : values){
                sum+=value.get();
            }
            reduceOutputValue.set(sum);
            context.write(key, reduceOutputValue);
        }

    }

    public int run(String[] args) throws Exception {
        // set Conf env
        Configuration conf = new Configuration();
        // conf.set("mapreduce.map.output.compress", true);

        // get job by conf
        Job job = Job.getInstance(super.getConf(),
                WordCountMapReduce.class.getSimpleName());

        job.setJarByClass(WordCountMapReduce.class);

        // set job
        // step 1 : map phase
        job.setMapperClass(MyMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);

        // step 2 :reduce phase
        job.setCombinerClass(MyReducer.class);
        job.setReducerClass(MyReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        // submit
        // job.submit();
        boolean isSucceed = job.waitForCompletion(true);

        return isSucceed ? 1 : 0;

    }

    // Driver
    public static void main(String[] args) throws Exception {

        args = new String[] { "hdfs://192.168.1.109:8020/home/test/test.txt",
                "hdfs://192.168.1.109:8020/home/test/ouput2" };

        int status = ToolRunner.run(new WordCountMapReduce(), args);

        System.out.println(status);

    }

}

運行結果如下:

運行成功!


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM