上一篇博文如何在Eclipse下搭建Hadoop開發環境,今天給大家介紹一下如何分別分別在Eclipse和Hadoop集群上運行我們的MapReduce程序!
1. 在Eclipse環境下運行MapReduce程序(WordCount程序)
首先看一下我的項目結構和WordCount程序:
其中word.txt將作為我們測試的輸入文件,內容如下:
程序代碼如下所示:
1 package com.hadoop.WordCount; 2
3 import java.io.IOException; 4 import java.util.StringTokenizer; 5 import org.apache.hadoop.conf.Configuration; 6 import org.apache.hadoop.fs.FileSystem; 7 import org.apache.hadoop.fs.Path; 8 import org.apache.hadoop.io.IntWritable; 9 import org.apache.hadoop.io.Text; 10 import org.apache.hadoop.mapreduce.Job; 11 import org.apache.hadoop.mapreduce.Mapper; 12 import org.apache.hadoop.mapreduce.Reducer; 13 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; 14 import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; 15 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; 16 import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; 17
18 @SuppressWarnings("unused") 19 public class WordCount {
20
21 public static class TokenizerMapper extends
22 Mapper<Object, Text, Text, IntWritable>
23 // 為什么這里k1要用Object、Text、IntWritable等,而不是java的string啊、int啊類型,當然,你可以用其他的,這樣用的好處是,因為它里面實現了序列化和反序列化。 24 // 可以讓在節點間傳輸和通信效率更高。這就為什么hadoop本身的機制類型的誕生。 25
26
27 //這個Mapper類是一個泛型類型,它有四個形參類型,分別指定map函數的輸入鍵、輸入值、輸出鍵、輸出值的類型。hadoop沒有直接使用Java內嵌的類型,而是自己開發了一套可以優化網絡序列化傳輸的基本類型。 28 //這些類型都在org.apache.hadoop.io包中。 29 //比如這個例子中的Object類型,適用於字段需要使用多種類型的時候,Text類型相當於Java中的String類型,IntWritable類型相當於Java中的Integer類型
30 { 31 //定義兩個變量或者說是定義兩個對象,叫法都可以
32 private final static IntWritable one = new IntWritable(1);//這個1表示每個單詞出現一次,map的輸出value就是1. 33 //因為,v1是單詞出現次數,直接對one賦值為1
34 private Text word = new Text(); 35
36 public void map(Object key, Text value, Context context) 37 //context它是mapper的一個內部類,簡單的說頂級接口是為了在map或是reduce任務中跟蹤task的狀態,很自然的MapContext就是記錄了map執行的上下文,在mapper類中,這個context可以存儲一些job conf的信息,比如job運行時參數等, 38 //我們可以在map函數中處理這個信息,這也是Hadoop中參數傳遞中一個很經典的例子,同時context作為了map和reduce執行中各個函數的一個橋梁,這個設計和Java web中的session對象、application對象很相似 39 //簡單的說context對象保存了作業運行的上下文信息,比如:作業配置信息、InputSplit信息、任務ID等 40 //我們這里最直觀的就是主要用到context的write方法。 41 //說白了,context起到的是連接map和reduce的橋梁。起到上下文的作用!
42
43 throws IOException, InterruptedException { 44 //The tokenizer uses the default delimiter set, which is " \t\n\r": the space character, the tab character, the newline character, the carriage-return character
45 StringTokenizer itr = new StringTokenizer(value.toString());//將Text類型的value轉化成字符串類型 46
47 //使用StringTokenizer類將字符串“hello,java,delphi,asp,PHP”分解為單個單詞 48 // 程序的運行結果為: 49 // hello 50 // java 51 // delphi 52 // asp 53 // php
54
55
56 while (itr.hasMoreTokens()) { 57 // 實際上就是java.util.StringTokenizer.hasMoreTokens() 58 // hasMoreTokens() 方法是用來測試是否有此標記生成器的字符串可用更多的標記。
59 word.set(itr.nextToken()); 60 context.write(word, one); 61 } 62 } 63 } 64
65 public static class IntSumReducer extends
66 Reducer<Text, IntWritable, Text, IntWritable> { 67 private IntWritable result = new IntWritable(); 68 public void reduce(Text key, Iterable<IntWritable> values, 69 Context context) throws IOException, InterruptedException { 70 //我們這里最直觀的就是主要用到context的write方法。 71 //說白了,context起到的是連接map和reduce的橋梁。起到上下文的作用!
72
73 int sum = 0; 74 for (IntWritable val : values) {//叫做增強的for循環,也叫for星型循環
75 sum += val.get(); 76 } 77 result.set(sum); 78 context.write(key, result); 79 } 80 } 81
82 public static void main(String[] args) throws Exception { 83 Configuration conf = new Configuration();//程序里,只需寫這么一句話,就會加載到hadoop的配置文件了 84 //Configuration類代表作業的配置,該類會加載mapred-site.xml、hdfs-site.xml、core-site.xml等配置文件。 85
86 //刪除已經存在的輸出目錄
87 Path mypath = new Path("hdfs://Centpy:9000/test/wordcount-out");//輸出路徑
88 FileSystem hdfs = mypath.getFileSystem(conf);//程序里,只需寫這么一句話,就可以獲取到文件系統了。 89 //FileSystem里面包括很多系統,不局限於hdfs,是因為,程序讀到conf,哦,原來是hadoop集群啊。這時,才認知到是hdfs 90
91 //如果文件系統中存在這個輸出路徑,則刪除掉,保證輸出目錄不能提前存在。
92 if (hdfs.isDirectory(mypath)) { 93 hdfs.delete(mypath, true); 94 } 95
96 //job對象指定了作業執行規范,可以用它來控制整個作業的運行。
97 Job job = Job.getInstance();// new Job(conf, "word count");
98 job.setJarByClass(WordCount.class);//我們在hadoop集群上運行作業的時候,要把代碼打包成一個jar文件,然后把這個文件 99 //傳到集群上,然后通過命令來執行這個作業,但是命令中不必指定JAR文件的名稱,在這條命令中通過job對象的setJarByClass()中傳遞一個主類就行,hadoop會通過這個主類來查找包含它的JAR文件。
100
101 job.setMapperClass(TokenizerMapper.class); 102 //job.setReducerClass(IntSumReducer.class);
103 job.setCombinerClass(IntSumReducer.class);//Combiner最終不能影響reduce輸出的結果 104 // 這句話要好好理解!!!
105
106
107
108 job.setOutputKeyClass(Text.class); 109 job.setOutputValueClass(IntWritable.class); 110 //一般情況下mapper和reducer的輸出的數據類型是一樣的,所以我們用上面兩條命令就行,如果不一樣,我們就可以用下面兩條命令單獨指定mapper的輸出key、value的數據類型 111 //job.setMapOutputKeyClass(Text.class); 112 //job.setMapOutputValueClass(IntWritable.class); 113 //hadoop默認的是TextInputFormat和TextOutputFormat,所以說我們這里可以不用配置。 114 //job.setInputFormatClass(TextInputFormat.class); 115 //job.setOutputFormatClass(TextOutputFormat.class);
116
117 FileInputFormat.addInputPath(job, new Path( 118 "hdfs://Centpy:9000/test/word.txt"));//FileInputFormat.addInputPath()指定的這個路徑可以是單個文件、一個目錄或符合特定文件模式的一系列文件。 119 //從方法名稱可以看出,可以通過多次調用這個方法來實現多路徑的輸入。
120 FileOutputFormat.setOutputPath(job, new Path( 121 "hdfs://Centpy:9000/hdfsOutput"));//只能有一個輸出路徑,該路徑指定的就是reduce函數輸出文件的寫入目錄。 122 //特別注意:輸出目錄不能提前存在,否則hadoop會報錯並拒絕執行作業,這樣做的目的是防止數據丟失,因為長時間運行的作業如果結果被意外覆蓋掉,那肯定不是我們想要的
123 System.exit(job.waitForCompletion(true) ? 0 : 1); 124 //使用job.waitForCompletion()提交作業並等待執行完成,該方法返回一個boolean值,表示執行成功或者失敗,這個布爾值被轉換成程序退出代碼0或1,該布爾參數還是一個詳細標識,所以作業會把進度寫到控制台。 125 //waitForCompletion()提交作業后,每秒會輪詢作業的進度,如果發現和上次報告后有改變,就把進度報告到控制台,作業完成后,如果成功就顯示作業計數器,如果失敗則把導致作業失敗的錯誤輸出到控制台
126 } 127 } 128
129 //TextInputFormat是hadoop默認的輸入格式,這個類繼承自FileInputFormat,使用這種輸入格式,每個文件都會單獨作為Map的輸入,每行數據都會生成一條記錄,每條記錄會表示成<key,value>的形式。 130 //key的值是每條數據記錄在數據分片中的字節偏移量,數據類型是LongWritable. 131 //value的值為每行的內容,數據類型為Text。 132 //
133 //實際上InputFormat()是用來生成可供Map處理的<key,value>的。 134 //InputSplit是hadoop中用來把輸入數據傳送給每個單獨的Map(也就是我們常說的一個split對應一個Map), 135 //InputSplit存儲的並非數據本身,而是一個分片長度和一個記錄數據位置的數組。 136 //生成InputSplit的方法可以通過InputFormat()來設置。 137 //當數據傳給Map時,Map會將輸入分片傳送給InputFormat(),InputFormat()則調用getRecordReader()生成RecordReader,RecordReader則再通過creatKey()和creatValue()創建可供Map處理的<key,value>對。 138 //
139 //OutputFormat() 140 //默認的輸出格式為TextOutputFormat。它和默認輸入格式類似,會將每條記錄以一行的形式存入文本文件。它的鍵和值可以是任意形式的,因為程序內部會調用toString()將鍵和值轉化為String類型再輸出。
接下來我們右鍵選擇“Run As --> Java Aplication / Run on Hadoop”運行程序,控制台情況如下:
然后,右鍵我們的DFS Locations,執行刷新命令,此時運行結果也出現在目錄里了,如下所示:
具體內容如下:
顯然這是正確的結果。
2. 在Hadoop集群上運行MapReduce程序(WordCount程序)
在導出為jar包之前需要對代碼進行一點小小的修改。在Eclipse環境下運行時我們需要將輸入輸出目錄寫死才能正常執行,但在Hadoop上執行時需要更靈活的方法,所以使用數組(args[])來代替,在執行運行命令時再指定路徑!(這一點很重要,否則會報錯而無法運行)。
接下來,我們需要將我們的項目導出為Jar包。
然后使用rz命令(如果沒有安裝請使用命令進行安裝,-y表示直接確認,在后面的安裝過程可以不用再等待確認)上傳到Hadoop目錄下。
yum -y install lrzsz
上傳之后我們輸入以下命令來運行我們的MapReduce程序。
hadoop jar WordCount.jar com.hadoop.WordCount.WordCount /test /hdfsOutput
其中,.jar文件為我們上傳的程序,第二個紅框里的路徑為Eclipse下的類及完整路徑(包路徑+類名),運行過程如下所示:
現在輸入以下命令以查看輸出路徑及結果文件:
hadoop fs -ls /hdfsOutput hadoop fs -cat /hdfsOutput/part-r-00000
顯然,結果和在Eclipse環境下的一致。
本文內容就到此結束了,希望大家通過閱讀本文之后能夠明白在這兩個不同環境下運行MapReduce程序的具體流程、區別以及注意事項。
以上就是博主為大家介紹的這一板塊的主要內容,這都是博主自己的學習過程,希望能給大家帶來一定的指導作用,有用的還望大家點個支持,如果對你沒用也望包涵,有錯誤煩請指出。如有期待可關注博主以第一時間獲取更新哦,謝謝!
版權聲明:本文為博主原創文章,未經博主允許不得轉載。