Mapreduce--分區(shuffle)
分區partition
我們來回顧一下mapreduce
編程指導思想中的第三個步驟(shuffle
階段的分區):
- 第三步:對輸出的
key
,value
對進行分區:相同key
的數據發送到同一個reduce task
里面去,相同key
合並,value
形成一個集合。(這個分區的"區"本質是reduce task
,將鍵值對數據分配到不同的reduce task
)。分區是map
端的組件。
示意圖如下:
分區的個數是多少?怎么知道某個key
的數據要進入到那個分區?
在mapreduce
當中有一個抽象類叫做Partitioner
,默認使用的實現類是HashPartitioner
,我們可以通過HashPartitioner
的源碼,查看到分區的邏輯。
public class HashPartitioner<K, V> extends Partitioner<K, V> {
public HashPartitioner() {
}
//獲取某個key的數據要進入分區:
public int getPartition(K key, V value, int numReduceTasks) {
return (key.hashCode() & 2147483647) % numReduceTasks;
//numReduceTasks是指reducetask的個數
}
}
從源碼可知,分區公式為(key.hashCode() & 2147483647) % numReduceTasks
,即對numReduceTasks
的大小求余數。
假如說 numReduceTasks=4
,則(key.hashCode() & 2147483647) % numReduceTasks
的計算結果可能為0,1,2,3
,因此,有4個分區。所以可以看到,分區的個數跟reduce task
的個數是相一致的(從分區的作用就可以推測)。
因為key.hashCode()
的存在,所以用戶沒法控制哪些key
的數據進入哪些分區。但是我們可以定義自己的分區邏輯。
案例需求
基於手機流量數據,實現將不同的手機號的數據划分到不同的文件里面去
135開頭的手機號分到一個文件里面去,
136開頭的手機號分到一個文件里面去,
137開頭的手機號分到一個文件里面去,
138開頭的手機號分到一個文件里面去,
139開頭的手機號分到一個文件里面去,
其他開頭的手機號分到一個文件里面去
案例分析
要將不同手機號數據分到不同文件去,實際上就是分區,將手機號數據分到不同的reducetask
處理,然后生成不同的part-r-
輸出文件。分區等價於將結果輸出到不同的part-r-
文件。
因此,我們要定義自己分區器(分區類),根據不同開頭的手機號返回不同的分區。
要使用我們定義的分區器,還要在job
驅動中,將分區器設置為我們自己定義的。
又因為分區個數跟reducetask
個數是一致的,所以要根據分區邏輯設置相應個數的reducetask
。
步驟1:定義自己的分區邏輯
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;
public class PartitionOwn extends Partitioner<Text, FlowBean> {
@Override
public int getPartition(Text text, FlowBean flowBean, int numPartitions) {
String phoenNum = text.toString();
if(null != phoenNum && !phoenNum.equals("")){
if(phoenNum.startsWith("135")){
return 0;
}else if(phoenNum.startsWith("136")){
return 1;
}else if(phoenNum.startsWith("137")){
return 2;
}else if(phoenNum.startsWith("138")){
return 3;
}else if(phoenNum.startsWith("139")){
return 4;
}else {
return 5;
}
}else{
return 5;
}
}
}
步驟2:定義bean
類型的可序列化類型
import org.apache.hadoop.io.Writable;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
public class FlowBean implements Writable {
private Integer upFlow;
private Integer downFlow;
private Integer upCountFlow;
private Integer downCountFlow;
/**
* 序列化方法
* @param out
* @throws IOException
*/
@Override
public void write(DataOutput out) throws IOException {
out.writeInt(upFlow);
out.writeInt(downFlow);
out.writeInt(upCountFlow);
out.writeInt(downCountFlow);
}
/**
* 反序列化方法
* @param in
* @throws IOException
*/
@Override
public void readFields(DataInput in) throws IOException {
this.upFlow= in.readInt();
this.downFlow= in.readInt();
this.upCountFlow = in.readInt();
this.downCountFlow = in.readInt();
}
public Integer getUpFlow() {
return upFlow;
}
public void setUpFlow(Integer upFlow) {
this.upFlow = upFlow;
}
public Integer getDownFlow() {
return downFlow;
}
public void setDownFlow(Integer downFlow) {
this.downFlow = downFlow;
}
public Integer getUpCountFlow() {
return upCountFlow;
}
public void setUpCountFlow(Integer upCountFlow) {
this.upCountFlow = upCountFlow;
}
public Integer getDownCountFlow() {
return downCountFlow;
}
public void setDownCountFlow(Integer downCountFlow) {
this.downCountFlow = downCountFlow;
}
@Override
public String toString() {
return "FlowBean{" +
"upFlow=" + upFlow +
", downFlow=" + downFlow +
", upCountFlow=" + upCountFlow +
", downCountFlow=" + downCountFlow +
'}';
}
}
步驟3:定義map邏輯
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
public class FlowMapper extends Mapper<LongWritable,Text,Text,FlowBean> {
private FlowBean flowBean ;
private Text text;
@Override
protected void setup(Context context) throws IOException, InterruptedException {
flowBean = new FlowBean();
text = new Text();
}
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
String[] split = value.toString().split("\t");
String phoneNum = split[1];
String upFlow =split[6];
String downFlow =split[7];
String upCountFlow =split[8];
String downCountFlow =split[9];
text.set(phoneNum);
flowBean.setUpFlow(Integer.parseInt(upFlow));
flowBean.setDownFlow(Integer.parseInt(downFlow));
flowBean.setUpCountFlow(Integer.parseInt(upCountFlow));
flowBean.setDownCountFlow(Integer.parseInt(downCountFlow));
context.write(text,flowBean);
}
}
步驟4:定義reduce邏輯
package com.jimmy.day05;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
public class FlowReducer extends Reducer<Text,FlowBean,Text,Text> {
@Override
protected void reduce(Text key, Iterable<FlowBean> values, Context context) throws IOException, InterruptedException {
int upFlow = 0;
int donwFlow = 0;
int upCountFlow = 0;
int downCountFlow = 0;
for (FlowBean value : values) {
upFlow += value.getUpFlow();
donwFlow += value.getDownFlow();
upCountFlow += value.getUpCountFlow();
downCountFlow += value.getDownCountFlow();
}
context.write(key,new Text(upFlow +"\t" + donwFlow + "\t" + upCountFlow + "\t" + downCountFlow));
}
}
步驟5:創建組裝類和定義main()方法
package com.jimmy.day05;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
public class FlowMain extends Configured implements Tool {
@Override
public int run(String[] args) throws Exception {
//獲取job對象
Job job = Job.getInstance(super.getConf(), "flowCount");
//如果程序打包運行必須要設置這一句
job.setJarByClass(FlowMain.class);
job.setInputFormatClass(TextInputFormat.class);
TextInputFormat.addInputPath(job,new Path(args[0]));
job.setMapperClass(FlowMapper.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(FlowBean.class);
//關鍵代碼:
//設置要使用的分區類,這里指定我們自己定義的:
job.setPartitionerClass(PartitionOwn.class);
//設置reducetTask的個數,默認值是1;
job.setNumReduceTasks(Integer.parseInt(args[2]));
job.setReducerClass(FlowReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
job.setOutputFormatClass(TextOutputFormat.class);
TextOutputFormat.setOutputPath(job,new Path(args[1]));
boolean b = job.waitForCompletion(true);
return b?0:1;
}
public static void main(String[] args) throws Exception {
Configuration configuration = new Configuration();
configuration.set("mapreduce.framework.name","local");
configuration.set("yarn.resourcemanager.hostname","local");
int run = ToolRunner.run(configuration, new FlowMain(), args);
System.exit(run);
}
}
步驟6:運行
打包jar
包到集群運行:
hadoop jar original-MrNline-1.0-SNAPSHOT.jar com.jimmy.day05.FlowMain /parinput /paroutput 6
可以看到運行輸出了6個part-r
文件
思考問題
如果reducetask
的個數,跟分區邏輯設置的分區個數不一致,會怎么樣?思考下列兩個問題:
-
如果手動指定
6
個分區,reduceTask
個數設置為3
個會出現什么情況 -
如果手動指定
6
個分區,reduceTask
個數設置為9
個會出現什么情況
實驗:設置不同的reducetask
個數來運行jar
包
hadoop jar original-MrNline-1.0-SNAPSHOT.jar com.jimmy.day05.FlowMain /parinput /paroutput2 3
//出錯了,證明reducetask的個數不能少於分區個數
//---------------------------------------------------------------------------
hadoop jar original-MrNline-1.0-SNAPSHOT.jar com.jimmy.day05.FlowMain /parinput /paroutput3 9
//成功運行,證明reducetask的個數可以大於分區個數
//程序運行成功后,生成了9個part-r-文件,但是part-r-00006到part-r-00008這三個文件,沒有任何內容,為空。原因是分區個數小於reducetask個數時,有三個reducetask是不需要處理任何數據的,沒有數據傳進到那三個reducetask。
//因此,設置過多reducetask沒有必要
ReduceTask並行度對Job執行的影響
ReduceTask
的並行度個數會影響整個Job
的執行並發度和執行效率,但與MapTask
的並發數由切片數決定不同,ReduceTask
數量的決定是可以直接手動設置:
// 默認值是1,手動設置為4
job.setNumReduceTasks(4);
下面有個實驗:測試ReduceTask
多少合適
(1)實驗環境:1
個Master
節點,16
個Slave
節點:CPU:8GHZ
,內存: 2G
,(數據量為1GB
)
(2)實驗結論:從下表可知道,reducetask
不能太多,得不償失,也不能太少,效率會較低下。
MapTask =16 | ||||||||||
---|---|---|---|---|---|---|---|---|---|---|
ReduceTask |
1 | 5 | 10 | 15 | 16 | 20 | 25 | 30 | 45 | 60 |
總時間 | 892 | 146 | 110 | 92 | 88 | 100 | 128 | 101 | 145 | 104 |
Mapreduce--排序(shuffle)
排序
排序是MapReduce
框架中最重要的操作之一。
MapTask
和ReduceTask
均會對數據按照key
進行排序。該操作屬於Hadoop
的默認行為。任何應用程序中的數據均會被排序,而不管邏輯上是否需要。
默認排序是按照字典順序排序,且實現該排序的方法是快速排序。
對於MapTask
,它會將處理的結果暫時放到環形緩沖區中,當環形緩沖區使用率達到一定閾值后,再對緩沖區中的數據進行一次快速排序,並將這些有序數據溢寫到磁盤上,而當數據處理完畢后,它會對磁盤上所有文件進行歸並排序。
對於ReduceTask
,它從每個MapTask
上遠程拷貝相應的數據文件,如果文件大小超過一定閾值,則溢寫磁盤上,否則存儲在內存中。如果磁盤上文件數目達到一定閾值,則進行一次歸並排序以生成一個更大文件;如果內存中文件大小或者數目超過一定閾值,則進行一次合並后將數據溢寫到磁盤上。當所有數據拷貝完畢后,ReduceTask
統一對內存和磁盤上的所有數據進行一次歸並排序。
排序分類:
1、部分排序
MapReduce
根據輸入記錄的鍵對數據集排序。保證輸出的每個文件內部有序
2、全排序
最終輸出結果只有一個文件,且文件內部有序。實現方式是只設置一個ReduceTask
。但該方法在處理大型文件時效率極低,因為一台機器處理所有文件,完全喪失了MapReduce
所提供的並行架構
3、利用分區器實現全部排序
4、輔助排序
在Reduce
端對key
進行分組。應用於:在接收的key
為bean
對象時,想讓一個或幾個字段相同(全部字段比較不相同)的key
進入到同一個reduce
方法時,可以采用分組排序。
5、二次排序
先按照key
中的某個屬性(值)進行排序,再按照key
中的另一個屬性(值)進行排序,這個叫二次排序。在自定義排序過程中,如果compareTo
中的判斷條件為兩個即為二次排序。
案例需求
在前面的序列化當中在數據輸出的時候,我們對上行流量,下行流量,上行總流量,下行總流量進行了匯總,匯總結果如下:
//手機號 上行流量 下行流量 上行總流量 下行總流量
13480253104 3 3 180 180
13502468823 57 102 7335 110349
13560439658 33 24 2034 5892
13600217502 18 138 1080 186852
13602846565 15 12 1938 2910
13660577991 24 9 6960 690
13719199419 4 0 240 0
13726230503 24 27 2481 24681
13760778710 2 2 120 120
13823070001 6 3 360 180
13826544101 4 0 264 0
13922314466 12 12 3008 3720
13925057413 69 63 11058 48243
13926251106 4 0 240 0
13926435656 2 4 132 1512
15013685858 28 27 3659 3538
15920133257 20 20 3156 2936
15989002119 3 3 1938 180
18211575961 15 12 1527 2106
18320173382 21 18 9531 2412
84138413 20 16 4116 1432
在上面的數據基礎上,我們需要對下行流量,以及上行總流量進行排序,如果下行流量相等就按照上行總流量進行排序(二次排序)。
步驟1:定義可序列化的java bean類
把手機手機號、上行流量、下行流量、上行總流量、下行總流量這5個數據封裝到一個Java bean
類中,該類要實現WritableComparable
接口,表示既是可序列的,又是可排序的。實現該接口后,要重寫compareTo()
方法,用於排序。
import org.apache.hadoop.io.WritableComparable;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
//Writable表示可序列化,Comparable表示可排序
public class FlowSortBean implements WritableComparable<FlowSortBean> {
private String phone;
private Integer upFlow;
private Integer downFlow;
private Integer upCountFlow;
private Integer downCountFlow;
//重寫compareTo方法,用於排序
@Override
public int compareTo(FlowSortBean o) {
int i = this.downCountFlow.compareTo(o.downCountFlow);
if(i == 0){
//升序:
i = this.upCountFlow.compareTo(o.upCountFlow);
//降序:
//i = -this.upCountFlow.compareTo(o.upCountFlow);
}
return i;
}
@Override
public void write(DataOutput out) throws IOException {
out.writeUTF(phone);
out.writeInt(upFlow);
out.writeInt(downFlow);
out.writeInt(upCountFlow);
out.writeInt(downCountFlow);
}
@Override
public void readFields(DataInput in) throws IOException {
this.phone = in.readUTF();
this.upFlow= in.readInt();
this.downFlow= in.readInt();
this.upCountFlow = in.readInt();
this.downCountFlow = in.readInt();
}
@Override
public String toString() {
return phone + "\t" + upFlow + "\t" +downFlow + "\t" + upCountFlow + "\t" + downCountFlow ;
}
public String getPhone() {
return phone;
}
public void setPhone(String phone) {
this.phone = phone;
}
public Integer getUpFlow() {
return upFlow;
}
public void setUpFlow(Integer upFlow) {
this.upFlow = upFlow;
}
public Integer getDownFlow() {
return downFlow;
}
public void setDownFlow(Integer downFlow) {
this.downFlow = downFlow;
}
public Integer getUpCountFlow() {
return upCountFlow;
}
public void setUpCountFlow(Integer upCountFlow) {
this.upCountFlow = upCountFlow;
}
public Integer getDownCountFlow() {
return downCountFlow;
}
public void setDownCountFlow(Integer downCountFlow) {
this.downCountFlow = downCountFlow;
}
}
步驟2:定義map邏輯
package com.jimmy.day06;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
/**
* LongWritable:行偏移量
* Text: 一行的內容
* FlowSortBean:封裝了手機號等5個數據的自己定義的類的對象
* NullWritable:無
*/
public class FlowSortMapper extends Mapper<LongWritable,Text,FlowSortBean, NullWritable> {
private FlowSortBean flowSortBean;
//初始化:
@Override
protected void setup(Context context) throws IOException, InterruptedException {
flowSortBean = new FlowSortBean();
}
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
String[] split = value.toString().split("\t");
flowSortBean.setPhone(split[0]);
flowSortBean.setUpFlow(Integer.parseInt(split[1]));
flowSortBean.setDownFlow(Integer.parseInt(split[2]));
flowSortBean.setUpCountFlow(Integer.parseInt(split[3]));
flowSortBean.setDownCountFlow(Integer.parseInt(split[4]));
context.write(flowSortBean,NullWritable.get());
}
}
步驟3:定義reduce邏輯
package com.jimmy.day06;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
public class FlowSortReducer extends Reducer<FlowSortBean, NullWritable,FlowSortBean,NullWritable> {
@Override
protected void reduce(FlowSortBean key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {
context.write(key, NullWritable.get());
}
}
步驟4:定義組裝類和main()方法
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
public class FlowSortMain extends Configured implements Tool {
@Override
public int run(String[] args) throws Exception {
//獲取job對象
Job job = Job.getInstance(super.getConf(), "flowSort");
//如果程序打包運行必須要設置這一句
job.setJarByClass(FlowSortMain.class);
job.setInputFormatClass(TextInputFormat.class);
TextInputFormat.addInputPath(job,new Path("file:///F://test3//"));
job.setMapperClass(FlowSortMapper.class);
job.setMapOutputKeyClass(FlowSortBean.class);
job.setMapOutputValueClass(NullWritable.class);
job.setReducerClass(FlowSortReducer.class);
job.setOutputKeyClass(FlowSortBean.class);
job.setOutputValueClass(NullWritable.class);
job.setOutputFormatClass(TextOutputFormat.class);
TextOutputFormat.setOutputPath(job,new Path("file:///F://test2//sortoutput"));
boolean b = job.waitForCompletion(true);
return b?0:1;
}
public static void main(String[] args) throws Exception {
Configuration configuration = new Configuration();
int run = ToolRunner.run(configuration, new FlowSortMain(), args);
System.exit(run);
}
}
運行輸出結果
13719199419 4 0 240 0
13826544101 4 0 264 0
13760778710 2 2 120 120
13480253104 3 3 180 180
13823070001 6 3 360 180
15989002119 3 3 1938 180
13660577991 24 9 6960 690
84138413 20 16 4116 1432
13926435656 2 4 132 1512
18211575961 15 12 1527 2106
18320173382 21 18 9531 2412
13602846565 15 12 1938 2910
15920133257 20 20 3156 2936
15013685858 28 27 3659 3538
13922314466 12 12 3008 3720
13560439658 33 24 2034 5892
13726230503 24 27 2481 24681
13925057413 69 63 11058 48243
13502468823 57 102 7335 110349
13600217502 18 138 1080 186852
Mapreduce--規約(shuffle)
Conbiner規約
Combiner
是MR
程序中Mapper
和Reducer
之外的一種組件。Combiner
本質上是一個reduce
,因為它的父類是Reducer
。Combiner
和Reducer
的區別在於運行的位置Combiner
是在每一個maptask
所在的節點運行Reducer
是接收全局所有mapper
的輸出結果
combiner
的作用是對每一個maptask
的輸出進行局部合並匯總,以減少網絡傳輸量combiner
能夠應用的前提是不能影響最終的業務邏輯,而且Combiner
的輸出kv
對要跟Reducer
的輸入kv
對的類型對應起來。
規約效果示意圖(以詞頻統計為例):
如果沒有規約的處理階段,將是下面的情形:
案例需求
對於我們前面的wordCount
單詞計數統計,我們加上Combiner
過程,實現map
端的數據進行匯總之后,再發送到reduce
端,減少數據的網絡拷貝。
步驟1:自定義Combiner類
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
public class MyCombiner extends Reducer<Text, IntWritable,Text,IntWritable> {
@Override
protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
//合並求和:
int sum=0;
for (IntWritable value:values){
sum+=value.get();
}
context.write(key,new IntWritable(sum));
}
}
步驟2:自定義map邏輯
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
public class MyMap extends Mapper<LongWritable,Text,Text,IntWritable> {
Text text = new Text();
IntWritable intW = new IntWritable(1);
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
String line = value.toString();
String[] split = line.split(",");
for (String word : split) {
text.set(word);
context.write(text,intW);
}
}
}
步驟3:自定義reduce邏輯
package com.jimmy.day07;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
public class MyReduce extends Reducer<Text,IntWritable,Text,IntWritable> {
@Override
protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
int result = 0;
for (IntWritable value : values) {
result += value.get();
}
IntWritable intWritable = new IntWritable(result);
context.write(key,intWritable);
}
}
步驟4:定義組裝類和main()方法
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
public class Assem extends Configured implements Tool{
@Override
public int run(String[] args) throws Exception {
Configuration conf = super.getConf();
Job job = Job.getInstance(conf, "mrdemo1");
job.setJarByClass(Assem.class);
job.setInputFormatClass(TextInputFormat.class);
TextInputFormat.addInputPath(job,new Path("file:///F://test3//"));
job.setMapperClass(MyMap.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);
//設定規約類為自己定義的類
job.setCombinerClass(MyCombiner.class);
job.setReducerClass(MyReduce.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
job.setOutputFormatClass(TextOutputFormat.class);
TextOutputFormat.setOutputPath(job,new Path("file:///F://test2//outr"));
boolean b = job.waitForCompletion(true);
return b?0:1;
}
public static void main(String[] args) throws Exception {
Configuration conf=new Configuration();
int exitCode= ToolRunner.run(conf,new Assem(),args);
}
}
運行結果
部分運行輸出信息如下:
從上圖可以看出,規約前record
鍵值對的個數是1132702
,規約后輸出的record
鍵值對個數為73
,極大地減少了要傳輸地鍵值對數,減少了網絡傳輸。
Mapreduce--分組(shuffle)
GroupingComparator
是mapreduce
當中reduce
端的一個功能組件,主要的作用是決定哪些數據作為一組,調用一次reduce
的邏輯,默認是每個不同的key
,作為多個不同的組,每個組調用一次reduce
邏輯,我們可以自定義GroupingComparator
實現不同的key
作為同一個組,調用一次reduce
邏輯。
注意:這里說的是每個組調用一次reduce
邏輯,可以理解為調用一次reduc()
方法,但不要理解為一個reducetask
。
分組示意圖:
案例需求
現在有訂單數據如下:
訂單id | 商品id | 成交金額 |
---|---|---|
Order_0000001 | Pdt_01 | 222.8 |
Order_0000001 | Pdt_05 | 25.8 |
Order_0000002 | Pdt_03 | 522.8 |
Order_0000002 | Pdt_04 | 122.4 |
Order_0000002 | Pdt_05 | 722.4 |
Order_0000003 | Pdt_01 | 222.8 |
現在需要求取每個訂單當中金額最大的商品id
。(不是所有訂單)
案例分析
步驟1:創建可序列化可排序的Java bean類
該類是用來封裝訂單id
、商品id
、成交金額這三個數據的。
package com.jimmy.day08;
import org.apache.hadoop.io.WritableComparable;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
public class OrderBean implements WritableComparable<OrderBean> {
private String orderId;
private Double price ;
private String productId;
@Override
public int compareTo(OrderBean o) {
//注意:如果是不同的訂單之間,金額不需要排序,沒有可比性
int orderIdCompare = this.orderId.compareTo(o.orderId);
if(orderIdCompare == 0){
//比較金額,按照金額進行排序
int priceCompare = this.price.compareTo(o.price);
return -priceCompare;
}else{
//如果訂單號不同,沒有可比性,直接返回訂單號的排序即可
return orderIdCompare;
}
}
/**
* 序列化方法
* @param out
* @throws IOException
*/
@Override
public void write(DataOutput out) throws IOException {
out.writeUTF(orderId);
out.writeUTF(productId);
out.writeDouble(price);
}
/**
* 反序列化方法
* @param in
* @throws IOException
*/
@Override
public void readFields(DataInput in) throws IOException {
this.orderId = in.readUTF();
this.productId=in.readUTF();
this.price = in.readDouble();
}
public String getOrderId() {
return orderId;
}
public void setOrderId(String orderId) {
this.orderId = orderId;
}
public Double getPrice() {
return price;
}
public void setPrice(Double price) {
this.price = price;
}
public String getProductId() {
return productId;
}
public void setProductId(String productId) {
this.productId = productId;
}
@Override
public String toString() {
return (orderId + "\t" + productId + "\t" + price);
}
}
步驟2:定義自己的分區類
定義分區類,使用orderId
作為分區的條件,保證相同的orderId
進入到同一個reduceTask
里面去。
這里要注意:雖然進行分區后能夠保證相同orderId
的數據進入到同一個reduceTask
里面去,但是不能保證一個reduceTask
里只有一種orderId
的數據,有可能有不同orderId
的數據。
因此,進行分區后,還要需要定義我們自己的分組類,使orderId
相同的數據調用一次reduce()
方法。
package com.jimmy.day08;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Partitioner;
public class GroupPartition extends Partitioner<OrderBean, NullWritable> {
@Override
public int getPartition(OrderBean orderBean, NullWritable nullw, int numReduceTasks) {
//(key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
//注意這里:使用orderId作為分區的條件,來進行判斷,保證相同的orderId進入到同一個reduceTask里面去
return (orderBean.getOrderId().hashCode() & Integer.MAX_VALUE) % numReduceTasks;
}
}
步驟3:定義自己的分組類
該類要繼承WritableComparator
接口,表示可序列化和可比較的。重寫compare()
方法
package com.jimmy.day08;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
import java.util.SortedMap;
/**
* 第六步:自定義分組邏輯
* Writable表示可序列化,Comparator表示可比較
*/
public class MyGroup extends WritableComparator {
/**
* 覆寫默認構造器,通過反射,構造OrderBean對象
* 通過反射來構造OrderBean對象
* 接受到的key2 是orderBean類型,我們就需要告訴分組,以OrderBean接受我們的參數
*/
public MyGroup(){
//調用父類WritableComparator的有參構造方法,得到一個OrderBean對象
super(OrderBean.class,true);
}
/**
* compare方法接受到兩個參數,這兩個參數其實就是我們前面傳過來的OrderBean
* @param a
* @param b
* @return
*/
@Override
public int compare(WritableComparable a, WritableComparable b) {
//類型強轉:
OrderBean first = (OrderBean) a;
OrderBean second = (OrderBean) b;
//以orderId作為比較條件,判斷哪些orderid相同作為同一組
return first.getOrderId().compareTo(second.getOrderId());
}
}
步驟4:自定義map邏輯
package com.jimmy.day08;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
public class GroupMapper extends Mapper<LongWritable,Text,OrderBean,NullWritable> {
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
String[] split = value.toString().split("\t");
OrderBean odb = new OrderBean();
odb.setOrderId(split[0]);
odb.setProductId(split[1]);
odb.setPrice(Double.valueOf(split[2]));
context.write(odb, NullWritable.get());
}
}
步驟5:自定義reduce邏輯
package com.jimmy.day08;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
public class GroupReducer extends Reducer<OrderBean,NullWritable, Text,NullWritable> {
@Override
protected void reduce(OrderBean key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {
Text text=new Text();
text.set(key.getProductId());
context.write(text,NullWritable.get());
}
}
步驟6:定義組裝類和Main方法
package com.jimmy.day08;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
public class GroupMain extends Configured implements Tool {
@Override
public int run(String[] args) throws Exception {
//獲取job對象
Job job = Job.getInstance(super.getConf(), "group");
//第一步:讀取文件,解析成為key,value對
job.setInputFormatClass(TextInputFormat.class);
TextInputFormat.addInputPath(job,new Path("file:///F://test4//"));
//第二步:自定義map邏輯
job.setMapperClass(GroupMapper.class);
job.setMapOutputKeyClass(OrderBean.class);
job.setMapOutputValueClass(NullWritable.class);
//第三步:分區
job.setPartitionerClass(GroupPartition.class);
//第四步:排序 已經做了
//第五步:規約 combiner 省掉
//第六步:分組 自定義分組邏輯
job.setGroupingComparatorClass(MyGroup.class);
//第七步:設置reduce邏輯
job.setReducerClass(GroupReducer.class);
job.setOutputKeyClass(OrderBean.class);
job.setOutputValueClass(NullWritable.class);
//第八步:設置輸出路徑
job.setOutputFormatClass(TextOutputFormat.class);
TextOutputFormat.setOutputPath(job,new Path("file:///F://test2//groupoutput"));
boolean b = job.waitForCompletion(true);
return b?0:1;
}
public static void main(String[] args) throws Exception {
Configuration configuration = new Configuration();
int run = ToolRunner.run(new Configuration(), new GroupMain(), args);
System.exit(run);
}
}
案例拓展:
如何求每個分組(根據orderId分組)當中的top2的訂單等信息???
輸出格式:
Order_0000001 Pdt_01 222.8 222.8
Order_0000001 Pdt_05 25.8 25.8
Order_0000002 Pdt_05 822.4 822.4
Order_0000002 Pdt_04 522.4 522.4
修改map邏輯
package com.jimmy.day09;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
public class MyMap extends Mapper<LongWritable,Text, OrderBean, DoubleWritable> {
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
String[] split = value.toString().split("\t");
OrderBean odb = new OrderBean();
odb.setOrderId(split[0]);
odb.setProductId(split[1]);
odb.setPrice(Double.valueOf(split[2]));
//輸出kv對:(orderBean , price)
context.write(odb, new DoubleWritable(Double.valueOf(split[2])));
}
}
修改reduce邏輯
package com.jimmy.day09;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
public class MyReduce extends Reducer<OrderBean, DoubleWritable, OrderBean,DoubleWritable> {
@Override
protected void reduce(OrderBean key, Iterable<DoubleWritable> values, Context context) throws IOException, InterruptedException {
//需要對我們集合只輸出兩個值
int i = 0;
for (DoubleWritable value : values) {
if(i<2){
context.write(key,value);
i ++;
}else{
break;
}
}
}
}
修改分區邏輯
package com.jimmy.day09;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Partitioner;
public class MyPartition extends Partitioner<OrderBean, DoubleWritable> {
@Override
public int getPartition(OrderBean orderBean, DoubleWritable dw, int numReduceTasks) {
//(key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
//注意這里:使用orderId作為分區的條件,來進行判斷,保證相同的orderId進入到同一個reduceTask里面去
return (orderBean.getOrderId().hashCode() & Integer.MAX_VALUE) % numReduceTasks;
}
}
修改組裝類
package com.jimmy.day09;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
public class RunClass extends Configured implements Tool {
@Override
public int run(String[] args) throws Exception {
//獲取job對象
Job job = Job.getInstance(super.getConf(), "group");
//第一步:讀取文件,解析成為key,value對
job.setInputFormatClass(TextInputFormat.class);
TextInputFormat.addInputPath(job,new Path("file:///F://test4//"));
//第二步:自定義map邏輯
job.setMapperClass(MyMap.class);
job.setMapOutputKeyClass(OrderBean.class);
job.setMapOutputValueClass(DoubleWritable.class);
//第三步:分區
job.setPartitionerClass(MyPartition.class);
//第四步:排序 已經做了
//第五步:規約 combiner 省掉
//第六步:分組 自定義分組邏輯
job.setGroupingComparatorClass(MyGroup.class);
//第七步:設置reduce邏輯
job.setReducerClass(MyReduce.class);
job.setOutputKeyClass(OrderBean.class);
job.setOutputValueClass(DoubleWritable.class);
//第八步:設置輸出路徑
job.setOutputFormatClass(TextOutputFormat.class);
TextOutputFormat.setOutputPath(job,new Path("file:///F://test2//groupoutput4"));
boolean b = job.waitForCompletion(true);
return b?0:1;
}
public static void main(String[] args) throws Exception {
Configuration configuration = new Configuration();
int run = ToolRunner.run(new Configuration(), new RunClass(), args);
System.exit(run);
}
}