大數據學習之九——Combiner,Partitioner,shuffle和MapReduce排序分組


1.Combiner

Combiner是MapReduce的一種優化手段。每一個map都可能會產生大量的本地輸出,Combiner的作用就是對map端的輸出先做一次合並,以減少map和reduce結點之間的數據傳輸量,以提高網絡IO性能。只有操作滿足結合律的才可設置combiner。

Combiner的作用:

(1)Combiner實現本地key的聚合,對map輸出的key排序value進行迭代:如圖所示:

map: (K1, V1) → list(K2, V2) combine: (K2, list(V2)) → list(K2, V2)  reduce: (K2, list(V2)) → list(K3, V3)

 (2)Combiner還有本地reduce功能(其本質上就是一個reduce)
         例如wordcount的例子和找出value的最大值的程序 ,combiner和reduce完全一致,如下所示:

 map: (K1, V1) → list(K2, V2) combine: (K2, list(V2)) → list(K3, V3) reduce: (K3, list(V3)) → list(K4, V4)

使用combiner之后,先完成的map會在本地聚合,提升速度。對於hadoop自帶的wordcount的例子,value就是一個疊加的數字,所以map一結束就可以進行reduce的value疊加,而不必要等到所有的map結束再去進行reduce的value疊加。

在實際的Hadoop集群操作中,我們是由多台主機一起進行MapReduce的,如果加入規約操作,每一台主機會在reduce之前進行一次對本機數據的規約,然后在通過集群進行reduce操作,這樣就會大大節省reduce的時間,從而加快MapReduce的處理速度。

2.Partitioner

step1.3就是分區操作,哪個key到哪個reducer的分配過程,是由Partitioner規定的。

用戶在中間key上使用分區函數來對數據進行分區,之后在輸入到后續任務執行進程。一個默認的分區函數式使用hash方法(比如常見的:hash(key) mod R)進行分區。hash方法能夠產生非常平衡的分區。

自定制Partitioner函數:

package mapreduce01;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;

import org.apache.hadoop.fs.Path;

import org.apache.hadoop.io.IntWritable;

import org.apache.hadoop.io.NullWritable;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;

import org.apache.hadoop.mapreduce.Mapper;

import org.apache.hadoop.mapreduce.Partitioner;

import org.apache.hadoop.mapreduce.Reducer;

import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class fenqu {      

static String INPUT_PATH="hdfs://master:9000/test";  

static String OUTPUT_PATH="hdfs://master:9000/output/fenqu";    

static class MyMapper extends Mapper<Object,Object,IntWritable,NullWritable>{  

 IntWritable output_key=new IntWritable();   

NullWritable output_value=NullWritable.get();   

protected void map(Object key, Object value, Context context) throw IOException,InterruptedException{        

int val=Integer.parseUnsignedInt(value.toString().trim());    

output_key.set(val);    

context.write(output_key,output_value);   

}  

}    

static class LiuPartitioner extends Partitioner<IntWritable,NullWritable> {   

@Override   

public int getPartition(IntWritable key, NullWritable value, int numPartitions){    

int num=key.get();    

if(num>100)     return 0;    

else     return 1;                           

}   

 }  

 static class MyReduce extends Reducer<IntWritable,NullWritable,IntWritable,IntWritable>{   

IntWritable output_key=new IntWritable();     

int num=1;     

protected void reduce(IntWritable key,Iterable<NullWritable> values,Context context) throws IOException,InterruptedException{        

output_key.set(num++);    

context.write(output_key,key);    

}   }    

public static void main(String[] args) throws Exception{   

Path outputpath=new Path(OUTPUT_PATH);   

Configuration conf=new Configuration();     |

FileInputFormat.setInputPaths(job, INPUT_PATH);   

FileOutputFormat.setOutputPath(job,outputpath);      

job.setMapperClass(MyMapper.class);  

 job.setReducerClass(MyReduce.class);     

 job.setNumReduceTasks(2);   

job.setPartitionerClass(LiuPartitioner.class);     

 job.setMapOutputKeyClass(IntWritable.class);  

 job.setMapOutputValueClass(NullWritable.class);     

 job.setOutputKeyClass(IntWritable.class);  

 job.setOutputValueClass(IntWritable.class);     

 job.waitForCompletion(true);  

}

}

分區Partitioner主要作用在於以下兩點:
 根據業務需要,產生多個輸出文件;多個reduce任務並發運行,提高整體job的運行效率。

3.Shuffle過程

reduce階段的三個步驟:

 step2.1就是一個shuffle【隨機、洗牌】操作

shuffle是什么:針對多個map任務的輸出按照不同的分區(Partition)通過網絡復制到不同的reduce任務節點上,這個過程就稱作為Shuffle。

在map端:

1.在map端首先是InputSplit,在InputSplit中含有DataNode中的數據,每一個InputSplit都會分配一個Mapper任務,Mapper任務結束后產生<K2,V2>的輸出,這些輸出先存放在緩存中,每個map有一個環形內存緩沖區,用於存儲任務的輸出。默認大小100MB(io.sort.mb屬性),一旦達到閥值0.8(io.sort.spil l.percent),一個后台線程就把內容寫到(spill)Linux本地磁盤中的指定目錄(mapred.local.dir)下的新建的一個溢出寫文件。

2.寫磁盤前,要進行partition、sort和combine等操作。通過分區,將不同類型的數據分開處理,之后對不同分區的數據進行排序,如果有Combiner,還要對排序后的數據進行combine。等最后記錄寫完,將全部溢出文件合並為一個分區且排序的文件。

3.最后將磁盤中的數據送到Reduce中,圖中Map輸出有三個分區,有一個分區數據被送到圖示的Reduce任務中,剩下的兩個分區被送到其他Reducer任務中。而圖示的Reducer任務的其他的三個輸入則來自其他節點的Map輸出。

reduce端:

1. Copy階段:Reducer通過Http方式得到輸出文件的分區。
reduce端可能從n個map的結果中獲取數據,而這些map的執行速度不盡相同,當其中一個map運行結束時,reduce就會從JobTracker中獲取該信息。map運行結束后TaskTracker會得到消息,進而將消息匯報給JobTracker,reduce定時從JobTracker獲取該信息,reduce端默認有5個數據復制線程從map端復制數據。

2.Merge階段:如果形成多個磁盤文件會進行合並
從map端復制來的數據首先寫到reduce端的緩存中,同樣緩存占用到達一定閾值后會將數據寫到磁盤中,同樣會進行partition、combine、排序等過程。如果形成了多個磁盤文件還會進行合並,最后一次合並的結果作為reduce的輸入而不是寫入到磁盤中。

3.Reducer的參數:最后將合並后的結果作為輸入傳入Reduce任務中。

4.排序sort

step4.1第四步中需要對不同分區中數據進行排序和分組,默認情況按照key進行排序和分組。

自定義類型MyGrouptestt實現了WritableComparable的接口,該接口中有一個compareTo()方法,當對key進行比較時會調用該方法,而我們將其改為了我們自己定義的比較規則,從而實現我們想要的效果。

自定義排序:

GroupSort.java

package mapreduce01;

import java.io.IOException;

import mapreduce01.fenqu.LiuPartitioner;

import mapreduce01.fenqu.MyMapper;

import mapreduce01.fenqu.MyReduce;

import org.apache.hadoop.conf.Configuration;

import org.apache.hadoop.fs.Path;

import org.apache.hadoop.io.IntWritable;

import org.apache.hadoop.io.LongWritable;

import org.apache.hadoop.io.NullWritable;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.Job;

import org.apache.hadoop.mapreduce.Mapper;

import org.apache.hadoop.mapreduce.Reducer;

import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class GroupSort {     

static String INPUT_PATH="hdfs://master:9000/input/f.txt";  

static String OUTPUT_PATH="hdfs://master:9000/output/groupsort";  

 static class MyMapper extends Mapper<Object,Object,MyGrouptest,NullWritable>{  

 MyGrouptest output_key=new MyGrouptest();   

NullWritable output_value=NullWritable.get();   

protected void map(Object key, Object value, Context context) throws IOException, InterruptedException{    

String[] tokens=value.toString().split(",",2);   

 MyGrouptest output_key=new MyGrouptest(Long.parseLong(tokens[0]), Long.parseLong(tokens[1]));  

context.write(output_key,output_value);   

}  

}  

 static class MyReduce extends Reducer<MyGrouptest,NullWritable,LongWritable,LongWritable>{   

LongWritable output_key=new LongWritable();  

 LongWritable output_value=new LongWritable();    

protected void reduce(MyGrouptest key,Iterable<NullWritable> values,Context context) throws IOException,InterruptedException{    

output_key.set(key.getFirstNum());    

output_value.set(key.getSecondNum());    

context.write(output_key,output_value);    

}   

}    

public static void main(String[] args) throws Exception{   

Path outputpath=new Path(OUTPUT_PATH);   

Configuration conf=new Configuration();      

Job job=Job.getInstance(conf);   

FileInputFormat.setInputPaths(job, INPUT_PATH);  

 FileOutputFormat.setOutputPath(job,outputpath);     

 job.setMapperClass(MyMapper.class);  

 job.setReducerClass(MyReduce.class);     

 job.setNumReduceTasks(1);  

 job.setPartitionerClass(LiuPartitioner.class);      

job.setMapOutputKeyClass(MyGrouptest.class);   

job.setMapOutputValueClass(NullWritable.class);     

 job.setOutputKeyClass(LongWritable.class);   

job.setOutputValueClass(LongWritable.class);     

 job.waitForCompletion(true);  

}

}

MyGrouptest.java

package mapreduce01;

import java.io.DataInput;

import java.io.DataOutput;

import java.io.IOException;

import org.apache.hadoop.io.WritableComparable;

public class MyGrouptest implements WritableComparable<MyGrouptest> {       

 long firstNum;         

 long secondNum;        

public MyGrouptest() {}       

public MyGrouptest(long first, long second) {             

 firstNum = first;              

secondNum = second;       

 }        

@Override         

public void write(DataOutput out) throws IOException {              

out.writeLong(firstNum);              

out.writeLong(secondNum);         

}        

@Override       

 public void readFields(DataInput in) throws IOException {              

firstNum = in.readLong();              

secondNum = in.readLong();       

 }    /*         * 當key進行排序時會調用以下這個compreTo方法         */       

 @Override         

public int compareTo(MyGrouptest anotherKey) {              

 long min = firstNum - anotherKey.firstNum;              

 if (min != 0) {                // 說明第一列不相等,則返回兩數之間小的數                   

 return (int) min;               

}    

else {                   

 return (int) (secondNum - anotherKey.secondNum);              

 }         

}     

public long getFirstNum() {   return firstNum;  }  

public long getSecondNum() {   return secondNum;  }

}

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM