Hadoop Example: Implementing WordCount with MapReduce (Source Code Included)


The basic idea: text stored on HDFS serves as the input. MapReduce splits the text via an InputFormat; the byte offset of each line's first character relative to the start of the file becomes the key of an input key/value pair, and the line's text becomes the value. The map function turns each line into intermediate results of the form <word, 1>, and the reduce function then completes the per-word frequency count. The program consists of two main parts: the Mapper and the Reducer.
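For example, with a one-line input file containing hello world hello (a made-up sample line for illustration), the data at each stage looks roughly like this:

    input pair:    <0, "hello world hello">        (key = byte offset, value = line text)
    map output:    <hello, 1>  <world, 1>  <hello, 1>
    reduce output: <hello, 2>  <world, 1>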

 

Mapper code

    // Type parameters: input key type (Object), input value type (Text),
    // output key type (Text), output value type (IntWritable)
    public static class doMapper extends Mapper<Object, Text, Text, IntWritable> {
        public static final IntWritable one = new IntWritable(1);
        public static Text word = new Text();

        @Override
        protected void map(Object key, Text value, Context context)
                throws IOException, InterruptedException {
            // StringTokenizer is a JDK utility class that splits a string into tokens
            StringTokenizer tokenizer = new StringTokenizer(value.toString(), " ");
            while (tokenizer.hasMoreTokens()) { // loop over every word split out of the line
                word.set(tokenizer.nextToken()); // the substring up to the next delimiter
                context.write(word, one);        // emit <word, 1>: count one occurrence
            }
        }
    }

 

The map function takes three parameters. The first two, Object key and Text value, are the input key/value pair; the third, Context context, records the output key/value pairs, as in context.write(word, one), and also tracks the state of the map task. The map stage uses Hadoop's default job input format: each word that StringTokenizer() cuts out of the input value is set as the key, the value is set to 1, and the <key, value> pair is emitted directly.
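To see the tokenizer loop on its own, here is a minimal standalone sketch (plain Java, no Hadoop needed; the sample line is made up for illustration) that mirrors the body of map:

    import java.util.StringTokenizer;

    public class TokenizerDemo {
        public static void main(String[] args) {
            // Same split as in doMapper: break the line on spaces
            StringTokenizer tokenizer = new StringTokenizer("hello world hello", " ");
            while (tokenizer.hasMoreTokens()) {
                // Prints hello, world, hello (one token per line)
                System.out.println(tokenizer.nextToken());
            }
        }
    }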

 

Reducer code

    // Type parameters mirror the Mapper's: input key type, input value type,
    // output key type, output value type
    public static class doReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
        private IntWritable result = new IntWritable();

        @Override
        protected void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            int sum = 0;
            for (IntWritable value : values) { // accumulate all the counts for this word
                sum += value.get();
            }
            result.set(sum);
            System.out.println(sum); // debug output: print each word's total
            context.write(key, result); // emit <word, total count>
        }
    }

The <key, value> pairs emitted by map first pass through the shuffle phase, which gathers all values sharing the same key into <key, values> pairs and hands them to the reduce side. When reduce receives <key, values>, it copies the input key straight to the output key, iterates over values with a for loop to sum them, and the sum is the total number of occurrences of the word the key represents. That sum is set as the value, and the <key, value> pair is emitted directly.
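Continuing the hello world hello example, shuffle turns the map output into grouped reduce input:

    map output:     <hello, 1>  <world, 1>  <hello, 1>
    after shuffle:  <hello, [1, 1]>  <world, [1]>
    reduce output:  <hello, 2>  <world, 1>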

Complete code:

    import java.io.IOException;
    import java.util.StringTokenizer;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

    public class WordCount {
        public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
            Job job = Job.getInstance();
            job.setJobName("WordCount");
            job.setJarByClass(WordCount.class);
            job.setMapperClass(doMapper.class);
            job.setReducerClass(doReducer.class);
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(IntWritable.class);
            Path in = new Path("hdfs://192.168.68.130:9000/user/hadoop/wordcount.txt"); // input text to count
            Path out = new Path("hdfs://192.168.68.130:9000/user/hadoop/output3");      // note: output3 must not already exist
            FileInputFormat.addInputPath(job, in);
            FileOutputFormat.setOutputPath(job, out);
            System.exit(job.waitForCompletion(true) ? 0 : 1);
        }

        public static class doMapper extends Mapper<Object, Text, Text, IntWritable> {
            public static final IntWritable one = new IntWritable(1);
            public static Text word = new Text();

            @Override
            protected void map(Object key, Text value, Context context)
                    throws IOException, InterruptedException {
                StringTokenizer tokenizer = new StringTokenizer(value.toString(), " ");
                while (tokenizer.hasMoreTokens()) {
                    word.set(tokenizer.nextToken());
                    context.write(word, one);
                }
            }
        }

        public static class doReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
            private IntWritable result = new IntWritable();

            @Override
            protected void reduce(Text key, Iterable<IntWritable> values, Context context)
                    throws IOException, InterruptedException {
                int sum = 0;
                for (IntWritable value : values) {
                    sum += value.get();
                }
                result.set(sum);
                System.out.println(sum); // debug: print each word's total to stdout
                context.write(key, result);
            }
        }
    }
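To run the job, package the class into a jar and submit it with the hadoop command. The commands below are a sketch that assumes the jar is named WordCount.jar (the name is hypothetical) and uses the HDFS paths hard-coded above:

    hadoop jar WordCount.jar WordCount
    hdfs dfs -cat /user/hadoop/output3/part-r-00000    # view the word counts

If output3 already exists, the job fails at startup, so remove it first (hdfs dfs -rm -r /user/hadoop/output3).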

 

