前言:
根據前面的幾篇博客學習,現在可以進行MapReduce學習了。本篇博客首先闡述了MapReduce的概念及使用原理,其次直接從五個實驗中實踐學習(單詞計數,二次排序,計數器,join,分布式緩存)。
一 概述
定義
MapReduce是一種計算模型,簡單的說就是將大批量的工作(數據)分解(MAP)執行,然后再將結果合並成最終結果(REDUCE)。這樣做的好處是可以在任務被分解后,可以通過大量機器進行並行計算,減少整個操作的時間。
適用范圍:數據量大,但是數據種類小可以放入內存。
基本原理及要點:將數據交給不同的機器去處理,數據划分,結果歸約。
理解MapReduce和Yarn:在新版Hadoop中,Yarn作為一個資源管理調度框架,是Hadoop下MapReduce程序運行的生存環境。其實MapRuduce除了可以運行Yarn框架下,也可以運行在諸如Mesos,Corona之類的調度框架上,使用不同的調度框架,需要針對Hadoop做不同的適配。(了解YARN見上一篇博客>> http://www.cnblogs.com/1996swg/p/7286490.html )
MapReduce編程
編寫在Hadoop中依賴Yarn框架執行的MapReduce程序,並不需要自己開發MRAppMaster和YARNRunner,因為Hadoop已經默認提供通用的YARNRunner和MRAppMaster程序, 大部分情況下只需要編寫相應的Map處理和Reduce處理過程的業務程序即可。
編寫一個MapReduce程序並不復雜,關鍵點在於掌握分布式的編程思想和方法,主要將計算過程分為以下五個步驟:
(1)迭代。遍歷輸入數據,並將之解析成key/value對。
(2)將輸入key/value對映射(map)成另外一些key/value對。
(3)依據key對中間數據進行分組(grouping)。
(4)以組為單位對數據進行歸約(reduce)。
(5)迭代。將最終產生的key/value對保存到輸出文件中。
Java API解析
(1)InputFormat:用於描述輸入數據的格式,常用的為TextInputFormat提供如下兩個功能:
數據切分: 按照某個策略將輸入數據切分成若干個split,以便確定Map Task個數以及對應的split。
為Mapper提供數據:給定某個split,能將其解析成一個個key/value對。
(2)OutputFormat:用於描述輸出數據的格式,它能夠將用戶提供的key/value對寫入特定格式的文件中。
(3)Mapper/Reducer: Mapper/Reducer中封裝了應用程序的數據處理邏輯。
(4)Writable:Hadoop自定義的序列化接口。實現該類的接口可以用作MapReduce過程中的value數據使用。
(5)WritableComparable:在Writable基礎上繼承了Comparable接口,實現該類的接口可以用作MapReduce過程中的key數據使用。(因為key包含了比較排序的操作)。
二 單詞計數實驗
!單詞計數文件word
1‘ 啟動Hadoop 執行命令啟動(前面博客)部署好的Hadoop系統。
命令:
cd /usr/cstor/hadoop/
sbin/start-all.sh
2’ 驗證HDFS上沒有wordcount的文件夾 此時HDFS上應該是沒有wordcount文件夾。
cd /usr/cstor/hadoop/
bin/hadoop fs -ls / #查看HDFS上根目錄文件 /
3‘ 上傳數據文件到HDFS
cd /usr/cstor/hadoop/
bin/hadoop fs -put /root/data/5/word /
4’ 編寫MapReduce程序
在eclipse新建mapreduce項目(方法見博客>> http://www.cnblogs.com/1996swg/p/7286136.html ),新建class類WordCount
主要編寫Map和Reduce類,其中Map過程需要繼承org.apache.hadoop.mapreduce包中Mapper類,並重寫其map方法;Reduce過程需要繼承org.apache.hadoop.mapreduce包中Reduce類,並重寫其reduce方法。
1 import org.apache.hadoop.conf.Configuration; 2 import org.apache.hadoop.fs.Path; 3 import org.apache.hadoop.io.IntWritable; 4 import org.apache.hadoop.io.Text; 5 import org.apache.hadoop.mapreduce.Job; 6 import org.apache.hadoop.mapreduce.Mapper; 7 import org.apache.hadoop.mapreduce.Reducer; 8 import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; 9 import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; 10 import org.apache.hadoop.mapreduce.lib.partition.HashPartitioner; 11 12 import java.io.IOException; 13 import java.util.StringTokenizer; 14 15 16 public class WordCount { 17 public static class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable> { 18 private final static IntWritable one = new IntWritable(1); 19 private Text word = new Text(); 20 //map方法,划分一行文本,讀一個單詞寫出一個<單詞,1> 21 public void map(Object key, Text value, Context context)throws IOException, InterruptedException { 22 StringTokenizer itr = new StringTokenizer(value.toString()); 23 while (itr.hasMoreTokens()) { 24 word.set(itr.nextToken()); 25 context.write(word, one);//寫出<單詞,1> 26 }}} 27 //定義reduce類,對相同的單詞,把它們<K,VList>中的VList值全部相加 28 public static class IntSumReducer extends Reducer<Text, IntWritable, Text, IntWritable> { 29 private IntWritable result = new IntWritable(); 30 public void reduce(Text key, Iterable<IntWritable> values,Context context) 31 throws IOException, InterruptedException { 32 int sum = 0; 33 for (IntWritable val : values) { 34 sum += val.get();//相當於<Hello,1><Hello,1>,將兩個1相加 35 } 36 result.set(sum); 37 context.write(key, result);//寫出這個單詞,和這個單詞出現次數<單詞,單詞出現次數> 38 }} 39 public static void main(String[] args) throws Exception {//主方法,函數入口 40 Configuration conf = new Configuration(); //實例化配置文件類 41 Job job = new Job(conf, "WordCount"); //實例化Job類 42 job.setInputFormatClass(TextInputFormat.class); //指定使用默認輸入格式類 43 TextInputFormat.setInputPaths(job, args[0]); //設置待處理文件的位置 44 job.setJarByClass(WordCount.class); //設置主類名 45 job.setMapperClass(TokenizerMapper.class); //指定使用上述自定義Map類 46 job.setCombinerClass(IntSumReducer.class); //指定開啟Combiner函數 47 job.setMapOutputKeyClass(Text.class); //指定Map類輸出的<K,V>,K類型 48 job.setMapOutputValueClass(IntWritable.class); //指定Map類輸出的<K,V>,V類型 49 job.setPartitionerClass(HashPartitioner.class); //指定使用默認的HashPartitioner類 50 job.setReducerClass(IntSumReducer.class); //指定使用上述自定義Reduce類 51 job.setNumReduceTasks(Integer.parseInt(args[2])); //指定Reduce個數 52 job.setOutputKeyClass(Text.class); //指定Reduce類輸出的<K,V>,K類型 53 job.setOutputValueClass(Text.class); //指定Reduce類輸出的<K,V>,V類型 54 job.setOutputFormatClass(TextOutputFormat.class); //指定使用默認輸出格式類 55 TextOutputFormat.setOutputPath(job, new Path(args[1])); //設置輸出結果文件位置 56 System.exit(job.waitForCompletion(true) ? 0 : 1); //提交任務並監控任務狀態 57 } 58 }
5' 打包成jar文件上傳
假定打包后的文件名為hdpAction.jar,主類WordCount位於包njupt下,則可使用如下命令向YARN集群提交本應用。
./yarn jar hdpAction.jar mapreduce1.WordCount /word /wordcount 1
其中“yarn”為命令,“jar”為命令參數,后面緊跟打包后的代碼地址,“mapreduce1”為包名,“WordCount”為主類名,“/word”為輸入文件在HDFS中的位置,/wordcount為輸出文件在HDFS中的位置。
注意:如果打包時明確了主類,那么在輸入命令時,就無需輸入mapreduce1.WordCount來確定主類!
結果顯示:
1 17/08/05 03:37:05 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable 2 17/08/05 03:37:06 INFO client.RMProxy: Connecting to ResourceManager at master/10.1.21.27:8032 3 17/08/05 03:37:06 WARN mapreduce.JobResourceUploader: Hadoop command-line option parsing not performed. Implement the Tool interface and execute your application with ToolRunner to remedy this. 4 17/08/05 03:37:07 INFO input.FileInputFormat: Total input paths to process : 1 5 17/08/05 03:37:07 INFO mapreduce.JobSubmitter: number of splits:1 6 17/08/05 03:37:07 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1501872322130_0004 7 17/08/05 03:37:07 INFO impl.YarnClientImpl: Submitted application application_1501872322130_0004 8 17/08/05 03:37:07 INFO mapreduce.Job: The url to track the job: http://master:8088/proxy/application_1501872322130_0004/ 9 17/08/05 03:37:07 INFO mapreduce.Job: Running job: job_1501872322130_0004 10 17/08/05 03:37:12 INFO mapreduce.Job: Job job_1501872322130_0004 running in uber mode : false 11 17/08/05 03:37:12 INFO mapreduce.Job: map 0% reduce 0% 12 17/08/05 03:37:16 INFO mapreduce.Job: map 100% reduce 0% 13 17/08/05 03:37:22 INFO mapreduce.Job: map 100% reduce 100% 14 17/08/05 03:37:22 INFO mapreduce.Job: Job job_1501872322130_0004 completed successfully 15 17/08/05 03:37:22 INFO mapreduce.Job: Counters: 49 16 File System Counters 17 FILE: Number of bytes read=54 18 FILE: Number of bytes written=232239 19 FILE: Number of read operations=0 20 FILE: Number of large read operations=0 21 FILE: Number of write operations=0 22 HDFS: Number of bytes read=166 23 HDFS: Number of bytes written=28 24 HDFS: Number of read operations=6 25 HDFS: Number of large read operations=0 26 HDFS: Number of write operations=2 27 Job Counters 28 Launched map tasks=1 29 Launched reduce tasks=1 30 Data-local map tasks=1 31 Total time spent by all maps in occupied slots (ms)=2275 32 Total time spent by all reduces in occupied slots (ms)=2598 33 Total time spent by all map tasks (ms)=2275 34 Total time spent by all reduce tasks (ms)=2598 35 Total vcore-seconds taken by all map tasks=2275 36 Total vcore-seconds taken by all reduce tasks=2598 37 Total megabyte-seconds taken by all map tasks=2329600 38 Total megabyte-seconds taken by all reduce tasks=2660352 39 Map-Reduce Framework 40 Map input records=8 41 Map output records=20 42 Map output bytes=154 43 Map output materialized bytes=54 44 Input split bytes=88 45 Combine input records=20 46 Combine output records=5 47 Reduce input groups=5 48 Reduce shuffle bytes=54 49 Reduce input records=5 50 Reduce output records=5 51 Spilled Records=10 52 Shuffled Maps =1 53 Failed Shuffles=0 54 Merged Map outputs=1 55 GC time elapsed (ms)=47 56 CPU time spent (ms)=1260 57 Physical memory (bytes) snapshot=421257216 58 Virtual memory (bytes) snapshot=1647611904 59 Total committed heap usage (bytes)=402653184 60 Shuffle Errors 61 BAD_ID=0 62 CONNECTION=0 63 IO_ERROR=0 64 WRONG_LENGTH=0 65 WRONG_MAP=0 66 WRONG_REDUCE=0 67 File Input Format Counters 68 Bytes Read=78 69 File Output Format Counters 70 Bytes Written=28
>生成結果文件wordcount目錄下的part-r-00000,用hadoop命令查看生成文件
三 二次排序
MR默認會對鍵進行排序,然而有的時候我們也有對值進行排序的需求。滿足這種需求一是可以在reduce階段排序收集過來的values,但是,如果有數量巨大的values可能就會導致內存溢出等問題,這就是二次排序應用的場景——將對值的排序也安排到MR計算過程之中,而不是單獨來做。
二次排序就是首先按照第一字段排序,然后再對第一字段相同的行按照第二字段排序,注意不能破壞第一次排序的結果。
!需排序文件secsortdata.txt
1' 編寫程序IntPair 類和主類 SecondarySort類
同第一個實驗在eclipse編程的創建方法!
程序主要難點在於排序和聚合。
對於排序我們需要定義一個IntPair類用於數據的存儲,並在IntPair類內部自定義Comparator類以實現第一字段和第二字段的比較。
對於聚合我們需要定義一個FirstPartitioner類,在FirstPartitioner類內部指定聚合規則為第一字段。
此外,我們還需要開啟MapReduce框架自定義Partitioner 功能和GroupingComparator功能。
Inpair.java
1 package mr; 2 3 import java.io.DataInput; 4 import java.io.DataOutput; 5 import java.io.IOException; 6 7 import org.apache.hadoop.io.IntWritable; 8 import org.apache.hadoop.io.WritableComparable; 9 10 public class IntPair implements WritableComparable<IntPair> { 11 private IntWritable first; 12 private IntWritable second; 13 public void set(IntWritable first, IntWritable second) { 14 this.first = first; 15 this.second = second; 16 } 17 //注意:需要添加無參的構造方法,否則反射時會報錯。 18 public IntPair() { 19 set(new IntWritable(), new IntWritable()); 20 } 21 public IntPair(int first, int second) { 22 set(new IntWritable(first), new IntWritable(second)); 23 } 24 public IntPair(IntWritable first, IntWritable second) { 25 set(first, second); 26 } 27 public IntWritable getFirst() { 28 return first; 29 } 30 public void setFirst(IntWritable first) { 31 this.first = first; 32 } 33 public IntWritable getSecond() { 34 return second; 35 } 36 public void setSecond(IntWritable second) { 37 this.second = second; 38 } 39 public void write(DataOutput out) throws IOException { 40 first.write(out); 41 second.write(out); 42 } 43 public void readFields(DataInput in) throws IOException { 44 first.readFields(in); 45 second.readFields(in); 46 } 47 public int hashCode() { 48 return first.hashCode() * 163 + second.hashCode(); 49 } 50 public boolean equals(Object o) { 51 if (o instanceof IntPair) { 52 IntPair tp = (IntPair) o; 53 return first.equals(tp.first) && second.equals(tp.second); 54 } 55 return false; 56 } 57 public String toString() { 58 return first + "\t" + second; 59 } 60 public int compareTo(IntPair tp) { 61 int cmp = first.compareTo(tp.first); 62 if (cmp != 0) { 63 return cmp; 64 } 65 return second.compareTo(tp.second); 66 } 67 }
secsortdata.java
1 package mr; 2 3 import java.io.IOException; 4 5 import org.apache.hadoop.conf.Configuration; 6 import org.apache.hadoop.fs.Path; 7 import org.apache.hadoop.io.LongWritable; 8 import org.apache.hadoop.io.NullWritable; 9 import org.apache.hadoop.io.Text; 10 import org.apache.hadoop.io.WritableComparable; 11 import org.apache.hadoop.io.WritableComparator; 12 import org.apache.hadoop.mapreduce.Job; 13 import org.apache.hadoop.mapreduce.Mapper; 14 import org.apache.hadoop.mapreduce.Partitioner; 15 import org.apache.hadoop.mapreduce.Reducer; 16 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; 17 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; 18 19 public class SecondarySort { 20 static class TheMapper extends Mapper<LongWritable, Text, IntPair, NullWritable> { 21 @Override 22 protected void map(LongWritable key, Text value, Context context) 23 throws IOException, InterruptedException { 24 String[] fields = value.toString().split("\t"); 25 int field1 = Integer.parseInt(fields[0]); 26 int field2 = Integer.parseInt(fields[1]); 27 context.write(new IntPair(field1,field2), NullWritable.get()); 28 } 29 } 30 static class TheReducer extends Reducer<IntPair, NullWritable,IntPair, NullWritable> { 31 //private static final Text SEPARATOR = new Text("------------------------------------------------"); 32 @Override 33 protected void reduce(IntPair key, Iterable<NullWritable> values, Context context) 34 throws IOException, InterruptedException { 35 context.write(key, NullWritable.get()); 36 } 37 } 38 public static class FirstPartitioner extends Partitioner<IntPair, NullWritable> { 39 public int getPartition(IntPair key, NullWritable value, 40 int numPartitions) { 41 return Math.abs(key.getFirst().get()) % numPartitions; 42 } 43 } 44 //如果不添加這個類,默認第一列和第二列都是升序排序的。 45 //這個類的作用是使第一列升序排序,第二列降序排序 46 public static class KeyComparator extends WritableComparator { 47 //無參構造器必須加上,否則報錯。 48 protected KeyComparator() { 49 super(IntPair.class, true); 50 } 51 public int compare(WritableComparable a, WritableComparable b) { 52 IntPair ip1 = (IntPair) a; 53 IntPair ip2 = (IntPair) b; 54 //第一列按升序排序 55 int cmp = ip1.getFirst().compareTo(ip2.getFirst()); 56 if (cmp != 0) { 57 return cmp; 58 } 59 //在第一列相等的情況下,第二列按倒序排序 60 return -ip1.getSecond().compareTo(ip2.getSecond()); 61 } 62 } 63 //入口程序 64 public static void main(String[] args) throws Exception { 65 Configuration conf = new Configuration(); 66 Job job = Job.getInstance(conf); 67 job.setJarByClass(SecondarySort.class); 68 //設置Mapper的相關屬性 69 job.setMapperClass(TheMapper.class); 70 //當Mapper中的輸出的key和value的類型和Reduce輸出 71 //的key和value的類型相同時,以下兩句可以省略。 72 //job.setMapOutputKeyClass(IntPair.class); 73 //job.setMapOutputValueClass(NullWritable.class); 74 FileInputFormat.setInputPaths(job, new Path(args[0])); 75 //設置分區的相關屬性 76 job.setPartitionerClass(FirstPartitioner.class); 77 //在map中對key進行排序 78 job.setSortComparatorClass(KeyComparator.class); 79 //job.setGroupingComparatorClass(GroupComparator.class); 80 //設置Reducer的相關屬性 81 job.setReducerClass(TheReducer.class); 82 job.setOutputKeyClass(IntPair.class); 83 job.setOutputValueClass(NullWritable.class); 84 FileOutputFormat.setOutputPath(job, new Path(args[1])); 85 //設置Reducer數量 86 int reduceNum = 1; 87 if(args.length >= 3 && args[2] != null){ 88 reduceNum = Integer.parseInt(args[2]); 89 } 90 job.setNumReduceTasks(reduceNum); 91 job.waitForCompletion(true); 92 } 93 }
2’ 打包提交
使用Eclipse開發工具將該代碼打包,選擇主類為mr.Secondary。如果沒有指定主類,那么在執行時就要指定須執行的類。假定打包后的文件名為Secondary.jar,主類SecondarySort位於包mr下,則可使用如下命令向Hadoop集群提交本應用。
bin/hadoop jar hdpAction6.jar mr.Secondary /user/mapreduce/secsort/in/secsortdata.txt /user/mapreduce/secsort/out 1
其中“hadoop”為命令,“jar”為命令參數,后面緊跟打的包,/user/mapreduce/secsort/in/secsortdata.txt”為輸入文件在HDFS中的位置,如果HDFS中沒有這個文件,則自己自行上傳。“/user/mapreduce/secsort/out/”為輸出文件在HDFS中的位置,“1”為Reduce個數。
如果打包時已經設定了主類,此時命令中無需再次輸入定義主類!
(上傳secsortdata.txt到HDFS 命令: ” hadoop fs -put 目標文件包括路徑 hdfs路徑 “)
顯示結果:
1 [root@master hadoop]# bin/hadoop jar SecondarySort.jar /secsortdata.txt /user/mapreduce/secsort/out 1 2 Not a valid JAR: /usr/cstor/hadoop/SecondarySort.jar 3 [root@master hadoop]# bin/hadoop jar hdpAction6.jar /secsortdata.txt /user/mapreduce/secsort/out 1 4 17/08/05 04:05:48 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable 5 17/08/05 04:05:49 INFO client.RMProxy: Connecting to ResourceManager at master/10.1.21.27:8032 6 17/08/05 04:05:49 WARN mapreduce.JobResourceUploader: Hadoop command-line option parsing not performed. Implement the Tool interface and execute your application with ToolRunner to remedy this. 7 17/08/05 04:05:50 INFO input.FileInputFormat: Total input paths to process : 1 8 17/08/05 04:05:50 INFO mapreduce.JobSubmitter: number of splits:1 9 17/08/05 04:05:50 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1501872322130_0007 10 17/08/05 04:05:50 INFO impl.YarnClientImpl: Submitted application application_1501872322130_0007 11 17/08/05 04:05:50 INFO mapreduce.Job: The url to track the job: http://master:8088/proxy/application_1501872322130_0007/ 12 17/08/05 04:05:50 INFO mapreduce.Job: Running job: job_1501872322130_0007 13 17/08/05 04:05:56 INFO mapreduce.Job: Job job_1501872322130_0007 running in uber mode : false 14 17/08/05 04:05:56 INFO mapreduce.Job: map 0% reduce 0% 15 17/08/05 04:06:00 INFO mapreduce.Job: map 100% reduce 0% 16 17/08/05 04:06:05 INFO mapreduce.Job: map 100% reduce 100% 17 17/08/05 04:06:06 INFO mapreduce.Job: Job job_1501872322130_0007 completed successfully 18 17/08/05 04:06:07 INFO mapreduce.Job: Counters: 49 19 File System Counters 20 FILE: Number of bytes read=106 21 FILE: Number of bytes written=230897 22 FILE: Number of read operations=0 23 FILE: Number of large read operations=0 24 FILE: Number of write operations=0 25 HDFS: Number of bytes read=159 26 HDFS: Number of bytes written=60 27 HDFS: Number of read operations=6 28 HDFS: Number of large read operations=0 29 HDFS: Number of write operations=2 30 Job Counters 31 Launched map tasks=1 32 Launched reduce tasks=1 33 Data-local map tasks=1 34 Total time spent by all maps in occupied slots (ms)=2534 35 Total time spent by all reduces in occupied slots (ms)=2799 36 Total time spent by all map tasks (ms)=2534 37 Total time spent by all reduce tasks (ms)=2799 38 Total vcore-seconds taken by all map tasks=2534 39 Total vcore-seconds taken by all reduce tasks=2799 40 Total megabyte-seconds taken by all map tasks=2594816 41 Total megabyte-seconds taken by all reduce tasks=2866176 42 Map-Reduce Framework 43 Map input records=10 44 Map output records=10 45 Map output bytes=80 46 Map output materialized bytes=106 47 Input split bytes=99 48 Combine input records=0 49 Combine output records=0 50 Reduce input groups=10 51 Reduce shuffle bytes=106 52 Reduce input records=10 53 Reduce output records=10 54 Spilled Records=20 55 Shuffled Maps =1 56 Failed Shuffles=0 57 Merged Map outputs=1 58 GC time elapsed (ms)=55 59 CPU time spent (ms)=1490 60 Physical memory (bytes) snapshot=419209216 61 Virtual memory (bytes) snapshot=1642618880 62 Total committed heap usage (bytes)=402653184 63 Shuffle Errors 64 BAD_ID=0 65 CONNECTION=0 66 IO_ERROR=0 67 WRONG_LENGTH=0 68 WRONG_MAP=0 69 WRONG_REDUCE=0 70 File Input Format Counters 71 Bytes Read=60 72 File Output Format Counters 73 Bytes Written=60
生成文件顯示二次排序結果:
四 計數器
1‘ MapReduce計數器是什么?
計數器是用來記錄job的執行進度和狀態的。它的作用可以理解為日志。我們可以在程序的某個位置插入計數器,記錄數據或者進度的變化情況。
MapReduce計數器能做什么?
MapReduce 計數器(Counter)為我們提供一個窗口,用於觀察 MapReduce Job 運行期的各種細節數據。對MapReduce性能調優很有幫助,MapReduce性能優化的評估大部分都是基於這些 Counter 的數值表現出來的。
在許多情況下,一個用戶需要了解待分析的數據,盡管這並非所要執行的分析任務 的核心內容。以統計數據集中無效記錄數目的任務為例,如果發現無效記錄的比例 相當高,那么就需要認真思考為何存在如此多無效記錄。是所采用的檢測程序存在 缺陷,還是數據集質量確實很低,包含大量無效記錄?如果確定是數據集的質量問 題,則可能需要擴大數據集的規模,以增大有效記錄的比例,從而進行有意義的分析。
計數器是一種收集作業統計信息的有效手段,用於質量控制或應用級統計。計數器 還可輔助診斷系統故障。如果需要將日志信息傳輸到map或reduce任務,更好的 方法通常是嘗試傳輸計數器值以監測某一特定事件是否發生。對於大型分布式作業 而言,使用計數器更為方便。首先,獲取計數器值比輸出日志更方便,其次,根據 計數器值統計特定事件的發生次數要比分析一堆日志文件容易得多。
內置計數器
MapReduce 自帶了許多默認Counter,現在我們來分析這些默認 Counter 的含義,方便大家觀察 Job 結果,如輸入的字節數、輸出的字節數、Map端輸入/輸出的字節數和條數、Reduce端的輸入/輸出的字節數和條數等。下面我們只需了解這些內置計數器,知道計數器組名稱(groupName)和計數器名稱(counterName),以后使用計數器會查找groupName和counterName即可。
自定義計數器
MapReduce允許用戶編寫程序來定義計數器,計數器的值可在mapper或reducer 中增加。多個計數器由一個Java枚舉(enum)類型來定義,以便對計數器分組。一個作業可以定義的枚舉類型數量不限,各個枚舉類型所包含的字段數量也不限。枚 舉類型的名稱即為組的名稱,枚舉類型的字段就是計數器名稱。計數器是全局的。換言之,MapReduce框架將跨所有map和reduce聚集這些計數器,並在作業結束 時產生一個最終結果。
2’ >編輯計數文件counters.txt
>上傳該文件到HDFS
3' 編寫程序Counters.java
1 package mr ; 2 import java.io.IOException; 3 import org.apache.hadoop.conf.Configuration; 4 import org.apache.hadoop.fs.Path; 5 import org.apache.hadoop.io.LongWritable; 6 import org.apache.hadoop.io.Text; 7 import org.apache.hadoop.mapreduce.Counter; 8 import org.apache.hadoop.mapreduce.Job; 9 import org.apache.hadoop.mapreduce.Mapper; 10 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; 11 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; 12 import org.apache.hadoop.util.GenericOptionsParser; 13 14 public class Counters { 15 public static class MyCounterMap extends Mapper<LongWritable, Text, Text, Text> { 16 public static Counter ct = null; 17 protected void map(LongWritable key, Text value, 18 org.apache.hadoop.mapreduce.Mapper<LongWritable, Text, Text, Text>.Context context) 19 throws java.io.IOException, InterruptedException { 20 String arr_value[] = value.toString().split("\t"); 21 if (arr_value.length > 3) { 22 ct = context.getCounter("ErrorCounter", "toolong"); // ErrorCounter為組名,toolong為組員名 23 ct.increment(1); // 計數器加一 24 } else if (arr_value.length < 3) { 25 ct = context.getCounter("ErrorCounter", "tooshort"); 26 ct.increment(1); 27 } 28 } 29 } 30 public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException { 31 Configuration conf = new Configuration(); 32 String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs(); 33 if (otherArgs.length != 2) { 34 System.err.println("Usage: Counters <in> <out>"); 35 System.exit(2); 36 } 37 Job job = new Job(conf, "Counter"); 38 job.setJarByClass(Counters.class); 39 40 job.setMapperClass(MyCounterMap.class); 41 42 FileInputFormat.addInputPath(job, new Path(otherArgs[0])); 43 FileOutputFormat.setOutputPath(job, new Path(otherArgs[1])); 44 System.exit(job.waitForCompletion(true) ? 0 : 1); 45 } 46 }
4' 打包並提交
使用Eclipse開發工具將該代碼打包,選擇主類為mr.Counters。假定打包后的文件名為hdpAction7.jar,主類Counters位於包mr下,則可使用如下命令向Hadoop集群提交本應用。
bin/hadoop jar hdpAction7.jar mr.Counters /counters.txt /usr/counters/out
其中“hadoop”為命令,“jar”為命令參數,后面緊跟打包。 “/usr/counts/in/counts.txt”為輸入文件在HDFS中的位置(如果沒有,自行上傳),“/usr/counts/out”為輸出文件在HDFS中的位置。
顯示結果:
1 [root@master hadoop]# bin/hadoop jar hdpAction7.jar /counters.txt /usr/counters/out 2 17/08/05 04:22:58 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable 3 17/08/05 04:22:59 INFO client.RMProxy: Connecting to ResourceManager at master/10.1.21.27:8032 4 17/08/05 04:23:00 INFO input.FileInputFormat: Total input paths to process : 1 5 17/08/05 04:23:00 INFO mapreduce.JobSubmitter: number of splits:1 6 17/08/05 04:23:00 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1501872322130_0008 7 17/08/05 04:23:00 INFO impl.YarnClientImpl: Submitted application application_1501872322130_0008 8 17/08/05 04:23:00 INFO mapreduce.Job: The url to track the job: http://master:8088/proxy/application_1501872322130_0008/ 9 17/08/05 04:23:00 INFO mapreduce.Job: Running job: job_1501872322130_0008 10 17/08/05 04:23:05 INFO mapreduce.Job: Job job_1501872322130_0008 running in uber mode : false 11 17/08/05 04:23:05 INFO mapreduce.Job: map 0% reduce 0% 12 17/08/05 04:23:10 INFO mapreduce.Job: map 100% reduce 0% 13 17/08/05 04:23:15 INFO mapreduce.Job: map 100% reduce 100% 14 17/08/05 04:23:16 INFO mapreduce.Job: Job job_1501872322130_0008 completed successfully 15 17/08/05 04:23:16 INFO mapreduce.Job: Counters: 51 16 File System Counters 17 FILE: Number of bytes read=6 18 FILE: Number of bytes written=229309 19 FILE: Number of read operations=0 20 FILE: Number of large read operations=0 21 FILE: Number of write operations=0 22 HDFS: Number of bytes read=134 23 HDFS: Number of bytes written=0 24 HDFS: Number of read operations=6 25 HDFS: Number of large read operations=0 26 HDFS: Number of write operations=2 27 Job Counters 28 Launched map tasks=1 29 Launched reduce tasks=1 30 Data-local map tasks=1 31 Total time spent by all maps in occupied slots (ms)=2400 32 Total time spent by all reduces in occupied slots (ms)=2472 33 Total time spent by all map tasks (ms)=2400 34 Total time spent by all reduce tasks (ms)=2472 35 Total vcore-seconds taken by all map tasks=2400 36 Total vcore-seconds taken by all reduce tasks=2472 37 Total megabyte-seconds taken by all map tasks=2457600 38 Total megabyte-seconds taken by all reduce tasks=2531328 39 Map-Reduce Framework 40 Map input records=4 41 Map output records=0 42 Map output bytes=0 43 Map output materialized bytes=6 44 Input split bytes=96 45 Combine input records=0 46 Combine output records=0 47 Reduce input groups=0 48 Reduce shuffle bytes=6 49 Reduce input records=0 50 Reduce output records=0 51 Spilled Records=0 52 Shuffled Maps =1 53 Failed Shuffles=0 54 Merged Map outputs=1 55 GC time elapsed (ms)=143 56 CPU time spent (ms)=1680 57 Physical memory (bytes) snapshot=413036544 58 Virtual memory (bytes) snapshot=1630470144 59 Total committed heap usage (bytes)=402653184 60 ErrorCounter 61 toolong=1 62 tooshort=1 63 Shuffle Errors 64 BAD_ID=0 65 CONNECTION=0 66 IO_ERROR=0 67 WRONG_LENGTH=0 68 WRONG_MAP=0 69 WRONG_REDUCE=0 70 File Input Format Counters 71 Bytes Read=38 72 File Output Format Counters 73 Bytes Written=0 74 [root@master hadoop]#
五 join操作
1' 概述
對於RDBMS中的Join操作大伙一定非常熟悉,寫SQL的時候要十分注意細節,稍有差池就會耗時巨久造成很大的性能瓶頸,而在Hadoop中使用MapReduce框架進行Join的操作時同樣耗時,但是由於Hadoop的分布式設計理念的特殊性,因此對於這種Join操作同樣也具備了一定的特殊性。
原理
使用MapReduce實現Join操作有多種實現方式:
>在Reduce端連接為最為常見的模式:
Map端的主要工作:為來自不同表(文件)的key/value對打標簽以區別不同來源的記錄。然后用連接字段作為key,其余部分和新加的標志作為value,最后進行輸出。
Reduce端的主要工作:在Reduce端以連接字段作為key的分組已經完成,我們只需要在每一個分組當中將那些來源於不同文件的記錄(在map階段已經打標志)分開,最后進行笛卡爾只就OK了。
>在Map端進行連接
使用場景:一張表十分小、一張表很大。
用法:在提交作業的時候先將小表文件放到該作業的DistributedCache中,然后從DistributeCache中取出該小表進行Join key / value解釋分割放到內存中(可以放大Hash Map等等容器中)。然后掃描大表,看大表中的每條記錄的Join key /value值是否能夠在內存中找到相同Join key的記錄,如果有則直接輸出結果。
>SemiJoin
SemiJoin就是所謂的半連接,其實仔細一看就是Reduce Join的一個變種,就是在map端過濾掉一些數據,在網絡中只傳輸參與連接的數據不參與連接的數據不必在網絡中進行傳輸,從而減少了shuffle的網絡傳輸量,使整體效率得到提高,其他思想和Reduce Join是一模一樣的。說得更加接地氣一點就是將小表中參與Join的key單獨抽出來通過DistributedCach分發到相關節點,然后將其取出放到內存中(可以放到HashSet中),在map階段掃描連接表,將Join key不在內存HashSet中的記錄過濾掉,讓那些參與Join的記錄通過shuffle傳輸到Reduce端進行Join操作,其他的和Reduce Join都是一樣的
2' >創建兩個表文件data.txt 和 info.txt
>上傳到HDFS
3‘ 編寫程序MRJoin.java
程序分析執行過程如下:
在map階段,把所有記錄標記成<key, value>的形式,其中key是1003/1004/1005/1006的字段值,value則根據來源不同取不同的形式:來源於表A的記錄,value的值為“201001 abc”等值;來源於表B的記錄,value的值為”kaka“之類的值。
在Reduce階段,先把每個key下的value列表拆分為分別來自表A和表B的兩部分,分別放入兩個向量中。然后遍歷兩個向量做笛卡爾積,形成一條條最終結果。
代碼如下:
1 package mr; 2 3 import java.io.DataInput; 4 import java.io.DataOutput; 5 import java.io.IOException; 6 7 import org.apache.hadoop.conf.Configuration; 8 import org.apache.hadoop.fs.Path; 9 import org.apache.hadoop.io.LongWritable; 10 import org.apache.hadoop.io.Text; 11 import org.apache.hadoop.io.WritableComparable; 12 import org.apache.hadoop.io.WritableComparator; 13 import org.apache.hadoop.mapreduce.Job; 14 import org.apache.hadoop.mapreduce.Mapper; 15 import org.apache.hadoop.mapreduce.Partitioner; 16 import org.apache.hadoop.mapreduce.Reducer; 17 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; 18 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; 19 import org.apache.hadoop.mapreduce.lib.input.FileSplit; 20 import org.apache.hadoop.util.GenericOptionsParser; 21 22 public class MRJoin { 23 public static class MR_Join_Mapper extends Mapper<LongWritable, Text, TextPair, Text> { 24 @Override 25 protected void map(LongWritable key, Text value, Context context) 26 throws IOException, InterruptedException { 27 // 獲取輸入文件的全路徑和名稱 28 String pathName = ((FileSplit) context.getInputSplit()).getPath().toString(); 29 if (pathName.contains("data.txt")) { 30 String values[] = value.toString().split("\t"); 31 if (values.length < 3) { 32 // data數據格式不規范,字段小於3,拋棄數據 33 return; 34 } else { 35 // 數據格式規范,區分標識為1 36 TextPair tp = new TextPair(new Text(values[1]), new Text("1")); 37 context.write(tp, new Text(values[0] + "\t" + values[2])); 38 } 39 } 40 if (pathName.contains("info.txt")) { 41 String values[] = value.toString().split("\t"); 42 if (values.length < 2) { 43 // data數據格式不規范,字段小於2,拋棄數據 44 return; 45 } else { 46 // 數據格式規范,區分標識為0 47 TextPair tp = new TextPair(new Text(values[0]), new Text("0")); 48 context.write(tp, new Text(values[1])); 49 } 50 } 51 } 52 } 53 54 public static class MR_Join_Partitioner extends Partitioner<TextPair, Text> { 55 @Override 56 public int getPartition(TextPair key, Text value, int numParititon) { 57 return Math.abs(key.getFirst().hashCode() * 127) % numParititon; 58 } 59 } 60 61 public static class MR_Join_Comparator extends WritableComparator { 62 public MR_Join_Comparator() { 63 super(TextPair.class, true); 64 } 65 66 public int compare(WritableComparable a, WritableComparable b) { 67 TextPair t1 = (TextPair) a; 68 TextPair t2 = (TextPair) b; 69 return t1.getFirst().compareTo(t2.getFirst()); 70 } 71 } 72 73 public static class MR_Join_Reduce extends Reducer<TextPair, Text, Text, Text> { 74 protected void Reduce(TextPair key, Iterable<Text> values, Context context) 75 throws IOException, InterruptedException { 76 Text pid = key.getFirst(); 77 String desc = values.iterator().next().toString(); 78 while (values.iterator().hasNext()) { 79 context.write(pid, new Text(values.iterator().next().toString() + "\t" + desc)); 80 } 81 } 82 } 83 84 85 public static void main(String agrs[]) 86 throws IOException, InterruptedException, ClassNotFoundException { 87 Configuration conf = new Configuration(); 88 GenericOptionsParser parser = new GenericOptionsParser(conf, agrs); 89 String[] otherArgs = parser.getRemainingArgs(); 90 if (agrs.length < 3) { 91 System.err.println("Usage: MRJoin <in_path_one> <in_path_two> <output>"); 92 System.exit(2); 93 } 94 95 Job job = new Job(conf, "MRJoin"); 96 // 設置運行的job 97 job.setJarByClass(MRJoin.class); 98 // 設置Map相關內容 99 job.setMapperClass(MR_Join_Mapper.class); 100 // 設置Map的輸出 101 job.setMapOutputKeyClass(TextPair.class); 102 job.setMapOutputValueClass(Text.class); 103 // 設置partition 104 job.setPartitionerClass(MR_Join_Partitioner.class); 105 // 在分區之后按照指定的條件分組 106 job.setGroupingComparatorClass(MR_Join_Comparator.class); 107 // 設置Reduce 108 job.setReducerClass(MR_Join_Reduce.class); 109 // 設置Reduce的輸出 110 job.setOutputKeyClass(Text.class); 111 job.setOutputValueClass(Text.class); 112 // 設置輸入和輸出的目錄 113 FileInputFormat.addInputPath(job, new Path(otherArgs[0])); 114 FileInputFormat.addInputPath(job, new Path(otherArgs[1])); 115 FileOutputFormat.setOutputPath(job, new Path(otherArgs[2])); 116 // 執行,直到結束就退出 117 System.exit(job.waitForCompletion(true) ? 0 : 1); 118 } 119 } 120 121 class TextPair implements WritableComparable<TextPair> { 122 private Text first; 123 private Text second; 124 125 public TextPair() { 126 set(new Text(), new Text()); 127 } 128 129 public TextPair(String first, String second) { 130 set(new Text(first), new Text(second)); 131 } 132 133 public TextPair(Text first, Text second) { 134 set(first, second); 135 } 136 137 public void set(Text first, Text second) { 138 this.first = first; 139 this.second = second; 140 } 141 142 public Text getFirst() { 143 return first; 144 } 145 146 public Text getSecond() { 147 return second; 148 } 149 150 public void write(DataOutput out) throws IOException { 151 first.write(out); 152 second.write(out); 153 } 154 155 public void readFields(DataInput in) throws IOException { 156 first.readFields(in); 157 second.readFields(in); 158 } 159 160 public int compareTo(TextPair tp) { 161 int cmp = first.compareTo(tp.first); 162 if (cmp != 0) { 163 return cmp; 164 } 165 return second.compareTo(tp.second); 166 } 167 }
4’ 打包並提交
使用Eclipse開發工具將該代碼打包,假定打包后的文件名為hdpAction8.jar,主類MRJoin位於包mr下,則可使用如下命令向Hadoop集群提交本應用。
bin/hadoop jar hdpAction8.jar mr.MRJoin /data.txt /info.txt /usr/MRJoin/out
其中“hadoop”為命令,“jar”為命令參數,后面緊跟打包。 “/data.txt”和 “/info.txt”為輸入文件在HDFS中的位置,“/usr/MRJoin/out”為輸出文件在HDFS中的位置。
執行結果如下:
1 [root@master hadoop]# bin/hadoop jar hdpAction8.jar /data.txt /info.txt /usr/MRJoin/out 2 17/08/05 04:38:11 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable 3 17/08/05 04:38:12 INFO client.RMProxy: Connecting to ResourceManager at master/10.1.21.27:8032 4 17/08/05 04:38:13 INFO input.FileInputFormat: Total input paths to process : 2 5 17/08/05 04:38:13 INFO mapreduce.JobSubmitter: number of splits:2 6 17/08/05 04:38:13 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1501872322130_0009 7 17/08/05 04:38:13 INFO impl.YarnClientImpl: Submitted application application_1501872322130_0009 8 17/08/05 04:38:13 INFO mapreduce.Job: The url to track the job: http://master:8088/proxy/application_1501872322130_0009/ 9 17/08/05 04:38:13 INFO mapreduce.Job: Running job: job_1501872322130_0009 10 17/08/05 04:38:18 INFO mapreduce.Job: Job job_1501872322130_0009 running in uber mode : false 11 17/08/05 04:38:18 INFO mapreduce.Job: map 0% reduce 0% 12 17/08/05 04:38:23 INFO mapreduce.Job: map 100% reduce 0% 13 17/08/05 04:38:28 INFO mapreduce.Job: map 100% reduce 100% 14 17/08/05 04:38:28 INFO mapreduce.Job: Job job_1501872322130_0009 completed successfully 15 17/08/05 04:38:29 INFO mapreduce.Job: Counters: 49 16 File System Counters 17 FILE: Number of bytes read=179 18 FILE: Number of bytes written=347823 19 FILE: Number of read operations=0 20 FILE: Number of large read operations=0 21 FILE: Number of write operations=0 22 HDFS: Number of bytes read=317 23 HDFS: Number of bytes written=283 24 HDFS: Number of read operations=9 25 HDFS: Number of large read operations=0 26 HDFS: Number of write operations=2 27 Job Counters 28 Launched map tasks=2 29 Launched reduce tasks=1 30 Data-local map tasks=2 31 Total time spent by all maps in occupied slots (ms)=5122 32 Total time spent by all reduces in occupied slots (ms)=2685 33 Total time spent by all map tasks (ms)=5122 34 Total time spent by all reduce tasks (ms)=2685 35 Total vcore-seconds taken by all map tasks=5122 36 Total vcore-seconds taken by all reduce tasks=2685 37 Total megabyte-seconds taken by all map tasks=5244928 38 Total megabyte-seconds taken by all reduce tasks=2749440 39 Map-Reduce Framework 40 Map input records=10 41 Map output records=10 42 Map output bytes=153 43 Map output materialized bytes=185 44 Input split bytes=184 45 Combine input records=0 46 Combine output records=0 47 Reduce input groups=4 48 Reduce shuffle bytes=185 49 Reduce input records=10 50 Reduce output records=10 51 Spilled Records=20 52 Shuffled Maps =2 53 Failed Shuffles=0 54 Merged Map outputs=2 55 GC time elapsed (ms)=122 56 CPU time spent (ms)=2790 57 Physical memory (bytes) snapshot=680472576 58 Virtual memory (bytes) snapshot=2443010048 59 Total committed heap usage (bytes)=603979776 60 Shuffle Errors 61 BAD_ID=0 62 CONNECTION=0 63 IO_ERROR=0 64 WRONG_LENGTH=0 65 WRONG_MAP=0 66 WRONG_REDUCE=0 67 File Input Format Counters 68 Bytes Read=133 69 File Output Format Counters 70 Bytes Written=283 71 [root@master hadoop]#
> 生成join后的文件在/usr/MRJoin/out目錄下:
六 分布式緩存
1‘ 假定現有一個大為100G的大表big.txt和一個大小為1M的小表small.txt,請基於MapReduce思想編程實現判斷小表中單詞在大表中出現次數。也即所謂的“掃描大表、加載小表”。
為解決上述問題,可開啟10個Map。這樣,每個Map只需處理總量的1/10,將大大加快處理。而在單獨Map內,直接用HashSet加載“1M小表”,對於存在硬盤(Map處理時會將HDFS文件拷貝至本地)的10G大文件,則逐條掃描,這就是所謂的“掃描大表、加載小表”,也即分布式緩存。
2’ >新建兩個txt文件
>上傳到HDFS
首先登錄client機,查看HDFS里是否已存在目錄“/user/root/mr/in”,若不存在,使用下述命令新建該目錄。
/usr/cstor/hadoop/bin/hdfs dfs -mkdir -p /user/root/mr/in
接着,使用下述命令將client機本地文件“/root/data/9/big.txt”和“/root/data/9/ small.txt”上傳至HDFS的“/user/root/mr/in”目錄:
/usr/cstor/hadoop/bin/hdfs dfs -put /root/data/9/big.txt /user/root/mr/in
/usr/cstor/hadoop/bin/hdfs dfs -put /root/data/9/small.txt /user/root/mr/in
3‘ 編寫代碼,新建BigAndSmallTable類並指定包名(代碼中為cn.cstor.mr),在BigAndSmallTable.java文件中
依次寫入如下代碼:
1 package cn.cstor.mr; 2 3 import java.io.IOException; 4 import java.util.HashSet; 5 6 import org.apache.hadoop.conf.Configuration; 7 import org.apache.hadoop.fs.FSDataInputStream; 8 import org.apache.hadoop.fs.FileSystem; 9 import org.apache.hadoop.fs.Path; 10 import org.apache.hadoop.io.IntWritable; 11 import org.apache.hadoop.io.Text; 12 import org.apache.hadoop.mapreduce.Job; 13 import org.apache.hadoop.mapreduce.Mapper; 14 import org.apache.hadoop.mapreduce.Reducer; 15 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; 16 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; 17 import org.apache.hadoop.util.LineReader; 18 19 public class BigAndSmallTable { 20 public static class TokenizerMapper extends 21 Mapper<Object, Text, Text, IntWritable> { 22 private final static IntWritable one = new IntWritable(1); 23 private static HashSet<String> smallTable = null; 24 25 protected void setup(Context context) throws IOException, 26 InterruptedException { 27 smallTable = new HashSet<String>(); 28 Path smallTablePath = new Path(context.getConfiguration().get( 29 "smallTableLocation")); 30 FileSystem hdfs = smallTablePath.getFileSystem(context 31 .getConfiguration()); 32 FSDataInputStream hdfsReader = hdfs.open(smallTablePath); 33 Text line = new Text(); 34 LineReader lineReader = new LineReader(hdfsReader); 35 while (lineReader.readLine(line) > 0) { 36 // you can do something here 37 String[] values = line.toString().split(" "); 38 for (int i = 0; i < values.length; i++) { 39 smallTable.add(values[i]); 40 System.out.println(values[i]); 41 } 42 } 43 lineReader.close(); 44 hdfsReader.close(); 45 System.out.println("setup ok *^_^* "); 46 } 47 48 public void map(Object key, Text value, Context context) 49 throws IOException, InterruptedException { 50 String[] values = value.toString().split(" "); 51 for (int i = 0; i < values.length; i++) { 52 if (smallTable.contains(values[i])) { 53 context.write(new Text(values[i]), one); 54 } 55 } 56 } 57 } 58 59 public static class IntSumReducer extends 60 Reducer<Text, IntWritable, Text, IntWritable> { 61 private IntWritable result = new IntWritable(); 62 63 public void reduce(Text key, Iterable<IntWritable> values, 64 Context context) throws IOException, InterruptedException { 65 int sum = 0; 66 for (IntWritable val : values) { 67 sum += val.get(); 68 } 69 result.set(sum); 70 context.write(key, result); 71 } 72 } 73 74 public static void main(String[] args) throws Exception { 75 Configuration conf = new Configuration(); 76 conf.set("smallTableLocation", args[1]); 77 Job job = Job.getInstance(conf, "BigAndSmallTable"); 78 job.setJarByClass(BigAndSmallTable.class); 79 job.setMapperClass(TokenizerMapper.class); 80 job.setReducerClass(IntSumReducer.class); 81 job.setMapOutputKeyClass(Text.class); 82 job.setMapOutputValueClass(IntWritable.class); 83 job.setOutputKeyClass(Text.class); 84 job.setOutputValueClass(IntWritable.class); 85 FileInputFormat.addInputPath(job, new Path(args[0])); 86 FileOutputFormat.setOutputPath(job, new Path(args[2])); 87 System.exit(job.waitForCompletion(true) ? 0 : 1); 88 } 89 }
4’ 打包上傳並執行
首先,使用“Xmanager Enterprise 5”將“C:\Users\allen\ Desktop\BigSmallTable.jar”上傳至client機。此處上傳至“/root/BigSmallTable.jar”
接着,登錄client機上,使用下述命令提交BigSmallTable.jar任務。
/usr/cstor/hadoop/bin/hadoop jar /root/BigSmallTable.jar cn.cstor.mr.BigAndSmallTable /user/root/mr/in/big.txt /user/root/mr/in/small.txt /user/root/mr/bigAndSmallResult

1 [root@client ~]# /usr/cstor/hadoop/bin/hadoop jar /root/BigSmallTable.jar /user/root/mr/in/big.txt /user/root/mr/in/small.txt /user/root/mr/bigAndSmallResult 2 17/08/05 04:55:51 INFO client.RMProxy: Connecting to ResourceManager at master/10.1.21.27:8032 3 17/08/05 04:55:52 WARN mapreduce.JobResourceUploader: Hadoop command-line option parsing not performed. Implement the Tool interface and execute your application with ToolRunner to remedy this. 4 17/08/05 04:55:52 INFO input.FileInputFormat: Total input paths to process : 1 5 17/08/05 04:55:52 INFO mapreduce.JobSubmitter: number of splits:1 6 17/08/05 04:55:52 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1501872322130_0010 7 17/08/05 04:55:53 INFO impl.YarnClientImpl: Submitted application application_1501872322130_0010 8 17/08/05 04:55:53 INFO mapreduce.Job: The url to track the job: http://master:8088/proxy/application_1501872322130_0010/ 9 17/08/05 04:55:53 INFO mapreduce.Job: Running job: job_1501872322130_0010 10 17/08/05 04:55:58 INFO mapreduce.Job: Job job_1501872322130_0010 running in uber mode : false 11 17/08/05 04:55:58 INFO mapreduce.Job: map 0% reduce 0% 12 17/08/05 04:56:03 INFO mapreduce.Job: map 100% reduce 0% 13 17/08/05 04:56:08 INFO mapreduce.Job: map 100% reduce 100% 14 17/08/05 04:56:09 INFO mapreduce.Job: Job job_1501872322130_0010 completed successfully 15 17/08/05 04:56:09 INFO mapreduce.Job: Counters: 49 16 File System Counters 17 FILE: Number of bytes read=36 18 FILE: Number of bytes written=231153 19 FILE: Number of read operations=0 20 FILE: Number of large read operations=0 21 FILE: Number of write operations=0 22 HDFS: Number of bytes read=265 23 HDFS: Number of bytes written=18 24 HDFS: Number of read operations=7 25 HDFS: Number of large read operations=0 26 HDFS: Number of write operations=2 27 Job Counters 28 Launched map tasks=1 29 Launched reduce tasks=1 30 Data-local map tasks=1 31 Total time spent by all maps in occupied slots (ms)=2597 32 Total time spent by all reduces in occupied slots (ms)=2755 33 Total time spent by all map tasks (ms)=2597 34 Total time spent by all reduce tasks (ms)=2755 35 Total vcore-seconds taken by all map tasks=2597 36 Total vcore-seconds taken by all reduce tasks=2755 37 Total megabyte-seconds taken by all map tasks=2659328 38 Total megabyte-seconds taken by all reduce tasks=2821120 39 Map-Reduce Framework 40 Map input records=5 41 Map output records=3 42 Map output bytes=24 43 Map output materialized bytes=36 44 Input split bytes=107 45 Combine input records=0 46 Combine output records=0 47 Reduce input groups=3 48 Reduce shuffle bytes=36 49 Reduce input records=3 50 Reduce output records=3 51 Spilled Records=6 52 Shuffled Maps =1 53 Failed Shuffles=0 54 Merged Map outputs=1 55 GC time elapsed (ms)=57 56 CPU time spent (ms)=1480 57 Physical memory (bytes) snapshot=425840640 58 Virtual memory (bytes) snapshot=1651806208 59 Total committed heap usage (bytes)=402653184 60 Shuffle Errors 61 BAD_ID=0 62 CONNECTION=0 63 IO_ERROR=0 64 WRONG_LENGTH=0 65 WRONG_MAP=0 66 WRONG_REDUCE=0 67 File Input Format Counters 68 Bytes Read=147 69 File Output Format Counters 70 Bytes Written=18 71 [root@client ~]#
>查看結果
程序執行后,可使用下述命令查看執行結果,注意若再次執行,請更改結果目錄:
/usr/cstor/hadoop/bin/hdfs dfs -cat /user/root/mr/bigAndSmallResult/part-r-00000
總結:
從五個實驗做出來之后,我們可以系統化的了解mapreduce的運行流程:
首先目標文件上傳到HDFS;
其次編寫目標程序代碼;
然后將其打包上傳到集群服務器上;
再然后執行該jar包;
生成part-r-00000結果文件。
關於hadoop的命令使用也更加熟練,對於一些文件上傳、查看、編輯的處理也可以掌握於心了。學習到這里,對於大數據也算可以入門了,對於大數據也有了一定的了解與基本操作。
路漫漫其修遠兮,吾將上下而求索。日積月累,堅持不懈是學習成果的前提。量變造成質變,望看到此處的朋友們共同努力,互相交流學習,我們都是愛學習的人!