Hadoop第5周練習—MapReduce計算氣象溫度等例子


1    運行環境說明... 4

1.1     硬軟件環境... 4

1.2     機器網絡環境... 4

2    書面作業1:對雲計算的看法... 4

2.1     書面作業1內容... 4

2.2     回答... 5

3    書面作業2:使用MapReduce求每年最低溫度... 6

3.1     書面作業2內容... 6

3.2     運行代碼... 6

3.2.1   MinTemperature. 6

3.2.2   MinTemperatureMapper6

3.2.3   MinTemperatureReducer7

3.3     實現過程... 8

3.3.1   編寫代碼... 8

3.3.2   編譯代碼... 9

3.3.3   打包編譯文件... 9

3.3.4   創建目錄... 10

3.3.5   解壓氣象數據並上傳到HDFS... 10

3.3.6   運行程序... 12

3.3.7   查看結果... 12

3.3.8   通過頁面結果... 12

4    書面作業3:求溫度平均值能使用combiner嗎?... 15

4.1     書面作業3內容... 15

4.2     回答... 15

4.3     程序代碼... 15

4.3.1   AvgTemperature.java. 15

4.3.2   AvgTemperatureMapper.java. 16

4.3.3   AvgTemperatureCombiner.java. 17

4.3.4   AvgTemperatureReducer.java. 17

4.4     實現過程... 18

4.4.1   編寫代碼... 18

4.4.2   編譯代碼... 20

4.4.3   打包編譯文件... 20

4.4.4   運行程序... 21

4.4.5   查看結果... 22

5    書面作業4:使用Hadoop流求最高溫度(awk腳本)... 22

5.1     書面作業4內容... 22

5.2     程序代碼... 22

5.2.1   執行代碼... 22

5.2.2   mapper.sh. 22

5.2.3   reducer.sh. 23

5.3     實現過程... 23

5.3.1   編寫代碼... 23

5.3.2   運行程序... 24

5.3.3   查看結果... 24

6    書面作業4:使Hadoop流求最高溫度(Python語言)... 25

6.1     書面作業4內容... 25

6.2     程序代碼... 25

6.2.1   執行代碼... 25

6.2.2   mapper.py. 25

6.2.3   reducer.py. 25

6.3     實現過程... 26

6.3.1   編寫代碼... 26

6.3.2   運行程序... 27

6.3.3   查看結果... 27

7    書面作業5MapReduce是否可以自動識別新增節點?... 28

7.1     書面作業5內容... 28

7.2     程序代碼... 28

7.3     實現過程... 28

7.3.1   環境准備... 28

7.3.2   准備數據... 32

7.3.3   不啟動Hadoop4,運行任務... 33

7.3.4   啟動Hadoop4,運行任務... 34

7.3.5   重啟Hadoop集群,運行任務... 35

7.3.6   結論... 37

8    書面作業6:使用Hadoop公平調度器... 38

8.1     書面作業6內容... 38

8.2     程序代碼... 38

8.3     實現過程... 38

8.3.1   打開公平調度器... 38

8.3.2   驗證啟動公平調度器... 39

8.3.3   准備數據... 40

8.3.4   運行分析MapReduce. 41

8.3.5   觀察結果... 42

9    問題解決... 43

9.1     在作業5中新增節點后,DataNode無法啟動... 43

 

1 運行環境說明

1.1  硬軟件環境

l  主機操作系統:Windows 64 bit,雙核4線程,主頻2.2G6G內存

l  虛擬軟件:VMware® Workstation 9.0.0 build-812388

l  虛擬機操作系統:CentOS 64位,單核,1G內存

l  JDK1.7.0_55 64 bit

l  Hadoop1.1.2

1.2  機器網絡環境

集群包含三個節點:1namenode2datanode,其中節點之間可以相互ping通。節點IP地址和主機名分布如下:

序號

IP地址

機器名

類型

用戶名

運行進程

1

10.88.147.221

hadoop1

名稱節點

hadoop

NNSNNJobTracer

2

10.88.147.222

hadoop2

數據節點

hadoop

DNTaskTracer

3

10.88.147.223

hadoop3

數據節點

hadoop

DNTaskTracer

所有節點均是CentOS6.5 64bit系統,防火牆均禁用,所有節點上均創建了一個hadoop用戶,用戶主目錄是/usr/hadoop。所有節點上均創建了一個目錄/usr/local/hadoop,並且擁有者是hadoop用戶。

2 書面作業1:對雲計算的看法

2.1  書面作業1內容

說說你對雲計算的看法,是忽悠?還是能帶來真實價值的東西?

2.2  回答

雲計算是對現有資源集中優化后,對客戶提供服務,從現在的情況來看雲計算真實的為大家提供了服務,比如:網盤等。至於雲計算更為准確的定義為美國國家標准與技術研究院(NIST)定義:雲計算是一種按使用量付費的模式,這種模式提供可用的、便捷的、按需的網絡訪問,進入可配置的計算資源共享池(資源包括網絡,服務器,存儲,應用軟件,服務),這些資源能夠被快速提供,只需投入很少的管理工作或與服務供應商進行很少的交互。

雲計算特點如下:

(1) 超大規模:“雲”具有相當的規模,賦予用戶前所未有的計算能力;

(2) 虛擬化:雲計算支持用戶在任意位置、使用各種終端獲取應用服務;

(3) 高可靠性:“雲”使用了數據多副本容錯、計算節點同構可互換等措施來保障服務的高可靠性;

(4) 通用性:雲計算不針對特定的應用,同一個“雲”可以同時支撐不同的應用運行;

(5) 高可擴展性:“雲”的規模可以動態伸縮,滿足應用和用戶規模增長的需要;

(6) 按需服務:“雲”是一個龐大的資源池,可以需購買;

(7) 極其廉價:由於“雲”的特殊容錯措施可以采用極其廉價的節點來構成雲,“雲”的自動化集中式管理使大量企業無需負擔日益高昂的數據中心管理成本;

(8) 潛在的危險性

雲計算可以認為包括以下幾個層次的服務:基礎設施即服務(IaaS),平台即服務(PaaS)和軟件即服務(SaaS)。

l  IaaS(Infrastructure-as-a-Service):基礎設施即服務。消費者通過Internet可以從完善的計算機基礎設施獲得服務。例如:硬件服務器租用。

l  PaaSPaaS(Platform-as-a- Service):平台即服務。PaaS實際上是指將軟件研發的平台作為一種服務,以SaaS的模式提交給用戶。因此,PaaS也是SaaS模式的一種應用。但是,PaaS的出現可以加快SaaS的發展,尤其是加快SaaS應用的開發速度。例如:軟件的個性化定制開發。

l  SaaSSaaS(Software-as-a- Service):軟件即服務。它是一種通過Internet提供軟件的模式,用戶無需購買軟件,而是向提供商租用基於Web的軟件,來管理企業經營活動。例如:陽光雲服務器。

3  書面作業2:使用MapReduce求每年最低溫度

3.1  書面作業2內容

下載氣象數據集部分數據,寫一個Map-Reduce作業,求每年的最低溫度,部署並運行之,抓圖過程

3.2  運行代碼

3.2.1MinTemperature

import org.apache.hadoop.fs.Path;

import org.apache.hadoop.io.IntWritable;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.Job;

import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

 

publicclass MinTemperature {

   

    publicstaticvoid main(String[] args) throws Exception {

        if(args.length != 2) {

            System.err.println("Usage: MinTemperature<input path> <output path>");

            System.exit(-1);

        }

       

        Job job = new Job();

        job.setJarByClass(MinTemperature.class);

        job.setJobName("Min temperature");

        FileInputFormat.addInputPath(job, new Path(args[0]));

        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        job.setMapperClass(MinTemperatureMapper.class);

        job.setReducerClass(MinTemperatureReducer.class);

        job.setOutputKeyClass(Text.class);

        job.setOutputValueClass(IntWritable.class);

        System.exit(job.waitForCompletion(true) ? 0 : 1);

    }

}

 

 

3.2.2MinTemperatureMapper

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;

import org.apache.hadoop.io.LongWritable;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.Mapper;

 

publicclass MinTemperatureMapper extends Mapper<LongWritable, Text, Text, IntWritable>{

 

    privatestatic final intMISSING = 9999;

   

    @Override

    publicvoid map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {

       

        String line = value.toString();

        String year = line.substring(15, 19);

       

        int airTemperature;

        if(line.charAt(87) == '+') {

            airTemperature = Integer.parseInt(line.substring(88, 92));

        } else {

            airTemperature = Integer.parseInt(line.substring(87, 92));

        }

       

        String quality = line.substring(92, 93);

        if(airTemperature != MISSING && quality.matches("[01459]")) {

            context.write(new Text(year), new IntWritable(airTemperature));

        }

    }

}

3.2.3MinTemperatureReducer

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.Reducer;

 

publicclass MinTemperatureReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

 

    @Override

    publicvoid reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {

       

        int minValue = Integer.MAX_VALUE;

        for(IntWritable value : values) {

            minValue = Math.min(minValue, value.get());

        }

        context.write(key, new IntWritable(minValue));

    }

}

3.3  實現過程

3.3.1編寫代碼

進入/usr/local/hadoop-1.1.2/myclass目錄,在該目錄中建立MinTemperature.javaMinTemperatureMapper.javaMinTemperatureReducer.java代碼文件,代碼內容為3.2所示,執行命令如下:

cd /usr/local/hadoop-1.1.2/myclass/

vi MinTemperature.java

vi MinTemperatureMapper.java

vi MinTemperatureReducer.java

clip_image002

MinTemperature.java

clip_image004

MinTemperatureMapper.java

clip_image006

MinTemperatureReducer.java:

clip_image008

3.3.2編譯代碼

/usr/local/hadoop-1.1.2/myclass目錄中,使用如下命令對java代碼進行編譯,為保證編譯成功,加入classpath變量,引入hadoop-core-1.1.2.jar包:

javac -classpath ../hadoop-core-1.1.2.jar *.java

ls

clip_image010

3.3.3打包編譯文件

把編譯好class文件打包,否則在執行過程會發生錯誤。把打好的包移動到上級目錄並刪除編譯好的class文件:

jar cvf ./MinTemperature.jar ./*.class

ls

mv *.jar ..

rm *.class

clip_image012

3.3.4創建目錄

進入/usr/local/hadoop-1.1.2/bin目錄,在HDFS中創建氣象數據存放路徑/usr/hadoop/in,執行命令如下:

cd /usr/local/hadoop-1.1.2/bin

hadoop fs -mkdir /usr/hadoop/in

hadoop fs -ls /usr/hadoop                

clip_image014

3.3.5解壓氣象數據並上傳到HDFS

使用SSH工具(參見第122.1.3.1Linux文件傳輸工具所描述)把從NCDC下載的氣象數據上傳到上步驟創建的目錄/usr/local/hadoop-1.1.2/input中。

clip_image016

使用zcat命令把這些數據文件解壓並合並到一個sample.txt文件中,合並后把這個文件上傳到HDFS文件系統的/usr/hadoop/in目錄中:

cd /usr/local/hadoop-1.1.2/input

zcat *.gz > sample.txt

hadoop fs -copyFromLocal sample.txt /usr/hadoop/in

clip_image018

clip_image020

氣象數據具體的下載地址為 ftp://ftp3.ncdc.noaa.gov/pub/data/noaa/ ,該數據包括1900年到現在所有年份的氣象數據,大小大概有70多個G。為了測試簡單,我們這里選取一部分的數據進行測試

3.3.6運行程序

jar的方式啟動MapReduce任務,執行輸出目錄為/usr/hadoop/out

cd /usr/local/hadoop-1.1.2

hadoop jar MinTemperature.jar MinTemperature /usr/hadoop/in/sample.txt  /usr/hadoop/out

clip_image022

3.3.7查看結果

執行成功后,查看/usr/hadoop/out目錄中是否存在運行結果,使用cat查看結果:

hadoop fs -ls /usr/hadoop/out

hadoop fs -cat /usr/hadoop/out/part-r-00000

clip_image024

3.3.8通過頁面結果

1.     查看jobtracker.jsp

http://10.88.147.221:50030/jobtracker.jsp

clip_image026

已經完成的作業任務:

clip_image028

任務的詳細信息:

clip_image030

clip_image032

2.     查看dfshealth.jsp

http://10.88.147.221:50070/dfshealth.jsp

clip_image034

分別查看HDFS文件系統和日志

clip_image036

clip_image038

 

4    書面作業3:求溫度平均值能使用combiner嗎?

4.1  書面作業3內容

(選作)如果求溫度的平均值,能使用combiner嗎?有沒有變通的方法?說說你的看法

4.2  回答

不能使用,因為求平均值和前面求最值存在差異,各局部最值的最值還是等於整體的最值的,但是對於平均值而言,各局部平均值的平均值將不再是整體的平均值了,所以不能用combiner。可以通過變通的辦法使用combiner來計算平均值,即在combiner的鍵值對中不直接存儲最后的平均值,而是存儲所有值的和個數,最后在reducer輸出時再用和除以個數得到平均值。

4.3  程序代碼

4.3.1AvgTemperature.java

import org.apache.hadoop.fs.Path;

import org.apache.hadoop.io.IntWritable;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.Job;

import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

 

publicclass AvgTemperature {

   

    publicstaticvoid main(String[] args) throws Exception {

         

        if(args.length != 2) {

            System.out.println("Usage: AvgTemperatrue <input path><output path>");

            System.exit(-1);

        }

       

        Job job = new Job();

        job.setJarByClass(AvgTemperature.class);

        job.setJobName("Avg Temperature");

        FileInputFormat.addInputPath(job, new Path(args[0]));

        FileOutputFormat.setOutputPath(job, new Path(args[1]));

       

        job.setMapperClass(AvgTemperatureMapper.class);

        job.setCombinerClass(AvgTemperatureCombiner.class);

        job.setReducerClass(AvgTemperatureReducer.class);

       

        job.setMapOutputKeyClass(Text.class);

        job.setMapOutputValueClass(Text.class);

       

        job.setOutputKeyClass(Text.class);

        job.setOutputValueClass(IntWritable.class);

       

        System.exit(job.waitForCompletion(true) ? 0 : 1);

    }

}

4.3.2AvgTemperatureMapper.java

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;

import org.apache.hadoop.io.LongWritable;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.Mapper;

 

publicclass AvgTemperatureMapper extends Mapper<LongWritable, Text, Text, Text> {

 

    privatestaticfinalintMISSING = 9999;

   

    @Override

    publicvoid map(LongWritable key, Text value, Context context) throws IOException, InterruptedException{

       

        String line = value.toString();

        String year = line.substring(15, 19);

       

        int airTemperature;

        if(line.charAt(87) == '+') {

            airTemperature = Integer.parseInt(line.substring(88, 92));

        } else {

            airTemperature =  Integer.parseInt(line.substring(87, 92));

        }

       

        String quality = line.substring(92, 93);

        if(airTemperature != MISSING && !quality.matches("[01459]")) {

            context.write(new Text(year), new Text(String.valueOf(airTemperature)));

        }

    }

}

4.3.3AvgTemperatureCombiner.java

import java.io.IOException;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.Reducer;

 

publicclass AvgTemperatureCombiner extends Reducer<Text, Text, Text, Text>{

 

    @Override

    publicvoid reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {

       

        double sumValue = 0;

        long numValue = 0;

       

        for(Text value : values) {

            sumValue += Double.parseDouble(value.toString());

            numValue ++;

        }

       

        context.write(key, new Text(String.valueOf(sumValue) + ',' + String.valueOf(numValue)));

    }

}

4.3.4AvgTemperatureReducer.java

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.Reducer;

 

publicclass AvgTemperatureReducer extends Reducer<Text, Text, Text, IntWritable>{

 

    @Override

    publicvoid reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {

       

        double sumValue = 0;

        long numValue = 0;

        int avgValue = 0;

       

        for(Text value : values) {

            String[] valueAll = value.toString().split(",");

            sumValue += Double.parseDouble(valueAll[0]);

            numValue += Integer.parseInt(valueAll[1]);

        }

       

        avgValue  = (int)(sumValue/numValue);

        context.write(key, new IntWritable(avgValue));

    }

}

4.4  實現過程

4.4.1編寫代碼

進入/usr/local/hadoop-1.1.2/myclass目錄,在該目錄中建立AvgTemperature.javaAvgTemperatureMapper.javaAvgTemperatureCombiner.javaAvgTemperatureReducer.java代碼文件,代碼內容為4.3所示,執行命令如下:

cd /usr/local/hadoop-1.1.2/myclass/

vi AvgTemperature.java

vi AvgTemperatureMapper.java

vi AvgTemperatureCombiner.java

vi AvgTemperatureReducer.java

clip_image040

AvgTemperature.java

clip_image042

AvgTemperatureMapper.java

clip_image044

AvgTemperatureCombiner.java

clip_image046

AvgTemperatureReducer.java:

clip_image048

4.4.2編譯代碼

/usr/local/hadoop-1.1.2/myclass目錄中,使用如下命令對java代碼進行編譯,為保證編譯成功,加入classpath變量,引入hadoop-core-1.1.2.jar包:

javac -classpath ../hadoop-core-1.1.2.jar *.java

ls

clip_image050

4.4.3打包編譯文件

把編譯好class文件打包,否則在執行過程會發生錯誤。把打好的包移動到上級目錄並刪除編譯好的class文件:

jar cvf ./AvgTemperature.jar ./*.class

ls

mv *.jar ..

rm *.class

clip_image052

4.4.4運行程序

數據使用作業2求每年最低溫度的氣象數據,數據在HDFS位置為/usr/ hadoop/in/sample.txt,以jar的方式啟動MapReduce任務,執行輸出目錄為/usr/hadoop/out1

cd /usr/local/hadoop-1.1.2

hadoop jar AvgTemperature.jar AvgTemperature /usr/hadoop/in/sample.txt  /usr/hadoop/out1

clip_image054

clip_image056

4.4.5查看結果

執行成功后,查看/usr/hadoop/out目錄中是否存在運行結果,使用cat查看結果:

hadoop fs -ls /usr/hadoop/out1

hadoop fs -cat /usr/hadoop/out1/part-r-00000

clip_image058

5    書面作業4:使用Hadoop流求最高溫度(awk腳本)

5.1  書面作業4內容

(選作)使用hadoop流的方法來實現對氣象數據集求最高溫度的分析任務(可能要使用awk這類腳本工具)

5.2  程序代碼

5.2.1執行代碼

hadoop jar contrib/streaming/hadoop-streaming-1.1.2.jar \

-input /usr/hadoop/in/sample.txt \

-output /usr/hadoop/out_awk \

-mapper myclass/map.sh \

-reducer myclass/reducer.sh \

-file myclass/map.sh \

-file myclass/reducer.sh

5.2.2mapper.sh

#!/usr/bin/awk -f

BEGIN { FS = "," }

{ year = substr($0, 16, 4) + 0;

  temp = substr($0, 88, 5) + 0;

  q = substr($0, 93, 1);

  LF="\n";

  if(temp != 9999 && q ~ /[01459]/)

  {

    printf "%s %s %s",  year, temp,LF;

  }

}

END {

    printf "\n"

}

5.2.3reducer.sh

#!/usr/bin/awk -f

{

  temp[$1] =  $2

  if ( temp[$1] > maxtemp[$1] )

    maxtemp[$1] = temp[$1];

 

}

END {

  for(num in maxtemp)

    printf "%s %s\n", num, maxtemp[num];

}

5.3  實現過程

5.3.1編寫代碼

進入/usr/local/hadoop-1.1.2/myclass目錄,在該目錄中建立mapper.shreducer.sh代碼文件,代碼內容為5.2所示,執行命令如下:

cd /usr/local/hadoop-1.1.2/myclass/

vi mapper.sh

vi reducer.sh

clip_image060

mapper.sh

clip_image062

reducer.sh

clip_image064

5.3.2運行程序

數據使用作業2求每年最低溫度的氣象數據,數據在HDFS位置為/usr /hadoop/in/sample.txt,啟動MapReduce任務,執行輸出目錄為/usr/hadoop/out_awk

cd /usr/local/hadoop-1.1.2

hadoop jar contrib/streaming/hadoop-streaming-1.1.2.jar \

-input /usr/hadoop/in/sample.txt \

-output /usr/hadoop/out_awk \

-mapper myclass/mapper.sh \

-reducer myclass/reducer.sh \

-file myclass/mapper.sh \

-file myclass/reducer.sh

clip_image066

5.3.3查看結果

執行成功后,查看/usr/hadoop/out_awk目錄中是否存在運行結果,使用cat查看結果:

hadoop fs -ls /usr/hadoop/out_awk

hadoop fs -cat /usr/hadoop/out_awk/part-00000

clip_image068

6    書面作業4:使用Hadoop流求最高溫度(Python語言)

6.1  書面作業4內容

(選作)使用hadoop流的方法來實現對氣象數據集求最高溫度的分析任務(可能要使用awk這類腳本工具)

6.2  程序代碼

6.2.1執行代碼

hadoop jar contrib/streaming/hadoop-streaming-1.1.2.jar  \

-input /usr/hadoop/in/sample.txt \

-output /usr/hadoop/out_python \

-mapper myclass/mapper.py \

-reducer myclass/reduce.py \

-file myclass/mapper.py \

-file myclass/reduce.py

6.2.2mapper.py

#!/usr/bin/env python

 

import re

import sys

 

for line in sys.stdin:

    val = line.strip()

    (year, temp, q) = (val[15:19], val[87:92], val[92:93])

    if (temp != "+9999" and re.match("[01459]", q)):

        print "%s\t%s" % (year, temp)

6.2.3reducer.py

#!/usr/bin/env python

 

import sys

 

(last_key, max_val) = (None, -sys.maxint)

for line in sys.stdin:

    (key, val) = line.strip().split("\t")

    if last_key and last_key != key:

        print "%s\t%s" % (last_key, max_val)

        (last_key, max_val) = (key, int(val))

    else:

        (last_key, max_val) = (key, max(max_val, int(val)))

 

if last_key:

    print "%s\t%s" % (last_key, max_val)

6.3  實現過程

6.3.1編寫代碼

進入/usr/local/hadoop-1.1.2/myclass目錄,在該目錄中建立mapper.pyreducer.py代碼文件,代碼內容為6.2所示,執行命令如下:

cd /usr/local/hadoop-1.1.2/myclass/

vi mapper.py

vi reducer.py

clip_image070

mapper.py

clip_image072

reducer.py

clip_image074

6.3.2運行程序

數據使用作業2求每年最低溫度的氣象數據,數據在HDFS位置為/usr/hadoop/in/sample.txt,啟動MapReduce任務,執行輸出目錄為/usr/hadoop/out_py

cd /usr/local/hadoop-1.1.2

hadoop jar contrib/streaming/hadoop-streaming-1.1.2.jar \

-input /usr/hadoop/in/sample.txt \

-output /usr/hadoop/out_py \

-mapper myclass/mapper.py \

-reducer myclass/reducer.py \

-file myclass/mapper.py \

-file myclass/reducer.py

clip_image076

6.3.3查看結果

執行成功后,查看/usr/hadoop/out_py目錄中是否存在運行結果,使用cat查看結果:

hadoop fs -ls /usr/hadoop/out_py

hadoop fs -cat /usr/hadoop/out_py/part-00000

clip_image078

 

7    書面作業5MapReduce是否可以自動識別新增節點?

7.1  書面作業5內容

(選作)如果向正在運行的Hadoop集群增加一個新節點,Map-Reduce體系是否可以自動識別並使用這個新節點?如果不能怎樣才可以將其加入到Map-Reduce體系?用實驗進行驗證

7.2  程序代碼

參見作業2代碼並打成jar

7.3  實現過程

在這里將運行本周作業2求每年最低溫度的MapReduce(當然也可以運行求最高溫度等例子),為了能夠更清楚的觀察實驗的效果,在該作業中運行的數據盡可能大,以下實驗中將使用1970~1972年各年份前400個氣象站數據,解壓后大概480M左右。在本實驗中將觀察兩個情況:一種是運行過程新增節點,另一種是重啟集群觀察作業是否運行在新增節點上?

新增節點信息:

序號

IP地址

機器名

類型

用戶名

運行進程

4

10.88.147.224

Hadoop4

數據節點

hadoop

DNTaskTracer

7.3.1環境准備

參考第三周第二題第四個問題--SNNNN分離--進行環境准備,增加一個新節點並確保各節點能夠免密碼SSH訪問

7.3.1.1復制虛擬機

復制DataNode節點所在虛擬機,新虛擬機作為新增節點

clip_image080

7.3.1.2設置新增節點虛擬機IP地址

設置該節點IP地址為:10.88.147.224

clip_image082

7.3.1.3設置新增節點虛擬機名稱

設置新增節點虛擬機名稱為:hadoop4

sudo vi /etc/sysconfig/network

clip_image084

clip_image086

7.3.1.4所有節點hosts 文件加入新增節點的 IP對應信息

在所有節點/etc/hosts文件中加入新增節點的IP地址10.88.147.224對應hadoop4

sudo vi /etc/hosts

clip_image088

7.3.1.5所有節點slavers文件加入新增節點信息

在新增節點hadoop4機器的slaves文件中加入新增節點機器名信息,使用如下命令:

sudo vi /usr/local/hadoop-1.1.2/conf/slaves

slaves文件中加入新增節點機器名

clip_image090

7.3.1.6重啟所有虛擬機

clip_image092

7.3.1.7配置ssh免密碼登錄

1.     hadoop410.88.147.244)節點中使用ssh-keygen -t rsa生成私鑰和公鑰;

clip_image094

2.     hadoop410.88.147.244)節點中公鑰信息加入到authorized_keys文件中;

chmod 400 -R /home/hadoop/.ssh

cat id_rsa.pub >> authorized_keys

cat authorized_keys

clip_image096

3.     authorized_keys分發到各個節點上;

scp authorized_keys hadoop@hadoop1:/home/hadoop/.ssh

clip_image098

4.     驗證是否能夠免登錄到各個節點;

clip_image100

7.3.2准備數據

參考作業2,為了能夠更清楚的觀察實驗的效果,在該作業中運行的數據盡可能大,在這里上傳數據大概480M左右。使用SSH工具(參見第122.1.3.1    Linux文件傳輸工具所描述)把從NCDC下載的氣象數據上傳到上步驟創建的目錄/usr/local/hadoop-1.1.2/input中。

clip_image102

先把這三年的數據放到/usr/local/hadoop-1.1.2/input目錄下,然后使用zcat命令把這些數據文件解壓並合並到一個newnode.txt文件中,合並后把這個文件上傳到HDFS文件系統的/usr/hadoop/in目錄中:

cd /usr/local/hadoop-1.1.2/input

mv 1971/*.gz .

mv 1972/*.gz .

mv 1973/*.gz .

zcat *.gz > newnode.txt

ll newnode.txt

clip_image104

hadoop fs -copyFromLocal newnode.txt /usr/hadoop/in

hadoop fs -ls /usr/hadoop/in

clip_image106

7.3.3不啟動Hadoop4,運行任務

使用本周作業2打包的jar包,啟動作業,執行輸出目錄為/usr/hadoop/out_newnode1

cd /usr/local/hadoop-1.1.2

hadoop jar MinTemperature.jar MinTemperature /usr/hadoop/in/newnode.txt  /usr/hadoop/out_newnode1

clip_image108

查看運行作業節點信息,Hadoop集群由兩個DataNode節點,分別為Hadoop2Hadoop3

clip_image110

7.3.4啟動Hadoop4,運行任務

再次啟動作業,執行輸出目錄為/usr/hadoop/out_newnode2

cd /usr/local/hadoop-1.1.2

hadoop jar MinTemperature.jar MinTemperature /usr/hadoop/in/newnode.txt  /usr/hadoop/out_newnode2

clip_image112

運行作業后,啟動新增節點Hadoop4中的DataNodeTaskTracker進程

cd /usr/local/hadoop-1.1.2/bin

hadoop-daemon.sh start datanode

hadoop-daemon.sh start tasktracker

jps

clip_image114

觀察結果,可以看到Hadoop集群識別到Hadoop4節點並發現該節點運行的TaskTracker

clip_image116

clip_image118

在新增節點Hadoop4正在運行task任務

clip_image120

 

7.3.5重啟Hadoop集群,運行任務

停止Hadoop集群運行:

cd /usr/local/hadoop-1.1.2/bin

stop-all.sh

clip_image122

Hadoop4節點中結束DataNodeTaskTracker進程:

cd /usr/local/hadoop-1.1.2/bin

hadoop-daemon.sh stop datanode

hadoop-daemon.sh stop tasktracker

clip_image124

Hadoop1節點中HADOOP_HOME/conf/salvers中加入Hadoop4節點

cd /usr/local/hadoop-1.1.2/conf

sudo vi /usr/local/hadoop-1.1.2/conf/slaves

clip_image126

啟動Hadoop集群,從下圖可以看見Hadoop4節點已經隨Hadoop集群啟動:

cd /usr/local/hadoop-1.1.2/bin

start-all.sh

clip_image128

clip_image130

再次啟動運行作業,可以觀察到Hadoop4節點得到運行任務:

cd /usr/local/hadoop-1.1.2

hadoop jar MinTemperature.jar MinTemperature /usr/hadoop/in/newnode.txt  /usr/hadoop/out_newnode3

clip_image132

clip_image134

 

7.3.6結論

綜上,當Hadoop集群有新節點加入時,正在運行的MapReduce作業自動識別並使用新節點;當新節點加入Hadoop集群后,再啟動MapReduce作業,MapReduce作業也能自動識別並使用新節點。

8    書面作業6:使用Hadoop公平調度器

8.1  書面作業6內容

(選作)怎樣打開Hadoop的公平調度器?怎樣證實當前處於公平調度下。設計一個實驗方案驗證FIFO調度器和公平調度器的異同,最好能實踐之

8.2  程序代碼

參見作業2代碼並打成jar

8.3  實現過程

在這里將運行本周作業2求每年最低溫度的MapReduce(當然也可以運行求最高溫度等例子),為了能夠更加明顯看到實驗效果該實驗將准備兩份實驗數據:數據A1970~1972年各年份前10個氣象站合並數據,數據B1970~1972年各年份前240個氣象站合並數據,那么分析數據AMapReduceA時間將大大少於分析數據BMapReduceB時間。在實驗過程中將讓分析數據BMapReduceB先啟動,然后啟動分析數據AMapReduceA,觀察MapReduceAMapReduceB哪一個先結束?

8.3.1打開公平調度器

要讓公平調度器能在Hadoop中運行,需要把相應的jar放到CLASSPATH中。在Hadoop1.X以前需要把hadoop-*-fairscheduler.jarHADOOP_HOME/build/contrib/fairscheduler拷貝到HADOOP_HOME/lib,在Hadoop1.1.2版本在HADOOP_HOME/lib已經包含了公平調度器和能力調度器兩個jar

clip_image136

並需要在Hadoop的配置文件HADOOP_CONF_DIR/mapred-site.xml中設置下列屬性讓Hadoop啟用公平調度器:

  <property>

    <name>mapred.jobtracker.taskScheduler</name>

    <value>org.apache.hadoop.mapred.FairScheduler</value>

  </property>

clip_image138

clip_image140

8.3.2驗證啟動公平調度器

重啟集群

cd /usr/local/hadoop-1.1.2/bin

stop-all.sh

start-all.sh

clip_image142

通過jobtracker web UI查看:http://<JobTracker URL>/scheduler,在該實驗地址為http://10.88.147.221:50030/scheduler,如下圖所示

clip_image144

8.3.3准備數據

使用SSH工具(參見第122.1.3.1Linux文件傳輸工具所描述)把從NCDC下載的氣象數據上傳到上步驟創建的目錄/usr/local/hadoop-1.1.2/input中。

clip_image016[1]

使用zcat命令把這些數據文件分別解壓並合並到shortData.txtlongData.txt文件中,合並后把這兩個文件上傳到HDFS文件系統的/usr/hadoop/in目錄中:

cd /usr/local/hadoop-1.1.2/input

zcat *.gz > shortData.txt

zcat *.gz > longData.txt

hadoop fs -copyFromLocal shortData.txt /usr/hadoop/in

hadoop fs -copyFromLocal longData.txt /usr/hadoop/in

hadoop fs -ls /usr/hadoop/in

clip_image146

clip_image148

8.3.4運行分析MapReduce

使用本周作業2打包的jar包,先啟動MapReduceB,然后在另外一個登錄Session中啟動MapReduceA,執行輸出目錄為/usr/hadoop/out_long /usr/hadoop/out_short

cd /usr/local/hadoop-1.1.2

hadoop jar MinTemperature.jar MinTemperature /usr/hadoop/in/longData.txt  /usr/hadoop/out_long

clip_image150

在另外一個登錄Session運行情況:

cd /usr/local/hadoop-1.1.2

hadoop jar MinTemperature.jar MinTemperature /usr/hadoop/in/shortData.txt  /usr/hadoop/out_short

clip_image152

8.3.5觀察結果

可以從上步驟結果圖觀察到:

MapReduceB 開始執行時間2014-10-14 16:51:13 結束時間:2014-10-14 16:53:36

MapReduceA 開始執行時間2014-10-14 16:51:20 結束時間:2014-10-14 16:53:28

MapReduceB這個job先運行,但因為這個job處理的數據量較大,而后一個MapReduceAjob處理的數據量較小,所以MapReduceAjob反而較早運行結束,可知公平調度器設置成功。

 

MapReduceB 作業ID為:job_201410141618_0009

MapReduceA 作業ID為:job_201410141618_0010

clip_image154

clip_image156

clip_image158

 

9    問題解決

9.1  在作業5中新增節點后,DataNode無法啟動

通過拷貝Hadoop310.88.147.223)節點,修改名稱后成為Hadoop410.88.147.224)節點,啟動Hadoop3或者Hadoop4DataNode會出現如下錯誤:

clip_image160

2014-10-15 16:28:40,344 WARN org.apache.hadoop.hdfs.server.datanode.DataNode: DataNode is shutting down: org.apache.hadoop.ipc.RemoteException: org.apache.hadoop.hdfs.protocol.UnregisteredDatanodeException: Data node 10.88.147.223:50010 is attempting to report storage ID DS-60030049-10.88.147.223-50010-1411456729315. Node 10.88.147.224:50010 is expected to serve this storage.

        at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getDatanode(FSNamesystem.java:4776)

        at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.processReport(FSNamesystem.java:3628)

        at org.apache.hadoop.hdfs.server.namenode.NameNode.blockReport(NameNode.java:1041)

        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)

        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)

        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)

        at java.lang.reflect.Method.invoke(Method.java:606)

        at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:578)

        at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:1393)

        at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:1389)

        at java.security.AccessController.doPrivileged(Native Method)

        at javax.security.auth.Subject.doAs(Subject.java:415)

        at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1149)

        at org.apache.hadoop.ipc.Server$Handler.run(Server.java:1387)

 

        at org.apache.hadoop.ipc.Client.call(Client.java:1107)

        at org.apache.hadoop.ipc.RPC$Invoker.invoke(RPC.java:229)

        at com.sun.proxy.$Proxy5.blockReport(Unknown Source)

        at org.apache.hadoop.hdfs.server.datanode.DataNode.offerService(DataNode.java:1026)

        at org.apache.hadoop.hdfs.server.datanode.DataNode.run(DataNode.java:1527)

        at java.lang.Thread.run(Thread.java:745)

解決辦法是修改Hadoop4節點中HADOOP_HOME/hdfs/data/current/VERSION文件,把storageIDDS-60030049-10.88.147.223-50010-1411456729315修改成DS-60030049-10.88.147.224-50010-1411456729315即可

clip_image162

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM