二次排序


二次排序

1 原理

    二次排序就是首先按照第一字段排序,然后再對第一字段相同的行按照第二字段排序,注意不能破壞第一次排序的結果。

    這里主要講如何使用一個Mapreduce就可以實現二次排序。Hadoop有自帶的SecondarySort程序,但這個程序只能對整數進行排序,所以我們需要對其進行改進,使其可以對任意字符串進行排序。下面會分別列出這兩個程序的詳解。   

    Hadoop自帶的例子中定義的map和reduce如下,關鍵是它對輸入輸出類型的定義:(java泛型編程)

        public static  class Map extends Mapper<LongWritable, Text, IntPair, IntWritable>  
        public static class Reduce extends Reducer<IntPair, NullWritable,  IntWritable, IntWritable>

    在 map階段,使用job.setInputFormatClass定義的InputFormat將輸入的數據集分割成小數據塊splites,同時 InputFormat提供一個RecordReder的實現。本例子中使用的是TextInputFormat,他提供的RecordReder會將文 本的一行的行號作為key,這一行的文本作為value。這就是自定義Map的輸入是<LongWritable,   Text>的原因。然后調用自定義Map的map方法,將一個個<LongWritable,   Text>對輸入給Map的map方法。注意輸出應該符合自定義Map中定義的輸出<IntPair,   IntWritable>。最終是生成一個List<IntPair,   IntWritable>。在map階段的最后,會先調用job.setPartitionerClass對這個List進行分區,每個分區映射到 一個reducer。每個分區內又調用job.setSortComparatorClass設置的key比較函數類排序。可以看到,這本身就是一個二次 排序。如果沒有通過job.setSortComparatorClass設置key比較函數類,則使用key的實現的compareTo方法。在第一個 例子中,使用了IntPair實現的compareTo方法,而在下一個例子中,專門定義了key比較函數類。 
    在reduce階 段,reducer接收到所有映射到這個reducer的map輸出后,也是會調用job.setSortComparatorClass設置的key比 較函數類對所有數據對排序。然后開始構造一個key對應的value迭代器。這時就要用到分組,使用 jobjob.setGroupingComparatorClass設置的分組函數類。只要這個比較器比較的兩個key相同,他們就屬於同一個組,它們 的value放在一個value迭代器,而這個迭代器的key使用屬於同一個組的所有key的第一個key。最后就是進入Reducer的reduce方 法,reduce方法的輸入是所有的(key和它的value迭代器)。同樣注意輸入與輸出的類型必須與自定義的Reducer中聲明的一致。  

 

2 Hadoop自帶的只對兩個整型自帶排序例子詳解

 

2.1 測試數據如下所示:

20 21
50 51
50 52
50 53
50 54
60 51
60 53
60 52
60 56
60 57
70 58
60 61
70 54
70 55
70 56
70 57
70 58
1 2
3 4
5 6
7 82
203 21
50 512
50 522
50 53
530 54
40 511
20 53
20 522
60 56
60 57
740 58
63 61
730 54
71 55
71 56
73 57
74 58
12 211
31 42
50 62
7 8

 

2.2 程序如下所示:(這里的程序是可以直接在Eclipse中提交任務的,需要的其它兩個Java文件可查看自己另外一篇帖子http://my.oschina.net/mkh/blog/340112

import java.io.DataInput;
import java.io.DataOutput;
import java.io.File;
import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.StringTokenizer;
 
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.Job.JobState;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
 
import service.plugin.EJob;
 
public class SecondarySort{
    /**
     * @ClassName IntPair
     * @Description 定義IntPair對象,該對象實現WritableComparable接口,描述第一列和第二列數據,同時完成兩列數據的相關操作,這里是對二者進行比較
     * 
     */
    public static class IntPair implements WritableComparable<IntPair> {
        int first;
        int second;
 
        /**
         * Set the left and right values.
         */
        public void set(int left, int right) {
            first = left;
            second = right;
        }
 
        public int getFirst() {
            return first;
        }
 
        public int getSecond() {
            return second;
        }
 
        @Override
        // 反序列化,從流中的二進制轉換成IntPair
        public void readFields(DataInput in) throws IOException {
            // TODO Auto-generated method stub
            first = in.readInt();
            second = in.readInt();
        }
 
        @Override
        // 序列化,將IntPair轉化成使用流傳送的二進制
        public void write(DataOutput out) throws IOException {
            // TODO Auto-generated method stub
            out.writeInt(first);
            out.writeInt(second);
        }
 
        @Override
        // key的比較
        public int compareTo(IntPair o) {
            // TODO Auto-generated method stub
            if (first != o.first) {
                return first < o.first ? -1 : 1;
            } else if (second != o.second) {
                return second < o.second ? -1 : 1;
            } else {
                return 0;
            }
        }
 
        // 新定義類應該重寫的兩個方法,不用這個方法好像也可以
        // @Override
        // The hashCode() method is used by the HashPartitioner (the default
        // partitioner in MapReduce)
        // public int hashCode() {
        // return first * 157 + second;
        // }
 
        @Override
        public boolean equals(Object right) {
            if (right == null)
                return false;
            if (this == right)
                return true;
            if (right instanceof IntPair) {
                IntPair r = (IntPair) right;
                return r.first == first && r.second == second;
            } else {
                return false;
            }
        }
    }
 
    /**
     * 分區函數類。根據first確定Partition。
     */
    public static class FirstPartitioner extends Partitioner<IntPair, IntWritable> {
        @Override
        public int getPartition(IntPair key, IntWritable value, int numPartitions) {
            System.out.println("FirstPartitioner-----------------------------------------------");
            System.out.println("Math.abs(key.getFirst() * 127) % numPartitions: " + Math.abs(key.getFirst() * 127) % numPartitions);
            return Math.abs(key.getFirst() * 127) % numPartitions;
        }
    }
 
    /**
     * 分組函數類。只要first相同就屬於同一個組。
     */
    /*
     * //第一種方法,實現接口RawComparator public static class GroupingComparator
     * implements RawComparator<IntPair> {
     * 
     * @Override public int compare(IntPair o1, IntPair o2) { int l =
     * o1.getFirst(); int r = o2.getFirst(); return l == r ? 0 : (l < r ? -1 :
     * 1); }
     * 
     * @Override //一個字節一個字節的比,直到找到一個不相同的字節,然后比這個字節的大小作為兩個字節流的大小比較結果。 public int
     * compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2){ // TODO
     * Auto-generated method stub return WritableComparator.compareBytes(b1, s1,
     * Integer.SIZE/8, b2, s2, Integer.SIZE/8); } }
     */
    // 第二種方法,繼承WritableComparator
    public static class GroupingComparator extends WritableComparator {
        protected GroupingComparator() {
            super(IntPair.class, true);
            System.out.println("GroupingComparator---------------------------------");
        }
 
        @Override
        // Compare two WritableComparables.
        public int compare(WritableComparable w1, WritableComparable w2) {
            IntPair ip1 = (IntPair) w1;
            IntPair ip2 = (IntPair) w2;
            int l = ip1.getFirst();
            int r = ip2.getFirst();
            return l == r ? 0 : (l < r ? -1 : 1);
        }
    }
 
    /**
     * @ClassName Map
     * @Description 自定義map類,將每行數據進行分拆,第一列的數據存入left變量,第二列數據存入right變量
     *              在map階段的最后,會先調用job.setPartitionerClass對這個List進行分區,每個分區映射到一個reducer
     *              。每個分區內又調用job.setSortComparatorClass設置的key比較函數類排序。可以看到,這本身就是一個二次排序。
     */
    public static class Map extends
            Mapper<LongWritable, Text, IntPair, IntWritable> {
        private final IntPair intkey = new IntPair();
        private final IntWritable intvalue = new IntWritable();
        public void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String line = value.toString();
            // 調用java自己的工具類StringTokenizer(),將map輸入的每行字符串按規則進行分割成每個字符串,這些規則有\t\n\r\f,基本上分割的結果都可以保證到最細的字符串粒度
            StringTokenizer tokenizer = new StringTokenizer(line);
            int left = 0;
            int right = 0;
            if (tokenizer.hasMoreTokens()) {
                left = Integer.parseInt(tokenizer.nextToken());
                System.out.println("left: " + left);
                if (tokenizer.hasMoreTokens())
                    right = Integer.parseInt(tokenizer.nextToken());
                intkey.set(left, right);
                intvalue.set(right);
                context.write(intkey, intvalue);
            }
        }
    }
 
    // 自定義reduce
    public static class Reduce extends
            Reducer<IntPair, IntWritable, Text, IntWritable> {
        private final Text left = new Text();
        private static final Text SEPARATOR = new Text("------------------------------------------------");
        public void reduce(IntPair key, Iterable<IntWritable> values,
                Context context) throws IOException, InterruptedException {
            context.write(SEPARATOR, null);
            System.out.println("------------------------------------------------");
            left.set(Integer.toString(key.getFirst()));
            for (IntWritable val : values) {
                System.out.println("reduce: left " + left + "    , val " + val);
                context.write(left, val);
            }
        }
    }
 
    /**
     * @param args
     */
    public static void main(String[] args) throws IOException,
            InterruptedException, ClassNotFoundException {
        // 讀取hadoop配置
        File jarFile = EJob.createTempJar("bin");
        ClassLoader classLoader = EJob.getClassLoader();
        Thread.currentThread().setContextClassLoader(classLoader);
 
        Configuration conf = new Configuration(true);
        String[] otherArgs = new String[2];
        otherArgs[0] = "hdfs://192.168.1.100:9000/test_in/secondary_sort_data.txt";
        String time = new SimpleDateFormat("yyyyMMddHHmmss").format(new Date());
        otherArgs[1] = "hdfs://192.168.1.100:9000/test_out/mr-" + time;
 
        Job job = new Job(conf, "secondarysort");
        job.setJarByClass(SecondarySort.class);
        ((JobConf) job.getConfiguration()).setJar(jarFile.toString());
        job.setMapperClass(Map.class);
        // 不再需要Combiner類型,因為Combiner的輸出類型<Text,
        // IntWritable>對Reduce的輸入類型<IntPair, IntWritable>不適用
        // job.setCombinerClass(Reduce.class);
 
        // 分區函數
        job.setPartitionerClass(FirstPartitioner.class);
        // 分組函數
        job.setGroupingComparatorClass(GroupingComparator.class);
 
        // Reducer類型
        job.setReducerClass(Reduce.class);
 
        // map輸出Key的類型
        job.setMapOutputKeyClass(IntPair.class);
        // map輸出Value的類型
        job.setMapOutputValueClass(IntWritable.class);
        // reduce輸出Key的類型,是Text,因為使用的OutputFormatClass是TextOutputFormat
        job.setOutputKeyClass(Text.class);
        // reduce輸出Value的類型
        job.setOutputValueClass(IntWritable.class);
 
        // 將輸入的數據集分割成小數據塊splites,同時提供一個RecordReder的實現。
        job.setInputFormatClass(TextInputFormat.class);
        // 提供一個RecordWriter的實現,負責數據輸出。
        job.setOutputFormatClass(TextOutputFormat.class);
 
        FileInputFormat.setInputPaths(job, new Path(otherArgs[0]));
        FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
 
        // 提交job
        if (job.waitForCompletion(false)) {
            System.out.println("job ok !");
        } else {
            System.out.println("job error !");
        }
    }
}

 

2.3 執行結果如下所示:

------------------------------------------------
1    2
------------------------------------------------
3    4
------------------------------------------------
5    6
------------------------------------------------
7    8
7    82
------------------------------------------------
12    211
------------------------------------------------
20    21
20    53
20    522
------------------------------------------------
31    42
------------------------------------------------
40    511
------------------------------------------------
50    51
50    52
50    53
50    53
50    54
50    62
50    512
50    522
------------------------------------------------
60    51
60    52
60    53
60    56
60    56
60    57
60    57
60    61
------------------------------------------------
63    61
------------------------------------------------
70    54
70    55
70    56
70    57
70    58
70    58
------------------------------------------------
71    55
71    56
------------------------------------------------
73    57
------------------------------------------------
74    58
------------------------------------------------
203    21
------------------------------------------------
530    54
------------------------------------------------
730    54
------------------------------------------------
740    58


3 改進后的二次排序(可對字符串進行排序)

 

3.1 測試數據如下所示:

import java
import java
import java
import java
            
import1 org
import org1
import1 org
import2 org2
import org
import2 org1
import1 org
import1 org
import org2
import2 org3
            org
import org
import1 org
importin org
import org
hello time


3.2 程序如下所示:

import java.io.DataInput;
import java.io.DataOutput;
import java.io.File;
import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.Date;
 
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
 
import service.plugin.EJob;
 
public class SecondarySortString {
    // 自己定義的key類應該實現WritableComparable接口
    public static class IntPair implements WritableComparable<IntPair> {
        String first;
        String second;
 
        /**
         * Set the left and right values.
         */
        public void set(String left, String right) {
            first = left;
            second = right;
        }
 
        public String getFirst() {
            return first;
        }
 
        public String getSecond() {
            return second;
        }
 
        // 反序列化,從流中的二進制轉換成IntPair
        public void readFields(DataInput in) throws IOException {
            first = in.readUTF();
            second = in.readUTF();
        }
 
        // 序列化,將IntPair轉化成使用流傳送的二進制
        public void write(DataOutput out) throws IOException {
            out.writeUTF(first);
            out.writeUTF(second);
        }
 
        // 重載 compareTo 方法,進行組合鍵 key 的比較,該過程是默認行為。
        // 分組后的二次排序會隱式調用該方法。
        public int compareTo(IntPair o) {
            if (!first.equals(o.first)) {
                return first.compareTo(o.first);
            } else if (!second.equals(o.second)) {
                return second.compareTo(o.second);
            } else {
                return 0;
            }
        }
 
        // 新定義類應該重寫的兩個方法
        // The hashCode() method is used by the HashPartitioner (the default
        // partitioner in MapReduce)
        public int hashCode() {
            return first.hashCode() * 157 + second.hashCode();
        }
 
        public boolean equals(Object right) {
            if (right == null)
                return false;
            if (this == right)
                return true;
            if (right instanceof IntPair) {
                IntPair r = (IntPair) right;
                return r.first.equals(first) && r.second.equals(second);
            } else {
                return false;
            }
        }
    }
 
    /**
     * 分區函數類。根據first確定Partition。
     */
    public static class FirstPartitioner extends Partitioner<IntPair, Text> {
        public int getPartition(IntPair key, Text value, int numPartitions) {
            return Math.abs(key.getFirst().hashCode() * 127) % numPartitions;
        }
    }
 
    /**
     * 分組函數類。只要first相同就屬於同一個組。
     */
    /*
     * //第一種方法,實現接口RawComparator public static class GroupingComparator
     * implements RawComparator<IntPair> { public int compare(IntPair o1,
     * IntPair o2) { int l = o1.getFirst(); int r = o2.getFirst(); return l == r
     * ? 0 : (l < r ? -1 : 1); }
     * //一個字節一個字節的比,直到找到一個不相同的字節,然后比這個字節的大小作為兩個字節流的大小比較結果。 public int
     * compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2){ return
     * WritableComparator.compareBytes(b1, s1, Integer.SIZE/8, b2, s2,
     * Integer.SIZE/8); } }
     */
    // 第二種方法,繼承WritableComparator
    public static class GroupingComparator extends WritableComparator {
        protected GroupingComparator() {
            super(IntPair.class, true);
        }
        // Compare two WritableComparables.
        // 重載 compare:對組合鍵按第一個自然鍵排序分組
        public int compare(WritableComparable w1, WritableComparable w2) {
            IntPair ip1 = (IntPair) w1;
            IntPair ip2 = (IntPair) w2;
            String l = ip1.getFirst();
            String r = ip2.getFirst();
            return l.compareTo(r);
        }
    }
 
    // 自定義map
    public static class Map extends Mapper<LongWritable, Text, IntPair, Text> {
        private final IntPair keyPair = new IntPair();
        String[] lineArr = null;
        public void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String line = value.toString();
            if(line.isEmpty()){
                return;
            }
            lineArr = line.split(" ", -1);
            keyPair.set(lineArr[0], lineArr[1]);
            context.write(keyPair, value);
        }
    }
 
    // 自定義reduce
    public static class Reduce extends Reducer<IntPair, Text, Text, Text> {
        private static final Text SEPARATOR = new Text("------------------------------------------------");
        public void reduce(IntPair key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            context.write(SEPARATOR, null);
            for (Text val : values) {
                context.write(null, val);
            }
        }
    }
 
    public static void main(String[] args) throws IOException,
            InterruptedException, ClassNotFoundException {
        File jarFile = EJob.createTempJar("bin");
        ClassLoader classLoader = EJob.getClassLoader();
        Thread.currentThread().setContextClassLoader(classLoader);
 
        Configuration conf = new Configuration(true);
        String[] otherArgs = new String[2];
        otherArgs[0] = "hdfs://192.168.1.100:9000/data/test_in/secondary_sort_data_string.txt";
        String time = new SimpleDateFormat("yyyyMMddHHmmss").format(new Date());
        otherArgs[1] = "hdfs://192.168.1.100:9000/data/test_out/mr-" + time;
         
        // 實例化一道作業
        Job job = new Job(conf, "secondarysort");
        job.setJarByClass(SecondarySort.class);
         
        ((JobConf) job.getConfiguration()).setJar(jarFile.toString());
         
        // Mapper類型
        job.setMapperClass(Map.class);
        // 不再需要Combiner類型,因為Combiner的輸出類型<Text,
        // IntWritable>對Reduce的輸入類型<IntPair, IntWritable>不適用
        // job.setCombinerClass(Reduce.class);
        // Reducer類型
        job.setReducerClass(Reduce.class);
        // 分區函數
        job.setPartitionerClass(FirstPartitioner.class);
        // 分組函數
        job.setGroupingComparatorClass(GroupingComparator.class);
 
        // map 輸出Key的類型
        job.setMapOutputKeyClass(IntPair.class);
        // map輸出Value的類型
        job.setMapOutputValueClass(Text.class);
        // rduce輸出Key的類型,是Text,因為使用的OutputFormatClass是TextOutputFormat
        job.setOutputKeyClass(Text.class);
        // rduce輸出Value的類型
        job.setOutputValueClass(Text.class);
 
        // 將輸入的數據集分割成小數據塊splites,同時提供一個RecordReder的實現。
        job.setInputFormatClass(TextInputFormat.class);
        // 提供一個RecordWriter的實現,負責數據輸出。
        job.setOutputFormatClass(TextOutputFormat.class);
 
        // 輸入hdfs路徑
        FileInputFormat.setInputPaths(job, new Path(otherArgs[0]));
        // 輸出hdfs路徑
//        FileSystem.get(conf).delete(new Path(args[1]), true);
        FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
        // 提交job
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}

3.3 執行結果如下所示:

------------------------------------------------
            org
            
------------------------------------------------
hello time
------------------------------------------------
import java
import java
import java
import java
import org
import org
import org
import org1
import org2
------------------------------------------------
import1 org
import1 org
import1 org
import1 org
import1 org
------------------------------------------------
import2 org1
import2 org2
import2 org3
------------------------------------------------
importin org

 

  原創文章歡迎轉載,轉載時請注明出處。

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM