一、代碼編寫
1.1 單詞統計
回顧我們以前單詞統計的例子,如代碼1.1所示。

1 package counter; 2 3 import java.net.URI; 4 5 import org.apache.hadoop.conf.Configuration; 6 import org.apache.hadoop.fs.FileSystem; 7 import org.apache.hadoop.fs.Path; 8 import org.apache.hadoop.io.LongWritable; 9 import org.apache.hadoop.io.Text; 10 import org.apache.hadoop.mapreduce.Counter; 11 import org.apache.hadoop.mapreduce.Job; 12 import org.apache.hadoop.mapreduce.Mapper; 13 import org.apache.hadoop.mapreduce.Reducer; 14 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; 15 import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; 16 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; 17 import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; 18 import org.apache.hadoop.mapreduce.lib.partition.HashPartitioner; 19 20 public class WordCountApp { 21 static final String INPUT_PATH = "hdfs://hadoop:9000/hello"; 22 static final String OUT_PATH = "hdfs://hadoop:9000/out"; 23 24 public static void main(String[] args) throws Exception { 25 26 Configuration conf = new Configuration(); 27 28 final FileSystem fileSystem = FileSystem.get(new URI(INPUT_PATH), conf); 29 final Path outPath = new Path(OUT_PATH); 30 31 if(fileSystem.exists(outPath)){ 32 fileSystem.delete(outPath, true); 33 } 34 35 final Job job = new Job(conf , WordCountApp.class.getSimpleName()); 36 37 FileInputFormat.setInputPaths(job, INPUT_PATH);//1.1指定讀取的文件位於哪里 38 39 job.setInputFormatClass(TextInputFormat.class);//指定如何對輸入文件進行格式化,把輸入文件每一行解析成鍵值對 40 41 job.setMapperClass(MyMapper.class);//1.2 指定自定義的map類 42 job.setMapOutputKeyClass(Text.class);//map輸出的<k,v>類型。如果<k3,v3>的類型與<k2,v2>類型一致,則可以省略 43 job.setMapOutputValueClass(LongWritable.class); 44 45 job.setPartitionerClass(HashPartitioner.class);//1.3 分區 46 job.setNumReduceTasks(1);//有一個reduce任務運行 47 48 job.setReducerClass(MyReducer.class);//2.2 指定自定義reduce類 49 job.setOutputKeyClass(Text.class);//指定reduce的輸出類型 50 job.setOutputValueClass(LongWritable.class); 51 52 FileOutputFormat.setOutputPath(job, outPath);//2.3 指定寫出到哪里 53 54 job.setOutputFormatClass(TextOutputFormat.class);//指定輸出文件的格式化類 55 56 job.waitForCompletion(true);//把job提交給JobTracker運行 57 } 58 59 /** 60 * KEYIN 即k1 表示行的偏移量 61 * VALUEIN 即v1 表示行文本內容 62 * KEYOUT 即k2 表示行中出現的單詞 63 * VALUEOUT 即v2 表示行中出現的單詞的次數,固定值1 64 */ 65 static class MyMapper extends Mapper<LongWritable, Text, Text, LongWritable>{ 66 protected void map(LongWritable k1, Text v1, Context context) throws java.io.IOException ,InterruptedException { 67 // final Counter helloCounter = context.getCounter("Sensitive Words", "hello"); 68 69 final String line = v1.toString(); 70 /* if(line.contains("hello")){ 71 //記錄敏感詞出現在一行中 72 helloCounter.increment(1L); 73 }*/ 74 final String[] splited = line.split(" "); 75 for (String word : splited) { 76 context.write(new Text(word), new LongWritable(1)); 77 } 78 }; 79 } 80 81 /** 82 * KEYIN 即k2 表示行中出現的單詞 83 * VALUEIN 即v2 表示行中出現的單詞的次數 84 * KEYOUT 即k3 表示文本中出現的不同單詞 85 * VALUEOUT 即v3 表示文本中出現的不同單詞的總次數 86 * 87 */ 88 static class MyReducer extends Reducer<Text, LongWritable, Text, LongWritable>{ 89 protected void reduce(Text k2, java.lang.Iterable<LongWritable> v2s, Context ctx) throws java.io.IOException ,InterruptedException { 90 long times = 0L; 91 for (LongWritable count : v2s) { 92 times += count.get(); 93 } 94 ctx.write(k2, new LongWritable(times)); 95 }; 96 } 97 98 }
代碼 1.1
分析上面代碼,我們會發現該單詞統計方法的輸入輸出路徑都已經寫死了,比如輸入路徑:INPUT_PATH = "hdfs://hadoop:9000/hello"輸出路徑:OUT_PATH = "hdfs://hadoop:9000/out"。這樣一來,這個算法的輸入出路徑也就固定死了,想要使用這個算法,相應的數據就必須滿足這個固定的路徑要求,從而算法的靈活性和可操作性也就大大降低了,也就是說我們的算法,目前還不算是一個通用的算法。所以為了提高算法靈活性和可操作性,應該通過指令運行時參數來指定輸入輸出路徑。
1.2 在命令行運行的單詞統計
在命令行運行的單詞統計程序,如代碼1.2所示。

1 package cmd; 2 3 import java.net.URI; 4 5 import org.apache.hadoop.conf.Configuration; 6 import org.apache.hadoop.conf.Configured; 7 import org.apache.hadoop.fs.FileSystem; 8 import org.apache.hadoop.fs.Path; 9 import org.apache.hadoop.io.LongWritable; 10 import org.apache.hadoop.io.Text; 11 import org.apache.hadoop.mapreduce.Job; 12 import org.apache.hadoop.mapreduce.Mapper; 13 import org.apache.hadoop.mapreduce.Reducer; 14 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; 15 import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; 16 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; 17 import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; 18 import org.apache.hadoop.mapreduce.lib.partition.HashPartitioner; 19 import org.apache.hadoop.util.Tool; 20 import org.apache.hadoop.util.ToolRunner; 21 22 public class WordCountApp extends Configured implements Tool{ 23 static String INPUT_PATH = ""; 24 static String OUT_PATH = ""; 25 26 @Override 27 public int run(String[] arg0) throws Exception { 28 INPUT_PATH = arg0[0]; 29 OUT_PATH = arg0[1]; 30 31 Configuration conf = new Configuration(); 32 final FileSystem fileSystem = FileSystem.get(new URI(INPUT_PATH), conf); 33 final Path outPath = new Path(OUT_PATH); 34 if(fileSystem.exists(outPath)){ 35 fileSystem.delete(outPath, true); 36 } 37 38 final Job job = new Job(conf , WordCountApp.class.getSimpleName()); 39 40 job.setJarByClass(WordCountApp.class);//打包運行必須執行的秘密方法 41 FileInputFormat.setInputPaths(job, INPUT_PATH);//1.1指定讀取的文件位於哪里 42 job.setInputFormatClass(TextInputFormat.class);//指定如何對輸入文件進行格式化,把輸入文件每一行解析成鍵值對 43 44 job.setMapperClass(MyMapper.class);//1.2 指定自定義的map類 45 job.setMapOutputKeyClass(Text.class);//map輸出的<k,v>類型。如果<k3,v3>的類型與<k2,v2>類型一致,則可以省略 46 job.setMapOutputValueClass(LongWritable.class); 47 48 49 job.setPartitionerClass(HashPartitioner.class);//1.3 分區 50 job.setNumReduceTasks(1);//有一個reduce任務運行 51 52 53 job.setReducerClass(MyReducer.class);//2.2 指定自定義reduce類 54 job.setOutputKeyClass(Text.class);//指定reduce的輸出類型 55 job.setOutputValueClass(LongWritable.class); 56 57 FileOutputFormat.setOutputPath(job, outPath);//2.3 指定寫出到哪里 58 job.setOutputFormatClass(TextOutputFormat.class);//指定輸出文件的格式化類 59 60 job.waitForCompletion(true);//把job提交給JobTracker運行 61 return 0; 62 } 63 64 public static void main(String[] args) throws Exception { 65 ToolRunner.run(new WordCountApp(), args); 66 } 67 68 /** 69 * KEYIN 即k1 表示行的偏移量 70 * VALUEIN 即v1 表示行文本內容 71 * KEYOUT 即k2 表示行中出現的單詞 72 * VALUEOUT 即v2 表示行中出現的單詞的次數,固定值1 73 */ 74 static class MyMapper extends Mapper<LongWritable, Text, Text, LongWritable>{ 75 protected void map(LongWritable k1, Text v1, Context context) throws java.io.IOException ,InterruptedException { 76 final String[] splited = v1.toString().split("\t"); 77 for (String word : splited) { 78 context.write(new Text(word), new LongWritable(1)); 79 } 80 }; 81 } 82 83 /** 84 * KEYIN 即k2 表示行中出現的單詞 85 * VALUEIN 即v2 表示行中出現的單詞的次數 86 * KEYOUT 即k3 表示文本中出現的不同單詞 87 * VALUEOUT 即v3 表示文本中出現的不同單詞的總次數 88 * 89 */ 90 static class MyReducer extends Reducer<Text, LongWritable, Text, LongWritable>{ 91 protected void reduce(Text k2, java.lang.Iterable<LongWritable> v2s, Context ctx) throws java.io.IOException ,InterruptedException { 92 long times = 0L; 93 for (LongWritable count : v2s) { 94 times += count.get(); 95 } 96 ctx.write(k2, new LongWritable(times)); 97 }; 98 } 99 100 101 }
代碼 1.2
在編寫能夠在命令行運行的單詞統計程序時,我們的類要繼承Configured類實現Tool接口,實現Tool接口就要添加一個run()方法。在run()方法中執行我們原來在Main()方法中運行的配置代碼。然而run方法如何運行呢?那就要在Main方法中調用run方法,調用方式如代碼1.3所示。
1 public static void main(String[] args) throws Exception { 2 ToolRunner.run(new WordCountApp(), args); 3 }
代碼 1.3
我們看一下run方法的參數,ToolRunner.run(Tool tool, String[] args),第一參數為Tool接口,我們知道該程序的類就是Tool的實現類所以我們可以,用該程序類的對象來作為參數。他的第二個參數,是一個字符數組args。在這里我們先講一下,main函數的args參數。這個參數是運行程序前給它的參數。如果你在你程序要用這個參數的話,就需要在運行前指定。比如一個打印helloworld的程序如下:
public class HelloWorld{ public static void main(String[] args) { System.out.println(args[0]); } }
執行命令java HelloWorld ceshi ceshi1 ceshi2,那么在HelloWorld的main方法里面 args就是{"ceshi", "ceshi1", "ceshi2"},打印的結果就是creshi。
經過對main方法的分析,我應該就知道了,run方法的第二個參數就應該是main函數的參數,這樣就能夠接受命令行所指定的參數了。那么既然輸入輸出路徑由運行時的命令行的參數指定,那么就不需要在代碼中指定路徑了,所以將INPUT_PATH和OUT_PATH初始化為空。然后在run方法中通過由命令行傳過來的參數來進行賦值,如下所示。
INPUT_PATH = arg0[0];//表示輸入路徑
OUT_PATH = arg0[1];//表示輸出路徑
而為了我們的程序能夠在命令行運行,必須添加“job.setJarByClass(WordCountApp.class);”代碼,表示我們的程序以打包的方式運行。
二、運行方式
2.1 將程序以.jar類型導出到桌面
<1> 選擇WordCountApp右擊選擇Export,如圖2.1所示。
圖 2.1
<2> 選擇JAR file,選擇Next,如圖2.2所示。
圖 2.2
<3> 選擇Next后,彈出如下界面,如圖2.3,再次選擇Next。
圖 2.3
<4> 選擇Next之后,彈出如圖2.4的界面,選擇Browse。
圖 2.4
<5> 選擇Browse后,在彈出的界面選擇ok,如圖2.5所示。
圖 2.5
<6> 選擇Ok后,直接選擇finish即可,如圖2.6所示。
圖 2.6
2.2 將jar包傳到Linux
使用WinScp將程序的jar包傳到Linux,如圖2.7所示。
圖 2.7
2.3 在Linux命令行執行jar包
2.3.1 創建輸入輸出路徑
執行命令:
hadoop fs -mkdir /input
hadoop fs -mkdir /output
2.3.2 編寫、上傳file文件
執行命令:vi file1
輸入內容:
hello word
hello me
執行命令:hadoop fs -put file1 /
2.3.3 執行程序
執行命令:hadoop jar jar.jar hdfs://hadoop:9000/input hdfs: //hadoop:9000/output
運行過程:

14/09/28 20:08:08 WARN mapred.JobClient: Use GenericOptionsParser for parsi ng the arguments. Applications should implement Tool for the same. 14/09/28 20:08:09 INFO input.FileInputFormat: Total input paths to process : 1 14/09/28 20:08:09 INFO util.NativeCodeLoader: Loaded the native-hadoop libr ary 14/09/28 20:08:09 WARN snappy.LoadSnappy: Snappy native library not loaded 14/09/28 20:08:11 INFO mapred.JobClient: Running job: job_201409281916_0001 14/09/28 20:08:12 INFO mapred.JobClient: map 0% reduce 0% 14/09/28 20:09:03 INFO mapred.JobClient: map 100% reduce 0% 14/09/28 20:09:14 INFO mapred.JobClient: map 100% reduce 100% 14/09/28 20:09:14 INFO mapred.JobClient: Job complete: job_201409281916_0001 14/09/28 20:09:14 INFO mapred.JobClient: Counters: 29 14/09/28 20:09:14 INFO mapred.JobClient: Job Counters 14/09/28 20:09:14 INFO mapred.JobClient: Launched reduce tasks=1 14/09/28 20:09:14 INFO mapred.JobClient: SLOTS_MILLIS_MAPS=47675 14/09/28 20:09:14 INFO mapred.JobClient: Total time spent by all reduces waiting after rese rving slots (ms)=0 14/09/28 20:09:14 INFO mapred.JobClient: Total time spent by all maps waiting after reservi ng slots (ms)=0 14/09/28 20:09:14 INFO mapred.JobClient: Launched map tasks=1 14/09/28 20:09:14 INFO mapred.JobClient: Data-local map tasks=1 14/09/28 20:09:14 INFO mapred.JobClient: SLOTS_MILLIS_REDUCES=10902 14/09/28 20:09:14 INFO mapred.JobClient: File Output Format Counters 14/09/28 20:09:14 INFO mapred.JobClient: Bytes Written=21 14/09/28 20:09:14 INFO mapred.JobClient: FileSystemCounters 14/09/28 20:09:14 INFO mapred.JobClient: FILE_BYTES_READ=67 14/09/28 20:09:14 INFO mapred.JobClient: HDFS_BYTES_READ=116 14/09/28 20:09:14 INFO mapred.JobClient: FILE_BYTES_WRITTEN=105834 14/09/28 20:09:14 INFO mapred.JobClient: HDFS_BYTES_WRITTEN=21 14/09/28 20:09:14 INFO mapred.JobClient: File Input Format Counters 14/09/28 20:09:14 INFO mapred.JobClient: Bytes Read=21 14/09/28 20:09:14 INFO mapred.JobClient: Map-Reduce Framework 14/09/28 20:09:14 INFO mapred.JobClient: Map output materialized bytes=67 14/09/28 20:09:14 INFO mapred.JobClient: Map input records=2 14/09/28 20:09:14 INFO mapred.JobClient: Reduce shuffle bytes=67 14/09/28 20:09:14 INFO mapred.JobClient: Spilled Records=8 14/09/28 20:09:14 INFO mapred.JobClient: Map output bytes=53 14/09/28 20:09:14 INFO mapred.JobClient: CPU time spent (ms)=35140 14/09/28 20:09:14 INFO mapred.JobClient: Total committed heap usage (bytes)=131665920 14/09/28 20:09:14 INFO mapred.JobClient: Combine input records=0 14/09/28 20:09:14 INFO mapred.JobClient: SPLIT_RAW_BYTES=95 14/09/28 20:09:14 INFO mapred.JobClient: Reduce input records=4 14/09/28 20:09:14 INFO mapred.JobClient: Reduce input groups=3 14/09/28 20:09:14 INFO mapred.JobClient: Combine output records=0 14/09/28 20:09:14 INFO mapred.JobClient: Physical memory (bytes) snapshot=181952512 14/09/28 20:09:14 INFO mapred.JobClient: Reduce output records=3 14/09/28 20:09:14 INFO mapred.JobClient: Virtual memory (bytes) snapshot=752697344 14/09/28 20:09:14 INFO mapred.JobClient: Map output records=4
執行結果:
[root@hadoop Downloads]# hadoop fs -ls /output Found 3 items -rw-r--r-- 1 root supergroup 0 2014-09-28 20:09 /output/_SUCCESS drwxr-xr-x - root supergroup 0 2014-09-28 20:08 /output/_logs -rw-r--r-- 1 root supergroup 21 2014-09-28 20:09 /output/part-r-00000 [root@hadoop Downloads]# hadoop fs -cat /output/part-r-00000 hello 2 me 1 world 1 [root@hadoop Downloads]#