Hadoop日記Day17---計數器、map規約、分區學習


一、Hadoop計數器

1.1 什么是Hadoop計數器

  Haoop是處理大數據的,不適合處理小數據,有些大數據問題是小數據程序是處理不了的,他是一個高延遲的任務,有時處理一個大數據需要花費好幾個小時這都是正常的。下面我們說一下Hadoop計數器,Hadoop計數器就相當於我們的日志,而日志可以讓我們查看程序運行時的很多狀態,而計數器也有這方面的作用。那么就研究一下Hadoop自身的計數器。計數器的程序如代碼1.1所示,下面代碼還是以內容為“hello you;hell0 me”的單詞統計為例。

 1 package counter;
 2 
 3 import java.net.URI;
 4 
 5 import org.apache.hadoop.conf.Configuration;
 6 import org.apache.hadoop.fs.FileSystem;
 7 import org.apache.hadoop.fs.Path;
 8 import org.apache.hadoop.io.LongWritable;
 9 import org.apache.hadoop.io.Text;
10 import org.apache.hadoop.mapreduce.Counter;
11 import org.apache.hadoop.mapreduce.Job;
12 import org.apache.hadoop.mapreduce.Mapper;
13 import org.apache.hadoop.mapreduce.Reducer;
14 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
15 import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
16 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
17 import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
18 import org.apache.hadoop.mapreduce.lib.partition.HashPartitioner;
19 
20 public class WordCountApp {
21     static final String INPUT_PATH = "hdfs://hadoop:9000/input";
22     static final String OUT_PATH = "hdfs://hadoop:9000/output";
23     
24     public static void main(String[] args) throws Exception {
25         
26         Configuration conf = new Configuration();
27         
28         final FileSystem fileSystem = FileSystem.get(new URI(INPUT_PATH), conf);
29         final Path outPath = new Path(OUT_PATH);
30         
31         if(fileSystem.exists(outPath)){
32             fileSystem.delete(outPath, true);
33         }        
34         final Job job = new Job(conf , WordCountApp.class.getSimpleName());
35         
36         //1.1指定讀取的文件位於哪里
37         FileInputFormat.setInputPaths(job, INPUT_PATH);        
38         job.setInputFormatClass(TextInputFormat.class);//指定如何對輸入文件進行格式化,把輸入文件每一行解析成鍵值對
39         
40         //1.2 指定自定義的map類
41         job.setMapperClass(MyMapper.class);
42         job.setMapOutputKeyClass(Text.class);//map輸出的<k,v>類型。
43         job.setMapOutputValueClass(LongWritable.class);//如果<k3,v3>的類型與<k2,v2>類型一致,則可以省略
44         
45         //1.3 分區
46         job.setPartitionerClass(HashPartitioner.class);        
47         job.setNumReduceTasks(1);//有一個reduce任務運行                
48         
49         //2.2 指定自定義reduce類
50         job.setReducerClass(MyReducer.class);
51         
52         job.setOutputKeyClass(Text.class);//指定reduce的輸出類型
53         job.setOutputValueClass(LongWritable.class);
54         
55         //2.3 指定寫出到哪里
56         FileOutputFormat.setOutputPath(job, outPath);        
57         job.setOutputFormatClass(TextOutputFormat.class);//指定輸出文件的格式化類
58                 
59         job.waitForCompletion(true);//把job提交給JobTracker運行
60     }
61     
62     /**
63      * KEYIN    即k1        表示行的偏移量
64      * VALUEIN    即v1        表示行文本內容
65      * KEYOUT    即k2        表示行中出現的單詞
66      * VALUEOUT    即v2        表示行中出現的單詞的次數,固定值1
67      */
68     static class MyMapper extends Mapper<LongWritable, Text, Text, LongWritable>{
69         protected void map(LongWritable k1, Text v1, Context context) throws java.io.IOException ,InterruptedException {            
70             final String line = v1.toString();        
71             final String[] splited = line.split("\t");
72             for (String word : splited) {
73                 context.write(new Text(word), new LongWritable(1));
74             }
75         };
76     }
77     
78     /**
79      * KEYIN    即k2        表示行中出現的單詞
80      * VALUEIN    即v2        表示行中出現的單詞的次數
81      * KEYOUT    即k3        表示文本中出現的不同單詞
82      * VALUEOUT    即v3        表示文本中出現的不同單詞的總次數
83      *
84      */
85     static class MyReducer extends Reducer<Text, LongWritable, Text, LongWritable>{
86         protected void reduce(Text k2, java.lang.Iterable<LongWritable> v2s, Context ctx) throws java.io.IOException ,InterruptedException {
87             long times = 0L;
88             for (LongWritable count : v2s) {
89                 times += count.get();
90             }
91             ctx.write(k2, new LongWritable(times));
92         };
93     }
94         
95 }
View Code

代碼 1.1

  運行結果如下圖1.1所示。

Counters: 19//Counter表示計數器,19表示有19個計數器(下面一共4計數器組)
   File Output Format Counters //文件輸出格式化計數器組
     Bytes Written=19       //reduce輸出到hdfs的字節數,一共19個字節
   FileSystemCounters//文件系統計數器組
     FILE_BYTES_READ=481
     HDFS_BYTES_READ=38
     FILE_BYTES_WRITTEN=81316
     HDFS_BYTES_WRITTEN=19 File Input Format Counters //文件輸入格式化計數器組
     Bytes Read=19     //map從hdfs讀取的字節數
   Map-Reduce Framework//MapReduce框架
     Map output materialized bytes=49 Map input records=2       //map讀入的記錄行數,讀取兩行記錄,”hello you”,”hello me”
     Reduce shuffle bytes=0//規約分區的字節數
     Spilled Records=8
     Map output bytes=35
     Total committed heap usage (bytes)=266469376
     SPLIT_RAW_BYTES=105 Combine input records=0//合並輸入的記錄數
     Reduce input records=4     //reduce從map端接收的記錄行數
     Reduce input groups=3     //reduce函數接收的key數量,即歸並后的k2數量
     Combine output records=0//合並輸出的記錄數
     Reduce output records=3    //reduce輸出的記錄行數。<helllo,{1,1}>,<you,{1}>,<me,{1}>
     Map output records=4     //map輸出的記錄行數,輸出4行記錄

圖 1.1

  通過上面我們對計數器的分析,可以知道,我們可以通過計數器來分析MapReduece程序的運行狀態。

1.2 自定義計數器

  通過上面的分析,我們了解了計數器的作用,那么我們可以自定義一個計數器,來實現我們自己想要的功能。定義一個記錄敏感詞的計數器,記錄敏感詞在一行所出現的次數,如代碼2.1所示。我們處理文件內容為“hello you”,“hello me”。

 1 Counters: 19//Counter表示計數器,19表示有19個計數器(下面一共4計數器組)
 2    File Output Format Counters //文件輸出格式化計數器組
 3      Bytes Written=19       //reduce輸出到hdfs的字節數,一共19個字節
 4    FileSystemCounters//文件系統計數器組
 5      FILE_BYTES_READ=481
 6      HDFS_BYTES_READ=38
 7      FILE_BYTES_WRITTEN=81316
 8      HDFS_BYTES_WRITTEN=19
 9    File Input Format Counters //文件輸入格式化計數器組
10      Bytes Read=19     //map從hdfs讀取的字節數
11    Map-Reduce Framework//MapReduce框架
12      Map output materialized bytes=49
13      Map input records=2       //map讀入的記錄行數,讀取兩行記錄,”hello you”,”hello me”
14      Reduce shuffle bytes=0//規約分區的字節數
15      Spilled Records=8
16      Map output bytes=35
17      Total committed heap usage (bytes)=266469376
18      SPLIT_RAW_BYTES=105
19      Combine input records=0//合並輸入的記錄數
20      Reduce input records=4     //reduce從map端接收的記錄行數
21      Reduce input groups=3     //reduce函數接收的key數量,即歸並后的k2數量
22      Combine output records=0//合並輸出的記錄數
23      Reduce output records=3    //reduce輸出的記錄行數。<helllo,{1,1}>,<you,{1}>,<me,{1}>
24      Map output records=4     //map輸出的記錄行數,輸出4行記錄
View Code

代碼2.1

運行結果如下圖2.1所示。

 Counters: 20 Sensitive Words
     hello=2 File Output Format Counters 
     Bytes Written=21 FileSystemCounters
     FILE_BYTES_READ=359
     HDFS_BYTES_READ=42
     FILE_BYTES_WRITTEN=129080
     HDFS_BYTES_WRITTEN=21 File Input Format Counters 
     Bytes Read=21 Map-Reduce Framework
     Map output materialized bytes=67
     Map input records=2
     Reduce shuffle bytes=0
     Spilled Records=8
     Map output bytes=53
     Total committed heap usage (bytes)=391774208
     SPLIT_RAW_BYTES=95 Combine input records=0
     Reduce input records=4
     Reduce input groups=3 Combine output records=0
     Reduce output records=3
     Map output records=4

圖 2.1

二、Combiners編程

2.1 什么是Combiners

  從上面程序運行的結果我們可以發現,在Map-Reduce Framework即MapReduce框架的輸出中,Combine input records這個字段為零, 那么combine怎么使用呢?其實這是MapReduce程序中Mapper任務中第五步這是可選的一步,使用方法非常簡單,以上面單詞統計為例,只需添加下面一行代碼即可,如下: job.setCombinerClass(MyReducer.class);

  combine操作是一個可選的操作,使用時需要我們自己設定,我們用MyReducer類來設置Combiners,表示Combiners與Reduce功能相同,帶有combine功能的MapRduce程序如代碼3.1所示。

  1 package combine;
  2 
  3 import java.net.URI;
  4 
  5 import org.apache.hadoop.conf.Configuration;
  6 import org.apache.hadoop.fs.FileSystem;
  7 import org.apache.hadoop.fs.Path;
  8 import org.apache.hadoop.io.LongWritable;
  9 import org.apache.hadoop.io.Text;
 10 import org.apache.hadoop.mapreduce.Job;
 11 import org.apache.hadoop.mapreduce.Mapper;
 12 import org.apache.hadoop.mapreduce.Partitioner;
 13 import org.apache.hadoop.mapreduce.Reducer;
 14 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
 15 import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
 16 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
 17 import org.apache.jasper.tagplugins.jstl.core.If;
 18 
 19 public class WordCountApp2 {
 20     static final String INPUT_PATH = "hdfs://hadoop:9000/hello";
 21     static final String OUT_PATH = "hdfs://hadoop:9000/out";
 22     
 23     public static void main(String[] args) throws Exception {
 24         Configuration conf = new Configuration();
 25         final FileSystem fileSystem = FileSystem.get(new URI(INPUT_PATH), conf);
 26         final Path outPath = new Path(OUT_PATH);
 27         if(fileSystem.exists(outPath)){
 28             fileSystem.delete(outPath, true);
 29         }
 30         final Job job = new Job(conf , WordCountApp2.class.getSimpleName());
 31         job.setJarByClass(WordCountApp2.class);
 32         
 33         //1.1指定讀取的文件位於哪里
 34         FileInputFormat.setInputPaths(job, INPUT_PATH);        
 35         job.setInputFormatClass(TextInputFormat.class);//指定如何對輸入文件進行格式化,把輸入文件每一行解析成鍵值對
 36         
 37         //1.2 指定自定義的map類
 38         job.setMapperClass(MyMapper.class);
 39         job.setMapOutputKeyClass(Text.class);//map輸出的<k,v>類型。
 40         job.setMapOutputValueClass(LongWritable.class);//如果<k3,v3>的類型與<k2,v2>類型一致,則可以省略
 41         
 42         //1.3 分區
 43         job.setPartitionerClass(MyPartitioner.class);
 44         //有幾個reduce任務運行
 45         job.setNumReduceTasks(2);
 46         
 47         //1.4 TODO 排序、分組
 48         
 49         //1.5 規約
 50         job.setCombinerClass(MyCombiner.class);
 51         
 52         //2.2 指定自定義reduce類
 53         job.setReducerClass(MyReducer.class);
 54         //指定reduce的輸出類型
 55         job.setOutputKeyClass(Text.class);
 56         job.setOutputValueClass(LongWritable.class);
 57         
 58         //2.3 指定寫出到哪里
 59         FileOutputFormat.setOutputPath(job, outPath);
 60         //指定輸出文件的格式化類
 61         //job.setOutputFormatClass(TextOutputFormat.class);
 62         
 63         //把job提交給JobTracker運行
 64         job.waitForCompletion(true);
 65     }
 66     
 67     static class MyPartitioner extends Partitioner<Text, LongWritable>{
 68         @Override
 69         public int getPartition(Text key, LongWritable value, int numReduceTasks) {
 70             return (key.toString().equals("hello"))?0:1;
 71         }
 72     }
 73     
 74     /**
 75      * KEYIN    即k1        表示行的偏移量
 76      * VALUEIN    即v1        表示行文本內容
 77      * KEYOUT    即k2        表示行中出現的單詞
 78      * VALUEOUT    即v2        表示行中出現的單詞的次數,固定值1
 79      */
 80     static class MyMapper extends Mapper<LongWritable, Text, Text, LongWritable>{
 81         protected void map(LongWritable k1, Text v1, Context context) throws java.io.IOException ,InterruptedException {
 82             final String[] splited = v1.toString().split("\t");
 83             for (String word : splited) {
 84                 context.write(new Text(word), new LongWritable(1));
 85                 System.out.println("Mapper輸出<"+word+","+1+">");
 86             }
 87         };
 88     }
 89     
 90     /**
 91      * KEYIN    即k2        表示行中出現的單詞
 92      * VALUEIN    即v2        表示行中出現的單詞的次數
 93      * KEYOUT    即k3        表示文本中出現的不同單詞
 94      * VALUEOUT    即v3        表示文本中出現的不同單詞的總次數
 95      *
 96      */
 97     static class MyReducer extends Reducer<Text, LongWritable, Text, LongWritable>{
 98         protected void reduce(Text k2, java.lang.Iterable<LongWritable> v2s, Context ctx) throws java.io.IOException ,InterruptedException {
 99             //顯示次數表示redcue函數被調用了多少次,表示k2有多少個分組
100             System.out.println("MyReducer輸入分組<"+k2.toString()+",...>");
101             long times = 0L;
102             for (LongWritable count : v2s) {
103                 times += count.get();
104                 //顯示次數表示輸入的k2,v2的鍵值對數量
105                 System.out.println("MyReducer輸入鍵值對<"+k2.toString()+","+count.get()+">");
106             }
107             ctx.write(k2, new LongWritable(times));
108         };
109     }
110     
111     
112     static class MyCombiner extends Reducer<Text, LongWritable, Text, LongWritable>{
113         protected void reduce(Text k2, java.lang.Iterable<LongWritable> v2s, Context ctx) throws java.io.IOException ,InterruptedException {
114             //顯示次數表示redcue函數被調用了多少次,表示k2有多少個分組
115             System.out.println("Combiner輸入分組<"+k2.toString()+",...>");
116             long times = 0L;
117             for (LongWritable count : v2s) {
118                 times += count.get();
119                 //顯示次數表示輸入的k2,v2的鍵值對數量
120                 System.out.println("Combiner輸入鍵值對<"+k2.toString()+","+count.get()+">");
121             }
122             
123             ctx.write(k2, new LongWritable(times));
124             //顯示次數表示輸出的k2,v2的鍵值對數量
125             System.out.println("Combiner輸出鍵值對<"+k2.toString()+","+times+">");
126         };
127     }
128 }
View Code

代碼 3.1

  運行結果如下圖3.1所示。

Counters: 20 Sensitive Words
     hello=2 File Output Format Counters 
     Bytes Written=21 FileSystemCounters
     FILE_BYTES_READ=359
     HDFS_BYTES_READ=42
     FILE_BYTES_WRITTEN=129080
     HDFS_BYTES_WRITTEN=21 File Input Format Counters 
     Bytes Read=21 Map-Reduce Framework
     Map output materialized bytes=67
     Map input records=2
     Reduce shuffle bytes=0
     Spilled Records=8
     Map output bytes=53
     Total committed heap usage (bytes)=391774208
     SPLIT_RAW_BYTES=95
     Combine input records=4
     Reduce input records=3
     Reduce input groups=3
     Combine output records=3
     Reduce output records=3
     Map output records=4

圖 3.1

  從上面的運行結果我們可以發現,此時Combine input records=4,Combine output records=3,Reduce input records=3,因為Combine階段在Ma pper結束與Reducer開始之間,Combiners處理的數據,就是在不設置Combiners時,Reduce所應該接受的數據,所以為4,然后再將Combiners的輸出作為Re duce端的輸入,所以Reduce input records這個字段由4變成了3。注意,combine操作是一個可選的操作,使用時需要我們自己設定,在本代碼中我們用MyRed ucer類來設置Combiners,Combine方法的使用的是Reduce的方法,這說明歸約的方法是通用的,Reducer階段的方法也可以用到Mapper階段。

2.1 自定義Combiners

  為了能夠更加清晰的理解Combiners的工作原理,我們自定義一個Combiners類,不再使用MyReduce做為Combiners的類,如代碼3.2所示。

  1 package combine;
  2 
  3 import java.net.URI;
  4 
  5 import org.apache.hadoop.conf.Configuration;
  6 import org.apache.hadoop.fs.FileSystem;
  7 import org.apache.hadoop.fs.Path;
  8 import org.apache.hadoop.io.LongWritable;
  9 import org.apache.hadoop.io.Text;
 10 import org.apache.hadoop.mapreduce.Job;
 11 import org.apache.hadoop.mapreduce.Mapper;
 12 import org.apache.hadoop.mapreduce.Partitioner;
 13 import org.apache.hadoop.mapreduce.Reducer;
 14 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
 15 import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
 16 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
 17 import org.apache.jasper.tagplugins.jstl.core.If;
 18 
 19 /**
 20  * 問:為什么使用Combiner?
 21  * 答:Combiner發生在Map端,對數據進行規約處理,數據量變小了,傳送到reduce端的數據量變小了,傳輸時間變短,作業的整體時間變短。
 22  * 
 23  * 問:為什么Combiner不作為MR運行的標配,而是可選步驟哪?
 24  * 答:因為不是所有的算法都適合使用Combiner處理,例如求平均數。
 25  *
 26  * 問:Combiner本身已經執行了reduce操作,為什么在Reducer階段還要執行reduce操作哪?
 27  * 答:combiner操作發生在map端的,處理一個任務所接收的文件中的數據,不能跨map任務執行;只有reduce可以接收多個map任務處理的數據。
 28  *
 29  */
 30 public class WordCountApp2 {
 31     static final String INPUT_PATH = "hdfs://hadoop:9000/hello";
 32     static final String OUT_PATH = "hdfs://hadoop:9000/out";
 33     
 34     public static void main(String[] args) throws Exception {
 35         Configuration conf = new Configuration();
 36         final FileSystem fileSystem = FileSystem.get(new URI(INPUT_PATH), conf);
 37         final Path outPath = new Path(OUT_PATH);
 38         if(fileSystem.exists(outPath)){
 39             fileSystem.delete(outPath, true);
 40         }
 41         final Job job = new Job(conf , WordCountApp2.class.getSimpleName());
 42         job.setJarByClass(WordCountApp2.class);
 43         
 44         //1.1指定讀取的文件位於哪里
 45         FileInputFormat.setInputPaths(job, INPUT_PATH);        
 46         job.setInputFormatClass(TextInputFormat.class);//指定如何對輸入文件進行格式化,把輸入文件每一行解析成鍵值對
 47         
 48         //1.2 指定自定義的map類
 49         job.setMapperClass(MyMapper.class);
 50         job.setMapOutputKeyClass(Text.class);//map輸出的<k,v>類型。
 51         job.setMapOutputValueClass(LongWritable.class);//如果<k3,v3>的類型與<k2,v2>類型一致,則可以省略
 52         
 53         //1.3 分區
 54         job.setPartitionerClass(MyPartitioner.class);
 55         //有幾個reduce任務運行
 56         job.setNumReduceTasks(2);
 57         
 58         //1.4 TODO 排序、分組
 59         
 60         //1.5 規約
 61         job.setCombinerClass(MyCombiner.class);
 62         
 63         //2.2 指定自定義reduce類
 64         job.setReducerClass(MyReducer.class);
 65         //指定reduce的輸出類型
 66         job.setOutputKeyClass(Text.class);
 67         job.setOutputValueClass(LongWritable.class);
 68         
 69         //2.3 指定寫出到哪里
 70         FileOutputFormat.setOutputPath(job, outPath);
 71         //指定輸出文件的格式化類
 72         //job.setOutputFormatClass(TextOutputFormat.class);
 73         
 74         //把job提交給JobTracker運行
 75         job.waitForCompletion(true);
 76     }
 77     
 78     static class MyPartitioner extends Partitioner<Text, LongWritable>{
 79         @Override
 80         public int getPartition(Text key, LongWritable value, int numReduceTasks) {
 81             return (key.toString().equals("hello"))?0:1;
 82         }
 83     }
 84     
 85     /**
 86      * KEYIN    即k1        表示行的偏移量
 87      * VALUEIN    即v1        表示行文本內容
 88      * KEYOUT    即k2        表示行中出現的單詞
 89      * VALUEOUT    即v2        表示行中出現的單詞的次數,固定值1
 90      */
 91     static class MyMapper extends Mapper<LongWritable, Text, Text, LongWritable>{
 92         protected void map(LongWritable k1, Text v1, Context context) throws java.io.IOException ,InterruptedException {
 93             final String[] splited = v1.toString().split("\t");
 94             for (String word : splited) {
 95                 context.write(new Text(word), new LongWritable(1));
 96                 System.out.println("Mapper輸出<"+word+","+1+">");
 97             }
 98         };
 99     }
100     
101     /**
102      * KEYIN    即k2        表示行中出現的單詞
103      * VALUEIN    即v2        表示行中出現的單詞的次數
104      * KEYOUT    即k3        表示文本中出現的不同單詞
105      * VALUEOUT    即v3        表示文本中出現的不同單詞的總次數
106      *
107      */
108     static class MyReducer extends Reducer<Text, LongWritable, Text, LongWritable>{
109         protected void reduce(Text k2, java.lang.Iterable<LongWritable> v2s, Context ctx) throws java.io.IOException ,InterruptedException {
110             //顯示次數表示redcue函數被調用了多少次,表示k2有多少個分組
111             System.out.println("MyReducer輸入分組<"+k2.toString()+",...>");
112             long times = 0L;
113             for (LongWritable count : v2s) {
114                 times += count.get();
115                 //顯示次數表示輸入的k2,v2的鍵值對數量
116                 System.out.println("MyReducer輸入鍵值對<"+k2.toString()+","+count.get()+">");
117             }
118             ctx.write(k2, new LongWritable(times));
119         };
120     }
121     
122     
123     static class MyCombiner extends Reducer<Text, LongWritable, Text, LongWritable>{
124         protected void reduce(Text k2, java.lang.Iterable<LongWritable> v2s, Context ctx) throws java.io.IOException ,InterruptedException {
125             //顯示次數表示redcue函數被調用了多少次,表示k2有多少個分組
126             System.out.println("Combiner輸入分組<"+k2.toString()+",...>");
127             long times = 0L;
128             for (LongWritable count : v2s) {
129                 times += count.get();
130                 //顯示次數表示輸入的k2,v2的鍵值對數量
131                 System.out.println("Combiner輸入鍵值對<"+k2.toString()+","+count.get()+">");
132             }
133             
134             ctx.write(k2, new LongWritable(times));
135             //顯示次數表示輸出的k2,v2的鍵值對數量
136             System.out.println("Combiner輸出鍵值對<"+k2.toString()+","+times+">");
137         };
138     }
139 }
View Code

代碼 3.2

運行結果如圖3.2所示。

14/10/07 18:56:32 INFO mapred.MapTask: record buffer = 262144/327680
Mapper輸出<hello,1>
14/10/07 18:56:32 INFO mapred.MapTask: Starting flush of map output
Mapper輸出<world,1>
Mapper輸出<hello,1>
Mapper輸出<me,1>
Combiner輸入分組<hello,...>
Combiner輸入鍵值對<hello,1>
Combiner輸入鍵值對<hello,1>
Combiner輸出鍵值對<hello,2>
Combiner輸入分組<me,...>
Combiner輸入鍵值對<me,1>
Combiner輸出鍵值對<me,1>
Combiner輸入分組<world,...>
Combiner輸入鍵值對<world,1>
Combiner輸出鍵值對<world,1>
14/10/07 18:56:32 INFO mapred.MapTask: Finished spill 0
14/10/07 18:56:32 INFO mapred.Task: Task:attempt_local_0001_m_000000_0 is done. And is in the process of commiting
14/10/07 18:56:32 INFO mapred.LocalJobRunner: 
14/10/07 18:56:32 INFO mapred.Task: Task 'attempt_local_0001_m_000000_0' done.
14/10/07 18:56:32 INFO mapred.Task:  Using ResourceCalculatorPlugin : null
14/10/07 18:56:32 INFO mapred.LocalJobRunner: 
14/10/07 18:56:32 INFO mapred.Merger: Merging 1 sorted segments
14/10/07 18:56:32 INFO mapred.Merger: Down to the last merge-pass, with 1 segments left of total size: 47 bytes
14/10/07 18:56:32 INFO mapred.LocalJobRunner: 
MyReducer輸入分組<hello,...>
MyReducer輸入鍵值對<hello,2>
MyReducer輸入分組<me,...>
MyReducer輸入鍵值對<me,1>
MyReducer輸入分組<world,...>
MyReducer輸入鍵值對<world,1>
14/10/07 18:56:33 INFO mapred.Task: Task:attempt_local_0001_r_000000_0 is done. And is in the process of commiting
14/10/07 18:56:33 INFO mapred.LocalJobRunner: 
14/10/07 18:56:33 INFO mapred.Task: Task attempt_local_0001_r_000000_0 is allowed to commit now
14/10/07 18:56:33 INFO output.FileOutputCommitter: Saved output of task 'attempt_local_0001_r_000000_0' to hdfs://hadoop:9000/output
14/10/07 18:56:33 INFO mapred.LocalJobRunner: reduce > reduce
14/10/07 18:56:33 INFO mapred.Task: Task 'attempt_local_0001_r_000000_0' done.
14/10/07 18:56:33 INFO mapred.JobClient:  map 100% reduce 100%
14/10/07 18:56:33 INFO mapred.JobClient: Job complete: job_local_0001
14/10/07 18:56:33 INFO mapred.JobClient: Counters: 19
14/10/07 18:56:33 INFO mapred.JobClient:   File Output Format Counters 
14/10/07 18:56:33 INFO mapred.JobClient:     Bytes Written=21
14/10/07 18:56:33 INFO mapred.JobClient:   FileSystemCounters
14/10/07 18:56:33 INFO mapred.JobClient:     FILE_BYTES_READ=343
14/10/07 18:56:33 INFO mapred.JobClient:     HDFS_BYTES_READ=42
14/10/07 18:56:33 INFO mapred.JobClient:     FILE_BYTES_WRITTEN=129572
14/10/07 18:56:33 INFO mapred.JobClient:     HDFS_BYTES_WRITTEN=21
14/10/07 18:56:33 INFO mapred.JobClient:   File Input Format Counters 
14/10/07 18:56:33 INFO mapred.JobClient:     Bytes Read=21
14/10/07 18:56:33 INFO mapred.JobClient:   Map-Reduce Framework
14/10/07 18:56:33 INFO mapred.JobClient:     Map output materialized bytes=51
14/10/07 18:56:33 INFO mapred.JobClient:     Map input records=2
14/10/07 18:56:33 INFO mapred.JobClient:     Reduce shuffle bytes=0
14/10/07 18:56:33 INFO mapred.JobClient:     Spilled Records=6
14/10/07 18:56:33 INFO mapred.JobClient:     Map output bytes=53
14/10/07 18:56:33 INFO mapred.JobClient:     Total committed heap usage (bytes)=391774208
14/10/07 18:56:33 INFO mapred.JobClient:     SPLIT_RAW_BYTES=95
14/10/07 18:56:33 INFO mapred.JobClient:     Combine input records=4
14/10/07 18:56:33 INFO mapred.JobClient:     Reduce input records=3
14/10/07 18:56:33 INFO mapred.JobClient:     Reduce input groups=3
14/10/07 18:56:33 INFO mapred.JobClient:     Combine output records=3
14/10/07 18:56:33 INFO mapred.JobClient:     Reduce output records=3
14/10/07 18:56:33 INFO mapred.JobClient:     Map output records=4

圖 3.2

  從上面的運行結果我們可以得知,combine具體作用如下:

  • 每一個map可能會產生大量的輸出,combiner的作用就是在map端對輸出先做一次合並,以減少傳輸到reducer的數據量。
  • combiner最基本是實現本地key的歸並,combiner具有類似本地的reduce功能。
  • 如果不用combiner,那么,所有的結果都是reduce完成,效率會相對低下。使用combiner,先完成的map會在本地聚合,提升速度。

   注意:Combiner的輸出是Reducer的輸入,Combiner絕不能改變最終的計算結果。所以從我的想法來看,Combiner只應該用於那 種Reduce的輸入key/value與輸出key/value類型完全一致,且不影響最終結果的場景。比如累加,最大值等。

解釋一下

*問:為什么使用Combiner?
   答:Combiner發生在Map端,對數據進行規約處理,數據量變小了,傳送到reduce端的數據量變小了,傳輸時間變短,作業的整體時間變短。
* 問:為什么Combiner不作為MR運行的標配,而是可選步驟?
    答:因為不是所有的算法都適合使用Combiner處理,例如求平均數。
* 問:Combiner本身已經執行了reduce操作,為什么在Reducer階段還要執行reduce操作?
    答:combiner操作發生在map端的,智能處理一個map任務中的數據,不能跨map任務執行;只有reduce可以接收多個map任務處理的數據。

三、Partitioner編程

4.1 什么是分區

  在MapReuce程序中的Mapper任務的第三步就是分區,那么分區到底是干什么的呢?其實,把數據分區是為了更好的利用數據,根據數據的屬性不同來分成不同區,再根據不同的分區完成不同的任務。MapReduce程序中他的默認分區是1個分區,我們看一下默認分區的代碼,還是以單詞統計為例如代碼4.1所示。

  1 package counter;
  2 
  3 import java.net.URI;
  4 
  5 import org.apache.hadoop.conf.Configuration;
  6 import org.apache.hadoop.fs.FileSystem;
  7 import org.apache.hadoop.fs.Path;
  8 import org.apache.hadoop.io.LongWritable;
  9 import org.apache.hadoop.io.Text;
 10 import org.apache.hadoop.mapreduce.Counter;
 11 import org.apache.hadoop.mapreduce.Job;
 12 import org.apache.hadoop.mapreduce.Mapper;
 13 import org.apache.hadoop.mapreduce.Reducer;
 14 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
 15 import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
 16 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
 17 import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
 18 import org.apache.hadoop.mapreduce.lib.partition.HashPartitioner;
 19 
 20 public class WordCountApp {
 21     static final String INPUT_PATH = "hdfs://hadoop:9000/input";
 22     static final String OUT_PATH = "hdfs://hadoop:9000/output";
 23     
 24     public static void main(String[] args) throws Exception {
 25         
 26         Configuration conf = new Configuration();
 27         
 28         final FileSystem fileSystem = FileSystem.get(new URI(INPUT_PATH), conf);
 29         final Path outPath = new Path(OUT_PATH);
 30         
 31         if(fileSystem.exists(outPath)){
 32             fileSystem.delete(outPath, true);
 33         }        
 34         final Job job = new Job(conf , WordCountApp.class.getSimpleName());
 35         
 36         //1.1指定讀取的文件位於哪里
 37         FileInputFormat.setInputPaths(job, INPUT_PATH);        
 38         job.setInputFormatClass(TextInputFormat.class);//指定如何對輸入文件進行格式化,把輸入文件每一行解析成鍵值對
 39         
 40         //1.2 指定自定義的map類
 41         job.setMapperClass(MyMapper.class);
 42         job.setMapOutputKeyClass(Text.class);//map輸出的<k,v>類型。
 43         job.setMapOutputValueClass(LongWritable.class);//如果<k3,v3>的類型與<k2,v2>類型一致,則可以省略
 44         
 45         //1.3 分區
 46         job.setPartitionerClass(HashPartitioner.class);        
 47         job.setNumReduceTasks(1);//有一個reduce任務運行                
 48         
 49         job.setCombinerClass(MyReducer.class);
 50         //2.2 指定自定義reduce類
 51         job.setReducerClass(MyReducer.class);
 52         
 53         job.setOutputKeyClass(Text.class);//指定reduce的輸出類型
 54         job.setOutputValueClass(LongWritable.class);
 55         
 56         //2.3 指定寫出到哪里
 57         FileOutputFormat.setOutputPath(job, outPath);        
 58         job.setOutputFormatClass(TextOutputFormat.class);//指定輸出文件的格式化類
 59                 
 60         job.waitForCompletion(true);//把job提交給JobTracker運行
 61     }
 62     
 63     /**
 64      * KEYIN    即k1        表示行的偏移量
 65      * VALUEIN    即v1        表示行文本內容
 66      * KEYOUT    即k2        表示行中出現的單詞
 67      * VALUEOUT    即v2        表示行中出現的單詞的次數,固定值1
 68      */
 69     static class MyMapper extends Mapper<LongWritable, Text, Text, LongWritable>{
 70         protected void map(LongWritable k1, Text v1, Context context) throws java.io.IOException ,InterruptedException {
 71             final Counter helloCounter = context.getCounter("Sensitive Words", "hello");
 72             
 73             final String line = v1.toString();
 74             if(line.contains("hello")){
 75                 //記錄敏感詞出現在一行中
 76                 helloCounter.increment(1L);
 77             }
 78             final String[] splited = line.split("\t");
 79             for (String word : splited) {
 80                 context.write(new Text(word), new LongWritable(1));
 81             }
 82         };
 83     }
 84     
 85     /**
 86      * KEYIN    即k2        表示行中出現的單詞
 87      * VALUEIN    即v2        表示行中出現的單詞的次數
 88      * KEYOUT    即k3        表示文本中出現的不同單詞
 89      * VALUEOUT    即v3        表示文本中出現的不同單詞的總次數
 90      *
 91      */
 92     static class MyReducer extends Reducer<Text, LongWritable, Text, LongWritable>{
 93         protected void reduce(Text k2, java.lang.Iterable<LongWritable> v2s, Context ctx) throws java.io.IOException ,InterruptedException {
 94             long times = 0L;
 95             for (LongWritable count : v2s) {
 96                 times += count.get();
 97             }
 98             ctx.write(k2, new LongWritable(times));
 99         };
100     }
101         
102 }
View Code

代碼 4.1

  在MapReduce程序中默認的分區方法HashPartitioner,代碼job.setNumReduceTasks(1)表示運行的Reduce任務數,他會將numReduceTask這個變量設為1. HashPartitioner繼承自Partitioner,Partitioner是Partitioner的基類,如果需要定制partitioner也需要繼承該類。 HashPartitioner計算方法如代碼4.2所示。

1 public class HashPartitioner<K, V> extends Partitioner<K, V> {
2 
3   /** Use {@link Object#hashCode()} to partition. */
4   public int getPartition(K key, V value,
5                           int numReduceTasks) {
6     return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
7   }
8 
9 }

代碼 4.2

  在上面的代碼中K和V,表示k2和v2,該類中只有一個方法getPartition(),返回值如下”(key.hashCode()& Integer.MAX_VALUE)%numReduceTasks其中key.hashCode()表示該關鍵是否屬於該類numReduceTasks的值在上面代碼中設置為1,取模后只有一種結果那就是0。getPartition()的意義就是表示划分到不同區域的一個標記,返回0,就是表示划分到第0區,所以我們可以把它理解分區的下標,來代表不同的分區。

4.2 自定義分區

  下面我們嘗試自定義一個分區,來處理一下手機的日志數據(在前面學習中用過),手機日志數據如下圖4.1所示。

圖 4.1

  從圖中我們可以發現,在第二列上並不是所有的數據都是手機號,我們任務就是在統計手機流量時,將手機號碼和非手機號輸出到不同的文件中。我們的分區是按手機和非手機號碼來分的,所以我們可以按該字段的長度來划分,如代碼4.3所示。

 

  1 package partition;
  2 
  3 import java.io.DataInput;
  4 import java.io.DataOutput;
  5 import java.io.IOException;
  6 
  7 import org.apache.hadoop.conf.Configuration;
  8 import org.apache.hadoop.fs.Path;
  9 import org.apache.hadoop.io.LongWritable;
 10 import org.apache.hadoop.io.Text;
 11 import org.apache.hadoop.io.Writable;
 12 import org.apache.hadoop.mapreduce.Job;
 13 import org.apache.hadoop.mapreduce.Mapper;
 14 import org.apache.hadoop.mapreduce.Reducer;
 15 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
 16 import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
 17 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
 18 import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
 19 import org.apache.hadoop.mapreduce.lib.partition.HashPartitioner;
 20 
 21 public class KpiApp {
 22     static final String INPUT_PATH = "hdfs://hadoop:9000/wlan";
 23     static final String OUT_PATH = "hdfs://hadoop:9000/out";
 24     public static void main(String[] args) throws Exception{
 25         final Job job = new Job(new Configuration(), KpiApp.class.getSimpleName());
 26         
 27         job.setJarByClass(KpiApp.class);
 28         
 29         //1.1 指定輸入文件路徑
 30         FileInputFormat.setInputPaths(job, INPUT_PATH);
 31         job.setInputFormatClass(TextInputFormat.class);//指定哪個類用來格式化輸入文件
 32         
 33         //1.2指定自定義的Mapper類
 34         job.setMapperClass(MyMapper.class);        
 35         job.setMapOutputKeyClass(Text.class);//指定輸出<k2,v2>的類型
 36         job.setMapOutputValueClass(KpiWritable.class);
 37         
 38         //1.3 指定分區類
 39         job.setPartitionerClass(KpiPartitioner.class);
 40         job.setNumReduceTasks(2);
 41                 
 42         //2.2 指定自定義的reduce類
 43         job.setReducerClass(MyReducer.class);
 44         job.setOutputKeyClass(Text.class);//指定輸出<k3,v3>的類型
 45         job.setOutputValueClass(KpiWritable.class);
 46         
 47         //2.3 指定輸出到哪里
 48         FileOutputFormat.setOutputPath(job, new Path(OUT_PATH));
 49         job.setOutputFormatClass(TextOutputFormat.class);//設定輸出文件的格式化類
 50         job.waitForCompletion(true);//把代碼提交給JobTracker執行
 51     }
 52 
 53     static class MyMapper extends Mapper<LongWritable, Text, Text, KpiWritable>{
 54         protected void map(LongWritable key, Text value, org.apache.hadoop.mapreduce.Mapper<LongWritable,Text,Text,KpiWritable>.Context context) throws IOException ,InterruptedException {
 55             final String[] splited = value.toString().split("\t");
 56             final String msisdn = splited[1];
 57             final Text k2 = new Text(msisdn);
 58             final KpiWritable v2 = new KpiWritable(splited[6],splited[7],splited[8],splited[9]);
 59             context.write(k2, v2);
 60         };
 61     }
 62     
 63     static class MyReducer extends Reducer<Text, KpiWritable, Text, KpiWritable>{
 64         /**
 65          * @param    k2    表示整個文件中不同的手機號碼    
 66          * @param    v2s    表示該手機號在不同時段的流量的集合
 67          */
 68         protected void reduce(Text k2, java.lang.Iterable<KpiWritable> v2s, org.apache.hadoop.mapreduce.Reducer<Text,KpiWritable,Text,KpiWritable>.Context context) throws IOException ,InterruptedException {
 69             long upPackNum = 0L;
 70             long downPackNum = 0L;
 71             long upPayLoad = 0L;
 72             long downPayLoad = 0L;
 73             
 74             for (KpiWritable kpiWritable : v2s) {
 75                 upPackNum += kpiWritable.upPackNum;
 76                 downPackNum += kpiWritable.downPackNum;
 77                 upPayLoad += kpiWritable.upPayLoad;
 78                 downPayLoad += kpiWritable.downPayLoad;
 79             }
 80             
 81             final KpiWritable v3 = new KpiWritable(upPackNum+"", downPackNum+"", upPayLoad+"", downPayLoad+"");
 82             context.write(k2, v3);
 83         };
 84     }
 85     
 86     static class KpiPartitioner extends HashPartitioner<Text, KpiWritable>{
 87         @Override
 88         public int getPartition(Text key, KpiWritable value, int numReduceTasks) {
 89             return (key.toString().length()==11)?0:1;
 90         }
 91     }
 92 }
 93 
 94 class KpiWritable implements Writable{
 95     long upPackNum;
 96     long downPackNum;
 97     long upPayLoad;
 98     long downPayLoad;
 99     
100     public KpiWritable(){}
101     
102     public KpiWritable(String upPackNum, String downPackNum, String upPayLoad, String downPayLoad){
103         this.upPackNum = Long.parseLong(upPackNum);
104         this.downPackNum = Long.parseLong(downPackNum);
105         this.upPayLoad = Long.parseLong(upPayLoad);
106         this.downPayLoad = Long.parseLong(downPayLoad);
107     }
108     
109     
110     @Override
111     public void readFields(DataInput in) throws IOException {
112         this.upPackNum = in.readLong();
113         this.downPackNum = in.readLong();
114         this.upPayLoad = in.readLong();
115         this.downPayLoad = in.readLong();
116     }
117 
118     @Override
119     public void write(DataOutput out) throws IOException {
120         out.writeLong(upPackNum);
121         out.writeLong(downPackNum);
122         out.writeLong(upPayLoad);
123         out.writeLong(downPayLoad);
124     }
125     
126     @Override
127     public String toString() {
128         return upPackNum + "\t" + downPackNum + "\t" + upPayLoad + "\t" + downPayLoad;
129     }
130 }
View Code

代碼 4.3

  注意:分區的例子必須打成jar運行,運行結果如下圖4.3,4.4所示,4.3表示手機號碼流量,4.4為非手機號流量。

圖 4.3

圖4.4

  我們知道一個分區對應一個Reducer任務是否是這樣呢,我可以通過訪問50030MapReduce端口來驗證,在瀏覽器輸入”http://hadoop:50030"可以看到MapReduce界面,如圖4.5,4.6所示。

圖 4.5

圖4.6

  從圖中可以知道,該MapReduce任務有一個Mapper任務,兩個Reducer任務,那么我們細看一下Reducer的兩個任務到底是什么?如圖4.7,4.8,4.9所示。task_201410070239_0002_r_000000表示第一個分區的輸出,有20條記錄,task_201410070239_0002_r_000001表示第二分區,有一條輸出記錄。和我們程序運行結果一樣。


圖 4.7

圖 4.8 第一分區

圖 4.9 第二分區

  綜上一些列分析,分區的用處如下:
    1.根據業務需要,產生多個輸出文件
    2.多個reduce任務在並發運行,提高整體job的運行效率

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM