1. Common Algorithms in MapReduce
(1) The king of classics: word count
This is the canonical MapReduce example; it does not get any more classic than this!
(2) Data deduplication
The point of "data deduplication" is to learn how parallel thinking can be applied to filter data in a meaningful way. Seemingly complex tasks such as counting the number of distinct values in a large data set, or extracting the set of visiting locations from website logs, all boil down to deduplication.
(3) Sorting: ordering records by some key, in ascending or descending order
(4) TopK: sort all of the source data and take the first K records; this is the TopK problem.
TopK is usually solved with the help of a heap.
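As a standalone sketch (plain Java, outside MapReduce, with illustrative names), a min-heap of size K keeps the K largest values seen so far: the root is the smallest current candidate and is evicted whenever a larger value arrives.

import java.util.PriorityQueue;

public class TopKWithHeap {
    // Keep the K largest values with a min-heap of size K.
    public static long[] topK(long[] data, int k) {
        PriorityQueue<Long> minHeap = new PriorityQueue<Long>(k);
        for (long num : data) {
            if (minHeap.size() < k) {
                minHeap.offer(num);
            } else if (num > minHeap.peek()) {
                minHeap.poll();      // evict the smallest candidate
                minHeap.offer(num);
            }
        }
        long[] result = new long[minHeap.size()];
        for (int i = result.length - 1; i >= 0; i--) {
            result[i] = minHeap.poll();   // poll ascending, fill from the back => descending order
        }
        return result;
    }

    public static void main(String[] args) {
        long[] data = {5, 12, 7, 3, 42, 19, 8};
        for (long v : topK(data, 3)) {
            System.out.println(v);        // prints 42, 19, 12
        }
    }
}

The MapReduce job later in this post uses a TreeMap instead of a heap, but the eviction idea is the same.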
(5) Selection: a relational-algebra primitive revisited
Selection builds a new relation from the tuples (records) of a given relation that satisfy a condition. In relational algebra, selection is an operation on tuples.
In MapReduce, finding a maximum or minimum is a typical selection: out of N rows we pick the single row holding the smallest (or largest) value.
(6) Projection: another relational-algebra primitive
Projection builds a new relation of the same kind from a subset of the attributes (fields) of a given relation; duplicate tuples created by dropping attributes are removed automatically. Projection is an operation on attributes.
In MapReduce, the mobile-internet log processed earlier is a typical projection: out of the 11 fields of each log record we kept only five to report the mobile traffic.
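As a rough, map-only sketch of such a projection (the tab delimiter and the field positions used here are assumptions for illustration, not the actual layout of the log file from the earlier post):

import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

// Map-only projection: keep a handful of columns and drop the rest.
public class ProjectionMapper extends Mapper<LongWritable, Text, Text, NullWritable> {
    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        String[] fields = value.toString().split("\t");   // assumed delimiter
        if (fields.length < 11) {
            return;                                        // skip malformed records
        }
        // assumed positions: the phone number plus four traffic counters
        String projected = fields[1] + "\t" + fields[6] + "\t" + fields[7]
                + "\t" + fields[8] + "\t" + fields[9];
        context.write(new Text(projected), NullWritable.get());
    }
}

With job.setNumReduceTasks(0) this runs as a map-only job; if the projection creates duplicates, a pass through a reducer (as in the deduplication sketch above) removes them.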
(7) Grouping: Group By XXXX
In MapReduce, grouping is similar to partitioning. In the mobile-internet log example we split the records into two groups, phone numbers and non-phone numbers, and processed each group separately; a sketch of one way to do this follows.
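One way to realise this two-group split is a custom Partitioner, sketched below. The class name and the 11-digit test for "is a phone number" are assumptions for illustration, not the exact code from the log example.

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

// Send phone-number keys to one reduce task and everything else to another,
// so each group is processed and written out separately.
public class PhonePartitioner extends Partitioner<Text, LongWritable> {
    @Override
    public int getPartition(Text key, LongWritable value, int numPartitions) {
        // assumed definition of "phone number": an 11-digit key
        boolean isPhone = key.toString().matches("\\d{11}");
        return (isPhone ? 0 : 1) % numPartitions;
    }
}

The job would register it with job.setPartitionerClass(PhonePartitioner.class) and job.setNumReduceTasks(2), so that each group ends up in its own output file.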
(8) Multi-table join
(9) Single-table join (self-join)
2. The General TopK Problem: Finding the Top K Elements
TopK is a very common practical problem: how do we efficiently find the K largest (or smallest) values in a huge amount of data? The traditional approach is to load the entire data file into memory, then sort and count. But once the file grows beyond a certain size it simply cannot be loaded into memory in one piece, unless you are willing to risk bringing the machine down.
This is where distributed computing comes in: we let a cluster of machines do the work. As an analogy, a job that would take a single machine 10 hours can be finished in roughly 1 hour by 10 machines working in parallel. In this experiment we use a file of 1,000,000 randomly generated numbers, and the task is to find the 100 largest numbers among them.
Download link for the test data: http://pan.baidu.com/s/1qWt4WaS
2.1 Storing the Top K Elements in a TreeMap
(1) The red-black tree behind TreeMap
How to store the top K elements is at the heart of the TopK problem; here we use Java's TreeMap. TreeMap is implemented as a red-black tree (also called a red-black binary tree): it is first of all a binary tree, with all the properties of one, and beyond that it is a self-balancing binary search tree.
A balanced binary tree is either an empty tree, or a tree in which the heights of the left and right subtrees differ by at most 1 and both subtrees are themselves balanced. In other words, at every node of the tree, the heights of its left and right subtrees are close to each other.
A red-black tree is, as the name suggests, a balanced binary tree whose nodes are colored red or black; the color constraints are what keep the tree balanced.
Note: for a detailed introduction to TreeMap and red-black trees, see chenssy's article on TreeMap and red-black trees (listed in the references below); we will not repeat it here.
(2) TreeMap's put() method
The implementation of TreeMap's put() has two main steps: first, insert the new entry into the sorted binary tree; second, rebalance the tree.
Rebalancing relies on left rotations, right rotations, and recoloring. All of these operations serve one purpose: keeping the tree balanced and ordered, which is exactly what gives us sorted storage of the data.
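The following is a deliberately simplified sketch of a left rotation on a plain binary-search-tree node; TreeMap's real implementation additionally maintains parent links and node colors, so this only shows the shape of the operation.

// A simplified tree node; TreeMap's real entries also carry a parent link and a color.
class Node {
    long key;
    Node left, right;
    Node(long key) { this.key = key; }
}

class Rotations {
    // Left rotation around x: x's right child y moves up and x becomes y's left child.
    // The in-order sequence of keys (and hence the sort order) is unchanged.
    static Node rotateLeft(Node x) {
        Node y = x.right;      // y must exist for a left rotation
        x.right = y.left;      // y's left subtree becomes x's right subtree
        y.left = x;            // x moves down to the left of y
        return y;              // y is the new root of this subtree
    }
}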
2.2 Writing the map and reduce Functions
(1) The map function
public static class MyMapper extends
        Mapper<LongWritable, Text, NullWritable, LongWritable> {
    public static final int K = 100;
    private TreeMap<Long, Long> tm = new TreeMap<Long, Long>();

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws java.io.IOException, InterruptedException {
        try {
            long temp = Long.parseLong(value.toString().trim());
            tm.put(temp, temp);
            if (tm.size() > K) {
                // keep the K largest values: evict the current smallest key
                tm.remove(tm.firstKey());
                // to find the K smallest values instead, use the line below
                // tm.remove(tm.lastKey());
            }
        } catch (Exception e) {
            context.getCounter("TopK", "errorLog").increment(1L);
        }
    }

    @Override
    protected void cleanup(Context context)
            throws java.io.IOException, InterruptedException {
        for (Long num : tm.values()) {
            context.write(NullWritable.get(), new LongWritable(num));
        }
    }
}
The cleanup() method runs after all calls to map() in a task have finished; here we use it to hand this map task's top 100 values on to the reduce task.
(2) The reduce function
public static class MyReducer extends
        Reducer<NullWritable, LongWritable, NullWritable, LongWritable> {
    public static final int K = 100;
    private TreeMap<Long, Long> tm = new TreeMap<Long, Long>();

    @Override
    protected void reduce(NullWritable key, Iterable<LongWritable> values,
            Context context) throws java.io.IOException, InterruptedException {
        for (LongWritable num : values) {
            tm.put(num.get(), num.get());
            if (tm.size() > K) {
                // keep the K largest values: evict the current smallest key
                tm.remove(tm.firstKey());
                // to find the K smallest values instead, use the line below
                // tm.remove(tm.lastKey());
            }
        }
        // output the keys in descending order, i.e. from largest to smallest
        for (Long value : tm.descendingKeySet()) {
            context.write(NullWritable.get(), new LongWritable(value));
        }
    }
}
In the reduce method, the values coming from the map tasks are inserted one by one into a TreeMap, whose red-black-tree balancing keeps the data in sorted order.
(3) Complete code

package algorithm;

import java.net.URI;
import java.util.TreeMap;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class MyTopKNumJob extends Configured implements Tool {

    /**
     * @author Edison Chou
     * @version 1.0
     */
    public static class MyMapper extends
            Mapper<LongWritable, Text, NullWritable, LongWritable> {
        public static final int K = 100;
        private TreeMap<Long, Long> tm = new TreeMap<Long, Long>();

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws java.io.IOException, InterruptedException {
            try {
                long temp = Long.parseLong(value.toString().trim());
                tm.put(temp, temp);
                if (tm.size() > K) {
                    // keep the K largest values: evict the current smallest key
                    tm.remove(tm.firstKey());
                    // to find the K smallest values instead, use the line below
                    // tm.remove(tm.lastKey());
                }
            } catch (Exception e) {
                context.getCounter("TopK", "errorLog").increment(1L);
            }
        }

        @Override
        protected void cleanup(Context context)
                throws java.io.IOException, InterruptedException {
            // emit this map task's K candidates once all input has been read
            for (Long num : tm.values()) {
                context.write(NullWritable.get(), new LongWritable(num));
            }
        }
    }

    /**
     * @author Edison Chou
     * @version 1.0
     */
    public static class MyReducer extends
            Reducer<NullWritable, LongWritable, NullWritable, LongWritable> {
        public static final int K = 100;
        private TreeMap<Long, Long> tm = new TreeMap<Long, Long>();

        @Override
        protected void reduce(NullWritable key, Iterable<LongWritable> values,
                Context context) throws java.io.IOException, InterruptedException {
            for (LongWritable num : values) {
                tm.put(num.get(), num.get());
                if (tm.size() > K) {
                    // keep the K largest values: evict the current smallest key
                    tm.remove(tm.firstKey());
                    // to find the K smallest values instead, use the line below
                    // tm.remove(tm.lastKey());
                }
            }
            // output the keys in descending order, i.e. from largest to smallest
            for (Long value : tm.descendingKeySet()) {
                context.write(NullWritable.get(), new LongWritable(value));
            }
        }
    }

    // input path
    public static String INPUT_PATH = "hdfs://hadoop-master:9000/testdir/input/seq100w.txt";
    // output path
    public static String OUTPUT_PATH = "hdfs://hadoop-master:9000/testdir/output/topkapp";

    @Override
    public int run(String[] args) throws Exception {
        // delete any previous output first
        FileSystem fs = FileSystem.get(new URI(INPUT_PATH), getConf());
        Path outPath = new Path(OUTPUT_PATH);
        if (fs.exists(outPath)) {
            fs.delete(outPath, true);
        }

        Job job = new Job(getConf(), "TopKNumberJob");
        // input directory
        FileInputFormat.setInputPaths(job, new Path(INPUT_PATH));
        // custom Mapper
        job.setMapperClass(MyMapper.class);
        job.setMapOutputKeyClass(NullWritable.class);
        job.setMapOutputValueClass(LongWritable.class);
        // custom Reducer
        job.setReducerClass(MyReducer.class);
        job.setOutputKeyClass(NullWritable.class);
        job.setOutputValueClass(LongWritable.class);
        // output directory
        FileOutputFormat.setOutputPath(job, new Path(OUTPUT_PATH));

        return job.waitForCompletion(true) ? 0 : 1;
    }

    public static void main(String[] args) {
        Configuration conf = new Configuration();
        // enable compression of the map output
        conf.setBoolean("mapred.compress.map.output", true);
        // enable compression of the job output
        conf.setBoolean("mapred.output.compress", true);
        // codec used for the compressed output
        conf.setClass("mapred.output.compression.codec", GzipCodec.class,
                CompressionCodec.class);
        try {
            int res = ToolRunner.run(conf, new MyTopKNumJob(), args);
            System.exit(res);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
(4) Result: because of the limited image size, only the first 12 values are shown here.
3. A Special Case of TopK: The Extremum Problem
Finding an extremum is a typical selection operation: out of the 1,000,000 numbers, find the single largest (or smallest) one. In our test file the largest number is 32767, so let's now rewrite the code to find 32767.
3.1 Rewriting the map Function
public static class MyMapper extends
        Mapper<LongWritable, Text, LongWritable, NullWritable> {
    long max = Long.MIN_VALUE;

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws java.io.IOException, InterruptedException {
        long temp = Long.parseLong(value.toString().trim());
        if (temp > max) {
            max = temp;
        }
    }

    @Override
    protected void cleanup(Context context)
            throws java.io.IOException, InterruptedException {
        // emit this map task's local maximum once all input has been read
        context.write(new LongWritable(max), NullWritable.get());
    }
}
Looks familiar, doesn't it? It simply compares each value against the running maximum.
3.2 Rewriting the reduce Function
public static class MyReducer extends
        Reducer<LongWritable, NullWritable, LongWritable, NullWritable> {
    long max = Long.MIN_VALUE;

    @Override
    protected void reduce(LongWritable key, Iterable<NullWritable> values,
            Context context) throws java.io.IOException, InterruptedException {
        long temp = key.get();
        if (temp > max) {
            max = temp;
        }
    }

    @Override
    protected void cleanup(Context context)
            throws java.io.IOException, InterruptedException {
        // emit the global maximum after all local maxima have been compared
        context.write(new LongWritable(max), NullWritable.get());
    }
}
The reduce method keeps comparing the values handed over by the map tasks against the running maximum; once all calls to reduce() have finished, the cleanup() method writes out the final maximum.
The final complete code is as follows:

package algorithm;

import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class MyMaxNumJob extends Configured implements Tool {

    /**
     * @author Edison Chou
     * @version 1.0
     */
    public static class MyMapper extends
            Mapper<LongWritable, Text, LongWritable, NullWritable> {
        long max = Long.MIN_VALUE;

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws java.io.IOException, InterruptedException {
            long temp = Long.parseLong(value.toString().trim());
            if (temp > max) {
                max = temp;
            }
        }

        @Override
        protected void cleanup(Context context)
                throws java.io.IOException, InterruptedException {
            // emit this map task's local maximum once all input has been read
            context.write(new LongWritable(max), NullWritable.get());
        }
    }

    /**
     * @author Edison Chou
     * @version 1.0
     */
    public static class MyReducer extends
            Reducer<LongWritable, NullWritable, LongWritable, NullWritable> {
        long max = Long.MIN_VALUE;

        @Override
        protected void reduce(LongWritable key, Iterable<NullWritable> values,
                Context context) throws java.io.IOException, InterruptedException {
            long temp = key.get();
            if (temp > max) {
                max = temp;
            }
        }

        @Override
        protected void cleanup(Context context)
                throws java.io.IOException, InterruptedException {
            // emit the global maximum after all local maxima have been compared
            context.write(new LongWritable(max), NullWritable.get());
        }
    }

    // input path
    public static String INPUT_PATH = "hdfs://hadoop-master:9000/testdir/input/seq100w.txt";
    // output path
    public static String OUTPUT_PATH = "hdfs://hadoop-master:9000/testdir/output/topkapp";

    @Override
    public int run(String[] args) throws Exception {
        // delete any previous output first
        FileSystem fs = FileSystem.get(new URI(INPUT_PATH), getConf());
        Path outPath = new Path(OUTPUT_PATH);
        if (fs.exists(outPath)) {
            fs.delete(outPath, true);
        }

        Job job = new Job(getConf(), "MaxNumberJob");
        // input directory
        FileInputFormat.setInputPaths(job, new Path(INPUT_PATH));
        // custom Mapper
        job.setMapperClass(MyMapper.class);
        job.setMapOutputKeyClass(LongWritable.class);
        job.setMapOutputValueClass(NullWritable.class);
        // custom Reducer
        job.setReducerClass(MyReducer.class);
        job.setOutputKeyClass(LongWritable.class);
        job.setOutputValueClass(NullWritable.class);
        // output directory
        FileOutputFormat.setOutputPath(job, new Path(OUTPUT_PATH));

        return job.waitForCompletion(true) ? 0 : 1;
    }

    public static void main(String[] args) {
        Configuration conf = new Configuration();
        // enable compression of the map output
        conf.setBoolean("mapred.compress.map.output", true);
        // enable compression of the job output
        conf.setBoolean("mapred.output.compress", true);
        // codec used for the compressed output
        conf.setClass("mapred.output.compression.codec", GzipCodec.class,
                CompressionCodec.class);
        try {
            int res = ToolRunner.run(conf, new MyMaxNumJob(), args);
            System.exit(res);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
3.3 Checking the Result
As the output shows, the program found the maximum value: 32767. The example and its business logic are simple, but applying the idea of distributed computation and MapReduce to an extremum problem is progress in itself!
References
(1) Wu Chao (吳超), 《深入淺出Hadoop》: http://www.superwu.cn/
(2) Suddenly, "Hadoop Diary Day 18: MapReduce Sorting and Grouping" (《Hadoop日記Day18——MapReduce排序和分組》): http://www.cnblogs.com/sunddenly/p/4009751.html
(3) chenssy, "Java Improvement Series (27): TreeMap" (《Java提高篇(27)——TreeMap》): http://blog.csdn.net/chenssy/article/details/26668941