一、寫在之前的
1.1 回顧Map階段四大步驟
首先,我們回顧一下在MapReduce中,排序和分組在哪里被執行:
從上圖中可以清楚地看出,在Step1.4也就是第四步中,需要對不同分區中的數據進行排序和分組,默認情況下,是按照key進行排序和分組。
1.2 實驗場景數據文件
在一些特定的數據文件中,不一定都是類似於WordCount單次統計這種規范的數據,比如下面這類數據,它雖然只有兩列,但是卻有一定的實踐意義。
3 3 3 2 3 1 2 2 2 1 1 1
(1)如果按照第一列升序排列,當第一列相同時,第二列升序排列,結果如下所示
1 1 2 1 2 2 3 1 3 2 3 3
(2)如果當第一列相同時,求出第二列的最小值,結果如下所示
3 1 2 1 1 1
接着,我們會針對這個數據文件,進行排序和分組的實踐嘗試,以求達到結果所示的效果。
二、初步探索排序
2.1 默認的排序
在Hadoop默認的排序算法中,只會針對key值進行排序,我們最初的代碼如下(這里只展示了map和reduce函數):
public class MySortJob extends Configured implements Tool { public static class MyMapper extends Mapper<LongWritable, Text, LongWritable, LongWritable> { protected void map( LongWritable key, Text value, Mapper<LongWritable, Text, LongWritable, LongWritable>.Context context) throws java.io.IOException, InterruptedException { String[] spilted = value.toString().split("\t"); long firstNum = Long.parseLong(spilted[0]); long secondNum = Long.parseLong(spilted[1]); context.write(new LongWritable(firstNum), new LongWritable( secondNum)); }; } public static class MyReducer extends Reducer<Text, LongWritable, Text, LongWritable> { protected void reduce( LongWritable key, java.lang.Iterable<LongWritable> values, Reducer<LongWritable, LongWritable, LongWritable, LongWritable>.Context context) throws java.io.IOException, InterruptedException { for (LongWritable value : values) { context.write(key, value); } }; } }
這里我們將第一列作為了key,第二列作為了value。
可以查看一下運行后的結果,如下所示:
1 1 2 2 2 1 3 3 3 2 3 1
從運行結果來看,並沒有達到我們最初的目的,於是,我們需要拋棄默認的排序規則,因此我們要自定義排序。
2.2 自定義排序
(1)封裝一個自定義類型作為key的新類型:將第一列與第二列都作為key
private static class MyNewKey implements WritableComparable<MyNewKey> { long firstNum; long secondNum; public MyNewKey() { } public MyNewKey(long first, long second) { firstNum = first; secondNum = second; } @Override public void write(DataOutput out) throws IOException { out.writeLong(firstNum); out.writeLong(secondNum); } @Override public void readFields(DataInput in) throws IOException { firstNum = in.readLong(); secondNum = in.readLong(); } /* * 當key進行排序時會調用以下這個compreTo方法 */ @Override public int compareTo(MyNewKey anotherKey) { long min = firstNum - anotherKey.firstNum; if (min != 0) { // 說明第一列不相等,則返回兩數之間小的數 return (int) min; } else { return (int) (secondNum - anotherKey.secondNum); } } }
PS:這里為什么需要封裝一個新類型呢?因為原來只有key參與排序,現在將第一個數和第二個數都參與排序,作為一個新的key。
(2)改寫最初的MapReduce方法函數代碼:(只展示了map和reduce函數,還需要修改map和reduce輸出的類型設置)
public static class MyMapper extends Mapper<LongWritable, Text, MyNewKey, LongWritable> { protected void map( LongWritable key, Text value, Mapper<LongWritable, Text, MyNewKey, LongWritable>.Context context) throws java.io.IOException, InterruptedException { String[] spilted = value.toString().split("\t"); long firstNum = Long.parseLong(spilted[0]); long secondNum = Long.parseLong(spilted[1]); // 使用新的類型作為key參與排序 MyNewKey newKey = new MyNewKey(firstNum, secondNum); context.write(newKey, new LongWritable(secondNum)); }; } public static class MyReducer extends Reducer<MyNewKey, LongWritable, LongWritable, LongWritable> { protected void reduce( MyNewKey key, java.lang.Iterable<LongWritable> values, Reducer<MyNewKey, LongWritable, LongWritable, LongWritable>.Context context) throws java.io.IOException, InterruptedException { context.write(new LongWritable(key.firstNum), new LongWritable( key.secondNum)); }; }
從上面的代碼中我們可以發現,新類型MyNewKey實現了一個叫做WritableComparable的接口,該接口中有一個compareTo()方法,當對key進行比較時會調用該方法,而我們將其改為了我們自己定義的比較規則,從而實現我們想要的效果。
其實,這個WritableComparable還實現了兩個接口,我們看看其定義:
public interface WritableComparable<T> extends Writable, Comparable<T> { }
Writable接口是為了實現序列化,而Comparable則是為了實現比較。
(3)現在看看運行結果:
1 1 2 1 2 2 3 1 3 2 3 3
運行結果與預期的已經一致,自定義排序生效!
三、初步探索分組
3.1 默認的分組
在Hadoop中的默認分組規則中,也是基於Key進行的,會將相同key的value放到一個集合中去。這里以上面的例子繼續看看分組,因為我們自定義了一個新的key,它是以兩列數據作為key的,因此這6行數據中每個key都不相同,也就是說會產生6組,它們是:1 1,2 1,2 2,3 1,3 2,3 3。而實際上只可以分為3組,分別是1,2,3。
現在首先改寫一下reduce函數代碼,目的是求出第一列相同時第二列的最小值,看看它會有怎么樣的分組:
public static class MyReducer extends Reducer<MyNewKey, LongWritable, LongWritable, LongWritable> { protected void reduce( MyNewKey key, java.lang.Iterable<LongWritable> values, Reducer<MyNewKey, LongWritable, LongWritable, LongWritable>.Context context) throws java.io.IOException, InterruptedException { long min = Long.MAX_VALUE; for (LongWritable number : values) { long temp = number.get(); if (temp < min) { min = temp; } } context.write(new LongWritable(key.firstNum), new LongWritable(min)); }; }
其運行結果為:
1 1 2 1 2 2 3 1 3 2 3 3
但是我們預期的結果為:
#當第一列相同時,求出第二列的最小值 3 3 3 2 3 1 2 2 2 1 1 1 ------------------- #預期結果應該是 3 1 2 1 1 1
3.2 自定義分組
為了針對新的key類型作分組,我們也需要自定義一下分組規則:
(1)編寫一個新的分組比較類型用於我們的分組:
private static class MyGroupingComparator implements RawComparator<MyNewKey> { /* * 基本分組規則:按第一列firstNum進行分組 */ @Override public int compare(MyNewKey key1, MyNewKey key2) { return (int) (key1.firstNum - key2.firstNum); } /* * @param b1 表示第一個參與比較的字節數組 * * @param s1 表示第一個參與比較的字節數組的起始位置 * * @param l1 表示第一個參與比較的字節數組的偏移量 * * @param b2 表示第二個參與比較的字節數組 * * @param s2 表示第二個參與比較的字節數組的起始位置 * * @param l2 表示第二個參與比較的字節數組的偏移量 */ @Override public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) { return WritableComparator.compareBytes(b1, s1, 8, b2, s2, 8); } }
從代碼中我們可以知道,我們自定義了一個分組比較器MyGroupingComparator,該類實現了RawComparator接口,而RawComparator接口又實現了Comparator接口,下面看看這兩個接口的定義:
首先是RawComparator接口的定義:
public interface RawComparator<T> extends Comparator<T> { public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2); }
其次是Comparator接口的定義:
public interface Comparator<T> { int compare(T o1, T o2); boolean equals(Object obj); }
在MyGroupingComparator中分別對這兩個接口中的定義進行了實現,RawComparator中的compare()方法是基於字節的比較,Comparator中的compare()方法是基於對象的比較。
在基於字節的比較方法中,有六個參數,一下子眼花了:
Params:
* @param arg0 表示第一個參與比較的字節數組
* @param arg1 表示第一個參與比較的字節數組的起始位置
* @param arg2 表示第一個參與比較的字節數組的偏移量
*
* @param arg3 表示第二個參與比較的字節數組
* @param arg4 表示第二個參與比較的字節數組的起始位置
* @param arg5 表示第二個參與比較的字節數組的偏移量
由於在MyNewKey中有兩個long類型,每個long類型又占8個字節。這里因為比較的是第一列數字,所以讀取的偏移量為8字節。
(2)添加對分組規則的設置:
// 設置自定義分組規則 job.setGroupingComparatorClass(MyGroupingComparator.class);
(3)現在看看運行結果:
參考資料
(1)吳超,《深入淺出Hadoop》:http://www.superwu.cn/
(2)Suddenly,《Hadoop日記Day18-MapReduce排序和分組》:http://www.cnblogs.com/sunddenly/p/4009751.html