一臉懵逼學習MapReduce的原理和編程(Map局部處理,Reduce匯總)和MapReduce幾種運行方式


1:MapReduce的概述:

  (1):MapReduce是一種分布式計算模型,由Google提出,主要用於搜索領域,解決海量數據的計算問題.
  (2):MapReduce由兩個階段組成:Map和Reduce,用戶只需要實現map()和reduce()兩個函數,即可實現分布式計算,非常簡單。
  (3):這兩個函數的形參是key、value對,表示函數的輸入信息。

2:MapReduce執行步驟:

  (1): map任務處理

    (a):讀取輸入文件內容,解析成key、value對。對輸入文件的每一行,解析成key、value對。每一個鍵值對調用一次map函數。
    (b):寫自己的邏輯,對輸入的key、value處理,轉換成新的key、value輸出。
  (2)reduce任務處理

    (a)在reduce之前,有一個shuffle的過程對多個map任務的輸出進行合並、排序。
    (b)寫reduce函數自己的邏輯,對輸入的key、value處理,轉換成新的key、value輸出。
    (c)把reduce的輸出保存到文件中。
       例子:實現WordCountApp
3:map、reduce鍵值對格式:

 4:MapReduce流程:
  (1)代碼編寫
  (2)作業配置
  (3)提交作業
  (4)初始化作業
  (5)分配任務
  (6)執行任務
  (7)更新任務和狀態
  (8)完成作業


5:MapReduce介紹及wordcount和wordcount的編寫和提交集群運行的案例:

WcMap類進行單詞的局部處理:

 1 package com.mapreduce;
 2 
 3 
 4 import java.io.IOException;
 5 
 6 import org.apache.commons.lang.StringUtils;
 7 import org.apache.hadoop.io.LongWritable;
 8 import org.apache.hadoop.io.Text;
 9 import org.apache.hadoop.mapreduce.Mapper;
10 
11 /***
12  * 
13  * @author Administrator
14  * 1:4個泛型中,前兩個是指定mapper輸入數據的類型,KEYIN是輸入的key的類型,VALUEIN是輸入的value的值
15  *       KEYOUT是輸入的key的類型,VALUEOUT是輸入的value的值 
16  * 2:map和reduce的數據輸入和輸出都是以key-value的形式封裝的。
17  * 3:默認情況下,框架傳遞給我們的mapper的輸入數據中,key是要處理的文本中一行的起始偏移量,這一行的內容作為value
18  * 4:key-value數據是在網絡中進行傳遞,節點和節點之間互相傳遞,在網絡之間傳輸就需要序列化,但是jdk自己的序列化很冗余
19  *    所以使用hadoop自己封裝的數據類型,而不要使用jdk自己封裝的數據類型;
20  *    Long--->LongWritable
21  *    String--->Text    
22  */
23 public class WcMap extends Mapper<LongWritable, Text, Text, LongWritable>{
24 
25     //重寫map這個方法
26     //mapreduce框架每讀一行數據就調用一次該方法
27     @Override
28     protected void map(LongWritable key, Text value, Context context)
29             throws IOException, InterruptedException {
30         //具體業務邏輯就寫在這個方法體中,而且我們業務要處理的數據已經被框架傳遞進來,在方法的參數中key-value
31         //key是這一行數據的起始偏移量,value是這一行的文本內容
32         
33         //1:切分單詞,首先拿到單詞value的值,轉化為String類型的
34         String str = value.toString();
35         //2:切分單詞,空格隔開,返回切分開的單詞
36         String[] words = StringUtils.split(str," ");
37         //3:遍歷這個單詞數組,輸出為key-value的格式,將單詞發送給reduce
38         for(String word : words){
39             //輸出的key是Text類型的,value是LongWritable類型的
40             context.write(new Text(word), new LongWritable(1));
41         }
42         
43         
44     }
45 }

WcReduce進行單詞的計數處理:

 1 package com.mapreduce;
 2 
 3 import java.io.IOException;
 4 
 5 import org.apache.hadoop.io.LongWritable;
 6 import org.apache.hadoop.io.Text;
 7 import org.apache.hadoop.mapreduce.Reducer;
 8 
 9 /***
10  * 
11  * @author Administrator
12  * 1:reduce的四個參數,第一個key-value是map的輸出作為reduce的輸入,第二個key-value是輸出單詞和次數,所以
13  *      是Text,LongWritable的格式;
14  */
15 public class WcReduce extends Reducer<Text, LongWritable, Text, LongWritable>{
16 
17     //繼承Reducer之后重寫reduce方法
18     //第一個參數是key,第二個參數是集合。
19     //框架在map處理完成之后,將所有key-value對緩存起來,進行分組,然后傳遞一個組<key,valus{}>,調用一次reduce方法
20     //<hello,{1,1,1,1,1,1.....}>
21     @Override
22     protected void reduce(Text key, Iterable<LongWritable> values,Context context) 
23             throws IOException, InterruptedException {
24         //將values進行累加操作,進行計數
25         long count = 0;
26         //遍歷value的list,進行累加求和
27         for(LongWritable value : values){
28             
29             count += value.get();
30         }
31         
32         //輸出這一個單詞的統計結果
33         //輸出放到hdfs的某一個目錄上面,輸入也是在hdfs的某一個目錄
34         context.write(key, new LongWritable(count));
35     }
36     
37     
38 }

WcRunner用來描述一個特定的作業

 1 package com.mapreduce;
 2 
 3 import java.io.IOException;
 4 
 5 import org.apache.hadoop.conf.Configuration;
 6 import org.apache.hadoop.fs.Path;
 7 import org.apache.hadoop.io.LongWritable;
 8 import org.apache.hadoop.io.Text;
 9 import org.apache.hadoop.mapreduce.Job;
10 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
11 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
12 
13 
14 /***
15  * 1:用來描述一個特定的作業
16  *       比如,該作業使用哪個類作為邏輯處理中的map,那個作為reduce
17  * 2:還可以指定該作業要處理的數據所在的路徑
18  *        還可以指定改作業輸出的結果放到哪個路徑
19  * @author Administrator
20  *
21  */
22 public class WcRunner {
23 
24     public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
25         //創建配置文件
26         Configuration conf = new Configuration();
27         //獲取一個作業
28         Job job = Job.getInstance(conf);
29         
30         //設置整個job所用的那些類在哪個jar包
31         job.setJarByClass(WcRunner.class);
32         
33         //本job使用的mapper和reducer的類
34         job.setMapperClass(WcMap.class);
35         job.setReducerClass(WcReduce.class);
36         
37         //指定reduce的輸出數據key-value類型
38         job.setOutputKeyClass(Text.class);
39         job.setOutputValueClass(LongWritable.class);
40         
41         
42         //指定mapper的輸出數據key-value類型
43         job.setMapOutputKeyClass(Text.class);
44         job.setMapOutputValueClass(LongWritable.class);
45         
46         //指定要處理的輸入數據存放路徑
47         FileInputFormat.setInputPaths(job, new Path("hdfs://master:9000/wc/srcdata"));
48         
49         //指定處理結果的輸出數據存放路徑
50         FileOutputFormat.setOutputPath(job, new Path("hdfs://master:9000/wc/output"));
51         
52         //將job提交給集群運行 
53         job.waitForCompletion(true);
54     }
55     
56 }

書寫好上面的三個類以后打成jar包上傳到虛擬機上面進行運行:

然后啟動你的hadoop集群:start-dfs.sh和start-yarn.sh啟動集群;然后將jar分發到節點上面進行運行;

之前先造一些數據,如下所示:

內容自己隨便搞吧:

 然后上傳到hadoop集群上面,首選創建目錄,存放測試數據,將數據上傳到創建的目錄即可;但是輸出目錄不需要手動創建,會自動創建,自己創建會報錯:

然后將jar分發到節點上面進行運行;命令格式如hadoop    jar   自己的jar包   主類的路徑

 正常性運行完過后可以查看一下運行的效果:

6:MapReduce的本地模式運行如下所示(本地運行需要修改輸入數據存放路徑和輸出數據存放路徑):

 1 package com.mapreduce;
 2 
 3 import java.io.IOException;
 4 
 5 import org.apache.hadoop.conf.Configuration;
 6 import org.apache.hadoop.fs.Path;
 7 import org.apache.hadoop.io.LongWritable;
 8 import org.apache.hadoop.io.Text;
 9 import org.apache.hadoop.mapreduce.Job;
10 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
11 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
12 
13 
14 /***
15  * 1:用來描述一個特定的作業
16  *       比如,該作業使用哪個類作為邏輯處理中的map,那個作為reduce
17  * 2:還可以指定該作業要處理的數據所在的路徑
18  *        還可以指定改作業輸出的結果放到哪個路徑
19  * @author Administrator
20  *
21  */
22 public class WcRunner {
23 
24     public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
25         //創建配置文件
26         Configuration conf = new Configuration();
27         //獲取一個作業
28         Job job = Job.getInstance(conf);
29         
30         //設置整個job所用的那些類在哪個jar包
31         job.setJarByClass(WcRunner.class);
32         
33         //本job使用的mapper和reducer的類
34         job.setMapperClass(WcMap.class);
35         job.setReducerClass(WcReduce.class);
36         
37         //指定reduce的輸出數據key-value類型
38         job.setOutputKeyClass(Text.class);
39         job.setOutputValueClass(LongWritable.class);
40         
41         
42         //指定mapper的輸出數據key-value類型
43         job.setMapOutputKeyClass(Text.class);
44         job.setMapOutputValueClass(LongWritable.class);
45         
46         //指定要處理的輸入數據存放路徑
47         //FileInputFormat.setInputPaths(job, new Path("hdfs://master:9000/wc/srcdata/"));
48         FileInputFormat.setInputPaths(job, new Path("d:/wc/srcdata/"));
49         
50         
51         //指定處理結果的輸出數據存放路徑
52         //FileOutputFormat.setOutputPath(job, new Path("hdfs://master:9000/wc/output/"));
53         FileOutputFormat.setOutputPath(job, new Path("d:/wc/output/"));
54         
55         
56         //將job提交給集群運行 
57         job.waitForCompletion(true);
58     }
59     
60 }

然后去自己定義的盤里面創建文件夾即可:

然后直接運行出現下面的錯誤:

log4j:WARN No appenders could be found for logger (org.apache.hadoop.metrics2.lib.MutableMetricsFactory).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
Exception in thread "main" java.io.IOException: Cannot initialize Cluster. Please check your configuration for mapreduce.framework.name and the correspond server addresses.
    at org.apache.hadoop.mapreduce.Cluster.initialize(Cluster.java:120)
    at org.apache.hadoop.mapreduce.Cluster.<init>(Cluster.java:82)
    at org.apache.hadoop.mapreduce.Cluster.<init>(Cluster.java:75)
    at org.apache.hadoop.mapreduce.Job$9.run(Job.java:1255)
    at org.apache.hadoop.mapreduce.Job$9.run(Job.java:1251)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:415)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1556)
    at org.apache.hadoop.mapreduce.Job.connect(Job.java:1250)
    at org.apache.hadoop.mapreduce.Job.submit(Job.java:1279)
    at org.apache.hadoop.mapreduce.Job.waitForCompletion(Job.java:1303)
    at com.mapreduce.WcRunner.main(WcRunner.java:57)

解決辦法:

缺少Jar包:hadoop-mapreduce-client-common-2.2.0.jar


 好吧,最后還是沒有實現在本地運行此運行,先在這里記一下吧。下面這個錯搞不定,先做下筆記吧;

log4j:WARN No appenders could be found for logger (org.apache.hadoop.metrics2.lib.MutableMetricsFactory).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
Exception in thread "main" java.lang.IllegalArgumentException: Pathname /c:/wc/output from hdfs://master:9000/c:/wc/output is not a valid DFS filename.
    at org.apache.hadoop.hdfs.DistributedFileSystem.getPathName(DistributedFileSystem.java:194)
    at org.apache.hadoop.hdfs.DistributedFileSystem.access$000(DistributedFileSystem.java:102)
    at org.apache.hadoop.hdfs.DistributedFileSystem$17.doCall(DistributedFileSystem.java:1124)
    at org.apache.hadoop.hdfs.DistributedFileSystem$17.doCall(DistributedFileSystem.java:1120)
    at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
    at org.apache.hadoop.hdfs.DistributedFileSystem.getFileStatus(DistributedFileSystem.java:1120)
    at org.apache.hadoop.fs.FileSystem.exists(FileSystem.java:1398)
    at org.apache.hadoop.mapreduce.lib.output.FileOutputFormat.checkOutputSpecs(FileOutputFormat.java:145)
    at org.apache.hadoop.mapreduce.JobSubmitter.checkSpecs(JobSubmitter.java:458)
    at org.apache.hadoop.mapreduce.JobSubmitter.submitJobInternal(JobSubmitter.java:343)
    at org.apache.hadoop.mapreduce.Job$10.run(Job.java:1285)
    at org.apache.hadoop.mapreduce.Job$10.run(Job.java:1282)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:415)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1556)
    at org.apache.hadoop.mapreduce.Job.submit(Job.java:1282)
    at org.apache.hadoop.mapreduce.Job.waitForCompletion(Job.java:1303)
    at com.mapreduce.WcRunner.main(WcRunner.java:57)


7:MapReduce程序的幾種提交運行模式:

本地模型運行
1:在windows的eclipse里面直接運行main方法,就會將job提交給本地執行器localjobrunner執行
      ----輸入輸出數據可以放在本地路徑下(c:/wc/srcdata/)
      ----輸入輸出數據也可以放在hdfs中(hdfs://master:9000/wc/srcdata)
2:在linux的eclipse里面直接運行main方法,但是不要添加yarn相關的配置,也會提交給localjobrunner執行
      ----輸入輸出數據可以放在本地路徑下(/home/hadoop/wc/srcdata/)
      ----輸入輸出數據也可以放在hdfs中(hdfs://master:9000/wc/srcdata)  
     
集群模式運行
1:將工程打成jar包,上傳到服務器,然后用hadoop命令提交  hadoop jar wc.jar cn.itcast.hadoop.mr.wordcount.WCRunner
2:在linux的eclipse中直接運行main方法,也可以提交到集群中去運行,但是,必須采取以下措施:
      ----在工程src目錄下加入 mapred-site.xml  和  yarn-site.xml
      ----將工程打成jar包(wc.jar),同時在main方法中添加一個conf的配置參數 conf.set("mapreduce.job.jar","wc.jar");           

3:在windows的eclipse中直接運行main方法,也可以提交給集群中運行,但是因為平台不兼容,需要做很多的設置修改
        ----要在windows中存放一份hadoop的安裝包(解壓好的)
        ----要將其中的lib和bin目錄替換成根據你的windows版本重新編譯出的文件
        ----再要配置系統環境變量 HADOOP_HOME  和 PATH
        ----修改YarnRunner這個類的源碼 

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM