Hadoop—MapReduce計算氣象溫度
1 運行環境說明
1.1 硬軟件環境
- 主機操作系統:Mac OS 64 bit ,8G內存
- 虛擬軟件:Parallers Desktop12
- 虛擬機操作系統:CentOS 64位,單核,512內存
- JDK:java version "1.7.0_45"
- Hadoop:1.1.2
1.2 機器網絡環境
集群包含三個節點:1個namenode、2個datanode,其中節點之間可以相互ping通。節點IP地址和主機名分布如下:
序號 | IP地址 | 機器名 | 類型 | 用戶名 | 運行進程 |
---|---|---|---|---|---|
1 | 192.168.33.200 | Master | 名稱節點 | haha | NN、SNN、JobTracer |
2 | 192.168.33.201 | Slave1 | 數據節點 | haha | DN、TaskTracer |
3 | 192.168.33.202 | Slave2 | 數據節點 | haha | DN、TaskTracer |
4 | 192.168.33.203 | Slave3 | 數據節點 | haha | DN、TaskTracer |
所有節點均是CentOS6.5 64bit系統,防火牆均禁用,所有節點上均創建了一個haha用戶,用戶主目錄是/home/haha。
2 使用MapReduce求每年最低溫度
2.1 內容
下載氣象數據集部分數據,寫一個Map-Reduce作業,求每年的最低溫度,部署並運行之.
分析Map-Reduce過程
Map-Reduce編程模型
2.1.1 Map-reduce的思想就是“分而治之”
-
Mapper
Mapper負責“分”,即把復雜的任務分解為若干個“簡單的任務”執行
“簡單的任務”有幾個含義:- 1 數據戒計算規模相對於原任務要大大縮小;
- 2 就近計算 ,即會被分配到存放了所需數據的節點進行計算;
- 3 這些小任務可以幵行計算,彼此間幾乎沒有依賴關系
-
Reducer
對map階段的結果進行匯總
- Reducer的數目由mapred-site.xml配置文件里的項目mapred.reduce.tasks決定。缺 省值為1,用戶可以覆蓋之
2.2 運行代碼
2.2.1 MinTemperature
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
publicclass MinTemperature {
public staticvoid main(String[] args) throws Exception {
if(args.length != 2) {
System.err.println("Usage: MinTemperature<input path> <output path>");
System.exit(-1);
}
Job job = new Job();
job.setJarByClass(MinTemperature.class);
job.setJobName("Min temperature");
//new Path(args[0])控制台的第一個參數--輸入路徑
FileInputFormat.addInputPath(job, new Path(args[0]));
//new Path(args[1])控制台的第二個參數--輸出路徑
FileOutputFormat.setOutputPath(job, new Path(args[1]));
//指定Mapper是哪個類
job.setMapperClass(MinTemperatureMapper.class);
//指定Reducer是哪個類
job.setReducerClass(MinTemperatureReducer.class);
//指定輸出的key和value是什么
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}
2.2.2 MinTemperatureMapper
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
public class MinTemperatureMapper extends Mapper<LongWritable, Text, Text, IntWritable>{
private static final int MISSING = 9999;
@Override
public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
String line = value.toString();
String year = line.substring(15, 19);
int airTemperature;
if(line.charAt(87) == '+') {
airTemperature = Integer.parseInt(line.substring(88, 92));
} else {
airTemperature = Integer.parseInt(line.substring(87, 92));
}
String quality = line.substring(92, 93);
if(airTemperature != MISSING && quality.matches("[01459]")) {
context.write(new Text(year), new IntWritable(airTemperature));
}
}
}
2.2.3 MinTemperatureReducer
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
public class MinTemperatureReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
@Override
public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
int minValue = Integer.MAX_VALUE;
for(IntWritable value : values) {
minValue = Math.min(minValue, value.get());
}
context.write(key, new IntWritable(minValue));
}
}
2.3 實現過程
2.3.1 編寫代碼
進入/home/haha/hadoop-1.1.2/myclass目錄,在該目錄中建立MinTemperature.Java
、MinTemperatureMapper.java
和MinTemperatureReducer.java
代碼文件,代碼內容為2.2所示,執行命令如下:
[haha@Master ~]$cd /home/haha/hadoop-1.1.2/myclass/
[haha@Master myclass]$vi MinTemperature.java
[haha@Master myclass]$vi MinTemperatureMapper.java
[haha@Master myclass]$vi MinTemperatureReducer.java
MinTemperature.java
MinTemperatureMapper.java
MinTemperatureReducer.java
2.3.2編譯代碼
在/home/haha/hadoop-1.1.2/myclass目錄中,使用如下命令對java代碼進行編譯,為保證編譯成功,加入classpath變量,引入hadoop-core-1.1.2.jar包:
[haha@Master myclass]$javac -classpath ../hadoop-core-1.1.2.jar *.java
[haha@Master myclass]$ls
[haha@Master myclass]$mv *.jar
[haha@Master myclass]$rm *.class
2.3.4創建目錄
進入/home/haha/hadoop-1.1.2/bin目錄,在HDFS中創建氣象數據存放路徑/user/haha/in,執行命令如下:
cd /home/haha/hadoop-1.1.2/bin
hadoop fs -mkdir /user/haha/in
hadoop fs -ls /user/haha
2.3.5解壓氣象數據並上傳到HDFS中
使用SSH工具或者scp命令把從NCDC下載的氣象數據上傳到上步驟創建的目錄/user/haha/in中。
使用zcat命令把這些數據文件解壓並合並到一個sample.txt文件中,合並后把這個文件上傳到HDFS文件系統的/usr/hadoop/in目錄中:
cd /user/haha/hadoop-1.1.2/in
zcat *.gz > sample.txt
hadoop fs -copyFromLocal sample.txt /user/haha/in
氣象數據具體的下載地址為 ftp://ftp3.ncdc.noaa.gov/pub/data/noaa/ ,該數據包括1900年到現在所有年份的氣象數據,大小大概有70多個G。為了測試簡單,我們這里選取一部分的數據進行測試
2.3.6 運行程序
以jar的方式啟動MapReduce任務,執行輸出目錄為/user/haha/outputFile:
cd /home/haha/hadoop-1.1.2
hadoop jar MinTemperature.jar MinTemperature /usr/hadoop/in/sample.txt outputFile
2.3.7查看結果
執行成功后,查看/user/haha/outputFile目錄中是否存在運行結果,使用cat查看結果:
[haha@Master ~]$ hadoop fs -ls /user/haha/outputFile
[haha@Master ~]$ hadoop fs -cat /user/haha/outputFile/part-r-00000
[haha@Master ~]$ hadoop fs -cat /user/haha/outputFile/part-r-00000
1972 11
2.3.8通過頁面結果
1. 查看jobtracker.jsp
http://master:50030/jobtracker.jsp
已經完成的作業任務:
任務的詳細信息:
2.查看dfshealth.jsp
http://master:50070/dfshealth.jsp
分別查看HDFS文件系統和日志
3 求溫度平均值能使用combiner嗎?
Q:如果求溫度的平均值,能使用combiner嗎?有沒有變通的方法.
A:不能使用,因為求平均值和前面求最值存在差異,各局部最值的最值還是等於整體的最值的,但是對於平均值而言,各局部平均值的平均值將不再是整體的平均值了,所以不能用combiner。可以通過變通的辦法使用combiner來計算平均值,即在combiner的鍵值對中不直接存儲最后的平均值,而是存儲所有值的和個數,最后在reducer輸出時再用和除以個數得到平均值。
3.1 程序代碼
AvgTemperature.java
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class AvgTemperature {
public static void main(String[] args) throws Exception {
if(args.length != 2) {
System.out.println("Usage: AvgTemperatrue <input path><output path>");
System.exit(-1);
}
Job job = new Job();
job.setJarByClass(AvgTemperature.class);
job.setJobName("Avg Temperature");
FileInputFormat.addInputPath(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
job.setMapperClass(AvgTemperatureMapper.class);
job.setCombinerClass(AvgTemperatureCombiner.class);
job.setReducerClass(AvgTemperatureReducer.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Text.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}
AvgTemperatureMapper.java
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
publicclass AvgTemperatureMapper extends Mapper<LongWritable, Text, Text, Text> {
private static final int MISSING = 9999;
@Override
public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException{
String line = value.toString();
String year = line.substring(15, 19);
int airTemperature;
if(line.charAt(87) == '+') {
airTemperature = Integer.parseInt(line.substring(88, 92));
} else {
airTemperature = Integer.parseInt(line.substring(87, 92));
}
String quality = line.substring(92, 93);
if(airTemperature != MISSING && !quality.matches("[01459]")) {
context.write(new Text(year), new Text(String.valueOf(airTemperature)));
}
}
}
AvgTemperatureCombiner.java
import java.io.IOException;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
public class AvgTemperatureCombiner extends Reducer<Text, Text, Text, Text>{
@Override
public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
double sumValue = 0;
long numValue = 0;
for(Text value : values) {
sumValue += Double.parseDouble(value.toString());
numValue ++;
}
context.write(key, new Text(String.valueOf(sumValue) + ',' + String.valueOf(numValue)));
}
}
AvgTemperatureReducer.java
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
public class AvgTemperatureReducer extends Reducer<Text, Text, Text, IntWritable>{
@Override
public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
double sumValue = 0;
long numValue = 0;
int avgValue = 0;
for(Text value : values) {
String[] valueAll = value.toString().split(",");
sumValue += Double.parseDouble(valueAll[0]);
numValue += Integer.parseInt(valueAll[1]);
}
avgValue = (int)(sumValue/numValue);
context.write(key, new IntWritable(avgValue));
}
}
3.2 實現過程
編寫代碼
進入/home/haha/hadoop-1.1.2/myclass目錄,在該目錄中建立AvgTemperature.java、AvgTemperatureMapper.java、AvgTemperatureCombiner.java和AvgTemperatureReducer.java代碼文件,執行命令如下:
cd /usr/local/hadoop-1.1.2/myclass/
vi AvgTemperature.java
vi AvgTemperatureMapper.java
vi AvgTemperatureCombiner.java
vi AvgTemperatureReducer.java
編譯代碼
在/home/user/hadoop-1.1.2/myclass目錄中,使用如下命令對java代碼進行編譯,為保證編譯成功,加入classpath變量,引入hadoop-core-1.1.2.jar包:
javac -classpath ../hadoop-core-1.1.2.jar *.java
ls
打包編譯文件
把編譯好class文件打包,否則在執行過程會發生錯誤。把打好的包移動到上級目錄並刪除編譯好的class文件:
jar cvf ./AvgTemperature.jar ./*.class
ls
mv *.jar ..
rm *.class
運行程序
數據使用求每年最低溫度的氣象數據,數據在HDFS位置為/user/haha/in/sample.txt,以jar的方式啟動MapReduce任務,執行輸出目錄為/user/haha/out1:
cd /home/haha/hadoop-1.1.2
hadoop jar AvgTemperature.jar AvgTemperature /user/haha/in/sample.txt /user/haha/out1
查看結果
執行成功后,查看/user/haha/out1目錄中是否存在運行結果,使用cat查看結果:
hadoop fs -ls /user/haha/out1
hadoop fs -cat /user/haha/out1/part-r-00000