Hadoop入門進階課程5--MapReduce原理及操作


本文版權歸作者和博客園共有,歡迎轉載,但未經作者同意必須保留此段聲明,且在文章頁面明顯位置給出原文連接,博主為石山園,博客地址為 http://www.cnblogs.com/shishanyuan  。該系列課程是應邀實驗樓整理編寫的,這里需要贊一下實驗樓提供了學習的新方式,可以邊看博客邊上機實驗,課程地址為 https://www.shiyanlou.com/courses/237

【注】該系列所使用到安裝包、測試數據和代碼均可在百度網盤下載,具體地址為 http://pan.baidu.com/s/10PnDs,下載該PDF文件

1環境說明

部署節點操作系統為CentOS,防火牆和SElinux禁用,創建了一個shiyanlou用戶並在系統根目錄下創建/app目錄,用於存放Hadoop等組件運行包。因為該目錄用於安裝hadoop等組件程序,用戶對shiyanlou必須賦予rwx權限(一般做法是root用戶在根目錄下創建/app目錄,並修改該目錄擁有者為shiyanlou(chown R shiyanlou:shiyanlou /app)。

Hadoop搭建環境:

l  虛擬機操作系統: CentOS6.6  64位,單核,1G內存

l  JDK1.7.0_55 64

l  Hadoop1.1.2

2MapReduce原理

2.1 MapReduce簡介

MapReduce 是現今一個非常流行的分布式計算框架,它被設計用於並行計算海量數據。第一個提出該技術框架的是Google 公司,而Google 的靈感則來自於函數式編程語言,如LISPSchemeML 等。MapReduce 框架的核心步驟主要分兩部分:Map Reduce。當你向MapReduce 框架提交一個計算作業時,它會首先把計算作業拆分成若干個Map 任務,然后分配到不同的節點上去執行,每一個Map 任務處理輸入數據中的一部分,當Map 任務完成后,它會生成一些中間文件,這些中間文件將會作為Reduce 任務的輸入數據。Reduce 任務的主要目標就是把前面若干個Map 的輸出匯總到一起並輸出。從高層抽象來看,MapReduce的數據流圖如下圖所示:

clip_image001[4]

2.2 MapReduce流程分析

clip_image003[4]

 

2.2.1 Map過程

1. 每個輸入分片會讓一個map任務來處理,默認情況下,以HDFS的一個塊的大小(默認為64M)為一個分片,當然我們也可以設置塊的大小。map輸出的結果會暫且放在一個環形內存緩沖區中(該緩沖區的大小默認為100M,由io.sort.mb屬性控制),當該緩沖區快要溢出時(默認為緩沖區大小的80%,由io.sort.spill.percent屬性控制),會在本地文件系統中創建一個溢出文件,將該緩沖區中的數據寫入這個文件;

2. 在寫入磁盤之前,線程首先根據reduce任務的數目將數據划分為相同數目的分區,也就是一個reduce任務對應一個分區的數據。這樣做是為了避免有些reduce任務分配到大量數據,而有些reduce任務卻分到很少數據,甚至沒有分到數據的尷尬局面。其實分區就是對數據進行hash的過程。然后對每個分區中的數據進行排序,如果此時設置了Combiner,將排序后的結果進行Combia操作,這樣做的目的是讓盡可能少的數據寫入到磁盤;

3. map任務輸出最后一個記錄時,可能會有很多的溢出文件,這時需要將這些文件合並。合並的過程中會不斷地進行排序和combia操作,目的有兩個:

l盡量減少每次寫入磁盤的數據量

l盡量減少下一復制階段網絡傳輸的數據量。最后合並成了一個已分區且已排序的文件。為了減少網絡傳輸的數據量,這里可以將數據壓縮,只要將mapred.compress.map.out設置為true就可以了

4. 將分區中的數據拷貝給相對應的reduce任務。有人可能會問:分區中的數據怎么知道它對應的reduce是哪個呢?其實map任務一直和其父TaskTracker保持聯系,而TaskTracker又一直和JobTracker保持心跳。所以JobTracker中保存了整個集群中的宏觀信息。只要reduce任務向JobTracker獲取對應的map輸出位置就可以了。

2.2.2 Reduce過程

1. Reduce會接收到不同map任務傳來的數據,並且每個map傳來的數據都是有序的。如果reduce端接受的數據量相當小,則直接存儲在內存中(緩沖區大小由mapred.job.shuffle.input.buffer.percent屬性控制,表示用作此用途的堆空間的百分比),如果數據量超過了該緩沖區大小的一定比例(由mapred.job.shuffle.merge.percent決定),則對數據合並后溢寫到磁盤中;

2. 隨着溢寫文件的增多,后台線程會將它們合並成一個更大的有序的文件,這樣做是為了給后面的合並節省時間。其實不管在map端還是reduce端,MapReduce都是反復地執行排序,合並操作;

3. 合並的過程中會產生許多的中間文件(寫入磁盤了),但MapReduce會讓寫入磁盤的數據盡可能地少,並且最后一次合並的結果並沒有寫入磁盤,而是直接輸入到reduce函數。

2.3 MapReduce工作機制剖析

clip_image005[4]

1.在集群中的任意一個節點提交MapReduce程序;

2.JobClient收到作業后,JobClientJobTracker請求獲取一個Job ID

3.將運行作業所需要的資源文件復制到HDFS上(包括MapReduce程序打包的JAR文件、配置文件和客戶端計算所得的輸入划分信息),這些文件都存放在JobTracker專門為該作業創建的文件夾中,文件夾名為該作業的Job ID

4.獲得作業ID后,提交作業;

5.JobTracker接收到作業后,將其放在一個作業隊列里,等待作業調度器對其進行調度,當作業調度器根據自己的調度算法調度到該作業時,會根據輸入划分信息為每個划分創建一個map任務,並將map任務分配給TaskTracker執行;

6.對於mapreduce任務,TaskTracker根據主機核的數量和內存的大小有固定數量的map槽和reduce槽。這里需要強調的是:map任務不是隨隨便便地分配給某個TaskTracker的,這里有個概念叫:數據本地化(Data-Local)。意思是:將map任務分配給含有該map處理的數據塊的TaskTracker上,同時將程序JAR包復制到該TaskTracker上來運行,這叫“運算移動,數據不移動”;

7.TaskTracker每隔一段時間會給JobTracker發送一個心跳,告訴JobTracker它依然在運行,同時心跳中還攜帶着很多的信息,比如當前map任務完成的進度等信息。當JobTracker收到作業的最后一個任務完成信息時,便把該作業設置成“成功”。當JobClient查詢狀態時,它將得知任務已完成,便顯示一條消息給用戶;

8.運行的TaskTrackerHDFS中獲取運行所需要的資源,這些資源包括MapReduce程序打包的JAR文件、配置文件和客戶端計算所得的輸入划分等信息;

9.TaskTracker獲取資源后啟動新的JVM虛擬機;

10.  運行每一個任務;

3測試例子1

3.1 測試例子1內容

下載氣象數據集部分數據,寫一個Map-Reduce作業,求每年的最低溫度

3.2 運行代碼

3.2.1 MinTemperature

 1 import org.apache.hadoop.fs.Path;
 2 import org.apache.hadoop.io.IntWritable;
 3 import org.apache.hadoop.io.Text;
 4 import org.apache.hadoop.mapreduce.Job;
 5 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
 6 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
 7 
 8 public class MinTemperature {
 9     
10     public static void main(String[] args) throws Exception {
11         if(args.length != 2) {
12             System.err.println("Usage: MinTemperature<input path> <output path>");
13             System.exit(-1);
14         }
15         
16         Job job = new Job();
17         job.setJarByClass(MinTemperature.class);
18         job.setJobName("Min temperature");
19         FileInputFormat.addInputPath(job, new Path(args[0]));
20         FileOutputFormat.setOutputPath(job, new Path(args[1]));
21         job.setMapperClass(MinTemperatureMapper.class);
22         job.setReducerClass(MinTemperatureReducer.class);
23         job.setOutputKeyClass(Text.class);
24         job.setOutputValueClass(IntWritable.class);
25         System.exit(job.waitForCompletion(true) ? 0 : 1);
26     }
27 }

3.2.2 MinTemperatureMapper

 1 import java.io.IOException;
 2 import org.apache.hadoop.io.IntWritable;
 3 import org.apache.hadoop.io.LongWritable;
 4 import org.apache.hadoop.io.Text;
 5 import org.apache.hadoop.mapreduce.Mapper;
 6 
 7 public class MinTemperatureMapper extends Mapper<LongWritable, Text, Text, IntWritable>{
 8 
 9     private static final int MISSING = 9999;
10     
11     @Override 
12     public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
13         
14         String line = value.toString();
15         String year = line.substring(15, 19);
16         
17         int airTemperature;
18         if(line.charAt(87) == '+') {
19             airTemperature = Integer.parseInt(line.substring(88, 92));
20         } else {
21             airTemperature = Integer.parseInt(line.substring(87, 92));
22         }
23         
24         String quality = line.substring(92, 93);
25         if(airTemperature != MISSING && quality.matches("[01459]")) {
26             context.write(new Text(year), new IntWritable(airTemperature));
27         }
28     }
29 }

3.2.3 MinTemperatureReducer

 1 import java.io.IOException;
 2 import org.apache.hadoop.io.IntWritable;
 3 import org.apache.hadoop.io.Text;
 4 import org.apache.hadoop.mapreduce.Reducer;
 5 
 6 public class MinTemperatureReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
 7 
 8     @Override
 9     public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
10         
11         int minValue = Integer.MAX_VALUE;
12         for(IntWritable value : values) {
13             minValue = Math.min(minValue, value.get());
14         }
15         context.write(key, new IntWritable(minValue));
16     }
17 }

3.3 實現過程

3.3.1 編寫代碼

進入/app/hadoop-1.1.2/myclass目錄,在該目錄中建立MinTemperature.javaMinTemperatureMapper.javaMinTemperatureReducer.java代碼文件,執行命令如下:

cd /app/hadoop-1.1.2/myclass/

vi MinTemperature.java

vi MinTemperatureMapper.java

vi MinTemperatureReducer.java

MinTemperature.java

clip_image007[4]

MinTemperatureMapper.java

clip_image009[4]

MinTemperatureReducer.java:

clip_image011[4]

3.3.2 編譯代碼

/app/hadoop-1.1.2/myclass目錄中,使用如下命令對java代碼進行編譯,為保證編譯成功,加入classpath變量,引入hadoop-core-1.1.2.jar包:

javac -classpath ../hadoop-core-1.1.2.jar *.java

clip_image013[4]

3.3.3 打包編譯文件

把編譯好class文件打包,否則在執行過程會發生錯誤。把打好的包移動到上級目錄並刪除編譯好的class文件:

jar cvf ./MinTemperature.jar ./Min*.class

mv *.jar ..

rm Min*.class

clip_image015[4]

3.3.4 解壓氣象數據並上傳到HDFS

NCDC氣象數據解壓,並使用zcat命令把這些數據文件解壓並合並到一個temperature.txt文件中

cd /home/shiyanlou

unzip temperature

cd temperature

zcat *.gz > temperature.txt

clip_image017[4]

clip_image019[4]

氣象數據具體的下載地址為 ftp://ftp3.ncdc.noaa.gov/pub/data/noaa/ ,該數據包括1900年到現在所有年份的氣象數據,大小大概有70多個G,為了測試簡單,我們這里選取一部分的數據進行測試。合並后把這個文件上傳到HDFS文件系統的/class5/in目錄中:

hadoop fs -mkdir -p /class5/in

hadoop fs -copyFromLocal temperature.txt /class5/in

hadoop fs -ls /class5/in

clip_image021[4]

3.3.5 運行程序

jar的方式啟動MapReduce任務,執行輸出目錄為/class5/out

cd /app/hadoop-1.1.2

hadoop jar MinTemperature.jar MinTemperature /class5/in/temperature.txt  /class5/out

clip_image023[4]

3.3.6 查看結果

執行成功后,查看/class5/out目錄中是否存在運行結果,使用cat查看結果(溫度需要除以10):

hadoop fs -ls /class5/out

hadoop fs -cat /class5/out/part-r-00000

clip_image025[4]

3.3.7 通過頁面結果(由於實驗樓環境是命令行界面,以下僅為說明運行過程和結果可以通過界面進行查看)

1.查看jobtracker.jsp

http://XX. XXX.XX.XXX:50030/jobtracker.jsp

clip_image027[4]

查看已經完成的作業任務:

clip_image029[4]

任務的詳細信息:

clip_image031[4]

 

2.  查看dfshealth.jsp

http://XX. XXX.XX.XXX:50070/dfshealth.jsp

clip_image033[4]

分別查看HDFS文件系統和日志

clip_image035[4]

clip_image037[4]

 

4測試例子2

4.1 測試例子2內容

如果求溫度的平均值,能使用combiner嗎?有沒有變通的方法?

4.2 回答

不能直接使用,因為求平均值和前面求最值存在差異,各局部最值的最值還是等於整體的最值的,但是對於平均值而言,各局部平均值的平均值將不再是整體的平均值了,所以不能直接用combiner。可以通過變通的辦法使用combiner來計算平均值,即在combiner的鍵值對中不直接存儲最后的平均值,而是存儲所有值的和個數,最后在reducer輸出時再用和除以個數得到平均值。

4.3 程序代碼

4.3.1 AvgTemperature.java

 1 import org.apache.hadoop.fs.Path;
 2 import org.apache.hadoop.io.IntWritable;
 3 import org.apache.hadoop.io.Text;
 4 import org.apache.hadoop.mapreduce.Job;
 5 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
 6 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
 7 
 8 public class AvgTemperature {
 9     
10     public static void main(String[] args) throws Exception {
11          
12         if(args.length != 2) {
13             System.out.println("Usage: AvgTemperatrue <input path><output path>");
14             System.exit(-1);
15         }
16         
17         Job job = new Job();
18         job.setJarByClass(AvgTemperature.class);
19         job.setJobName("Avg Temperature");
20         FileInputFormat.addInputPath(job, new Path(args[0]));
21         FileOutputFormat.setOutputPath(job, new Path(args[1]));
22         
23         job.setMapperClass(AvgTemperatureMapper.class);
24         job.setCombinerClass(AvgTemperatureCombiner.class);
25         job.setReducerClass(AvgTemperatureReducer.class);
26         
27         job.setMapOutputKeyClass(Text.class);
28         job.setMapOutputValueClass(Text.class);
29         
30         job.setOutputKeyClass(Text.class);
31         job.setOutputValueClass(IntWritable.class);
32         
33         System.exit(job.waitForCompletion(true) ? 0 : 1);
34     }
35 }

 4.3.2 AvgTemperatureMapper.java

 1 import java.io.IOException;
 2 import org.apache.hadoop.io.IntWritable;
 3 import org.apache.hadoop.io.LongWritable;
 4 import org.apache.hadoop.io.Text;
 5 import org.apache.hadoop.mapreduce.Mapper;
 6 
 7 public class AvgTemperatureMapper extends Mapper<LongWritable, Text, Text, Text> {
 8 
 9     private static final int MISSING = 9999;
10     
11     @Override
12     public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException{
13         
14         String line = value.toString();
15         String year = line.substring(15, 19);
16         
17         int airTemperature;
18         if(line.charAt(87) == '+') {
19             airTemperature = Integer.parseInt(line.substring(88, 92));
20         } else {
21             airTemperature =  Integer.parseInt(line.substring(87, 92));
22         }
23         
24         String quality = line.substring(92, 93);
25         if(airTemperature != MISSING && !quality.matches("[01459]")) {
26             context.write(new Text(year), new Text(String.valueOf(airTemperature)));
27         }
28     }
29 }

 4.3.3 AvgTemperatureCombiner.java

 1 import java.io.IOException;
 2 import org.apache.hadoop.io.Text;
 3 import org.apache.hadoop.mapreduce.Reducer;
 4 
 5 public class AvgTemperatureCombiner extends Reducer<Text, Text, Text, Text>{
 6 
 7     @Override
 8     public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
 9         
10         double sumValue = 0;
11         long numValue = 0;
12         
13         for(Text value : values) {
14             sumValue += Double.parseDouble(value.toString());
15             numValue ++;
16         }
17         
18         context.write(key, new Text(String.valueOf(sumValue) + ',' + String.valueOf(numValue)));
19     }
20 }

 4.3.4 AvgTemperatureReducer.java

 1 import java.io.IOException;
 2 import org.apache.hadoop.io.IntWritable;
 3 import org.apache.hadoop.io.Text;
 4 import org.apache.hadoop.mapreduce.Reducer;
 5 
 6 public class AvgTemperatureReducer extends Reducer<Text, Text, Text, IntWritable>{
 7 
 8     @Override
 9     public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
10         
11         double sumValue = 0;
12         long numValue = 0;
13         int avgValue = 0;
14         
15         for(Text value : values) {
16             String[] valueAll = value.toString().split(",");
17             sumValue += Double.parseDouble(valueAll[0]);
18             numValue += Integer.parseInt(valueAll[1]);
19         }
20         
21         avgValue  = (int)(sumValue/numValue);
22         context.write(key, new IntWritable(avgValue));
23     }
24 }

4.4  實現過程

4.4.1 編寫代碼

進入/app/hadoop-1.1.2/myclass目錄,在該目錄中建立AvgTemperature.javaAvgTemperatureMapper.javaAvgTemperatureCombiner.javaAvgTemperatureReducer.java代碼文件,代碼內容為4.3所示,執行命令如下:

cd /app/hadoop-1.1.2/myclass/

vi AvgTemperature.java

vi AvgTemperatureMapper.java

vi AvgTemperatureCombiner.java

vi AvgTemperatureReducer.java

AvgTemperature.java

clip_image039[4]

AvgTemperatureMapper.java

clip_image041[4]

AvgTemperatureCombiner.java

clip_image043[4]

AvgTemperatureReducer.java:

clip_image045[4]

4.4.2 編譯代碼

/app/hadoop-1.1.2/myclass目錄中,使用如下命令對java代碼進行編譯,為保證編譯成功,加入classpath變量,引入hadoop-core-1.1.2.jar包:

javac -classpath ../hadoop-core-1.1.2.jar Avg*.java

clip_image047[4]

4.4.3 打包編譯文件

把編譯好class文件打包,否則在執行過程會發生錯誤。把打好的包移動到上級目錄並刪除編譯好的class文件:

jar cvf ./AvgTemperature.jar ./Avg*.class

ls

mv *.jar ..

rm Avg*.class

clip_image049[4]

4.4.4 運行程序

數據使用作業2求每年最低溫度的氣象數據,數據在HDFS位置為/class5/in/temperature.txt,以jar的方式啟動MapReduce任務,執行輸出目錄為/class5/out2

cd /app/hadoop-1.1.2

hadoop jar AvgTemperature.jar AvgTemperature /class5/in/temperature.txt /class5/out2

clip_image051[4]

4.4.5 查看結果

執行成功后,查看/class5/out2目錄中是否存在運行結果,使用cat查看結果(溫度需要除以10):

hadoop fs -ls /class5/out2

hadoop fs -cat /class5/out2/part-r-00000

clip_image053[4]


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM