一、Hadoop計數器
1.1 什么是Hadoop計數器
Haoop是處理大數據的,不適合處理小數據,有些大數據問題是小數據程序是處理不了的,他是一個高延遲的任務,有時處理一個大數據需要花費好幾個小時這都是正常的。下面我們說一下Hadoop計數器,Hadoop計數器就相當於我們的日志,而日志可以讓我們查看程序運行時的很多狀態,而計數器也有這方面的作用。那么就研究一下Hadoop自身的計數器。計數器的程序如代碼1.1所示,下面代碼還是以內容為“hello you;hell0 me”的單詞統計為例。

1 package counter; 2 3 import java.net.URI; 4 5 import org.apache.hadoop.conf.Configuration; 6 import org.apache.hadoop.fs.FileSystem; 7 import org.apache.hadoop.fs.Path; 8 import org.apache.hadoop.io.LongWritable; 9 import org.apache.hadoop.io.Text; 10 import org.apache.hadoop.mapreduce.Counter; 11 import org.apache.hadoop.mapreduce.Job; 12 import org.apache.hadoop.mapreduce.Mapper; 13 import org.apache.hadoop.mapreduce.Reducer; 14 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; 15 import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; 16 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; 17 import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; 18 import org.apache.hadoop.mapreduce.lib.partition.HashPartitioner; 19 20 public class WordCountApp { 21 static final String INPUT_PATH = "hdfs://hadoop:9000/input"; 22 static final String OUT_PATH = "hdfs://hadoop:9000/output"; 23 24 public static void main(String[] args) throws Exception { 25 26 Configuration conf = new Configuration(); 27 28 final FileSystem fileSystem = FileSystem.get(new URI(INPUT_PATH), conf); 29 final Path outPath = new Path(OUT_PATH); 30 31 if(fileSystem.exists(outPath)){ 32 fileSystem.delete(outPath, true); 33 } 34 final Job job = new Job(conf , WordCountApp.class.getSimpleName()); 35 36 //1.1指定讀取的文件位於哪里 37 FileInputFormat.setInputPaths(job, INPUT_PATH); 38 job.setInputFormatClass(TextInputFormat.class);//指定如何對輸入文件進行格式化,把輸入文件每一行解析成鍵值對 39 40 //1.2 指定自定義的map類 41 job.setMapperClass(MyMapper.class); 42 job.setMapOutputKeyClass(Text.class);//map輸出的<k,v>類型。 43 job.setMapOutputValueClass(LongWritable.class);//如果<k3,v3>的類型與<k2,v2>類型一致,則可以省略 44 45 //1.3 分區 46 job.setPartitionerClass(HashPartitioner.class); 47 job.setNumReduceTasks(1);//有一個reduce任務運行 48 49 //2.2 指定自定義reduce類 50 job.setReducerClass(MyReducer.class); 51 52 job.setOutputKeyClass(Text.class);//指定reduce的輸出類型 53 job.setOutputValueClass(LongWritable.class); 54 55 //2.3 指定寫出到哪里 56 FileOutputFormat.setOutputPath(job, outPath); 57 job.setOutputFormatClass(TextOutputFormat.class);//指定輸出文件的格式化類 58 59 job.waitForCompletion(true);//把job提交給JobTracker運行 60 } 61 62 /** 63 * KEYIN 即k1 表示行的偏移量 64 * VALUEIN 即v1 表示行文本內容 65 * KEYOUT 即k2 表示行中出現的單詞 66 * VALUEOUT 即v2 表示行中出現的單詞的次數,固定值1 67 */ 68 static class MyMapper extends Mapper<LongWritable, Text, Text, LongWritable>{ 69 protected void map(LongWritable k1, Text v1, Context context) throws java.io.IOException ,InterruptedException { 70 final String line = v1.toString(); 71 final String[] splited = line.split("\t"); 72 for (String word : splited) { 73 context.write(new Text(word), new LongWritable(1)); 74 } 75 }; 76 } 77 78 /** 79 * KEYIN 即k2 表示行中出現的單詞 80 * VALUEIN 即v2 表示行中出現的單詞的次數 81 * KEYOUT 即k3 表示文本中出現的不同單詞 82 * VALUEOUT 即v3 表示文本中出現的不同單詞的總次數 83 * 84 */ 85 static class MyReducer extends Reducer<Text, LongWritable, Text, LongWritable>{ 86 protected void reduce(Text k2, java.lang.Iterable<LongWritable> v2s, Context ctx) throws java.io.IOException ,InterruptedException { 87 long times = 0L; 88 for (LongWritable count : v2s) { 89 times += count.get(); 90 } 91 ctx.write(k2, new LongWritable(times)); 92 }; 93 } 94 95 }
代碼 1.1
運行結果如下圖1.1所示。
Counters: 19//Counter表示計數器,19表示有19個計數器(下面一共4計數器組) File Output Format Counters //文件輸出格式化計數器組 Bytes Written=19 //reduce輸出到hdfs的字節數,一共19個字節 FileSystemCounters//文件系統計數器組 FILE_BYTES_READ=481 HDFS_BYTES_READ=38 FILE_BYTES_WRITTEN=81316 HDFS_BYTES_WRITTEN=19 File Input Format Counters //文件輸入格式化計數器組 Bytes Read=19 //map從hdfs讀取的字節數 Map-Reduce Framework//MapReduce框架 Map output materialized bytes=49 Map input records=2 //map讀入的記錄行數,讀取兩行記錄,”hello you”,”hello me” Reduce shuffle bytes=0//規約分區的字節數 Spilled Records=8 Map output bytes=35 Total committed heap usage (bytes)=266469376 SPLIT_RAW_BYTES=105 Combine input records=0//合並輸入的記錄數 Reduce input records=4 //reduce從map端接收的記錄行數 Reduce input groups=3 //reduce函數接收的key數量,即歸並后的k2數量 Combine output records=0//合並輸出的記錄數 Reduce output records=3 //reduce輸出的記錄行數。<helllo,{1,1}>,<you,{1}>,<me,{1}> Map output records=4 //map輸出的記錄行數,輸出4行記錄
圖 1.1
通過上面我們對計數器的分析,可以知道,我們可以通過計數器來分析MapReduece程序的運行狀態。
1.2 自定義計數器
通過上面的分析,我們了解了計數器的作用,那么我們可以自定義一個計數器,來實現我們自己想要的功能。如定義一個記錄敏感詞的計數器,記錄敏感詞在一行所出現的次數,如代碼2.1所示。我們處理文件內容為“hello you”,“hello me”。

1 Counters: 19//Counter表示計數器,19表示有19個計數器(下面一共4計數器組) 2 File Output Format Counters //文件輸出格式化計數器組 3 Bytes Written=19 //reduce輸出到hdfs的字節數,一共19個字節 4 FileSystemCounters//文件系統計數器組 5 FILE_BYTES_READ=481 6 HDFS_BYTES_READ=38 7 FILE_BYTES_WRITTEN=81316 8 HDFS_BYTES_WRITTEN=19 9 File Input Format Counters //文件輸入格式化計數器組 10 Bytes Read=19 //map從hdfs讀取的字節數 11 Map-Reduce Framework//MapReduce框架 12 Map output materialized bytes=49 13 Map input records=2 //map讀入的記錄行數,讀取兩行記錄,”hello you”,”hello me” 14 Reduce shuffle bytes=0//規約分區的字節數 15 Spilled Records=8 16 Map output bytes=35 17 Total committed heap usage (bytes)=266469376 18 SPLIT_RAW_BYTES=105 19 Combine input records=0//合並輸入的記錄數 20 Reduce input records=4 //reduce從map端接收的記錄行數 21 Reduce input groups=3 //reduce函數接收的key數量,即歸並后的k2數量 22 Combine output records=0//合並輸出的記錄數 23 Reduce output records=3 //reduce輸出的記錄行數。<helllo,{1,1}>,<you,{1}>,<me,{1}> 24 Map output records=4 //map輸出的記錄行數,輸出4行記錄
代碼2.1
運行結果如下圖2.1所示。
Counters: 20 Sensitive Words hello=2 File Output Format Counters Bytes Written=21 FileSystemCounters FILE_BYTES_READ=359 HDFS_BYTES_READ=42 FILE_BYTES_WRITTEN=129080 HDFS_BYTES_WRITTEN=21 File Input Format Counters Bytes Read=21 Map-Reduce Framework Map output materialized bytes=67 Map input records=2 Reduce shuffle bytes=0 Spilled Records=8 Map output bytes=53 Total committed heap usage (bytes)=391774208 SPLIT_RAW_BYTES=95 Combine input records=0 Reduce input records=4 Reduce input groups=3 Combine output records=0 Reduce output records=3 Map output records=4
圖 2.1
二、Combiners編程
2.1 什么是Combiners
從上面程序運行的結果我們可以發現,在Map-Reduce Framework即MapReduce框架的輸出中,Combine input records這個字段為零, 那么combine怎么使用呢?其實這是MapReduce程序中Mapper任務中第五步,這是可選的一步,使用方法非常簡單,以上面單詞統計為例,只需添加下面一行代碼即可,如下: job.setCombinerClass(MyReducer.class);
combine操作是一個可選的操作,使用時需要我們自己設定,我們用MyReducer類來設置Combiners,表示Combiners與Reduce功能相同,帶有combine功能的MapRduce程序如代碼3.1所示。

1 package combine; 2 3 import java.net.URI; 4 5 import org.apache.hadoop.conf.Configuration; 6 import org.apache.hadoop.fs.FileSystem; 7 import org.apache.hadoop.fs.Path; 8 import org.apache.hadoop.io.LongWritable; 9 import org.apache.hadoop.io.Text; 10 import org.apache.hadoop.mapreduce.Job; 11 import org.apache.hadoop.mapreduce.Mapper; 12 import org.apache.hadoop.mapreduce.Partitioner; 13 import org.apache.hadoop.mapreduce.Reducer; 14 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; 15 import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; 16 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; 17 import org.apache.jasper.tagplugins.jstl.core.If; 18 19 public class WordCountApp2 { 20 static final String INPUT_PATH = "hdfs://hadoop:9000/hello"; 21 static final String OUT_PATH = "hdfs://hadoop:9000/out"; 22 23 public static void main(String[] args) throws Exception { 24 Configuration conf = new Configuration(); 25 final FileSystem fileSystem = FileSystem.get(new URI(INPUT_PATH), conf); 26 final Path outPath = new Path(OUT_PATH); 27 if(fileSystem.exists(outPath)){ 28 fileSystem.delete(outPath, true); 29 } 30 final Job job = new Job(conf , WordCountApp2.class.getSimpleName()); 31 job.setJarByClass(WordCountApp2.class); 32 33 //1.1指定讀取的文件位於哪里 34 FileInputFormat.setInputPaths(job, INPUT_PATH); 35 job.setInputFormatClass(TextInputFormat.class);//指定如何對輸入文件進行格式化,把輸入文件每一行解析成鍵值對 36 37 //1.2 指定自定義的map類 38 job.setMapperClass(MyMapper.class); 39 job.setMapOutputKeyClass(Text.class);//map輸出的<k,v>類型。 40 job.setMapOutputValueClass(LongWritable.class);//如果<k3,v3>的類型與<k2,v2>類型一致,則可以省略 41 42 //1.3 分區 43 job.setPartitionerClass(MyPartitioner.class); 44 //有幾個reduce任務運行 45 job.setNumReduceTasks(2); 46 47 //1.4 TODO 排序、分組 48 49 //1.5 規約 50 job.setCombinerClass(MyCombiner.class); 51 52 //2.2 指定自定義reduce類 53 job.setReducerClass(MyReducer.class); 54 //指定reduce的輸出類型 55 job.setOutputKeyClass(Text.class); 56 job.setOutputValueClass(LongWritable.class); 57 58 //2.3 指定寫出到哪里 59 FileOutputFormat.setOutputPath(job, outPath); 60 //指定輸出文件的格式化類 61 //job.setOutputFormatClass(TextOutputFormat.class); 62 63 //把job提交給JobTracker運行 64 job.waitForCompletion(true); 65 } 66 67 static class MyPartitioner extends Partitioner<Text, LongWritable>{ 68 @Override 69 public int getPartition(Text key, LongWritable value, int numReduceTasks) { 70 return (key.toString().equals("hello"))?0:1; 71 } 72 } 73 74 /** 75 * KEYIN 即k1 表示行的偏移量 76 * VALUEIN 即v1 表示行文本內容 77 * KEYOUT 即k2 表示行中出現的單詞 78 * VALUEOUT 即v2 表示行中出現的單詞的次數,固定值1 79 */ 80 static class MyMapper extends Mapper<LongWritable, Text, Text, LongWritable>{ 81 protected void map(LongWritable k1, Text v1, Context context) throws java.io.IOException ,InterruptedException { 82 final String[] splited = v1.toString().split("\t"); 83 for (String word : splited) { 84 context.write(new Text(word), new LongWritable(1)); 85 System.out.println("Mapper輸出<"+word+","+1+">"); 86 } 87 }; 88 } 89 90 /** 91 * KEYIN 即k2 表示行中出現的單詞 92 * VALUEIN 即v2 表示行中出現的單詞的次數 93 * KEYOUT 即k3 表示文本中出現的不同單詞 94 * VALUEOUT 即v3 表示文本中出現的不同單詞的總次數 95 * 96 */ 97 static class MyReducer extends Reducer<Text, LongWritable, Text, LongWritable>{ 98 protected void reduce(Text k2, java.lang.Iterable<LongWritable> v2s, Context ctx) throws java.io.IOException ,InterruptedException { 99 //顯示次數表示redcue函數被調用了多少次,表示k2有多少個分組 100 System.out.println("MyReducer輸入分組<"+k2.toString()+",...>"); 101 long times = 0L; 102 for (LongWritable count : v2s) { 103 times += count.get(); 104 //顯示次數表示輸入的k2,v2的鍵值對數量 105 System.out.println("MyReducer輸入鍵值對<"+k2.toString()+","+count.get()+">"); 106 } 107 ctx.write(k2, new LongWritable(times)); 108 }; 109 } 110 111 112 static class MyCombiner extends Reducer<Text, LongWritable, Text, LongWritable>{ 113 protected void reduce(Text k2, java.lang.Iterable<LongWritable> v2s, Context ctx) throws java.io.IOException ,InterruptedException { 114 //顯示次數表示redcue函數被調用了多少次,表示k2有多少個分組 115 System.out.println("Combiner輸入分組<"+k2.toString()+",...>"); 116 long times = 0L; 117 for (LongWritable count : v2s) { 118 times += count.get(); 119 //顯示次數表示輸入的k2,v2的鍵值對數量 120 System.out.println("Combiner輸入鍵值對<"+k2.toString()+","+count.get()+">"); 121 } 122 123 ctx.write(k2, new LongWritable(times)); 124 //顯示次數表示輸出的k2,v2的鍵值對數量 125 System.out.println("Combiner輸出鍵值對<"+k2.toString()+","+times+">"); 126 }; 127 } 128 }
代碼 3.1
運行結果如下圖3.1所示。
Counters: 20 Sensitive Words hello=2 File Output Format Counters Bytes Written=21 FileSystemCounters FILE_BYTES_READ=359 HDFS_BYTES_READ=42 FILE_BYTES_WRITTEN=129080 HDFS_BYTES_WRITTEN=21 File Input Format Counters Bytes Read=21 Map-Reduce Framework Map output materialized bytes=67 Map input records=2 Reduce shuffle bytes=0 Spilled Records=8 Map output bytes=53 Total committed heap usage (bytes)=391774208 SPLIT_RAW_BYTES=95 Combine input records=4 Reduce input records=3 Reduce input groups=3 Combine output records=3 Reduce output records=3 Map output records=4
圖 3.1
從上面的運行結果我們可以發現,此時Combine input records=4,Combine output records=3,Reduce input records=3,因為Combine階段在Ma pper結束與Reducer開始之間,Combiners處理的數據,就是在不設置Combiners時,Reduce所應該接受的數據,所以為4,然后再將Combiners的輸出作為Re duce端的輸入,所以Reduce input records這個字段由4變成了3。注意,combine操作是一個可選的操作,使用時需要我們自己設定,在本代碼中我們用MyRed ucer類來設置Combiners,Combine方法的使用的是Reduce的方法,這說明歸約的方法是通用的,Reducer階段的方法也可以用到Mapper階段。
2.1 自定義Combiners
為了能夠更加清晰的理解Combiners的工作原理,我們自定義一個Combiners類,不再使用MyReduce做為Combiners的類,如代碼3.2所示。

1 package combine; 2 3 import java.net.URI; 4 5 import org.apache.hadoop.conf.Configuration; 6 import org.apache.hadoop.fs.FileSystem; 7 import org.apache.hadoop.fs.Path; 8 import org.apache.hadoop.io.LongWritable; 9 import org.apache.hadoop.io.Text; 10 import org.apache.hadoop.mapreduce.Job; 11 import org.apache.hadoop.mapreduce.Mapper; 12 import org.apache.hadoop.mapreduce.Partitioner; 13 import org.apache.hadoop.mapreduce.Reducer; 14 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; 15 import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; 16 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; 17 import org.apache.jasper.tagplugins.jstl.core.If; 18 19 /** 20 * 問:為什么使用Combiner? 21 * 答:Combiner發生在Map端,對數據進行規約處理,數據量變小了,傳送到reduce端的數據量變小了,傳輸時間變短,作業的整體時間變短。 22 * 23 * 問:為什么Combiner不作為MR運行的標配,而是可選步驟哪? 24 * 答:因為不是所有的算法都適合使用Combiner處理,例如求平均數。 25 * 26 * 問:Combiner本身已經執行了reduce操作,為什么在Reducer階段還要執行reduce操作哪? 27 * 答:combiner操作發生在map端的,處理一個任務所接收的文件中的數據,不能跨map任務執行;只有reduce可以接收多個map任務處理的數據。 28 * 29 */ 30 public class WordCountApp2 { 31 static final String INPUT_PATH = "hdfs://hadoop:9000/hello"; 32 static final String OUT_PATH = "hdfs://hadoop:9000/out"; 33 34 public static void main(String[] args) throws Exception { 35 Configuration conf = new Configuration(); 36 final FileSystem fileSystem = FileSystem.get(new URI(INPUT_PATH), conf); 37 final Path outPath = new Path(OUT_PATH); 38 if(fileSystem.exists(outPath)){ 39 fileSystem.delete(outPath, true); 40 } 41 final Job job = new Job(conf , WordCountApp2.class.getSimpleName()); 42 job.setJarByClass(WordCountApp2.class); 43 44 //1.1指定讀取的文件位於哪里 45 FileInputFormat.setInputPaths(job, INPUT_PATH); 46 job.setInputFormatClass(TextInputFormat.class);//指定如何對輸入文件進行格式化,把輸入文件每一行解析成鍵值對 47 48 //1.2 指定自定義的map類 49 job.setMapperClass(MyMapper.class); 50 job.setMapOutputKeyClass(Text.class);//map輸出的<k,v>類型。 51 job.setMapOutputValueClass(LongWritable.class);//如果<k3,v3>的類型與<k2,v2>類型一致,則可以省略 52 53 //1.3 分區 54 job.setPartitionerClass(MyPartitioner.class); 55 //有幾個reduce任務運行 56 job.setNumReduceTasks(2); 57 58 //1.4 TODO 排序、分組 59 60 //1.5 規約 61 job.setCombinerClass(MyCombiner.class); 62 63 //2.2 指定自定義reduce類 64 job.setReducerClass(MyReducer.class); 65 //指定reduce的輸出類型 66 job.setOutputKeyClass(Text.class); 67 job.setOutputValueClass(LongWritable.class); 68 69 //2.3 指定寫出到哪里 70 FileOutputFormat.setOutputPath(job, outPath); 71 //指定輸出文件的格式化類 72 //job.setOutputFormatClass(TextOutputFormat.class); 73 74 //把job提交給JobTracker運行 75 job.waitForCompletion(true); 76 } 77 78 static class MyPartitioner extends Partitioner<Text, LongWritable>{ 79 @Override 80 public int getPartition(Text key, LongWritable value, int numReduceTasks) { 81 return (key.toString().equals("hello"))?0:1; 82 } 83 } 84 85 /** 86 * KEYIN 即k1 表示行的偏移量 87 * VALUEIN 即v1 表示行文本內容 88 * KEYOUT 即k2 表示行中出現的單詞 89 * VALUEOUT 即v2 表示行中出現的單詞的次數,固定值1 90 */ 91 static class MyMapper extends Mapper<LongWritable, Text, Text, LongWritable>{ 92 protected void map(LongWritable k1, Text v1, Context context) throws java.io.IOException ,InterruptedException { 93 final String[] splited = v1.toString().split("\t"); 94 for (String word : splited) { 95 context.write(new Text(word), new LongWritable(1)); 96 System.out.println("Mapper輸出<"+word+","+1+">"); 97 } 98 }; 99 } 100 101 /** 102 * KEYIN 即k2 表示行中出現的單詞 103 * VALUEIN 即v2 表示行中出現的單詞的次數 104 * KEYOUT 即k3 表示文本中出現的不同單詞 105 * VALUEOUT 即v3 表示文本中出現的不同單詞的總次數 106 * 107 */ 108 static class MyReducer extends Reducer<Text, LongWritable, Text, LongWritable>{ 109 protected void reduce(Text k2, java.lang.Iterable<LongWritable> v2s, Context ctx) throws java.io.IOException ,InterruptedException { 110 //顯示次數表示redcue函數被調用了多少次,表示k2有多少個分組 111 System.out.println("MyReducer輸入分組<"+k2.toString()+",...>"); 112 long times = 0L; 113 for (LongWritable count : v2s) { 114 times += count.get(); 115 //顯示次數表示輸入的k2,v2的鍵值對數量 116 System.out.println("MyReducer輸入鍵值對<"+k2.toString()+","+count.get()+">"); 117 } 118 ctx.write(k2, new LongWritable(times)); 119 }; 120 } 121 122 123 static class MyCombiner extends Reducer<Text, LongWritable, Text, LongWritable>{ 124 protected void reduce(Text k2, java.lang.Iterable<LongWritable> v2s, Context ctx) throws java.io.IOException ,InterruptedException { 125 //顯示次數表示redcue函數被調用了多少次,表示k2有多少個分組 126 System.out.println("Combiner輸入分組<"+k2.toString()+",...>"); 127 long times = 0L; 128 for (LongWritable count : v2s) { 129 times += count.get(); 130 //顯示次數表示輸入的k2,v2的鍵值對數量 131 System.out.println("Combiner輸入鍵值對<"+k2.toString()+","+count.get()+">"); 132 } 133 134 ctx.write(k2, new LongWritable(times)); 135 //顯示次數表示輸出的k2,v2的鍵值對數量 136 System.out.println("Combiner輸出鍵值對<"+k2.toString()+","+times+">"); 137 }; 138 } 139 }
代碼 3.2
運行結果如圖3.2所示。
14/10/07 18:56:32 INFO mapred.MapTask: record buffer = 262144/327680 Mapper輸出<hello,1> 14/10/07 18:56:32 INFO mapred.MapTask: Starting flush of map output Mapper輸出<world,1> Mapper輸出<hello,1> Mapper輸出<me,1> Combiner輸入分組<hello,...> Combiner輸入鍵值對<hello,1> Combiner輸入鍵值對<hello,1> Combiner輸出鍵值對<hello,2> Combiner輸入分組<me,...> Combiner輸入鍵值對<me,1> Combiner輸出鍵值對<me,1> Combiner輸入分組<world,...> Combiner輸入鍵值對<world,1> Combiner輸出鍵值對<world,1> 14/10/07 18:56:32 INFO mapred.MapTask: Finished spill 0 14/10/07 18:56:32 INFO mapred.Task: Task:attempt_local_0001_m_000000_0 is done. And is in the process of commiting 14/10/07 18:56:32 INFO mapred.LocalJobRunner: 14/10/07 18:56:32 INFO mapred.Task: Task 'attempt_local_0001_m_000000_0' done. 14/10/07 18:56:32 INFO mapred.Task: Using ResourceCalculatorPlugin : null 14/10/07 18:56:32 INFO mapred.LocalJobRunner: 14/10/07 18:56:32 INFO mapred.Merger: Merging 1 sorted segments 14/10/07 18:56:32 INFO mapred.Merger: Down to the last merge-pass, with 1 segments left of total size: 47 bytes 14/10/07 18:56:32 INFO mapred.LocalJobRunner: MyReducer輸入分組<hello,...> MyReducer輸入鍵值對<hello,2> MyReducer輸入分組<me,...> MyReducer輸入鍵值對<me,1> MyReducer輸入分組<world,...> MyReducer輸入鍵值對<world,1> 14/10/07 18:56:33 INFO mapred.Task: Task:attempt_local_0001_r_000000_0 is done. And is in the process of commiting 14/10/07 18:56:33 INFO mapred.LocalJobRunner: 14/10/07 18:56:33 INFO mapred.Task: Task attempt_local_0001_r_000000_0 is allowed to commit now 14/10/07 18:56:33 INFO output.FileOutputCommitter: Saved output of task 'attempt_local_0001_r_000000_0' to hdfs://hadoop:9000/output 14/10/07 18:56:33 INFO mapred.LocalJobRunner: reduce > reduce 14/10/07 18:56:33 INFO mapred.Task: Task 'attempt_local_0001_r_000000_0' done. 14/10/07 18:56:33 INFO mapred.JobClient: map 100% reduce 100% 14/10/07 18:56:33 INFO mapred.JobClient: Job complete: job_local_0001 14/10/07 18:56:33 INFO mapred.JobClient: Counters: 19 14/10/07 18:56:33 INFO mapred.JobClient: File Output Format Counters 14/10/07 18:56:33 INFO mapred.JobClient: Bytes Written=21 14/10/07 18:56:33 INFO mapred.JobClient: FileSystemCounters 14/10/07 18:56:33 INFO mapred.JobClient: FILE_BYTES_READ=343 14/10/07 18:56:33 INFO mapred.JobClient: HDFS_BYTES_READ=42 14/10/07 18:56:33 INFO mapred.JobClient: FILE_BYTES_WRITTEN=129572 14/10/07 18:56:33 INFO mapred.JobClient: HDFS_BYTES_WRITTEN=21 14/10/07 18:56:33 INFO mapred.JobClient: File Input Format Counters 14/10/07 18:56:33 INFO mapred.JobClient: Bytes Read=21 14/10/07 18:56:33 INFO mapred.JobClient: Map-Reduce Framework 14/10/07 18:56:33 INFO mapred.JobClient: Map output materialized bytes=51 14/10/07 18:56:33 INFO mapred.JobClient: Map input records=2 14/10/07 18:56:33 INFO mapred.JobClient: Reduce shuffle bytes=0 14/10/07 18:56:33 INFO mapred.JobClient: Spilled Records=6 14/10/07 18:56:33 INFO mapred.JobClient: Map output bytes=53 14/10/07 18:56:33 INFO mapred.JobClient: Total committed heap usage (bytes)=391774208 14/10/07 18:56:33 INFO mapred.JobClient: SPLIT_RAW_BYTES=95 14/10/07 18:56:33 INFO mapred.JobClient: Combine input records=4 14/10/07 18:56:33 INFO mapred.JobClient: Reduce input records=3 14/10/07 18:56:33 INFO mapred.JobClient: Reduce input groups=3 14/10/07 18:56:33 INFO mapred.JobClient: Combine output records=3 14/10/07 18:56:33 INFO mapred.JobClient: Reduce output records=3 14/10/07 18:56:33 INFO mapred.JobClient: Map output records=4
圖 3.2
從上面的運行結果我們可以得知,combine具體作用如下:
- 每一個map可能會產生大量的輸出,combiner的作用就是在map端對輸出先做一次合並,以減少傳輸到reducer的數據量。
- combiner最基本是實現本地key的歸並,combiner具有類似本地的reduce功能。
- 如果不用combiner,那么,所有的結果都是reduce完成,效率會相對低下。使用combiner,先完成的map會在本地聚合,提升速度。
注意:Combiner的輸出是Reducer的輸入,Combiner絕不能改變最終的計算結果。所以從我的想法來看,Combiner只應該用於那 種Reduce的輸入key/value與輸出key/value類型完全一致,且不影響最終結果的場景。比如累加,最大值等。
解釋一下
*問:為什么使用Combiner?
答:Combiner發生在Map端,對數據進行規約處理,數據量變小了,傳送到reduce端的數據量變小了,傳輸時間變短,作業的整體時間變短。
* 問:為什么Combiner不作為MR運行的標配,而是可選步驟?
答:因為不是所有的算法都適合使用Combiner處理,例如求平均數。
* 問:Combiner本身已經執行了reduce操作,為什么在Reducer階段還要執行reduce操作?
答:combiner操作發生在map端的,智能處理一個map任務中的數據,不能跨map任務執行;只有reduce可以接收多個map任務處理的數據。
三、Partitioner編程
4.1 什么是分區
在MapReuce程序中的Mapper任務的第三步就是分區,那么分區到底是干什么的呢?其實,把數據分區是為了更好的利用數據,根據數據的屬性不同來分成不同區,再根據不同的分區完成不同的任務。MapReduce程序中他的默認分區是1個分區,我們看一下默認分區的代碼,還是以單詞統計為例如代碼4.1所示。

1 package counter; 2 3 import java.net.URI; 4 5 import org.apache.hadoop.conf.Configuration; 6 import org.apache.hadoop.fs.FileSystem; 7 import org.apache.hadoop.fs.Path; 8 import org.apache.hadoop.io.LongWritable; 9 import org.apache.hadoop.io.Text; 10 import org.apache.hadoop.mapreduce.Counter; 11 import org.apache.hadoop.mapreduce.Job; 12 import org.apache.hadoop.mapreduce.Mapper; 13 import org.apache.hadoop.mapreduce.Reducer; 14 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; 15 import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; 16 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; 17 import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; 18 import org.apache.hadoop.mapreduce.lib.partition.HashPartitioner; 19 20 public class WordCountApp { 21 static final String INPUT_PATH = "hdfs://hadoop:9000/input"; 22 static final String OUT_PATH = "hdfs://hadoop:9000/output"; 23 24 public static void main(String[] args) throws Exception { 25 26 Configuration conf = new Configuration(); 27 28 final FileSystem fileSystem = FileSystem.get(new URI(INPUT_PATH), conf); 29 final Path outPath = new Path(OUT_PATH); 30 31 if(fileSystem.exists(outPath)){ 32 fileSystem.delete(outPath, true); 33 } 34 final Job job = new Job(conf , WordCountApp.class.getSimpleName()); 35 36 //1.1指定讀取的文件位於哪里 37 FileInputFormat.setInputPaths(job, INPUT_PATH); 38 job.setInputFormatClass(TextInputFormat.class);//指定如何對輸入文件進行格式化,把輸入文件每一行解析成鍵值對 39 40 //1.2 指定自定義的map類 41 job.setMapperClass(MyMapper.class); 42 job.setMapOutputKeyClass(Text.class);//map輸出的<k,v>類型。 43 job.setMapOutputValueClass(LongWritable.class);//如果<k3,v3>的類型與<k2,v2>類型一致,則可以省略 44 45 //1.3 分區 46 job.setPartitionerClass(HashPartitioner.class); 47 job.setNumReduceTasks(1);//有一個reduce任務運行 48 49 job.setCombinerClass(MyReducer.class); 50 //2.2 指定自定義reduce類 51 job.setReducerClass(MyReducer.class); 52 53 job.setOutputKeyClass(Text.class);//指定reduce的輸出類型 54 job.setOutputValueClass(LongWritable.class); 55 56 //2.3 指定寫出到哪里 57 FileOutputFormat.setOutputPath(job, outPath); 58 job.setOutputFormatClass(TextOutputFormat.class);//指定輸出文件的格式化類 59 60 job.waitForCompletion(true);//把job提交給JobTracker運行 61 } 62 63 /** 64 * KEYIN 即k1 表示行的偏移量 65 * VALUEIN 即v1 表示行文本內容 66 * KEYOUT 即k2 表示行中出現的單詞 67 * VALUEOUT 即v2 表示行中出現的單詞的次數,固定值1 68 */ 69 static class MyMapper extends Mapper<LongWritable, Text, Text, LongWritable>{ 70 protected void map(LongWritable k1, Text v1, Context context) throws java.io.IOException ,InterruptedException { 71 final Counter helloCounter = context.getCounter("Sensitive Words", "hello"); 72 73 final String line = v1.toString(); 74 if(line.contains("hello")){ 75 //記錄敏感詞出現在一行中 76 helloCounter.increment(1L); 77 } 78 final String[] splited = line.split("\t"); 79 for (String word : splited) { 80 context.write(new Text(word), new LongWritable(1)); 81 } 82 }; 83 } 84 85 /** 86 * KEYIN 即k2 表示行中出現的單詞 87 * VALUEIN 即v2 表示行中出現的單詞的次數 88 * KEYOUT 即k3 表示文本中出現的不同單詞 89 * VALUEOUT 即v3 表示文本中出現的不同單詞的總次數 90 * 91 */ 92 static class MyReducer extends Reducer<Text, LongWritable, Text, LongWritable>{ 93 protected void reduce(Text k2, java.lang.Iterable<LongWritable> v2s, Context ctx) throws java.io.IOException ,InterruptedException { 94 long times = 0L; 95 for (LongWritable count : v2s) { 96 times += count.get(); 97 } 98 ctx.write(k2, new LongWritable(times)); 99 }; 100 } 101 102 }
代碼 4.1
在MapReduce程序中默認的分區方法為HashPartitioner,代碼job.setNumReduceTasks(1)表示運行的Reduce任務數,他會將numReduceTask這個變量設為1. HashPartitioner繼承自Partitioner,Partitioner是Partitioner的基類,如果需要定制partitioner也需要繼承該類。 HashPartitioner計算方法如代碼4.2所示。
1 public class HashPartitioner<K, V> extends Partitioner<K, V> { 2 3 /** Use {@link Object#hashCode()} to partition. */ 4 public int getPartition(K key, V value, 5 int numReduceTasks) { 6 return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks; 7 } 8 9 }
代碼 4.2
在上面的代碼中K和V,表示k2和v2,該類中只有一個方法getPartition(),返回值如下”(key.hashCode()& Integer.MAX_VALUE)%numReduceTasks“其中key.hashCode()表示該關鍵是否屬於該類。numReduceTasks的值在上面代碼中設置為1,取模后只有一種結果那就是0。getPartition()的意義就是表示划分到不同區域的一個標記,返回0,就是表示划分到第0區,所以我們可以把它理解分區的下標,來代表不同的分區。
4.2 自定義分區
下面我們嘗試自定義一個分區,來處理一下手機的日志數據(在前面學習中用過),手機日志數據如下圖4.1所示。
圖 4.1
從圖中我們可以發現,在第二列上並不是所有的數據都是手機號,我們任務就是在統計手機流量時,將手機號碼和非手機號輸出到不同的文件中。我們的分區是按手機和非手機號碼來分的,所以我們可以按該字段的長度來划分,如代碼4.3所示。

1 package partition; 2 3 import java.io.DataInput; 4 import java.io.DataOutput; 5 import java.io.IOException; 6 7 import org.apache.hadoop.conf.Configuration; 8 import org.apache.hadoop.fs.Path; 9 import org.apache.hadoop.io.LongWritable; 10 import org.apache.hadoop.io.Text; 11 import org.apache.hadoop.io.Writable; 12 import org.apache.hadoop.mapreduce.Job; 13 import org.apache.hadoop.mapreduce.Mapper; 14 import org.apache.hadoop.mapreduce.Reducer; 15 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; 16 import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; 17 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; 18 import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; 19 import org.apache.hadoop.mapreduce.lib.partition.HashPartitioner; 20 21 public class KpiApp { 22 static final String INPUT_PATH = "hdfs://hadoop:9000/wlan"; 23 static final String OUT_PATH = "hdfs://hadoop:9000/out"; 24 public static void main(String[] args) throws Exception{ 25 final Job job = new Job(new Configuration(), KpiApp.class.getSimpleName()); 26 27 job.setJarByClass(KpiApp.class); 28 29 //1.1 指定輸入文件路徑 30 FileInputFormat.setInputPaths(job, INPUT_PATH); 31 job.setInputFormatClass(TextInputFormat.class);//指定哪個類用來格式化輸入文件 32 33 //1.2指定自定義的Mapper類 34 job.setMapperClass(MyMapper.class); 35 job.setMapOutputKeyClass(Text.class);//指定輸出<k2,v2>的類型 36 job.setMapOutputValueClass(KpiWritable.class); 37 38 //1.3 指定分區類 39 job.setPartitionerClass(KpiPartitioner.class); 40 job.setNumReduceTasks(2); 41 42 //2.2 指定自定義的reduce類 43 job.setReducerClass(MyReducer.class); 44 job.setOutputKeyClass(Text.class);//指定輸出<k3,v3>的類型 45 job.setOutputValueClass(KpiWritable.class); 46 47 //2.3 指定輸出到哪里 48 FileOutputFormat.setOutputPath(job, new Path(OUT_PATH)); 49 job.setOutputFormatClass(TextOutputFormat.class);//設定輸出文件的格式化類 50 job.waitForCompletion(true);//把代碼提交給JobTracker執行 51 } 52 53 static class MyMapper extends Mapper<LongWritable, Text, Text, KpiWritable>{ 54 protected void map(LongWritable key, Text value, org.apache.hadoop.mapreduce.Mapper<LongWritable,Text,Text,KpiWritable>.Context context) throws IOException ,InterruptedException { 55 final String[] splited = value.toString().split("\t"); 56 final String msisdn = splited[1]; 57 final Text k2 = new Text(msisdn); 58 final KpiWritable v2 = new KpiWritable(splited[6],splited[7],splited[8],splited[9]); 59 context.write(k2, v2); 60 }; 61 } 62 63 static class MyReducer extends Reducer<Text, KpiWritable, Text, KpiWritable>{ 64 /** 65 * @param k2 表示整個文件中不同的手機號碼 66 * @param v2s 表示該手機號在不同時段的流量的集合 67 */ 68 protected void reduce(Text k2, java.lang.Iterable<KpiWritable> v2s, org.apache.hadoop.mapreduce.Reducer<Text,KpiWritable,Text,KpiWritable>.Context context) throws IOException ,InterruptedException { 69 long upPackNum = 0L; 70 long downPackNum = 0L; 71 long upPayLoad = 0L; 72 long downPayLoad = 0L; 73 74 for (KpiWritable kpiWritable : v2s) { 75 upPackNum += kpiWritable.upPackNum; 76 downPackNum += kpiWritable.downPackNum; 77 upPayLoad += kpiWritable.upPayLoad; 78 downPayLoad += kpiWritable.downPayLoad; 79 } 80 81 final KpiWritable v3 = new KpiWritable(upPackNum+"", downPackNum+"", upPayLoad+"", downPayLoad+""); 82 context.write(k2, v3); 83 }; 84 } 85 86 static class KpiPartitioner extends HashPartitioner<Text, KpiWritable>{ 87 @Override 88 public int getPartition(Text key, KpiWritable value, int numReduceTasks) { 89 return (key.toString().length()==11)?0:1; 90 } 91 } 92 } 93 94 class KpiWritable implements Writable{ 95 long upPackNum; 96 long downPackNum; 97 long upPayLoad; 98 long downPayLoad; 99 100 public KpiWritable(){} 101 102 public KpiWritable(String upPackNum, String downPackNum, String upPayLoad, String downPayLoad){ 103 this.upPackNum = Long.parseLong(upPackNum); 104 this.downPackNum = Long.parseLong(downPackNum); 105 this.upPayLoad = Long.parseLong(upPayLoad); 106 this.downPayLoad = Long.parseLong(downPayLoad); 107 } 108 109 110 @Override 111 public void readFields(DataInput in) throws IOException { 112 this.upPackNum = in.readLong(); 113 this.downPackNum = in.readLong(); 114 this.upPayLoad = in.readLong(); 115 this.downPayLoad = in.readLong(); 116 } 117 118 @Override 119 public void write(DataOutput out) throws IOException { 120 out.writeLong(upPackNum); 121 out.writeLong(downPackNum); 122 out.writeLong(upPayLoad); 123 out.writeLong(downPayLoad); 124 } 125 126 @Override 127 public String toString() { 128 return upPackNum + "\t" + downPackNum + "\t" + upPayLoad + "\t" + downPayLoad; 129 } 130 }
代碼 4.3
注意:分區的例子必須打成jar運行,運行結果如下圖4.3,4.4所示,4.3表示手機號碼流量,4.4為非手機號流量。
圖 4.3
圖4.4
我們知道一個分區對應一個Reducer任務是否是這樣呢,我可以通過訪問50030MapReduce端口來驗證,在瀏覽器輸入”http://hadoop:50030"可以看到MapReduce界面,如圖4.5,4.6所示。
圖 4.5
圖4.6
從圖中可以知道,該MapReduce任務有一個Mapper任務,兩個Reducer任務,那么我們細看一下Reducer的兩個任務到底是什么?如圖4.7,4.8,4.9所示。task_201410070239_0002_r_000000表示第一個分區的輸出,有20條記錄,task_201410070239_0002_r_000001表示第二分區,有一條輸出記錄。和我們程序運行結果一樣。
圖 4.7
圖 4.8 第一分區
圖 4.9 第二分區
綜上一些列分析,分區的用處如下:
1.根據業務需要,產生多個輸出文件
2.多個reduce任務在並發運行,提高整體job的運行效率