1 運行環境說明
1.1 硬軟件環境
1.2 機器網絡環境
2 書面作業1:對雲計算的看法
2.1 書面作業1內容
3 書面作業2:使用MapReduce求每年最低溫度
3.1 書面作業2內容
3.2 運行代碼
3.2.1 MinTemperature
3.2.2 MinTemperatureMapper
3.2.3 MinTemperatureReducer
3.3 實現過程
3.3.1 編寫代碼
3.3.2 編譯代碼
3.3.3 打包編譯文件
3.3.4 創建目錄
3.3.5 解壓氣象數據並上傳到HDFS中
3.3.6 運行程序
3.3.7 查看結果
3.3.8 通過頁面結果
4 書面作業3:求溫度平均值能使用combiner嗎?
4.1 書面作業3內容
4.2 回答
4.3 程序代碼
4.3.1 AvgTemperature.java
4.3.2 AvgTemperatureMapper.java
4.3.3 AvgTemperatureCombiner.java
4.3.4 AvgTemperatureReducer.java
4.4 實現過程
4.4.1 編寫代碼
4.4.2 編譯代碼
4.4.3 打包編譯文件
4.4.4 運行程序
4.4.5 查看結果
5 書面作業4:使用Hadoop流求最高溫度(awk腳本)
5.1 書面作業4內容
5.2 程序代碼
5.2.1 執行代碼
5.2.2 mapper.sh
5.2.3 reducer.sh
5.3 實現過程
5.3.1 編寫代碼
5.3.2 運行程序
5.3.3 查看結果
6 書面作業4:使用Hadoop流求最高溫度(Python語言)
6.1 書面作業4內容
6.2 程序代碼
6.2.1 執行代碼
6.2.2 mapper.py
6.2.3 reducer.py
6.3 實現過程
6.3.1 編寫代碼
6.3.2 運行程序
6.3.3 查看結果
7 書面作業5:MapReduce是否可以自動識別新增節點?
7.1 書面作業5內容
7.2 程序代碼
7.3 實現過程
7.3.1 環境准備
7.3.2 准備數據
7.3.3 不啟動Hadoop4,運行任務
7.3.4 啟動Hadoop4,運行任務
7.3.5 重啟Hadoop集群,運行任務
7.3.6 結論
8 書面作業6:使用Hadoop公平調度器
8.1 書面作業6內容
8.2 程序代碼
8.3 實現過程
8.3.1 打開公平調度器
8.3.2 驗證啟動公平調度器
8.3.3 准備數據
8.3.4 運行分析MapReduce
8.3.5 觀察結果
9 問題解決
9.1 在作業5中新增節點后,DataNode無法啟動
1 運行環境說明
1.1 硬軟件環境
l 主機操作系統:Windows 64 bit,雙核4線程,主頻2.2G,6G內存
l 虛擬軟件:VMware® Workstation 9.0.0 build-812388
l 虛擬機操作系統:CentOS 64位,單核,1G內存
l JDK:1.7.0_55 64 bit
l Hadoop:1.1.2
1.2 機器網絡環境
集群包含三個節點:1個namenode、2個datanode,其中節點之間可以相互ping通。節點IP地址和主機名分布如下:
序號 |
機器名 |
類型 |
用戶名 |
運行進程 |
|
1 |
10.88.147.221 |
hadoop1 |
名稱節點 |
hadoop |
NN、SNN、JobTracer |
2 |
10.88.147.222 |
hadoop2 |
數據節點 |
hadoop |
DN、TaskTracer |
3 |
10.88.147.223 |
hadoop3 |
數據節點 |
hadoop |
DN、TaskTracer |
所有節點均是CentOS6.5 64bit系統,防火牆均禁用,所有節點上均創建了一個hadoop用戶,用戶主目錄是/usr/hadoop。所有節點上均創建了一個目錄/usr/local/hadoop,並且擁有者是hadoop用戶。
2 書面作業1:對雲計算的看法
2.1 書面作業1內容
說說你對雲計算的看法,是忽悠?還是能帶來真實價值的東西?
2.2 回答
雲計算是對現有資源集中優化后,對客戶提供服務,從現在的情況來看雲計算真實的為大家提供了服務,比如:網盤等。至於雲計算更為准確的定義為美國國家標准與技術研究院(NIST)定義:雲計算是一種按使用量付費的模式,這種模式提供可用的、便捷的、按需的網絡訪問,進入可配置的計算資源共享池(資源包括網絡,服務器,存儲,應用軟件,服務),這些資源能夠被快速提供,只需投入很少的管理工作或與服務供應商進行很少的交互。
雲計算特點如下:
(1) 超大規模:“雲”具有相當的規模,賦予用戶前所未有的計算能力;
(2) 虛擬化:雲計算支持用戶在任意位置、使用各種終端獲取應用服務;
(3) 高可靠性:“雲”使用了數據多副本容錯、計算節點同構可互換等措施來保障服務的高可靠性;
(4) 通用性:雲計算不針對特定的應用,同一個“雲”可以同時支撐不同的應用運行;
(5) 高可擴展性:“雲”的規模可以動態伸縮,滿足應用和用戶規模增長的需要;
(6) 按需服務:“雲”是一個龐大的資源池,可以需購買;
(7) 極其廉價:由於“雲”的特殊容錯措施可以采用極其廉價的節點來構成雲,“雲”的自動化集中式管理使大量企業無需負擔日益高昂的數據中心管理成本;
(8) 潛在的危險性
雲計算可以認為包括以下幾個層次的服務:基礎設施即服務(IaaS),平台即服務(PaaS)和軟件即服務(SaaS)。
l IaaS(Infrastructure-as-a-Service):基礎設施即服務。消費者通過Internet可以從完善的計算機基礎設施獲得服務。例如:硬件服務器租用。
l PaaS:PaaS(Platform-as-a- Service):平台即服務。PaaS實際上是指將軟件研發的平台作為一種服務,以SaaS的模式提交給用戶。因此,PaaS也是SaaS模式的一種應用。但是,PaaS的出現可以加快SaaS的發展,尤其是加快SaaS應用的開發速度。例如:軟件的個性化定制開發。
l SaaS:SaaS(Software-as-a- Service):軟件即服務。它是一種通過Internet提供軟件的模式,用戶無需購買軟件,而是向提供商租用基於Web的軟件,來管理企業經營活動。例如:陽光雲服務器。
3 書面作業2:使用MapReduce求每年最低溫度
3.1 書面作業2內容
下載氣象數據集部分數據,寫一個Map-Reduce作業,求每年的最低溫度,部署並運行之,抓圖過程
3.2 運行代碼
3.2.1MinTemperature
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
publicclass MinTemperature {
publicstaticvoid main(String[] args) throws Exception {
if(args.length != 2) {
System.err.println("Usage: MinTemperature<input path> <output path>");
System.exit(-1);
}
Job job = new Job();
job.setJarByClass(MinTemperature.class);
job.setJobName("Min temperature");
FileInputFormat.addInputPath(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
job.setMapperClass(MinTemperatureMapper.class);
job.setReducerClass(MinTemperatureReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}
3.2.2MinTemperatureMapper
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
publicclass MinTemperatureMapper extends Mapper<LongWritable, Text, Text, IntWritable>{
privatestatic final intMISSING = 9999;
@Override
publicvoid map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
String line = value.toString();
String year = line.substring(15, 19);
int airTemperature;
if(line.charAt(87) == '+') {
airTemperature = Integer.parseInt(line.substring(88, 92));
} else {
airTemperature = Integer.parseInt(line.substring(87, 92));
}
String quality = line.substring(92, 93);
if(airTemperature != MISSING && quality.matches("[01459]")) {
context.write(new Text(year), new IntWritable(airTemperature));
}
}
}
3.2.3MinTemperatureReducer
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
publicclass MinTemperatureReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
@Override
publicvoid reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
int minValue = Integer.MAX_VALUE;
for(IntWritable value : values) {
minValue = Math.min(minValue, value.get());
}
context.write(key, new IntWritable(minValue));
}
}
3.3 實現過程
3.3.1編寫代碼
進入/usr/local/hadoop-1.1.2/myclass目錄,在該目錄中建立MinTemperature.java、MinTemperatureMapper.java和MinTemperatureReducer.java代碼文件,代碼內容為3.2所示,執行命令如下:
cd /usr/local/hadoop-1.1.2/myclass/
vi MinTemperature.java
vi MinTemperatureMapper.java
vi MinTemperatureReducer.java
MinTemperature.java:
MinTemperatureMapper.java:
MinTemperatureReducer.java:
3.3.2編譯代碼
在/usr/local/hadoop-1.1.2/myclass目錄中,使用如下命令對java代碼進行編譯,為保證編譯成功,加入classpath變量,引入hadoop-core-1.1.2.jar包:
javac -classpath ../hadoop-core-1.1.2.jar *.java
ls
3.3.3打包編譯文件
把編譯好class文件打包,否則在執行過程會發生錯誤。把打好的包移動到上級目錄並刪除編譯好的class文件:
jar cvf ./MinTemperature.jar ./*.class
ls
mv *.jar ..
rm *.class
3.3.4創建目錄
進入/usr/local/hadoop-1.1.2/bin目錄,在HDFS中創建氣象數據存放路徑/usr/hadoop/in,執行命令如下:
cd /usr/local/hadoop-1.1.2/bin
hadoop fs -mkdir /usr/hadoop/in
hadoop fs -ls /usr/hadoop
3.3.5解壓氣象數據並上傳到HDFS中
使用SSH工具(參見第1、2周2.1.3.1Linux文件傳輸工具所描述)把從NCDC下載的氣象數據上傳到上步驟創建的目錄/usr/local/hadoop-1.1.2/input中。
使用zcat命令把這些數據文件解壓並合並到一個sample.txt文件中,合並后把這個文件上傳到HDFS文件系統的/usr/hadoop/in目錄中:
cd /usr/local/hadoop-1.1.2/input
zcat *.gz > sample.txt
hadoop fs -copyFromLocal sample.txt /usr/hadoop/in
氣象數據具體的下載地址為 ftp://ftp3.ncdc.noaa.gov/pub/data/noaa/ ,該數據包括1900年到現在所有年份的氣象數據,大小大概有70多個G。為了測試簡單,我們這里選取一部分的數據進行測試
3.3.6運行程序
以jar的方式啟動MapReduce任務,執行輸出目錄為/usr/hadoop/out:
cd /usr/local/hadoop-1.1.2
hadoop jar MinTemperature.jar MinTemperature /usr/hadoop/in/sample.txt /usr/hadoop/out
3.3.7查看結果
執行成功后,查看/usr/hadoop/out目錄中是否存在運行結果,使用cat查看結果:
hadoop fs -ls /usr/hadoop/out
hadoop fs -cat /usr/hadoop/out/part-r-00000
3.3.8通過頁面結果
1. 查看jobtracker.jsp
http://10.88.147.221:50030/jobtracker.jsp
已經完成的作業任務:
任務的詳細信息:
2. 查看dfshealth.jsp
http://10.88.147.221:50070/dfshealth.jsp
分別查看HDFS文件系統和日志
4 書面作業3:求溫度平均值能使用combiner嗎?
4.1 書面作業3內容
(選作)如果求溫度的平均值,能使用combiner嗎?有沒有變通的方法?說說你的看法
4.2 回答
不能使用,因為求平均值和前面求最值存在差異,各局部最值的最值還是等於整體的最值的,但是對於平均值而言,各局部平均值的平均值將不再是整體的平均值了,所以不能用combiner。可以通過變通的辦法使用combiner來計算平均值,即在combiner的鍵值對中不直接存儲最后的平均值,而是存儲所有值的和個數,最后在reducer輸出時再用和除以個數得到平均值。
4.3 程序代碼
4.3.1AvgTemperature.java
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
publicclass AvgTemperature {
publicstaticvoid main(String[] args) throws Exception {
if(args.length != 2) {
System.out.println("Usage: AvgTemperatrue <input path><output path>");
System.exit(-1);
}
Job job = new Job();
job.setJarByClass(AvgTemperature.class);
job.setJobName("Avg Temperature");
FileInputFormat.addInputPath(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
job.setMapperClass(AvgTemperatureMapper.class);
job.setCombinerClass(AvgTemperatureCombiner.class);
job.setReducerClass(AvgTemperatureReducer.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Text.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}
4.3.2AvgTemperatureMapper.java
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
publicclass AvgTemperatureMapper extends Mapper<LongWritable, Text, Text, Text> {
privatestaticfinalintMISSING = 9999;
@Override
publicvoid map(LongWritable key, Text value, Context context) throws IOException, InterruptedException{
String line = value.toString();
String year = line.substring(15, 19);
int airTemperature;
if(line.charAt(87) == '+') {
airTemperature = Integer.parseInt(line.substring(88, 92));
} else {
airTemperature = Integer.parseInt(line.substring(87, 92));
}
String quality = line.substring(92, 93);
if(airTemperature != MISSING && !quality.matches("[01459]")) {
context.write(new Text(year), new Text(String.valueOf(airTemperature)));
}
}
}
4.3.3AvgTemperatureCombiner.java
import java.io.IOException;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
publicclass AvgTemperatureCombiner extends Reducer<Text, Text, Text, Text>{
@Override
publicvoid reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
double sumValue = 0;
long numValue = 0;
for(Text value : values) {
sumValue += Double.parseDouble(value.toString());
numValue ++;
}
context.write(key, new Text(String.valueOf(sumValue) + ',' + String.valueOf(numValue)));
}
}
4.3.4AvgTemperatureReducer.java
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
publicclass AvgTemperatureReducer extends Reducer<Text, Text, Text, IntWritable>{
@Override
publicvoid reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
double sumValue = 0;
long numValue = 0;
int avgValue = 0;
for(Text value : values) {
String[] valueAll = value.toString().split(",");
sumValue += Double.parseDouble(valueAll[0]);
numValue += Integer.parseInt(valueAll[1]);
}
avgValue = (int)(sumValue/numValue);
context.write(key, new IntWritable(avgValue));
}
}
4.4 實現過程
4.4.1編寫代碼
進入/usr/local/hadoop-1.1.2/myclass目錄,在該目錄中建立AvgTemperature.java、AvgTemperatureMapper.java、AvgTemperatureCombiner.java和AvgTemperatureReducer.java代碼文件,代碼內容為4.3所示,執行命令如下:
cd /usr/local/hadoop-1.1.2/myclass/
vi AvgTemperature.java
vi AvgTemperatureMapper.java
vi AvgTemperatureCombiner.java
vi AvgTemperatureReducer.java
AvgTemperature.java:
AvgTemperatureMapper.java:
AvgTemperatureCombiner.java:
AvgTemperatureReducer.java:
4.4.2編譯代碼
在/usr/local/hadoop-1.1.2/myclass目錄中,使用如下命令對java代碼進行編譯,為保證編譯成功,加入classpath變量,引入hadoop-core-1.1.2.jar包:
javac -classpath ../hadoop-core-1.1.2.jar *.java
ls
4.4.3打包編譯文件
把編譯好class文件打包,否則在執行過程會發生錯誤。把打好的包移動到上級目錄並刪除編譯好的class文件:
jar cvf ./AvgTemperature.jar ./*.class
ls
mv *.jar ..
rm *.class
4.4.4運行程序
數據使用作業2求每年最低溫度的氣象數據,數據在HDFS位置為/usr/ hadoop/in/sample.txt,以jar的方式啟動MapReduce任務,執行輸出目錄為/usr/hadoop/out1:
cd /usr/local/hadoop-1.1.2
hadoop jar AvgTemperature.jar AvgTemperature /usr/hadoop/in/sample.txt /usr/hadoop/out1
4.4.5查看結果
執行成功后,查看/usr/hadoop/out目錄中是否存在運行結果,使用cat查看結果:
hadoop fs -ls /usr/hadoop/out1
hadoop fs -cat /usr/hadoop/out1/part-r-00000
5 書面作業4:使用Hadoop流求最高溫度(awk腳本)
5.1 書面作業4內容
(選作)使用hadoop流的方法來實現對氣象數據集求最高溫度的分析任務(可能要使用awk這類腳本工具)
5.2 程序代碼
5.2.1執行代碼
hadoop jar contrib/streaming/hadoop-streaming-1.1.2.jar \
-input /usr/hadoop/in/sample.txt \
-output /usr/hadoop/out_awk \
-mapper myclass/map.sh \
-reducer myclass/reducer.sh \
-file myclass/map.sh \
-file myclass/reducer.sh
5.2.2mapper.sh
#!/usr/bin/awk -f
BEGIN { FS = "," }
{ year = substr($0, 16, 4) + 0;
temp = substr($0, 88, 5) + 0;
q = substr($0, 93, 1);
LF="\n";
if(temp != 9999 && q ~ /[01459]/)
{
printf "%s %s %s", year, temp,LF;
}
}
END {
printf "\n"
}
5.2.3reducer.sh
#!/usr/bin/awk -f
{
temp[$1] = $2
if ( temp[$1] > maxtemp[$1] )
maxtemp[$1] = temp[$1];
}
END {
for(num in maxtemp)
printf "%s %s\n", num, maxtemp[num];
}
5.3 實現過程
5.3.1編寫代碼
進入/usr/local/hadoop-1.1.2/myclass目錄,在該目錄中建立mapper.sh和reducer.sh代碼文件,代碼內容為5.2所示,執行命令如下:
cd /usr/local/hadoop-1.1.2/myclass/
vi mapper.sh
vi reducer.sh
mapper.sh:
reducer.sh:
5.3.2運行程序
數據使用作業2求每年最低溫度的氣象數據,數據在HDFS位置為/usr /hadoop/in/sample.txt,啟動MapReduce任務,執行輸出目錄為/usr/hadoop/out_awk:
cd /usr/local/hadoop-1.1.2
hadoop jar contrib/streaming/hadoop-streaming-1.1.2.jar \
-input /usr/hadoop/in/sample.txt \
-output /usr/hadoop/out_awk \
-mapper myclass/mapper.sh \
-reducer myclass/reducer.sh \
-file myclass/mapper.sh \
-file myclass/reducer.sh
5.3.3查看結果
執行成功后,查看/usr/hadoop/out_awk目錄中是否存在運行結果,使用cat查看結果:
hadoop fs -ls /usr/hadoop/out_awk
hadoop fs -cat /usr/hadoop/out_awk/part-00000
6 書面作業4:使用Hadoop流求最高溫度(Python語言)
6.1 書面作業4內容
(選作)使用hadoop流的方法來實現對氣象數據集求最高溫度的分析任務(可能要使用awk這類腳本工具)
6.2 程序代碼
6.2.1執行代碼
hadoop jar contrib/streaming/hadoop-streaming-1.1.2.jar \
-input /usr/hadoop/in/sample.txt \
-output /usr/hadoop/out_python \
-mapper myclass/mapper.py \
-reducer myclass/reduce.py \
-file myclass/mapper.py \
-file myclass/reduce.py
6.2.2mapper.py
#!/usr/bin/env python
import re
import sys
for line in sys.stdin:
val = line.strip()
(year, temp, q) = (val[15:19], val[87:92], val[92:93])
if (temp != "+9999" and re.match("[01459]", q)):
print "%s\t%s" % (year, temp)
6.2.3reducer.py
#!/usr/bin/env python
import sys
(last_key, max_val) = (None, -sys.maxint)
for line in sys.stdin:
(key, val) = line.strip().split("\t")
if last_key and last_key != key:
print "%s\t%s" % (last_key, max_val)
(last_key, max_val) = (key, int(val))
else:
(last_key, max_val) = (key, max(max_val, int(val)))
if last_key:
print "%s\t%s" % (last_key, max_val)
6.3 實現過程
6.3.1編寫代碼
進入/usr/local/hadoop-1.1.2/myclass目錄,在該目錄中建立mapper.py和reducer.py代碼文件,代碼內容為6.2所示,執行命令如下:
cd /usr/local/hadoop-1.1.2/myclass/
vi mapper.py
vi reducer.py
mapper.py:
reducer.py:
6.3.2運行程序
數據使用作業2求每年最低溫度的氣象數據,數據在HDFS位置為/usr/hadoop/in/sample.txt,啟動MapReduce任務,執行輸出目錄為/usr/hadoop/out_py:
cd /usr/local/hadoop-1.1.2
hadoop jar contrib/streaming/hadoop-streaming-1.1.2.jar \
-input /usr/hadoop/in/sample.txt \
-output /usr/hadoop/out_py \
-mapper myclass/mapper.py \
-reducer myclass/reducer.py \
-file myclass/mapper.py \
-file myclass/reducer.py
6.3.3查看結果
執行成功后,查看/usr/hadoop/out_py目錄中是否存在運行結果,使用cat查看結果:
hadoop fs -ls /usr/hadoop/out_py
hadoop fs -cat /usr/hadoop/out_py/part-00000
7 書面作業5:MapReduce是否可以自動識別新增節點?
7.1 書面作業5內容
(選作)如果向正在運行的Hadoop集群增加一個新節點,Map-Reduce體系是否可以自動識別並使用這個新節點?如果不能怎樣才可以將其加入到Map-Reduce體系?用實驗進行驗證
7.2 程序代碼
參見作業2代碼並打成jar包
7.3 實現過程
在這里將運行本周作業2求每年最低溫度的MapReduce(當然也可以運行求最高溫度等例子),為了能夠更清楚的觀察實驗的效果,在該作業中運行的數據盡可能大,以下實驗中將使用1970~1972年各年份前400個氣象站數據,解壓后大概480M左右。在本實驗中將觀察兩個情況:一種是運行過程新增節點,另一種是重啟集群觀察作業是否運行在新增節點上?
新增節點信息:
序號 |
IP地址 |
機器名 |
類型 |
用戶名 |
運行進程 |
4 |
10.88.147.224 |
Hadoop4 |
數據節點 |
hadoop |
DN、TaskTracer |
7.3.1環境准備
參考第三周第二題第四個問題--SNN與NN分離--進行環境准備,增加一個新節點並確保各節點能夠免密碼SSH訪問
7.3.1.1復制虛擬機
復制DataNode節點所在虛擬機,新虛擬機作為新增節點
7.3.1.2設置新增節點虛擬機IP地址
設置該節點IP地址為:10.88.147.224
7.3.1.3設置新增節點虛擬機名稱
設置新增節點虛擬機名稱為:hadoop4
sudo vi /etc/sysconfig/network
7.3.1.4所有節點hosts 文件加入新增節點的 IP對應信息
在所有節點/etc/hosts文件中加入新增節點的IP地址10.88.147.224對應hadoop4
sudo vi /etc/hosts
7.3.1.5所有節點slavers文件加入新增節點信息
在新增節點hadoop4機器的slaves文件中加入新增節點機器名信息,使用如下命令:
sudo vi /usr/local/hadoop-1.1.2/conf/slaves
在slaves文件中加入新增節點機器名
7.3.1.6重啟所有虛擬機
7.3.1.7配置ssh免密碼登錄
1. 在hadoop4(10.88.147.244)節點中使用ssh-keygen -t rsa生成私鑰和公鑰;
2. 把hadoop4(10.88.147.244)節點中公鑰信息加入到authorized_keys文件中;
chmod 400 -R /home/hadoop/.ssh
cat id_rsa.pub >> authorized_keys
cat authorized_keys
3. 把authorized_keys分發到各個節點上;
scp authorized_keys hadoop@hadoop1:/home/hadoop/.ssh
4. 驗證是否能夠免登錄到各個節點;
7.3.2准備數據
參考作業2,為了能夠更清楚的觀察實驗的效果,在該作業中運行的數據盡可能大,在這里上傳數據大概480M左右。使用SSH工具(參見第1、2周2.1.3.1 Linux文件傳輸工具所描述)把從NCDC下載的氣象數據上傳到上步驟創建的目錄/usr/local/hadoop-1.1.2/input中。
先把這三年的數據放到/usr/local/hadoop-1.1.2/input目錄下,然后使用zcat命令把這些數據文件解壓並合並到一個newnode.txt文件中,合並后把這個文件上傳到HDFS文件系統的/usr/hadoop/in目錄中:
cd /usr/local/hadoop-1.1.2/input
mv 1971/*.gz .
mv 1972/*.gz .
mv 1973/*.gz .
zcat *.gz > newnode.txt
ll newnode.txt
hadoop fs -copyFromLocal newnode.txt /usr/hadoop/in
hadoop fs -ls /usr/hadoop/in
7.3.3不啟動Hadoop4,運行任務
使用本周作業2打包的jar包,啟動作業,執行輸出目錄為/usr/hadoop/out_newnode1:
cd /usr/local/hadoop-1.1.2
hadoop jar MinTemperature.jar MinTemperature /usr/hadoop/in/newnode.txt /usr/hadoop/out_newnode1
查看運行作業節點信息,Hadoop集群由兩個DataNode節點,分別為Hadoop2和Hadoop3:
7.3.4啟動Hadoop4,運行任務
再次啟動作業,執行輸出目錄為/usr/hadoop/out_newnode2:
cd /usr/local/hadoop-1.1.2
hadoop jar MinTemperature.jar MinTemperature /usr/hadoop/in/newnode.txt /usr/hadoop/out_newnode2
運行作業后,啟動新增節點Hadoop4中的DataNode和TaskTracker進程
cd /usr/local/hadoop-1.1.2/bin
hadoop-daemon.sh start datanode
hadoop-daemon.sh start tasktracker
jps
觀察結果,可以看到Hadoop集群識別到Hadoop4節點並發現該節點運行的TaskTracker:
在新增節點Hadoop4正在運行task任務
7.3.5重啟Hadoop集群,運行任務
停止Hadoop集群運行:
cd /usr/local/hadoop-1.1.2/bin
stop-all.sh
在Hadoop4節點中結束DataNode和TaskTracker進程:
cd /usr/local/hadoop-1.1.2/bin
hadoop-daemon.sh stop datanode
hadoop-daemon.sh stop tasktracker
在Hadoop1節點中HADOOP_HOME/conf/salvers中加入Hadoop4節點
cd /usr/local/hadoop-1.1.2/conf
sudo vi /usr/local/hadoop-1.1.2/conf/slaves
啟動Hadoop集群,從下圖可以看見Hadoop4節點已經隨Hadoop集群啟動:
cd /usr/local/hadoop-1.1.2/bin
start-all.sh
再次啟動運行作業,可以觀察到Hadoop4節點得到運行任務:
cd /usr/local/hadoop-1.1.2
hadoop jar MinTemperature.jar MinTemperature /usr/hadoop/in/newnode.txt /usr/hadoop/out_newnode3
7.3.6結論
綜上,當Hadoop集群有新節點加入時,正在運行的MapReduce作業自動識別並使用新節點;當新節點加入Hadoop集群后,再啟動MapReduce作業,MapReduce作業也能自動識別並使用新節點。
8 書面作業6:使用Hadoop公平調度器
8.1 書面作業6內容
(選作)怎樣打開Hadoop的公平調度器?怎樣證實當前處於公平調度下。設計一個實驗方案驗證FIFO調度器和公平調度器的異同,最好能實踐之
8.2 程序代碼
參見作業2代碼並打成jar包
8.3 實現過程
在這里將運行本周作業2求每年最低溫度的MapReduce(當然也可以運行求最高溫度等例子),為了能夠更加明顯看到實驗效果該實驗將准備兩份實驗數據:數據A為1970~1972年各年份前10個氣象站合並數據,數據B為1970~1972年各年份前240個氣象站合並數據,那么分析數據A的MapReduceA時間將大大少於分析數據B的MapReduceB時間。在實驗過程中將讓分析數據B的MapReduceB先啟動,然后啟動分析數據A的MapReduceA,觀察MapReduceA和MapReduceB哪一個先結束?
8.3.1打開公平調度器
要讓公平調度器能在Hadoop中運行,需要把相應的jar放到CLASSPATH中。在Hadoop1.X以前需要把hadoop-*-fairscheduler.jar從HADOOP_HOME/build/contrib/fairscheduler拷貝到HADOOP_HOME/lib,在Hadoop1.1.2版本在HADOOP_HOME/lib已經包含了公平調度器和能力調度器兩個jar包
並需要在Hadoop的配置文件HADOOP_CONF_DIR/mapred-site.xml中設置下列屬性讓Hadoop啟用公平調度器:
<property>
<name>mapred.jobtracker.taskScheduler</name>
<value>org.apache.hadoop.mapred.FairScheduler</value>
</property>
8.3.2驗證啟動公平調度器
重啟集群
cd /usr/local/hadoop-1.1.2/bin
stop-all.sh
start-all.sh
通過jobtracker web UI查看:http://<JobTracker URL>/scheduler,在該實驗地址為http://10.88.147.221:50030/scheduler,如下圖所示
8.3.3准備數據
使用SSH工具(參見第1、2周2.1.3.1Linux文件傳輸工具所描述)把從NCDC下載的氣象數據上傳到上步驟創建的目錄/usr/local/hadoop-1.1.2/input中。
使用zcat命令把這些數據文件分別解壓並合並到shortData.txt和longData.txt文件中,合並后把這兩個文件上傳到HDFS文件系統的/usr/hadoop/in目錄中:
cd /usr/local/hadoop-1.1.2/input
zcat *.gz > shortData.txt
zcat *.gz > longData.txt
hadoop fs -copyFromLocal shortData.txt /usr/hadoop/in
hadoop fs -copyFromLocal longData.txt /usr/hadoop/in
hadoop fs -ls /usr/hadoop/in
8.3.4運行分析MapReduce
使用本周作業2打包的jar包,先啟動MapReduceB,然后在另外一個登錄Session中啟動MapReduceA,執行輸出目錄為/usr/hadoop/out_long 和/usr/hadoop/out_short:
cd /usr/local/hadoop-1.1.2
hadoop jar MinTemperature.jar MinTemperature /usr/hadoop/in/longData.txt /usr/hadoop/out_long
在另外一個登錄Session運行情況:
cd /usr/local/hadoop-1.1.2
hadoop jar MinTemperature.jar MinTemperature /usr/hadoop/in/shortData.txt /usr/hadoop/out_short
8.3.5觀察結果
可以從上步驟結果圖觀察到:
MapReduceB 開始執行時間2014-10-14 16:51:13 結束時間:2014-10-14 16:53:36
MapReduceA 開始執行時間2014-10-14 16:51:20 結束時間:2014-10-14 16:53:28
MapReduceB這個job先運行,但因為這個job處理的數據量較大,而后一個MapReduceA的 job處理的數據量較小,所以MapReduceA的 job反而較早運行結束,可知公平調度器設置成功。
MapReduceB 作業ID為:job_201410141618_0009
MapReduceA 作業ID為:job_201410141618_0010
9 問題解決
9.1 在作業5中新增節點后,DataNode無法啟動
通過拷貝Hadoop3(10.88.147.223)節點,修改名稱后成為Hadoop4(10.88.147.224)節點,啟動Hadoop3或者Hadoop4的DataNode會出現如下錯誤:
2014-10-15 16:28:40,344 WARN org.apache.hadoop.hdfs.server.datanode.DataNode: DataNode is shutting down: org.apache.hadoop.ipc.RemoteException: org.apache.hadoop.hdfs.protocol.UnregisteredDatanodeException: Data node 10.88.147.223:50010 is attempting to report storage ID DS-60030049-10.88.147.223-50010-1411456729315. Node 10.88.147.224:50010 is expected to serve this storage.
at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getDatanode(FSNamesystem.java:4776)
at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.processReport(FSNamesystem.java:3628)
at org.apache.hadoop.hdfs.server.namenode.NameNode.blockReport(NameNode.java:1041)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:578)
at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:1393)
at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:1389)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:415)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1149)
at org.apache.hadoop.ipc.Server$Handler.run(Server.java:1387)
at org.apache.hadoop.ipc.Client.call(Client.java:1107)
at org.apache.hadoop.ipc.RPC$Invoker.invoke(RPC.java:229)
at com.sun.proxy.$Proxy5.blockReport(Unknown Source)
at org.apache.hadoop.hdfs.server.datanode.DataNode.offerService(DataNode.java:1026)
at org.apache.hadoop.hdfs.server.datanode.DataNode.run(DataNode.java:1527)
at java.lang.Thread.run(Thread.java:745)
解決辦法是修改Hadoop4節點中HADOOP_HOME/hdfs/data/current/VERSION文件,把storageID由DS-60030049-10.88.147.223-50010-1411456729315修改成DS-60030049-10.88.147.224-50010-1411456729315即可