MapReduce中combine、partition、shuffle的作用是什么


http://www.aboutyun.com/thread-8927-1-1.html


Mapreduce在hadoop中是一個比較難以的概念。以下須要用心看,然后自己就能總結出來了。



概括:
combine和partition都是函數。中間的步驟應該僅僅有shuffle!

1.combine
combine分為map端和reduce端,作用是把同一個key的鍵值對合並在一起,能夠自己定義的。
combine函數把一個map函數產生的<key,value>對(多個key,value)合並成一個新的<key2,value2>.將新的<key2,value2>作為輸入到reduce函數中
這個value2亦可稱之為values,由於有多個。這個合並的目的是為了降低網絡傳輸。

詳細實現是由Combine類。
實現combine函數,該類的主要功能是合並同樣的key鍵。通過job.setCombinerClass()方法設置。默覺得null,不合並中間結果。實現map函數
詳細調用:(下圖是調用reduce,合並map的個數)

難點:不知道這個reduce和mapreduce中的reduce差別是什么?
以下簡單說一下:后面慢慢琢磨:
在mapreduce中。map多,reduce少。
在reduce中因為數據量比較多。所以干脆。我們先把自己map里面的數據歸類,這樣到了reduce的時候就減輕了壓力。

這里舉個樣例:
map與reduce的樣例
map理解為銷售人員,reduce理解為銷售經理。
每一個人(map)僅僅管銷售,賺了多少錢銷售人員不統計。也就是說這個銷售人員沒有Combine,那么這個銷售經理就累垮了。由於每一個人都沒有統計,它須要統計全部人員賣了多少件賺錢了多少錢。
這樣是不行的。所以銷售經理(reduce)為了減輕壓力,每一個人(map)都必須統計自己賣了多少錢,賺了多少錢(Combine),然后經理所做的事情就是統計每一個人統計之后的結果。這樣經理就輕松多了。所以Combine在map所做的事情。減輕了reduce的事情。
(這就是為什么說map的Combine干reduce的事情。相信你應該明確了)

public  static void main(String[] args)throws IOException {
        Configuration conf = new Configuration();
        Job job = new Job(conf);
        job.setInputFormatClass(TextInputFormat.class);
        job.setMapperClass(Mapper.class);
        job.setCombinerClass(reduce.class);
        job.setPartitionerClass(HashPartitioner.class);
        job.setReducerClass(Reducer.class);
        job.setOutputFormatClass(TextOutFormat.class);
    }
}



2.partition
partition是切割map每一個節點的結果,依照key分別映射給不同的reduce。也是能夠自己定義的。這里事實上能夠理解歸類。
我們對於錯綜復雜的數據歸類。比方在動物園里有牛羊雞鴨鵝。他們都是混在一起的。可是到了晚上他們就各自牛回牛棚。羊回羊圈,雞回雞窩。partition的作用就是把這些數據歸類。僅僅只是在敲代碼的時候,mapreduce使用哈希HashPartitioner幫我們歸類了。這個我們也能夠自己定義。



        HashPartitioner是mapreduce的默認partitioner。

計算方法是

which reducer=(key.hashCode() & Integer.MAX_VALUE) % numReduceTasks。得到當前的目的reducer。


以下在看該怎樣自己定義,該怎樣調用:(以下便是自己定義了一個Partition函數。紅字部分是算法的核心,也就是分區的核心
public static class Partition extends Partitioner<IntWritable, IntWritable> {
                @Override
                public int getPartition(IntWritable key, IntWritable value,
                                int numPartitions) {
                        int Maxnumber = 65223;
                        int bound = Maxnumber / numPartitions + 1;
                        int keynumber = key.get();
                        for (int i = 0; i < numPartitions; i++) {
                                if (keynumber < bound * i && keynumber >= bound * (i - 1)) {
                                        return i - 1;
                                }

                        }
                        return 0;
                }

        }

那么我們該怎樣調用:(以下調用之后,你的分區函數就生效了)

public static void main(String[] args) throws IOException,
InterruptedException, ClassNotFoundException {
Configuration conf = new Configuration();
Job job = new Job(conf, "sort");
job.setJarByClass(Sort.class);
job.setMapperClass(Map.class);
job.setReducerClass(Reduce.class);
job.setPartitionerClass(Partition.class);
job.setOutputKeyClass(IntWritable.class);
job.setOutputValueClass(IntWritable.class);
FileInputFormat.setInputPaths(job, "/home/asheng/hadoop/in");
FileOutputFormat
.setOutputPath(job, new Path("/home/asheng/hadoop/out"));
job.waitForCompletion(true);
}
}



3.shuffle

shuffle就是map和reduce之間的過程。包括了兩端的combine和partition。

它比較難以理解,由於我們摸不着。看不到它。它僅僅是理論存在的。並且確實存在,它屬於mapreduce的框架。編程的時候。我們用不到它,它屬於mapreduce框架。具體能夠看通過實例讓你真正明確mapreduce---填空式、分布(切割)編程



3.1shuffle的作用是
Map的結果,會通過partition分發到Reducer上,Reducer做完Reduce操作后。通過OutputFormat,進行輸出
shuffle階段的主要函數是fetchOutputs(),這個函數的功能就是將map階段的輸出,copy到reduce 節點本地。




免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM