一、MapReduce介紹
MapReduce是一個分布式計算框架
,可以部署在Hadoop、Spark等大數據平台上,實現海量數據的並行計算。它采用“分而治之
”的思想,將一個計算任務交給集群中的多台機器共同完成,之后再匯總成最終結果。
一般來說讀取一個TB,PB級的文件,普通計算機的速度是比較慢的,而要想提高速度就要提高計算機的硬件配置,這對於普通用戶來說是很難做到的,也提高了這一領域的門檻。而采用廉價的機器組成分布式系統,只要集群的機器數量足夠多,那么計算的速度就會足夠快。
二、MapReduce特點
優點
(1)易於編程
完全獨立完成一個MapReduce程序是一個很困難的事情,這需要很強的編程能力。好在MapReduce給我們提供了大量的方便開發的接口,我們只需要繼承一些接口,實現一些特定的函數就能完成一個MapReduce程序。
(2)高拓展性
這是一個分布式計算框架,我們可以簡單粗暴的,通過增加機器來提高計算性能。
(3)高容錯性
由於MapReduce集群采用的大多是廉價的機器,宕機,BUG等都是家常便飯。但MapReduce框架提供多種有效的錯誤檢測和恢復機制。如果一個結點出現了問題,其他結點會接替這個結點的工作,等結點恢復正常后,又可以繼續工作,這些都由Hadoop內部完成。
(4)高吞吐量
MapReduce可以對PB級以上也就是1024TB的數據進行離線計算。
缺點
(1)難以實時計算
MapReduce處理的是磁盤上的數據。
(2)不能流式計算
MapReduce處理的是磁盤上的靜態數據,而流式計算的輸入數據的動態的。
(3)難以用於DAG計算
DAG(有向無環圖)多個任務間存在依賴關系,后一個應用的輸入可能是前一個應用的輸出。而MapReduce的輸出結果都會寫在磁盤上,這會造成大量的磁盤IO,降低集群的性能。
三、MapReduce編程
以詞頻統計程序為例
pom依賴
去https://mvnrepository.com/尋找以下幾個依賴
hadoop-common、hadoop-hdfs、hadoop-mapreduce-client-core、junit
插件:maven-compiler-plugin、maven-shade-plugin
下面是我的pom文件:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>cn.pineapple</groupId>
<artifactId>MapReduceTest</artifactId>
<!-- <packaging>pom</packaging>-->
<version>1.0-SNAPSHOT</version>
<properties>
<hadoop.version>2.6.0</hadoop.version>
</properties>
<dependencies>
<!-- https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-common -->
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>${hadoop.version}</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-hdfs -->
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs</artifactId>
<version>${hadoop.version}</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-mapreduce-client-core -->
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-mapreduce-client-core</artifactId>
<version>${hadoop.version}</version>
</dependency>
<!-- https://mvnrepository.com/artifact/junit/junit -->
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>RELEASE</version>
<!-- <scope>test</scope>-->
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.maven.plugins/maven-shade-plugin -->
<!-- <dependency>-->
<!-- <groupId>org.apache.maven.plugins</groupId>-->
<!-- <artifactId>maven-shade-plugin</artifactId>-->
<!-- <version>2.4.3</version>-->
<!-- </dependency>-->
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.1</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
<encoding>UTF-8</encoding>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>2.4.3</version>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<minimizeJar>true</minimizeJar>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
要做統計的文件內容為:
Hello,Hadoop,BigData
Hello,Hadoop,MapReduce
Hello,Hadoop,HDFS
BigData,Perfect
MapReduce由Map和Reduce兩個階段組成。
Map
在此之前還有一個讀取文件的操作,這個只要在主類中指定一下就好,不需要寫。
讀取后文件的內容為:
//K1 V1
0 Hello,Hadoop,BigData
21 Hello,Hadoop,MapReduce
44 Hello,Hadoop,HDFS
62 BigData,Perfect
Map后的文件內容為:
//K2 V2
Hello 1
Hadoop 1
BigData 1
Hello 1
Hadoop 1
MapReduce 1
Hello 1
Hadoop 1
HDFS 1
BigData 1
Perfect 1
Map表示“映射”,將文件拆分成多個塊,然后發給集群上的機器統一計算
要繼承Mapper類並重寫map()函數
,將K1,V1轉換成下面的K2,V2
package cn.pineapple.day1;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
/**
* 四個泛型的解釋:
* <p>
* KEYIN:K1的類型
* <p>
* VALUEIN:V1的類型
* <p>
* KEYOUT:K2的類型
* <p>
* VALUEOUT:V2的類型
*/
public class WordCountMapper extends Mapper<LongWritable, Text, Text, LongWritable> {
/**
* 將K1,V1轉換成K2,V2
*
* @param key: 行偏移量
* @param value: 一行文本內容
* @param context: 上下文對象
* @throws IOException:
* @throws InterruptedException:
*/
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
Text text = new Text();
LongWritable longWritable = new LongWritable();
//1:對一行的文本數據進行拆分
String[] split = value.toString().split(",");
//2:遍歷數組,組裝K2和V2
for (String word : split) {
//3:將K2和V2寫入上下文中
text.set(word);
longWritable.set(1);
context.write(text, longWritable);
}
}
}
map()函數內的代碼,只需要針對文件一行考慮。所以首先要提取每個單詞,用value.toString()將value也就是這一行的內容轉換成String類型,再用split()方法進行拆分。K2的值默認為1。在寫入的時候,還要將K2和V2轉換成Text類型和LongWritable類型。
Reduce
在Map和Reduce中間還有一個shuffle,目前用不到這個shuffle,可以采取默認的方式,它會把K2,V2轉換成:
//K2 V2
Hello <1,1,1>
Hadoop <1,1,1>
BigData <1,1>
HDFS <1>
MapReduce <1>
Perfect <1>
接着Reduce的最終結果是:
//K3 V3
BigData 2
HDFS 1
Hadoop 3
Hello 3
MapReduce 1
Perfect 1
Reduce表示“歸約”,將所有結果都統一起來,繼承Reducer類並重寫reduce()函數
,將K2和V2轉換為K3和V3,也就是最終結果。
package cn.pineapple.day1;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
/**
* 四個泛型的解釋:
* <p>
* KEYIN:K2類型
* <p>
* VALUEIN:V2類型
* <p>
* KEYOUT:K3類型
* <p>
* VALUEOUT:V3類型
*/
public class WordCountReducer extends Reducer<Text, LongWritable, Text, LongWritable> {
/**
* 將K2和V2轉換為K3和V3,將K3和V3寫入上下文中
*
* @param key: 新K2
* @param values: 新V2
* @param context: 上下文對象
* @throws IOException:
* @throws InterruptedException:
*/
@Override
protected void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException {
long count = 0;
//1:遍歷集合,將集合中的數字相加,得到V3
for (LongWritable value : values) {
count += value.get();
}
//2:將K3V3寫入上下文中
context.write(key, new LongWritable(count));
}
}
為了實現這個最終結果,K3和K2是一樣的,我們只需要遍歷這個集合,然后相加就能得到V3。value.get()可以將LongWritable類型轉換成long類型,最后在寫入的時候再進行一次轉換。
Main方法
光有這兩個類是不行的,我們還要寫一個Main方法,詳細的列一下任務流程,指定一下任務配置等。
要繼承Configured類並實現Tool接口。
package cn.pineapple.day1;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
public class JobMain extends Configured implements Tool {
/**
* 指定一個job任務
*
* @param strings:
* @return :
* @throws Exception:
*/
public int run(String[] strings) throws Exception {
//1:創建一個job任務對象
Job wordCount = Job.getInstance(super.getConf(), "WordCount");
//如果打包運行出錯,則需要加改配置
wordCount.setJarByClass(JobMain.class);
//2:配置job任務對象(八個步驟)
//第一步:指定文件的讀取方式和
wordCount.setInputFormatClass(TextInputFormat.class);
// 讀取路徑
TextInputFormat.addInputPath(wordCount, new Path("hdfs://nsv:8020/input/wordcount"));
//第二步:指定Map階段的處理方式和數據類型
wordCount.setMapperClass(WordCountMapper.class);
// 設置K2的類型
wordCount.setMapOutputKeyClass(Text.class);
// 設置V2的類型
wordCount.setMapOutputValueClass(LongWritable.class);
//第三、四、五、六shuffle階段采用默認的方式
//第七步:指定Reduce階段的處理方式和數據類型
wordCount.setReducerClass(WordCountReducer.class);
// 設置K3的類型
wordCount.setOutputKeyClass(Text.class);
// 設置V3的類型
wordCount.setOutputValueClass(LongWritable.class);
//第八步:指定輸出類型
wordCount.setOutputFormatClass(TextOutputFormat.class);
// 輸出路徑
TextOutputFormat.setOutputPath(wordCount, new Path("hdfs://nsv:8020/output/wordcount"));
//等待任務結束
boolean bl = wordCount.waitForCompletion(true);
return bl ? 0 : 1;
}
public static void main(String[] args) throws Exception {
Configuration configuration = new Configuration();
//啟動job任務
int run = ToolRunner.run(configuration, new JobMain(), args);
System.exit(run);
}
}
run()方法里就是本次任務的詳細流程和配置,然后main()方法里要調用上面寫的run方法。它接受一個int類型作為退出代碼,0或1。
四、打包運行
雙擊package打包,出現BUILD SUCCESS表示打包成功
出現了兩個jar包,origin包是因為用了maven-shade-plugin打包插件,不會包含依賴jar包,所有體積較小,可以選擇這個jar包運行。
將要統計的文件放到HDFS上
hdfs dfs -mkdir -p /input/wordcount
hdfs dfs -put wordcount.txt /input/wordcount
將jar包上傳到集群上后,跑一下嘍
hadoop jar original-MapReduceTest-1.0-SNAPSHOT.jar cn.pineapple.day1.JobMain
,cn.pineapple.day1.JobMain是主類的全路徑
completed successfully看來是成功了,去Web上看一下實際結果
點擊Download可以查看下載結果