MapReduce 學習4 ---- 自定義分區、自定義排序、自定義組分


1. map任務處理

1.3 對輸出的key、value進行分區。

分區的目的指的是把相同分類的<k,v>交給同一個reducer任務處理。

 

public static class MyPartitioner<Text, LongWritable> extends Partitioner<Text, LongWritable>{

		static HashMap<String,Integer> map = null;
		static{
			map = new HashMap<String,Integer>();
			map.put("gz1", 0);
			map.put("gz2", 0);
			map.put("sz1", 1);
			map.put("sz2", 1);
		}
		/**
		 * 這里是對mapper任務輸出的<k2,v2>進行操作
		 * getPartition函數返回多少的值,就會有多少個reducer任務
		 * 
		 * “gz1”與“gz2”的返回的都是0,所以與分發到同一個reducer任務上,但是k2的值不一樣
		 * 所以分組就是
		 * <gz1,123>
		 * <gz2,234>
		 * 然后出現在不同reduce函數上
		 */
		@Override
		public int getPartition(Text key, LongWritable value, int numPartitions) {
			
			return (Integer)map.get(key.toString()).intValue();
		}
		
}


//設置分區
        wcjob.setPartitionerClass(MyPartitioner.class);

 

自定義排序,排序是根據k2來進行排序的,k2就需要自己進行自定義類型

 private static class MyNewKey implements WritableComparable<MyNewKey> {
        long firstNum;
        long secondNum;

        public MyNewKey() {
        }

        public MyNewKey(long first, long second) {
            firstNum = first;
            secondNum = second;
        }

        @Override
        public void write(DataOutput out) throws IOException {
            out.writeLong(firstNum);
            out.writeLong(secondNum);
        }

        @Override
        public void readFields(DataInput in) throws IOException {
            firstNum = in.readLong();
            secondNum = in.readLong();
        }

        /*
         * 當key進行排序時會調用以下這個compreTo方法
         */
        @Override
        public int compareTo(MyNewKey anotherKey) {
            long min = firstNum - anotherKey.firstNum;
            if (min != 0) {
                // 說明第一列不相等,則返回兩數之間小的數
                return (int) min;
            } else {
                return (int) (secondNum - anotherKey.secondNum);
            }
        }
    }

 

 

自定義分組

為了針對新的key類型作分組,我們也需要自定義一下分組規則:

(1)編寫一個新的分組比較類型用於我們的分組:

 private static class MyGroupingComparator implements
            RawComparator<MyNewKey> {

        /*
         * 基本分組規則:按第一列firstNum進行分組
         */
        @Override
        public int compare(MyNewKey key1, MyNewKey key2) {
            return (int) (key1.firstNum - key2.firstNum);
        }

        /*
         * @param b1 表示第一個參與比較的字節數組
         * 
         * @param s1 表示第一個參與比較的字節數組的起始位置
         * 
         * @param l1 表示第一個參與比較的字節數組的偏移量
         * 
         * @param b2 表示第二個參與比較的字節數組
         * 
         * @param s2 表示第二個參與比較的字節數組的起始位置
         * 
         * @param l2 表示第二個參與比較的字節數組的偏移量
         */
        @Override
        public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
            return WritableComparator.compareBytes(b1, s1, 8, b2, s2, 8);
        }

    }

 從代碼中我們可以知道,我們自定義了一個分組比較器MyGroupingComparator,該類實現了RawComparator接口,而RawComparator接口又實現了Comparator接口,下面看看這兩個接口的定義:

  首先是RawComparator接口的定義:

public interface RawComparator<T> extends Comparator<T> {
  public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2);
}

 其次是Comparator接口的定義:

public interface Comparator<T> {
    int compare(T o1, T o2);
    boolean equals(Object obj);
}

 

MyGroupingComparator中分別對這兩個接口中的定義進行了實現,RawComparator中的compare()方法是基於字節的比較,Comparator中的compare()方法是基於對象的比較。

  在基於字節的比較方法中,有六個參數,一下子眼花了:

Params:

* @param arg0 表示第一個參與比較的字節數組
* @param arg1 表示第一個參與比較的字節數組的起始位置
* @param arg2 表示第一個參與比較的字節數組的偏移量

* @param arg3 表示第二個參與比較的字節數組
* @param arg4 表示第二個參與比較的字節數組的起始位置
* @param arg5 表示第二個參與比較的字節數組的偏移量

 

由於在MyNewKey中有兩個long類型,每個long類型又占8個字節。這里因為比較的是第一列數字,所以讀取的偏移量為8字節。

  (2)添加對分組規則的設置:

// 設置自定義分組規則
   job.setGroupingComparatorClass(MyGroupingComparator.class);

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM