在配置Hadoop集群成功后,利用官方自帶的例子簡單測試了一下MapReduce程序WordCount,這個例子也就相當於編程入門的HelloWorld程序了,結構清晰容易理解,並且方便說明MapReduce的工作過程。這篇隨筆主要想記錄下在Eclipse中編寫簡單的MapReduce程序的上手過程。原創代碼的內容不會很多,更多的是參考和借鑒現有的優秀代碼。
一、Hello 'MapReduce' World——WordCount程序
1、在Eclipse中建立Java項目WordCount
2、導入相關包(可以在Eclipse中為這三個包建立User Library以便使用)
①commons-cli-1.2.jar
②hadoop-common-2.7.3.jar
③hadoop-mapreduce-client-core-2.7.3.jar
3、配置好Build Path,確保項目中引入了上述三個包
4、新建包名為zmt.test,在其下建立新的Class名為WordCount,並鍵入官方源碼
package zmt.test;

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

/**
 * Classic Hadoop WordCount example: counts the occurrences of each
 * whitespace-delimited token across one or more input paths.
 *
 * Usage: wordcount &lt;in&gt; [&lt;in&gt;...] &lt;out&gt;
 */
public class WordCount {

    /**
     * Splits each input line into whitespace-delimited tokens and emits
     * (token, 1) for every token.
     */
    public static class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable> {

        private final static IntWritable one = new IntWritable(1);
        // Reused across calls to avoid allocating a Text per token.
        private Text word = new Text();

        @Override
        public void map(Object key, Text value, Context context)
                throws IOException, InterruptedException {
            StringTokenizer itr = new StringTokenizer(value.toString());
            while (itr.hasMoreTokens()) {
                word.set(itr.nextToken());
                context.write(word, one);
            }
        }
    }

    /**
     * Sums the counts for each token. Also used as the combiner, which is
     * valid because integer addition is associative and commutative.
     */
    public static class IntSumReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

        private IntWritable result = new IntWritable();

        @Override
        public void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            int sum = 0;
            for (IntWritable val : values) {
                sum += val.get();
            }
            result.set(sum);
            context.write(key, result);
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
        if (otherArgs.length < 2) {
            // FIX: the original usage message read "<int>" where the first
            // argument is an input path ("<in>"), and the string literal was
            // broken across a raw line break.
            System.err.println("用法:wordcount <in> [<in>...] <out>");
            System.exit(2);
        }
        Job job = Job.getInstance(conf, "word count");
        job.setJarByClass(WordCount.class);
        job.setMapperClass(TokenizerMapper.class);
        job.setCombinerClass(IntSumReducer.class);
        job.setReducerClass(IntSumReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        // All arguments except the last are input paths; the last is the output path.
        for (int i = 0; i < otherArgs.length - 1; ++i) {
            FileInputFormat.addInputPath(job, new Path(otherArgs[i]));
        }
        FileOutputFormat.setOutputPath(job, new Path(otherArgs[otherArgs.length - 1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
5、項目右鍵導出為jar文件,命名為WC.jar
6、將WC.jar復制到虛擬機Master主機中,虛擬機安裝了 VMWare Tools可以直接拖拽進行復制。此處復制到/home/admin/Documents/
7、准備好待統計詞頻的文本文件,此處沿用之前搭建Hadoop時的README.txt。
上傳文件至Hadoop:hdfs dfs -put README.txt /data/input/WC(與下一步任務命令中的輸入路徑保持一致)
8、執行任務命令
hadoop jar /home/admin/Documents/WC.jar zmt.test.WordCount /data/input/WC /data/output/WC
需要關注的是入口類的路徑zmt.test.WordCount,在更復雜的任務開發中需要指明MapReduce程序入口
9、查看結果,命令行中會直接給出結果,也可以去/data/output/WC/part-r-00000查看文件內容
10、任務跟蹤,查看MapReduce程序運行情況
http://192.168.222.134:8088/cluster
二、TopN問題——找到前N個數
TopN問題也是入門的一個很好的例子,可以更好地理解MapReduce程序的工作流程,更重要的是了解程序中哪些是模式,是可以更改的,是可以不這么寫的。
與WordCount重復的步驟就不再描述,直接給出關鍵代碼和操作。
1、生成隨機數
import java.io.BufferedWriter;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStreamWriter;
import java.util.Random;

/**
 * Test-data generator: writes 10 files named random_num0.txt .. random_num9.txt,
 * each containing 1,000,000 random int values (full Integer range), one per
 * line, UTF-8 encoded.
 */
public class Num_Generator {

    public static void main(String[] args) {
        Random random = new Random();
        String filename = "random_num";
        for (int i = 0; i < 10; i++) {
            String tmp_name = filename + "" + i + ".txt";
            File file = new File(tmp_name);
            // FIX: the original never closed the stream chain (only flush()),
            // leaking file handles; try-with-resources closes (and flushes)
            // the writer even if an exception is thrown mid-file.
            try (BufferedWriter bw = new BufferedWriter(
                    new OutputStreamWriter(new FileOutputStream(file), "UTF-8"))) {
                for (int j = 0; j < 1000000; j++) {
                    int rm = random.nextInt();
                    bw.write(rm + "");
                    bw.newLine();
                }
            } catch (IOException e) {
                // Covers FileNotFoundException and UnsupportedEncodingException
                // (both are IOException subclasses); original handling was
                // identical for all three.
                e.printStackTrace();
            }
            System.out.println(i + ":Complete.");
        }
    }
}
該程序生成了10個文件,每個文件包括一百萬個Integer范圍的隨機數,生成完成后將其復制並上傳到虛擬機的Hadoop文件系統HDFS中
2、TopN程序編寫(該程序是參考另一篇博客的,很慚愧,鏈接忘了(;′⌒`))
import java.io.IOException; import java.util.Iterator; import java.util.TreeMap; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.util.GenericOptionsParser; public class TopN { public static class MyMapper extends Mapper<Object, Text, NullWritable, IntWritable> { private TreeMap<Integer, Integer> tree = new TreeMap<Integer, Integer>(); // private final static IntWritable one = new IntWritable(1); // private Text number = new Text(); @Override protected void setup(Context context) throws IOException, InterruptedException { // TODO Auto-generated method stub // super.setup(context); System.out.println("Mapper("+context.getConfiguration().getInt("N", 10)+"):in setup..."); } @Override protected void cleanup(Context context) throws IOException, InterruptedException { // TODO Auto-generated method stub // super.cleanup(context); System.out.println("Mapper("+context.getConfiguration().getInt("N", 10)+"):in cleanup..."); for(Integer text : tree.values()){ context.write(NullWritable.get(), new IntWritable(text)); } } @Override public void map(Object key, Text value, Context context) throws IOException, InterruptedException{ String key_num = value.toString(); int num = Integer.parseInt(key_num); tree.put(num, num); if(tree.size() > context.getConfiguration().getInt("N", 10)) tree.remove(tree.firstKey()); // System.out.println("Mapper("+context.getConfiguration().getInt("N", 10)+"):"+key.toString()+"/"+value.toString()); // number.set(key_num); // context.write(number, one); } } public static class MyReducer extends Reducer<NullWritable, IntWritable, 
NullWritable, IntWritable> { // private IntWritable kk = new IntWritable(); private TreeMap<Integer, Integer> tree = new TreeMap<Integer, Integer>(); // private IntWritable result = new IntWritable(); @Override public void reduce(NullWritable key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException{ for (IntWritable value : values){ tree.put(value.get(), value.get()); if(tree.size() > context.getConfiguration().getInt("N", 10)) { tree.remove(tree.firstKey()); } } // System.out.println("Reducer("+context.getConfiguration().getInt("N", 10)+"):"+key.toString()+"/"+result.get()); } @Override protected void cleanup( org.apache.hadoop.mapreduce.Reducer.Context context) throws IOException, InterruptedException { // TODO Auto-generated method stub // super.cleanup(context); for(Integer val : tree.descendingKeySet()){ context.write(NullWritable.get(), new IntWritable(val)); } } } public static void main(String[] args) throws Exception { Configuration conf = new Configuration(); String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs(); if(otherArgs.length < 3){ System.err.println("heheda"); System.exit(2); } conf.setInt("N", new Integer(otherArgs[0])); System.out.println("N:"+otherArgs[0]); Job job = Job.getInstance(conf, "TopN"); job.setJarByClass(TopN.class); job.setMapperClass(MyMapper.class); // job.setCombinerClass(MyReducer.class); job.setMapOutputKeyClass(NullWritable.class); job.setMapOutputValueClass(IntWritable.class); job.setReducerClass(MyReducer.class); job.setOutputKeyClass(NullWritable.class); job.setOutputValueClass(IntWritable.class); for (int i = 1; i < otherArgs.length-1; i++) { FileInputFormat.addInputPath(job, new Path(otherArgs[i])); } FileOutputFormat.setOutputPath(job, new Path(otherArgs[otherArgs.length-1])); System.exit(job.waitForCompletion(true) ? 0 : 1); } }
3、運行測試,需要輸入參數N
hadoop jar /home/hadoop/hadoop-2.7.3/share/hadoop/mapreduce/TopN.jar TopN 12 /data/input/test1 /data/output/TT
4、查看結果
hdfs dfs -cat /data/output/TT/part-r-00000
[root@Master myscript]# hdfs dfs -cat /data/output/TT/part-r-00000 2147483194 2147483070 2147483066 2147482879 2147482835 2147482469 2147482152 2147481212 2147481174 2147480379 2147479927 2147479795