1:MapReduce的概述:
(1):MapReduce是一種分布式計算模型,由Google提出,主要用於搜索領域,解決海量數據的計算問題.
(2):MapReduce由兩個階段組成:Map和Reduce,用戶只需要實現map()和reduce()兩個函數,即可實現分布式計算,非常簡單。
(3):這兩個函數的形參是key、value對,表示函數的輸入信息。
2:MapReduce執行步驟:
(1): map任務處理
(a):讀取輸入文件內容,解析成key、value對。對輸入文件的每一行,解析成key、value對。每一個鍵值對調用一次map函數。
(b):寫自己的邏輯,對輸入的key、value處理,轉換成新的key、value輸出。
(2)reduce任務處理
(a)在reduce之前,有一個shuffle的過程對多個map任務的輸出進行合並、排序。
(b)寫reduce函數自己的邏輯,對輸入的key、value處理,轉換成新的key、value輸出。
(c)把reduce的輸出保存到文件中。
例子:實現WordCountApp
3:map、reduce鍵值對格式:
4:MapReduce流程:
(1)代碼編寫
(2)作業配置
(3)提交作業
(4)初始化作業
(5)分配任務
(6)執行任務
(7)更新任務和狀態
(8)完成作業
5:MapReduce介紹及wordcount和wordcount的編寫和提交集群運行的案例:
WcMap類進行單詞的局部處理:
1 package com.mapreduce; 2 3 4 import java.io.IOException; 5 6 import org.apache.commons.lang.StringUtils; 7 import org.apache.hadoop.io.LongWritable; 8 import org.apache.hadoop.io.Text; 9 import org.apache.hadoop.mapreduce.Mapper; 10 11 /*** 12 * 13 * @author Administrator 14 * 1:4個泛型中,前兩個是指定mapper輸入數據的類型,KEYIN是輸入的key的類型,VALUEIN是輸入的value的值 15 * KEYOUT是輸入的key的類型,VALUEOUT是輸入的value的值 16 * 2:map和reduce的數據輸入和輸出都是以key-value的形式封裝的。 17 * 3:默認情況下,框架傳遞給我們的mapper的輸入數據中,key是要處理的文本中一行的起始偏移量,這一行的內容作為value 18 * 4:key-value數據是在網絡中進行傳遞,節點和節點之間互相傳遞,在網絡之間傳輸就需要序列化,但是jdk自己的序列化很冗余 19 * 所以使用hadoop自己封裝的數據類型,而不要使用jdk自己封裝的數據類型; 20 * Long--->LongWritable 21 * String--->Text 22 */ 23 public class WcMap extends Mapper<LongWritable, Text, Text, LongWritable>{ 24 25 //重寫map這個方法 26 //mapreduce框架每讀一行數據就調用一次該方法 27 @Override 28 protected void map(LongWritable key, Text value, Context context) 29 throws IOException, InterruptedException { 30 //具體業務邏輯就寫在這個方法體中,而且我們業務要處理的數據已經被框架傳遞進來,在方法的參數中key-value 31 //key是這一行數據的起始偏移量,value是這一行的文本內容 32 33 //1:切分單詞,首先拿到單詞value的值,轉化為String類型的 34 String str = value.toString(); 35 //2:切分單詞,空格隔開,返回切分開的單詞 36 String[] words = StringUtils.split(str," "); 37 //3:遍歷這個單詞數組,輸出為key-value的格式,將單詞發送給reduce 38 for(String word : words){ 39 //輸出的key是Text類型的,value是LongWritable類型的 40 context.write(new Text(word), new LongWritable(1)); 41 } 42 43 44 } 45 }
WcReduce進行單詞的計數處理:
1 package com.mapreduce; 2 3 import java.io.IOException; 4 5 import org.apache.hadoop.io.LongWritable; 6 import org.apache.hadoop.io.Text; 7 import org.apache.hadoop.mapreduce.Reducer; 8 9 /*** 10 * 11 * @author Administrator 12 * 1:reduce的四個參數,第一個key-value是map的輸出作為reduce的輸入,第二個key-value是輸出單詞和次數,所以 13 * 是Text,LongWritable的格式; 14 */ 15 public class WcReduce extends Reducer<Text, LongWritable, Text, LongWritable>{ 16 17 //繼承Reducer之后重寫reduce方法 18 //第一個參數是key,第二個參數是集合。 19 //框架在map處理完成之后,將所有key-value對緩存起來,進行分組,然后傳遞一個組<key,valus{}>,調用一次reduce方法 20 //<hello,{1,1,1,1,1,1.....}> 21 @Override 22 protected void reduce(Text key, Iterable<LongWritable> values,Context context) 23 throws IOException, InterruptedException { 24 //將values進行累加操作,進行計數 25 long count = 0; 26 //遍歷value的list,進行累加求和 27 for(LongWritable value : values){ 28 29 count += value.get(); 30 } 31 32 //輸出這一個單詞的統計結果 33 //輸出放到hdfs的某一個目錄上面,輸入也是在hdfs的某一個目錄 34 context.write(key, new LongWritable(count)); 35 } 36 37 38 }
WcRunner用來描述一個特定的作業
1 package com.mapreduce; 2 3 import java.io.IOException; 4 5 import org.apache.hadoop.conf.Configuration; 6 import org.apache.hadoop.fs.Path; 7 import org.apache.hadoop.io.LongWritable; 8 import org.apache.hadoop.io.Text; 9 import org.apache.hadoop.mapreduce.Job; 10 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; 11 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; 12 13 14 /*** 15 * 1:用來描述一個特定的作業 16 * 比如,該作業使用哪個類作為邏輯處理中的map,那個作為reduce 17 * 2:還可以指定該作業要處理的數據所在的路徑 18 * 還可以指定改作業輸出的結果放到哪個路徑 19 * @author Administrator 20 * 21 */ 22 public class WcRunner { 23 24 public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException { 25 //創建配置文件 26 Configuration conf = new Configuration(); 27 //獲取一個作業 28 Job job = Job.getInstance(conf); 29 30 //設置整個job所用的那些類在哪個jar包 31 job.setJarByClass(WcRunner.class); 32 33 //本job使用的mapper和reducer的類 34 job.setMapperClass(WcMap.class); 35 job.setReducerClass(WcReduce.class); 36 37 //指定reduce的輸出數據key-value類型 38 job.setOutputKeyClass(Text.class); 39 job.setOutputValueClass(LongWritable.class); 40 41 42 //指定mapper的輸出數據key-value類型 43 job.setMapOutputKeyClass(Text.class); 44 job.setMapOutputValueClass(LongWritable.class); 45 46 //指定要處理的輸入數據存放路徑 47 FileInputFormat.setInputPaths(job, new Path("hdfs://master:9000/wc/srcdata")); 48 49 //指定處理結果的輸出數據存放路徑 50 FileOutputFormat.setOutputPath(job, new Path("hdfs://master:9000/wc/output")); 51 52 //將job提交給集群運行 53 job.waitForCompletion(true); 54 } 55 56 }
書寫好上面的三個類以后打成jar包上傳到虛擬機上面進行運行:
然后啟動你的hadoop集群:start-dfs.sh和start-yarn.sh啟動集群;然后將jar分發到節點上面進行運行;
之前先造一些數據,如下所示:
內容自己隨便搞吧:
然后上傳到hadoop集群上面,首選創建目錄,存放測試數據,將數據上傳到創建的目錄即可;但是輸出目錄不需要手動創建,會自動創建,自己創建會報錯:
然后將jar分發到節點上面進行運行;命令格式如hadoop jar 自己的jar包 主類的路徑
正常性運行完過后可以查看一下運行的效果:
6:MapReduce的本地模式運行如下所示(本地運行需要修改輸入數據存放路徑和輸出數據存放路徑):
1 package com.mapreduce; 2 3 import java.io.IOException; 4 5 import org.apache.hadoop.conf.Configuration; 6 import org.apache.hadoop.fs.Path; 7 import org.apache.hadoop.io.LongWritable; 8 import org.apache.hadoop.io.Text; 9 import org.apache.hadoop.mapreduce.Job; 10 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; 11 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; 12 13 14 /*** 15 * 1:用來描述一個特定的作業 16 * 比如,該作業使用哪個類作為邏輯處理中的map,那個作為reduce 17 * 2:還可以指定該作業要處理的數據所在的路徑 18 * 還可以指定改作業輸出的結果放到哪個路徑 19 * @author Administrator 20 * 21 */ 22 public class WcRunner { 23 24 public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException { 25 //創建配置文件 26 Configuration conf = new Configuration(); 27 //獲取一個作業 28 Job job = Job.getInstance(conf); 29 30 //設置整個job所用的那些類在哪個jar包 31 job.setJarByClass(WcRunner.class); 32 33 //本job使用的mapper和reducer的類 34 job.setMapperClass(WcMap.class); 35 job.setReducerClass(WcReduce.class); 36 37 //指定reduce的輸出數據key-value類型 38 job.setOutputKeyClass(Text.class); 39 job.setOutputValueClass(LongWritable.class); 40 41 42 //指定mapper的輸出數據key-value類型 43 job.setMapOutputKeyClass(Text.class); 44 job.setMapOutputValueClass(LongWritable.class); 45 46 //指定要處理的輸入數據存放路徑 47 //FileInputFormat.setInputPaths(job, new Path("hdfs://master:9000/wc/srcdata/")); 48 FileInputFormat.setInputPaths(job, new Path("d:/wc/srcdata/")); 49 50 51 //指定處理結果的輸出數據存放路徑 52 //FileOutputFormat.setOutputPath(job, new Path("hdfs://master:9000/wc/output/")); 53 FileOutputFormat.setOutputPath(job, new Path("d:/wc/output/")); 54 55 56 //將job提交給集群運行 57 job.waitForCompletion(true); 58 } 59 60 }
然后去自己定義的盤里面創建文件夾即可:
然后直接運行出現下面的錯誤:
log4j:WARN No appenders could be found for logger (org.apache.hadoop.metrics2.lib.MutableMetricsFactory).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
Exception in thread "main" java.io.IOException: Cannot initialize Cluster. Please check your configuration for mapreduce.framework.name and the correspond server addresses.
at org.apache.hadoop.mapreduce.Cluster.initialize(Cluster.java:120)
at org.apache.hadoop.mapreduce.Cluster.<init>(Cluster.java:82)
at org.apache.hadoop.mapreduce.Cluster.<init>(Cluster.java:75)
at org.apache.hadoop.mapreduce.Job$9.run(Job.java:1255)
at org.apache.hadoop.mapreduce.Job$9.run(Job.java:1251)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:415)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1556)
at org.apache.hadoop.mapreduce.Job.connect(Job.java:1250)
at org.apache.hadoop.mapreduce.Job.submit(Job.java:1279)
at org.apache.hadoop.mapreduce.Job.waitForCompletion(Job.java:1303)
at com.mapreduce.WcRunner.main(WcRunner.java:57)
解決辦法:
缺少Jar包:hadoop-mapreduce-client-common-2.2.0.jar
好吧,最后還是沒有實現在本地運行此運行,先在這里記一下吧。下面這個錯搞不定,先做下筆記吧;
log4j:WARN No appenders could be found for logger (org.apache.hadoop.metrics2.lib.MutableMetricsFactory).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
Exception in thread "main" java.lang.IllegalArgumentException: Pathname /c:/wc/output from hdfs://master:9000/c:/wc/output is not a valid DFS filename.
at org.apache.hadoop.hdfs.DistributedFileSystem.getPathName(DistributedFileSystem.java:194)
at org.apache.hadoop.hdfs.DistributedFileSystem.access$000(DistributedFileSystem.java:102)
at org.apache.hadoop.hdfs.DistributedFileSystem$17.doCall(DistributedFileSystem.java:1124)
at org.apache.hadoop.hdfs.DistributedFileSystem$17.doCall(DistributedFileSystem.java:1120)
at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
at org.apache.hadoop.hdfs.DistributedFileSystem.getFileStatus(DistributedFileSystem.java:1120)
at org.apache.hadoop.fs.FileSystem.exists(FileSystem.java:1398)
at org.apache.hadoop.mapreduce.lib.output.FileOutputFormat.checkOutputSpecs(FileOutputFormat.java:145)
at org.apache.hadoop.mapreduce.JobSubmitter.checkSpecs(JobSubmitter.java:458)
at org.apache.hadoop.mapreduce.JobSubmitter.submitJobInternal(JobSubmitter.java:343)
at org.apache.hadoop.mapreduce.Job$10.run(Job.java:1285)
at org.apache.hadoop.mapreduce.Job$10.run(Job.java:1282)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:415)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1556)
at org.apache.hadoop.mapreduce.Job.submit(Job.java:1282)
at org.apache.hadoop.mapreduce.Job.waitForCompletion(Job.java:1303)
at com.mapreduce.WcRunner.main(WcRunner.java:57)
7:MapReduce程序的幾種提交運行模式:
本地模型運行
1:在windows的eclipse里面直接運行main方法,就會將job提交給本地執行器localjobrunner執行
----輸入輸出數據可以放在本地路徑下(c:/wc/srcdata/)
----輸入輸出數據也可以放在hdfs中(hdfs://master:9000/wc/srcdata)
2:在linux的eclipse里面直接運行main方法,但是不要添加yarn相關的配置,也會提交給localjobrunner執行
----輸入輸出數據可以放在本地路徑下(/home/hadoop/wc/srcdata/)
----輸入輸出數據也可以放在hdfs中(hdfs://master:9000/wc/srcdata)
集群模式運行
1:將工程打成jar包,上傳到服務器,然后用hadoop命令提交 hadoop jar wc.jar cn.itcast.hadoop.mr.wordcount.WCRunner
2:在linux的eclipse中直接運行main方法,也可以提交到集群中去運行,但是,必須采取以下措施:
----在工程src目錄下加入 mapred-site.xml 和 yarn-site.xml
----將工程打成jar包(wc.jar),同時在main方法中添加一個conf的配置參數 conf.set("mapreduce.job.jar","wc.jar");
3:在windows的eclipse中直接運行main方法,也可以提交給集群中運行,但是因為平台不兼容,需要做很多的設置修改
----要在windows中存放一份hadoop的安裝包(解壓好的)
----要將其中的lib和bin目錄替換成根據你的windows版本重新編譯出的文件
----再要配置系統環境變量 HADOOP_HOME 和 PATH
----修改YarnRunner這個類的源碼