1. map任務處理
1.3 對輸出的key、value進行分區。
分區的目的指的是把相同分類的<k,v>交給同一個reducer任務處理。
public static class MyPartitioner<Text, LongWritable> extends Partitioner<Text, LongWritable>{ static HashMap<String,Integer> map = null; static{ map = new HashMap<String,Integer>(); map.put("gz1", 0); map.put("gz2", 0); map.put("sz1", 1); map.put("sz2", 1); } /** * 這里是對mapper任務輸出的<k2,v2>進行操作 * getPartition函數返回多少的值,就會有多少個reducer任務 * * “gz1”與“gz2”的返回的都是0,所以與分發到同一個reducer任務上,但是k2的值不一樣 * 所以分組就是 * <gz1,123> * <gz2,234> * 然后出現在不同reduce函數上 */ @Override public int getPartition(Text key, LongWritable value, int numPartitions) { return (Integer)map.get(key.toString()).intValue(); } }
//設置分區
wcjob.setPartitionerClass(MyPartitioner.class);
自定義排序,排序是根據k2來進行排序的,k2就需要自己進行自定義類型
private static class MyNewKey implements WritableComparable<MyNewKey> { long firstNum; long secondNum; public MyNewKey() { } public MyNewKey(long first, long second) { firstNum = first; secondNum = second; } @Override public void write(DataOutput out) throws IOException { out.writeLong(firstNum); out.writeLong(secondNum); } @Override public void readFields(DataInput in) throws IOException { firstNum = in.readLong(); secondNum = in.readLong(); } /* * 當key進行排序時會調用以下這個compreTo方法 */ @Override public int compareTo(MyNewKey anotherKey) { long min = firstNum - anotherKey.firstNum; if (min != 0) { // 說明第一列不相等,則返回兩數之間小的數 return (int) min; } else { return (int) (secondNum - anotherKey.secondNum); } } }
自定義分組
為了針對新的key類型作分組,我們也需要自定義一下分組規則:
(1)編寫一個新的分組比較類型用於我們的分組:
private static class MyGroupingComparator implements RawComparator<MyNewKey> { /* * 基本分組規則:按第一列firstNum進行分組 */ @Override public int compare(MyNewKey key1, MyNewKey key2) { return (int) (key1.firstNum - key2.firstNum); } /* * @param b1 表示第一個參與比較的字節數組 * * @param s1 表示第一個參與比較的字節數組的起始位置 * * @param l1 表示第一個參與比較的字節數組的偏移量 * * @param b2 表示第二個參與比較的字節數組 * * @param s2 表示第二個參與比較的字節數組的起始位置 * * @param l2 表示第二個參與比較的字節數組的偏移量 */ @Override public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) { return WritableComparator.compareBytes(b1, s1, 8, b2, s2, 8); } }
從代碼中我們可以知道,我們自定義了一個分組比較器MyGroupingComparator,該類實現了RawComparator接口,而RawComparator接口又實現了Comparator接口,下面看看這兩個接口的定義:
首先是RawComparator接口的定義:
public interface RawComparator<T> extends Comparator<T> { public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2); }
其次是Comparator接口的定義:
public interface Comparator<T> { int compare(T o1, T o2); boolean equals(Object obj); }
在MyGroupingComparator中分別對這兩個接口中的定義進行了實現,RawComparator中的compare()方法是基於字節的比較,Comparator中的compare()方法是基於對象的比較。
在基於字節的比較方法中,有六個參數,一下子眼花了:
Params:
* @param arg0 表示第一個參與比較的字節數組
* @param arg1 表示第一個參與比較的字節數組的起始位置
* @param arg2 表示第一個參與比較的字節數組的偏移量
*
* @param arg3 表示第二個參與比較的字節數組
* @param arg4 表示第二個參與比較的字節數組的起始位置
* @param arg5 表示第二個參與比較的字節數組的偏移量
由於在MyNewKey中有兩個long類型,每個long類型又占8個字節。這里因為比較的是第一列數字,所以讀取的偏移量為8字節。
(2)添加對分組規則的設置:
// 設置自定義分組規則 job.setGroupingComparatorClass(MyGroupingComparator.class);