Mapreduce實例——排序


原理

Map、Reduce任務中Shuffle和排序的過程圖如下:

流程分析:

1.Map端:

(1)每個輸入分片會讓一個map任務來處理,默認情況下,以HDFS的一個塊的大小(默認為64M)為一個分片,當然我們也可以設置塊的大小。map輸出的結果會暫且放在一個環形內存緩沖區中(該緩沖區的大小默認為100M,由io.sort.mb屬性控制),當該緩沖區快要溢出時(默認為緩沖區大小的80%,由io.sort.spill.percent屬性控制),會在本地文件系統中創建一個溢出文件,將該緩沖區中的數據寫入這個文件。

(2)在寫入磁盤之前,線程首先根據reduce任務的數目將數據划分為相同數目的分區,也就是一個reduce任務對應一個分區的數據。這樣做是為了避免有些reduce任務分配到大量數據,而有些reduce任務卻分到很少數據,甚至沒有分到數據的尷尬局面。其實分區就是對數據進行hash的過程。然后對每個分區中的數據進行排序,如果此時設置了Combiner,將排序后的結果進行Combia操作,這樣做的目的是讓盡可能少的數據寫入到磁盤。

(3)當map任務輸出最后一個記錄時,可能會有很多的溢出文件,這時需要將這些文件合並。合並的過程中會不斷地進行排序和combia操作,目的有兩個:①盡量減少每次寫入磁盤的數據量。②盡量減少下一復制階段網絡傳輸的數據量。最后合並成了一個已分區且已排序的文件。為了減少網絡傳輸的數據量,這里可以將數據壓縮,只要將mapred.compress.map.out設置為true就可以了。

(4)將分區中的數據拷貝給相對應的reduce任務。有人可能會問:分區中的數據怎么知道它對應的reduce是哪個呢?其實map任務一直和其父TaskTracker保持聯系,而TaskTracker又一直和JobTracker保持心跳。所以JobTracker中保存了整個集群中的宏觀信息。只要reduce任務向JobTracker獲取對應的map輸出位置就ok了哦。

到這里,map端就分析完了。那到底什么是Shuffle呢?Shuffle的中文意思是"洗牌",如果我們這樣看:一個map產生的數據,結果通過hash過程分區卻分配給了不同的reduce任務,是不是一個對數據洗牌的過程呢?

2.Reduce端:

(1)Reduce會接收到不同map任務傳來的數據,並且每個map傳來的數據都是有序的。如果reduce端接受的數據量相當小,則直接存儲在內存中(緩沖區大小由mapred.job.shuffle.input.buffer.percent屬性控制,表示用作此用途的堆空間的百分比),如果數據量超過了該緩沖區大小的一定比例(由mapred.job.shuffle.merge.percent決定),則對數據合並后溢寫到磁盤中。

(2)隨着溢寫文件的增多,后台線程會將它們合並成一個更大的有序的文件,這樣做是為了給后面的合並節省時間。其實不管在map端還是reduce端,MapReduce都是反復地執行排序,合並操作,現在終於明白了有些人為什么會說:排序是hadoop的靈魂。

(3)合並的過程中會產生許多的中間文件(寫入磁盤了),但MapReduce會讓寫入磁盤的數據盡可能地少,並且最后一次合並的結果並沒有寫入磁盤,而是直接輸入到reduce函數。

熟悉MapReduce的人都知道:排序是MapReduce的天然特性!在數據達到reducer之前,MapReduce框架已經對這些數據按鍵排序了。但是在使用之前,首先需要了解它的默認排序規則。它是按照key值進行排序的,如果key為封裝的int為IntWritable類型,那么MapReduce按照數字大小對key排序,如果Key為封裝String的Text類型,那么MapReduce將按照數據字典順序對字符排序。

了解了這個細節,我們就知道應該使用封裝int的Intwritable型數據結構了,也就是在map這里,將讀入的數據中要排序的字段轉化為Intwritable型,然后作為key值輸出(不排序的字段作為value)。reduce階段拿到<key,value-list>之后,將輸入的key作為的輸出key,並根據value-list中的元素的個數決定輸出的次數。

實驗環境

Linux Ubuntu 14.04

jdk-7u75-linux-x64

hadoop-2.6.0-cdh5.4.5

hadoop-2.6.0-eclipse-cdh5.4.5.jar

eclipse-java-juno-SR2-linux-gtk-x86_64

實驗內容

在電商網站上,當我們進入某電商頁面里瀏覽商品時,就會產生用戶對商品訪問情況的數據 ,名為goods_visit1,goods_visit1中包含(商品id ,點擊次數)兩個字段,內容以"\t"分割,由於數據量很大,所以為了方便統計我們只截取它的一部分數據,內容如下:

view plain copy

  1. 商品id  點擊次數  
  2. 1010037 100  
  3. 1010102 100  
  4. 1010152 97  
  5. 1010178 96  
  6. 1010280 104  
  7. 1010320 103  
  8. 1010510 104  
  9. 1010603 96  
  10. 1010637 97  

要求我們編寫mapreduce程序來對商品點擊次數有低到高進行排序。

實驗結果數據如下:

view plain copy

  1. 點擊次數 商品ID  
  2. 96  1010603  
  3. 96  1010178  
  4. 97  1010637  
  5. 97  1010152  
  6. 100 1010102  
  7. 100 1010037  
  8. 103 1010320  
  9. 104 1010510  
  10. 104 1010280  

實驗步驟

1.切換到/apps/hadoop/sbin目錄下,開啟Hadoop。

view plain copy

  1. cd /apps/hadoop/sbin  
  2. ./start-all.sh  

2.在Linux本地新建/data/mapreduce3目錄。

view plain copy

  1. mkdir -p /data/mapreduce3  

3.在Linux中切換到/data/mapreduce3目錄下,用wget命令從http://192.168.1.100:60000/allfiles/mapreduce3/goods_visit1網址上下載文本文件goods_visit1。

view plain copy

  1. cd /data/mapreduce3  
  2. wget http://192.168.1.100:60000/allfiles/mapreduce3/goods_visit1  

然后在當前目錄下用wget命令從http://192.168.1.100:60000/allfiles/mapreduce3/hadoop2lib.tar.gz網址上下載項目用到的依賴包。

view plain copy

  1. wget http://192.168.1.100:60000/allfiles/mapreduce3/hadoop2lib.tar.gz  

將hadoop2lib.tar.gz解壓到當前目錄下。

view plain copy

  1. tar zxvf hadoop2lib.tar.gz  

4.首先在HDFS上新建/mymapreduce3/in目錄,然后將Linux本地/data/mapreduce3目錄下的goods_visit1文件導入到HDFS的/mymapreduce3/in目錄中。

view plain copy

  1. hadoop fs -mkdir -p /mymapreduce3/in  
  2. hadoop fs -put /data/mapreduce3/goods_visit1 /mymapreduce3/in  

5.新建Java Project項目,項目名為mapreduce3。

在mapreduce3項目下新建包,包名為mapreduce。

在mapreduce包下新建類,類名為OneSort。

6.添加項目所需依賴的jar包,右鍵單擊項目新建一個文件夾,名為hadoop2lib,用於存放項目所需的jar包。

將/data/mapreduce3目錄下hadoop2lib文件夾中的所有jar包,拷貝到eclipse中mapreduce3項目的hadoop2lib目錄下。

選中hadoop2lib目錄下所有jar包,單擊右鍵,選擇Build Path→Add to Build Path。

7.編寫Java代碼,並描述其設計思路

在MapReduce過程中默認就有對數據的排序。它是按照key值進行排序的,如果key為封裝int的IntWritable類型,那么MapReduce會按照數字大小對key排序,如果Key為封裝String的Text類型,那么MapReduce將按照數據字典順序對字符排序。在本例中我們用到第一種,key設置為IntWritable類型,其中MapReduce程序主要分為Map部分和Reduce部分。

Map部分代碼

view plain copy

  1. public static class Map extends Mapper<Object,Text,IntWritable,Text>{  
  2.         private static Text goods=new Text();  
  3.         private static IntWritable num=new IntWritable();  
  4.         public void map(Object key,Text value,Context context) throws IOException, InterruptedException{  
  5.             String line=value.toString();  
  6.             String arr[]=line.split("\t");  
  7.             num.set(Integer.parseInt(arr[1]));  
  8.             goods.set(arr[0]);  
  9.             context.write(num,goods);  
  10.         }  
  11.     }  

在map端采用Hadoop默認的輸入方式之后,將輸入的value值用split()方法截取,把要排序的點擊次數字段轉化為IntWritable類型並設置為key,商品id字段設置為value,然后直接輸出<key,value>。map輸出的<key,value>先要經過shuffle過程把相同key值的所有value聚集起來形成<key,value-list>后交給reduce端。

Reduce部分代碼

view plain copy

  1. public static class Reduce extends Reducer<IntWritable,Text,IntWritable,Text>{  
  2.         private static IntWritable result= new IntWritable();  
  3.                  //聲明對象result  
  4.         public void reduce(IntWritable key,Iterable<Text> values,Context context) throws IOException, InterruptedException{  
  5.     for(Text val:values){  
  6.     context.write(key,val);  
  7.     }  
  8.     }  
  9.     }  

reduce端接收到<key,value-list>之后,將輸入的key直接復制給輸出的key,用for循環遍歷value-list並將里面的元素設置為輸出的value,然后將<key,value>逐一輸出,根據value-list中元素的個數決定輸出的次數。

完整代碼

view plain copy

  1. package mapreduce;  
  2. import java.io.IOException;  
  3. import org.apache.hadoop.conf.Configuration;  
  4. import org.apache.hadoop.fs.Path;  
  5. import org.apache.hadoop.io.IntWritable;  
  6. import org.apache.hadoop.io.Text;  
  7. import org.apache.hadoop.mapreduce.Job;  
  8. import org.apache.hadoop.mapreduce.Mapper;  
  9. import org.apache.hadoop.mapreduce.Reducer;  
  10. import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;  
  11. import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;  
  12. import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;  
  13. import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;  
  14. public class OneSort {  
  15.     public static class Map extends Mapper<Object , Text , IntWritable,Text >{  
  16.     private static Text goods=new Text();  
  17.     private static IntWritable num=new IntWritable();  
  18.     public void map(Object key,Text value,Context context) throws IOException, InterruptedException{  
  19.     String line=value.toString();  
  20.     String arr[]=line.split("\t");  
  21.     num.set(Integer.parseInt(arr[1]));  
  22.     goods.set(arr[0]);  
  23.     context.write(num,goods);  
  24.     }  
  25.     }  
  26.     public static class Reduce extends Reducer< IntWritable, Text, IntWritable, Text>{  
  27.     private static IntWritable result= new IntWritable();  
  28.     public void reduce(IntWritable key,Iterable<Text> values,Context context) throws IOException, InterruptedException{  
  29.         for(Text val:values){  
  30.         context.write(key,val);  
  31.         }  
  32.         }  
  33.         }  
  34.         public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException{  
  35.         Configuration conf=new Configuration();  
  36.         Job job =new Job(conf,"OneSort");  
  37.         job.setJarByClass(OneSort.class);  
  38.         job.setMapperClass(Map.class);  
  39.         job.setReducerClass(Reduce.class);  
  40.         job.setOutputKeyClass(IntWritable.class);  
  41.         job.setOutputValueClass(Text.class);  
  42.         job.setInputFormatClass(TextInputFormat.class);  
  43.         job.setOutputFormatClass(TextOutputFormat.class);  
  44.         Path in=new Path("hdfs://localhost:9000/mymapreduce3/in/goods_visit1");  
  45.         Path out=new Path("hdfs://localhost:9000/mymapreduce3/out");  
  46.         FileInputFormat.addInputPath(job,in);  
  47.         FileOutputFormat.setOutputPath(job,out);  
  48.         System.exit(job.waitForCompletion(true) ? 0 : 1);  
  49.     
  50.         }  
  51.         }  

8.在OneSort類文件中,右鍵並點擊=>Run As=>Run on Hadoop選項,將MapReduce任務提交到Hadoop中。

9.待執行完畢后,進入命令模式下,在HDFS上/mymapreduce3/out中查看實驗結果。

view plain copy

  1. hadoop fs -ls /mymapreduce3/out  
  2. hadoop fs -cat /mymapreduce3/out/part-r-00000  

提交保存

窗體頂端

窗體底端

發送消息


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM