MapReduce的自定義排序、分區和分組


自定義排序(WritableComparable)

當寫mr程序來處理文本時,經常會將處理后的信息封裝到我們自定義的bean中,並將bean作為map輸出的key來傳輸

而mr程序會在處理數據的過程中(傳輸到reduce之前)對數據排序(如:map端生成的文件中的內容分區且區內有序)。

操作:

自定義bean來封裝處理后的信息,可以自定義排序規則用bean中的某幾個屬性來作為排序的依據

代碼節段:

自定義的bean實現WritableComparable接口,並對compareTo的實現

 //先按照年齡排序,再按性別(年齡小,sex大的在前)

@Override
public int compareTo(Person o) {
  if(o.age==this.age){
    if(o.sex==this.sex){
      return 0;
    }else{
      return o.sex-this.sex;
    }
  }else{
  return this.age-o.age;
  }
}

注意: 1.hadoop開發了一套自己的序列化和反序列化策略(Writable),因為map端的文件要下載到reduce端的話如果不在同一台節點上是會走網絡進行傳輸(hadoop-rpc),所以對象需要序列化。

    2.如果空構造函數被覆蓋,一定要顯示的定義一下,否則反序列化時會拋異常。

自定義分區(Partitioner)

Mapreduce中會將maptask輸出的kv對,默認(HashPartitioner)根據key的hashcode%reducetask數來分區。如果要按照我們自己的需求進行分組,則需要改寫數據分發組件Partitioner繼承抽象類:Partitioner。 

操作:

在job對象中,設job.setPartitionerClass(自定義分區類.class)

節選代碼:

public class CustomPartitioner extends Partitioner<Text,Text>{

  /*
  * numPartitions其實我們可以設置,在job.setNumReduceTasks(n)設置。
  * 假如job.setNumReduceTasks(5),那么這里的numPartitions=5,那么默認的HashPartitioner的機制就是用key的hashcode%numPartitions來決定分區屬於哪個分區,所以分區數量就等於我們設置的reduce數量5個。
  */
  @Override
  public int getPartition(Text key, Text value, int numPartitions) {
    Integer hash = numMap.get(key.toString().substring(0, 1));
    //將沒有匹配到的數據放入3號分區
    return hash==null?3:hash;
  }
}

自定義分組(GroupingComparator)

假設把bean作為key發送給reduce,而在reduce端我們希望將年齡相同的kv聚合成組,那么就可以如下方式實現。

1.自定義分組要繼承WritableComparator,然后重寫compare方法。

2.定義完成后要設置job.setGroupingComparatorClass(CustomGroupingComparator.class);
代碼節選

public class CustomGroupingComparator extends WritableComparator{
  protected CustomGroupingComparator() {
    super(Person.class, true);
  }
  @Override
  public int compare(WritableComparable a, WritableComparable b) {
    Bean abean = (Bean) a;
    Bean bbean = (Bean) b;
    //將item_id相同的bean都視為相同,從而聚合為一組
    return abean.getAge()-bbean.getAge();
  }
}

 

自定義分組排序(SortGroupingComparator)盡快補充上

 

 

自定義分組排序(SortComparator)盡快補充上

 每個分區內都會調用job.setSortComparatorClass()設置的key比較函數類排序;

如果沒有通過job.setSortComparatorClass()設置key比較函數類,則使用key的實現的compareTo方法

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM