1. MapReduce Principles
MapReduce is a programming model for distributed computation over large data sets. Put simply, it hands pieces of a job out to different machines to process and then collects and merges their results.
MapReduce has two core phases: Map and Reduce. Map tasks compute independently, with each machine working as far as possible on the data blocks stored in its own HDFS node; Reduce then aggregates the results of those computations.
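As a rough single-machine analogy (a sketch of the model only, not Hadoop code), the Map step turns each record into key-value pairs, the framework groups the pairs by key, and the Reduce step folds each group into one result:

import java.util.*;
import java.util.stream.*;

public class MapReduceAnalogy {
  public static void main(String[] args) {
    List<String> lines = Arrays.asList("hello hadoop", "hello world");

    // "Map": each line is broken into words; "Shuffle": equal words are grouped;
    // "Reduce": each group is collapsed into a count.
    Map<String, Long> counts = lines.stream()
        .flatMap(line -> Arrays.stream(line.split(" ")))                 // map phase: emit words
        .collect(Collectors.groupingBy(w -> w, Collectors.counting()));  // group + reduce

    System.out.println(counts); // hello=2, hadoop=1, world=1 (order not guaranteed)
  }
}

In Hadoop the grouping step (the shuffle) happens across machines, which is exactly what the WordCount example below does at scale.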
The WordCount example
1.1 Preparing the data: test.txt
hello hadoop
wille learn hadoop WordCount
but the hadoop is not easy
1.2 The Map program:
package com.ice.hadoop.test.wordcount;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class WordcountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

  @Override
  protected void map(LongWritable key, Text value, Context context)
      throws IOException, InterruptedException {
    String line = value.toString();
    String[] words = line.split(" ");
    for (String word : words) {
      context.write(new Text(word), new IntWritable(1));
    }
  }
}
This defines a Mapper class with a single map method. The MapReduce framework calls the map method once for every line of input it reads.
The four type parameters of Mapper<LongWritable, Text, Text, IntWritable> are: the input key type, the input value type, the output key type, and the output value type.
When the framework reads a line it passes it in as a key-value pair: by default the key is the byte offset of that line within the file (a Long), and the value is the content of the line itself (a String).
The output is also a key-value pair, produced by your own processing logic: you decide what to use as the key, and the value's content and type are likewise up to you.
In this example the output key is the word (a string) and the output value is the word count (an integer).
These types differ from the usual Java ones because MapReduce output has to be transferred between machines and must therefore be serializable. Hadoop defines its own serializable types: LongWritable corresponds to Long, Text to String, and IntWritable to int.
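As a quick aside (a standalone sketch, not part of the job above), converting between these Writable wrappers and plain Java values looks like this:

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;

public class WritableDemo {
  public static void main(String[] args) {
    Text word = new Text("hadoop");              // wraps a String
    IntWritable one = new IntWritable(1);        // wraps an int
    LongWritable offset = new LongWritable(42L); // wraps a long

    // Unwrap back to plain Java values
    String s = word.toString();
    int i = one.get();
    long l = offset.get();
    System.out.println(s + " " + i + " " + l); // hadoop 1 42
  }
}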
1.3 The Reduce program:
package com.ice.hadoop.test.wordcount;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

  @Override
  protected void reduce(Text key, Iterable<IntWritable> values, Context context)
      throws IOException, InterruptedException {
    int count = 0;
    for (IntWritable value : values) {
      count += value.get();
    }
    context.write(key, new IntWritable(count));
  }
}
This defines a Reducer class with a reduce method. The four type parameters of Reducer<Text, IntWritable, Text, IntWritable> are: the input key type, the input value type, the output key type, and the output value type.
Note that the reduce method receives a key (a string) together with an iterable collection of values, because the reduce task reads the map output grouped like this:
(good,1)(good,1)(good,1)(good,1)
When this is passed to the reduce method it becomes:
key: good
value: (1,1,1,1)
So each reduce call receives one key and all of the values that share that key.
1.4 The main program
package com.ice.hadoop.test.wordcount;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class WordCountMapReduce {

  public static void main(String[] args) throws Exception {
    // Create the configuration object
    Configuration conf = new Configuration();
    // Create the Job object
    Job job = Job.getInstance(conf, "wordCount");
    // Set the Mapper class
    job.setMapperClass(WordcountMapper.class);
    // Set the Reducer class
    job.setReducerClass(WordCountReducer.class);
    // Set the class whose jar should be shipped with the job
    job.setJarByClass(WordCountMapReduce.class);
    // Set the key/value types of the map output
    job.setMapOutputKeyClass(Text.class);
    job.setMapOutputValueClass(IntWritable.class);
    // Set the key/value types of the reduce output
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);
    // Set the input and output paths
    FileInputFormat.setInputPaths(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));
    // Submit the job and wait for it to finish
    boolean b = job.waitForCompletion(true);
    if (!b) {
      System.out.println("word count failed!");
    }
  }
}
After compiling and packaging:
hdfs dfs -mkdir -p /wordcount/input
hdfs dfs -put test.txt /wordcount/input
Run the wordcount jar:
hadoop jar mapreduce-wordcount-0.0.1-SNAPSHOT.jar com.ice.hadoop.test.wordcount.WordCountMapReduce /wordcount/input /wordcount/output
Verify the result once the job has finished:
hdfs dfs -cat /wordcount/output/*
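For the sample test.txt above, the output should contain one word and its count per line (tab separated), roughly as follows; the exact ordering comes from Text's byte-wise sort, which is why the uppercase "WordCount" comes first:
WordCount	1
but	1
easy	1
hadoop	3
hello	1
is	1
learn	1
not	1
the	1
wille	1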
2. Hadoop serialization
Why Hadoop does not use Java serialization
Hadoop's serialization mechanism differs from Java's. It serializes objects into a stream, and, notably, whereas Java serialization keeps creating new objects, Hadoop's mechanism lets the user reuse objects, which reduces the allocation and garbage collection of Java objects and improves efficiency.
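For example, a common mapper idiom (shown here as a hypothetical variant of the WordCount mapper above, purely to illustrate reuse) creates the Writable objects once per task and refills them for every record:

import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class ReusingWordcountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

  // Created once per task and reused for every record
  private final Text word = new Text();
  private final IntWritable one = new IntWritable(1);

  @Override
  protected void map(LongWritable key, Text value, Context context)
      throws IOException, InterruptedException {
    for (String w : value.toString().split(" ")) {
      word.set(w);              // refill the same Text instance
      context.write(word, one); // the current contents are serialized at write time
    }
  }
}

Because context.write serializes the current contents immediately, refilling the same instances for the next record is safe.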
Hadoop defines its own serialization interface, Writable:
package org.apache.hadoop.io;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

public interface Writable {
  void write(DataOutput out) throws IOException;
  void readFields(DataInput in) throws IOException;
}
Serialization and deserialization are achieved by implementing the Writable interface.
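To make write and readFields concrete, here is a minimal round-trip sketch using plain JDK streams (not a MapReduce job): an IntWritable is serialized into a byte array and then read back into a second, reusable instance:

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;

public class WritableRoundTrip {
  public static void main(String[] args) throws IOException {
    // Serialize: write() pushes the value onto a DataOutput
    IntWritable out = new IntWritable(163);
    ByteArrayOutputStream buffer = new ByteArrayOutputStream();
    out.write(new DataOutputStream(buffer));

    // Deserialize: readFields() refills an existing instance from a DataInput
    IntWritable in = new IntWritable();
    in.readFields(new DataInputStream(new ByteArrayInputStream(buffer.toByteArray())));

    System.out.println(in.get()); // 163
  }
}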
More often, though, Hadoop needs a type to be both serializable and comparable, so in practice you usually implement the WritableComparable interface instead, and you must also provide a default constructor so that MapReduce can instantiate the class. Below is an example of a custom Hadoop serializable class:
import java.io.*;

import org.apache.hadoop.io.*;

public class TextPair implements WritableComparable<TextPair> {

  private Text first;
  private Text second;

  public TextPair() {
    set(new Text(), new Text());
  }

  public TextPair(String first, String second) {
    set(new Text(first), new Text(second));
  }

  public TextPair(Text first, Text second) {
    set(first, second);
  }

  public void set(Text first, Text second) {
    this.first = first;
    this.second = second;
  }

  public Text getFirst() {
    return first;
  }

  public Text getSecond() {
    return second;
  }

  @Override
  public void write(DataOutput out) throws IOException {
    first.write(out);
    second.write(out);
  }

  @Override
  public void readFields(DataInput in) throws IOException {
    first.readFields(in);
    second.readFields(in);
  }

  @Override
  public int hashCode() {
    return first.hashCode() * 163 + second.hashCode();
  }

  @Override
  public boolean equals(Object o) {
    if (o instanceof TextPair) {
      TextPair tp = (TextPair) o;
      return first.equals(tp.first) && second.equals(tp.second);
    }
    return false;
  }

  @Override
  public String toString() {
    return first + "\t" + second;
  }

  @Override
  public int compareTo(TextPair tp) {
    int cmp = first.compareTo(tp.first);
    if (cmp != 0) {
      return cmp;
    }
    return second.compareTo(tp.second);
  }
}
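A small usage sketch of the class above: pairs compare on the first field and then on the second, which is what makes TextPair usable as a composite key:

public class TextPairDemo {
  public static void main(String[] args) {
    TextPair a = new TextPair("hadoop", "map");
    TextPair b = new TextPair("hadoop", "reduce");
    System.out.println(a.compareTo(b) < 0);                      // true: first fields equal, "map" < "reduce"
    System.out.println(a.equals(new TextPair("hadoop", "map"))); // true
  }
}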
2.1 The requirement
We need to aggregate mobile-phone traffic logs. A sample of the log content:
Phone number | Upstream traffic | Downstream traffic |
---|---|---|
1252548225 | 200 | 1100 |
1345858685 | 300 | 1200 |
1862538225 | 400 | 1300 |
1545858645 | 100 | 300 |
1502236225 | 500 | 1300 |
1362858685 | 300 | 1100 |
We need to add up each user's upstream and downstream traffic and compute the total.
For example, if a phone number such as 13897230503 appears in two records, those two records are accumulated into a single totalled record:
13897230503,500,1600,2100
2.2 Implementation approach
- map
  Receives one line of the log; the key is the line's byte offset and the value is the line itself.
  For the output, the key should be the phone number, and the value should be a single object bundling the upstream traffic, downstream traffic, and total traffic.
  The phone number is a string (Text), but the bundle cannot be represented by a basic type, so we define our own bean and make it serializable.
  key: 13897230503
  value: < upFlow:100, dFlow:300, sumFlow:400 >
- reduce
  Receives a phone-number key and the collection of bean objects for that phone number, for example:
  key: 13897230503
  value:
  < upFlow:400, dFlow:1300, sumFlow:1700 >,
  < upFlow:100, dFlow:300, sumFlow:400 >
  It iterates over the beans and accumulates each field into a new bean, e.g.:
  < upFlow:400+100, dFlow:1300+300, sumFlow:1700+400 >
  and finally outputs:
  key: 13897230503
  value: < upFlow:500, dFlow:1600, sumFlow:2100 >
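One assumption to note before the code: the mapper in 2.3 splits each line on a single space and reads the fields in the order phone, upstream, downstream, so it expects raw log lines shaped something like the following (adjust the split if your log uses tabs or another delimiter):
1252548225 200 1100
1345858685 300 1200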
2.3 The code
First, create the FlowBean entity and implement Writable:
package com.ice.hadoop.test.flowbean;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.Writable;

/**
 * @author:ice
 * @Date: 2019/2/22 0022
 */
public class FlowBean implements Writable {

  private long upFlow;
  private long dFlow;
  private long sumFlow;

  public FlowBean() {
  }

  public FlowBean(long upFlow, long dFlow) {
    this.upFlow = upFlow;
    this.dFlow = dFlow;
    this.sumFlow = upFlow + dFlow;
  }

  @Override
  public void write(DataOutput out) throws IOException {
    out.writeLong(upFlow); // the order written here must match the order read in readFields
    out.writeLong(dFlow);
    out.writeLong(sumFlow);
  }

  @Override
  public void readFields(DataInput in) throws IOException {
    upFlow = in.readLong();
    dFlow = in.readLong();
    sumFlow = in.readLong();
  }

  public long getUpFlow() {
    return upFlow;
  }

  public void setUpFlow(long upFlow) {
    this.upFlow = upFlow;
  }

  public long getdFlow() {
    return dFlow;
  }

  public void setdFlow(long dFlow) {
    this.dFlow = dFlow;
  }

  public long getSumFlow() {
    return sumFlow;
  }

  public void setSumFlow(long sumFlow) {
    this.sumFlow = sumFlow;
  }

  @Override
  public String toString() {
    return "FlowBean{" +
        "upFlow=" + upFlow +
        ", dFlow=" + dFlow +
        ", sumFlow=" + sumFlow +
        '}';
  }
}
Then the MapReduce job (mapper, reducer, and driver):
package com.ice.hadoop.test.flowbean;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

/**
 * @author:ice
 * @Date: 2019/2/22 0022
 */
public class FlowCount {

  static class FlowCountMapper extends Mapper<LongWritable, Text, Text, FlowBean> {

    @Override
    protected void map(LongWritable key, Text value, Context context)
        throws IOException, InterruptedException {
      String line = value.toString();
      String[] fields = line.split(" ");
      String phone = fields[0];
      long upFlow = Long.parseLong(fields[1]);
      long dFlow = Long.parseLong(fields[2]);
      context.write(new Text(phone), new FlowBean(upFlow, dFlow));
    }
  }

  static class FlowCountReduce extends Reducer<Text, FlowBean, Text, FlowBean> {

    @Override
    protected void reduce(Text key, Iterable<FlowBean> values, Context context)
        throws IOException, InterruptedException {
      long sumUpFlow = 0L;
      long sumDFlow = 0L;
      for (FlowBean bean : values) {
        sumUpFlow += bean.getUpFlow();
        sumDFlow += bean.getdFlow();
      }
      FlowBean sumBean = new FlowBean(sumUpFlow, sumDFlow);
      context.write(key, sumBean);
    }
  }

  public static void main(String[] args) throws Exception {
    // Create the configuration object
    Configuration conf = new Configuration();
    // Create the Job object
    Job job = Job.getInstance(conf, "FlowCount");
    // Set the Mapper class
    job.setMapperClass(FlowCountMapper.class);
    // Set the Reducer class
    job.setReducerClass(FlowCountReduce.class);
    // Set the class whose jar should be shipped with the job
    job.setJarByClass(FlowCount.class);
    // Set the key/value types of the map output
    job.setMapOutputKeyClass(Text.class);
    job.setMapOutputValueClass(FlowBean.class);
    // Set the key/value types of the reduce output
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(FlowBean.class);
    // Set the input and output paths
    FileInputFormat.setInputPaths(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));
    // Submit the job and wait for it to finish
    boolean b = job.waitForCompletion(true);
    System.exit(b ? 0 : 1);
  }
}
Compiling and packaging work the same way as before.
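Assuming the artifact is named flow-count-0.0.1-SNAPSHOT.jar and the log file is flow.log (both names are placeholders), running the job mirrors the WordCount commands:
hdfs dfs -mkdir -p /flowcount/input
hdfs dfs -put flow.log /flowcount/input
hadoop jar flow-count-0.0.1-SNAPSHOT.jar com.ice.hadoop.test.flowbean.FlowCount /flowcount/input /flowcount/output
hdfs dfs -cat /flowcount/output/*
Each output line is the phone number followed by FlowBean's toString().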
3. Merging small files
Why merge small files? Because MapReduce turns every small file into its own map task, so a very large number of small files means a very large number of tasks, and the task overhead destroys efficiency.
How it is done: reading the input is the map side's job, handled by an InputFormat and its RecordReader. To merge small files we plug in a custom InputFormat whose RecordReader reads one entire file at a time and packages it as a single key-value pair; the map then emits the file name as the key and the file contents as the value. The output format needs attention too: use SequenceFileOutputFormat, which can write objects.
Because the key-value pairs the reducer receives are objects rather than plain text, the default TextOutputFormat would end up writing out only an object identifier, so SequenceFileOutputFormat is used for the output instead.
3.1 The code
package com.ice.hadoop.test.mergefile;

import java.io.IOException;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

public class MyInputFormat extends FileInputFormat<NullWritable, BytesWritable> {

  @Override
  public RecordReader<NullWritable, BytesWritable> createRecordReader(
      InputSplit inputSplit, TaskAttemptContext taskAttemptContext)
      throws IOException, InterruptedException {
    MyRecordReader reader = new MyRecordReader();
    reader.initialize(inputSplit, taskAttemptContext);
    return reader; // return the reader itself, not null
  }

  @Override
  protected boolean isSplitable(JobContext context, Path filename) {
    // Mark every small file as non-splittable so that each file becomes exactly one key-value pair
    return false;
  }
}
In createRecordReader we create and return a custom reader:
package com.ice.hadoop.test.mergefile;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

public class MyRecordReader extends RecordReader<NullWritable, BytesWritable> {

  private FileSplit fileSplit;
  private Configuration conf;
  private BytesWritable value = new BytesWritable();
  private boolean processed = false;

  @Override
  public void initialize(InputSplit inputSplit, TaskAttemptContext context)
      throws IOException, InterruptedException {
    this.fileSplit = (FileSplit) inputSplit;
    this.conf = context.getConfiguration();
  }

  @Override
  public boolean nextKeyValue() throws IOException, InterruptedException {
    if (!processed) {
      byte[] contents = new byte[(int) fileSplit.getLength()];
      Path file = fileSplit.getPath();
      FileSystem fs = file.getFileSystem(conf);
      FSDataInputStream in = null;
      try {
        in = fs.open(file);
        IOUtils.readFully(in, contents, 0, contents.length);
        value.set(contents, 0, contents.length);
      } finally {
        IOUtils.closeStream(in);
      }
      processed = true;
      return true;
    }
    return false;
  }

  @Override
  public NullWritable getCurrentKey() throws IOException, InterruptedException {
    return NullWritable.get();
  }

  @Override
  public BytesWritable getCurrentValue() throws IOException, InterruptedException {
    return value;
  }

  @Override
  public float getProgress() throws IOException, InterruptedException {
    return processed ? 1.0f : 0.0f;
  }

  @Override
  public void close() throws IOException {
  }
}
The reader has three core methods: nextKeyValue, getCurrentKey, and getCurrentValue.
nextKeyValue is responsible for producing the key and value that will be passed to the map method, while getCurrentKey and getCurrentValue actually hand them over. So the core mechanism of a RecordReader is: nextKeyValue builds the next key-value pair, and getCurrentKey/getCurrentValue return what was just built. Here, nextKeyValue puts the entire file content into the value.
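For context, this is (in simplified form) how the framework drives these methods: Mapper.run is essentially a loop over the RecordReader, so each call to nextKeyValue that returns true produces exactly one call to map:

// Simplified sketch of org.apache.hadoop.mapreduce.Mapper.run()
public void run(Context context) throws IOException, InterruptedException {
  setup(context);
  try {
    while (context.nextKeyValue()) {                                    // delegates to RecordReader.nextKeyValue()
      map(context.getCurrentKey(), context.getCurrentValue(), context); // RecordReader.getCurrentKey()/getCurrentValue()
    }
  } finally {
    cleanup(context);
  }
}

With the reader above, nextKeyValue returns true exactly once per file, so map is called once per small file.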
The MapReduce program:
package com.ice.hadoop.test.mergefile;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;

/**
 * @author:ice
 * @Date: 2019/2/22 0022
 */
public class ManyToOne {

  static class FileMapper extends Mapper<NullWritable, BytesWritable, Text, BytesWritable> {

    private Text fileNameKey;

    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
      // The key for this map task is the path of the file backing the current split
      InputSplit split = context.getInputSplit();
      Path path = ((FileSplit) split).getPath();
      fileNameKey = new Text(path.toString());
    }

    @Override
    protected void map(NullWritable key, BytesWritable value, Context context)
        throws IOException, InterruptedException {
      // Emit (file name, whole file contents)
      context.write(fileNameKey, value);
    }
  }

  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    Job job = Job.getInstance(conf);
    job.setJarByClass(ManyToOne.class);
    // Read whole files via the custom input format and write a SequenceFile as output
    job.setInputFormatClass(MyInputFormat.class);
    job.setOutputFormatClass(SequenceFileOutputFormat.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(BytesWritable.class);
    job.setMapperClass(FileMapper.class);
    FileInputFormat.setInputPaths(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));
    job.waitForCompletion(true);
  }
}
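To run and inspect the result (jar name and paths below are placeholders): the output is a SequenceFile, so a plain hdfs dfs -cat would dump binary headers, while hdfs dfs -text deserializes the key-value pairs:
hadoop jar merge-small-files-0.0.1-SNAPSHOT.jar com.ice.hadoop.test.mergefile.ManyToOne /mergefile/input /mergefile/output
hdfs dfs -text /mergefile/output/part-r-00000
Each record is the original file path as the key and that file's bytes as the value (BytesWritable prints them hex-encoded).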