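I. Types of Sorting in MR (MapReduce)
1. Partial sort: MR sorts the KV pairs it outputs by key, so the contents of every output file are sorted;
2. Global (total) sort: the final output is a single file that is sorted as a whole, e.g. by running only one reduce task;
3. Auxiliary (grouping) sort: after the first sort and partitioning, records are sorted/grouped once more on the reduce side;
4. Secondary sort: after the first sort, records are sorted again according to business logic (for example, compareTo compares a second field when the first one ties).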
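The difference between a partial and a global sort can be seen without a cluster. The sketch below is a plain-Java simulation, not Hadoop code: the class and method names are hypothetical, and keys are routed with the same formula as Hadoop's default HashPartitioner. With 3 "reduce tasks" every partition is sorted but their concatenation is not; with 1 reduce task the single output is globally sorted.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Plain-Java simulation (no Hadoop required): each "reduce task" sorts only its own partition.
class PartialVsGlobalSort {

    // Assign keys to partitions with the same formula as Hadoop's default HashPartitioner,
    // then sort each partition independently (this is a partial sort).
    static List<List<Integer>> partitionAndSort(List<Integer> keys, int numReduceTasks) {
        List<List<Integer>> parts = new ArrayList<>();
        for (int i = 0; i < numReduceTasks; i++) {
            parts.add(new ArrayList<>());
        }
        for (Integer k : keys) {
            parts.get((k.hashCode() & Integer.MAX_VALUE) % numReduceTasks).add(k);
        }
        for (List<Integer> p : parts) {
            Collections.sort(p);
        }
        return parts;
    }

    // Concatenate the partitions in order, like reading part-r-00000, part-r-00001, ...
    static List<Integer> concat(List<List<Integer>> parts) {
        List<Integer> all = new ArrayList<>();
        for (List<Integer> p : parts) {
            all.addAll(p);
        }
        return all;
    }

    static boolean isSorted(List<Integer> xs) {
        for (int i = 1; i < xs.size(); i++) {
            if (xs.get(i - 1) > xs.get(i)) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        List<Integer> keys = List.of(9, 4, 7, 1, 8, 2, 6, 3, 5);
        System.out.println(partitionAndSort(keys, 3)); // [[3, 6, 9], [1, 4, 7], [2, 5, 8]]
        System.out.println(partitionAndSort(keys, 1)); // [[1, 2, 3, 4, 5, 6, 7, 8, 9]]
    }
}
```

This is also why a global sort usually means either a single reduce task or a partitioner that assigns contiguous key ranges to reducers in order.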
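II. The MR Sorting Interface: WritableComparable
This interface extends both Hadoop's Writable interface and Java's Comparable interface; an implementation must override three methods: write, readFields and compareTo.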
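To show how the three methods work together without a Hadoop dependency, here is a minimal sketch. `MiniWritable` and `FlowKeySketch` are hypothetical names: `MiniWritable` only mimics the two methods of Hadoop's Writable, and the key compares by total flow (descending) with upFlow as a tie-breaker, which is one way to express the secondary sort described above.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

// Hypothetical key class mirroring the shape of a WritableComparable, runnable without Hadoop.
class FlowKeySketch implements MiniWritable, Comparable<FlowKeySketch> {
    long flowSum;
    long upFlow;

    // Hadoop-style no-arg constructor: the framework creates an empty key, then calls readFields
    FlowKeySketch() {
    }

    FlowKeySketch(long flowSum, long upFlow) {
        this.flowSum = flowSum;
        this.upFlow = upFlow;
    }

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeLong(flowSum);
        out.writeLong(upFlow);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        // Read the fields back in exactly the order write() wrote them
        flowSum = in.readLong();
        upFlow = in.readLong();
    }

    @Override
    public int compareTo(FlowKeySketch o) {
        // First key: flowSum descending; on a tie, a second key (upFlow ascending) decides
        int bySum = Long.compare(o.flowSum, this.flowSum);
        return bySum != 0 ? bySum : Long.compare(this.upFlow, o.upFlow);
    }

    // Serialize to bytes and deserialize into a fresh object, as a shuffle would
    static FlowKeySketch roundTrip(FlowKeySketch k) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            k.write(new DataOutputStream(bytes));
            FlowKeySketch copy = new FlowKeySketch();
            copy.readFields(new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())));
            return copy;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        FlowKeySketch a = new FlowKeySketch(300, 100);
        FlowKeySketch b = roundTrip(a);
        System.out.println(b.flowSum + " " + b.upFlow); // 300 100
        System.out.println(a.compareTo(b));              // 0
    }
}

// Hypothetical stand-in for org.apache.hadoop.io.Writable (same two methods)
interface MiniWritable {
    void write(DataOutput out) throws IOException;
    void readFields(DataInput in) throws IOException;
}
```

Using `Long.compare` keeps compareTo consistent: it returns 0 for equal keys and flips sign when the arguments are swapped.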
III. Sorting and Partitioning in the Traffic (Flow) Statistics Example
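import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

//Each public class below goes in its own .java file; the imports above cover all of them.
/**
* @author: PrincessHug
* @date: 2019/3/24, 15:36
* @Blog: https://www.cnblogs.com/HelloBigTable/
*/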
public class FlowSortBean implements WritableComparable<FlowSortBean> {
private long upFlow;
private long dwFlow;
private long flowSum;
//No-arg constructor: Hadoop creates an empty key by reflection and then calls readFields()
public FlowSortBean() {
}
public FlowSortBean(long upFlow, long dwFlow) {
this.upFlow = upFlow;
this.dwFlow = dwFlow;
this.flowSum = upFlow + dwFlow;
}
public long getUpFlow() {
return upFlow;
}
public void setUpFlow(long upFlow) {
this.upFlow = upFlow;
}
public long getDwFlow() {
return dwFlow;
}
public void setDwFlow(long dwFlow) {
this.dwFlow = dwFlow;
}
public long getFlowSum() {
return flowSum;
}
public void setFlowSum(long flowSum) {
this.flowSum = flowSum;
}
@Override
public void write(DataOutput out) throws IOException {
out.writeLong(upFlow);
out.writeLong(dwFlow);
out.writeLong(flowSum);
}
@Override
public void readFields(DataInput in) throws IOException {
//Read the fields in exactly the same order write() wrote them
upFlow = in.readLong();
dwFlow = in.readLong();
flowSum = in.readLong();
}
@Override
public String toString() {
return upFlow + "\t" + dwFlow + "\t" + flowSum;
}
@Override
public int compareTo(FlowSortBean o) {
//Descending by total flow. Long.compare returns 0 for equal totals, which keeps the
//compareTo contract; records with equal totals then arrive in the same reduce group.
return Long.compare(o.getFlowSum(), this.flowSum);
}
}
public class FlowSortMapper extends Mapper<LongWritable, Text,FlowSortBean,Text> {
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
//Read one line of input
String line = value.toString();
//Split it on tabs
String[] fields = line.split("\t");
//Wrap the upstream and downstream flows into the sort key
long upFlow = Long.parseLong(fields[1]);
long dwFlow = Long.parseLong(fields[2]);
//Emit (FlowSortBean, phone number)
context.write(new FlowSortBean(upFlow,dwFlow),new Text(fields[0]));
}
}
public class FlowSortReducer extends Reducer<FlowSortBean,Text,Text,FlowSortBean> {
@Override
protected void reduce(FlowSortBean key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
//Several phone numbers can share the same total flow, so write every value in the group
for (Text phone : values) {
context.write(phone, key);
}
}
}
public class FlowSortPartitioner extends Partitioner<FlowSortBean, Text> {
@Override
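public int getPartition(FlowSortBean key, Text value, int numPartitions) {
//Partition on the first three digits of the phone number (the map output value)
String phoneNum = value.toString().substring(0, 3);
//Any other prefix goes to partition 4
int partition = 4;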
if ("135".equals(phoneNum)){
return 0;
}else if ("137".equals(phoneNum)){
return 1;
}else if ("138".equals(phoneNum)){
return 2;
}else if ("139".equals(phoneNum)){
return 3;
}
return partition;
}
}
public class FlowSortDriver {
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
//Create the configuration and the Job
Configuration conf = new Configuration();
Job job = Job.getInstance(conf);
//Set the class used to locate the jar
job.setJarByClass(FlowSortDriver.class);
//Set the Mapper and Reducer classes
job.setMapperClass(FlowSortMapper.class);
job.setReducerClass(FlowSortReducer.class);
//Set the Mapper output types
job.setMapOutputKeyClass(FlowSortBean.class);
job.setMapOutputValueClass(Text.class);
//Set the Reducer (final) output types
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(FlowSortBean.class);
//Use the custom partitioner; 5 reduce tasks match its partitions 0-4
job.setPartitionerClass(FlowSortPartitioner.class);
job.setNumReduceTasks(5);
//Set the input and output paths
FileInputFormat.setInputPaths(job,new Path("G:\\mapreduce\\flow\\flowsort\\in"));
FileOutputFormat.setOutputPath(job,new Path("G:\\mapreduce\\flow\\flowsort\\partitionout"));
//Submit the job and wait for it to finish
if (job.waitForCompletion(true)){
System.out.println("Job completed!");
}else {
System.out.println("Job failed!");
}
}
}
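Note: when writing the Mapper class, pay attention to the output KV types. The key must be FlowSortBean, because the sort that happens between the Mapper and the Reducer (the sort only) is performed on the Mapper's output key, whereas partitioning can be based on either the key or the value.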
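The effect of the partitioner plus the key-based sort can be simulated in memory. The sketch below is not Hadoop code; the class name and the sample phone numbers are made up, and it assumes exactly 5 reduce tasks, as in the Driver. Each record is routed by the same prefix rules as FlowSortPartitioner, then each partition is ordered only by total flow, largest first.

```java
import java.util.ArrayList;
import java.util.List;

// In-memory simulation (no Hadoop) of the shuffle in the flow-sort job:
// route each record by phone prefix, then sort every partition by total flow, descending.
class FlowSortSimulation {

    static final class Flow {
        final String phone;
        final long upFlow;
        final long dwFlow;

        Flow(String phone, long upFlow, long dwFlow) {
            this.phone = phone;
            this.upFlow = upFlow;
            this.dwFlow = dwFlow;
        }

        long flowSum() {
            return upFlow + dwFlow;
        }

        @Override
        public String toString() {
            return phone + "\t" + upFlow + "\t" + dwFlow + "\t" + flowSum();
        }
    }

    // Same routing rules as FlowSortPartitioner: 135/137/138/139 -> 0..3, any other prefix -> 4
    static int partition(String phone) {
        switch (phone.substring(0, 3)) {
            case "135": return 0;
            case "137": return 1;
            case "138": return 2;
            case "139": return 3;
            default: return 4;
        }
    }

    // Assumes numReduceTasks == 5, otherwise partition 4 would be out of range
    static List<List<Flow>> shuffle(List<Flow> records, int numReduceTasks) {
        List<List<Flow>> parts = new ArrayList<>();
        for (int i = 0; i < numReduceTasks; i++) {
            parts.add(new ArrayList<>());
        }
        for (Flow f : records) {
            parts.get(partition(f.phone)).add(f);
        }
        // Within each partition only the key (total flow) decides the order, largest first
        for (List<Flow> p : parts) {
            p.sort((x, y) -> Long.compare(y.flowSum(), x.flowSum()));
        }
        return parts;
    }

    public static void main(String[] args) {
        // Made-up sample records: phone, upstream flow, downstream flow
        List<Flow> input = List.of(
                new Flow("13560000001", 100, 200),
                new Flow("13560000002", 500, 100),
                new Flow("13900000003", 50, 50),
                new Flow("18600000004", 70, 30));
        List<List<Flow>> out = shuffle(input, 5);
        for (int i = 0; i < out.size(); i++) {
            System.out.println("partition " + i + ": " + out.get(i));
        }
    }
}
```

The value (phone number) chooses the partition, but only the key (total flow) determines the order inside it, which matches the note above.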
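IV. Combiner
A Combiner is a component of an MR program in addition to the Mapper and the Reducer. It runs inside the map task: when the map output spills from the circular buffer, after the data has been partitioned and sorted, the Combiner aggregates it locally. This reduces the amount of data sent over the network and thus optimizes the MR program.
A Combiner pays off once the data volume reaches a certain size; with small inputs the gain is hardly noticeable.
For example, in the WordCount program, once the word files are large enough, a custom Combiner can be used as an optimization: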
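import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class WordCountCombiner extends Reducer<Text,IntWritable,Text,IntWritable>{
@Override
protected void reduce(Text key,Iterable<IntWritable> values,Context context) throws IOException, InterruptedException {
//Counter
int count = 0;
//Sum up the partial counts for this word
for(IntWritable v:values){
count += v.get();
}
//Emit (word, partial count)
context.write(key,new IntWritable(count));
}
}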
Then register the Combiner in the Driver class:
job.setCombinerClass(WordCountCombiner.class);
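Looking closely, the custom WordCount Combiner is identical to the Reducer, because their logic is the same: the counts are first summed within each partition right after the map task, and the partial sums are summed again in the reducer. So the Combiner can also be set like this: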
job.setCombinerClass(WordCountReducer.class);
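To make the savings concrete, the sketch below counts the (word, count) pairs one map task would have to shuffle with and without a summing Combiner. It is a plain-Java illustration under the assumption of a single made-up input split; `CombinerEffect`, `mapOutput` and `combine` are hypothetical names, not Hadoop APIs.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Plain-Java sketch (no Hadoop) of what the WordCount Combiner saves in one map task:
// the number of (word, count) pairs that have to be shuffled to the reducers.
class CombinerEffect {

    // Map output without a combiner: one (word, 1) pair per occurrence
    static List<Map.Entry<String, Integer>> mapOutput(String text) {
        List<Map.Entry<String, Integer>> pairs = new ArrayList<>();
        for (String w : text.trim().split("\\s+")) {
            if (!w.isEmpty()) {
                pairs.add(Map.entry(w, 1));
            }
        }
        return pairs;
    }

    // Combiner: sum the counts per word locally, the same logic the reducer applies globally
    static Map<String, Integer> combine(List<Map.Entry<String, Integer>> pairs) {
        Map<String, Integer> local = new TreeMap<>();
        for (Map.Entry<String, Integer> p : pairs) {
            local.merge(p.getKey(), p.getValue(), Integer::sum);
        }
        return local;
    }

    public static void main(String[] args) {
        List<Map.Entry<String, Integer>> raw = mapOutput("a b a c a b a");
        Map<String, Integer> combined = combine(raw);
        System.out.println("pairs shuffled without combiner: " + raw.size());      // 7
        System.out.println("pairs shuffled with combiner:    " + combined.size()); // 3
        System.out.println(combined);                                              // {a=4, b=2, c=1}
    }
}
```

Reusing the Reducer as the Combiner is safe here because addition gives the same total no matter how the partial sums are grouped.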
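Note: a Combiner may only be used where it cannot change the final business result. Averages are a counterexample. Suppose two map tasks emit values for the same key and each Combiner averages its own values:
map task 1: 3, 5, 7 => avg = 5
map task 2: 2, 6 => avg = 4
reducer: 5, 4 => avg = 4.5, but the correct result is (3+5+7+2+6)/5 = 23/5 = 4.6. Wrong!
So whenever you use a Combiner, make sure it does not affect the final result!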
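A common workaround (not part of the original post) is to let the Combiner emit partial (sum, count) pairs instead of averages, and to divide only once in the reducer. The plain-Java sketch below reproduces the numbers above; `CombinerAverage` and `Partial` are hypothetical names used only for illustration.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java sketch (no Hadoop) of the averaging pitfall and a common workaround.
class CombinerAverage {

    // Partial result a combiner could emit instead of an average
    static final class Partial {
        final long sum;
        final long count;

        Partial(long sum, long count) {
            this.sum = sum;
            this.count = count;
        }
    }

    // Wrong: each combiner emits its local average, the reducer averages the averages
    static double averageOfAverages(List<List<Integer>> mapTasks) {
        double total = 0;
        for (List<Integer> values : mapTasks) {
            total += values.stream().mapToInt(Integer::intValue).average().orElse(0);
        }
        return total / mapTasks.size();
    }

    // Safe: each combiner emits (sum, count); sums and counts can be added in any grouping
    static Partial combine(List<Integer> values) {
        long sum = 0;
        for (int v : values) {
            sum += v;
        }
        return new Partial(sum, values.size());
    }

    // Reducer: add up the partials and divide only once, at the very end
    static double reduce(List<Partial> partials) {
        long sum = 0;
        long count = 0;
        for (Partial p : partials) {
            sum += p.sum;
            count += p.count;
        }
        return (double) sum / count;
    }

    public static void main(String[] args) {
        List<List<Integer>> mapTasks = List.of(List.of(3, 5, 7), List.of(2, 6));
        List<Partial> partials = new ArrayList<>();
        for (List<Integer> values : mapTasks) {
            partials.add(combine(values));
        }
        System.out.println(averageOfAverages(mapTasks)); // 4.5
        System.out.println(reduce(partials));            // 4.6
    }
}
```

In a real job this also means the Mapper must emit the same (sum, count) type, because a Combiner's input and output types both have to match the map output types.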
