mapreduce(1)--wordcount的實現


1.需求

利用mapreduce編程框架編寫wordcount程序。

2.環境配置

(1)hadoop為本地模式

(2)pom文件代碼如下

<dependencies>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>2.7.3</version>
        </dependency>

        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.11</version>
        </dependency>
 </dependencies>
View Code

 3.mapreduce介紹

(1)mapreduce結構

完整的mapreduce在分布式運行時有三類實例:MRAppMaster,MapTask,ReduceTask.

  • MRAppMaster是負責整個程序過程調度以及狀態協調,會根據需要創建一定數量的MapTask和ReduceTask
  • MapTask是負責map階段的整個數據處理,MapTask一般是對一行調用一次用戶自定義的map函數
  • ReduceTask是負責reduce階段的整個數據處理,ReduceTask一般是對一組<k,v>調用一次用戶自定義的reduce函數

(2)流程解析

  • 執行用戶的Driver類中的main函數,向MRAppMaster提交job
  • MRAppMaster啟動后決定maptask實例數量,進而啟動相應數量的maptask進程。(具體如何分配參見(3))
  • maptask啟動之后,一般是對一行調用一次用戶自定義的map函數
  • MRAppMaster在等待所有的maptask進程完成之后,會啟動相應數量的reducetask進程,並給每個reducetask指定數據范圍。
  • reducetask啟動之后,獲取maptask的輸出結果,並按照相同key為一組,對每一組調用一次我們重寫的reduce方法。處理完成后向外部文件寫結果。

(3)maptask的數量如何確定

最根本的因素:每個split分配一個maptask,也就是每個切片對應一個map任務

切片大小可以指定,但是最好和hdfs上的block大小一致(默認情況就是一致),這是因為如果不相等,maptask需要從其他datanode通過網絡傳輸數據,這樣就會受帶寬限制。

(4)如果文件大小小於blocksize,那么split大小是block大小還是文件大小?

在hdfs上存儲的文件如果大小小於blocksize,那么在hdfs上占用的空間是文件大小,blocksize的作用是在追加文件內容的時候決定何時進行划分(超過blocksize就要將文件分為兩個block)。

這樣總結起來,如果文件大小小於blocksize的部分,那么對應的split就是該部分文件的大小,其余部分是等於blocksize的。

4.wordcount程序介紹

在mapreduce框架下實現wordcount,需要自定義一個WordcountMapper繼承Mapper並重寫map方法自定義一個WordcountReducer繼承Reducer並重寫reduce方法,以及程序的main方法WordcountDriver。

5.wordcount代碼具體實現

(1)WordcountMapper

//1.定義四個泛型類型:KEYIN:LongWritable,VALUEIN:Text,
// KEYOUT:Text, VALUEOUT:IntWritable
//各個泛型類型與java中的對比:LongWritable->Long
//Text->String,IntWritable->Integer

//2.這里之所以定義這些泛型類型,是因為網絡傳輸需要序列化,精簡java中的序列化接口

//3.關於四個泛型作用:
// KEYIN(LongWritable類型):一行文本的開始位置,在整個文本的偏移量(用來確定當前是哪一行的,對於業務來說沒有用)
// VALUEIN(Text類型):讀到一行文本的內容,使用這個文本划分成單詞
// KEYOUT(Text類型):輸出的單詞
// VALUEOUT(IntWritable類型):單詞的統計次數

public
class WordcountMapper extends Mapper<LongWritable, Text,Text, IntWritable> { //map階段的業務邏輯,被maptask調用 @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { //將傳入的數據按空格分割成單詞 String[] words = value.toString().split(" "); //將單詞輸出為<單詞,1> for(String word:words){ //將單詞作為key,將次數1作為value //根據單詞的分發,相同的key會進入相同的reduce task中 //context是mr框架提供的上下文 //還注意要使用Text,IntWritable類型 context.write(new Text(word),new IntWritable(1)); } } }

(2)WordcountReducer類

//KEYINVALUEIN對應mapper輸出的KEYOUTVALUEOUT
//KEYOUTVALUEOUT是自定義的
//KEYOUT是單詞,VALUEOUT是單詞的總個數

public
class WordcountReducer extends Reducer<Text, IntWritable,Text,IntWritable> { //reduce函數會被reduceTask任務調用,每一組單詞調用一次 //每一組的意思就是[<hello,1>,<hello,1>,<hello,1>]... @Override protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException { int count = 0; for(IntWritable value:values){ //value表示從迭代器中取出的值,是統計的個數,要使用.get()轉換函數轉換成int類型 count += value.get(); } //key是單詞,new IntWritable(count)是單詞的總個數 //其實context是將結果寫入文件當中,文件存儲在hdfs上 context.write(key,new IntWritable(count)); } }

 (3)WordcountDriver

public class WordcountDriver { public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException { Configuration configuration = new Configuration(); Job job = Job.getInstance(configuration); //指定本程序jar包所在的路徑
        job.setJarByClass(WordcountDriver.class); //利用反射指定job要使用的mapper業務類
        job.setMapperClass(WordcountMapper.class); job.setReducerClass(WordcountReducer.class); //利用反射,指定mapper輸出數據的kv類型
        job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(IntWritable.class); //設置最終(也就是reduce)輸出的數據類型
        job.setOutputKeyClass(Text.class); job.setMapOutputValueClass(IntWritable.class); //指定job輸入原始文件所在目錄
        FileInputFormat.setInputPaths(job,new Path(args[0])); //指定job的輸出結果
        FileOutputFormat.setOutputPath(job,new Path(args[1])); //將job中配置的相關參數,提交給yarn運行 //等待集群完成工作
        boolean res = job.waitForCompletion(true); System.exit(res?0:1); } }

6.打成jar包並運行

  • 打成jar包的操作步驟詳見:https://blog.csdn.net/sinat_33201781/article/details/80264828
  • 運行命令:hadoop jar [jar包路徑] [主類的相對路徑]  [輸入文件在hdfs上的路徑] [輸出文件在hdfs上的路徑]例如:hadoop jar /home/zhangjiaqian/IdeaProjects/STBigData/target/ChuanzhiBigdata-1.0-SNAPSHOT.jar cn.gulu.bigdata.mr.MRDemos.WordcountDriver /wordcount/input /wordcount/output
  • 運行過后可在hdfs上查看輸出文件
  • 查看輸出內容:hdfs dfs -cat /wordcount/output/part-r-00000

7.github鏈接:

  https://github.com/gulu2016/STBigData/tree/master/src/main/java/cn/gulu/bigdata/mr/MRDemos

8.練習:使用mr進行流量統計

   github鏈接:https://github.com/gulu2016/STBigData/tree/master/src/main/java/cn/gulu/bigdata/mr/flowsum

9.練習2:在8的基礎上對手機號以身份進行划分,即不同省份的手機號進入不同的reducetask中

  這里邊最主要的任務就是重寫Partitioner中的getPartition方法,使相同省份的手機號獲得同一個數字

  github鏈接:https://github.com/gulu2016/STBigData/tree/master/src/main/java/cn/gulu/bigdata/mr/flowProvinceSum

10.練習3:在8的基礎上對根據總流量從高到低進行排序

  這里需要考慮的是選取哪個值作為maptask的key,以及如何對key進行排序

  github鏈接:https://github.com/gulu2016/STBigData/tree/master/src/main/java/cn/gulu/bigdata/mr/flowCountSort


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM