MapReduce Examples


MapReduce Principles
MapReduce is a programming model for distributed computation over large data sets. Put simply, a job is split up and handed out to different machines, and the partial results are then collected and aggregated.
MapReduce has two core phases: Map and Reduce. Map tasks compute independently, with each machine processing, as far as possible, the data stored on its own HDFS node; Reduce then aggregates the map outputs.
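As a rough, Hadoop-free illustration of the model, the sketch below simulates the three conceptual steps of a word count in plain Java: map emits (word, 1) pairs, the pairs are grouped by key, and reduce sums each group. The class name and sample data are made up for this illustration only.

import java.util.*;

public class LocalMapReduceSketch {

  public static void main(String[] args) {
    List<String> lines = Arrays.asList("hello hadoop", "but the hadoop is not easy");

    //"map": emit (word, 1) for every word of every line
    List<Map.Entry<String, Integer>> mapped = new ArrayList<>();
    for (String line : lines) {
      for (String word : line.split(" ")) {
        mapped.add(new AbstractMap.SimpleEntry<>(word, 1));
      }
    }

    //"shuffle": group the emitted values by key
    Map<String, List<Integer>> grouped = new TreeMap<>();
    for (Map.Entry<String, Integer> kv : mapped) {
      grouped.computeIfAbsent(kv.getKey(), k -> new ArrayList<>()).add(kv.getValue());
    }

    //"reduce": sum the values of each key and print the result
    for (Map.Entry<String, List<Integer>> e : grouped.entrySet()) {
      int sum = 0;
      for (int v : e.getValue()) {
        sum += v;
      }
      System.out.println(e.getKey() + "\t" + sum);
    }
  }
}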

1. WordCount: Counting Words

1.1 Prepare the input data, test.txt:

hello hadoop
wille learn hadoop WordCount
but the hadoop is not easy

1.2 The Mapper:

package com.ice.hadoop.test.wordcount;

import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class WordcountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

  @Override
  protected void map(LongWritable key, Text value, Context context)
      throws IOException, InterruptedException {
    String line = value.toString();
    String[] words = line.split(" ");
    for (String word : words) {
      context.write(new Text(word), new IntWritable(1));
    }
  }
}

This defines a mapper class with a single map method. The MapReduce framework calls this map method once for every line of input it reads.
The four type parameters of Mapper<LongWritable, Text, Text, IntWritable> are: input key type, input value type, output key type, and output value type.
The framework passes each line it reads to map as a key-value pair. By default the key is the starting byte offset of the line in the file (a Long), and the value is the content of the line itself (a String).
The output is also a key-value pair: the key is whatever your processing logic decides to emit, and the same goes for the value; their content and types are entirely up to you.
In this example the output key is the word (a string) and the output value is a word count (an integer).
These data types differ from the usual Java types because MapReduce output has to be transferred between machines, so it must be serializable. Hadoop therefore defines its own serializable types: LongWritable for Long, Text for String, and IntWritable for int.
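As a quick illustration of that mapping (a throwaway snippet, not part of the job code), each Hadoop wrapper type converts to and from the plain Java type it wraps:

LongWritable offset = new LongWritable(42L); //Long   <-> LongWritable
long plainLong = offset.get();

Text word = new Text("hadoop");              //String <-> Text
String plainString = word.toString();

IntWritable one = new IntWritable(1);        //int    <-> IntWritable
int plainInt = one.get();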

1.3 The Reducer:

package com.ice.hadoop.test.wordcount;

import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

  @Override
  protected void reduce(Text key, Iterable<IntWritable> values, Context context)
      throws IOException, InterruptedException {
    int count = 0;
    for (IntWritable value : values) {
      count += value.get();
    }
    context.write(key, new IntWritable(count));
  }
}

This defines a Reducer class with a reduce method. The four type parameters of Reducer<Text, IntWritable, Text, IntWritable> are: input key type, input value type, output key type, and output value type.
Note that reduce receives a Text key together with an iterable collection of values, because the map output as seen by the reduce task looks like this:
(good,1)(good,1)(good,1)(good,1)
By the time it is passed to reduce, this becomes:
key: good
values: (1,1,1,1)
So reduce receives, in one call, all of the values belonging to a single key.

1.4 The Driver (main):

package com.ice.hadoop.test.wordcount;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class WordCountMapReduce {

  public static void main(String[] args) throws Exception {
    //create the configuration object
    Configuration conf = new Configuration();
    //create the Job object
    Job job = Job.getInstance(conf, "wordCount");
    //set the mapper class
    job.setMapperClass(WordcountMapper.class);
    //set the reducer class
    job.setReducerClass(WordCountReducer.class);

    //set the jar containing the job classes
    job.setJarByClass(WordCountMapReduce.class);

    //set the map output key and value types
    job.setMapOutputKeyClass(Text.class);
    job.setMapOutputValueClass(IntWritable.class);
    //set the final (reduce) output key and value types
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);

    //set the input and output paths
    FileInputFormat.setInputPaths(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));
    //submit the job and wait for it to finish
    boolean b = job.waitForCompletion(true);

    if (!b){
      System.out.println("word count failed!");
    }
  }
}

After compiling and packaging:

hdfs dfs -mkdir -p /wordcount/input
hdfs dfs -put test.txt /wordcount/input

Run the wordcount jar:
hadoop jar mapreduce-wordcount-0.0.1-SNAPSHOT.jar com.ice.hadoop.test.wordcount.WordCountMapReduce /wordcount/input /wordcount/output

Verify the result after the job completes:
hdfs dfs -cat /wordcount/output/*
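For the test.txt above, the output should look roughly like this (hadoop appears three times; Text keys sort with uppercase letters before lowercase):

WordCount	1
but	1
easy	1
hadoop	3
hello	1
is	1
learn	1
not	1
the	1
wille	1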

2. Hadoop Serialization

Why doesn't Hadoop use Java serialization?
Hadoop's serialization mechanism differs from Java's. Both serialize objects into a stream, but Java deserialization keeps creating new objects, whereas Hadoop's mechanism lets the user reuse objects, which cuts down on Java object allocation and garbage collection and improves application efficiency.
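To benefit from that reuse in our own code, the WordCount mapper shown earlier could allocate its output objects once and refill them on every call. This is only a sketch of the idiom; the earlier version is also correct:

import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class WordcountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

  //reuse the same output objects across calls instead of allocating new ones per word
  private final Text outKey = new Text();
  private final IntWritable outValue = new IntWritable(1);

  @Override
  protected void map(LongWritable key, Text value, Context context)
      throws IOException, InterruptedException {
    for (String word : value.toString().split(" ")) {
      outKey.set(word);                //Text.set replaces the contents in place
      context.write(outKey, outValue); //the framework serializes the pair right away, so reuse is safe
    }
  }
}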

Hadoop defines its own serialization interface, Writable:

package org.apache.hadoop.io;

import java.io.DataOutput;
import java.io.DataInput;
import java.io.IOException;

public interface Writable {
  void write(DataOutput out) throws IOException;
  void readFields(DataInput in) throws IOException;
}

Implementing the Writable interface is what enables serialization and deserialization.
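A minimal round trip outside MapReduce looks like this (using Hadoop's DataOutputBuffer/DataInputBuffer helpers; note that readFields fills an existing object rather than creating a new one):

import java.io.IOException;
import org.apache.hadoop.io.DataInputBuffer;
import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.io.IntWritable;

public class WritableRoundTrip {

  public static void main(String[] args) throws IOException {
    IntWritable original = new IntWritable(163);

    //serialize: write the object into a byte buffer
    DataOutputBuffer out = new DataOutputBuffer();
    original.write(out);

    //deserialize: read the bytes back into an existing object
    DataInputBuffer in = new DataInputBuffer();
    in.reset(out.getData(), out.getLength());
    IntWritable copy = new IntWritable();
    copy.readFields(in);

    System.out.println(copy.get()); //prints 163
  }
}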

More often, though, Hadoop needs types that are both serializable and comparable, so in practice you usually implement the WritableComparable interface instead, and also provide a default (no-argument) constructor so that MapReduce can instantiate the class. Below is an example of a custom Hadoop serializable type:

import java.io.*;
import org.apache.hadoop.io.*;
public class TextPair implements WritableComparable<TextPair> {

  private Text first;
  private Text second;
  public TextPair() {
    set(new Text(), new Text());
  }

  public TextPair(String first, String second) {
    set(new Text(first), new Text(second));
  }

  public TextPair(Text first, Text second) {
    set(first, second);
  }

  public void set(Text first, Text second) {
    this.first = first;
    this.second = second;
  }

  public Text getFirst() {
    return first;
  }

  public Text getSecond() {
    return second;
  }

  @Override
  public void write(DataOutput out) throws IOException {
    first.write(out);
    second.write(out);
  }

  @Override
  public void readFields(DataInput in) throws IOException {
    first.readFields(in);
    second.readFields(in);
  }

  @Override
  public int hashCode() {
    return first.hashCode() * 163 + second.hashCode();
  }

  @Override
  public boolean equals(Object o) {
    if (o instanceof TextPair) {
      TextPair tp = (TextPair) o;
      return first.equals(tp.first) && second.equals(tp.second);
    }
    return false;
  }

  @Override
  public String toString() {
    return first + "\t" + second;
  }

  @Override
  public int compareTo(TextPair tp) {
    int cmp = first.compareTo(tp.first);
    if (cmp != 0) {
      return cmp;
    }
    return second.compareTo(tp.second);
  }
}
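As a quick sanity check of the ordering (a hypothetical snippet, not from the original job code), TextPair compares on the first field and then on the second, so it can serve directly as a composite key:

TextPair a = new TextPair("hadoop", "map");
TextPair b = new TextPair("hadoop", "reduce");
System.out.println(a.compareTo(b) < 0);                      //true: first fields equal, "map" < "reduce"
System.out.println(a.equals(new TextPair("hadoop", "map"))); //true: both fields equal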

2.1 Requirements

We need to aggregate mobile users' traffic logs. Sample log content:

phone number   upstream traffic   downstream traffic
1252548225 200 1100
1345858685 300 1200
1862538225 400 1300
1545858645 100 300
1502236225 500 1300
1362858685 300 1100

For each user we need to accumulate the upstream and downstream traffic and compute the total.

For example, if a phone number such as 13897230503 has two records, those records are accumulated to produce the total:

13897230503,500,1600,2100

2.2 Implementation approach

  • map
    Receives one line of the log; the key is the line's byte offset and the value is the line itself.
    On output, the key should be the phone number and the value should be a single object containing the upstream traffic, downstream traffic, and total traffic.
    The phone number is a string (Text), but the value cannot be expressed with a basic type, so we define our own bean class, which must be serializable.
    key: 13897230503
    value: < upFlow:100, dFlow:300, sumFlow:400 >
  • reduce
    Receives a phone-number key and the collection of bean objects that share that phone number.

For example:
key:
13897230503
values:
< upFlow:400, dFlow:1300, sumFlow:1700 >,
< upFlow:100, dFlow:300, sumFlow:400 >
Iterate over the bean collection, accumulate each field, and build a new bean, for example:
< upFlow:400+100, dFlow:1300+300, sumFlow:1700+400 >
Finally output:
key: 13897230503
value: < upFlow:500, dFlow:1600, sumFlow:2100 >

2.3 Implementation

Create the bean class and implement Writable:

package com.ice.hadoop.test.flowbean;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.Writable;

/**
 * @author:ice
 * @Date: 2019/2/22 0022
 */
public class FlowBean implements Writable {

  private long upFlow;
  private long dFlow;
  private long sumFlow;

  public FlowBean() {
  }

  public FlowBean(long upFlow, long dFlow) {
    this.upFlow = upFlow;
    this.dFlow = dFlow;
    this.sumFlow = upFlow + dFlow;
  }

  @Override
  public void write(DataOutput out) throws IOException {
    out.writeLong(upFlow); //the write order must match the read order used in readFields
    out.writeLong(dFlow);
    out.writeLong(sumFlow);
  }

  @Override
  public void readFields(DataInput in) throws IOException {
    upFlow = in.readLong();
    dFlow = in.readLong();
    sumFlow = in.readLong();
  }

  public long getUpFlow() {
    return upFlow;
  }

  public void setUpFlow(long upFlow) {
    this.upFlow = upFlow;
  }

  public long getdFlow() {
    return dFlow;
  }

  public void setdFlow(long dFlow) {
    this.dFlow = dFlow;
  }

  public long getSumFlow() {
    return sumFlow;
  }

  public void setSumFlow(long sumFlow) {
    this.sumFlow = sumFlow;
  }

  @Override
  public String toString() {
    return "FlowBean{" +
        "upFlow=" + upFlow +
        ", dFlow=" + dFlow +
        ", sumFlow=" + sumFlow +
        '}';
  }
}

The MapReduce program:

package com.ice.hadoop.test.flowbean;

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

/**
 * @author:ice
 * @Date: 2019/2/22 0022
 */
public class FlowCount {

  static class FlowCountMapper extends Mapper<LongWritable, Text, Text, FlowBean> {

    @Override
    protected void map(LongWritable key, Text value, Context context)
        throws IOException, InterruptedException {
      String line = value.toString();
      String[] fields = line.split(" ");
      String phone = fields[0];
      long upFlow = Long.parseLong(fields[1]);
      long dFlow = Long.parseLong(fields[2]);
      context.write(new Text(phone), new FlowBean(upFlow, dFlow));
    }
  }

  static class FlowCountReduce extends Reducer<Text, FlowBean, Text, FlowBean> {

    @Override
    protected void reduce(Text key, Iterable<FlowBean> values, Context context)
        throws IOException, InterruptedException {

      long sumUpFlow = 0L;
      long sumDFlow = 0L;
      for (FlowBean bean : values) {
        sumUpFlow += bean.getUpFlow();
        sumDFlow += bean.getdFlow();
      }

      FlowBean sumBean = new FlowBean(sumUpFlow, sumDFlow);
      context.write(key, sumBean);
    }
  }

  public static void main(String[] args) throws Exception {
    //create the configuration object
    Configuration conf = new Configuration();
    //create the Job object
    Job job = Job.getInstance(conf, "FlowCount");
    //set the mapper class
    job.setMapperClass(FlowCountMapper.class);
    //set the reducer class
    job.setReducerClass(FlowCountReduce.class);

    //set the jar containing the job classes
    job.setJarByClass(FlowCount.class);

    //set the map output key and value types
    job.setMapOutputKeyClass(Text.class);
    job.setMapOutputValueClass(FlowBean.class);
    //set the final (reduce) output key and value types
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(FlowBean.class);

    //set the input and output paths
    FileInputFormat.setInputPaths(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));
    //submit the job and wait for it to finish
    boolean b = job.waitForCompletion(true);

    System.exit(b ? 0 : 1);
  }

}

The compile and package steps are the same as for WordCount.
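Assuming the build produces a jar named something like mapreduce-flowcount-0.0.1-SNAPSHOT.jar and the log file is called flow.log (both names are placeholders), running it mirrors the WordCount steps:

hdfs dfs -mkdir -p /flowcount/input
hdfs dfs -put flow.log /flowcount/input

hadoop jar mapreduce-flowcount-0.0.1-SNAPSHOT.jar com.ice.hadoop.test.flowbean.FlowCount /flowcount/input /flowcount/output

hdfs dfs -cat /flowcount/output/*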

3. Merging Small Files

Why merge small files? Because MapReduce treats every small file as its own input split and therefore its own map task; with a very large number of small files this spawns a huge number of tasks and the overhead destroys efficiency.

How it is done: reading the input is the map side's job, so to merge small files we need a custom InputFormat and RecordReader. The RecordReader reads a complete file in one go and wraps it as a single key-value pair; the mapper receives the file content and then emits the file name as the key and the file content as the value. The output format also matters: use SequenceFileOutputFormat, which can write objects.

The key-value pairs handed to reduce are objects rather than plain text. With the default output format, TextOutputFormat, the final output would just be an object identifier rather than the real content, so SequenceFileOutputFormat is used for the output instead.

3.1 The code

package com.ice.hadoop.test.mergefile;

import java.io.IOException;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;


public class MyInputFormat extends FileInputFormat<NullWritable, BytesWritable> {

  @Override
  public RecordReader<NullWritable, BytesWritable> createRecordReader(
      InputSplit inputSplit, TaskAttemptContext taskAttemptContext)
      throws IOException, InterruptedException {
    MyRecordReader reader = new MyRecordReader();
    reader.initialize(inputSplit, taskAttemptContext);
    return reader; //return the reader that was created, not null
  }

  @Override
  protected boolean isSplitable(JobContext context, Path filename) {
    //mark each small file as non-splittable so that one file produces exactly one key-value pair
    return false;
  }
}

The createRecordReader method creates and returns a custom reader:

package com.ice.hadoop.test.mergefile;

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

public class MyRecordReader extends RecordReader<NullWritable, BytesWritable> {

  private FileSplit fileSplit;
  private Configuration conf;
  private BytesWritable value = new BytesWritable();
  private boolean processed = false;

  @Override
  public void initialize(InputSplit inputSplit, TaskAttemptContext context)
      throws IOException, InterruptedException {
    this.fileSplit = (FileSplit) inputSplit;
    this.conf = context.getConfiguration();
  }

  @Override
  public boolean nextKeyValue() throws IOException, InterruptedException {
    if (!processed) {
      byte[] contents = new byte[(int) fileSplit.getLength()];
      Path file = fileSplit.getPath();
      FileSystem fs = file.getFileSystem(conf);
      FSDataInputStream in = null;

      try {
        in = fs.open(file);
        IOUtils.readFully(in, contents, 0, contents.length);
        value.set(contents, 0, contents.length);
      } finally {
        IOUtils.closeStream(in);
      }
      processed = true;
      return true;
    }
    return false;
  }

  @Override
  public NullWritable getCurrentKey() throws IOException, InterruptedException {
    return NullWritable.get();
  }

  @Override
  public BytesWritable getCurrentValue() throws IOException, InterruptedException {
    return value;
  }

  @Override
  public float getProgress() throws IOException, InterruptedException {
    return processed ? 1.0f : 0.0f;
  }

  @Override
  public void close() throws IOException {

  }
}

There are three core methods here: nextKeyValue, getCurrentKey, and getCurrentValue.

nextKeyValue is responsible for producing the key and value that will be handed to the map method; getCurrentKey and getCurrentValue actually return them. So the core RecordReader mechanism is: nextKeyValue builds the next key-value pair, and getCurrentKey/getCurrentValue return what was just built. Here nextKeyValue reads the whole file content into the value.

The MapReduce program:

package com.ice.hadoop.test.mergefile;

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;

/**
 * @author:ice
 * @Date: 2019/2/22 0022
 */
public class ManyToOne {

  static class FileMapper extends Mapper<NullWritable, BytesWritable, Text, BytesWritable> {

    private Text fileNameKey;

    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
      InputSplit split = context.getInputSplit();
      Path path = ((FileSplit) split).getPath();
      fileNameKey = new Text(path.toString());
    }

    @Override
    protected void map(NullWritable key, BytesWritable value, Context context)
        throws IOException, InterruptedException {
      context.write(fileNameKey, value);
    }
  }

  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    Job job = Job.getInstance(conf);
    job.setJarByClass(ManyToOne.class);

    job.setInputFormatClass(MyInputFormat.class);
    job.setOutputFormatClass(SequenceFileOutputFormat.class);

    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(BytesWritable.class);
    job.setMapperClass(FileMapper.class);

    FileInputFormat.setInputPaths(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));

    job.waitForCompletion(true);
  }

}
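To run and verify it (the jar name below is a placeholder), remember that the output is a SequenceFile, so view it with hdfs dfs -text rather than -cat:

hdfs dfs -mkdir -p /mergefile/input
hdfs dfs -put smallfiles/* /mergefile/input

hadoop jar mapreduce-mergefile-0.0.1-SNAPSHOT.jar com.ice.hadoop.test.mergefile.ManyToOne /mergefile/input /mergefile/output

hdfs dfs -text /mergefile/output/part-r-00000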

