摘要:
一個WordCount單詞統計程序為實例,詳細演示了如何編寫MapReduce程序代碼以及如何打包運行程序。
參考資料:
Api 文檔地址:http://hadoop.apache.org/docs/current/api/index.html
maven資源庫:https://mvnrepository.com/repos/central 用於配置pom的時候查詢資源
1.創建maven項目
創建maven項目,項目名hdfs ##這里我用的文章“java操作hdfs”的項目hdfs
pom.xml文件: //與文章“java操作hdfs”的項目一樣。
2.編寫WordCount類
在該項目包com.scitc.hdfs中新建WordCount.java類,代碼如下:
package com.scitc.hdfs; import java.io.IOException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; public class WordCount { /** * @author he *參數LongWritable:讀取行的偏移量,知道下一行從哪兒開始讀取 *Text:正在讀取的行的內容 *后兩個參數構成<k,v>輸出類型 例如<hadoop,1> */ static class Maps extends Mapper<LongWritable, Text, Text, IntWritable> { private final static IntWritable one = new IntWritable(1); //等價於java中 int one = 1; @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { // 將讀入的每行數據按空格切分 String[] dataArr = value.toString().split(" "); if(dataArr.length>0){ // 將每個單詞作為map的key的value設置為1 for (String word : dataArr) { context.write(new Text(word), one); //context是mapreduce的上下文 } } } } /** * @author he *前兩個參數構成<text,IntWritable>,正式map的輸出,例如<hadoop,1>,<spark,1>,<hadoop,1>... */ static class Reduces extends Reducer<Text, IntWritable, Text, IntWritable> { /** * 參數key:例如詞頻統計,單詞hadoop * 參數values:是一個迭代器,例如<hadoop,<1,1>>,<spark,1> */ @Override public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException { int sum = 0; for (IntWritable value : values) { sum += value.get(); //獲取IntWritable的值,這里實際都是1 } IntWritable result = new IntWritable(); result.set(sum); context.write(key, result); } } public static void main(String[] args) throws ClassNotFoundException, IOException, InterruptedException { // 1:實例化Configuration類、新建一個任務 Configuration conf = new Configuration(); Job job = Job.getInstance(conf, "word-count"); //名字可以自己取 //2:設置jar加載的路徑,這個測試正確 job.setJarByClass(WordCount.class); //3:設置Map類和reduce類 job.setMapperClass(Maps.class); job.setReducerClass(Reduces.class); //4:設置Map輸出 job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(IntWritable.class); //5:設置最終輸出kv類型 job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); //6:設置輸入和輸出路徑 //方法1:直接提供輸入、輸出默認參數 //輸入文件地址,可以是本地,或者hdfs地址;
//輸出文件地址可以本地文件夾或hdfs文件夾,wordcount不能提前在hdfs中創建 String inputPath = "hdfs://master:9000/he/input/wordcount.txt"; String outPath = "hdfs://master:9000/he/output/wordcount/"; // 如果有傳入文件地址,接收參數為輸入文件地址 if(args != null && args.length > 0){ inputPath = args[0]; } FileInputFormat.addInputPath(job, new Path(inputPath)); FileOutputFormat.setOutputPath(job, new Path(outPath)); // 方法2:執行jar時,外部提供輸入、輸出參數 //FileInputFormat.addInputPath(job, new Path(args[0])); //FileOutputFormat.setOutputPath(job, new Path(args[1])); //7:提交任務 boolean result = job.waitForCompletion(true); System.exit(result?0:1); } }
3:HDFS准備工作
1.啟動hadoop集群
2.在hdfs中新建/he/input/ 目錄,從本地導入wordcount.txt文件到該目錄下
新建/he/output/ 目錄
##說明:代碼中輸出路徑/he/output/wordcount/中wordcount文件夾不能提前創建,代碼執行的時候自己會創建。
4:maven項目打包和運行
1.打包
項目名hdfs上右鍵>>Run As>>Maven install
2.上傳
項目根目錄下的target文件夾中找到hdfs-0.0.1-SNAPSHOT.jar,改文件名為hdfs.jar,上傳到master的/opt/data/目錄中
3.用hadoop jar 命令運行hd.jar包
cd /opt/data
hadoop jar hdfs.jar com.scitc.hdfs.WordCount ##命令語法:hadoop jar jar包 類的全名稱
5:查看結果
方法1:通過hdfs shell 查看結果
方法2:eclipse中查看
方法3:http://master:50070/explorer.html#/he/output/wordcount 網頁查看