一學生成績---增強版
數據信息

computer,huangxiaoming,85,86,41,75,93,42,85 computer,xuzheng,54,52,86,91,42 computer,huangbo,85,42,96,38 english,zhaobenshan,54,52,86,91,42,85,75 english,liuyifei,85,41,75,21,85,96,14 algorithm,liuyifei,75,85,62,48,54,96,15 computer,huangjiaju,85,75,86,85,85 english,liuyifei,76,95,86,74,68,74,48 english,huangdatou,48,58,67,86,15,33,85 algorithm,huanglei,76,95,86,74,68,74,48 algorithm,huangjiaju,85,75,86,85,85,74,86 computer,huangdatou,48,58,67,86,15,33,85 english,zhouqi,85,86,41,75,93,42,85,75,55,47,22 english,huangbo,85,42,96,38,55,47,22 algorithm,liutao,85,75,85,99,66 computer,huangzitao,85,86,41,75,93,42,85 math,wangbaoqiang,85,86,41,75,93,42,85 computer,liujialing,85,41,75,21,85,96,14,74,86 computer,liuyifei,75,85,62,48,54,96,15 computer,liutao,85,75,85,99,66,88,75,91 computer,huanglei,76,95,86,74,68,74,48 english,liujialing,75,85,62,48,54,96,15 math,huanglei,76,95,86,74,68,74,48 math,huangjiaju,85,75,86,85,85,74,86 math,liutao,48,58,67,86,15,33,85 english,huanglei,85,75,85,99,66,88,75,91 math,xuzheng,54,52,86,91,42,85,75 math,huangxiaoming,85,75,85,99,66,88,75,91 math,liujialing,85,86,41,75,93,42,85,75 english,huangxiaoming,85,86,41,75,93,42,85 algorithm,huangdatou,48,58,67,86,15,33,85 algorithm,huangzitao,85,86,41,75,93,42,85,75
數據解釋
數據字段個數不固定:
第一個是課程名稱,總共四個課程,computer,math,english,algorithm,
第二個是學生姓名,后面是每次考試的分數
統計需求
1、統計每門課程的參考人數和課程平均分
2、統計每門課程參考學生的平均分,並且按課程存入不同的結果文件,要求一門課程一個結果文件,並且按平均分從高到低排序,分數保留一位小數
3、求出每門課程參考學生成績最高的學生的信息:課程,姓名和平均分
第一題
MRAvgScore1.java

/** * 需求:統計每門課程的參考人數和課程平均分 * */ public class MRAvgScore1 { public static void main(String[] args) throws Exception { Configuration conf1 = new Configuration(); Configuration conf2 = new Configuration(); Job job1 = Job.getInstance(conf1); Job job2 = Job.getInstance(conf2); job1.setJarByClass(MRAvgScore1.class); job1.setMapperClass(AvgScoreMapper1.class); //job.setReducerClass(MFReducer.class); job1.setOutputKeyClass(Text.class); job1.setOutputValueClass(DoubleWritable.class); Path inputPath1 = new Path("D:\\MR\\hw\\work3\\input"); Path outputPath1 = new Path("D:\\MR\\hw\\work3\\output_hw1_1"); FileInputFormat.setInputPaths(job1, inputPath1); FileOutputFormat.setOutputPath(job1, outputPath1); job2.setMapperClass(AvgScoreMapper2.class); job2.setReducerClass(AvgScoreReducer2.class); job2.setOutputKeyClass(Text.class); job2.setOutputValueClass(DoubleWritable.class); Path inputPath2 = new Path("D:\\MR\\hw\\work3\\output_hw1_1"); Path outputPath2 = new Path("D:\\MR\\hw\\work3\\output_hw1_end"); FileInputFormat.setInputPaths(job2, inputPath2); FileOutputFormat.setOutputPath(job2, outputPath2); JobControl control = new JobControl("AvgScore"); ControlledJob aJob = new ControlledJob(job1.getConfiguration()); ControlledJob bJob = new ControlledJob(job2.getConfiguration()); bJob.addDependingJob(aJob); control.addJob(aJob); control.addJob(bJob); Thread thread = new Thread(control); thread.start(); while(!control.allFinished()) { thread.sleep(1000); } System.exit(0); } /** * 數據類型:computer,huangxiaoming,85,86,41,75,93,42,85 * * 需求:統計每門課程的參考人數和課程平均分 * * 分析:以課程名稱+姓名作為key,以平均分數作為value * */ public static class AvgScoreMapper1 extends Mapper<LongWritable, Text, Text, DoubleWritable>{ @Override protected void map(LongWritable key, Text value,Context context) throws IOException, InterruptedException { String[] splits = value.toString().split(","); //拼接成要輸出的key String outKey = splits[0]+"\t"+splits[1]; int length = splits.length; int sum = 0; //求出成績的總和 for(int i=2;i<length;i++) { sum += Integer.parseInt(splits[i]); } //求出平均分 double outValue = sum / (length - 2); context.write(new Text(outKey), new DoubleWritable(outValue)); } } /** * 對第一次MapReduce輸出的結果進一步計算,第一步輸出結果樣式為 * math huangjiaju 82.0 * math huanglei 74.0 * math huangxiaoming 83.0 * math liujialing 72.0 * math liutao 56.0 * math wangbaoqiang 72.0 * math xuzheng 69.0 * * 需求:統計每門課程的參考人數和課程平均分 * 分析:以課程名稱作為key,以分數作為value進行 輸出 * * */ public static class AvgScoreMapper2 extends Mapper<LongWritable, Text, Text, DoubleWritable>{ @Override protected void map(LongWritable key, Text value,Context context) throws IOException, InterruptedException { String[] splits = value.toString().split("\t"); String outKey = splits[0]; String outValue = splits[2]; context.write(new Text(outKey), new DoubleWritable(Double.parseDouble(outValue))); } } /** * 針對同一門課程,對values進行遍歷計數,看看有多少人參加了考試,並計算出平均成績 * */ public static class AvgScoreReducer2 extends Reducer<Text, DoubleWritable, Text, Text>{ @Override protected void reduce(Text key, Iterable<DoubleWritable> values, Context context) throws IOException, InterruptedException { int count = 0; double sum = 0; for(DoubleWritable value : values) { count++; sum += value.get(); } double avg = sum / count; String outValue = count + "\t" + avg; context.write(key, new Text(outValue)); } } }
第二題
MRAvgScore2.java

public class MRAvgScore2 { public static void main(String[] args) throws Exception { Configuration conf = new Configuration(); Job job = Job.getInstance(conf); job.setJarByClass(MRAvgScore2.class); job.setMapperClass(ScoreMapper3.class); job.setReducerClass(ScoreReducer3.class); job.setOutputKeyClass(StudentBean.class); job.setOutputValueClass(NullWritable.class); job.setPartitionerClass(CoursePartitioner.class); job.setNumReduceTasks(4); Path inputPath = new Path("D:\\MR\\hw\\work3\\output_hw1_1"); Path outputPath = new Path("D:\\MR\\hw\\work3\\output_hw2_1"); FileInputFormat.setInputPaths(job, inputPath); FileOutputFormat.setOutputPath(job, outputPath); boolean isDone = job.waitForCompletion(true); System.exit(isDone ? 0 : 1); } public static class ScoreMapper3 extends Mapper<LongWritable, Text, StudentBean, NullWritable>{ @Override protected void map(LongWritable key, Text value,Context context) throws IOException, InterruptedException { String[] splits = value.toString().split("\t"); double score = Double.parseDouble(splits[2]); DecimalFormat df = new DecimalFormat("#.0"); df.format(score); StudentBean student = new StudentBean(splits[0],splits[1],score); context.write(student, NullWritable.get()); } } public static class ScoreReducer3 extends Reducer<StudentBean, NullWritable, StudentBean, NullWritable>{ @Override protected void reduce(StudentBean key, Iterable<NullWritable> values,Context context) throws IOException, InterruptedException { for(NullWritable nvl : values){ context.write(key, nvl); } } } }
StudentBean.java

public class StudentBean implements WritableComparable<StudentBean>{ private String course; private String name; private double avgScore; public String getCourse() { return course; } public void setCourse(String course) { this.course = course; } public String getName() { return name; } public void setName(String name) { this.name = name; } public double getavgScore() { return avgScore; } public void setavgScore(double avgScore) { this.avgScore = avgScore; } public StudentBean(String course, String name, double avgScore) { super(); this.course = course; this.name = name; this.avgScore = avgScore; } public StudentBean() { super(); } @Override public String toString() { return course + "\t" + name + "\t" + avgScore; } @Override public void readFields(DataInput in) throws IOException { course = in.readUTF(); name = in.readUTF(); avgScore = in.readDouble(); } @Override public void write(DataOutput out) throws IOException { out.writeUTF(course); out.writeUTF(name); out.writeDouble(avgScore); } @Override public int compareTo(StudentBean stu) { double diffent = this.avgScore - stu.avgScore; if(diffent == 0) { return 0; }else { return diffent > 0 ? -1 : 1; } } }
第三題
MRScore3.java

二影評案例
數據及需求
數據格式
movies.dat 3884條數據
1::Toy Story (1995)::Animation|Children's|Comedy 2::Jumanji (1995)::Adventure|Children's|Fantasy 3::Grumpier Old Men (1995)::Comedy|Romance 4::Waiting to Exhale (1995)::Comedy|Drama 5::Father of the Bride Part II (1995)::Comedy 6::Heat (1995)::Action|Crime|Thriller 7::Sabrina (1995)::Comedy|Romance 8::Tom and Huck (1995)::Adventure|Children's 9::Sudden Death (1995)::Action 10::GoldenEye (1995)::Action|Adventure|Thriller
users.dat 6041條數據
1::F::1::10::48067 2::M::56::16::70072 3::M::25::15::55117 4::M::45::7::02460 5::M::25::20::55455 6::F::50::9::55117 7::M::35::1::06810 8::M::25::12::11413 9::M::25::17::61614 10::F::35::1::95370
ratings.dat 1000210條數據
1::1193::5::978300760 1::661::3::978302109 1::914::3::978301968 1::3408::4::978300275 1::2355::5::978824291 1::1197::3::978302268 1::1287::5::978302039 1::2804::5::978300719 1::594::4::978302268 1::919::4::978301368
數據解釋
1、users.dat 數據格式為: 2::M::56::16::70072
對應字段為:UserID BigInt, Gender String, Age Int, Occupation String, Zipcode String
對應字段中文解釋:用戶id,性別,年齡,職業,郵政編碼
2、movies.dat 數據格式為: 2::Jumanji (1995)::Adventure|Children's|Fantasy
對應字段為:MovieID BigInt, Title String, Genres String
對應字段中文解釋:電影ID,電影名字,電影類型
3、ratings.dat 數據格式為: 1::1193::5::978300760
對應字段為:UserID BigInt, MovieID BigInt, Rating Double, Timestamped String
對應字段中文解釋:用戶ID,電影ID,評分,評分時間戳
用戶ID,電影ID,評分,評分時間戳,性別,年齡,職業,郵政編碼,電影名字,電影類型
userid, movieId, rate, ts, gender, age, occupation, zipcode, movieName, movieType
需求統計
(1)求被評分次數最多的10部電影,並給出評分次數(電影名,評分次數)
(2)分別求男性,女性當中評分最高的10部電影(性別,電影名,評分)
(3)求movieid = 2116這部電影各年齡段(因為年齡就只有7個,就按這個7個分就好了)的平均影評(年齡段,評分)
(4)求最喜歡看電影(影評次數最多)的那位女性評最高分的10部電影的平均影評分(人,電影名,影評)
(5)求好片(評分>=4.0)最多的那個年份的最好看的10部電影
(6)求1997年上映的電影中,評分最高的10部Comedy類電影
(7)該影評庫中各種類型電影中評價最高的5部電影(類型,電影名,平均影評分)
(8)各年評分最高的電影類型(年份,類型,影評分)
(9)每個地區最高評分的電影名,把結果存入HDFS(地區,電影名,電影評分)
代碼實現
1、求被評分次數最多的10部電影,並給出評分次數(電影名,評分次數)
分析:此問題涉及到2個文件,ratings.dat和movies.dat,2個文件數據量傾斜比較嚴重,此處應該使用mapjoin方法,先將數據量較小的文件預先加載到內存中
MovieMR1_1.java

MovieMR1_2.java

MovieRating.java

2、分別求男性,女性當中評分最高的10部電影(性別,電影名,評分)
分析:此問題涉及到3個表的聯合查詢,需要先將2個小表的數據預先加載到內存中,再進行查詢
對三表進行聯合
MoviesThreeTableJoin.java

三表聯合之后的數據為
1000::1023::5::975041651::Winnie the Pooh and the Blustery Day (1968)::Animation|Children's::F::25::6::90027 1000::1029::3::975041859::Dumbo (1941)::Animation|Children's|Musical::F::25::6::90027 1000::1036::4::975040964::Die Hard (1988)::Action|Thriller::F::25::6::90027 1000::1104::5::975042421::Streetcar Named Desire, A (1951)::Drama::F::25::6::90027 1000::110::5::975040841::Braveheart (1995)::Action|Drama|War::F::25::6::90027 1000::1196::3::975040841::Star Wars: Episode V - The Empire Strikes Back (1980)::Action|Adventure|Drama|Sci-Fi|War::F::25::6::90027 1000::1198::5::975040841::Raiders of the Lost Ark (1981)::Action|Adventure::F::25::6::90027 1000::1200::4::975041125::Aliens (1986)::Action|Sci-Fi|Thriller|War::F::25::6::90027 1000::1201::5::975041025::Good, The Bad and The Ugly, The (1966)::Action|Western::F::25::6::90027 1000::1210::5::975040629::Star Wars: Episode VI - Return of the Jedi (1983)::Action|Adventure|Romance|Sci-Fi|War::F::25::6::90027
字段解釋
1000 :: 1036 :: 4 :: 975040964 :: Die Hard (1988) :: Action|Thriller :: F :: 25 :: 6 :: 90027 用戶ID 電影ID 評分 評分時間戳 電影名字 電影類型 性別 年齡 職業 郵政編碼
0 1 2 3 4 5 6 7 8 9
要分別求男性,女性當中評分最高的10部電影(性別,電影名,評分)
1、以性別和電影名分組,以電影名+性別為key,以評分為value進行計算;
2、以性別+電影名+評分作為對象,以性別分組,以評分降序進行輸出TOP10
業務邏輯:MoviesDemo2.java

對象:MoviesSexBean.java

分組:MoviesSexGC.java

3、求movieid = 2116這部電影各年齡段(因為年齡就只有7個,就按這個7個分就好了)的平均影評(年齡段,評分)
以第二部三表聯合之后的文件進行操作

4、求最喜歡看電影(影評次數最多)的那位女性評最高分的10部電影的平均影評分(人,電影名,影評)
1000 :: 1036 :: 4 :: 975040964 :: Die Hard (1988) :: Action|Thriller :: F :: 25 :: 6 :: 90027 用戶ID 電影ID 評分 評分時間戳 電影名字 電影類型 性別 年齡 職業 郵政編碼 0 1 2 3 4 5 6 7 8 9
(1)求出評論次數最多的女性ID
MoviesDemo4_1.java

三 銷售數據
第一題
下面是三種商品的銷售數據
要求:根據以上數據,用 MapReduce 統計出如下數據:
1、每種商品的銷售總金額,並降序排序
2、每種商品銷售額最多的三周
第二題:MapReduce 題
現有如下數據文件需要處理:
格式:CSV
數據樣例:
user_a,location_a,2018-01-01 08:00:00,60
user_a,location_a,2018-01-01 09:00:00,60
user_a,location_b,2018-01-01 10:00:00,60
user_a,location_a,2018-01-01 11:00:00,60
字段:用戶 ID,位置 ID,開始時間,停留時長(分鍾)
數據意義:某個用戶在某個位置從某個時刻開始停留了多長時間
處理邏輯: 對同一個用戶,在同一個位置,連續的多條記錄進行合並
合並原則:開始時間取最早的,停留時長加和
要求:請編寫 MapReduce 程序實現
其他:只有數據樣例,沒有數據。
UserLocationMR.java

UserLocation.java

UserLocationGC.java

第三題:MapReduce 題--倒排索引
概念: 倒排索引(Inverted Index),也常被稱為反向索引、置入檔案或反向檔案,是一種索引方法, 被用來存儲在全文搜索下某個單詞在一個文檔或者一組文檔中的存儲位置的映射。它是文檔 檢索系統中最常用的數據結構。了解詳情可自行百度
有兩份數據:
mapreduce-4-1.txt
huangbo love xuzheng huangxiaoming love baby huangxiaoming love yangmi liangchaowei love liujialing huangxiaoming xuzheng huangbo wangbaoqiang
mapreduce-4-2.txt
hello huangbo hello xuzheng hello huangxiaoming
題目一:編寫 MapReduce 求出以下格式的結果數據:統計每個關鍵詞在每個文檔中當中的 第幾行出現了多少次 例如,huangxiaoming 關鍵詞的格式:
huangixaoming mapreduce-4-1.txt:2,2; mapreduce-4-1.txt:4,1;mapreduce-4-2.txt:3,1
以上答案的意義:
關鍵詞 huangxiaoming 在第一份文檔 mapreduce-4-1.txt 中的第 2 行出現了 2 次 關鍵詞 huangxiaoming 在第一份文檔 mapreduce-4-1.txt 中的第 4 行出現了 1 次 關鍵詞 huangxiaoming 在第二份文檔 mapreduce-4-2.txt 中的第 3 行出現了 1 次
題目二:編寫 MapReduce 程序求出每個關鍵詞在每個文檔出現了多少次,並且按照出現次 數降序排序
例如:
huangixaoming mapreduce-4-1.txt,3;mapreduce-4-2.txt,1
以上答案的含義: 表示關鍵詞 huangxiaoming 在第一份文檔 mapreduce-4-1.txt 中出現了 3 次,在第二份文檔mapreduce-4-2.txt 中出現了 1 次
四求共同好友
數據格式
A:B,C,D,F,E,O B:A,C,E,K C:F,A,D,I D:A,E,F,L E:B,C,D,M,L F:A,B,C,D,E,O,M G:A,C,D,E,F H:A,C,D,E,O I:A,O J:B,O K:A,C,D L:D,E,F M:E,F,G O:A,H,I,J,K
以上是數據:
A:B,C,D,F,E,O
表示:B,C,D,E,F,O是A用戶的好友。
1 public class SharedFriend { 2 /* 3 第一階段的map函數主要完成以下任務 4 1.遍歷原始文件中每行<所有朋友>信息 5 2.遍歷“朋友”集合,以每個“朋友”為鍵,原來的“人”為值 即輸出<朋友,人> 6 */ 7 static class SharedFriendMapper01 extends Mapper<LongWritable, Text, Text, Text>{ 8 @Override 9 protected void map(LongWritable key, Text value,Context context) 10 throws IOException, InterruptedException { 11 String line = value.toString(); 12 String[] person_friends = line.split(":"); 13 String person = person_friends[0]; 14 String[] friends = person_friends[1].split(","); 15 16 for(String friend : friends){ 17 context.write(new Text(friend), new Text(person)); 18 } 19 } 20 } 21 22 /* 23 第一階段的reduce函數主要完成以下任務 24 1.對所有傳過來的<朋友,list(人)>進行拼接,輸出<朋友,擁有這名朋友的所有人> 25 */ 26 static class SharedFriendReducer01 extends Reducer<Text, Text, Text, Text>{ 27 @Override 28 protected void reduce(Text key, Iterable<Text> values,Context context) 29 throws IOException, InterruptedException { 30 StringBuffer sb = new StringBuffer(); 31 for(Text friend : values){ 32 sb.append(friend.toString()).append(","); 33 } 34 sb.deleteCharAt(sb.length()-1); 35 context.write(key, new Text(sb.toString())); 36 } 37 } 38 39 /* 40 第二階段的map函數主要完成以下任務 41 1.將上一階段reduce輸出的<朋友,擁有這名朋友的所有人>信息中的 “擁有這名朋友的所有人”進行排序 ,以防出現B-C C-B這樣的重復 42 2.將 “擁有這名朋友的所有人”進行兩兩配對,並將配對后的字符串當做鍵,“朋友”當做值輸出,即輸出<人-人,共同朋友> 43 */ 44 static class SharedFriendMapper02 extends Mapper<LongWritable, Text, Text, Text>{ 45 @Override 46 protected void map(LongWritable key, Text value,Context context) 47 throws IOException, InterruptedException { 48 String line = value.toString(); 49 String[] friend_persons = line.split("\t"); 50 String friend = friend_persons[0]; 51 String[] persons = friend_persons[1].split(","); 52 Arrays.sort(persons); //排序 53 54 //兩兩配對 55 for(int i=0;i<persons.length-1;i++){ 56 for(int j=i+1;j<persons.length;j++){ 57 context.write(new Text(persons[i]+"-"+persons[j]+":"), new Text(friend)); 58 } 59 } 60 } 61 } 62 63 /* 64 第二階段的reduce函數主要完成以下任務 65 1.<人-人,list(共同朋友)> 中的“共同好友”進行拼接 最后輸出<人-人,兩人的所有共同好友> 66 */ 67 static class SharedFriendReducer02 extends Reducer<Text, Text, Text, Text>{ 68 @Override 69 protected void reduce(Text key, Iterable<Text> values,Context context) 70 throws IOException, InterruptedException { 71 StringBuffer sb = new StringBuffer(); 72 Set<String> set = new HashSet<String>(); 73 for(Text friend : values){ 74 if(!set.contains(friend.toString())) 75 set.add(friend.toString()); 76 } 77 for(String friend : set){ 78 sb.append(friend.toString()).append(","); 79 } 80 sb.deleteCharAt(sb.length()-1); 81 82 context.write(key, new Text(sb.toString())); 83 } 84 } 85 86 public static void main(String[] args)throws Exception { 87 Configuration conf = new Configuration(); 88 89 //第一階段 90 Job job1 = Job.getInstance(conf); 91 job1.setJarByClass(SharedFriend.class); 92 job1.setMapperClass(SharedFriendMapper01.class); 93 job1.setReducerClass(SharedFriendReducer01.class); 94 95 job1.setOutputKeyClass(Text.class); 96 job1.setOutputValueClass(Text.class); 97 98 FileInputFormat.setInputPaths(job1, new Path("H:/大數據/mapreduce/sharedfriend/input")); 99 FileOutputFormat.setOutputPath(job1, new Path("H:/大數據/mapreduce/sharedfriend/output")); 100 101 boolean res1 = job1.waitForCompletion(true); 102 103 //第二階段 104 Job job2 = Job.getInstance(conf); 105 job2.setJarByClass(SharedFriend.class); 106 job2.setMapperClass(SharedFriendMapper02.class); 107 job2.setReducerClass(SharedFriendReducer02.class); 108 109 job2.setOutputKeyClass(Text.class); 110 job2.setOutputValueClass(Text.class); 111 112 FileInputFormat.setInputPaths(job2, new Path("H:/大數據/mapreduce/sharedfriend/output")); 113 FileOutputFormat.setOutputPath(job2, new Path("H:/大數據/mapreduce/sharedfriend/output01")); 114 115 boolean res2 = job2.waitForCompletion(true); 116 117 System.exit(res1?0:1); 118 } 119 }
第一階段輸出結果

第二階段輸出結果
