MapReduce (2): Three Commonly Used Components


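The three MapReduce components: Combiner, Sort, Partitioner

Default components: sorting and partitioning (if you do not set them, the framework uses its defaults)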

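I. The Combiner in MapReduce

    1. What is a combiner

A Combiner is a component of a MapReduce program that sits alongside the Mapper and the Reducer. It runs after a maptask and locally aggregates that maptask's output, which reduces the computing load on the reducetasks and cuts network traffic.

    2. How to use a combiner

A Combiner is written the same way as a Reducer: write a class that extends Reducer, put the Combiner logic in its reduce method, and then set the Combiner class on the job: job.setCombinerClass(FlowSumCombine.class)

(If the combiner logic is identical to the reduce logic, there is no need to write a separate combiner class; just set the reducer class as the combiner on the job.)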

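    3. Notes on using a combiner

(1) The difference between a Combiner and a Reducer is where they run:

      A Combiner runs on the node of each maptask
      A Reducer receives the output of all Mappers globally
(2) The Combiner's output kv types must match the Reducer's input kv types
(3) Use a Combiner with great care, because during a MapReduce job it may or may not be called, and it may be called once or several times. The rule for using a Combiner is therefore: whether it runs or not must not affect the business logic or the final result. (When computing an average, for example, the combiner logic cannot be the same as the reduce logic.)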
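
To make the averaging caveat concrete, here is a minimal plain-Java sketch with no Hadoop dependency that simulates the values of one key coming from two map tasks. The class name CombinerCaveatDemo and the sample numbers are made up for illustration. It suggests why a sum can be combined locally without changing the result, while averaging per-task averages cannot.

```java
import java.util.Arrays;
import java.util.List;

// Plain-Java simulation; the class name and sample data are hypothetical.
class CombinerCaveatDemo {

    // "Reduce" logic for a sum: safe to reuse as a combiner.
    static long sum(List<Long> values) {
        long total = 0;
        for (long v : values) {
            total += v;
        }
        return total;
    }

    // "Reduce" logic for an average: not safe to reuse as a combiner.
    static double average(List<Double> values) {
        double total = 0;
        for (double v : values) {
            total += v;
        }
        return total / values.size();
    }

    public static void main(String[] args) {
        // Values for one key, as emitted by two separate map tasks.
        List<Long> mapTask1 = Arrays.asList(1L, 2L, 3L);
        List<Long> mapTask2 = Arrays.asList(10L);

        // Sum: combining per map task first gives the same final result.
        long direct = sum(Arrays.asList(1L, 2L, 3L, 10L));
        long combined = sum(Arrays.asList(sum(mapTask1), sum(mapTask2)));
        System.out.println("sum direct=" + direct + ", with combiner=" + combined);

        // Average: averaging the per-task averages changes the result.
        double trueAvg = average(Arrays.asList(1.0, 2.0, 3.0, 10.0));
        double avgOfAvgs = average(Arrays.asList(
                average(Arrays.asList(1.0, 2.0, 3.0)),
                average(Arrays.asList(10.0))));
        System.out.println("avg direct=" + trueAvg + ", avg of averages=" + avgOfAvgs);
    }
}
```

A common workaround for averages is to have the combiner emit partial (sum, count) pairs and leave the final division to the reducer.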
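II. Serialization in MapReduce

    1. Overview

Java serialization (Serializable) is a heavyweight framework: a serialized object carries a lot of extra information (checksums, headers, the inheritance hierarchy, and so on), which makes it inefficient to send over the network. Hadoop therefore developed its own serialization mechanism (Writable), which is compact and efficient.
Hadoop's serialization framework already provides Writable implementations for the basic types and for null, for example BooleanWritable, ByteWritable, IntWritable, LongWritable, FloatWritable, DoubleWritable, Text (for String), and NullWritable (for null).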

    2. Java serialization

Illustrated with an example:
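
As one possible illustration of the overhead, the following plain-Java sketch serializes the same two long fields twice: once with standard Java serialization and once by writing only the raw field values, which is the approach a Writable takes. The class SerializationSizeDemo and its FlowRecord bean are hypothetical stand-ins, not classes from Hadoop or from this article's project.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;

// Plain-Java comparison; class and field names are hypothetical stand-ins.
class SerializationSizeDemo {

    static class FlowRecord implements Serializable {
        private static final long serialVersionUID = 1L;
        long upflow;
        long downflow;

        FlowRecord(long upflow, long downflow) {
            this.upflow = upflow;
            this.downflow = downflow;
        }
    }

    // Standard Java serialization: writes class metadata plus the field values.
    static int javaSerializedSize(FlowRecord r) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(r);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.size();
    }

    // Writable-style encoding: only the raw field values, in a fixed order.
    static int compactSize(FlowRecord r) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeLong(r.upflow);
            out.writeLong(r.downflow);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.size();
    }

    public static void main(String[] args) {
        FlowRecord r = new FlowRecord(1024L, 2048L);
        System.out.println("Java serialization: " + javaSerializedSize(r) + " bytes");
        System.out.println("Field-only encoding: " + compactSize(r) + " bytes");
    }
}
```

The field-only encoding is exactly two 8-byte longs, while the Java-serialized form also has to describe the class itself, so it comes out noticeably larger.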

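    3. Making a custom object serializable for the MapReduce framework

If a custom bean needs to be transmitted as a key, it must also implement the Comparable interface, because the shuffle phase of the MapReduce framework always sorts by key. In that case the bean should implement the following interface:
public class FlowBean implements WritableComparable<FlowBean>
This is illustrated with the examples below.
The following is the bean class with serialization implemented (it is named Flow in the listing):

Examples:

1.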

package com.ghgj.mr.exerciseflow;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.WritableComparable;

public class Flow implements WritableComparable<Flow>{

	private String phone;
	private long upflow;	// upstream flow
	private long downflow;	// downstream flow
	private long sumflow;	// sum of upstream and downstream flow
	public long getUpflow() {
		return upflow;
	}
	public void setUpflow(long upflow) {
		this.upflow = upflow;
	}
	public long getDownflow() {
		return downflow;
	}
	public void setDownflow(long downflow) {
		this.downflow = downflow;
	}
	public long getSumflow() {
		return sumflow;
	}
	public void setSumflow(long sumflow) {
		this.sumflow = sumflow;
	}
	public String getPhone() {
		return phone;
	}
	public void setPhone(String phone) {
		this.phone = phone;
	}
	public Flow() {
	}
	public Flow(long upflow, long downflow, String phone) {
		super();
		this.upflow = upflow;
		this.downflow = downflow;
		this.sumflow = upflow + downflow;
		this.phone = phone;
	}
	@Override
	public String toString() {
		return phone +"\t" + upflow +"\t" + downflow +"\t" + sumflow;
	}
	@Override
	public void write(DataOutput out) throws IOException {
		// Write the fields in a fixed order; readFields must read them back in the same order.
		out.writeLong(upflow);
		out.writeLong(downflow);
		out.writeLong(sumflow);
		out.writeUTF(phone);
	}
	@Override
	public void readFields(DataInput in) throws IOException {
		// Read the fields in exactly the order in which write() wrote them.
		this.upflow = in.readLong();
		this.downflow = in.readLong();
		this.sumflow = in.readLong();
		this.phone = in.readUTF();
	}
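	@Override
	public int compareTo(Flow flow) {
		// Sort by total flow in descending order; break ties by phone number.
		// Long.compare avoids the overflow that casting a long difference to int can cause.
		int bySum = Long.compare(flow.getSumflow(), this.sumflow);
		if (bySum != 0) {
			return bySum;
		}
		return this.phone.compareTo(flow.getPhone());
	}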
}
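
The write/readFields pair can be exercised without a cluster. The sketch below uses a hypothetical stand-in class, FlowRoundTripDemo, that mirrors the method shapes of the bean above but drops the Hadoop dependency (Writable's methods take the standard java.io.DataOutput and java.io.DataInput types). It serializes a record to a byte array and reads it back into a fresh instance.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

// Stand-in for the Flow bean without the Hadoop dependency (hypothetical class).
class FlowRoundTripDemo {
    long upflow;
    long downflow;
    String phone = "";

    // Same shape as Writable.write: fields go out in a fixed order.
    void write(DataOutput out) throws IOException {
        out.writeLong(upflow);
        out.writeLong(downflow);
        out.writeUTF(phone);
    }

    // Same shape as Writable.readFields: fields come back in that same order.
    void readFields(DataInput in) throws IOException {
        this.upflow = in.readLong();
        this.downflow = in.readLong();
        this.phone = in.readUTF();
    }

    // Serializes the record to bytes and reads it back into a fresh instance.
    static FlowRoundTripDemo roundTrip(FlowRoundTripDemo original) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            original.write(new DataOutputStream(bytes));
            FlowRoundTripDemo copy = new FlowRoundTripDemo(); // relies on the no-arg constructor
            copy.readFields(new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())));
            return copy;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        FlowRoundTripDemo f = new FlowRoundTripDemo();
        f.upflow = 100;
        f.downflow = 250;
        f.phone = "13800000000";
        FlowRoundTripDemo copy = roundTrip(f);
        System.out.println(copy.phone + "\t" + copy.upflow + "\t" + copy.downflow);
    }
}
```

Two details carry over to the real bean: the read order must match the write order, and the class needs a no-argument constructor because the framework creates an empty instance before calling readFields.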

 

package com.ghgj.mr.exerciseflow;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

/**
 * Output columns: phone number, upstream flow, downstream flow, total flow
 * @author Administrator
 *
 */
public class FlowExercise1 {

	public static void main(String[] args) throws Exception {
		
		Configuration conf = new Configuration();
		Job job = Job.getInstance(conf);
		
		job.setJarByClass(FlowExercise1.class);
		
		job.setMapperClass(FlowExercise1Mapper.class);
		job.setReducerClass(FlowExercise1Reducer.class);
		
		job.setMapOutputKeyClass(Text.class);
		job.setMapOutputValueClass(Flow.class);
		
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(Text.class);
		
		FileInputFormat.setInputPaths(job, "d:/flow/input");
		FileOutputFormat.setOutputPath(job, new Path("d:/flow/output13"));
		
		boolean status = job.waitForCompletion(true);
		System.exit(status? 0 : 1);
	}
	
	static class FlowExercise1Mapper extends Mapper<LongWritable, Text, Text, Flow>{
		@Override
		protected void map(LongWritable key, Text value,Context context)
				throws IOException, InterruptedException {
			String[] splits = value.toString().split("\t");
			
			String phone = splits[1];
			long upflow = Long.parseLong(splits[8]);
			long downflow = Long.parseLong(splits[9]);
			
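			// Flow only declares a three-argument constructor, so pass the phone number too
			// (this also keeps write() from calling writeUTF on a null phone).
			Flow flow = new Flow(upflow, downflow, phone);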
			context.write(new Text(phone), flow);
		}
	}

	static class FlowExercise1Reducer extends Reducer<Text, Flow, Text, Text>{
		@Override
		protected void reduce(Text phone, Iterable<Flow> flows, Context context)
				throws IOException, InterruptedException {
			
			long sumUpflow = 0;    // total upstream flow for this phone number
			long sumDownflow = 0;  // total downstream flow for this phone number
			for(Flow f : flows){
				sumUpflow += f.getUpflow();
				sumDownflow += f.getDownflow();
			}
			// Emit a Text value so the output type matches job.setOutputValueClass(Text.class)
			// and each output line reads: phone, upflow, downflow, total.
			String v = sumUpflow +"\t" + sumDownflow +"\t" + (sumUpflow + sumDownflow);
			context.write(phone, new Text(v));
		}
	}
}

2.

package com.ghgj.mr.exerciseflow;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class FlowExercise2Sort {
	
	public static void main(String[] args) throws Exception {
		
		Configuration conf = new Configuration();
		Job job = Job.getInstance(conf);
		
		job.setJarByClass(FlowExercise2Sort.class);
		
		job.setMapperClass(FlowExercise2SortMapper.class);
		job.setReducerClass(FlowExercise2SortReducer.class);
		
		job.setMapOutputKeyClass(Flow.class);
		job.setMapOutputValueClass(Text.class);
		
//		job.setCombinerClass(FlowExercise1Combiner.class);
//		job.setCombinerClass(FlowExercise1Reducer.class);
		
		job.setOutputKeyClass(NullWritable.class);
		job.setOutputValueClass(Flow.class);
		
		FileInputFormat.setInputPaths(job, "d:/flow/output1");
		FileOutputFormat.setOutputPath(job, new Path("d:/flow/sortoutput6"));
		
		boolean status = job.waitForCompletion(true);
		System.exit(status? 0 : 1);
	}
	
	static class FlowExercise2SortMapper extends Mapper<LongWritable, Text, Flow, Text>{
		@Override
		protected void map(LongWritable key, Text value,
				Mapper<LongWritable, Text, Flow, Text>.Context context)
				throws IOException, InterruptedException {
			
			String[] splits = value.toString().split("\t");
			
			String phone = splits[0];
			long upflow = Long.parseLong(splits[1]);
			long downflow = Long.parseLong(splits[2]);
//			long sumflow = Long.parseLong(splits[3]);
			Flow flow = new Flow(upflow, downflow, phone);
			
			context.write(flow, new Text(phone));
		}
	}
	
	static class FlowExercise2SortReducer extends Reducer<Flow, Text, NullWritable, Flow>{
		@Override
		protected void reduce(Flow flow, Iterable<Text> phones, Context context)
				throws IOException, InterruptedException {
			
			for(Text t : phones){
				context.write(NullWritable.get(), flow);
			}
		}
	}
}

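III. Sort in MapReduce

Requirement: sort the total flow computed in the previous example in descending order.
Basic idea: implement a custom bean that encapsulates the flow information and use that bean as the map output key. While processing data, an MR program sorts it (the kv pairs output by map are sorted before they are sent to reduce), and the sort is based on the map output key. So to apply our own ordering, we can put the sort criteria into the key, have the key implement the WritableComparable interface, and override the key's compareTo method.
(See example 2 above.)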
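
The ordering that such a compareTo method produces can be checked locally with an ordinary sort. The sketch below is a plain-Java stand-in (the class FlowSortDemo and the phone numbers are hypothetical) that sorts keys by total flow in descending order with ties broken by phone number. It also shows why comparing with Long.compare is safer than casting a long difference to int.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Plain-Java stand-in for a sortable key (hypothetical class and data).
class FlowSortDemo implements Comparable<FlowSortDemo> {
    final String phone;
    final long sumflow;

    FlowSortDemo(String phone, long sumflow) {
        this.phone = phone;
        this.sumflow = sumflow;
    }

    @Override
    public int compareTo(FlowSortDemo other) {
        // Descending by total flow; ties broken by phone number (ascending).
        int bySum = Long.compare(other.sumflow, this.sumflow);
        return bySum != 0 ? bySum : this.phone.compareTo(other.phone);
    }

    // Returns the phone numbers in sorted key order.
    static List<String> sortedPhones(List<FlowSortDemo> keys) {
        List<FlowSortDemo> copy = new ArrayList<>(keys);
        Collections.sort(copy);
        List<String> phones = new ArrayList<>();
        for (FlowSortDemo k : copy) {
            phones.add(k.phone);
        }
        return phones;
    }

    public static void main(String[] args) {
        List<FlowSortDemo> keys = new ArrayList<>();
        keys.add(new FlowSortDemo("13500000002", 300L));
        keys.add(new FlowSortDemo("13500000001", 3_000_000_000L));
        keys.add(new FlowSortDemo("13500000003", 300L));
        System.out.println(sortedPhones(keys));

        // Pitfall: this difference is positive as a long but negative after the int cast.
        long diff = 3_000_000_000L - 300L;
        System.out.println("long diff > 0: " + (diff > 0) + ", (int) diff < 0: " + ((int) diff < 0));
    }
}
```

Including the phone number as a tie-breaker also keeps two different phones with the same total from being treated as the same key.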

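IV. The Partitioner in MapReduce

Requirement: write the flow statistics to different files according to each phone number's home region, so that queries on the results can be narrowed down to the province level.
Idea: MapReduce groups the kv pairs output by map by key and then distributes them to different reducetasks.
The default distribution rule is the key's hashcode % the number of reducetasks. So to distribute the data according to our own needs, we have to replace the component that does the distribution (partitioning): the Partitioner.
Define a custom partitioner class (ProvincePartitioner in this example) that extends the abstract class Partitioner,
then set the custom partitioner on the job object: job.setPartitionerClass(ProvincePartitioner.class)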

(This corresponds to the third exercise, whose code is not included in this article.)
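
The two distribution rules can be sketched in plain Java without Hadoop. In the sketch below, the class ProvincePartitionDemo, the prefix table, and the partition numbers are all made up for illustration; a real table would map actual number ranges to provinces. hashPartition follows the default hash-modulo rule described above, and provincePartition picks a partition from the first three digits of the phone number.

```java
import java.util.HashMap;
import java.util.Map;

// Plain-Java sketch of partitioning logic; the prefix table is made up.
class ProvincePartitionDemo {

    // Hypothetical mapping from phone-number prefix to partition index.
    private static final Map<String, Integer> PREFIX_TO_PARTITION = new HashMap<>();
    static {
        PREFIX_TO_PARTITION.put("135", 0);
        PREFIX_TO_PARTITION.put("136", 1);
        PREFIX_TO_PARTITION.put("137", 2);
    }

    // Every unknown prefix goes to one extra "other" partition.
    static final int OTHER_PARTITION = 3;

    // Default-style rule: non-negative hash modulo the number of reduce tasks.
    static int hashPartition(String key, int numReduceTasks) {
        return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
    }

    // Custom rule: choose the partition from the first three digits of the phone.
    static int provincePartition(String phone) {
        String prefix = phone.length() >= 3 ? phone.substring(0, 3) : phone;
        return PREFIX_TO_PARTITION.getOrDefault(prefix, OTHER_PARTITION);
    }

    public static void main(String[] args) {
        String[] phones = {"13500000001", "13600000002", "18900000003"};
        for (String phone : phones) {
            System.out.println(phone
                    + " hash partition=" + hashPartition(phone, 4)
                    + ", province partition=" + provincePartition(phone));
        }
    }
}
```

In a real job this logic would go into the getPartition(key, value, numPartitions) method of the Partitioner subclass, and the number of reduce tasks set with job.setNumReduceTasks would need to cover every partition index the method can return (four in this sketch).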

 

