Big Data (4): MapReduce (word count; secondary sort; counters; join; distributed cache)


    Preface:

    Building on the previous posts, we are now ready to learn MapReduce. This post first explains what MapReduce is and how it works, and then learns by doing through five experiments: word count, secondary sort, counters, join, and distributed cache.

 

1. Overview

Definition

MapReduce is a computational model. Put simply, a large batch of work (data) is decomposed (Map) and executed in pieces, and the intermediate results are then merged into a final result (Reduce). The benefit is that once the work has been decomposed, it can be computed in parallel on a large number of machines, which greatly reduces the overall processing time.

Applicable scope: the data volume is large, but the set of distinct categories is small enough to fit in memory.

Basic principle and key points: hand the data to different machines for processing; partition the data, then reduce the results.

Understanding MapReduce and YARN: in newer versions of Hadoop, YARN is the resource management and scheduling framework, and it is the environment in which MapReduce programs run on Hadoop. MapReduce can actually run not only on YARN but also on schedulers such as Mesos or Corona; using a different scheduler requires a corresponding adaptation of Hadoop. (For YARN, see the previous post >>  http://www.cnblogs.com/1996swg/p/7286490.html  )

MapReduce Programming

To write a MapReduce program that runs on Hadoop under YARN, you do not need to develop MRAppMaster or YARNRunner yourself; Hadoop already ships generic YARNRunner and MRAppMaster implementations. In most cases you only need to write the business logic of the Map and Reduce phases.

Writing a MapReduce program is not complicated; the key is to grasp the distributed way of thinking. The computation is divided into the following five steps:

(1) Iterate: traverse the input data and parse it into key/value pairs.

(2) Map the input key/value pairs into another set of key/value pairs.

(3) Group the intermediate data by key.

(4) Reduce the data group by group.

(5) Iterate: write the final key/value pairs to the output file.

 Java API Overview

(1) InputFormat: describes the format of the input data. The commonly used TextInputFormat provides two functions:

Data splitting: split the input data into splits according to some strategy, which determines the number of Map Tasks and the split each one processes.

Feeding the Mapper: given a split, parse it into individual key/value pairs.

(2) OutputFormat: describes the format of the output data; it writes the key/value pairs produced by the user into files of a specific format.

(3) Mapper/Reducer: encapsulate the application's data-processing logic.

(4) Writable: Hadoop's own serialization interface. Classes that implement it can be used as values in a MapReduce job.

(5) WritableComparable: extends Writable with the Comparable interface. Classes that implement it can be used as keys in a MapReduce job (keys must support comparison and sorting).
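
To make the difference between (4) and (5) concrete, here is a minimal sketch of a value-only type (a hypothetical SumAndCount class, not used in the experiments below). It implements only Writable, which is enough for a value; a key type would additionally implement WritableComparable, exactly as the IntPair class in the secondary-sort experiment does.

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.Writable;

// A custom value type: values are never sorted by the framework, so Writable is sufficient.
public class SumAndCount implements Writable {
    private long sum;     // running total
    private long count;   // number of contributing records

    public SumAndCount() { }                  // no-arg constructor required for reflection

    public SumAndCount(long sum, long count) {
        this.sum = sum;
        this.count = count;
    }

    public void write(DataOutput out) throws IOException {    // serialize the fields in a fixed order
        out.writeLong(sum);
        out.writeLong(count);
    }

    public void readFields(DataInput in) throws IOException { // deserialize in the same order
        sum = in.readLong();
        count = in.readLong();
    }

    public long getSum()   { return sum; }
    public long getCount() { return count; }
}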

2. Word Count Experiment

  ! Word-count input file: word

    

  Step 1.  Start Hadoop. Run the following commands to start the Hadoop system deployed in the earlier posts.

        Commands:

              cd /usr/cstor/hadoop/

               sbin/start-all.sh
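
        As an extra check (not part of the original steps), you can confirm the daemons are up with jps:

               jps        # lists the running Hadoop daemons (NameNode, DataNode, ResourceManager, NodeManager, ...) on the current node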

        

  Step 2.  Verify that HDFS has no wordcount directory. At this point there should be no wordcount directory on HDFS.

                 cd /usr/cstor/hadoop/

                 bin/hadoop fs -ls /                    # list the files under the HDFS root directory /

        

  Step 3.  Upload the data file to HDFS

               cd /usr/cstor/hadoop/

               bin/hadoop fs -put /root/data/5/word  /

        

  Step 4.  Write the MapReduce program

        Create a new MapReduce project in Eclipse (see this post for the setup >> http://www.cnblogs.com/1996swg/p/7286136.html ) and add a new class named WordCount.

        The main work is writing the Map and Reduce classes: the Map class extends the Mapper class from the org.apache.hadoop.mapreduce package and overrides its map method; the Reduce class extends the Reducer class from the same package and overrides its reduce method.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.mapreduce.lib.partition.HashPartitioner;

import java.io.IOException;
import java.util.StringTokenizer;


public class WordCount {
    public static class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable> {
        private final static IntWritable one = new IntWritable(1);
        private Text word = new Text();
        // map method: tokenize one line of text and emit <word, 1> for each word read
        public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
            StringTokenizer itr = new StringTokenizer(value.toString());
            while (itr.hasMoreTokens()) {
                word.set(itr.nextToken());
                context.write(word, one);                  // emit <word, 1>
            }
        }
    }

    // Reducer: for each word, add up all the values in its <K, VList> list
    public static class IntSumReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
        private IntWritable result = new IntWritable();
        public void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            int sum = 0;
            for (IntWritable val : values) {
                sum += val.get();                          // e.g. <Hello,1>, <Hello,1>: add the two 1s
            }
            result.set(sum);
            context.write(key, result);                    // emit <word, number of occurrences>
        }
    }

    public static void main(String[] args) throws Exception {             // entry point
        Configuration conf = new Configuration();                          // configuration object
        Job job = new Job(conf, "WordCount");                              // job object
        job.setInputFormatClass(TextInputFormat.class);                    // use the default input format
        TextInputFormat.setInputPaths(job, args[0]);                       // location of the input file
        job.setJarByClass(WordCount.class);                                // main class
        job.setMapperClass(TokenizerMapper.class);                         // the custom Mapper defined above
        job.setCombinerClass(IntSumReducer.class);                         // enable a Combiner
        job.setMapOutputKeyClass(Text.class);                              // key type of the map output
        job.setMapOutputValueClass(IntWritable.class);                     // value type of the map output
        job.setPartitionerClass(HashPartitioner.class);                    // use the default HashPartitioner
        job.setReducerClass(IntSumReducer.class);                          // the custom Reducer defined above
        job.setNumReduceTasks(Integer.parseInt(args[2]));                  // number of reducers
        job.setOutputKeyClass(Text.class);                                 // key type of the reduce output
        job.setOutputValueClass(IntWritable.class);                        // value type of the reduce output (matches IntSumReducer)
        job.setOutputFormatClass(TextOutputFormat.class);                  // use the default output format
        TextOutputFormat.setOutputPath(job, new Path(args[1]));            // location of the output directory
        System.exit(job.waitForCompletion(true) ? 0 : 1);                  // submit the job and wait for completion
    }
}

 

 Step 5.  Package into a jar and submit

      Assuming the packaged file is named hdpAction.jar and the main class WordCount is in the package mapreduce1, the application can be submitted to the YARN cluster with the following command.

             ./yarn  jar  hdpAction.jar  mapreduce1.WordCount  /word  /wordcount 1

      Here "yarn" is the command and "jar" is the sub-command, followed by the path of the packaged jar; "mapreduce1" is the package name, "WordCount" is the main class name, "/word" is the location of the input file in HDFS, "/wordcount" is the location of the output directory in HDFS, and the trailing "1" is the number of reducers.

       Note: if the main class was specified when the jar was packaged, there is no need to pass mapreduce1.WordCount on the command line.
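
       In that case the submission could be as simple as (assuming the jar's manifest points at mapreduce1.WordCount):

             ./yarn  jar  hdpAction.jar  /word  /wordcount  1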

             

      Console output:

 1 17/08/05 03:37:05 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
 2 17/08/05 03:37:06 INFO client.RMProxy: Connecting to ResourceManager at master/10.1.21.27:8032
 3 17/08/05 03:37:06 WARN mapreduce.JobResourceUploader: Hadoop command-line option parsing not performed. Implement the Tool interface and execute your application with ToolRunner to remedy this.
 4 17/08/05 03:37:07 INFO input.FileInputFormat: Total input paths to process : 1
 5 17/08/05 03:37:07 INFO mapreduce.JobSubmitter: number of splits:1
 6 17/08/05 03:37:07 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1501872322130_0004
 7 17/08/05 03:37:07 INFO impl.YarnClientImpl: Submitted application application_1501872322130_0004
 8 17/08/05 03:37:07 INFO mapreduce.Job: The url to track the job: http://master:8088/proxy/application_1501872322130_0004/
 9 17/08/05 03:37:07 INFO mapreduce.Job: Running job: job_1501872322130_0004
10 17/08/05 03:37:12 INFO mapreduce.Job: Job job_1501872322130_0004 running in uber mode : false
11 17/08/05 03:37:12 INFO mapreduce.Job:  map 0% reduce 0%
12 17/08/05 03:37:16 INFO mapreduce.Job:  map 100% reduce 0%
13 17/08/05 03:37:22 INFO mapreduce.Job:  map 100% reduce 100%
14 17/08/05 03:37:22 INFO mapreduce.Job: Job job_1501872322130_0004 completed successfully
15 17/08/05 03:37:22 INFO mapreduce.Job: Counters: 49
16     File System Counters
17         FILE: Number of bytes read=54
18         FILE: Number of bytes written=232239
19         FILE: Number of read operations=0
20         FILE: Number of large read operations=0
21         FILE: Number of write operations=0
22         HDFS: Number of bytes read=166
23         HDFS: Number of bytes written=28
24         HDFS: Number of read operations=6
25         HDFS: Number of large read operations=0
26         HDFS: Number of write operations=2
27     Job Counters 
28         Launched map tasks=1
29         Launched reduce tasks=1
30         Data-local map tasks=1
31         Total time spent by all maps in occupied slots (ms)=2275
32         Total time spent by all reduces in occupied slots (ms)=2598
33         Total time spent by all map tasks (ms)=2275
34         Total time spent by all reduce tasks (ms)=2598
35         Total vcore-seconds taken by all map tasks=2275
36         Total vcore-seconds taken by all reduce tasks=2598
37         Total megabyte-seconds taken by all map tasks=2329600
38         Total megabyte-seconds taken by all reduce tasks=2660352
39     Map-Reduce Framework
40         Map input records=8
41         Map output records=20
42         Map output bytes=154
43         Map output materialized bytes=54
44         Input split bytes=88
45         Combine input records=20
46         Combine output records=5
47         Reduce input groups=5
48         Reduce shuffle bytes=54
49         Reduce input records=5
50         Reduce output records=5
51         Spilled Records=10
52         Shuffled Maps =1
53         Failed Shuffles=0
54         Merged Map outputs=1
55         GC time elapsed (ms)=47
56         CPU time spent (ms)=1260
57         Physical memory (bytes) snapshot=421257216
58         Virtual memory (bytes) snapshot=1647611904
59         Total committed heap usage (bytes)=402653184
60     Shuffle Errors
61         BAD_ID=0
62         CONNECTION=0
63         IO_ERROR=0
64         WRONG_LENGTH=0
65         WRONG_MAP=0
66         WRONG_REDUCE=0
67     File Input Format Counters 
68         Bytes Read=78
69     File Output Format Counters 
70         Bytes Written=28

     > The result is written to part-r-00000 under the wordcount directory; view the generated file with the hadoop command.
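
     For example (using the output path from the submit command above):

             bin/hadoop fs -cat /wordcount/part-r-00000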

    

3. Secondary Sort

      MapReduce sorts by key by default, but sometimes we also need the values sorted. One option is to sort the collected values in the reduce phase, but with a very large number of values this can lead to problems such as running out of memory. This is exactly where secondary sort comes in: the sorting of values is folded into the MapReduce computation itself instead of being done separately.

      Secondary sort means sorting first by the first field, and then sorting rows that share the same first field by the second field, without disturbing the result of the first sort.

      ! File to be sorted: secsortdata.txt

      

Step 1.  Write the IntPair class and the main class SecondarySort

    Create the project in Eclipse the same way as in the first experiment.

    The main difficulties of the program are sorting and grouping.

      For sorting, we define an IntPair class to hold the data, plus a custom comparator (KeyComparator below) that compares the first and then the second field.

      For grouping, we define a FirstPartitioner class that partitions records by the first field.

      In addition, we have to plug the custom Partitioner, and optionally a GroupingComparator, into the MapReduce framework.
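
    The GroupingComparator line is commented out in the driver below. A minimal sketch of what such a class could look like, assuming records should be grouped by the first field of IntPair only, is shown here; it would sit inside SecondarySort next to KeyComparator and use the same imports:

    // Sketch only (grouping on the first field); not enabled in the code below.
    public static class GroupComparator extends WritableComparator {
        protected GroupComparator() {
            super(IntPair.class, true);       // create IntPair instances for comparison
        }
        @Override
        public int compare(WritableComparable a, WritableComparable b) {
            IntPair ip1 = (IntPair) a;
            IntPair ip2 = (IntPair) b;
            // records with the same first field end up in the same reduce() call
            return ip1.getFirst().compareTo(ip2.getFirst());
        }
    }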

    IntPair.java

package mr;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.WritableComparable;

public class IntPair implements WritableComparable<IntPair> {
    private IntWritable first;
    private IntWritable second;

    public void set(IntWritable first, IntWritable second) {
        this.first = first;
        this.second = second;
    }

    // Note: the no-arg constructor is required, otherwise reflection fails at runtime.
    public IntPair() {
        set(new IntWritable(), new IntWritable());
    }

    public IntPair(int first, int second) {
        set(new IntWritable(first), new IntWritable(second));
    }

    public IntPair(IntWritable first, IntWritable second) {
        set(first, second);
    }

    public IntWritable getFirst() {
        return first;
    }

    public void setFirst(IntWritable first) {
        this.first = first;
    }

    public IntWritable getSecond() {
        return second;
    }

    public void setSecond(IntWritable second) {
        this.second = second;
    }

    public void write(DataOutput out) throws IOException {
        first.write(out);
        second.write(out);
    }

    public void readFields(DataInput in) throws IOException {
        first.readFields(in);
        second.readFields(in);
    }

    public int hashCode() {
        return first.hashCode() * 163 + second.hashCode();
    }

    public boolean equals(Object o) {
        if (o instanceof IntPair) {
            IntPair tp = (IntPair) o;
            return first.equals(tp.first) && second.equals(tp.second);
        }
        return false;
    }

    public String toString() {
        return first + "\t" + second;
    }

    public int compareTo(IntPair tp) {
        int cmp = first.compareTo(tp.first);
        if (cmp != 0) {
            return cmp;
        }
        return second.compareTo(tp.second);
    }
}

 

    SecondarySort.java

package mr;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class SecondarySort {
    static class TheMapper extends Mapper<LongWritable, Text, IntPair, NullWritable> {
        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String[] fields = value.toString().split("\t");
            int field1 = Integer.parseInt(fields[0]);
            int field2 = Integer.parseInt(fields[1]);
            context.write(new IntPair(field1, field2), NullWritable.get());
        }
    }

    static class TheReducer extends Reducer<IntPair, NullWritable, IntPair, NullWritable> {
        //private static final Text SEPARATOR = new Text("------------------------------------------------");
        @Override
        protected void reduce(IntPair key, Iterable<NullWritable> values, Context context)
                throws IOException, InterruptedException {
            context.write(key, NullWritable.get());
        }
    }

    public static class FirstPartitioner extends Partitioner<IntPair, NullWritable> {
        public int getPartition(IntPair key, NullWritable value, int numPartitions) {
            return Math.abs(key.getFirst().get()) % numPartitions;
        }
    }

    // Without this class, both the first and the second column are sorted in ascending order.
    // This class sorts the first column ascending and the second column descending.
    public static class KeyComparator extends WritableComparator {
        // The no-arg constructor is mandatory, otherwise an error is raised.
        protected KeyComparator() {
            super(IntPair.class, true);
        }
        public int compare(WritableComparable a, WritableComparable b) {
            IntPair ip1 = (IntPair) a;
            IntPair ip2 = (IntPair) b;
            // sort the first column in ascending order
            int cmp = ip1.getFirst().compareTo(ip2.getFirst());
            if (cmp != 0) {
                return cmp;
            }
            // when the first column is equal, sort the second column in descending order
            return -ip1.getSecond().compareTo(ip2.getSecond());
        }
    }

    // entry point
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        job.setJarByClass(SecondarySort.class);
        // Mapper settings
        job.setMapperClass(TheMapper.class);
        // When the Mapper's output key/value types are the same as the Reducer's
        // output key/value types, the following two lines can be omitted.
        //job.setMapOutputKeyClass(IntPair.class);
        //job.setMapOutputValueClass(NullWritable.class);
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        // partitioning settings
        job.setPartitionerClass(FirstPartitioner.class);
        // sort the keys on the map side
        job.setSortComparatorClass(KeyComparator.class);
        //job.setGroupingComparatorClass(GroupComparator.class);
        // Reducer settings
        job.setReducerClass(TheReducer.class);
        job.setOutputKeyClass(IntPair.class);
        job.setOutputValueClass(NullWritable.class);
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        // number of reducers
        int reduceNum = 1;
        if (args.length >= 3 && args[2] != null) {
            reduceNum = Integer.parseInt(args[2]);
        }
        job.setNumReduceTasks(reduceNum);
        job.waitForCompletion(true);
    }
}

 

 Step 2.  Package and submit

      Package the code with Eclipse and select mr.SecondarySort as the main class. If no main class is specified, the class to run must be given on the command line. Assuming the packaged file is named hdpAction6.jar and the main class SecondarySort is in the package mr, the application can be submitted to the Hadoop cluster with the following command.

       bin/hadoop jar hdpAction6.jar mr.Secondary /user/mapreduce/secsort/in/secsortdata.txt  /user/mapreduce/secsort/out  1

      Here "hadoop" is the command and "jar" is the sub-command, followed by the packaged jar. "/user/mapreduce/secsort/in/secsortdata.txt" is the location of the input file in HDFS (upload it first if it is not there), "/user/mapreduce/secsort/out" is the location of the output directory in HDFS, and "1" is the number of reducers.

      If the main class was already set when packaging, it does not need to be given again on the command line.

      (Upload secsortdata.txt to HDFS with: hadoop fs -put <local file with path> <HDFS path>)
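
      For example, assuming the file sits under /root/data on the client machine and is uploaded to the HDFS root (as in the log below):

             bin/hadoop fs -put /root/data/secsortdata.txt /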

      

     Console output:

 1 [root@master hadoop]# bin/hadoop jar SecondarySort.jar  /secsortdata.txt  /user/mapreduce/secsort/out  1
 2 Not a valid JAR: /usr/cstor/hadoop/SecondarySort.jar
 3 [root@master hadoop]# bin/hadoop jar hdpAction6.jar  /secsortdata.txt  /user/mapreduce/secsort/out  1
 4 17/08/05 04:05:48 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
 5 17/08/05 04:05:49 INFO client.RMProxy: Connecting to ResourceManager at master/10.1.21.27:8032
 6 17/08/05 04:05:49 WARN mapreduce.JobResourceUploader: Hadoop command-line option parsing not performed. Implement the Tool interface and execute your application with ToolRunner to remedy this.
 7 17/08/05 04:05:50 INFO input.FileInputFormat: Total input paths to process : 1
 8 17/08/05 04:05:50 INFO mapreduce.JobSubmitter: number of splits:1
 9 17/08/05 04:05:50 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1501872322130_0007
10 17/08/05 04:05:50 INFO impl.YarnClientImpl: Submitted application application_1501872322130_0007
11 17/08/05 04:05:50 INFO mapreduce.Job: The url to track the job: http://master:8088/proxy/application_1501872322130_0007/
12 17/08/05 04:05:50 INFO mapreduce.Job: Running job: job_1501872322130_0007
13 17/08/05 04:05:56 INFO mapreduce.Job: Job job_1501872322130_0007 running in uber mode : false
14 17/08/05 04:05:56 INFO mapreduce.Job:  map 0% reduce 0%
15 17/08/05 04:06:00 INFO mapreduce.Job:  map 100% reduce 0%
16 17/08/05 04:06:05 INFO mapreduce.Job:  map 100% reduce 100%
17 17/08/05 04:06:06 INFO mapreduce.Job: Job job_1501872322130_0007 completed successfully
18 17/08/05 04:06:07 INFO mapreduce.Job: Counters: 49
19     File System Counters
20         FILE: Number of bytes read=106
21         FILE: Number of bytes written=230897
22         FILE: Number of read operations=0
23         FILE: Number of large read operations=0
24         FILE: Number of write operations=0
25         HDFS: Number of bytes read=159
26         HDFS: Number of bytes written=60
27         HDFS: Number of read operations=6
28         HDFS: Number of large read operations=0
29         HDFS: Number of write operations=2
30     Job Counters 
31         Launched map tasks=1
32         Launched reduce tasks=1
33         Data-local map tasks=1
34         Total time spent by all maps in occupied slots (ms)=2534
35         Total time spent by all reduces in occupied slots (ms)=2799
36         Total time spent by all map tasks (ms)=2534
37         Total time spent by all reduce tasks (ms)=2799
38         Total vcore-seconds taken by all map tasks=2534
39         Total vcore-seconds taken by all reduce tasks=2799
40         Total megabyte-seconds taken by all map tasks=2594816
41         Total megabyte-seconds taken by all reduce tasks=2866176
42     Map-Reduce Framework
43         Map input records=10
44         Map output records=10
45         Map output bytes=80
46         Map output materialized bytes=106
47         Input split bytes=99
48         Combine input records=0
49         Combine output records=0
50         Reduce input groups=10
51         Reduce shuffle bytes=106
52         Reduce input records=10
53         Reduce output records=10
54         Spilled Records=20
55         Shuffled Maps =1
56         Failed Shuffles=0
57         Merged Map outputs=1
58         GC time elapsed (ms)=55
59         CPU time spent (ms)=1490
60         Physical memory (bytes) snapshot=419209216
61         Virtual memory (bytes) snapshot=1642618880
62         Total committed heap usage (bytes)=402653184
63     Shuffle Errors
64         BAD_ID=0
65         CONNECTION=0
66         IO_ERROR=0
67         WRONG_LENGTH=0
68         WRONG_MAP=0
69         WRONG_REDUCE=0
70     File Input Format Counters 
71         Bytes Read=60
72     File Output Format Counters 
73         Bytes Written=60

   The generated file shows the secondary-sort result:

    

4. Counters

Step 1.  What is a MapReduce counter?

    A counter records a job's progress and status; you can think of it as a form of logging. We can insert a counter at some point in a program to record changes in the data or in the progress.

   What can MapReduce counters do?

    MapReduce counters give us a window into the details of a running MapReduce job. They are very helpful for performance tuning; most of the evaluation done during MapReduce performance optimization is based on the values these counters report.

    In many cases, a user needs to understand the data being analyzed even though that is not the core of the analysis task. Take counting the invalid records in a dataset: if the proportion of invalid records turns out to be quite high, it is worth asking why. Is the detection program flawed, or is the dataset simply of low quality with many invalid records? If it is a data-quality problem, the dataset may need to be enlarged so that the proportion of valid records is big enough for a meaningful analysis.
       Counters are an effective way of collecting job statistics, whether for quality control or for application-level statistics, and they can also help diagnose system failures. If you need to get information out of a map or reduce task, it is usually better to publish a counter value to track whether a particular event occurred than to write log messages. For large distributed jobs counters are more convenient: retrieving a counter value is easier than digging through output logs, and counting the occurrences of a specific event from counter values is much easier than analyzing a pile of log files.

  Built-in counters

     MapReduce ships with many default counters. Knowing what they mean makes it easier to read a job's results: bytes read and written, map-side input/output bytes and record counts, reduce-side input/output bytes and record counts, and so on. For now it is enough to know that these built-in counters exist and that each one is identified by a group name (groupName) and a counter name (counterName); that is all that is needed to look them up later.

  Custom counters

    MapReduce also lets user code define its own counters, whose values can be incremented in a mapper or a reducer. Several counters are defined together by a Java enum so that they can be grouped: the enum type's name becomes the group name and its fields become the counter names; a job may define any number of enum types, each with any number of fields. Counters are global: the MapReduce framework aggregates them across all map and reduce tasks and produces a final total when the job finishes.
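
    A minimal sketch of the enum style (hypothetical names; the experiment below uses the string-based context.getCounter(group, name) form instead):

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class RecordQualityMapper extends Mapper<LongWritable, Text, Text, Text> {

    // The enum's name is the counter group; its fields are the counter names.
    enum RecordQuality { VALID, MALFORMED }

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        if (value.toString().split("\t").length == 3) {
            context.getCounter(RecordQuality.VALID).increment(1);
        } else {
            context.getCounter(RecordQuality.MALFORMED).increment(1);
        }
    }
}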

Step 2.  > Create the counter input file counters.txt

    

   > Upload the file to HDFS

    

Step 3.  Write the program Counters.java

package mr;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Counter;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

public class Counters {
    public static class MyCounterMap extends Mapper<LongWritable, Text, Text, Text> {
        public static Counter ct = null;

        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String arr_value[] = value.toString().split("\t");
            if (arr_value.length > 3) {
                ct = context.getCounter("ErrorCounter", "toolong");   // "ErrorCounter" is the group name, "toolong" the counter name
                ct.increment(1);                                      // increment the counter by one
            } else if (arr_value.length < 3) {
                ct = context.getCounter("ErrorCounter", "tooshort");
                ct.increment(1);
            }
        }
    }

    public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
        Configuration conf = new Configuration();
        String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
        if (otherArgs.length != 2) {
            System.err.println("Usage: Counters <in> <out>");
            System.exit(2);
        }
        Job job = new Job(conf, "Counter");
        job.setJarByClass(Counters.class);

        job.setMapperClass(MyCounterMap.class);

        FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
        FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}

 

Step 4.  Package and submit

Package the code with Eclipse and select mr.Counters as the main class. Assuming the packaged file is named hdpAction7.jar and the main class Counters is in the package mr, the application can be submitted to the Hadoop cluster with the following command.

        bin/hadoop  jar  hdpAction7.jar  mr.Counters   /counters.txt  /usr/counters/out

Here "hadoop" is the command and "jar" is the sub-command, followed by the packaged jar. "/counters.txt" is the location of the input file in HDFS (upload it first if it is not there), and "/usr/counters/out" is the location of the output directory in HDFS.

    Console output:

 1 [root@master hadoop]# bin/hadoop  jar  hdpAction7.jar   /counters.txt  /usr/counters/out
 2 17/08/05 04:22:58 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
 3 17/08/05 04:22:59 INFO client.RMProxy: Connecting to ResourceManager at master/10.1.21.27:8032
 4 17/08/05 04:23:00 INFO input.FileInputFormat: Total input paths to process : 1
 5 17/08/05 04:23:00 INFO mapreduce.JobSubmitter: number of splits:1
 6 17/08/05 04:23:00 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1501872322130_0008
 7 17/08/05 04:23:00 INFO impl.YarnClientImpl: Submitted application application_1501872322130_0008
 8 17/08/05 04:23:00 INFO mapreduce.Job: The url to track the job: http://master:8088/proxy/application_1501872322130_0008/
 9 17/08/05 04:23:00 INFO mapreduce.Job: Running job: job_1501872322130_0008
10 17/08/05 04:23:05 INFO mapreduce.Job: Job job_1501872322130_0008 running in uber mode : false
11 17/08/05 04:23:05 INFO mapreduce.Job:  map 0% reduce 0%
12 17/08/05 04:23:10 INFO mapreduce.Job:  map 100% reduce 0%
13 17/08/05 04:23:15 INFO mapreduce.Job:  map 100% reduce 100%
14 17/08/05 04:23:16 INFO mapreduce.Job: Job job_1501872322130_0008 completed successfully
15 17/08/05 04:23:16 INFO mapreduce.Job: Counters: 51
16     File System Counters
17         FILE: Number of bytes read=6
18         FILE: Number of bytes written=229309
19         FILE: Number of read operations=0
20         FILE: Number of large read operations=0
21         FILE: Number of write operations=0
22         HDFS: Number of bytes read=134
23         HDFS: Number of bytes written=0
24         HDFS: Number of read operations=6
25         HDFS: Number of large read operations=0
26         HDFS: Number of write operations=2
27     Job Counters 
28         Launched map tasks=1
29         Launched reduce tasks=1
30         Data-local map tasks=1
31         Total time spent by all maps in occupied slots (ms)=2400
32         Total time spent by all reduces in occupied slots (ms)=2472
33         Total time spent by all map tasks (ms)=2400
34         Total time spent by all reduce tasks (ms)=2472
35         Total vcore-seconds taken by all map tasks=2400
36         Total vcore-seconds taken by all reduce tasks=2472
37         Total megabyte-seconds taken by all map tasks=2457600
38         Total megabyte-seconds taken by all reduce tasks=2531328
39     Map-Reduce Framework
40         Map input records=4
41         Map output records=0
42         Map output bytes=0
43         Map output materialized bytes=6
44         Input split bytes=96
45         Combine input records=0
46         Combine output records=0
47         Reduce input groups=0
48         Reduce shuffle bytes=6
49         Reduce input records=0
50         Reduce output records=0
51         Spilled Records=0
52         Shuffled Maps =1
53         Failed Shuffles=0
54         Merged Map outputs=1
55         GC time elapsed (ms)=143
56         CPU time spent (ms)=1680
57         Physical memory (bytes) snapshot=413036544
58         Virtual memory (bytes) snapshot=1630470144
59         Total committed heap usage (bytes)=402653184
60     ErrorCounter
61         toolong=1
62         tooshort=1
63     Shuffle Errors
64         BAD_ID=0
65         CONNECTION=0
66         IO_ERROR=0
67         WRONG_LENGTH=0
68         WRONG_MAP=0
69         WRONG_REDUCE=0
70     File Input Format Counters 
71         Bytes Read=38
72     File Output Format Counters 
73         Bytes Written=0
74 [root@master hadoop]# 

 

 

5. Join

Step 1.  Overview

    Joins in an RDBMS are familiar territory: when writing SQL you have to watch the details, because a small slip can make a query painfully slow and turn into a serious performance bottleneck. Joins implemented with the MapReduce framework on Hadoop are likewise expensive, but Hadoop's distributed design gives them some special characteristics of their own.

     How it works

There are several ways to implement a join with MapReduce:

> Reduce-side join, the most common pattern:

The map side tags each key/value pair from the different tables (files) so that the origin of every record can be told apart, then emits the join field as the key and the rest of the record plus the tag as the value.

On the reduce side, the grouping by join field has already been done; the reducer only has to separate the records in each group by source file (tagged in the map phase) and then take the Cartesian product of the two sides.

> Map-side join

Use case: one table is very small and the other is very large.

Usage: when submitting the job, first put the small table file into the job's DistributedCache. Each task then reads the small table from the DistributedCache, parses it into join key / value pairs and keeps them in memory (for example in a HashMap). It then scans the big table and checks, for every record, whether its join key exists in the in-memory map; if it does, the joined result is written out directly.
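
A minimal sketch of this pattern, with hypothetical file names and column layout (a small table of "key<TAB>desc" lines shipped through the cache, a big table of "key<TAB>payload" lines as the mapper input; the driver would call job.addCacheFile(new URI("/small.txt")) before submission):

import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.net.URI;
import java.util.HashMap;
import java.util.Map;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class MapJoinMapper extends Mapper<LongWritable, Text, Text, Text> {
    private final Map<String, String> small = new HashMap<String, String>();

    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        // The cached file is localized on each node; on YARN it is symlinked into the
        // task's working directory under its base name, so it can be read as a local file.
        URI[] cacheFiles = context.getCacheFiles();
        String localName = new Path(cacheFiles[0].getPath()).getName();
        BufferedReader reader = new BufferedReader(new FileReader(localName));
        String line;
        while ((line = reader.readLine()) != null) {
            String[] kv = line.split("\t");
            small.put(kv[0], kv[1]);               // load the small table into memory
        }
        reader.close();
    }

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        String[] fields = value.toString().split("\t");
        String desc = small.get(fields[0]);        // probe the in-memory small table
        if (desc != null) {
            context.write(new Text(fields[0]), new Text(fields[1] + "\t" + desc));
        }
    }
}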

> Semi-join

A semi-join is essentially a variant of the reduce-side join: some data is filtered out on the map side so that only records that actually participate in the join are shuffled over the network. Data that does not participate in the join never leaves the node, which reduces the shuffle traffic and improves overall efficiency; everything else works exactly like a reduce-side join. In plain terms: extract just the join keys from the small table, distribute them to the relevant nodes through the DistributedCache and load them into memory (for example into a HashSet); in the map phase, while scanning the big table, drop every record whose join key is not in the in-memory HashSet, and let only the participating records go through the shuffle to the reduce side, where the join is performed as in a normal reduce-side join.

Step 2.  > Create the two table files data.txt and info.txt

    

    > Upload them to HDFS

    

 

Step 3.  Write the program MRJoin.java

    The program works as follows:

In the map phase, every record is turned into a <key, value> pair: the key is the value of the join field (1003/1004/1005/1006), and the value depends on the source. For records from table A (data.txt) the value is something like "201001 abc"; for records from table B (info.txt) it is something like "kaka".

In the reduce phase, the values in each key group are separated by source: the info.txt record (tagged "0") sorts first and is taken as the description, and the reducer then combines it with every data.txt record (tagged "1") in the group to produce the final joined rows.

    The code is as follows:

package mr;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.util.GenericOptionsParser;

public class MRJoin {
    public static class MR_Join_Mapper extends Mapper<LongWritable, Text, TextPair, Text> {
        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            // get the full path and name of the input file
            String pathName = ((FileSplit) context.getInputSplit()).getPath().toString();
            if (pathName.contains("data.txt")) {
                String values[] = value.toString().split("\t");
                if (values.length < 3) {
                    // malformed data record (fewer than 3 fields): discard it
                    return;
                } else {
                    // well-formed record: tag it with "1"
                    TextPair tp = new TextPair(new Text(values[1]), new Text("1"));
                    context.write(tp, new Text(values[0] + "\t" + values[2]));
                }
            }
            if (pathName.contains("info.txt")) {
                String values[] = value.toString().split("\t");
                if (values.length < 2) {
                    // malformed info record (fewer than 2 fields): discard it
                    return;
                } else {
                    // well-formed record: tag it with "0"
                    TextPair tp = new TextPair(new Text(values[0]), new Text("0"));
                    context.write(tp, new Text(values[1]));
                }
            }
        }
    }

    public static class MR_Join_Partitioner extends Partitioner<TextPair, Text> {
        @Override
        public int getPartition(TextPair key, Text value, int numParititon) {
            return Math.abs(key.getFirst().hashCode() * 127) % numParititon;
        }
    }

    public static class MR_Join_Comparator extends WritableComparator {
        public MR_Join_Comparator() {
            super(TextPair.class, true);
        }

        public int compare(WritableComparable a, WritableComparable b) {
            TextPair t1 = (TextPair) a;
            TextPair t2 = (TextPair) b;
            return t1.getFirst().compareTo(t2.getFirst());
        }
    }

    public static class MR_Join_Reduce extends Reducer<TextPair, Text, Text, Text> {
        @Override
        protected void reduce(TextPair key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            Text pid = key.getFirst();
            String desc = values.iterator().next().toString();
            while (values.iterator().hasNext()) {
                context.write(pid, new Text(values.iterator().next().toString() + "\t" + desc));
            }
        }
    }


    public static void main(String agrs[])
            throws IOException, InterruptedException, ClassNotFoundException {
        Configuration conf = new Configuration();
        GenericOptionsParser parser = new GenericOptionsParser(conf, agrs);
        String[] otherArgs = parser.getRemainingArgs();
        if (agrs.length < 3) {
            System.err.println("Usage: MRJoin <in_path_one> <in_path_two> <output>");
            System.exit(2);
        }

        Job job = new Job(conf, "MRJoin");
        // the job to run
        job.setJarByClass(MRJoin.class);
        // Map settings
        job.setMapperClass(MR_Join_Mapper.class);
        // Map output types
        job.setMapOutputKeyClass(TextPair.class);
        job.setMapOutputValueClass(Text.class);
        // partitioner
        job.setPartitionerClass(MR_Join_Partitioner.class);
        // group by the specified condition after partitioning
        job.setGroupingComparatorClass(MR_Join_Comparator.class);
        // Reduce settings
        job.setReducerClass(MR_Join_Reduce.class);
        // Reduce output types
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        // input and output directories
        FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
        FileInputFormat.addInputPath(job, new Path(otherArgs[1]));
        FileOutputFormat.setOutputPath(job, new Path(otherArgs[2]));
        // run until completion, then exit
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}

class TextPair implements WritableComparable<TextPair> {
    private Text first;
    private Text second;

    public TextPair() {
        set(new Text(), new Text());
    }

    public TextPair(String first, String second) {
        set(new Text(first), new Text(second));
    }

    public TextPair(Text first, Text second) {
        set(first, second);
    }

    public void set(Text first, Text second) {
        this.first = first;
        this.second = second;
    }

    public Text getFirst() {
        return first;
    }

    public Text getSecond() {
        return second;
    }

    public void write(DataOutput out) throws IOException {
        first.write(out);
        second.write(out);
    }

    public void readFields(DataInput in) throws IOException {
        first.readFields(in);
        second.readFields(in);
    }

    public int compareTo(TextPair tp) {
        int cmp = first.compareTo(tp.first);
        if (cmp != 0) {
            return cmp;
        }
        return second.compareTo(tp.second);
    }
}

 

 Step 4.  Package and submit

Package the code with Eclipse. Assuming the packaged file is named hdpAction8.jar and the main class MRJoin is in the package mr, the application can be submitted to the Hadoop cluster with the following command.

       bin/hadoop jar hdpAction8.jar mr.MRJoin  /data.txt    /info.txt   /usr/MRJoin/out

Here "hadoop" is the command and "jar" is the sub-command, followed by the packaged jar. "/data.txt" and "/info.txt" are the locations of the input files in HDFS, and "/usr/MRJoin/out" is the location of the output directory in HDFS.

  Execution output:

 1 [root@master hadoop]# bin/hadoop jar hdpAction8.jar  /data.txt    /info.txt   /usr/MRJoin/out
 2 17/08/05 04:38:11 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
 3 17/08/05 04:38:12 INFO client.RMProxy: Connecting to ResourceManager at master/10.1.21.27:8032
 4 17/08/05 04:38:13 INFO input.FileInputFormat: Total input paths to process : 2
 5 17/08/05 04:38:13 INFO mapreduce.JobSubmitter: number of splits:2
 6 17/08/05 04:38:13 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1501872322130_0009
 7 17/08/05 04:38:13 INFO impl.YarnClientImpl: Submitted application application_1501872322130_0009
 8 17/08/05 04:38:13 INFO mapreduce.Job: The url to track the job: http://master:8088/proxy/application_1501872322130_0009/
 9 17/08/05 04:38:13 INFO mapreduce.Job: Running job: job_1501872322130_0009
10 17/08/05 04:38:18 INFO mapreduce.Job: Job job_1501872322130_0009 running in uber mode : false
11 17/08/05 04:38:18 INFO mapreduce.Job:  map 0% reduce 0%
12 17/08/05 04:38:23 INFO mapreduce.Job:  map 100% reduce 0%
13 17/08/05 04:38:28 INFO mapreduce.Job:  map 100% reduce 100%
14 17/08/05 04:38:28 INFO mapreduce.Job: Job job_1501872322130_0009 completed successfully
15 17/08/05 04:38:29 INFO mapreduce.Job: Counters: 49
16     File System Counters
17         FILE: Number of bytes read=179
18         FILE: Number of bytes written=347823
19         FILE: Number of read operations=0
20         FILE: Number of large read operations=0
21         FILE: Number of write operations=0
22         HDFS: Number of bytes read=317
23         HDFS: Number of bytes written=283
24         HDFS: Number of read operations=9
25         HDFS: Number of large read operations=0
26         HDFS: Number of write operations=2
27     Job Counters 
28         Launched map tasks=2
29         Launched reduce tasks=1
30         Data-local map tasks=2
31         Total time spent by all maps in occupied slots (ms)=5122
32         Total time spent by all reduces in occupied slots (ms)=2685
33         Total time spent by all map tasks (ms)=5122
34         Total time spent by all reduce tasks (ms)=2685
35         Total vcore-seconds taken by all map tasks=5122
36         Total vcore-seconds taken by all reduce tasks=2685
37         Total megabyte-seconds taken by all map tasks=5244928
38         Total megabyte-seconds taken by all reduce tasks=2749440
39     Map-Reduce Framework
40         Map input records=10
41         Map output records=10
42         Map output bytes=153
43         Map output materialized bytes=185
44         Input split bytes=184
45         Combine input records=0
46         Combine output records=0
47         Reduce input groups=4
48         Reduce shuffle bytes=185
49         Reduce input records=10
50         Reduce output records=10
51         Spilled Records=20
52         Shuffled Maps =2
53         Failed Shuffles=0
54         Merged Map outputs=2
55         GC time elapsed (ms)=122
56         CPU time spent (ms)=2790
57         Physical memory (bytes) snapshot=680472576
58         Virtual memory (bytes) snapshot=2443010048
59         Total committed heap usage (bytes)=603979776
60     Shuffle Errors
61         BAD_ID=0
62         CONNECTION=0
63         IO_ERROR=0
64         WRONG_LENGTH=0
65         WRONG_MAP=0
66         WRONG_REDUCE=0
67     File Input Format Counters 
68         Bytes Read=133
69     File Output Format Counters 
70         Bytes Written=283
71 [root@master hadoop]# 

    > The joined output is generated under the /usr/MRJoin/out directory:

  

6. Distributed Cache

Step 1.  Suppose there is a 100 GB big table big.txt and a 1 MB small table small.txt, and we want to count, based on the MapReduce model, how many times the words of the small table appear in the big table. This is the classic "scan the big table, load the small table" problem.

  To solve it, we can start 10 maps, so that each map only has to process a tenth of the total, which speeds things up considerably. Inside each map, the 1 MB small table is loaded directly into a HashSet, while the large file on disk (MapReduce copies the HDFS data to the local node for processing) is scanned record by record. This "scan the big table, load the small table" approach is what distributed caching is about.

Step 2.  > Create the two txt files

    

   > Upload them to HDFS

    First log in to the client machine and check whether the directory "/user/root/mr/in" already exists in HDFS; if it does not, create it with the following command.

         /usr/cstor/hadoop/bin/hdfs  dfs  -mkdir  -p  /user/root/mr/in

        

    Next, upload the client's local files "/root/data/9/big.txt" and "/root/data/9/small.txt" to the HDFS directory "/user/root/mr/in" with the following commands.

         /usr/cstor/hadoop/bin/hdfs  dfs  -put  /root/data/9/big.txt  /user/root/mr/in

         /usr/cstor/hadoop/bin/hdfs  dfs  -put  /root/data/9/small.txt  /user/root/mr/in

        

Step 3.  Write the code: create a new class BigAndSmallTable with the package name cn.cstor.mr and enter the following code in BigAndSmallTable.java:

package cn.cstor.mr;

import java.io.IOException;
import java.util.HashSet;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.LineReader;

public class BigAndSmallTable {
    public static class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable> {
        private final static IntWritable one = new IntWritable(1);
        private static HashSet<String> smallTable = null;

        protected void setup(Context context) throws IOException, InterruptedException {
            smallTable = new HashSet<String>();
            Path smallTablePath = new Path(context.getConfiguration().get("smallTableLocation"));
            FileSystem hdfs = smallTablePath.getFileSystem(context.getConfiguration());
            FSDataInputStream hdfsReader = hdfs.open(smallTablePath);
            Text line = new Text();
            LineReader lineReader = new LineReader(hdfsReader);
            while (lineReader.readLine(line) > 0) {
                // you can do something here
                String[] values = line.toString().split(" ");
                for (int i = 0; i < values.length; i++) {
                    smallTable.add(values[i]);
                    System.out.println(values[i]);
                }
            }
            lineReader.close();
            hdfsReader.close();
            System.out.println("setup ok *^_^* ");
        }

        public void map(Object key, Text value, Context context)
                throws IOException, InterruptedException {
            String[] values = value.toString().split(" ");
            for (int i = 0; i < values.length; i++) {
                if (smallTable.contains(values[i])) {
                    context.write(new Text(values[i]), one);
                }
            }
        }
    }

    public static class IntSumReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
        private IntWritable result = new IntWritable();

        public void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            int sum = 0;
            for (IntWritable val : values) {
                sum += val.get();
            }
            result.set(sum);
            context.write(key, result);
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        conf.set("smallTableLocation", args[1]);
        Job job = Job.getInstance(conf, "BigAndSmallTable");
        job.setJarByClass(BigAndSmallTable.class);
        job.setMapperClass(TokenizerMapper.class);
        job.setReducerClass(IntSumReducer.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[2]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}

 Step 4.  Package, upload and run

    First, use "Xmanager Enterprise 5" to upload "C:\Users\allen\Desktop\BigSmallTable.jar" to the client machine; here it is uploaded as "/root/BigSmallTable.jar".

    Then, on the client machine, submit the BigSmallTable.jar job with the following command.

         /usr/cstor/hadoop/bin/hadoop  jar  /root/BigSmallTable.jar  cn.cstor.mr.BigAndSmallTable  /user/root/mr/in/big.txt   /user/root/mr/in/small.txt  /user/root/mr/bigAndSmallResult 

 1 [root@client ~]# /usr/cstor/hadoop/bin/hadoop  jar  /root/BigSmallTable.jar   /user/root/mr/in/big.txt   /user/root/mr/in/small.txt  /user/root/mr/bigAndSmallResult
 2 17/08/05 04:55:51 INFO client.RMProxy: Connecting to ResourceManager at master/10.1.21.27:8032
 3 17/08/05 04:55:52 WARN mapreduce.JobResourceUploader: Hadoop command-line option parsing not performed. Implement the Tool interface and execute your application with ToolRunner to remedy this.
 4 17/08/05 04:55:52 INFO input.FileInputFormat: Total input paths to process : 1
 5 17/08/05 04:55:52 INFO mapreduce.JobSubmitter: number of splits:1
 6 17/08/05 04:55:52 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1501872322130_0010
 7 17/08/05 04:55:53 INFO impl.YarnClientImpl: Submitted application application_1501872322130_0010
 8 17/08/05 04:55:53 INFO mapreduce.Job: The url to track the job: http://master:8088/proxy/application_1501872322130_0010/
 9 17/08/05 04:55:53 INFO mapreduce.Job: Running job: job_1501872322130_0010
10 17/08/05 04:55:58 INFO mapreduce.Job: Job job_1501872322130_0010 running in uber mode : false
11 17/08/05 04:55:58 INFO mapreduce.Job:  map 0% reduce 0%
12 17/08/05 04:56:03 INFO mapreduce.Job:  map 100% reduce 0%
13 17/08/05 04:56:08 INFO mapreduce.Job:  map 100% reduce 100%
14 17/08/05 04:56:09 INFO mapreduce.Job: Job job_1501872322130_0010 completed successfully
15 17/08/05 04:56:09 INFO mapreduce.Job: Counters: 49
16     File System Counters
17         FILE: Number of bytes read=36
18         FILE: Number of bytes written=231153
19         FILE: Number of read operations=0
20         FILE: Number of large read operations=0
21         FILE: Number of write operations=0
22         HDFS: Number of bytes read=265
23         HDFS: Number of bytes written=18
24         HDFS: Number of read operations=7
25         HDFS: Number of large read operations=0
26         HDFS: Number of write operations=2
27     Job Counters 
28         Launched map tasks=1
29         Launched reduce tasks=1
30         Data-local map tasks=1
31         Total time spent by all maps in occupied slots (ms)=2597
32         Total time spent by all reduces in occupied slots (ms)=2755
33         Total time spent by all map tasks (ms)=2597
34         Total time spent by all reduce tasks (ms)=2755
35         Total vcore-seconds taken by all map tasks=2597
36         Total vcore-seconds taken by all reduce tasks=2755
37         Total megabyte-seconds taken by all map tasks=2659328
38         Total megabyte-seconds taken by all reduce tasks=2821120
39     Map-Reduce Framework
40         Map input records=5
41         Map output records=3
42         Map output bytes=24
43         Map output materialized bytes=36
44         Input split bytes=107
45         Combine input records=0
46         Combine output records=0
47         Reduce input groups=3
48         Reduce shuffle bytes=36
49         Reduce input records=3
50         Reduce output records=3
51         Spilled Records=6
52         Shuffled Maps =1
53         Failed Shuffles=0
54         Merged Map outputs=1
55         GC time elapsed (ms)=57
56         CPU time spent (ms)=1480
57         Physical memory (bytes) snapshot=425840640
58         Virtual memory (bytes) snapshot=1651806208
59         Total committed heap usage (bytes)=402653184
60     Shuffle Errors
61         BAD_ID=0
62         CONNECTION=0
63         IO_ERROR=0
64         WRONG_LENGTH=0
65         WRONG_MAP=0
66         WRONG_REDUCE=0
67     File Input Format Counters 
68         Bytes Read=147
69     File Output Format Counters 
70         Bytes Written=18
71 [root@client ~]# 
The output of running the command is shown above.

     > View the result

        After the job has run, the result can be viewed with the command below; note that if the job is run again, the output directory must be changed:

             /usr/cstor/hadoop/bin/hdfs  dfs  -cat  /user/root/mr/bigAndSmallResult/part-r-00000

      

 

Summary:

   Having worked through the five experiments, we now have a systematic picture of how a MapReduce job is run:

first, upload the target files to HDFS;

next, write the program code;

then package it into a jar and upload it to the cluster;

then run the jar;

and finally the result file part-r-00000 is produced.

  Along the way the hadoop commands have become more familiar, and uploading, viewing and editing files is now routine. Having come this far, you can consider yourself introduced to big data, with a basic understanding and the basic operations in hand.

  The road ahead is long and there is much left to explore. Steady accumulation and persistence are the prerequisites of real progress, and quantitative change eventually brings qualitative change. To everyone who has read this far: keep at it, share what you learn, and let's keep learning together.

      

 

