Hadoop學習筆記—11.MapReduce中的排序和分組


一、寫在之前的

1.1 回顧Map階段四大步驟

  首先,我們回顧一下在MapReduce中,排序和分組在哪里被執行:

map stage

  從上圖中可以清楚地看出,在Step1.4也就是第四步中,需要對不同分區中的數據進行排序和分組,默認情況下,是按照key進行排序和分組。

1.2 實驗場景數據文件

  在一些特定的數據文件中,不一定都是類似於WordCount單次統計這種規范的數據,比如下面這類數據,它雖然只有兩列,但是卻有一定的實踐意義。

3    3
3    2
3    1
2    2
2    1
1    1

  (1)如果按照第一列升序排列,當第一列相同時,第二列升序排列,結果如下所示

1    1
2    1
2    2
3    1
3    2
3    3

  (2)如果當第一列相同時,求出第二列的最小值,結果如下所示

3    1
2    1
1    1

  接着,我們會針對這個數據文件,進行排序和分組的實踐嘗試,以求達到結果所示的效果。

二、初步探索排序

2.1 默認的排序

  在Hadoop默認的排序算法中,只會針對key值進行排序,我們最初的代碼如下(這里只展示了map和reduce函數):

public class MySortJob extends Configured implements Tool {

    public static class MyMapper extends
            Mapper<LongWritable, Text, LongWritable, LongWritable> {
        protected void map(
                LongWritable key,
                Text value,
                Mapper<LongWritable, Text, LongWritable, LongWritable>.Context context)
                throws java.io.IOException, InterruptedException {
            String[] spilted = value.toString().split("\t");
            long firstNum = Long.parseLong(spilted[0]);
            long secondNum = Long.parseLong(spilted[1]);

            context.write(new LongWritable(firstNum), new LongWritable(
                    secondNum));
        };
    }

    public static class MyReducer extends
            Reducer<Text, LongWritable, Text, LongWritable> {
        protected void reduce(
                LongWritable key,
                java.lang.Iterable<LongWritable> values,
                Reducer<LongWritable, LongWritable, LongWritable, LongWritable>.Context context)
                throws java.io.IOException, InterruptedException {
            for (LongWritable value : values) {
                context.write(key, value);
            }
        };
    }

}

  這里我們將第一列作為了key,第二列作為了value。

  可以查看一下運行后的結果,如下所示:

1    1
2    2
2    1
3    3
3    2
3    1

  從運行結果來看,並沒有達到我們最初的目的,於是,我們需要拋棄默認的排序規則,因此我們要自定義排序。

2.2 自定義排序

  (1)封裝一個自定義類型作為key的新類型:將第一列與第二列都作為key

    private static class MyNewKey implements WritableComparable<MyNewKey> {
        long firstNum;
        long secondNum;

        public MyNewKey() {
        }

        public MyNewKey(long first, long second) {
            firstNum = first;
            secondNum = second;
        }

        @Override
        public void write(DataOutput out) throws IOException {
            out.writeLong(firstNum);
            out.writeLong(secondNum);
        }

        @Override
        public void readFields(DataInput in) throws IOException {
            firstNum = in.readLong();
            secondNum = in.readLong();
        }

        /*
         * 當key進行排序時會調用以下這個compreTo方法
         */
        @Override
        public int compareTo(MyNewKey anotherKey) {
            long min = firstNum - anotherKey.firstNum;
            if (min != 0) {
                // 說明第一列不相等,則返回兩數之間小的數
                return (int) min;
            } else {
                return (int) (secondNum - anotherKey.secondNum);
            }
        }
    }

PS:這里為什么需要封裝一個新類型呢?因為原來只有key參與排序,現在將第一個數和第二個數都參與排序,作為一個新的key。

  (2)改寫最初的MapReduce方法函數代碼:(只展示了map和reduce函數,還需要修改map和reduce輸出的類型設置)

        public static class MyMapper extends
            Mapper<LongWritable, Text, MyNewKey, LongWritable> {
        protected void map(
                LongWritable key,
                Text value,
                Mapper<LongWritable, Text, MyNewKey, LongWritable>.Context context)
                throws java.io.IOException, InterruptedException {
            String[] spilted = value.toString().split("\t");
            long firstNum = Long.parseLong(spilted[0]);
            long secondNum = Long.parseLong(spilted[1]);
            // 使用新的類型作為key參與排序
            MyNewKey newKey = new MyNewKey(firstNum, secondNum);

            context.write(newKey, new LongWritable(secondNum));
        };
    }

    public static class MyReducer extends
            Reducer<MyNewKey, LongWritable, LongWritable, LongWritable> {
        protected void reduce(
                MyNewKey key,
                java.lang.Iterable<LongWritable> values,
                Reducer<MyNewKey, LongWritable, LongWritable, LongWritable>.Context context)
                throws java.io.IOException, InterruptedException {
            context.write(new LongWritable(key.firstNum), new LongWritable(
                    key.secondNum));
        };
    }

  從上面的代碼中我們可以發現,新類型MyNewKey實現了一個叫做WritableComparable的接口,該接口中有一個compareTo()方法,當對key進行比較時會調用該方法,而我們將其改為了我們自己定義的比較規則,從而實現我們想要的效果。

  其實,這個WritableComparable還實現了兩個接口,我們看看其定義:

public interface WritableComparable<T> extends Writable, Comparable<T> {
}

  Writable接口是為了實現序列化,而Comparable則是為了實現比較。

  (3)現在看看運行結果:

1    1
2    1
2    2
3    1
3    2
3    3

  運行結果與預期的已經一致,自定義排序生效!

三、初步探索分組

3.1 默認的分組

  在Hadoop中的默認分組規則中,也是基於Key進行的,會將相同key的value放到一個集合中去。這里以上面的例子繼續看看分組,因為我們自定義了一個新的key,它是以兩列數據作為key的,因此這6行數據中每個key都不相同,也就是說會產生6組,它們是:1 1,2 1,2 2,3 1,3 2,3 3。而實際上只可以分為3組,分別是1,2,3。

  現在首先改寫一下reduce函數代碼,目的是求出第一列相同時第二列的最小值,看看它會有怎么樣的分組:

    public static class MyReducer extends
            Reducer<MyNewKey, LongWritable, LongWritable, LongWritable> {
        protected void reduce(
                MyNewKey key,
                java.lang.Iterable<LongWritable> values,
                Reducer<MyNewKey, LongWritable, LongWritable, LongWritable>.Context context)
                throws java.io.IOException, InterruptedException {
            long min = Long.MAX_VALUE;
            for (LongWritable number : values) {
                long temp = number.get();
                if (temp < min) {
                    min = temp;
                }
            }

            context.write(new LongWritable(key.firstNum), new LongWritable(min));
        };
    }

  其運行結果為:

1    1
2    1
2    2
3    1
3    2
3    3

  但是我們預期的結果為:

#當第一列相同時,求出第二列的最小值
3    3
3    2
3    1
2    2
2    1
1    1
-------------------
#預期結果應該是
3    1
2    1
1    1

3.2 自定義分組

  為了針對新的key類型作分組,我們也需要自定義一下分組規則:

  (1)編寫一個新的分組比較類型用於我們的分組:

    private static class MyGroupingComparator implements
            RawComparator<MyNewKey> {

        /*
         * 基本分組規則:按第一列firstNum進行分組
         */
        @Override
        public int compare(MyNewKey key1, MyNewKey key2) {
            return (int) (key1.firstNum - key2.firstNum);
        }

        /*
         * @param b1 表示第一個參與比較的字節數組
         * 
         * @param s1 表示第一個參與比較的字節數組的起始位置
         * 
         * @param l1 表示第一個參與比較的字節數組的偏移量
         * 
         * @param b2 表示第二個參與比較的字節數組
         * 
         * @param s2 表示第二個參與比較的字節數組的起始位置
         * 
         * @param l2 表示第二個參與比較的字節數組的偏移量
         */
        @Override
        public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
            return WritableComparator.compareBytes(b1, s1, 8, b2, s2, 8);
        }

    }

  從代碼中我們可以知道,我們自定義了一個分組比較器MyGroupingComparator,該類實現了RawComparator接口,而RawComparator接口又實現了Comparator接口,下面看看這兩個接口的定義:

  首先是RawComparator接口的定義:

public interface RawComparator<T> extends Comparator<T> {
  public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2);
}

  其次是Comparator接口的定義:

public interface Comparator<T> {
    int compare(T o1, T o2);
    boolean equals(Object obj);
}

  在MyGroupingComparator中分別對這兩個接口中的定義進行了實現,RawComparator中的compare()方法是基於字節的比較,Comparator中的compare()方法是基於對象的比較。

  在基於字節的比較方法中,有六個參數,一下子眼花了:

Params:

* @param arg0 表示第一個參與比較的字節數組
* @param arg1 表示第一個參與比較的字節數組的起始位置
* @param arg2 表示第一個參與比較的字節數組的偏移量

* @param arg3 表示第二個參與比較的字節數組
* @param arg4 表示第二個參與比較的字節數組的起始位置
* @param arg5 表示第二個參與比較的字節數組的偏移量

  由於在MyNewKey中有兩個long類型,每個long類型又占8個字節。這里因為比較的是第一列數字,所以讀取的偏移量為8字節。

  (2)添加對分組規則的設置:

  // 設置自定義分組規則
   job.setGroupingComparatorClass(MyGroupingComparator.class);

  (3)現在看看運行結果:

參考資料

(1)吳超,《深入淺出Hadoop》:http://www.superwu.cn/

(2)Suddenly,《Hadoop日記Day18-MapReduce排序和分組》:http://www.cnblogs.com/sunddenly/p/4009751.html

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM