一、Combiner的出現背景
1.1 回顧Map階段五大步驟
在第四篇博文《初識MapReduce》中,我們認識了MapReduce的八大步湊,其中在Map階段總共五個步驟,如下圖所示:
其中,step1.5是一個可選步驟,它就是我們今天需要了解的 Map規約 階段。現在,我們再來看看前一篇博文《計數器與自定義計數器》中的第一張關於計數器的圖:
我們可以發現,其中有兩個計數器:Combine output records和Combine input records,他們的計數都是0,這是因為我們在代碼中沒有進行Map階段的規約操作。
1.2 為什么需要進行Map規約操作
眾所周知,Hadoop框架使用Mapper將數據處理成一個個的<key,value>鍵值對,在網絡節點間對其進行整理(shuffle),然后使用Reducer處理數據並進行最終輸出。
在上述過程中,我們看到至少兩個性能瓶頸:
(1)如果我們有10億個數據,Mapper會生成10億個鍵值對在網絡間進行傳輸,但如果我們只是對數據求最大值,那么很明顯的Mapper只需要輸出它所知道的最大值即可。這樣做不僅可以減輕網絡壓力,同樣也可以大幅度提高程序效率。
總結:網絡帶寬嚴重被占降低程序效率;
(2)假設使用美國專利數據集中的國家一項來闡述數據傾斜這個定義,這樣的數據遠遠不是一致性的或者說平衡分布的,由於大多數專利的國家都屬於美國,這樣不僅Mapper中的鍵值對、中間階段(shuffle)的鍵值對等,大多數的鍵值對最終會聚集於一個單一的Reducer之上,壓倒這個Reducer,從而大大降低程序的性能。
總結:單一節點承載過重降低程序性能;
那么,有木有一種方案能夠解決這兩個問題呢?
二、初步探索Combiner
2.1 Combiner的橫空出世
在MapReduce編程模型中,在Mapper和Reducer之間有一個非常重要的組件,它解決了上述的性能瓶頸問題,它就是Combiner。
PS:
①與mapper和reducer不同的是,combiner沒有默認的實現,需要顯式的設置在conf中才有作用。
②並不是所有的job都適用combiner,只有操作滿足結合律的才可設置combiner。combine操作類似於:opt(opt(1, 2, 3), opt(4, 5, 6))。如果opt為求和、求最大值的話,可以使用,但是如果是求中值的話,不適用。
每一個map都可能會產生大量的本地輸出,Combiner的作用就是對map端的輸出先做一次合並,以減少在map和reduce節點之間的數據傳輸量,以提高網絡IO性能,是MapReduce的一種優化手段之一,其具體的作用如下所述。
(1)Combiner最基本是實現本地key的聚合,對map輸出的key排序,value進行迭代。如下所示:
map: (K1, V1) → list(K2, V2)
combine: (K2, list(V2)) → list(K2, V2)
reduce: (K2, list(V2)) → list(K3, V3)
(2)Combiner還有本地reduce功能(其本質上就是一個reduce),例如Hadoop自帶的wordcount的例子和找出value的最大值的程序,combiner和reduce完全一致,如下所示:
map: (K1, V1) → list(K2, V2)
combine: (K2, list(V2)) → list(K3, V3)
reduce: (K3, list(V3)) → list(K4, V4)
PS:現在想想,如果在wordcount中不用combiner,那么所有的結果都是reduce完成,效率會相對低下。使用combiner之后,先完成的map會在本地聚合,提升速度。對於hadoop自帶的wordcount的例子,value就是一個疊加的數字,所以map一結束就可以進行reduce的value疊加,而不必要等到所有的map結束再去進行reduce的value疊加。
2.2 融合Combiner的MapReduce
前面文章中的代碼都忽略了一個可以優化MapReduce作業所使用帶寬的步驟—Combiner,它在Mapper之后Reducer之前運行。Combiner是可選的,如果這個過程適合於你的作業,Combiner實例會在每一個運行map任務的節點上運行。Combiner會接收特定節點上的Mapper實例的輸出作為輸入,接着Combiner的輸出會被發送到Reducer那里,而不是發送Mapper的輸出。Combiner是一個“迷你reduce”過程,它只處理單台機器生成的數據。
2.3 使用MyReducer作為Combiner
在前面文章中的WordCount代碼中加入以下一句簡單的代碼,即可加入Combiner方法:
// 設置Map規約Combiner job.setCombinerClass(MyReducer.class);
還是以下面的文件內容為例,看看這次計數器會發生怎樣的改變?
(1)上傳的測試文件的內容
hello edison
hello kevin
(2)調試后的計數器日志信息
可以看到,原本都為0的Combine input records和Combine output records發生了改變。我們可以清楚地看到map的輸出和combine的輸入統計是一致的,而combine的輸出與reduce的輸入統計是一樣的。由此可以看出規約操作成功,而且執行在map的最后,reduce之前。
三、自己定義Combiner
為了能夠更加清晰的理解Combiner的工作原理,我們自定義一個Combiners類,不再使用MyReduce做為Combiners的類,具體的代碼下面一一道來。
3.1 改寫Mapper類的map方法
public static class MyMapper extends Mapper<LongWritable, Text, Text, LongWritable> { protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, LongWritable>.Context context) throws java.io.IOException, InterruptedException { String line = value.toString(); String[] spilted = line.split(" "); for (String word : spilted) { context.write(new Text(word), new LongWritable(1L)); // 為了顯示效果而輸出Mapper的輸出鍵值對信息 System.out.println("Mapper輸出<" + word + "," + 1 + ">"); } }; }
3.2 改寫Reducer類的reduce方法
public static class MyReducer extends Reducer<Text, LongWritable, Text, LongWritable> { protected void reduce(Text key, java.lang.Iterable<LongWritable> values, Reducer<Text, LongWritable, Text, LongWritable>.Context context) throws java.io.IOException, InterruptedException { // 顯示次數表示redcue函數被調用了多少次,表示k2有多少個分組 System.out.println("Reducer輸入分組<" + key.toString() + ",N(N>=1)>"); long count = 0L; for (LongWritable value : values) { count += value.get(); // 顯示次數表示輸入的k2,v2的鍵值對數量 System.out.println("Reducer輸入鍵值對<" + key.toString() + "," + value.get() + ">"); } context.write(key, new LongWritable(count)); }; }
3.3 添加MyCombiner類並重寫reduce方法
public static class MyCombiner extends Reducer<Text, LongWritable, Text, LongWritable> { protected void reduce( Text key, java.lang.Iterable<LongWritable> values, org.apache.hadoop.mapreduce.Reducer<Text, LongWritable, Text, LongWritable>.Context context) throws java.io.IOException, InterruptedException { // 顯示次數表示規約函數被調用了多少次,表示k2有多少個分組 System.out.println("Combiner輸入分組<" + key.toString() + ",N(N>=1)>"); long count = 0L; for (LongWritable value : values) { count += value.get(); // 顯示次數表示輸入的k2,v2的鍵值對數量 System.out.println("Combiner輸入鍵值對<" + key.toString() + "," + value.get() + ">"); } context.write(key, new LongWritable(count)); // 顯示次數表示輸出的k2,v2的鍵值對數量 System.out.println("Combiner輸出鍵值對<" + key.toString() + "," + count + ">"); }; }
3.4 添加設置Combiner的代碼
// 設置Map規約Combiner job.setCombinerClass(MyCombiner.class);
3.5 調試運行的控制台輸出信息
(1)Mapper
Mapper輸出<hello,1> Mapper輸出<edison,1> Mapper輸出<hello,1> Mapper輸出<kevin,1>
(2)Combiner
Combiner輸入分組<edison,N(N>=1)> Combiner輸入鍵值對<edison,1> Combiner輸出鍵值對<edison,1> Combiner輸入分組<hello,N(N>=1)> Combiner輸入鍵值對<hello,1> Combiner輸入鍵值對<hello,1> Combiner輸出鍵值對<hello,2> Combiner輸入分組<kevin,N(N>=1)> Combiner輸入鍵值對<kevin,1> Combiner輸出鍵值對<kevin,1>
這里可以看出,在Combiner中進行了一次本地的Reduce操作,從而簡化了遠程Reduce節點的歸並壓力。
(3)Reducer
Reducer輸入分組<edison,N(N>=1)> Reducer輸入鍵值對<edison,1> Reducer輸入分組<hello,N(N>=1)> Reducer輸入鍵值對<hello,2> Reducer輸入分組<kevin,N(N>=1)> Reducer輸入鍵值對<kevin,1>
這里可以看出,在對hello的歸並上,只進行了一次操作就完成了。
那么,如果我們再來看看不添加Combiner時的控制台輸出信息:
(1)Mapper
Mapper輸出<hello,1> Mapper輸出<edison,1> Mapper輸出<hello,1> Mapper輸出<kevin,1>
(2)Reducer
Reducer輸入分組<edison,N(N>=1)> Reducer輸入鍵值對<edison,1> Reducer輸入分組<hello,N(N>=1)> Reducer輸入鍵值對<hello,1> Reducer輸入鍵值對<hello,1> Reducer輸入分組<kevin,N(N>=1)> Reducer輸入鍵值對<kevin,1>
可以看出,沒有采用Combiner時hello都是由Reducer節點來進行統一的歸並,也就是這里為何會有兩次hello的輸入鍵值對了。
總結:從控制台的輸出信息我們可以發現,其實combine只是把兩個相同的hello進行規約,由此輸入給reduce的就變成了<hello,2>。在實際的Hadoop集群操作中,我們是由多台主機一起進行MapReduce的,如果加入規約操作,每一台主機會在reduce之前進行一次對本機數據的規約,然后在通過集群進行reduce操作,這樣就會大大節省reduce的時間,從而加快MapReduce的處理速度。
參考資料
(1)萬川梅、謝正蘭,《Hadoop應用開發實戰詳解(修訂版)》:http://item.jd.com/11508248.html
(2)Suddenly,《Hadoop日記Day17-計數器、map規約與分區學習》:http://www.cnblogs.com/sunddenly/p/4009568.html
(3)guoery,《MapReduce中Combiner的使用及誤區》:http://blog.csdn.net/guoery/article/details/8529004
(4)iPolaris,《Hadoop中Combiner的使用》:http://blog.csdn.net/ipolaris/article/details/8723782