Mapreduce實例——二次排序


原理

Map階段,使用job.setInputFormatClass定義的InputFormat將輸入的數據集分割成小數據塊splites,同時InputFormat提供一個RecordReder的實現。本實驗中使用的是TextInputFormat,他提供的RecordReder會將文本的字節偏移量作為key,這一行的文本作為value。這就是自定義Map的輸入是<LongWritable, Text>的原因。然后調用自定義Mapmap方法,將一個個<LongWritable, Text>鍵值對輸入給Mapmap方法。注意輸出應該符合自定義Map中定義的輸出<IntPair, IntWritable>。最終是生成一個List<IntPair, IntWritable>。在map階段的最后,會先調用job.setPartitionerClass對這個List進行分區,每個分區映射到一個reducer。每個分區內又調用job.setSortComparatorClass設置的key比較函數類排序。可以看到,這本身就是一個二次排序。如果沒有通過job.setSortComparatorClass設置key比較函數類,則可以使用key實現的compareTo方法進行排序。在本實驗中,就使用了IntPair實現的compareTo方法。

Reduce階段,reducer接收到所有映射到這個reducermap輸出后,也是會調用job.setSortComparatorClass設置的key比較函數類對所有數據對排序。然后開始構造一個key對應的value迭代器。這時就要用到分組,使用job.setGroupingComparatorClass設置的分組函數類。只要這個比較器比較的兩個key相同,他們就屬於同一個組,它們的value放在一個value迭代器,而這個迭代器的key使用屬於同一個組的所有key的第一個key。最后就是進入Reducerreduce方法,reduce方法的輸入是所有的(key和它的value迭代器)。同樣注意輸入與輸出的類型必須與自定義的Reducer中聲明的一致。

環境

Linux Ubuntu 14.04

jdk-7u75-linux-x64

hadoop-2.6.0-cdh5.4.5

hadoop-2.6.0-eclipse-cdh5.4.5.jar

eclipse-java-juno-SR2-linux-gtk-x86_64

內容

在電商網站中,用戶進入頁面瀏覽商品時會產生訪問日志,記錄用戶對商品的訪問情況,現有goods_visit2表,包含(goods_id,click_num)兩個字段,數據內容如下:

  1. goods_id click_num  
  2. 1010037 100  
  3. 1010102 100  
  4. 1010152 97  
  5. 1010178 96  
  6. 1010280 104  
  7. 1010320 103  
  8. 1010510 104  
  9. 1010603 96  
  10. 1010637 97  

編寫MapReduce代碼,功能為根據商品的點擊次數(click_num)進行降序排序,再根據goods_id升序排序,並輸出所有商品。

輸出結果如下:

  1. 點擊次數 商品id  
  2. ------------------------------------------------  
  3. 104 1010280  
  4. 104 1010510  
  5. ------------------------------------------------  
  6. 103 1010320  
  7. ------------------------------------------------  
  8. 100 1010037  
  9. 100 1010102  
  10. ------------------------------------------------  
  11. 97  1010152  
  12. 97  1010637  
  13. ------------------------------------------------  
  14. 96  1010178  
  15. 96  1010603  

實驗步驟

1.切換到/apps/hadoop/sbin目錄下,開啟Hadoop

  1. cd /apps/hadoop/sbin  
  2. ./start-all.sh  

2.Linux本地新建/data/mapreduce8目錄。

  1. mkdir -p /data/mapreduce8  

3.Linux中切換到/data/mapreduce8目錄下,用wget命令從http://192.168.1.100:60000/allfiles/mapreduce8/goods_visit2網址上下載文本文件goods_visit2

  1. cd /data/mapreduce8  
  2. wget http://192.168.1.100:60000/allfiles/mapreduce8/goods_visit2  

然后在當前目錄下用wget命令從http://192.168.1.100:60000/allfiles/mapreduce8/hadoop2lib.tar.gz網址上下載項目用到的依賴包。

  1. wget http://192.168.1.100:60000/allfiles/mapreduce8/hadoop2lib.tar.gz  

hadoop2lib.tar.gz解壓到當前目錄下。

  1. tar zxvf hadoop2lib.tar.gz  

4.首先在HDFS上新建/mymapreduce8/in目錄,然后將Linux本地/data/mapreduce8目錄下的goods_visit2文件導入到HDFS/mymapreduce8/in目錄中。

  1. hadoop fs -mkdir -p /mymapreduce8/in  
  2. hadoop fs -put /data/mapreduce8/goods_visit2 /mymapreduce8/in  

5.新建Java Project項目,項目名為mapreduce8

mapreduce8項目下新建一個package包,包名為mapreduce

mapreducepackage包下新建一個SecondarySort類。

6.添加項目所需依賴的jar包,右鍵單擊mapreduce8,新建一個文件夾hadoop2lib,用於存放項目所需的jar包。

/data/mapreduce8目錄下,hadoop2lib目錄中的jar包,拷貝到eclipsemapreduce8項目的hadopo2lib目錄下。

選中hadoop2lib目錄下所有jar包,並添加到Build Path中。

7.編寫Java代碼,並描述其設計思路

二次排序:在mapreduce中,所有的key是需要被比較和排序的,並且是二次,先根據partitioner,再根據大小。而本例中也是要比較兩次。先按照第一字段排序,然后在第一字段相同時按照第二字段排序。根據這一點,我們可以構造一個復合類IntPair,他有兩個字段,先利用分區對第一字段排序,再利用分區內的比較對第二字段排序。Java代碼主要分為四部分:自定義key,自定義分區函數類,map部分,reduce部分。

自定義key的代碼:

 

  1. public static class IntPair implements WritableComparable<IntPair>  
  2.     {  
  3.     int first;  //第一個成員變量  
  4.     int second;  //第二個成員變量  
  5.     
  6.     public void set(int left, int right)  
  7.     {  
  8.     first = left;  
  9.     second = right;  
  10.     }  
  11.     public int getFirst()  
  12.     {  
  13.     return first;  
  14.     }  
  15.     public int getSecond()  
  16.     {  
  17.     return second;  
  18.     }  
  19.     @Override  
  20.     //反序列化,從流中的二進制轉換成IntPair  
  21.     public void readFields(DataInput inthrows IOException  
  22.     {  
  23.     // TODO Auto-generated method stub  
  24.     first = in.readInt();  
  25.     second = in.readInt();  
  26.     }  
  27.     @Override  
  28.     //序列化,將IntPair轉化成使用流傳送的二進制  
  29.     public void write(DataOutput out) throws IOException  
  30.     {  
  31.     // TODO Auto-generated method stub  
  32.     out.writeInt(first);  
  33.     out.writeInt(second);  
  34.     }  
  35.     @Override  
  36.     //key的比較  
  37.     public int compareTo(IntPair o)  
  38.     {  
  39.     // TODO Auto-generated method stub  
  40.     if (first != o.first)  
  41.     {  
  42.     return first < o.first ? 1 : -1;  
  43.     }  
  44.     else if (second != o.second)  
  45.     {  
  46.     return second < o.second ? -1 : 1;  
  47.     }  
  48.     else  
  49.     {  
  50.     return 0;  
  51.     }  
  52.     }  
  53.     @Override  
  54.     public int hashCode()  
  55.     {  
  56.     return first * 157 + second;  
  57.     }  
  58.     @Override  
  59.     public boolean equals(Object right)  
  60.     {  
  61.     if (right == null)  
  62.     return false;  
  63.     if (this == right)  
  64.     return true;  
  65.     if (right instanceof IntPair)  
  66.     {  
  67.     IntPair r = (IntPair) right;  
  68.     return r.first == first && r.second == second;  
  69.     }  
  70.     else  
  71.     {  
  72.     return false;  
  73.     }  
  74.     }  
  75.     }  

所有自定義的key應該實現接口WritableComparable,因為是可序列的並且可比較的,並重載方法。該類中包含以下幾種方法:1.反序列化,從流中的二進制轉換成IntPair 方法為public void readFields(DataInput in) throws IOException 2.序列化,將IntPair轉化成使用流傳送的二進制方法為public void write(DataOutput out)3. key的比較 public int compareTo(IntPair o) 另外新定義的類應該重寫的兩個方法 public int hashCode() public boolean equals(Object right)

分區函數類代碼

  1. public static class FirstPartitioner extends Partitioner<IntPair, IntWritable>  
  2.    {  
  3.        @Override  
  4.        public int getPartition(IntPair key, IntWritable value,int numPartitions)  
  5.        {  
  6.            return Math.abs(key.getFirst() * 127) % numPartitions;  
  7.        }  
  8.    }  

key進行分區,根據自定義keyfirst乘以127取絕對值在對numPartions取余來進行分區。這主要是為實現了第一次排序。按分區分。

分組函數類代碼

  1. public static class GroupingComparator extends WritableComparator  
  2.    {  
  3.        protected GroupingComparator()  
  4.        {  
  5.            super(IntPair.classtrue);  
  6.        }  
  7.        @Override  
  8.        //Compare two WritableComparables.  
  9.        public int compare(WritableComparable w1, WritableComparable w2)  
  10.        {  
  11.            IntPair ip1 = (IntPair) w1;  
  12.            IntPair ip2 = (IntPair) w2;  
  13.            int l = ip1.getFirst();  
  14.            int r = ip2.getFirst();  
  15.            return l == r ? 0 : (l < r ? -1 : 1);  
  16.        }  
  17.    }  

分組函數類。在reduce階段,構造一個key對應的value迭代器的時候,只要first相同就屬於同一個組,放在一個value迭代器。這是一個比較器,需要繼承WritableComparator

map代碼:

  1. public static class Map extends Mapper<LongWritable, Text, IntPair, IntWritable>  
  2.    {  
  3.       //自定義map  
  4.        private final IntPair intkey = new IntPair();  
  5.        private final IntWritable intvalue = new IntWritable();  
  6.        public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException  
  7.        {  
  8.            String line = value.toString();  
  9.            StringTokenizer tokenizer = new StringTokenizer(line);  
  10.            int left = 0;  
  11.            int right = 0;  
  12.            if (tokenizer.hasMoreTokens())  
  13.            {  
  14.                left = Integer.parseInt(tokenizer.nextToken());  
  15.                if (tokenizer.hasMoreTokens())  
  16.                    right = Integer.parseInt(tokenizer.nextToken());  
  17.                intkey.set(right, left);  
  18.                intvalue.set(left);  
  19.                context.write(intkey, intvalue);  
  20.            }  
  21.        }  
  22.    }  

map階段,使用job.setInputFormatClass定義的InputFormat將輸入的數據集分割成小數據塊splites,同時InputFormat提供一個RecordReder的實現。本例子中使用的是TextInputFormat,他提供的RecordReder會將文本的一行的行號作為key,這一行的文本作為value。這就是自定義Map的輸入是<LongWritable, Text>的原因。然后調用自定義Mapmap方法,將一個個<LongWritable, Text>鍵值對輸入給Mapmap方法。注意輸出應該符合自定義Map中定義的輸出<IntPair, IntWritable>。最終是生成一個List<IntPair, IntWritable>。在map階段的最后,會先調用job.setPartitionerClass對這個List進行分區,每個分區映射到一個reducer。每個分區內又調用job.setSortComparatorClass設置的key比較函數類排序。可以看到,這本身就是一個二次排序。如果沒有通過job.setSortComparatorClass設置key比較函數類,則使用key的實現的compareTo方法。在本例子中,使用了IntPair實現的compareTo方法。

Reduce代碼:

  1. public static class Reduce extends Reducer<IntPair, IntWritable, Text, IntWritable>  
  2.     {  
  3.         private final Text left = new Text();  
  4.         private static final Text SEPARATOR = new Text("------------------------------------------------");  
  5.     
  6.         public void reduce(IntPair key, Iterable<IntWritable> values,Context context) throws IOException, InterruptedException  
  7.     {  
  8.     context.write(SEPARATOR, null);  
  9.     left.set(Integer.toString(key.getFirst()));  
  10.     System.out.println(left);  
  11.     for (IntWritable val : values)  
  12.     {  
  13.     context.write(left, val);  
  14.     //System.out.println(val);  
  15.     }  
  16.     }  
  17.     }  

reduce階段,reducer接收到所有映射到這個reducermap輸出后,也是會調用job.setSortComparatorClass設置的key比較函數類對所有數據對排序。然后開始構造一個key對應的value迭代器。這時就要用到分組,使用job.setGroupingComparatorClass設置的分組函數類。只要這個比較器比較的兩個key相同,他們就屬於同一個組,它們的value放在一個value迭代器,而這個迭代器的key使用屬於同一個組的所有key的第一個key。最后就是進入Reducerreduce方法,reduce方法的輸入是所有的(key和它的value迭代器)。同樣注意輸入與輸出的類型必須與自定義的Reducer中聲明的一致。

完整代碼:

  1. package mapreduce;  
  2. import java.io.DataInput;  
  3. import java.io.DataOutput;  
  4. import java.io.IOException;  
  5. import java.util.StringTokenizer;  
  6. import org.apache.hadoop.conf.Configuration;  
  7. import org.apache.hadoop.fs.Path;  
  8. import org.apache.hadoop.io.IntWritable;  
  9. import org.apache.hadoop.io.LongWritable;  
  10. import org.apache.hadoop.io.Text;  
  11. import org.apache.hadoop.io.WritableComparable;  
  12. import org.apache.hadoop.io.WritableComparator;  
  13. import org.apache.hadoop.mapreduce.Job;  
  14. import org.apache.hadoop.mapreduce.Mapper;  
  15. import org.apache.hadoop.mapreduce.Partitioner;  
  16. import org.apache.hadoop.mapreduce.Reducer;  
  17. import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;  
  18. import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;  
  19. import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;  
  20. import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;  
  21. public class SecondarySort  
  22. {  
  23.     
  24.     public static class IntPair implements WritableComparable<IntPair>  
  25.     {  
  26.     int first;  
  27.     int second;  
  28.     
  29.     public void set(int left, int right)  
  30.     {  
  31.     first = left;  
  32.     second = right;  
  33.     }  
  34.     public int getFirst()  
  35.     {  
  36.     return first;  
  37.     }  
  38.     public int getSecond()  
  39.     {  
  40.     return second;  
  41.     }  
  42.     @Override  
  43.     
  44.     public void readFields(DataInput inthrows IOException  
  45.     {  
  46.     // TODO Auto-generated method stub  
  47.     first = in.readInt();  
  48.     second = in.readInt();  
  49.     }  
  50.     @Override  
  51.     
  52.     public void write(DataOutput out) throws IOException  
  53.     {  
  54.     // TODO Auto-generated method stub  
  55.     out.writeInt(first);  
  56.     out.writeInt(second);  
  57.     }  
  58.     @Override  
  59.     
  60.     public int compareTo(IntPair o)  
  61.     {  
  62.     // TODO Auto-generated method stub  
  63.     if (first != o.first)  
  64.     {  
  65.     return first < o.first ? 1 : -1;  
  66.     }  
  67.     else if (second != o.second)  
  68.     {  
  69.     return second < o.second ? -1 : 1;  
  70.     }  
  71.     else  
  72.     {  
  73.     return 0;  
  74.     }  
  75.     }  
  76.     @Override  
  77.     public int hashCode()  
  78.     {  
  79.     return first * 157 + second;  
  80.     }  
  81.     @Override  
  82.     public boolean equals(Object right)  
  83.     {  
  84.     if (right == null)  
  85.     return false;  
  86.     if (this == right)  
  87.     return true;  
  88.     if (right instanceof IntPair)  
  89.     {  
  90.     IntPair r = (IntPair) right;  
  91.     return r.first == first && r.second == second;  
  92.     }  
  93.     else  
  94.     {  
  95.     return false;  
  96.     }  
  97.     }  
  98.     }  
  99.     
  100.     public static class FirstPartitioner extends Partitioner<IntPair, IntWritable>  
  101.     {  
  102.     @Override  
  103.     public int getPartition(IntPair key, IntWritable value,int numPartitions)  
  104.     {  
  105.     return Math.abs(key.getFirst() * 127) % numPartitions;  
  106.     }  
  107.     }  
  108.     public static class GroupingComparator extends WritableComparator  
  109.     {  
  110.     protected GroupingComparator()  
  111.     {  
  112.     super(IntPair.classtrue);  
  113.     }  
  114.     @Override  
  115.     //Compare two WritableComparables.  
  116.     public int compare(WritableComparable w1, WritableComparable w2)  
  117.     {  
  118.     IntPair ip1 = (IntPair) w1;  
  119.     IntPair ip2 = (IntPair) w2;  
  120.     int l = ip1.getFirst();  
  121.     int r = ip2.getFirst();  
  122.     return l == r ? 0 : (l < r ? -1 : 1);  
  123.     }  
  124.     }  
  125.     public static class Map extends Mapper<LongWritable, Text, IntPair, IntWritable>  
  126.     {  
  127.     private final IntPair intkey = new IntPair();  
  128.     private final IntWritable intvalue = new IntWritable();  
  129.     public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException  
  130.     {  
  131.     String line = value.toString();  
  132.     StringTokenizer tokenizer = new StringTokenizer(line);  
  133.     int left = 0;  
  134.     int right = 0;  
  135.     if (tokenizer.hasMoreTokens())  
  136.     {  
  137.     left = Integer.parseInt(tokenizer.nextToken());  
  138.     if (tokenizer.hasMoreTokens())  
  139.     right = Integer.parseInt(tokenizer.nextToken());  
  140.     intkey.set(right, left);  
  141.     intvalue.set(left);  
  142.     context.write(intkey, intvalue);  
  143.     }  
  144.     }  
  145.     }  
  146.     
  147.     public static class Reduce extends Reducer<IntPair, IntWritable, Text, IntWritable>  
  148.     {  
  149.     private final Text left = new Text();  
  150.     private static final Text SEPARATOR = new Text("------------------------------------------------");  
  151.     
  152.     public void reduce(IntPair key, Iterable<IntWritable> values,Context context) throws IOException, InterruptedException  
  153.     {  
  154.     context.write(SEPARATOR, null);  
  155.     left.set(Integer.toString(key.getFirst()));  
  156.     System.out.println(left);  
  157.     for (IntWritable val : values)  
  158.     {  
  159.     context.write(left, val);  
  160.     //System.out.println(val);  
  161.     }  
  162.     }  
  163.     }  
  164.     public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException  
  165.     {  
  166.     
  167.     Configuration conf = new Configuration();  
  168.     Job job = new Job(conf, "secondarysort");  
  169.     job.setJarByClass(SecondarySort.class);  
  170.     job.setMapperClass(Map.class);  
  171.     job.setReducerClass(Reduce.class);  
  172.     job.setPartitionerClass(FirstPartitioner.class);  
  173.     
  174.     job.setGroupingComparatorClass(GroupingComparator.class);  
  175.     job.setMapOutputKeyClass(IntPair.class);  
  176.     
  177.     job.setMapOutputValueClass(IntWritable.class);  
  178.     
  179.     job.setOutputKeyClass(Text.class);  
  180.     
  181.     job.setOutputValueClass(IntWritable.class);  
  182.     
  183.     job.setInputFormatClass(TextInputFormat.class);  
  184.     
  185.     job.setOutputFormatClass(TextOutputFormat.class);  
  186.     String[] otherArgs=new String[2];  
  187.     otherArgs[0]="hdfs://localhost:9000/mymapreduce8/in/goods_visit2";  
  188.     otherArgs[1]="hdfs://localhost:9000/mymapreduce8/out";  
  189.     
  190.     FileInputFormat.setInputPaths(job, new Path(otherArgs[0]));  
  191.     
  192.     FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));  
  193.     
  194.     System.exit(job.waitForCompletion(true) ? 0 : 1);  
  195.     }  
  196.     }  

8.SecondarySort類文件中,右鍵並點擊=>Run As=>Run on Hadoop選項。

9.待執行完畢后,進入命令模式,在hdfs上從Java代碼指定的輸出路徑中查看實驗結果。

  1. hadoop fs -ls /mymapreduce8/out  
  2. hadoop fs -cat /mymapreduce8/out/part-r-00000  


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM