MapReduce本地運行模式wordcount實例(附:MapReduce原理簡析)


1.      環境配置

a)        配置系統環境變量HADOOP_HOME

 

 

 

b)        hadoop.dll文件放到c:/windows/System32目錄下

 

 

c)        hadoop-2.6.0\share\hadoop\common\sources目錄下hadoop-common-2.6.0-sources.jar文件中找到org\apache\hadoop\io\nativeioNativeIO.java文件,復制到對應的Eclipseproject NativeIO.java文件還要在原來的包名下

 

 

 

d)        修改此文件的557行,替換為return true

 


e)        在主機中配置虛擬機的IP和用戶名

 


 


f)         以管理員身份運行eclipse

 

 

2.      代碼(以wordcount為例)

a)        MapReduceMapReduce兩部分,加上測試,一共三部分

 


b)        Map里主要解決文件分割的問題;

package com.hadoop.hdfs.api.test.mr.wc;

 

import java.io.IOException;

 

import org.apache.hadoop.io.LongWritable;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.Mapper;

/*

 * KEYIN, VALUEIN, 輸入的key-value數據類型

 *

 * KEYOUT, VALUEOUT 輸出的key-value數據類型

 *

*/

public class WCMapper extends Mapper<LongWritable, Text, Text, LongWritable>{

     

/*   key:輸入的key值,偏移量

      value:輸入的value,一行的內容

*/

      @Override

      protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, LongWritable>.Context context)

                    throws IOException, InterruptedException {

            

             //獲取一行的內容

             String linestr=value.toString();

 

             String[] words=linestr.split(" ");//正則表達式怎么實現這些東西?

             for (String word : words) {

                    //輸出寫

                    context.write(new Text(word), new LongWritable(1));

             }

            

      }

}

 

 

c)        接收map的結果,然后整合輸出

package com.hadoop.hdfs.api.test.mr.wc;

 

import java.io.IOException;

import java.util.Iterator;

 

import org.apache.hadoop.io.LongWritable;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.Reducer;

 

/*

 * org.apache.hadoop.mapreduceHadoop2api

*/

public class WCReducer extends Reducer<Text, LongWritable, Text, LongWritable>{

 

      @Override

      protected void reduce(Text key, Iterable<LongWritable> values,

                    Reducer<Text, LongWritable, Text, LongWritable>.Context context) throws IOException, InterruptedException {

 

                    Iterator<LongWritable>iter=values.iterator();

                    long sum=0;

                    while(iter.hasNext()){

                           sum+=iter.next().get();

                    }

                    context.write(key, new LongWritable(sum));

      }

}

d)        運行文件里只需要配置好路徑即可

package com.hadoop.hdfs.api.test.mr.wc;

 

 

 

 

import org.apache.hadoop.conf.Configuration;

import org.apache.hadoop.fs.Path;

import org.apache.hadoop.io.LongWritable;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.Job;

import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

 

public class WCRunner {

 

      public static void main(String[] args) {

            

             Configuration conf =new Configuration();

/*          conf.set("hadoop.tmp.dir", "j:/tmp/tmpData");*/

             try {

                    Job job=Job.getInstance(conf);

                    //指定main方法所在的類

                    job.setJarByClass(WCRunner.class);

                    //指定mapreduce

                    job.setMapperClass(WCMapper.class);

                    job.setReducerClass(WCReducer.class);

                    //指定map的輸出keyvalue的數據類型

                    job.setMapOutputKeyClass(Text.class);

                    job.setMapOutputValueClass(LongWritable.class);

                    //指定reduce的輸出keyvalue的數據類型

                    job.setOutputKeyClass(Text.class);

                    job.setOutputValueClass(LongWritable.class);

                    //指定輸入的文件目錄,這里可以是文件,也可以是目錄

                    FileInputFormat.setInputPaths(job, new Path(args[0]));

                    //指定輸出的文件目錄,這里只能是目錄,不能是文件

                    FileOutputFormat.setOutputPath(job, new Path(args[1]));

                   

                    //執行job

                    job.waitForCompletion(true);

                   

             } catch (Exception e) {

                    // TODO Auto-generated catch block

                    e.printStackTrace();

             }

 

      }

 

}

 

注:數據類型

       Value Text類型 Key LongWritable類型

       特別聲明:Text的包需要特別注意

       

 

 

 

運行前,傳一下路徑

 

 

Input是讀文件的路徑,里面的文件就是我們要讀的

Outputreduce生成文件存放的地方

特別聲明:input路徑必須存在,output必須是不存在的

特特特別聲明:用戶名一定要注意,不要有空格!!!

 

 

 

MapReduce原理

MapReduce分為兩大部分,Map(抓取數據、數據分割)和Reduce(處理數據,數據整合,上傳數據)。

 

從單文件看MapReduce

1.      HDFS上讀取一個文件

2.      為本地主機分配一個Map任務

3.      Map作業從輸入數據中抽取出鍵值對

4.      每一個鍵值對都作為參數傳遞給map函數,map函數產生的中間鍵值對被緩存在內存中。

5.      上一階段中解析出來的每一個鍵值對,調用一次map方法。如果有1000個鍵值對,就會調用1000map方法。每一次調用map方法會輸出零個或者多個鍵值對。

6.      為本地主機分配一個Reduce任務

7.      Reduce任務讀取Map任務產生的中間值並排序(因為Map任務產生的鍵值對可能映射到不同的分區中,當然本地只有一個分區,所以要排序),排序的目的是使相同鍵的鍵值對聚集在一起。

8.      遍歷排序后中間鍵值對,將具有相同鍵的鍵值對調用一次reduce方法,對每個不同的鍵分別調用一次reduce方法。

9.      reduce函數產生的輸出會添加到這個分區的輸出文件中。

 

從集群上看MapReduce

1.      HDFS上的文件分塊,如需要輸入的文件為100MB200MB時,因為塊大小為128MB,所以共分為三塊,塊一:100MB;塊二:128MB;塊三:72MB

每個塊對應一個Map,需要三個Map進程來處理

2.      為集群上空閑的機器分配Map任務,被分配了Map作業的機器,開始讀取對應分片的輸入數據

3.      與單文件過程類似

4.      Map產生的中間鍵值對分為N個區保存在本地中,每個區對應一個Reduce任務,將這N個區的位置報告給集群中負責調度的機器,讓其將位置信息轉發給已分配好Reduce任務的機器。

5.      Reduce任務的機器從剛獲取的地址處,讀取中間鍵值對,然后與單文件類似

6.      所有執行完畢后,MapReduce輸出放在了N個分區的輸出文件中(分別對應一個Reduce作業)。用戶通常並不需要合並這N個文件,而是將其作為輸入交給另一個MapReduce程序處理。整個過程中,輸入數據是來自底層分布式文件系統(GFS)的,中間數據是放在本地文件系統的,最終輸出數據是寫入底層分布式文件系統(GFS)的。

 

 

 

注:關於MapReduce之間的數據傳輸過程,MapReduce的核心Shuffle,現在知識有限。只知道它的作用,不知道為什么作用,希望過幾天可以整理一下


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM