一、排序
排序: 需求:根據用戶每月使用的流量按照使用的流量多少排序 接口-->WritableCompareable 排序操作在hadoop中屬於默認的行為。默認按照字典殊勛排序。 排序的分類: 1)部分排序 2)全排序 3)輔助排序 4)二次排序 Combiner 合並 父類Reducer 局部匯總 ,減少網絡傳輸量 ,進而優化程序。 注意:求平均值? 3 5 7 2 6 mapper: (3 + 5 + 7)/3 = 5 (2 + 6)/2 = 4 reducer:(5+4)/2 只能應用在不影響最終業務邏輯的情況下
二、分區和排序實例
1.Mapper類
package com.css.flowsort; import java.io.IOException; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; public class FlowSortMapper extends Mapper<LongWritable, Text, FlowBean, Text>{ @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { // 1.獲取一行數據 String line = value.toString(); // 2.切割 String[] fields = line.split("\t"); // 3.取出關鍵字段 long upFlow = Long.parseLong(fields[1]); long dfFlow = Long.parseLong(fields[2]); // 4.寫出到reducer階段 context.write(new FlowBean(upFlow, dfFlow), new Text(fields[0])); } }
2.Reducer類
package com.css.flowsort; import java.io.IOException; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Reducer; public class FlowSortReducer extends Reducer<FlowBean, Text, Text, FlowBean>{ @Override protected void reduce(FlowBean key, Iterable<Text> value, Context context) throws IOException, InterruptedException { context.write(value.iterator().next(), key); } }
3.封裝類
package com.css.flowsort; import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; import org.apache.hadoop.io.WritableComparable; // 封裝類 直接完成排序 public class FlowBean implements WritableComparable<FlowBean> { // 定義屬性 private long upFlow; private long dfFlow; private long flowSum; // 無參構造 public FlowBean() { } // 有參構造 public FlowBean(long upFlow,long dfFlow){ this.upFlow = upFlow; this.dfFlow = dfFlow; this.flowSum = upFlow + dfFlow; } public long getUpFlow() { return upFlow; } public void setUpFlow(long upFlow) { this.upFlow = upFlow; } public long getDfFlow() { return dfFlow; } public void setDfFlow(long dfFlow) { this.dfFlow = dfFlow; } public long getFlowSum() { return flowSum; } public void setFlowSum(long flowSum) { this.flowSum = flowSum; } // 反序列化 @Override public void readFields(DataInput in) throws IOException { upFlow = in.readLong(); dfFlow = in.readLong(); flowSum = in.readLong(); } // 序列化 @Override public void write(DataOutput out) throws IOException { out.writeLong(upFlow); out.writeLong(dfFlow); out.writeLong(flowSum); } @Override public String toString() { return upFlow + "\t" + dfFlow + "\t" + flowSum; } // 排序 @Override public int compareTo(FlowBean o) { // 倒序 return this.flowSum > o.getFlowSum() ? -1 : 1; } }
4.自定義分區類
package com.css.flowsort; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Partitioner; public class FlowSortPartitioner extends Partitioner<FlowBean, Text>{ // 根據手機號前三位進行分區 @Override public int getPartition(FlowBean key, Text value, int numPartitions) { // 獲取手機號前三位 String phoneNum = value.toString().substring(0, 3); // 分區 int partitioner = 4; if ("135".equals(phoneNum)) { return 0; }else if ("137".equals(phoneNum)) { return 1; }else if ("138".equals(phoneNum)) { return 2; }else if ("139".equals(phoneNum)) { return 3; } return partitioner; } }
5.Driver類
package com.css.flowsort; import java.io.IOException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; public class FlowSortDriver { public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException { // 1.獲取job信息 Configuration conf = new Configuration(); Job job = Job.getInstance(conf); // 2.獲取jar包 job.setJarByClass(FlowSortDriver.class); // 3.獲取自定義的mapper與reducer類 job.setMapperClass(FlowSortMapper.class); job.setReducerClass(FlowSortReducer.class); // 4.設置map輸出的數據類型 job.setMapOutputKeyClass(FlowBean.class); job.setMapOutputValueClass(Text.class); // 5.設置reduce輸出的數據類型(最終的數據類型) job.setOutputKeyClass(Text.class); job.setOutputValueClass(FlowBean.class); //添加自定義分區 job.setPartitionerClass(FlowSortPartitioner.class); job.setNumReduceTasks(5); // 6.設置輸入存在的路徑與處理后的結果路徑 FileInputFormat.setInputPaths(job, new Path("c:/flow1024/in")); FileOutputFormat.setOutputPath(job, new Path("c:/flow1024/out1")); // 7.提交任務 boolean rs = job.waitForCompletion(true); System.out.println(rs ? 0 : 1); } }
6.輸入的文件part-r-00000
13480253104 120 1320 1440 13502468823 735 11349 12084 13510439658 1116 954 2070 13560436326 1136 94 1230 13560436666 1136 94 1230 13560439658 918 4938 5856 13602846565 198 910 1108 13660577991 660 690 1350 13719199419 240 0 240 13726130503 299 681 980 13726238888 2481 24681 27162 13760778710 120 120 240 13822544101 264 0 264 13884138413 4116 1432 5548 13922314466 3008 3720 6728 13925057413 11058 4243 15301 13926251106 240 0 240 13926435656 132 1512 1644 15013685858 369 338 707 15889002119 938 380 1318 15920133257 316 296 612 18212575961 1527 2106 3633 18320173382 9531 212 9743
7.如果第5步Driver類中的紅色部分去掉,則輸出全局排序后的文件part-r-00000
13726238888 2481 24681 27162 13925057413 11058 4243 15301 13502468823 735 11349 12084 18320173382 9531 212 9743 13922314466 3008 3720 6728 13560439658 918 4938 5856 13884138413 4116 1432 5548 18212575961 1527 2106 3633 13510439658 1116 954 2070 13926435656 132 1512 1644 13480253104 120 1320 1440 13660577991 660 690 1350 15889002119 938 380 1318 13560436326 1136 94 1230 13560436666 1136 94 1230 13602846565 198 910 1108 13726130503 299 681 980 15013685858 369 338 707 15920133257 316 296 612 13822544101 264 0 264 13760778710 120 120 240 13719199419 240 0 240 13926251106 240 0 240
8.如果第5步Driver類中的紅色部分不去掉,則輸出分區加排序后的文件
(1)part-r-00000 13502468823 735 11349 12084 13560439658 918 4938 5856 13510439658 1116 954 2070 13560436666 1136 94 1230 13560436326 1136 94 1230 (2)part-r-00001 13726238888 2481 24681 27162 13726130503 299 681 980 13760778710 120 120 240 13719199419 240 0 240 (3)part-r-00002 13884138413 4116 1432 5548 13822544101 264 0 264 (4)part-r-00003 13925057413 11058 4243 15301 13922314466 3008 3720 6728 13926435656 132 1512 1644 13926251106 240 0 240 (5)part-r-00004 18320173382 9531 212 9743 18212575961 1527 2106 3633 13480253104 120 1320 1440 13660577991 660 690 1350 15889002119 938 380 1318 13602846565 198 910 1108 15013685858 369 338 707 15920133257 316 296 612