Hadoop基礎---MapReduce對數據進行排序


承接上文:Hadoop基礎---流量求和MapReduce程序及自定義數據類型

一:實驗數據

對上一篇文章中的數據進行排序處理:

13480253104    180    200    380
13502468823    102    7335    7437
13560439658    5892    400    6292
13600217502    186852    200    187052
13602846565    12    1938    1950
13660577991    9    6960    6969
13719199419    0    200    200
13726230503    2481    24681    27162
13760778710    120    200    320
13823070001    180    200    380
13826544101    0    200    200
13922314466    3008    3720    6728
13925057413    63    11058    11121
13926251106    0    200    200
13926435656    1512    200    1712
15013685858    27    3659    3686
15920133257    20    3156    3176
15989002119    3    1938    1941
18211575961    12    1527    1539
18320173382    18    9531    9549
84138413    4116    1432    5548

二:MapReduce程序編寫

(一)自定義數據結構FlowBean編寫

package cn.hadoop.mr.wc;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.WritableComparable;

public class FlowBean implements WritableComparable<FlowBean> {
    private String phoneNB;
    private long up_flow;
    private long down_flow;
    private long sum_flow;
    
    public FlowBean() {}    //無參構造函數,用於反序列化時使用

    public FlowBean(String phoneNB, long up_flow, long down_flow) {
        this.phoneNB = phoneNB;
        this.up_flow = up_flow;
        this.down_flow = down_flow;
        this.sum_flow = up_flow + down_flow;
    }
    

    public String getPhoneNB() {
        return phoneNB;
    }

    public void setPhoneNB(String phoneNB) {
        this.phoneNB = phoneNB;
    }

    public long getUp_flow() {
        return up_flow;
    }

    public void setUp_flow(long up_flow) {
        this.up_flow = up_flow;
    }

    public long getDown_flow() {
        return down_flow;
    }

    public void setDown_flow(long down_flow) {
        this.down_flow = down_flow;
    }

    public long getSum_flow() {
        return up_flow + down_flow;
    }


    //用於序列化
    @Override
    public void write(DataOutput out) throws IOException {
        // TODO Auto-generated method stub
        out.writeUTF(phoneNB);
        out.writeLong(up_flow);
        out.writeLong(down_flow);
        out.writeLong(up_flow+down_flow);
    }
    
    //用於反序列化
    @Override
    public void readFields(DataInput in) throws IOException {
        // TODO Auto-generated method stub
        phoneNB = in.readUTF();
        up_flow = in.readLong();
        down_flow = in.readLong();
        sum_flow = in.readLong();
    }

 @Override public int compareTo(FlowBean o) {  //用於排序操作 return sum_flow > o.sum_flow ? -1 : 1; //返回值為-1,則排在前面  }

    @Override
    public String toString() {
        return "" + up_flow + "\t" + down_flow + "\t"+ sum_flow;
    }

    
}

(二)Map程序編寫

package cn.hadoop.rs;

import java.io.IOException;

import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import cn.hadoop.mr.wc.FlowBean;

/**
 * Mapper for the sort job: parses one tab-separated line produced by the
 * flow-sum job (phone, up, down, total) and emits a FlowBean as the key so
 * the framework orders records via FlowBean.compareTo during shuffle/sort.
 */
public class ResSortMapper extends Mapper<LongWritable, Text, FlowBean, NullWritable> {
    @Override
    protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, FlowBean, NullWritable>.Context context)
            throws IOException, InterruptedException {
        // commons-lang split: tokenizes on the tab character, skipping empty tokens.
        String[] cols = StringUtils.split(value.toString(), '\t');
        String phone = cols[0];
        long up = Long.parseLong(cols[1]);
        long down = Long.parseLong(cols[2]);
        // The bean itself is the sort key; no payload is needed beyond it.
        context.write(new FlowBean(phone, up, down), NullWritable.get());
    }
}

(三)Reduce程序編寫

package cn.hadoop.rs;

import java.io.IOException;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import cn.hadoop.mr.wc.FlowBean;

//會在reduce接收數據時,對key進行排序
/**
 * Reducer for the sort job. Keys arrive already ordered by
 * FlowBean.compareTo (the framework sorts during shuffle/sort),
 * so emitting them as-is yields the sorted output.
 */
public class ResSortReducer extends Reducer<FlowBean, NullWritable, Text, FlowBean> {
    @Override
    protected void reduce(FlowBean key, Iterable<NullWritable> values,
            Reducer<FlowBean, NullWritable, Text, FlowBean>.Context context) throws IOException, InterruptedException {
        // Re-key the record by phone number for readable output lines.
        context.write(new Text(key.getPhoneNB()), key);
    }
}

注意:排序實際發生在 shuffle/sort 階段——框架在把 key 交給 Reduce 之前,就已按 key 的 compareTo 方法排好序。因此想按哪個欄位排序,就必須把該欄位放進 map 輸出的 key 中(這裡以 FlowBean 作為 key)。

(四)主函數進行調用

package cn.hadoop.rs;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import cn.hadoop.mr.wc.FlowBean;

public class ResSortRunner {

    /**
     * Configures and submits the sort job.
     * args[0] = input path (output of the flow-sum job),
     * args[1] = output path for the sorted records.
     * Exits 0 on success, 1 on job failure, 2 on bad usage.
     */
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        // Fail fast with a usage message instead of an
        // ArrayIndexOutOfBoundsException when paths are missing.
        if (args.length < 2) {
            System.err.println("Usage: ResSortRunner <input path> <output path>");
            System.exit(2);
        }

        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);

        job.setJarByClass(ResSortRunner.class);

        job.setMapperClass(ResSortMapper.class);
        job.setReducerClass(ResSortReducer.class);

        // Final (reducer) output types: phone number -> flow record.
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(FlowBean.class);

        // Map output types differ from the final ones: FlowBean is the sort key.
        job.setMapOutputKeyClass(FlowBean.class);
        job.setMapOutputValueClass(NullWritable.class);

        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}

(五)結果測試

hadoop jar rs.jar cn.hadoop.rs.ResSortRunner /fs/output1 /fs/output6

三:實現將兩個job在main中一次執行 

 

(一)修改main方法,實現連續調用兩個job

package cn.hadoop.rs;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import cn.hadoop.fs.FlowSumMapper;
import cn.hadoop.fs.FlowSumReducer;
import cn.hadoop.fs.FlowSumRunner;
import cn.hadoop.mr.wc.FlowBean;

/**
 * Chains the flow-sum job and the sort job in a single main():
 * args[0] = raw input, args[1] = intermediate (sum) output and sort input,
 * args[2] = final sorted output.
 */
public class ResSortRunner {

    /** Builds the first job: aggregates raw records into per-phone totals. */
    private static Job buildSumJob(String in, String out) throws IOException {
        Job job = Job.getInstance(new Configuration());
        job.setJarByClass(FlowSumRunner.class);
        job.setMapperClass(FlowSumMapper.class);
        job.setReducerClass(FlowSumReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(FlowBean.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(FlowBean.class);
        FileInputFormat.setInputPaths(job, new Path(in));
        FileOutputFormat.setOutputPath(job, new Path(out));
        return job;
    }

    /** Builds the second job: sorts the aggregated totals via FlowBean keys. */
    private static Job buildSortJob(String in, String out) throws IOException {
        Job job = Job.getInstance(new Configuration());
        job.setJarByClass(ResSortRunner.class);
        job.setMapperClass(ResSortMapper.class);
        job.setReducerClass(ResSortReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(FlowBean.class);
        job.setMapOutputKeyClass(FlowBean.class);
        job.setMapOutputValueClass(NullWritable.class);
        FileInputFormat.setInputPaths(job, new Path(in));
        FileOutputFormat.setOutputPath(job, new Path(out));
        return job;
    }

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        // Run the aggregation job first; abort with exit code 1 if it fails.
        if (!buildSumJob(args[0], args[1]).waitForCompletion(true)) {
            System.exit(1);
        }
        // The sort job consumes the first job's output directory.
        System.exit(buildSortJob(args[1], args[2]).waitForCompletion(true) ? 0 : 1);
    }
}

(二)實驗測試,結果查看

 hadoop jar rs.jar  cn.hadoop.rs.ResSortRunner /fs/input /fs/outdata1 /fs/outdata2

(三)補充:使用時,不推薦這種方法。中間結果單獨輸出,使用shell將各個程序串聯,靈活性更大,更容易調試 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM