1. 環境配置
a) 配置系統環境變量HADOOP_HOME
b) 把hadoop.dll文件放到c:/windows/System32目錄下
c) hadoop-2.6.0\share\hadoop\common\sources目錄下hadoop-common-2.6.0-sources.jar文件中找到org\apache\hadoop\io\nativeio下NativeIO.java文件,復制到對應的Eclipse的project, NativeIO.java文件還要在原來的包名下
d) 修改此文件的557行,替換為return true
e) 在主機中配置虛擬機的IP和用戶名
f) 以管理員身份運行eclipse
2. 代碼(以wordcount為例)
a) MapReduce分Map和Reduce兩部分,加上測試,一共三部分
b) Map里主要解決文件分割的問題;
package com.hadoop.hdfs.api.test.mr.wc;
import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
/*
* KEYIN, VALUEIN, 輸入的key-value數據類型
*
* KEYOUT, VALUEOUT 輸出的key-value數據類型
*
*/
public class WCMapper extends Mapper<LongWritable, Text, Text, LongWritable>{
/* key:輸入的key值,偏移量
value:輸入的value,一行的內容
*/
@Override
protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, LongWritable>.Context context)
throws IOException, InterruptedException {
//獲取一行的內容
String linestr=value.toString();
String[] words=linestr.split(" ");//正則表達式怎么實現這些東西?
for (String word : words) {
//輸出寫
context.write(new Text(word), new LongWritable(1));
}
}
}
c) 接收map的結果,然后整合輸出
package com.hadoop.hdfs.api.test.mr.wc;
import java.io.IOException;
import java.util.Iterator;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
/*
* org.apache.hadoop.mapreduce是Hadoop2的api
*/
public class WCReducer extends Reducer<Text, LongWritable, Text, LongWritable>{
@Override
protected void reduce(Text key, Iterable<LongWritable> values,
Reducer<Text, LongWritable, Text, LongWritable>.Context context) throws IOException, InterruptedException {
Iterator<LongWritable>iter=values.iterator();
long sum=0;
while(iter.hasNext()){
sum+=iter.next().get();
}
context.write(key, new LongWritable(sum));
}
}
d) 運行文件里只需要配置好路徑即可
package com.hadoop.hdfs.api.test.mr.wc;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class WCRunner {
public static void main(String[] args) {
Configuration conf =new Configuration();
/* conf.set("hadoop.tmp.dir", "j:/tmp/tmpData");*/
try {
Job job=Job.getInstance(conf);
//指定main方法所在的類
job.setJarByClass(WCRunner.class);
//指定map和reduce類
job.setMapperClass(WCMapper.class);
job.setReducerClass(WCReducer.class);
//指定map的輸出key和value的數據類型
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(LongWritable.class);
//指定reduce的輸出key和value的數據類型
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(LongWritable.class);
//指定輸入的文件目錄,這里可以是文件,也可以是目錄
FileInputFormat.setInputPaths(job, new Path(args[0]));
//指定輸出的文件目錄,這里只能是目錄,不能是文件
FileOutputFormat.setOutputPath(job, new Path(args[1]));
//執行job
job.waitForCompletion(true);
} catch (Exception e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
注:數據類型
Value Text類型 Key LongWritable類型
特別聲明:Text的包需要特別注意
運行前,傳一下路徑
Input是讀文件的路徑,里面的文件就是我們要讀的
Output是reduce生成文件存放的地方
特別聲明:input路徑必須存在,output必須是不存在的
特特特別聲明:用戶名一定要注意,不要有空格!!!
MapReduce原理
MapReduce分為兩大部分,Map(抓取數據、數據分割)和Reduce(處理數據,數據整合,上傳數據)。
從單文件看MapReduce:
1. 從HDFS上讀取一個文件
2. 為本地主機分配一個Map任務
3. Map作業從輸入數據中抽取出鍵值對
4. 每一個鍵值對都作為參數傳遞給map函數,map函數產生的中間鍵值對被緩存在內存中。
5. 上一階段中解析出來的每一個鍵值對,調用一次map方法。如果有1000個鍵值對,就會調用1000次map方法。每一次調用map方法會輸出零個或者多個鍵值對。
6. 為本地主機分配一個Reduce任務
7. Reduce任務讀取Map任務產生的中間值並排序(因為Map任務產生的鍵值對可能映射到不同的分區中,當然本地只有一個分區,所以要排序),排序的目的是使相同鍵的鍵值對聚集在一起。
8. 遍歷排序后中間鍵值對,將具有相同鍵的鍵值對調用一次reduce方法,對每個不同的鍵分別調用一次reduce方法。
9. reduce函數產生的輸出會添加到這個分區的輸出文件中。
從集群上看MapReduce
1. 將HDFS上的文件分塊,如需要輸入的文件為100MB和200MB時,因為塊大小為128MB,所以共分為三塊,塊一:100MB;塊二:128MB;塊三:72MB
每個塊對應一個Map,需要三個Map進程來處理
2. 為集群上空閑的機器分配Map任務,被分配了Map作業的機器,開始讀取對應分片的輸入數據
3. 與單文件過程類似
4. Map產生的中間鍵值對分為N個區保存在本地中,每個區對應一個Reduce任務,將這N個區的位置報告給集群中負責調度的機器,讓其將位置信息轉發給已分配好Reduce任務的機器。
5. 有Reduce任務的機器從剛獲取的地址處,讀取中間鍵值對,然后與單文件類似
6. 所有執行完畢后,MapReduce輸出放在了N個分區的輸出文件中(分別對應一個Reduce作業)。用戶通常並不需要合並這N個文件,而是將其作為輸入交給另一個MapReduce程序處理。整個過程中,輸入數據是來自底層分布式文件系統(GFS)的,中間數據是放在本地文件系統的,最終輸出數據是寫入底層分布式文件系統(GFS)的。
注:關於Map和Reduce之間的數據傳輸過程,MapReduce的核心Shuffle,現在知識有限。只知道它的作用,不知道為什么作用,希望過幾天可以整理一下