Hadoop日記Day16---命令行運行MapReduce程序


一、代碼編寫

1.1 單詞統計

  回顧我們以前單詞統計的例子,如代碼1.1所示。

 1 package counter;
 2 
 3 import java.net.URI;
 4 
 5 import org.apache.hadoop.conf.Configuration;
 6 import org.apache.hadoop.fs.FileSystem;
 7 import org.apache.hadoop.fs.Path;
 8 import org.apache.hadoop.io.LongWritable;
 9 import org.apache.hadoop.io.Text;
10 import org.apache.hadoop.mapreduce.Counter;
11 import org.apache.hadoop.mapreduce.Job;
12 import org.apache.hadoop.mapreduce.Mapper;
13 import org.apache.hadoop.mapreduce.Reducer;
14 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
15 import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
16 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
17 import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
18 import org.apache.hadoop.mapreduce.lib.partition.HashPartitioner;
19 
20 public class WordCountApp {
21     static final String INPUT_PATH = "hdfs://hadoop:9000/hello";
22     static final String OUT_PATH = "hdfs://hadoop:9000/out";
23     
24     public static void main(String[] args) throws Exception {
25         
26         Configuration conf = new Configuration();
27         
28         final FileSystem fileSystem = FileSystem.get(new URI(INPUT_PATH), conf);
29         final Path outPath = new Path(OUT_PATH);
30         
31         if(fileSystem.exists(outPath)){
32             fileSystem.delete(outPath, true);
33         }
34         
35         final Job job = new Job(conf , WordCountApp.class.getSimpleName());
36         
37         FileInputFormat.setInputPaths(job, INPUT_PATH);//1.1指定讀取的文件位於哪里
38         
39         job.setInputFormatClass(TextInputFormat.class);//指定如何對輸入文件進行格式化,把輸入文件每一行解析成鍵值對
40         
41         job.setMapperClass(MyMapper.class);//1.2 指定自定義的map類
42         job.setMapOutputKeyClass(Text.class);//map輸出的<k,v>類型。如果<k3,v3>的類型與<k2,v2>類型一致,則可以省略
43         job.setMapOutputValueClass(LongWritable.class);
44         
45         job.setPartitionerClass(HashPartitioner.class);//1.3 分區
46         job.setNumReduceTasks(1);//有一個reduce任務運行
47         
48         job.setReducerClass(MyReducer.class);//2.2 指定自定義reduce類
49         job.setOutputKeyClass(Text.class);//指定reduce的輸出類型
50         job.setOutputValueClass(LongWritable.class);
51         
52         FileOutputFormat.setOutputPath(job, outPath);//2.3 指定寫出到哪里
53         
54         job.setOutputFormatClass(TextOutputFormat.class);//指定輸出文件的格式化類
55         
56         job.waitForCompletion(true);//把job提交給JobTracker運行
57     }
58     
59     /**
60      * KEYIN    即k1        表示行的偏移量
61      * VALUEIN    即v1        表示行文本內容
62      * KEYOUT    即k2        表示行中出現的單詞
63      * VALUEOUT    即v2        表示行中出現的單詞的次數,固定值1
64      */
65     static class MyMapper extends Mapper<LongWritable, Text, Text, LongWritable>{
66         protected void map(LongWritable k1, Text v1, Context context) throws java.io.IOException ,InterruptedException {
67         //    final Counter helloCounter = context.getCounter("Sensitive Words", "hello");
68             
69             final String line = v1.toString();
70         /*    if(line.contains("hello")){
71                 //記錄敏感詞出現在一行中
72                 helloCounter.increment(1L);
73             }*/
74             final String[] splited = line.split(" ");
75             for (String word : splited) {
76                 context.write(new Text(word), new LongWritable(1));
77             }
78         };
79     }
80     
81     /**
82      * KEYIN    即k2        表示行中出現的單詞
83      * VALUEIN    即v2        表示行中出現的單詞的次數
84      * KEYOUT    即k3        表示文本中出現的不同單詞
85      * VALUEOUT    即v3        表示文本中出現的不同單詞的總次數
86      *
87      */
88     static class MyReducer extends Reducer<Text, LongWritable, Text, LongWritable>{
89         protected void reduce(Text k2, java.lang.Iterable<LongWritable> v2s, Context ctx) throws java.io.IOException ,InterruptedException {
90             long times = 0L;
91             for (LongWritable count : v2s) {
92                 times += count.get();
93             }
94             ctx.write(k2, new LongWritable(times));
95         };
96     }
97         
98 }
View Code

代碼 1.1

  分析上面代碼,我們會發現該單詞統計方法的輸入輸出路徑都已經寫死了,比如輸入路徑:INPUT_PATH = "hdfs://hadoop:9000/hello"輸出路徑:OUT_PATH = "hdfs://hadoop:9000/out"。這樣一來,這個算法的輸入出路徑也就固定死了,想要使用這個算法,相應的數據就必須滿足這個固定的路徑要求,從而算法的靈活性和可操作性也就大大降低了,也就是說我們的算法,目前還不算是一個通用的算法。所以為了提高算法靈活性和可操作性,應該通過指令運行時參數來指定輸入輸出路徑。

1.2 在命令行運行的單詞統計

  在命令行運行的單詞統計程序,如代碼1.2所示。

  1 package cmd;
  2 
  3 import java.net.URI;
  4 
  5 import org.apache.hadoop.conf.Configuration;
  6 import org.apache.hadoop.conf.Configured;
  7 import org.apache.hadoop.fs.FileSystem;
  8 import org.apache.hadoop.fs.Path;
  9 import org.apache.hadoop.io.LongWritable;
 10 import org.apache.hadoop.io.Text;
 11 import org.apache.hadoop.mapreduce.Job;
 12 import org.apache.hadoop.mapreduce.Mapper;
 13 import org.apache.hadoop.mapreduce.Reducer;
 14 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
 15 import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
 16 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
 17 import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
 18 import org.apache.hadoop.mapreduce.lib.partition.HashPartitioner;
 19 import org.apache.hadoop.util.Tool;
 20 import org.apache.hadoop.util.ToolRunner;
 21 
 22 public class WordCountApp extends Configured implements Tool{
 23     static String INPUT_PATH = "";
 24     static String OUT_PATH = "";
 25     
 26     @Override
 27     public int run(String[] arg0) throws Exception {
 28         INPUT_PATH = arg0[0];
 29         OUT_PATH = arg0[1];
 30         
 31         Configuration conf = new Configuration();
 32         final FileSystem fileSystem = FileSystem.get(new URI(INPUT_PATH), conf);
 33         final Path outPath = new Path(OUT_PATH);
 34         if(fileSystem.exists(outPath)){
 35             fileSystem.delete(outPath, true);
 36         }
 37         
 38         final Job job = new Job(conf , WordCountApp.class.getSimpleName());        
 39         
 40         job.setJarByClass(WordCountApp.class);//打包運行必須執行的秘密方法        
 41         FileInputFormat.setInputPaths(job, INPUT_PATH);//1.1指定讀取的文件位於哪里
 42         job.setInputFormatClass(TextInputFormat.class);//指定如何對輸入文件進行格式化,把輸入文件每一行解析成鍵值對
 43         
 44         job.setMapperClass(MyMapper.class);//1.2 指定自定義的map類        
 45         job.setMapOutputKeyClass(Text.class);//map輸出的<k,v>類型。如果<k3,v3>的類型與<k2,v2>類型一致,則可以省略
 46         job.setMapOutputValueClass(LongWritable.class);
 47         
 48         
 49         job.setPartitionerClass(HashPartitioner.class);//1.3 分區        
 50         job.setNumReduceTasks(1);//有一個reduce任務運行
 51         
 52         
 53         job.setReducerClass(MyReducer.class);//2.2 指定自定義reduce類        
 54         job.setOutputKeyClass(Text.class);//指定reduce的輸出類型
 55         job.setOutputValueClass(LongWritable.class);
 56         
 57         FileOutputFormat.setOutputPath(job, outPath);//2.3 指定寫出到哪里
 58         job.setOutputFormatClass(TextOutputFormat.class);//指定輸出文件的格式化類
 59         
 60         job.waitForCompletion(true);//把job提交給JobTracker運行
 61         return 0;
 62     }
 63     
 64     public static void main(String[] args) throws Exception {
 65         ToolRunner.run(new WordCountApp(), args);
 66     }
 67     
 68     /**
 69      * KEYIN    即k1        表示行的偏移量
 70      * VALUEIN    即v1        表示行文本內容
 71      * KEYOUT    即k2        表示行中出現的單詞
 72      * VALUEOUT    即v2        表示行中出現的單詞的次數,固定值1
 73      */
 74     static class MyMapper extends Mapper<LongWritable, Text, Text, LongWritable>{
 75         protected void map(LongWritable k1, Text v1, Context context) throws java.io.IOException ,InterruptedException {
 76             final String[] splited = v1.toString().split("\t");
 77             for (String word : splited) {
 78                 context.write(new Text(word), new LongWritable(1));
 79             }
 80         };
 81     }
 82     
 83     /**
 84      * KEYIN    即k2        表示行中出現的單詞
 85      * VALUEIN    即v2        表示行中出現的單詞的次數
 86      * KEYOUT    即k3        表示文本中出現的不同單詞
 87      * VALUEOUT    即v3        表示文本中出現的不同單詞的總次數
 88      *
 89      */
 90     static class MyReducer extends Reducer<Text, LongWritable, Text, LongWritable>{
 91         protected void reduce(Text k2, java.lang.Iterable<LongWritable> v2s, Context ctx) throws java.io.IOException ,InterruptedException {
 92             long times = 0L;
 93             for (LongWritable count : v2s) {
 94                 times += count.get();
 95             }
 96             ctx.write(k2, new LongWritable(times));
 97         };
 98     }
 99 
100         
101 }
View Code

代碼 1.2

  在編寫能夠在命令行運行的單詞統計程序時,我們的類要繼承Configured類實現Tool接口,實現Tool接口就要添加一個run()方法。在run()方法中執行我們原來在Main()方法中運行的配置代碼。然而run方法如何運行呢?那就要在Main方法中調用run方法,調用方式如代碼1.3所示。

1 public static void main(String[] args) throws Exception {
2         ToolRunner.run(new WordCountApp(), args);
3     }

代碼 1.3

  我們看一下run方法的參數,ToolRunner.run(Tool tool, String[] args),第一參數為Tool接口,我們知道該程序的類就是Tool的實現類所以我們可以,用該程序類的對象來作為參數。他的第二個參數,是一個字符數組args。在這里我們先講一下,main函數的args參數。這個參數是運行程序前給它的參數。如果你在你程序要用這個參數的話,就需要在運行前指定。比如一個打印helloworld的程序如下:

public class HelloWorld{
    public static void main(String[] args) {
        System.out.println(args[0]);
    }
}

  執行命令java HelloWorld ceshi ceshi1 ceshi2,那么在HelloWorld的main方法里面 args就是{"ceshi", "ceshi1", "ceshi2"},打印的結果就是creshi。

  經過對main方法的分析,我應該就知道了,run方法的第二個參數就應該是main函數的參數,這樣就能夠接受命令行所指定的參數了。那么既然輸入輸出路徑由運行時的命令行的參數指定,那么就不需要在代碼中指定路徑了,所以將INPUT_PATHOUT_PATH初始化為空。然后在run方法中通過由命令行傳過來的參數來進行賦值,如下所示。

INPUT_PATH = arg0[0];//表示輸入路徑
OUT_PATH = arg0[1];//表示輸出路徑

  而為了我們的程序能夠在命令行運行,必須添加“job.setJarByClass(WordCountApp.class);”代碼,表示我們的程序以打包的方式運行。

二、運行方式

2.1 將程序以.jar類型導出到桌面

<1> 選擇WordCountApp右擊選擇Export,如圖2.1所示。

 圖 2.1

<2>  選擇JAR file,選擇Next,如圖2.2所示。

 圖 2.2

<3> 選擇Next后,彈出如下界面,如圖2.3,再次選擇Next。

 圖 2.3

<4> 選擇Next之后,彈出如圖2.4的界面,選擇Browse。

 圖 2.4

<5> 選擇Browse后,在彈出的界面選擇ok,如圖2.5所示。

圖 2.5

<6> 選擇Ok后,直接選擇finish即可,如圖2.6所示。

圖 2.6

2.2 將jar包傳到Linux

  使用WinScp將程序的jar包傳到Linux,如圖2.7所示。

圖 2.7

2.3 在Linux命令行執行jar包

2.3.1 創建輸入輸出路徑

  執行命令:

hadoop fs -mkdir /input
hadoop fs -mkdir /output

2.3.2 編寫、上傳file文件

  執行命令:vi file1

  輸入內容:

        hello    word
        hello    me

  執行命令:hadoop fs -put file1 /

2.3.3 執行程序

執行命令:hadoop jar jar.jar hdfs://hadoop:9000/input  hdfs: //hadoop:9000/output

運行過程:

14/09/28 20:08:08 WARN mapred.JobClient: Use GenericOptionsParser for parsi                                                                                             ng the arguments. Applications should implement Tool for the same.
14/09/28 20:08:09 INFO input.FileInputFormat: Total input paths to process                                                                                              : 1
14/09/28 20:08:09 INFO util.NativeCodeLoader: Loaded the native-hadoop libr                                                                                             ary
14/09/28 20:08:09 WARN snappy.LoadSnappy: Snappy native library not loaded
14/09/28 20:08:11 INFO mapred.JobClient: Running job: job_201409281916_0001
14/09/28 20:08:12 INFO mapred.JobClient:  map 0% reduce 0%
14/09/28 20:09:03 INFO mapred.JobClient:  map 100% reduce 0%
14/09/28 20:09:14 INFO mapred.JobClient:  map 100% reduce 100%
14/09/28 20:09:14 INFO mapred.JobClient: Job complete: job_201409281916_0001
14/09/28 20:09:14 INFO mapred.JobClient: Counters: 29
14/09/28 20:09:14 INFO mapred.JobClient:   Job Counters
14/09/28 20:09:14 INFO mapred.JobClient:     Launched reduce tasks=1
14/09/28 20:09:14 INFO mapred.JobClient:     SLOTS_MILLIS_MAPS=47675
14/09/28 20:09:14 INFO mapred.JobClient:     Total time spent by all reduces waiting after rese                                                                         rving slots (ms)=0
14/09/28 20:09:14 INFO mapred.JobClient:     Total time spent by all maps waiting after reservi                                                                         ng slots (ms)=0
14/09/28 20:09:14 INFO mapred.JobClient:     Launched map tasks=1
14/09/28 20:09:14 INFO mapred.JobClient:     Data-local map tasks=1
14/09/28 20:09:14 INFO mapred.JobClient:     SLOTS_MILLIS_REDUCES=10902
14/09/28 20:09:14 INFO mapred.JobClient:   File Output Format Counters
14/09/28 20:09:14 INFO mapred.JobClient:     Bytes Written=21
14/09/28 20:09:14 INFO mapred.JobClient:   FileSystemCounters
14/09/28 20:09:14 INFO mapred.JobClient:     FILE_BYTES_READ=67
14/09/28 20:09:14 INFO mapred.JobClient:     HDFS_BYTES_READ=116
14/09/28 20:09:14 INFO mapred.JobClient:     FILE_BYTES_WRITTEN=105834
14/09/28 20:09:14 INFO mapred.JobClient:     HDFS_BYTES_WRITTEN=21
14/09/28 20:09:14 INFO mapred.JobClient:   File Input Format Counters
14/09/28 20:09:14 INFO mapred.JobClient:     Bytes Read=21
14/09/28 20:09:14 INFO mapred.JobClient:   Map-Reduce Framework
14/09/28 20:09:14 INFO mapred.JobClient:     Map output materialized bytes=67
14/09/28 20:09:14 INFO mapred.JobClient:     Map input records=2
14/09/28 20:09:14 INFO mapred.JobClient:     Reduce shuffle bytes=67
14/09/28 20:09:14 INFO mapred.JobClient:     Spilled Records=8
14/09/28 20:09:14 INFO mapred.JobClient:     Map output bytes=53
14/09/28 20:09:14 INFO mapred.JobClient:     CPU time spent (ms)=35140
14/09/28 20:09:14 INFO mapred.JobClient:     Total committed heap usage (bytes)=131665920
14/09/28 20:09:14 INFO mapred.JobClient:     Combine input records=0
14/09/28 20:09:14 INFO mapred.JobClient:     SPLIT_RAW_BYTES=95
14/09/28 20:09:14 INFO mapred.JobClient:     Reduce input records=4
14/09/28 20:09:14 INFO mapred.JobClient:     Reduce input groups=3
14/09/28 20:09:14 INFO mapred.JobClient:     Combine output records=0
14/09/28 20:09:14 INFO mapred.JobClient:     Physical memory (bytes) snapshot=181952512
14/09/28 20:09:14 INFO mapred.JobClient:     Reduce output records=3
14/09/28 20:09:14 INFO mapred.JobClient:     Virtual memory (bytes) snapshot=752697344
14/09/28 20:09:14 INFO mapred.JobClient:     Map output records=4
View Code

執行結果:

[root@hadoop Downloads]# hadoop fs -ls /output
Found 3 items
-rw-r--r--   1 root supergroup          0 2014-09-28 20:09 /output/_SUCCESS
drwxr-xr-x   - root supergroup          0 2014-09-28 20:08 /output/_logs
-rw-r--r--   1 root supergroup         21 2014-09-28 20:09 /output/part-r-00000
[root@hadoop Downloads]# hadoop fs -cat /output/part-r-00000
hello   2
me      1
world   1
[root@hadoop Downloads]#

 

 

 

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM