http://www.aboutyun.com/thread-8927-1-1.html Mapreduce在hadoop中是一個比較難以的概念。以下須要用心看,然后自己就能總結出來了。 概括: combine和partition都是函數。中間的步驟應該僅僅有shuffle! 1.combine combine分為map端和reduce端,作用是把同一個key的鍵值對合並在一起,能夠自己定義的。 combine函數把一個map函數產生的<key,value>對(多個key,value)合並成一個新的<key2,value2>.將新的<key2,value2>作為輸入到reduce函數中 這個value2亦可稱之為values,由於有多個。這個合並的目的是為了降低網絡傳輸。 詳細實現是由Combine類。 實現combine函數,該類的主要功能是合並同樣的key鍵。通過job.setCombinerClass()方法設置。默覺得null,不合並中間結果。實現map函數 詳細調用:(下圖是調用reduce,合並map的個數) 難點:不知道這個reduce和mapreduce中的reduce差別是什么? 以下簡單說一下:后面慢慢琢磨: 在mapreduce中。map多,reduce少。 在reduce中因為數據量比較多。所以干脆。我們先把自己map里面的數據歸類,這樣到了reduce的時候就減輕了壓力。 這里舉個樣例: map與reduce的樣例 map理解為銷售人員,reduce理解為銷售經理。 每一個人(map)僅僅管銷售,賺了多少錢銷售人員不統計。也就是說這個銷售人員沒有Combine,那么這個銷售經理就累垮了。由於每一個人都沒有統計,它須要統計全部人員賣了多少件。賺錢了多少錢。 這樣是不行的。所以銷售經理(reduce)為了減輕壓力,每一個人(map)都必須統計自己賣了多少錢,賺了多少錢(Combine),然后經理所做的事情就是統計每一個人統計之后的結果。這樣經理就輕松多了。所以Combine在map所做的事情。減輕了reduce的事情。 (這就是為什么說map的Combine干reduce的事情。相信你應該明確了)
public static void main(String[] args)throws IOException {
Configuration conf = new Configuration();
Job job = new Job(conf);
job.setInputFormatClass(TextInputFormat.class);
job.setMapperClass(Mapper.class);
job.setCombinerClass(reduce.class);
job.setPartitionerClass(HashPartitioner.class);
job.setReducerClass(Reducer.class);
job.setOutputFormatClass(TextOutFormat.class);
}
}
2.partition partition是切割map每一個節點的結果,依照key分別映射給不同的reduce。也是能夠自己定義的。這里事實上能夠理解歸類。 我們對於錯綜復雜的數據歸類。比方在動物園里有牛羊雞鴨鵝。他們都是混在一起的。可是到了晚上他們就各自牛回牛棚。羊回羊圈,雞回雞窩。partition的作用就是把這些數據歸類。僅僅只是在敲代碼的時候,mapreduce使用哈希HashPartitioner幫我們歸類了。這個我們也能夠自己定義。
HashPartitioner是mapreduce的默認partitioner。
計算方法是 which reducer=(key.hashCode() & Integer.MAX_VALUE) % numReduceTasks。得到當前的目的reducer。
以下在看該怎樣自己定義,該怎樣調用:(以下便是自己定義了一個Partition函數。紅字部分是算法的核心,也就是分區的核心)
public static class Partition extends Partitioner<IntWritable, IntWritable> {
@Override
public int getPartition(IntWritable key, IntWritable value,
int numPartitions) {
int Maxnumber = 65223;
int bound = Maxnumber / numPartitions + 1;
int keynumber = key.get();
for (int i = 0; i < numPartitions; i++) {
if (keynumber < bound * i && keynumber >= bound * (i - 1)) { return i - 1; }
}
return 0;
}
}
那么我們該怎樣調用:(以下調用之后,你的分區函數就生效了)
public static void main(String[] args) throws IOException,
InterruptedException, ClassNotFoundException {
Configuration conf = new Configuration();
Job job = new Job(conf, "sort");
job.setJarByClass(Sort.class);
job.setMapperClass(Map.class);
job.setReducerClass(Reduce.class);
job.setPartitionerClass(Partition.class);
job.setOutputKeyClass(IntWritable.class);
job.setOutputValueClass(IntWritable.class);
FileInputFormat.setInputPaths(job, "/home/asheng/hadoop/in");
FileOutputFormat
.setOutputPath(job, new Path("/home/asheng/hadoop/out"));
job.waitForCompletion(true);
}
}
3.shuffle shuffle就是map和reduce之間的過程。包括了兩端的combine和partition。它比較難以理解,由於我們摸不着。看不到它。它僅僅是理論存在的。並且確實存在,它屬於mapreduce的框架。編程的時候。我們用不到它,它屬於mapreduce框架。具體能夠看通過實例讓你真正明確mapreduce---填空式、分布(切割)編程。 3.1shuffle的作用是 Map的結果,會通過partition分發到Reducer上,Reducer做完Reduce操作后。通過OutputFormat,進行輸出 shuffle階段的主要函數是fetchOutputs(),這個函數的功能就是將map階段的輸出,copy到reduce 節點本地。
|