一、題目:最小的k個數
題目:輸入n個整數,找出其中最小的k個數。例如輸入4、5、1、6、2、7、3、8這8個數字,則最小的4個數字是1、2、3、4。
這道題是典型的TopK問題,其最簡單的思路莫過於把輸入的n個整數排序,排序之后位於最前面的k個數就是最小的k個數。這種思路的時間復雜度是O(nlogn),但是面試官會要求時間復雜度保持在O(n)。
二、解題思路
2.1 需要修改數據源的O(n)解法
基於快速排序中的Partition函數來解決這個問題。如果基於數組的第k個數字來調整,使得比第k個數字小的所有數字都位於數組的左邊,比第k個數字大的所有數字都位於數組的右邊。這樣調整之后,位於數組中左邊的k個數字就是最小的k個數字(這k個數字不一定是排序的)。
But,采用這種思路是有限制的。我們需要修改輸入的數組,因為函數Partition會調整數組中數字的順序。
2.2 適合處理海量數據的O(nlogk)解法
可以先創建一個大小為k的數據容器來存儲最小的k個數字,接下來我們每次從輸入的n個整數中讀入一個數。
如果容器中已有的數字少於k個,則直接把這次讀入的整數放入容器之中;
如果容器中已有k個數字了,也就是容器已滿,此時我們不能再插入新的數字而只能替換已有的數字。
找出這已有的k個數中的最大值,然后拿這次待插入的整數和最大值進行比較。如果待插入的值比當前已有的最大值小,則用這個數替換當前已有的最大值;如果待插入的值比當前已有的最大值還要大,那么這個數不可能是最小的k個整數之一,於是我們可以拋棄這個整數。
因此當容器滿了之后,我們要做3件事情:一是在k個整數中找到最大數;二是有可能在這個容器中刪除最大數;三是有可能要插入一個新的數字。如果用一個二叉樹來實現這個數據容器,那么我們能在O(logk)時間內實現這三步操作。因此對於n個輸入數字而言,總的時間效率就是O(nlogk)。
根據以上步驟,這里采用C#實現代碼如下:(采用了紅黑樹結構作為容器,當然也可以采用堆來實現,有關紅黑樹的細節可以閱讀yangecnu的《淺談算法和數據結構之紅黑樹》)
public static void GetLeastNumbersByRedBlackTree(List<int> data, SortedDictionary<int, int> leastNumbers, int k) { leastNumbers.Clear(); if (k < 1 || data.Count < k) { return; } for (int i = 0; i < data.Count; i++) { int num = data[i]; if (leastNumbers.Count < k) { leastNumbers.Add(num, num); } else { int greastNum = leastNumbers.ElementAt(leastNumbers.Count - 1).Value; if (num < greastNum) { leastNumbers.Remove(greastNum); leastNumbers.Add(num, num); } } } }
此解法雖然要慢一點,但它有兩個明顯的優點:
一是沒有修改輸入的數據(代碼中的變量data)。我們每次只是從data中讀入數字,所有的寫操作都是在容器leastNumbers中進行的。
二是該算法適合海量數據的輸入(包括百度在內的多家公司非常喜歡與海量輸入數據相關的問題)。
假設題目是要求從海量的數據中找出最小的k個數字,由於內存的大小是有限的,有可能不能把這些海量的數據一次性全部載入內存。這個時候,我們可以從輔助存儲空間(比如硬盤)中每次讀入一個數字,根據GetLeastNumbers的方式判斷是不是需要放入容器leastNumbers即可。
這種思路只要求內存能夠容納leastNumbers即可,因此它最適合的情形就是n很大並且k較小的問題。
三、單元測試
3.1 測試用例
(1)封裝輔助測試方法TestPortal()

public static void TestPortal(string testName, int[] data, int[] expected, int k) { if (!string.IsNullOrEmpty(testName)) { Console.WriteLine("{0} begins:", testName); } Console.WriteLine("Result for solution:"); if (expected != null) { Console.WriteLine("Expected result:"); for (int i = 0; i < expected.Length; i++) { Console.Write("{0}\t", expected[i]); } Console.WriteLine(); } if(data == null) { return; } List<int> dataList = new List<int>(); for (int i = 0; i < data.Length; i++) { dataList.Add(data[i]); } SortedDictionary<int, int> leastNumbers = new SortedDictionary<int, int>(); GetLeastNumbersByRedBlackTree(dataList, leastNumbers, k); Console.WriteLine("Actual result:"); foreach (var item in leastNumbers) { Console.Write("{0}\t", item.Value); } Console.WriteLine("\n"); }
(2)功能測試、特殊輸入測試
// k小於數組的長度 public static void Test1() { int[] data = { 4, 5, 1, 6, 2, 7, 3, 8 }; int[] expected = { 1, 2, 3, 4 }; TestPortal("Test1", data, expected, expected.Length); } // k等於數組的長度 public static void Test2() { int[] data = { 4, 5, 1, 6, 2, 7, 3, 8 }; int[] expected = { 1, 2, 3, 4, 5, 6, 7, 8 }; TestPortal("Test2", data, expected, expected.Length); } // k大於數組的長度 public static void Test3() { int[] data = { 4, 5, 1, 6, 2, 7, 3, 8 }; int[] expected = null; TestPortal("Test3", data, expected, 10); } // k等於1 public static void Test4() { int[] data = { 4, 5, 1, 6, 2, 7, 3, 8 }; int[] expected = { 1 }; TestPortal("Test4", data, expected, expected.Length); } // k等於0 public static void Test5() { int[] data = { 4, 5, 1, 6, 2, 7, 3, 8 }; int[] expected = null; TestPortal("Test5", data, expected, 0); } // 數組中有相同的數字 public static void Test6() { int[] data = { 4, 5, 1, 6, 2, 7, 2, 8 }; int[] expected = { 1, 2 }; TestPortal("Test6", data, expected, expected.Length); } // 輸入空指針 public static void Test7() { TestPortal("Test7", null, null, 0); }
3.2 測試結果
四、分布式計算
4.1 Hadoop MapReduce簡單介紹
Hadoop MapReduce是一個軟件框架,基於該框架能夠容易地編寫應用程序,這些應用程序能夠運行在由上千個商用機器組成的大集群上,並以一種可靠的,具有容錯能力的方式並行地處理上TB級別的海量數據集。
因此,對於MapReduce,可以簡潔地認為,它是一個軟件框架,海量數據是它的“菜”,它在大規模集群上以一種可靠且容錯的方式並行地“烹飪這道菜”。
4.2 使用MapReduce解決TopK問題
這里我們使用一個隨機生成的100萬個數字的文件,也就是說我們要做的就是在100萬個數中找到最大的前100個數字。
實驗數據下載地址:http://pan.baidu.com/s/1qWt4WaS
(1)map函數

public static class MyMapper extends Mapper<LongWritable, Text, NullWritable, LongWritable> { public static final int K = 100; private TreeMap<Long, Long> tm = new TreeMap<Long, Long>(); protected void map( LongWritable key, Text value, Mapper<LongWritable, Text, NullWritable, LongWritable>.Context context) throws java.io.IOException, InterruptedException { try { long temp = Long.parseLong(value.toString().trim()); tm.put(temp, temp); if (tm.size() > K) { tm.remove(tm.firstKey()); // 如果是求topk個最小的那么使用下面的語句 //tm.remove(tm.lastKey()); } } catch (Exception e) { context.getCounter("TopK", "errorLog").increment(1L); } }; protected void cleanup( org.apache.hadoop.mapreduce.Mapper<LongWritable, Text, NullWritable, LongWritable>.Context context) throws java.io.IOException, InterruptedException { for (Long num : tm.values()) { context.write(NullWritable.get(), new LongWritable(num)); } }; }
其中,使用了java中的紅黑樹對應的數據結構TreeMap類,cleanup()方法是在map方法結束之后才會執行的方法,這里我們將在該map任務中的前100個數據傳入reduce任務中;
(2)reduce函數

public static class MyReducer extends Reducer<NullWritable, LongWritable, NullWritable, LongWritable> { public static final int K = 100; private TreeMap<Long, Long> tm = new TreeMap<Long, Long>(); protected void reduce( NullWritable key, java.lang.Iterable<LongWritable> values, Reducer<NullWritable, LongWritable, NullWritable, LongWritable>.Context context) throws java.io.IOException, InterruptedException { for (LongWritable num : values) { tm.put(num.get(), num.get()); if (tm.size() > K) { tm.remove(tm.firstKey()); // 如果是求topk個最小的那么使用下面的語句 //tm.remove(tm.lastKey()); } } // 按降序即從大到小排列Key集合 for (Long value : tm.descendingKeySet()) { context.write(NullWritable.get(), new LongWritable(value)); } }; }
在reduce方法中,依次將map方法中傳入的數據放入TreeMap中,並依靠紅黑色的平衡特性來維持數據的有序性。
(3)完整代碼

package algorithm; import java.net.URI; import java.util.TreeMap; import mapreduce.MyWordCountJob.MyMapper; import mapreduce.MyWordCountJob.MyReducer; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configured; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.io.compress.CompressionCodec; import org.apache.hadoop.io.compress.GzipCodec; import org.apache.hadoop.mapred.TestJobCounters.NewIdentityReducer; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; import org.apache.hadoop.mapreduce.lib.partition.HashPartitioner; import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.ToolRunner; public class MyTopKNumJob extends Configured implements Tool { /** * @author Edison Chou * @version 1.0 */ public static class MyMapper extends Mapper<LongWritable, Text, NullWritable, LongWritable> { public static final int K = 100; private TreeMap<Long, Long> tm = new TreeMap<Long, Long>(); protected void map( LongWritable key, Text value, Mapper<LongWritable, Text, NullWritable, LongWritable>.Context context) throws java.io.IOException, InterruptedException { try { long temp = Long.parseLong(value.toString().trim()); tm.put(temp, temp); if (tm.size() > K) { //tm.remove(tm.firstKey()); // 如果是求topk個最小的那么使用下面的語句 tm.remove(tm.lastKey()); } } catch (Exception e) { context.getCounter("TopK", "errorLog").increment(1L); } }; protected void cleanup( org.apache.hadoop.mapreduce.Mapper<LongWritable, Text, NullWritable, LongWritable>.Context context) throws java.io.IOException, InterruptedException { for (Long num : tm.values()) { context.write(NullWritable.get(), new LongWritable(num)); } }; } /** * @author Edison Chou * @version 1.0 */ public static class MyReducer extends Reducer<NullWritable, LongWritable, NullWritable, LongWritable> { public static final int K = 100; private TreeMap<Long, Long> tm = new TreeMap<Long, Long>(); protected void reduce( NullWritable key, java.lang.Iterable<LongWritable> values, Reducer<NullWritable, LongWritable, NullWritable, LongWritable>.Context context) throws java.io.IOException, InterruptedException { for (LongWritable num : values) { tm.put(num.get(), num.get()); if (tm.size() > K) { //tm.remove(tm.firstKey()); // 如果是求topk個最小的那么使用下面的語句 tm.remove(tm.lastKey()); } } // 按降序即從大到小排列Key集合 for (Long value : tm.descendingKeySet()) { context.write(NullWritable.get(), new LongWritable(value)); } }; } // 輸入文件路徑 public static String INPUT_PATH = "hdfs://hadoop-master:9000/testdir/input/seq100w.txt"; // 輸出文件路徑 public static String OUTPUT_PATH = "hdfs://hadoop-master:9000/testdir/output/topkapp"; @Override public int run(String[] args) throws Exception { // 首先刪除輸出路徑的已有生成文件 FileSystem fs = FileSystem.get(new URI(INPUT_PATH), getConf()); Path outPath = new Path(OUTPUT_PATH); if (fs.exists(outPath)) { fs.delete(outPath, true); } Job job = new Job(getConf(), "TopKNumberJob"); // 設置輸入目錄 FileInputFormat.setInputPaths(job, new Path(INPUT_PATH)); // 設置自定義Mapper job.setMapperClass(MyMapper.class); job.setMapOutputKeyClass(NullWritable.class); job.setMapOutputValueClass(LongWritable.class); // 設置自定義Reducer job.setReducerClass(MyReducer.class); job.setOutputKeyClass(NullWritable.class); job.setOutputValueClass(LongWritable.class); // 設置輸出目錄 FileOutputFormat.setOutputPath(job, new Path(OUTPUT_PATH)); return job.waitForCompletion(true) ? 0 : 1; } public static void main(String[] args) { Configuration conf = new Configuration(); // map端輸出啟用壓縮 conf.setBoolean("mapred.compress.map.output", true); // reduce端輸出啟用壓縮 conf.setBoolean("mapred.output.compress", true); // reduce端輸出壓縮使用的類 conf.setClass("mapred.output.compression.codec", GzipCodec.class, CompressionCodec.class); try { int res = ToolRunner.run(conf, new MyTopKNumJob(), args); System.exit(res); } catch (Exception e) { e.printStackTrace(); } } }
(4)實現效果:圖片大小有限,這里只顯示了前12個;
雖然例子很簡單,業務也很簡單,但是我們引入了分布式計算的思想,將MapReduce應用在了最值問題之中,也算是一個進步了。