mapreduce求topN


(1)利用TreeMap排序,該方式利用小頂堆和集合重復原理的方式 , 每過來一個數據 , 跟堆頂數據進行比較 , 如果比最小的大 , 則將過來的數據替換堆頂元素 , 否則直接跳過數據 . 以此對數據進行排序。

(2)在眾多的Mapper的端,首先計算出各端Mapper的TopN,然后在將每一個Mapper端的TopN匯總到Reducer端進行計算最終的TopN,這樣就可以最大化的提高運行並行處理的能力,通時極大的減少網絡的Shuffle傳輸數據,從而極大的加快的整個處理的效率

(3)setup()與cleanup()方法

 1、setup(),此方法被MapReduce框架僅且執行一次,在執行Map任務前,進行相關變量或者資源的集中初始化工作。若是將資源初始化工作放在方法map()中,導致Mapper任務在解析每一行輸入時都會進行資源初始化工作,導致重復,程序運行效率不高!

 2、cleanup(),此方法被MapReduce框架僅且執行一次,在執行完畢Map任務后,進行相關變量或資源的釋放工作。若是將釋放資源工作放入方法map()中,也會導致Mapper任務在解析、處理每一行文本后釋放資源,而且在下一行文本解析前還要重復初始化,導致反復重復,程序運行效率不高!

 注意:若map與reduce都出現setup()與cleanup()方法,MR都只會執行一次setup()與cleanup()方法。

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

import java.io.IOException;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;

//題目:在大量的數據中計算出資費最高的N個訂單,數據格式為8,1818,9000,20,按“,“進行切分分別對應訂單ID 用戶ID 資費 業務ID。
public class SortedTopN {
    public static class SortedTopNMapper extends Mapper<LongWritable, Text, Text, Text> {
        @Override
        protected static void setup(Context context) throws IOException, InterruptedException {
            int N = context.getConfiguration().getInt("topn", 5); //獲取輸入的TopN的長度
            TreeMap<Integer,String> set = new TreeMap<>();

        }
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            //數據格式為8,1818,9000,20,按“,“進行切分
            String[] data = value.toString().split(",");
            if (4 == data.length) { //符合格式的數據才進行處理
                int cost = Integer.valueOf(data[2]); //獲取訂單的消費金額
                map.put(cost,"");
                if (map.size() > N) {
                    map.remove(map.firstKey());
                }
            }
            for (int i : map.keySet()) {
                context.write(NullWritable.get(),new IntWritable(i));
            }
        }
    }

    //reduce
    public static class SortedTopNReducer extends Reducer<Text, Text, Text, Text> {
        @Override
        protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
            int N = context.getConfiguration().getInt("topn", 5); //獲取輸入的TopN的長度
            TreeMap<Integer,String> map = new TreeMap<>();
            for(IntWritable value : values){
                map.put(value.get(),"");
                if (map.size() > N) {
                    map.remove(map.firstKey());
                }
            }
        }
        protected void cleanup(Context context) throws IOException, InterruptedException {
            for (Integer i : map.descendingMap().keySet()) {
                context.write(NullWritable.get(), new IntWritable(i));
            }
        }
    }
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();//設置MapReduce的配置
        conf.setInt("topn",5);

        String[] otherArgs = new GenericOptionsParser(conf,args).getRemainingArgs();
        if(otherArgs.length < 2){
            System.out.println("Usage: SortedTopN <in> [<in>...] <out>");
            System.exit(2);
        }

        //設置作業
        //Job job = new Job(conf);
        Job job = Job.getInstance(conf);
        job.setJarByClass(SortedTopN.class);
        job.setJobName("SortedTopN");
        //設置處理map,reduce的類
        job.setMapperClass(SortedTopNMapper.class);
        job.setReducerClass(SortedTopNReducer.class);
        //設置輸入輸出格式的處理
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        //設定輸入輸出路徑
        for (int i = 0; i < otherArgs.length-1;++i){
            FileInputFormat.addInputPath(job,new Path(otherArgs[i]));
        }
        FileOutputFormat.setOutputPath(job, new Path(otherArgs[otherArgs.length-1]));
        System.exit(job.waitForCompletion(true)?0:1);
    }
}

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM