第四章、MapReduce編程入門
目錄結構
1.使用Eclipse建立MapReduce工程
1.1 下載與安裝Eclipse
1.2 配置MapReduce環境
1.3 新建MapReduce工程
2.通過源碼初識MapReduce工程
2.1 通俗理解MapReduce原理
2.2 了解MR實現詞頻統計的執行流程
2.3 讀懂官方提供的WordCount源碼
3.編程實現按日期統計訪問次數
3.1 分析思路與處理邏輯
3.2 編寫核心模塊代碼
3.3 任務實現
4.編程實現按訪問次數排序
4.1 分析思路與處理邏輯
4.2 編寫核心模塊代碼
4.3 任務實現
5.小結
6.實訓
實訓1.獲取成績表的最高分記錄
實訓2.對兩個文件中的數據進行合並和去重
7.課后練習
背景:某社交網站經過幾年的發展,注冊用戶超過1000萬,其中付費用戶(VIP)占用戶總數的0.1%。網站運營方的重點之一是向付費用戶提供更加優質的服務,必須根據服務對象的特點設計有針對性的服務方案。需要對付費用戶訪問網站的數據分析,這是一項非常重要的工作任務。這個任務由以下幾個階段來詳細展開:
1.使用Eclipse建立MapReduce工程
具體參考: https://blog.csdn.net/hehe_soft_engineer/article/details/102147721
2.通過源碼初識MapReduce工程
此部分,目的是對MapReduce的核心模塊 Mapper與Reducer的執行流程有一定的認識。通過學習wordcount的源碼來了解一下。
2.1 通俗理解MapReduce原理
(1)MapReduce包括Mapper模塊和Reducer模塊,MapReduce可以看做是一個專業處理大數據的工程隊,主要由下面成員構成:
①Mapper:映射器 ②Mapper助理InputFormat:輸入文件讀取器
③Shuffle:運輸隊 ④Shuffle助理Sorter:排序器
⑤Reducer:歸約器 ⑥Reducer助理OutputFormat:輸出結果寫入器
(2)簡化的MapReduce處理流程圖:
①數據切片:系統把數據分給多個Mapper來處理,通過數據分片的形式,這是分布式計算的第一步。
②數據映射:分片完成后,Mapper助理InputFormat從文件輸入目錄讀取數據,再由Mapper對數據進行解析,組織成為新的格式(鍵值對形式),最后Mapper將處理好的數據輸出,等待shuffle運輸隊取走結果。
③數據混洗:shuffle運輸隊把獲取的結果按照相同的鍵(Key)進行匯集,再把結果送到Shuffle助理Sorter處,由Sorter負責對這些結果排序,,然后提交給Reducer。
④數據歸約:Reducer收到數據后,將結果進行匯總與映射工作,得到最終的計算結果,最后由Reducer助理OutputFormat將結果輸出到指定位置處。
2.2 了解MR實現詞頻統計的執行流程
下面舉一個實例來說明一下Map和Reduce過程
輸入 |
輸出 |
Hello World Our World |
BigData 2 |
Hello BigData Real BigData |
Great 1 |
Hello Hadoop Great Hadoop |
Hadoop 3 |
Hadoop MapReduce |
Hello 3 |
|
MapReduce 1 |
|
Our 1 |
|
Real 1 |
|
World 2 |
(1)Map任務的處理過程
(2)Reduce任務的處理過程
2.3 讀懂官方提供的WordCount源碼
要編寫數據處理程序,還要參考MapReduce編程的具體規范,下面進行代碼級別的分析和說明:
在“D:\tools\hadoop-2.7.7\share\hadoop\mapreduce\sources”目錄下找到“hadoop-mapreduce-examples-2.7.7-sources.jar”,解壓縮此文件,在子目錄“\org\apache\hadoop\examples”看到WordCount文件,這就是WordCount程序的源代碼。
從結構上可以分為3部分,分別是應用程序Driver、Mapper模塊與Reducer模塊。
(1)應用程序Driver分析
這里的Driver程序主要是指的main函數,在main函數里面進行MapReduce程序的一些初始化設置,並提交任務,等待程序運行完成。
基本上是這樣的格式,在此基礎上只需要修改部分參數即可。
(2)Mapper模式分析
(3)Reducer模式分析
(4)概括地講:
進行MapReduce編程時,開發者主要處理的是Mapper和Reducer兩個模塊,其中包括定義輸入輸出的鍵值對格式、編寫map與reduce函數中定義的處理邏輯等。
3.編程實現按日期統計訪問次數
本部分任務目標是統計用戶在2016年每個自然日的總訪問次數。原始數據文件中提供了用戶名稱與訪問日期,這個任務實質就是要獲取以每個自然日為單位的所有用戶訪問次數的累加值。如果通過MapReduce編程實現這個任務,首先要考慮的是,Mapper與Reducer各自的處理邏輯是怎樣的,然后根據處理邏輯編寫核心代碼,最后在Eclipse中編寫核心代碼,編譯打包后提交集群運行。
3.1 分析思路與處理邏輯
着重考慮以下幾個要素:
①輸入輸出格式 ②Mapper要實現的邏輯 ③Reducer要實現的計算邏輯
(1) 定義輸入/輸出格式
社交網站用戶的訪問日期在格式上屬於文本格式,訪問次數為整型數值格式。其組成的鍵值對為<訪問日期,訪問次數>,因此Mapper的輸出與Reducer的輸出都選用Text類與IntWritable類。
(2)Mapper 類的邏輯實現
Mapper類中最主要的部分就是map函數。map函數的主要任務就是讀取用戶訪問文件中的數據,輸出所有訪問日期與初始次數的鍵值對。因此訪問日期是數據文件的第二列,所有先定義一個數組,再提取第二個元素,與初始次數1一起構成要輸出的鍵值對,即<訪問日期,1>。
(3)Reducer的邏輯實現
Reducer類中最主要的部分就是reduce函數。reduce的主要任務就是讀取Mapper輸出的鍵值對<訪問日期,1>。這一部分與官網給出的WordCount中的Reducer完全相同。
3.2 編寫核心模塊代碼
目錄結構:
package test;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class DailyAccessCount {
// Mapper模塊
public static class MyMapper
extends Mapper<Object, Text, Text, IntWritable>{
private final static IntWritable one = new IntWritable(1);
public void map(Object key, Text value, Context context) //map函數的編寫要根據讀取的文件內容和業務邏輯來寫
throws IOException, InterruptedException {
String line = value.toString();
String array[] = line.split(",");//指定,為分隔符,組成數組
String keyOutput = array[1];//提取數組中的訪問日期作為Key
context.write(new Text(keyOutput), one);//形成鍵值對
}
}
// Reducer模塊
public static class MyReducer
extends Reducer<Text,IntWritable,Text,IntWritable> {
private IntWritable result = new IntWritable();
public void reduce(Text key, Iterable<IntWritable> values, Context context)
throws IOException, InterruptedException {
int sum = 0; //定義累加器,初始值為0
for (IntWritable val : values) {
sum += val.get(); //將相同鍵的所有值進行累加
}
result.set(sum);
context.write(key, result);
}
}
//Driver模塊,主要是配置參數
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
Job job = Job.getInstance(conf, "DailyAccessCount");
job.setJarByClass(DailyAccessCount.class);
job.setMapperClass(MyMapper.class);
job.setReducerClass(MyReducer.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
for (int i = 0; i < args.length - 1; ++i) {
FileInputFormat.addInputPath(job, new Path(args[i]));
}
FileOutputFormat.setOutputPath(job,
new Path(args[args.length - 1]));
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}
3.3 任務實現
將文件編譯生成JAR包文件,提交Hadoop集群執行
在運行過程中報錯,原因是我在外面的JDK用的是1.9的,等級過高了(Linux系統的JDK是1.8的),所以要重新配置JDK。
記住,在用Windows環境下的JDK要和Hadoop集群環境下的JDK環境相同。
歷史各個版本的JDK下載地址:
https://www.oracle.com/technetwork/java/javase/downloads/java-archive-javase8-2177648.html
經過配置,終於可以運行啦,哈哈哈 o(* ̄︶ ̄*)o
hadoop jar NewDaily.jar test.NewDaily /user/dftest/user_login.txt /user/dftest/AccessCount
結果如下:
再來查看輸出結果:
打開文件可以看到:
第一列是已經按照自然日期排好順序,第二列是對應日期的總訪問次數,任務基本完成。
4.編程實現按訪問次數排序
前一部分完成了日期統計任務,本部分要對AccessCount中的數據按照訪問次數進行排序,將排序后的結果存放在相同目錄下的TimesSort中。
4.1 分析思路與處理邏輯
MapReduce只會對鍵值進行排序,所以我們在Mapper模塊中對於輸入的鍵值對,把Key與Value位置互換,在Mapper輸出后,鍵值對經過shuffle的處理,已經變成了按照訪問次數排序的數據順序啦,輸出格式為<訪問次數,日期>。Reducer的處理和Mapper恰好相反,將鍵和值的位置互換,輸出格式變為<日期,訪問次數>。
4.2 編寫核心模塊代碼
package test;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class AccessTimesSort {
// Mapper模塊
public static class MyMapper
extends Mapper<Object, Text, IntWritable,Text>{
public void map(Object key, Text value, Context context) //map函數的編寫要根據讀取的文件內容和業務邏輯來寫
throws IOException, InterruptedException {
String line = value.toString();
String array[] = line.split("\t");//指定,為分隔符,組成數組
int keyOutput = Integer.parseInt(array[1]);//提取數組中的訪問次數作為Key
String valueOutput = array[0]; //將日期作為value
context.write(new IntWritable(keyOutput), new Text(valueOutput));
}
}
// Reducer模塊
public static class MyReducer
extends Reducer<IntWritable,Text,Text,IntWritable> {//注意與上面輸出對應
public void reduce(IntWritable key, Iterable<Text> values, Context context)
throws IOException, InterruptedException {
for (Text val : values) {
context.write(val, key); //進行鍵值位置互換
}
}
}
//Driver模塊,主要是配置參數
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
Job job = Job.getInstance(conf, "AccessTimesSort");
job.setJarByClass(AccessTimesSort.class);
job.setMapperClass(MyMapper.class);
job.setReducerClass(MyReducer.class);
job.setMapOutputKeyClass(IntWritable.class);
job.setMapOutputValueClass(Text.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
for (int i = 0; i < args.length - 1; ++i) {
FileInputFormat.addInputPath(job, new Path(args[i]));
}
FileOutputFormat.setOutputPath(job,
new Path(args[args.length - 1]));
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}
4.3 任務實現
(1)編譯生成jar包
(2)將jar包上傳到集群
(3)執行jar包內的AccessTimesSort類
hadoop jar NewDaily.jar test.AccessTimesSort /user/dftest/AccessCount /user/dftest/TimesSort
(4)查看執行結果(由此可以看到結果為升序排列)
此任務順利完成 哈哈。
5.小結
本章介紹了MapReduce編程的基礎知識,通過對Hadoop官方的示例代碼的分析及解讀,深入了解了MapReduce的執行過程。MapReduce把復雜的、運行在Hadoop集群上的並行計算過程集成到了兩個模塊——Mapper和Reducer上。開發人員只需要把業務處理邏輯通過其中的map函數和reduce函數來實現,就可以達到分布式並行編程的目的。
MapReduce執行過程主要包括以下幾個部分:讀取分布式文件系統的數據,進行數據分片,執行map任務以輸出中間結果,shuffle階段把中間結果進行匯合、排序,再傳到Reduce任務,在Reduce階段對數據進行處理,輸出最終結果到分布式文件系統內。
6.實訓
實訓目的是,掌握MapReduce編程的基本方法,通過MapReduce編程來實現一些常用的數據處理方法,包括求最大值、去重等。
實訓1.獲取成績表的最高分記錄
(1)需求說明:對於樣例文件subject_score,即成績表A。文件中的每一行數據包含兩個字段:科目和分數。要求獲得成績列表中每個科目成績最高的記錄,並將結果輸出到最高成績表B。
表A的部分內容:
語文 |
96 |
數學 |
102 |
英語 |
130 |
物理 |
19 |
化學 |
44 |
生物 |
44 |
語文 |
109 |
數學 |
118 |
英語 |
141 |
要輸出的表B結構:
化學 |
99 |
數學 |
149 |
物理 |
99 |
生物 |
99 |
英語 |
144 |
語文 |
114 |
(2)實現思路與步驟:
①在Mapper中,map函數讀取成績表A中的數據,直接將讀取的數據以空格分隔,組成鍵值對<科目,成績>,即設置輸出鍵值對類型為<Text,IntWritable>。
②在Reducer中,由於map函數輸出鍵值對類型是<Text,IntWritable>,所以在Reducer中接收的鍵值對類型就是<Text,Iterable<IntWritable>>。針對相同的鍵遍歷它的值,找到最高值,最后輸出的鍵值對為<科目,最高成績>。
(3)實現及輸出結果:
①代碼實現:
package test;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class ScoreSorting {
// Mapper模塊
public static class MyMapper
extends Mapper<LongWritable, Text, Text ,IntWritable>{
Text course=new Text();
IntWritable score=new IntWritable();
public void map(LongWritable key, Text value,
Mapper<LongWritable, Text, Text ,IntWritable>.Context context) //map函數的編寫要根據讀取的文件內容和業務邏輯來寫
throws IOException, InterruptedException {
String line = value.toString();
String array[] = line.trim().split(" ");//trim函數去掉兩邊多余的空格,指定空格為分隔符,組成數組
course.set(array[0]);//第一列是科目
score.set(Integer.parseInt(array[1]));//第二列是分數
context.write(course, score);
}
}
// Reducer模塊
public static class MyReducer
extends Reducer<Text,IntWritable,Text,IntWritable> {//注意與上面輸出對應
private IntWritable result = new IntWritable();
public void reduce(Text key, Iterable<IntWritable> values,
Reducer<Text,IntWritable,Text,IntWritable>.Context context)
throws IOException, InterruptedException {
int maxscore=0; //初始化最大值
for (IntWritable score:values) {
if(maxscore < score.get()) {
maxscore=score.get(); //相同鍵內找最大值
}
}
result.set(maxscore);
context.write(key, result);
}
}
//Driver模塊,主要是配置參數
public static void main(String[] args) throws Exception { //對有幾個參數要有很強的敏感性,如果多可以用前面的遍歷方式,如果少就可以直接指定。
if(args.length!=2) {
System.err.println("ScoreSorting <input> <output>");
System.exit(-1);
}
Configuration conf = new Configuration();
Job job = Job.getInstance(conf, "ScoreSorting");
job.setJarByClass(ScoreSorting.class);
job.setMapperClass(MyMapper.class);
job.setReducerClass(MyReducer.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
job.setNumReduceTasks(1);
FileInputFormat.addInputPath(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job,new Path(args[1]));
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}
上傳數據文件並執行程序:
hdfs dfs -put /testhadoop/subject_score.txt /user/dftest
hadoop jar NewDaily3.jar test.ScoreSorting /user/dftest/subject_score.txt /user/dftest/SortScore8 //經過8次才調好的。
經過不斷地調試,但是總會出現輸入類型不匹配的問題,最終,找到在map函數重寫的時候,因為值類型錯了,應該是Text類型,寫成了IntWritable類型,但是報錯總是報是因為Text 和 LongWritable問題,所以有點迷。
最后問題解決:
實訓2.對兩個文件中的數據進行合並和去重
(1)需求說明:
有兩個樣例文件:XX與YY。要求合並兩個文件中的數據,並對合並后的數據進行去重,將結果輸出到文件ZZ。
(2)實現思路與步驟:
①利用MapReduce中Reducer類會合並相同鍵值對的特性,對目標數據進行去重。
②在HDFS創建目錄XXYY,將樣例文件XX與YY上傳到此目錄。MapReduce程序讀取此目錄下的文件。
③在Mapper類中,map函數讀取兩個文件數據,直接將讀取的數據作為鍵,將值設置為1,最后輸出格式為<Text,IntWritable>。
④在Reducer中,鍵保持不變,將對應的值取為空,輸出類型為<Text,NullWritable>。
(3)實現及輸出結果:
略
7.小練習
在MapReduce程序中,Reducer類中包括的函數有:B
A. startup、reduce、end B. setup、reduce、cleanup
C. start、run、reduce、end D. startup、run、end
下一章將對MapReduce編程進行更深一步的剖析 ^_^ 。