1.需求
利用mapreduce編程框架編寫wordcount程序。
2.環境配置
(1)hadoop為本地模式
(2)pom文件代碼如下

<dependencies> <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-client</artifactId> <version>2.7.3</version> </dependency> <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <version>4.11</version> </dependency> </dependencies>
3.mapreduce介紹
(1)mapreduce結構
完整的mapreduce在分布式運行時有三類實例:MRAppMaster,MapTask,ReduceTask.
- MRAppMaster是負責整個程序過程調度以及狀態協調,會根據需要創建一定數量的MapTask和ReduceTask
- MapTask是負責map階段的整個數據處理,MapTask一般是對一行調用一次用戶自定義的map函數
- ReduceTask是負責reduce階段的整個數據處理,ReduceTask一般是對一組<k,v>調用一次用戶自定義的reduce函數
(2)流程解析
- 執行用戶的Driver類中的main函數,向MRAppMaster提交job
- MRAppMaster啟動后決定maptask實例數量,進而啟動相應數量的maptask進程。(具體如何分配參見(3))
- maptask啟動之后,一般是對一行調用一次用戶自定義的map函數
- MRAppMaster在等待所有的maptask進程完成之后,會啟動相應數量的reducetask進程,並給每個reducetask指定數據范圍。
- reducetask啟動之后,獲取maptask的輸出結果,並按照相同key為一組,對每一組調用一次我們重寫的reduce方法。處理完成后向外部文件寫結果。
(3)maptask的數量如何確定
最根本的因素:每個split分配一個maptask,也就是每個切片對應一個map任務
切片大小可以指定,但是最好和hdfs上的block大小一致(默認情況就是一致),這是因為如果不相等,maptask需要從其他datanode通過網絡傳輸數據,這樣就會受帶寬限制。
(4)如果文件大小小於blocksize,那么split大小是block大小還是文件大小?
在hdfs上存儲的文件如果大小小於blocksize,那么在hdfs上占用的空間是文件大小,blocksize的作用是在追加文件內容的時候決定何時進行划分(超過blocksize就要將文件分為兩個block)。
這樣總結起來,如果文件大小小於blocksize的部分,那么對應的split就是該部分文件的大小,其余部分是等於blocksize的。
4.wordcount程序介紹
在mapreduce框架下實現wordcount,需要自定義一個WordcountMapper繼承Mapper並重寫map方法,自定義一個WordcountReducer繼承Reducer並重寫reduce方法,以及程序的main方法WordcountDriver。
5.wordcount代碼具體實現
(1)WordcountMapper類
//1.定義四個泛型類型:KEYIN:LongWritable,VALUEIN:Text,
// KEYOUT:Text, VALUEOUT:IntWritable
//各個泛型類型與java中的對比:LongWritable->Long
//Text->String,IntWritable->Integer
//2.這里之所以定義這些泛型類型,是因為網絡傳輸需要序列化,精簡java中的序列化接口
//3.關於四個泛型作用:
// KEYIN(LongWritable類型):一行文本的開始位置,在整個文本的偏移量(用來確定當前是哪一行的,對於業務來說沒有用)
// VALUEIN(Text類型):讀到一行文本的內容,使用這個文本划分成單詞
// KEYOUT(Text類型):輸出的單詞
// VALUEOUT(IntWritable類型):單詞的統計次數
public class WordcountMapper extends Mapper<LongWritable, Text,Text, IntWritable> { //map階段的業務邏輯,被maptask調用
@Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { //將傳入的數據按空格分割成單詞
String[] words = value.toString().split(" "); //將單詞輸出為<單詞,1>
for(String word:words){ //將單詞作為key,將次數1作為value //根據單詞的分發,相同的key會進入相同的reduce task中 //context是mr框架提供的上下文 //還注意要使用Text,IntWritable類型
context.write(new Text(word),new IntWritable(1)); } } }
(2)WordcountReducer類
//KEYIN,VALUEIN對應mapper輸出的KEYOUT,VALUEOUT
//KEYOUT,VALUEOUT是自定義的
//KEYOUT是單詞,VALUEOUT是單詞的總個數
public class WordcountReducer extends Reducer<Text, IntWritable,Text,IntWritable> { //reduce函數會被reduceTask任務調用,每一組單詞調用一次 //每一組的意思就是[<hello,1>,<hello,1>,<hello,1>]...
@Override protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException { int count = 0; for(IntWritable value:values){ //value表示從迭代器中取出的值,是統計的個數,要使用.get()轉換函數轉換成int類型
count += value.get(); } //key是單詞,new IntWritable(count)是單詞的總個數 //其實context是將結果寫入文件當中,文件存儲在hdfs上
context.write(key,new IntWritable(count)); } }
(3)WordcountDriver類
public class WordcountDriver { public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException { Configuration configuration = new Configuration(); Job job = Job.getInstance(configuration); //指定本程序jar包所在的路徑
job.setJarByClass(WordcountDriver.class); //利用反射指定job要使用的mapper業務類
job.setMapperClass(WordcountMapper.class); job.setReducerClass(WordcountReducer.class); //利用反射,指定mapper輸出數據的kv類型
job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(IntWritable.class); //設置最終(也就是reduce)輸出的數據類型
job.setOutputKeyClass(Text.class); job.setMapOutputValueClass(IntWritable.class); //指定job輸入原始文件所在目錄
FileInputFormat.setInputPaths(job,new Path(args[0])); //指定job的輸出結果
FileOutputFormat.setOutputPath(job,new Path(args[1])); //將job中配置的相關參數,提交給yarn運行 //等待集群完成工作
boolean res = job.waitForCompletion(true); System.exit(res?0:1); } }
6.打成jar包並運行
- 打成jar包的操作步驟詳見:https://blog.csdn.net/sinat_33201781/article/details/80264828
- 運行命令:hadoop jar [jar包路徑] [主類的相對路徑] [輸入文件在hdfs上的路徑] [輸出文件在hdfs上的路徑]例如:hadoop jar /home/zhangjiaqian/IdeaProjects/STBigData/target/ChuanzhiBigdata-1.0-SNAPSHOT.jar cn.gulu.bigdata.mr.MRDemos.WordcountDriver /wordcount/input /wordcount/output
- 運行過后可在hdfs上查看輸出文件
- 查看輸出內容:hdfs dfs -cat /wordcount/output/part-r-00000
7.github鏈接:
https://github.com/gulu2016/STBigData/tree/master/src/main/java/cn/gulu/bigdata/mr/MRDemos
8.練習:使用mr進行流量統計
github鏈接:https://github.com/gulu2016/STBigData/tree/master/src/main/java/cn/gulu/bigdata/mr/flowsum
9.練習2:在8的基礎上對手機號以身份進行划分,即不同省份的手機號進入不同的reducetask中
這里邊最主要的任務就是重寫Partitioner中的getPartition方法,使相同省份的手機號獲得同一個數字
github鏈接:https://github.com/gulu2016/STBigData/tree/master/src/main/java/cn/gulu/bigdata/mr/flowProvinceSum
10.練習3:在8的基礎上對根據總流量從高到低進行排序
這里需要考慮的是選取哪個值作為maptask的key,以及如何對key進行排序
github鏈接:https://github.com/gulu2016/STBigData/tree/master/src/main/java/cn/gulu/bigdata/mr/flowCountSort