1. map任務處理
1.3 對輸出的key、value進行分區。
分區的目的指的是把相同分類的<k,v>交給同一個reducer任務處理。
public static class MyPartitioner<Text, LongWritable> extends Partitioner<Text, LongWritable>{
static HashMap<String,Integer> map = null;
static{
map = new HashMap<String,Integer>();
map.put("gz1", 0);
map.put("gz2", 0);
map.put("sz1", 1);
map.put("sz2", 1);
}
/**
* 這里是對mapper任務輸出的<k2,v2>進行操作
* getPartition函數返回多少的值,就會有多少個reducer任務
*
* “gz1”與“gz2”的返回的都是0,所以與分發到同一個reducer任務上,但是k2的值不一樣
* 所以分組就是
* <gz1,123>
* <gz2,234>
* 然后出現在不同reduce函數上
*/
@Override
public int getPartition(Text key, LongWritable value, int numPartitions) {
return (Integer)map.get(key.toString()).intValue();
}
}
//設置分區
wcjob.setPartitionerClass(MyPartitioner.class);
自定義排序,排序是根據k2來進行排序的,k2就需要自己進行自定義類型
private static class MyNewKey implements WritableComparable<MyNewKey> {
long firstNum;
long secondNum;
public MyNewKey() {
}
public MyNewKey(long first, long second) {
firstNum = first;
secondNum = second;
}
@Override
public void write(DataOutput out) throws IOException {
out.writeLong(firstNum);
out.writeLong(secondNum);
}
@Override
public void readFields(DataInput in) throws IOException {
firstNum = in.readLong();
secondNum = in.readLong();
}
/*
* 當key進行排序時會調用以下這個compreTo方法
*/
@Override
public int compareTo(MyNewKey anotherKey) {
long min = firstNum - anotherKey.firstNum;
if (min != 0) {
// 說明第一列不相等,則返回兩數之間小的數
return (int) min;
} else {
return (int) (secondNum - anotherKey.secondNum);
}
}
}
自定義分組
為了針對新的key類型作分組,我們也需要自定義一下分組規則:
(1)編寫一個新的分組比較類型用於我們的分組:
private static class MyGroupingComparator implements
RawComparator<MyNewKey> {
/*
* 基本分組規則:按第一列firstNum進行分組
*/
@Override
public int compare(MyNewKey key1, MyNewKey key2) {
return (int) (key1.firstNum - key2.firstNum);
}
/*
* @param b1 表示第一個參與比較的字節數組
*
* @param s1 表示第一個參與比較的字節數組的起始位置
*
* @param l1 表示第一個參與比較的字節數組的偏移量
*
* @param b2 表示第二個參與比較的字節數組
*
* @param s2 表示第二個參與比較的字節數組的起始位置
*
* @param l2 表示第二個參與比較的字節數組的偏移量
*/
@Override
public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
return WritableComparator.compareBytes(b1, s1, 8, b2, s2, 8);
}
}
從代碼中我們可以知道,我們自定義了一個分組比較器MyGroupingComparator,該類實現了RawComparator接口,而RawComparator接口又實現了Comparator接口,下面看看這兩個接口的定義:
首先是RawComparator接口的定義:
public interface RawComparator<T> extends Comparator<T> {
public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2);
}
其次是Comparator接口的定義:
public interface Comparator<T> {
int compare(T o1, T o2);
boolean equals(Object obj);
}
在MyGroupingComparator中分別對這兩個接口中的定義進行了實現,RawComparator中的compare()方法是基於字節的比較,Comparator中的compare()方法是基於對象的比較。
在基於字節的比較方法中,有六個參數,一下子眼花了:
Params:
* @param arg0 表示第一個參與比較的字節數組
* @param arg1 表示第一個參與比較的字節數組的起始位置
* @param arg2 表示第一個參與比較的字節數組的偏移量
*
* @param arg3 表示第二個參與比較的字節數組
* @param arg4 表示第二個參與比較的字節數組的起始位置
* @param arg5 表示第二個參與比較的字節數組的偏移量
由於在MyNewKey中有兩個long類型,每個long類型又占8個字節。這里因為比較的是第一列數字,所以讀取的偏移量為8字節。
(2)添加對分組規則的設置:
// 設置自定義分組規則 job.setGroupingComparatorClass(MyGroupingComparator.class);
