(1)利用TreeMap排序,該方式利用小頂堆和集合重復原理的方式 , 每過來一個數據 , 跟堆頂數據進行比較 , 如果比最小的大 , 則將過來的數據替換堆頂元素 , 否則直接跳過數據 . 以此對數據進行排序。
(2)在眾多的Mapper的端,首先計算出各端Mapper的TopN,然后在將每一個Mapper端的TopN匯總到Reducer端進行計算最終的TopN,這樣就可以最大化的提高運行並行處理的能力,通時極大的減少網絡的Shuffle傳輸數據,從而極大的加快的整個處理的效率
(3)setup()與cleanup()方法
1、setup(),此方法被MapReduce框架僅且執行一次,在執行Map任務前,進行相關變量或者資源的集中初始化工作。若是將資源初始化工作放在方法map()中,導致Mapper任務在解析每一行輸入時都會進行資源初始化工作,導致重復,程序運行效率不高!
2、cleanup(),此方法被MapReduce框架僅且執行一次,在執行完畢Map任務后,進行相關變量或資源的釋放工作。若是將釋放資源工作放入方法map()中,也會導致Mapper任務在解析、處理每一行文本后釋放資源,而且在下一行文本解析前還要重復初始化,導致反復重復,程序運行效率不高!
注意:若map與reduce都出現setup()與cleanup()方法,MR都只會執行一次setup()與cleanup()方法。
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.util.GenericOptionsParser; import java.io.IOException; import java.util.Map; import java.util.Set; import java.util.TreeMap; import java.util.TreeSet; //題目:在大量的數據中計算出資費最高的N個訂單,數據格式為8,1818,9000,20,按“,“進行切分分別對應訂單ID 用戶ID 資費 業務ID。 public class SortedTopN { public static class SortedTopNMapper extends Mapper<LongWritable, Text, Text, Text> { @Override protected static void setup(Context context) throws IOException, InterruptedException { int N = context.getConfiguration().getInt("topn", 5); //獲取輸入的TopN的長度 TreeMap<Integer,String> set = new TreeMap<>(); } @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { //數據格式為8,1818,9000,20,按“,“進行切分 String[] data = value.toString().split(","); if (4 == data.length) { //符合格式的數據才進行處理 int cost = Integer.valueOf(data[2]); //獲取訂單的消費金額 map.put(cost,""); if (map.size() > N) { map.remove(map.firstKey()); } } for (int i : map.keySet()) { context.write(NullWritable.get(),new IntWritable(i)); } } } //reduce public static class SortedTopNReducer extends Reducer<Text, Text, Text, Text> { @Override protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException { int N = context.getConfiguration().getInt("topn", 5); //獲取輸入的TopN的長度 TreeMap<Integer,String> map = new TreeMap<>(); for(IntWritable value : values){ map.put(value.get(),""); if (map.size() > N) { map.remove(map.firstKey()); } } } protected void cleanup(Context context) throws IOException, InterruptedException { for (Integer i : map.descendingMap().keySet()) { context.write(NullWritable.get(), new IntWritable(i)); } } } public static void main(String[] args) throws Exception { Configuration conf = new Configuration();//設置MapReduce的配置 conf.setInt("topn",5); String[] otherArgs = new GenericOptionsParser(conf,args).getRemainingArgs(); if(otherArgs.length < 2){ System.out.println("Usage: SortedTopN <in> [<in>...] <out>"); System.exit(2); } //設置作業 //Job job = new Job(conf); Job job = Job.getInstance(conf); job.setJarByClass(SortedTopN.class); job.setJobName("SortedTopN"); //設置處理map,reduce的類 job.setMapperClass(SortedTopNMapper.class); job.setReducerClass(SortedTopNReducer.class); //設置輸入輸出格式的處理 job.setOutputKeyClass(Text.class); job.setOutputValueClass(Text.class); //設定輸入輸出路徑 for (int i = 0; i < otherArgs.length-1;++i){ FileInputFormat.addInputPath(job,new Path(otherArgs[i])); } FileOutputFormat.setOutputPath(job, new Path(otherArgs[otherArgs.length-1])); System.exit(job.waitForCompletion(true)?0:1); } }
