This article was published on my own blog.
Today I'm continuing from the previous article, "Hadoop MapReduce custom sorting with WritableComparable". Following that order, this post covers how to implement custom grouping. I won't go over the overall order of operations again here; if you need the details, see my comments over on cnblogs. Let's get started.
First, let's look at the Job class. It provides a setGroupingComparatorClass() method, whose source is as follows:
/**
 * Define the comparator that controls which keys are grouped together
 * for a single call to
 * {@link Reducer#reduce(Object, Iterable,
 *                       org.apache.hadoop.mapreduce.Reducer.Context)}
 * @param cls the raw comparator to use
 * @throws IllegalStateException if the job is submitted
 */
public void setGroupingComparatorClass(Class<? extends RawComparator> cls
                                       ) throws IllegalStateException {
  ensureState(JobState.DEFINE);
  conf.setOutputValueGroupingComparator(cls);
}
From the source we can see that this method defines which keys are grouped together for a single reduce() call, i.e. custom key grouping. The class we pass in must implement the RawComparator interface, so let's look at that interface's source:
/**
 * <p>
 * A {@link Comparator} that operates directly on byte representations of
 * objects.
 * </p>
 * @param <T>
 * @see DeserializerComparator
 */
public interface RawComparator<T> extends Comparator<T> {
  public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2);
}
So RawComparator is a generic interface that extends Comparator and adds a byte-level compare method. With that understood, let's write a class that implements RawComparator:
import org.apache.hadoop.io.RawComparator;
import org.apache.hadoop.io.WritableComparator;

public class MyGrouper implements RawComparator<SortAPI> {

    // Object-level comparison: group keys by the "first" field only.
    @Override
    public int compare(SortAPI o1, SortAPI o2) {
        return (int) (o1.first - o2.first);
    }

    // Byte-level comparison on serialized keys: SortAPI.write() puts "first"
    // in the leading 8 bytes, so comparing those 8 bytes is enough.
    @Override
    public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
        return WritableComparator.compareBytes(b1, s1, 8, b2, s2, 8);
    }
}
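Before wiring this into a job, here is a minimal, hypothetical sanity check (GrouperCheck is not part of the actual job; DataOutputBuffer is Hadoop's reusable in-memory byte buffer). It serializes two SortAPI keys and hands the raw bytes to MyGrouper, the same way the framework would on the reduce side; since only the first 8 bytes are compared, two keys sharing the same first value should compare as equal:

import java.io.IOException;
import org.apache.hadoop.io.DataOutputBuffer;

// Throwaway check, not part of the MapReduce job: serialize two keys and feed
// the raw bytes to MyGrouper.
public class GrouperCheck {
    public static void main(String[] args) throws IOException {
        SortAPI k1 = new SortAPI(1, 2);
        SortAPI k2 = new SortAPI(1, 5);   // same "first", different "second"

        DataOutputBuffer buf1 = new DataOutputBuffer();
        DataOutputBuffer buf2 = new DataOutputBuffer();
        k1.write(buf1);                    // 16 bytes each: "first" then "second"
        k2.write(buf2);

        int cmp = new MyGrouper().compare(buf1.getData(), 0, buf1.getLength(),
                                          buf2.getData(), 0, buf2.getLength());
        System.out.println(cmp);           // 0: the two keys fall into the same group
    }
}

If the printed value is 0, the two keys would land in the same group at reduce time.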
In MyGrouper, SortAPI is the key object defined in the previous post on custom sorting. The first compare() works on two deserialized objects and returns a signed int indicating their order; the second compare() is used during deserialization-free comparison of serialized data, so it has to compare raw bytes, and here it only looks at the first 8 bytes of each key, i.e. the serialized first field. Next, let's look at the custom MyMapper class:
import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class MyMapper extends Mapper<LongWritable, Text, SortAPI, LongWritable> {
    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // Each input line holds two tab-separated numbers forming the composite key.
        String[] fields = value.toString().split("\t");
        try {
            long first = Long.parseLong(fields[0]);
            long second = Long.parseLong(fields[1]);
            context.write(new SortAPI(first, second), new LongWritable(1));
        } catch (Exception e) {
            System.out.println(e.getMessage());
        }
    }
}
The custom MyReduce class:
import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.mapreduce.Reducer;

public class MyReduce extends Reducer<SortAPI, LongWritable, LongWritable, LongWritable> {
    @Override
    protected void reduce(SortAPI key, Iterable<LongWritable> values, Context context)
            throws IOException, InterruptedException {
        // One call per group: write the group's representative key pair once.
        context.write(new LongWritable(key.first), new LongWritable(key.second));
    }
}
The custom SortAPI class:
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.WritableComparable;

public class SortAPI implements WritableComparable<SortAPI> {
    public Long first;
    public Long second;

    public SortAPI() {
    }

    public SortAPI(long first, long second) {
        this.first = first;
        this.second = second;
    }

    // Sort order of the map output keys; for now only "first" is compared.
    @Override
    public int compareTo(SortAPI o) {
        return (int) (this.first - o.first);
    }

    // Serialization: "first" goes out before "second", so the leading 8 bytes
    // of a serialized key are exactly the field that MyGrouper compares.
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeLong(first);
        out.writeLong(second);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        this.first = in.readLong();
        this.second = in.readLong();
    }

    @Override
    public int hashCode() {
        return this.first.hashCode() + this.second.hashCode();
    }

    @Override
    public boolean equals(Object obj) {
        if (obj instanceof SortAPI) {
            SortAPI o = (SortAPI) obj;
            // Compare by value; "==" on boxed Long objects would compare references.
            return this.first.equals(o.first) && this.second.equals(o.second);
        }
        return false;
    }

    @Override
    public String toString() {
        return "output: " + this.first + ";" + this.second;
    }
}
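As an aside, instead of implementing RawComparator by hand as MyGrouper does, a grouping comparator can also be written by extending Hadoop's WritableComparator, which handles the byte-level plumbing and hands us two deserialized keys to compare. A minimal sketch, assuming the SortAPI class above (FirstGrouper is a name I'm making up, not part of the original code):

import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

// Hypothetical alternative to MyGrouper: WritableComparator deserializes the raw
// bytes and delegates to this object-level compare.
public class FirstGrouper extends WritableComparator {

    protected FirstGrouper() {
        super(SortAPI.class, true);   // true: create SortAPI instances for deserialization
    }

    @Override
    public int compare(WritableComparable a, WritableComparable b) {
        SortAPI x = (SortAPI) a;
        SortAPI y = (SortAPI) b;
        return x.first.compareTo(y.first);   // group by "first" only, same as MyGrouper
    }
}

If you used this instead, only the job.setGroupingComparatorClass(...) line in the driver below would change.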
Next, prepare the test data:
1	2
1	1
3	0
3	2
2	2
1	2
Upload it to hdfs://hadoop-master:9000/grouper/input/test.txt (the two columns are tab-separated). The main class is as follows:
import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.partition.HashPartitioner;

public class Test {
    static final String OUTPUT_DIR = "hdfs://hadoop-master:9000/grouper/output/";
    static final String INPUT_DIR = "hdfs://hadoop-master:9000/grouper/input/test.txt";

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = new Job(conf, Test.class.getSimpleName());
        job.setJarByClass(Test.class);
        deleteOutputFile(OUTPUT_DIR);
        // 1. set the input path
        FileInputFormat.setInputPaths(job, INPUT_DIR);
        // 2. set the input format class
        job.setInputFormatClass(TextInputFormat.class);
        // 3. set the custom Mapper and its output key/value types
        job.setMapperClass(MyMapper.class);
        job.setMapOutputKeyClass(SortAPI.class);
        job.setMapOutputValueClass(LongWritable.class);
        // 4. partitioning
        job.setPartitionerClass(HashPartitioner.class);
        job.setNumReduceTasks(1);
        // 5. sorting and grouping: plug in the custom grouping comparator
        job.setGroupingComparatorClass(MyGrouper.class);
        // 6. set the custom Reducer and its output key/value types
        job.setReducerClass(MyReduce.class);
        job.setOutputKeyClass(LongWritable.class);
        job.setOutputValueClass(LongWritable.class);
        // 7. set the output path
        FileOutputFormat.setOutputPath(job, new Path(OUTPUT_DIR));
        // 8. submit the job and wait for it to finish
        job.waitForCompletion(true);
    }

    static void deleteOutputFile(String path) throws Exception {
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(new URI(INPUT_DIR), conf);
        if (fs.exists(new Path(path))) {
            fs.delete(new Path(path), true);
        }
    }
}
Run the code, then open a terminal on a node and enter: hadoop fs -text /grouper/output/part-r-00000 to view the result:
1	2
2	2
3	0
Next, let's modify the compareTo() method of the SortAPI class:
    // Sort "first" in descending order; break ties by "second" ascending.
    @Override
    public int compareTo(SortAPI o) {
        long mis = (this.first - o.first) * -1;
        if (mis != 0) {
            return (int) mis;
        } else {
            return (int) (this.second - o.second);
        }
    }
Run it again and look at /grouper/output/part-r-00000:
3	0
2	2
1	1
From this we can conclude that, for the same data, the grouping result is affected by the sort algorithm: if the sort is descending, the grouping also walks the data source in that descending order and picks each group's output accordingly. By printing records inside the map and reduce functions (process omitted here) and comparing the logs, we can also see what happens in the grouping stage: key-value pairs whose keys compare as equal (that is, compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) returns 0) form one group; within the current group, the first key in sort order is the one written to the buffer (and possibly spilled to disk), and the other pairs with the same key are not written to the buffer as separate keys again.

I also found an article through Baidu in which grouping means iterating all the values emitted by the map function under the same key, roughly {key, values}: {1, {2, 1, 2}} for the data above. That result is the same as at the very beginning, before any custom grouping was defined, and it can be checked by printing Iterable<LongWritable> values in the reduce function. Personally, I feel that this is what really deserves to be called grouping, much like a grouped query over data.
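Since the post above suggests printing from the reduce side, here is a minimal sketch of such an inspection reducer (InspectingReduce is a hypothetical name; swap it in with job.setReducerClass(InspectingReduce.class)). It counts how many map outputs arrive in one reduce() call, which directly reflects how the grouping comparator merged the keys:

import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.mapreduce.Reducer;

// Hypothetical inspection reducer, for verifying the grouping only.
public class InspectingReduce extends Reducer<SortAPI, LongWritable, LongWritable, LongWritable> {
    @Override
    protected void reduce(SortAPI key, Iterable<LongWritable> values, Context context)
            throws IOException, InterruptedException {
        long count = 0;
        for (LongWritable v : values) {
            count += v.get();              // every map output carried the value 1
        }
        // key is the group's representative key; count is the size of the group
        context.write(new LongWritable(key.first), new LongWritable(count));
    }
}

With the sample data and MyGrouper in place, the group whose first is 1 should report a count of 3, the group for 2 a count of 1, and the group for 3 a count of 2.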
Here we should also be clear about the difference between grouping and partitioning. Partitioning splits the output into separate result files so they are easier to inspect; for example, if a single output file contained HTTP requests of every status, partitioning by status would break them into several result files for easier viewing. Grouping merges key-value pairs with equal keys during computation and thus reduces the output. After partitioning, all of the data still flows to the reduce side as before, whereas grouping cuts it down; and of course the two steps also run at different stages.
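For contrast, here is a minimal sketch of a custom partitioner (FirstPartitioner is a hypothetical name, not used in the job above). It only decides which reduce task a record is sent to, based on the first field; unlike the grouping comparator, it never merges records:

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.mapreduce.Partitioner;

// Hypothetical partitioner: routes each record to a reduce task by its "first" field.
public class FirstPartitioner extends Partitioner<SortAPI, LongWritable> {
    @Override
    public int getPartition(SortAPI key, LongWritable value, int numPartitions) {
        // assumes non-negative keys, as in the sample data above
        return (int) (key.first % numPartitions);
    }
}

Plugging this in via job.setPartitionerClass(FirstPartitioner.class) together with job.setNumReduceTasks(2) would split the output into two part files by key, while the grouping inside each reduce task stays the same.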
That's all for this time. Keep recording things, bit by bit!
