自定義排序(WritableComparable)
當寫mr程序來處理文本時,經常會將處理后的信息封裝到我們自定義的bean中,並將bean作為map輸出的key來傳輸
而mr程序會在處理數據的過程中(傳輸到reduce之前)對數據排序(如:map端生成的文件中的內容分區且區內有序)。
操作:
自定義bean來封裝處理后的信息,可以自定義排序規則用bean中的某幾個屬性來作為排序的依據
代碼節段:
自定義的bean實現WritableComparable接口,並對compareTo的實現
//先按照年齡排序,再按性別(年齡小,sex大的在前)
@Override
public int compareTo(Person o) {
if(o.age==this.age){
if(o.sex==this.sex){
return 0;
}else{
return o.sex-this.sex;
}
}else{
return this.age-o.age;
}
}
注意: 1.hadoop開發了一套自己的序列化和反序列化策略(Writable),因為map端的文件要下載到reduce端的話如果不在同一台節點上是會走網絡進行傳輸(hadoop-rpc),所以對象需要序列化。
2.如果空構造函數被覆蓋,一定要顯示的定義一下,否則反序列化時會拋異常。
自定義分區(Partitioner)
Mapreduce中會將maptask輸出的kv對,默認(HashPartitioner)根據key的hashcode%reducetask數來分區。如果要按照我們自己的需求進行分組,則需要改寫數據分發組件Partitioner繼承抽象類:Partitioner。
操作:
在job對象中,設job.setPartitionerClass(自定義分區類.class)
節選代碼:
public class CustomPartitioner extends Partitioner<Text,Text>{
/*
* numPartitions其實我們可以設置,在job.setNumReduceTasks(n)設置。
* 假如job.setNumReduceTasks(5),那么這里的numPartitions=5,那么默認的HashPartitioner的機制就是用key的hashcode%numPartitions來決定分區屬於哪個分區,所以分區數量就等於我們設置的reduce數量5個。
*/
@Override
public int getPartition(Text key, Text value, int numPartitions) {
Integer hash = numMap.get(key.toString().substring(0, 1));
//將沒有匹配到的數據放入3號分區
return hash==null?3:hash;
}
}
自定義分組(GroupingComparator)
假設把bean作為key發送給reduce,而在reduce端我們希望將年齡相同的kv聚合成組,那么就可以如下方式實現。
1.自定義分組要繼承WritableComparator,然后重寫compare方法。
2.定義完成后要設置job.setGroupingComparatorClass(CustomGroupingComparator.class);
代碼節選
public class CustomGroupingComparator extends WritableComparator{
protected CustomGroupingComparator() {
super(Person.class, true);
}
@Override
public int compare(WritableComparable a, WritableComparable b) {
Bean abean = (Bean) a;
Bean bbean = (Bean) b;
//將item_id相同的bean都視為相同,從而聚合為一組
return abean.getAge()-bbean.getAge();
}
}
自定義分組排序(SortGroupingComparator)盡快補充上
自定義分組排序(SortComparator)盡快補充上
每個分區內都會調用job.setSortComparatorClass()設置的key比較函數類排序;
如果沒有通過job.setSortComparatorClass()設置key比較函數類,則使用key的實現的compareTo方法