Partitioner分區類的作用是什么?
在進行MapReduce計算時,有時候需要把最終的輸出數據分到不同的文件中,比如按照省份划分的話,需要把同一省份的數據放到一個文件中;按照性別划分的話,需要把同一性別的數據放到一個文件中。我們知道最終的輸出數據是來自於Reducer任務。那么,如果要得到多個文件,意味着有同樣數量的Reducer任務在運行。Reducer任務的數據來自於Mapper任務,也就說Mapper任務要划分數據,對於不同的數據分配給不同的Reducer任務運行。Mapper任務划分數據的過程就稱作Partition。負責實現划分數據的類稱作Partitioner。
Partitoner類的源碼如下:
package org.apache.hadoop.mapreduce.lib.partition; import org.apache.hadoop.mapreduce.Partitioner; /** Partition keys by their {@link Object#hashCode()}. */ public class HashPartitioner<K, V> extends Partitioner<K, V> { /** Use {@link Object#hashCode()} to partition. */ public int getPartition(K key, V value, int numReduceTasks) { //默認使用key的hash值與上int的最大值,避免出現數據溢出 的情況 return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks; } }
getPartition()三個參數分別是什么?
HashPartitioner是處理Mapper任務輸出的,getPartition()方法有三個形參,源碼中key、value分別指的是Mapper任務的輸出,numReduceTasks指的是設置的Reducer任務數量,默認值是1。那么任何整數與1相除的余數肯定是0。也就是說getPartition(…)方法的返回值總是0。也就是Mapper任務的輸出總是送給一個Reducer任務,最終只能輸出到一個文件中。
據此分析,如果想要最終輸出到多個文件中,在Mapper任務中對數據應該划分到多個區中。那么,我們只需要按照一定的規則讓getPartition(…)方法的返回值是0,1,2,3…即可。
大部分情況下,我們都會使用默認的分區函數,但有時我們又有一些,特殊的需求,而需要定制Partition來完成我們的業務,
案例如下:按照能否被5除盡去分區
1、如果除以5的余數是0, 放在0號分區
2、如果除以5的余數部是0, 放在1分區
import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.mapreduce.Partitioner; public class FivePartitioner extends Partitioner<IntWritable, IntWritable>{ /** * 我們的需求:按照能否被5除盡去分區 * * 1、如果除以5的余數是0, 放在0號分區 * 2、如果除以5的余數部是0, 放在1分區 */ @Override public int getPartition(IntWritable key, IntWritable value, int numPartitions) { int intValue = key.get(); if(intValue % 5 == 0){ return 0; }else{ if(intValue % 2 == 0){ return 1; }else{ return 2; } } } }
在運行Mapreduce程序時,只需在主函數里加入如下兩行代碼即可:
job.setPartitionerClass(FivePartitioner.class); job.setNumReduceTasks(3);//設置為3