Hadoop-MR[會用]MR程序的運行模式


1.簡介
  現在很少用到使用MR計算框架來實現功能,通常的做法是使用hive等工具輔助完成。
但是對於其底層MR的原理還是有必要做一些了解。

 

2.MR客戶端程序實現套路

  這一小節總結歸納編寫mr客戶端程序的一般流程和套路。將以wordcount為例子進行理解。

  運行一個mr程序有三種模式,分別為:本地模式,本地集群模式,命令行集群模式

 

3.代碼實現

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

/**
 * 新API中對job提交類的建議寫法
 *
 */
public class WordCountDriver extends Configured implements Tool{
    
    /**
     * 在run方法中對job進行封裝
     */
    @Override
    public int run(String[] args) throws Exception {
        Configuration conf = new Configuration();
        
        //先構造一個用來提交我們的業務程序的一個信息封裝對象
        Job job = Job.getInstance(conf);
        
        //指定本job所采用的mapper類
        job.setMapperClass(WordCountMapper.class);
        //指定本job所采用的reducer類
        job.setReducerClass(WordCountReducer.class);
        
        //指定我們的mapper類輸出的kv數據類型
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(LongWritable.class);
        
        //指定我們的reducer類輸出的kv數據類型
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(LongWritable.class);
        
        //指定我們要處理的文件所在的路徑
        FileInputFormat.setInputPaths(job, new Path("/Users/apple/Desktop/temp/data/input/"));
        
        //指定我們的輸出結果文件所存放的路徑
        FileOutputFormat.setOutputPath(job, new Path("/Users/apple/Desktop/temp/data/output"));

        return job.waitForCompletion(true)? 0:1;
    }
    
    public static void main(String[] args) throws Exception {
        
        int res = ToolRunner.run(new Configuration(), new WordCountDriver(), args);
        System.exit(res);
        
        
    }
    
    //在hadoop中,普通的java類不適合做網絡序列化傳輸,hadoop對java的類型進行了封裝,以便於利用hadoop的序列化框架進行序列化傳輸
    public static class WordCountMapper extends Mapper<LongWritable, Text, Text, LongWritable> {
        
        /**
         * map方法是每讀一行調用一次
         */
        @Override
        protected void map(LongWritable key, Text value,Context context)
                throws IOException, InterruptedException {

            //拿到一行的內容
            String line = value.toString();
            //切分出一行中所有的單詞
            String[] words = line.split(" ");
            //輸出<word,1>這種KV對
            for(String word:words){
                //遍歷單詞數組,一對一對地輸出<hello,1>  <tom,1> .......
                context.write(new Text(word), new LongWritable(1));
                
            }
        }
    }
    
    public static class WordCountReducer extends Reducer<Text, LongWritable, Text, LongWritable>{
        /**
         * reduce方法是每獲得一個<key,valueList>,執行一次
         */
        //key : 某一個單詞 ,比如  hello
        //values:  這個單詞的所有v,  封裝在一個迭代器中,可以理解為一個list{1,1,1,1.....}
        @Override
        protected void reduce(Text key, Iterable<LongWritable> values,Context context)
                throws IOException, InterruptedException {
            
            long count = 0;
            //遍歷該key的valuelist,將所有value累加到計數器中去
            for(LongWritable value:values){
                count += value.get();
                
            }
            
            context.write(key, new LongWritable(count));
        }
        

    }    
}

 

    

 

3. 本地模式運行                                                                                                                  

     使用eclipse編完代碼后直接即可運行,但是此種運行只發生在本地,並不會被提交到集群環境運行,換句話說在yarn的web上是無法查詢到這個任務的。

  這種模式的好處在於可以方便的debug。

    在此種模式下輸入和輸出的路徑可以指定為本地路徑,也可以指定為hdfs路徑。如果使用本地路徑則上述代碼即可執行。當指定為hdfs路且hdfs集群的配置為hadoop2.x的主備

  模式的話則需要引入hdfs-site.xml文件(因為主備模式下hdfs的url是一個service,需要通過配置文件才能解析這個url):

  下述例子為指定hdfs路徑為輸入輸出源頭,需要引入xml文件到classpath

        //指定我們要處理的文件所在的路徑
        FileInputFormat.setInputPaths(job, new Path("hdfs://ns1/wordcountData/input"));
        
        //指定我們的輸出結果文件所存放的路徑
        FileOutputFormat.setOutputPath(job, new Path("hdfs://ns1/wordcountData/output"));

 

  input路徑下的文件內容為:

[hadoop@xufeng-1 temp]$ hadoop fs -cat /wordcountData/input/words.txt
16/07/28 18:09:19 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
aaa bbb
ccc ddd
aaa ccc
ddd eee
eee
ggg ggg hhh
aaa
[hadoop@xufeng-1 temp]$

 

  通過eclipse啟動的時候會有權限問題,可以在vm中指定用戶名:

  

  啟動程序,在日志中我們可以看到當前mr是通過本地模式執行的,在查看yarn的監控web,並沒有這個任務的記錄。

2016-08-17 11:54:02,291 INFO  [Thread-12] mapred.LocalJobRunner (LocalJobRunner.java:runTasks(448)) - Waiting for map tasks

  在輸出文件夾中查看結果:

[hadoop@xufeng-1 temp]$ hadoop fs -ls /wordcountData/output
16/07/28 18:22:31 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Found 2 items
-rw-r--r--   1 hadoop supergroup          0 2016-07-28 17:58 /wordcountData/output/_SUCCESS
-rw-r--r--   1 hadoop supergroup         42 2016-07-28 17:58 /wordcountData/output/part-r-00000
[hadoop@xufeng-1 temp]$ hadoop fs -cat /wordcountData/output/part-r-00000
16/07/28 18:22:52 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
aaa    3
bbb    1
ccc    2
ddd    2
eee    2
ggg    2
hhh    1
[hadoop@xufeng-1 temp]$

 

 

4. 本地集群模式運行                                                                                                                                  

  在eclipse中我們可以直接讓程序在集群中運行(如yarn集群)上運行,免去打包等繁瑣工作,要想讓本地運行的關鍵需要引入mapred-site.xml 和yarn-site.xml文件

  目的是讓本地程序知道當前mr是在什么框架下執行的,並且要知道集群的信息。

  由於如下原因暫未解決:

 

Diagnostics: File file:/tmp/hadoop-yarn/staging/apple/.staging/job_1469738198989_0009/job.splitmetainfo does not exist
java.io.FileNotFoundException: File file:/tmp/hadoop-yarn/staging/apple/.staging/job_1469738198989_0009/job.splitmetainfo does not exist
    at org.apache.hadoop.fs.RawLocalFileSystem.deprecatedGetFileStatus(RawLocalFileSystem.java:534)
    at org.apache.hadoop.fs.RawLocalFileSystem.getFileLinkStatusInternal(RawLocalFileSystem.java:747)
    at org.apache.hadoop.fs.RawLocalFileSystem.getFileStatus(RawLocalFileSystem.java:524)
    at org.apache.hadoop.fs.FilterFileSystem.getFileStatus(FilterFileSystem.java:409)

 

5. 命令行集群模式運行                                                                                                                                  

  這種模式的運行既是將程序打成jar文件后,放到集群環境上去,通過hadoop jar命令來運行,這模式下運行的任務將運行在集群上。

  這種模式非常簡單,但是需要在run()方法中指定:

        job.setJarByClass(WordCountDriver.class);

  否則會出現mapper類無法找到的錯誤。通過這個模式我們無需使用任何配置文件,在eclipse中將程序打包后傳上集群主機。使用如下命令即可執行:

hadoop jar wordcount.jar WordCountDriver

  運行日志:

16/07/28 22:31:21 INFO mapreduce.Job:  map 0% reduce 0%
16/07/28 22:31:28 INFO mapreduce.Job:  map 100% reduce 0%
16/07/28 22:31:39 INFO mapreduce.Job:  map 100% reduce 100%
16/07/28 22:31:39 INFO mapreduce.Job: Job job_1469738198989_0014 completed successfully
16/07/28 22:31:39 INFO mapreduce.Job: Counters: 49
    File System Counters
        FILE: Number of bytes read=188
        FILE: Number of bytes written=221659
        FILE: Number of read operations=0
        FILE: Number of large read operations=0
        FILE: Number of write operations=0
        HDFS: Number of bytes read=157

 

  查看yarn上的監控web:

集中模式的介紹完畢。

 

 

 

 

 

  


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM