Hadoop入門教程:Java API實現,對Java程序員來講,直接調用Hadoop的Java API來實現是最為方便的,要使用Java API至少需要實現三個重要組件:Map類、Reduce類、驅動Driver。下面將具體實現Java API的詞頻統計程序。
(1)實現Map類:WordcountMapper.java,核心代碼如下:
import
java.io.IOException;
import
java.util.StringTokenizer;
import
org.apache.hadoop.io.IntWritable;
import
org.apache.hadoop.io.Text;
import
org.apache.Hadoop.MapReduce.Mapper;
public
class
WordcountMapper
extends
Mapper<object, intwritable=
""
>{
private
f?inal
static
IntWritable one =
new
IntWritable(
1
);
private
Text word =
new
Text();
public
void
map(Object key, Text value, Context context)
throws
IOException, InterruptedException {
StringTokenizer itr =
new
StringTokenizer(value.toString());
while
(itr.hasMoreTokens()) {
word.set(itr.nextToken());
context.write(word, one);
}
}
}
</object,>
首先要實現Map需要繼承Hadoop的Mapper類,至少需要實現其中的map方法,其中Mapper中的map方法通過指定的輸入文件格式一次處理一行,value就是map函數接收到的輸入行,然后通過StringTokenizer以空格為分隔符將一行切分為若干tokens,之后,輸出形式的鍵值對並將它寫入org.apache.hadoop.mapred.OutputCollector中。為了更加清晰地認識Map階段的處理,我們假設有三個文本a、b、c,使用上述實現的處理流程如圖2-5所示。
從圖2-5中可以看到對於文件A的輸入,相應的Map處理之后還會進行sort,最終Map輸出如下:
<Hello,
1
>
<nuoline,
1
>
<nuoline,
1
>
<Welcome,
1
>
對於文件B,執行相應的sort之后最終Map輸出如下:
<hadoop,
1
>
<hadoop,
1
>
<Hello,
1
>
<Welcome,
1
>
對於文件C,執行相應的sort之后最終Map輸出如下:
<cloud,
1
>
<cloud,
1
>
<Hello,
1
>
<Welcome,
1
>
<welcome,
1
></welcome,
1
>

(2)實現Reduce類:WordcountReducer.java,核心代碼如下:
import
java.io.IOException;
import
org.apache.hadoop.io.IntWritable;
import
org.apache.hadoop.io.Text;
import
org.apache.hadoop.mapreduce.Reducer;
public
class
WordcountReducer
extends
Reducer<text,intwritable,text,intwritable> {
private
IntWritable result =
new
IntWritable();
public
void
reduce(Text key, Iterable<intwritable> values,
Context context)
throws
IOException, InterruptedException {
int
sum =
0
;
for
(IntWritable val : values) {
sum += val.get();
}
result.set(sum);
context.write(key, result);
}
}
</intwritable></text,intwritable,text,intwritable>
實現WordcountReducer類需要繼承Reducer,至少需要實現其中的reduce方法,輸入參數中的key和values是由Map任務輸出的中間結果,values是一個Iterator,遍歷這個Iterator就可以得到屬於同一個key的所有value。此處key是一個單詞,values是詞頻。只需要將所有的values相加,就可以得到這個單詞總的出現次數。
對於圖2-5的Map輸出,Reduce處理的示意圖如圖2-6所示。

從圖2-6中可以看出,Reduce的輸入就是Map的輸出,然后會進行sort group,將Reduce的輸入變為>的形式,接着Hadoop框架會使用用戶指定的Reduce類處理數據,並最終輸出。當然用戶還可以指定combiner,每次Map運行之后,會按照key對輸出進行排序,然后把輸出傳遞給本地的combiner(可以指定和Reducer一樣),進行本地聚合。運行combiner能減少數據的通信量並降低Reduce的負載。
(3)實現運行驅動
運行驅動的目的就是在程序中指定用戶的Map類和Reduce類,並配置提交給Hadoop時的相關參數。例如實現一個詞頻統計的wordcount驅動類:MyWordCount.java,其核心代碼如下:
import
org.apache.hadoop.conf.Conf?iguration;
import
org.apache.hadoop.fs.Path;
import
org.apache.hadoop.io.IntWritable;
import
org.apache.hadoop.io.Text;
import
org.apache.hadoop.mapreduce.Job;
import
org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import
org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public
class
MyWordCount {
public
static
void
main(String[] args)
throws
Exception {
Conf?iguration conf =
new
Conf?iguration();
Job job =
new
Job(conf,
"word count"
);
job.setJarByClass(MyWordCount.
class
);
job.setMapperClass(WordcountMapper.
class
);
job.setCombinerClass(WordcountReducer.
class
);
job.setReducerClass(WordcountReducer.
class
);
job.setOutputKeyClass(Text.
class
);
job.setOutputValueClass(IntWritable.
class
);
FileInputFormat.addInputPath(job,
new
Path(args[
0
]));
FileOutputFormat.setOutputPath(job,
new
Path(args[
1
]));
System.exit(job.waitForCompletion(
true
) ?
0
:
1
);
}
}
從上述核心代碼中可以看出,需要在main函數中設置輸入/輸出路徑的參數,同時為了提交作業,需要job對象,並在job對象中指定作業名稱、Map類、Reduce類,以及鍵值的類型等參數。來源:CUUG官網