Hadoop MapReduce custom grouping with RawComparator


This article was also published on my personal blog.

This post continues the previous one, Hadoop MapReduce custom sorting with WritableComparable, so following that order this time we cover how to implement custom grouping. I won't repeat the discussion of the overall processing order here; readers who need it can check my comments on cnblogs. Let's begin.

First, take a look at the Job class. It provides a setGroupingComparatorClass() method, whose source is as follows:

  /**
   * Define the comparator that controls which keys are grouped together
   * for a single call to 
   * {@link Reducer#reduce(Object, Iterable, 
   *                       org.apache.hadoop.mapreduce.Reducer.Context)}
   * @param cls the raw comparator to use
   * @throws IllegalStateException if the job is submitted
   */
  public void setGroupingComparatorClass(Class<? extends RawComparator> cls
                                         ) throws IllegalStateException {
    ensureState(JobState.DEFINE);
    conf.setOutputValueGroupingComparator(cls);
  }

As the source and its Javadoc show, this method defines custom key grouping: it controls which keys are handed to a single reduce() call. The class you pass in must be a RawComparator (the parameter is declared as Class<? extends RawComparator>), so let's look at that interface's source:

/**
 * <p>
 * A {@link Comparator} that operates directly on byte representations of
 * objects.
 * </p>
 * @param <T>
 * @see DeserializerComparator
 */
public interface RawComparator<T> extends Comparator<T> {
  public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2);
}

RawComparator is a generic interface that extends Comparator. With that in mind, let's write a class that implements RawComparator:

import org.apache.hadoop.io.RawComparator;
import org.apache.hadoop.io.WritableComparator;

public class MyGrouper implements RawComparator<SortAPI> {

    // Object-level comparison: two keys belong to the same group when their
    // first fields are equal.
    @Override
    public int compare(SortAPI o1, SortAPI o2) {
        return (int) (o1.first - o2.first);
    }

    // Byte-level comparison used during the shuffle: SortAPI serializes `first`
    // as its leading 8 bytes, so comparing only those 8 bytes groups by `first`.
    @Override
    public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
        return WritableComparator.compareBytes(b1, s1, 8, b2, s2, 8);
    }
}

Here SortAPI is the key class defined in the previous post on custom sorting. The first compare() method compares two deserialized keys and returns an int; the second is the byte-level comparison applied to the serialized keys during the shuffle, before deserialization, which is why it works directly on bytes.
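To see why comparing only the leading 8 bytes is enough, here is a minimal, hypothetical standalone sketch (the GrouperDemo class is not part of the job): SortAPI.write() emits first and then second as two 8-byte longs, so the first 8 bytes of a serialized key are exactly the first field.

import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import org.apache.hadoop.io.WritableComparator;

// Hypothetical demo, not part of the MapReduce job: shows that comparing only
// the leading 8 bytes of a serialized SortAPI compares just the `first` field.
public class GrouperDemo {

    static byte[] serialize(SortAPI key) throws Exception {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        key.write(new DataOutputStream(buffer)); // writes first, then second (8 bytes each)
        return buffer.toByteArray();
    }

    public static void main(String[] args) throws Exception {
        byte[] a = serialize(new SortAPI(1, 2));
        byte[] b = serialize(new SortAPI(1, 1));
        // Only the first 8 bytes are compared, so `second` is ignored and the
        // two keys compare as equal -- they would fall into the same reduce group.
        System.out.println(WritableComparator.compareBytes(a, 0, 8, b, 0, 8)); // prints 0
    }
}

Next, the custom MyMapper class: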

import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class MyMapper extends Mapper<LongWritable, Text, SortAPI, LongWritable> {
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // Each input line holds two tab-separated numbers: "first<TAB>second".
        String[] splited = value.toString().split("\t");
        try {
            long first = Long.parseLong(splited[0]);
            long second = Long.parseLong(splited[1]);
            // The pair becomes the composite key; the value is just a constant 1.
            context.write(new SortAPI(first, second), new LongWritable(1));
        } catch (Exception e) {
            System.out.println(e.getMessage());
        }
    }
}

The custom MyReduce class:

import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.mapreduce.Reducer;

public class MyReduce extends Reducer<SortAPI, LongWritable, LongWritable, LongWritable> {
    @Override
    protected void reduce(SortAPI key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException {
        // Write one record per group: the two fields of the group's representative key.
        context.write(new LongWritable(key.first), new LongWritable(key.second));
    }
}

The custom SortAPI class:

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.WritableComparable;

public class SortAPI implements WritableComparable<SortAPI> {
    public Long first;
    public Long second;

    public SortAPI() {
    }

    public SortAPI(long first, long second) {
        this.first = first;
        this.second = second;
    }

    @Override
    public int compareTo(SortAPI o) {
        // Sort ascending by the first field only.
        return (int) (this.first - o.first);
    }

    @Override
    public void write(DataOutput out) throws IOException {
        // Serialized layout: 8 bytes for first, then 8 bytes for second.
        out.writeLong(first);
        out.writeLong(second);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        this.first = in.readLong();
        this.second = in.readLong();
    }

    @Override
    public int hashCode() {
        return this.first.hashCode() + this.second.hashCode();
    }

    @Override
    public boolean equals(Object obj) {
        if (obj instanceof SortAPI) {
            SortAPI o = (SortAPI) obj;
            // Compare the boxed Long values, not the references.
            return this.first.equals(o.first) && this.second.equals(o.second);
        }
        return false;
    }

    @Override
    public String toString() {
        return "output: " + this.first + ";" + this.second;
    }
}

Next, prepare the test data:

1       2
1       1
3       0
3       2
2       2
1       2

Upload it to hdfs://hadoop-master:9000/grouper/input/test.txt. The driver code (main) is as follows:

import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.partition.HashPartitioner;

public class Test {
    static final String OUTPUT_DIR = "hdfs://hadoop-master:9000/grouper/output/";
    static final String INPUT_DIR = "hdfs://hadoop-master:9000/grouper/input/test.txt";

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = new Job(conf, Test.class.getSimpleName());
        job.setJarByClass(Test.class);
        deleteOutputFile(OUTPUT_DIR);
        // 1. Set the input path
        FileInputFormat.setInputPaths(job, INPUT_DIR);
        // 2. Set the input format class
        job.setInputFormatClass(TextInputFormat.class);
        // 3. Set the custom Mapper and its output key/value types
        job.setMapperClass(MyMapper.class);
        job.setMapOutputKeyClass(SortAPI.class);
        job.setMapOutputValueClass(LongWritable.class);
        // 4. Partitioning
        job.setPartitionerClass(HashPartitioner.class);
        job.setNumReduceTasks(1);
        // 5. Sorting and grouping: register the custom grouping comparator
        job.setGroupingComparatorClass(MyGrouper.class);
        // 6. Set the custom Reducer and its output key/value types
        job.setReducerClass(MyReduce.class);
        job.setOutputKeyClass(LongWritable.class);
        job.setOutputValueClass(LongWritable.class);
        // 7. Set the output path
        FileOutputFormat.setOutputPath(job, new Path(OUTPUT_DIR));
        // 8. Submit the job and wait for completion
        job.waitForCompletion(true);
    }

    static void deleteOutputFile(String path) throws Exception {
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(new URI(INPUT_DIR), conf);
        if (fs.exists(new Path(path))) {
            fs.delete(new Path(path), true);  // remove the old output directory recursively
        }
    }
}

Run the job, then on a cluster node view the result with hadoop fs -text /grouper/output/part-r-00000:

1       2
2       2
3       0

Because the grouping comparator looks only at the first field, the three records with first = 1 form a single group and only one key from that group is emitted (here 1 2); the same happens for the two records with first = 3. Next, modify the compareTo() method of the SortAPI class:

    @Override
    public int compareTo(SortAPI o) {
        // Sort descending by first; when first is equal, ascending by second.
        long mis = (this.first - o.first) * -1;
        if (mis != 0) {
            return (int) mis;
        } else {
            return (int) (this.second - o.second);
        }
    }

Run the job again and view /grouper/output/part-r-00000:

3       0
2       2
1       1

From this we can conclude that, for the same data, the grouping result is affected by the sort: if the sort is descending, the groups are formed and emitted following that descending order of the data. By printing records inside the map and reduce functions (omitted here) we can also see what the grouping phase does: key/value pairs whose keys compare as equal (that is, compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) returns 0) form one group, and only the first key of that group is written to the buffer (and possibly spilled to disk); the remaining pairs with an equal key are not written again.

I also found an article through Baidu in which grouping gathers all of the map output values under one key, which for the data above corresponds to {key, values}: {1, {2, 1, 2}}. That result is the same as the very first run without a custom grouping comparator, and it can be checked by printing Iterable<LongWritable> values inside the reduce function. Personally, I think that is what grouping really means, much like grouping in a database query.
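To make the grouping visible, here is a hedged sketch (a hypothetical MyDebugReduce, not part of the original code) that prints the values iterated for each group. With the sample data and MyGrouper comparing only the first field, the three records whose first value is 1 arrive in a single reduce() call; since MyMapper always emits 1 as the value, values then contains three 1s.

import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.mapreduce.Reducer;

// Hypothetical debugging reducer: prints every value that was grouped under one key.
public class MyDebugReduce extends Reducer<SortAPI, LongWritable, LongWritable, LongWritable> {
    @Override
    protected void reduce(SortAPI key, Iterable<LongWritable> values, Context context)
            throws IOException, InterruptedException {
        StringBuilder grouped = new StringBuilder();
        for (LongWritable v : values) {
            grouped.append(v.get()).append(' ');  // all values whose keys compared as equal
        }
        // Appears in the task log; for the group with first = 1 it prints "1 1 1".
        System.out.println(key + " -> " + grouped);
        context.write(new LongWritable(key.first), new LongWritable(key.second));
    }
}

Swapping it in with job.setReducerClass(MyDebugReduce.class) would confirm which records ended up in each group.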

Here we should also be clear about the difference between grouping and partitioning. Partitioning splits the output into separate result files so they are easier to inspect: for example, if a single output file would contain HTTP requests of every status, partitioning by status produces one result file per status. Grouping, on the other hand, merges key/value pairs whose keys are equal so that less is output. After partitioning, all of the data still reaches the reduce side unchanged; after grouping, the output shrinks. The two also run at different stages.
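For contrast, here is a minimal, hypothetical Partitioner sketch (FirstFieldPartitioner is not used in the job above) showing what partitioning controls: it only decides which reduce task, and therefore which part-r-* file, a record goes to; nothing is merged.

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.mapreduce.Partitioner;

// Hypothetical sketch: route records to reducers by the `first` field of the key.
public class FirstFieldPartitioner extends Partitioner<SortAPI, LongWritable> {
    @Override
    public int getPartition(SortAPI key, LongWritable value, int numPartitions) {
        // Every record still reaches a reducer; the partitioner only picks which one.
        return (int) (key.first % numPartitions);
    }
}

Registering it with job.setPartitionerClass(FirstFieldPartitioner.class) and, say, job.setNumReduceTasks(3) would spread the sample data across three output files, while the grouping comparator would still decide which keys share a single reduce() call within each partition.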


That's it for this time. Keep recording things, bit by bit!


