Hadoop——MapReduce(概述)


第一章 MapReduce概述

1.1 MapReduce定義

MapReduce是一個分布式運算程序的編程框架,是用戶開發“基於Hadoop的數據分析應用”的核心框架。

MapReduce核心功能是將用戶編寫的業務邏輯代碼自帶默認組件整合成一個完整的分布式運算程序,並發運行在一個Hadoop集群上。

1.2 MapReduce優缺點

1.2.1 優點

1)MapReduce易於編程
它簡單的實現一些接口,就可以完成一個分布式程序,這個分布式程序可以分布到大量廉價的PC機器上運行。也就是說你寫一個分布式程序,跟寫一個簡單的串行程序是一模一樣的。就是因為這個特點使得MapReduce編程變得非常流行。

2)良好的擴展性
當你的計算資源不能得到滿足的時候,你可以通過簡單的增加機器來擴展它的計算能力。

3)高容錯性
MapReduce設計的初衷就是使程序能夠部署在廉價的PC機器上,這就要求它具有很高的容錯性。比如其中一台機器掛了,它可以把上面的計算任務轉移到另外一個節點上運行,不至於這個任務運行失敗,而且這個過程不需要人工參與,而完全是由Hadoop內部完成的。

4)適合PB級以上海量數據的離線處理
可以實現上千台服務器集群並發工作,提供數據處理能力。

1.2.2 缺點

1)不擅長實時計算
MapReduce無法像MySQL一樣,在毫秒或者秒級內返回結果。

2)不擅長流式計算
流式計算的輸入數據是動態的,而MapReduce的輸入數據集是靜態的,不能動態變化。這是因為MapReduce自身的設計特點決定了數據源必須是靜態的。

3)不擅長DAG(有向無環圖)計算
多個應用程序存在依賴關系,后一個應用程序的輸入為前一個的輸出。在這種情況下,MapReduce並不是不能做,而是使用后,每個MapReduce作業的輸出結果都會寫入到磁盤,會造成大量的磁盤IO,導致性能非常的低下。

1.3 MapReduce核心思想

(1)分布式的運算程序往往需要分成至少2個階段。

(2)第一個階段的MapTask並發實例,完全並行運行,互不相干。

(3)第二個階段的ReduceTask並發實例互不相干,但是他們的數據依賴於上一個階段的所有MapTask並發實例的輸出。

(4)MapReduce編程模型只能包含一個Map階段和一個Reduce階段,如果用戶的業務邏輯非常復雜,那就只能多個MapReduce程序,串行運行。

總結:分析WordCount數據流走向深入理解MapReduce核心思想。

1.4 MapReduce進程

一個完整的MapReduce程序在分布式運行時有三類實例進程:

  • MrAppMaster:負責整個程序的過程調度及狀態協調。
  • MapTask:負責Map階段的整個數據處理流程。
  • ReduceTask:負責Reduce階段的整個數據處理流程。

1.5 官方WordCount源碼

采用反編譯工具反編譯源碼,發現WordCount案例有Map類、Reduce類和驅動類。且數據的類型是Hadoop自身封裝的序列化類型。

1.6 常用數據序列化類型

Java類型

Hadoop Writable類型

Boolean

BooleanWritable

Byte

ByteWritable

Int

IntWritable

Float

FloatWritable

Long

LongWritable

Double

DoubleWritable

String

Text

Map

MapWritable

Array

ArrayWritable

Null

NullWritable

1.7 MapReduce編程規范

用戶編寫的程序分成三個部分:Mapper、Reducer和Driver。

 

1.8 WordCount案例實操

1.8.1 本地測試

1)需求
在給定的文本文件中統計輸出每一個單詞出現的總次數
(1)輸入數據

atguigu atguigu
ss ss
cls cls
jiao
banzhang
xue
hadoop

(2)期望輸出數據

atguigu    2 banzhang 1 cls 2 hadoop 1 jiao 1 ss 2 xue 1

2)需求分析
按照MapReduce編程規范,分別編寫Mapper,Reducer,Driver。

 3)環境准備
(1)創建maven工程,MapReduceDemo
(2)在pom.xml文件中添加如下依賴

<dependencies>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-client</artifactId>
        <version>3.1.3</version>
    </dependency>
    <dependency>
        <groupId>junit</groupId>
        <artifactId>junit</artifactId>
        <version>4.12</version>
    </dependency>
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-log4j12</artifactId>
        <version>1.7.30</version>
    </dependency>
</dependencies>

(3)在項目的src/main/resources目錄下,新建一個文件,命名為“log4j.properties”,在文件中填入。

log4j.rootLogger=INFO, stdout log4j.appender.stdout=org.apache.log4j.ConsoleAppender log4j.appender.stdout.layout=org.apache.log4j.PatternLayout log4j.appender.stdout.layout.ConversionPattern=%d %p [%c] - %m%n log4j.appender.logfile=org.apache.log4j.FileAppender log4j.appender.logfile.File=target/spring.log log4j.appender.logfile.layout=org.apache.log4j.PatternLayout log4j.appender.logfile.layout.ConversionPattern=%d %p [%c] - %m%n

(4)創建包名:com.atguigu.mapreduce.wordcount

4)編寫程序
(1)編寫Mapper類

/**
 * KEYIN, map階段輸入的key的類型:LongWritable
 * VALUEIN,map階段輸入value類型:Text
 * KEYOUT,map階段輸出的Key類型:Text
 * VALUEOUT,map階段輸出的value類型:IntWritable
 */
public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
    private Text outK = new Text();
    private IntWritable outV = new IntWritable(1);

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {

        // 1 獲取一行
        // atguigu atguigu
        String line = value.toString();

        // 2 切割
        // atguigu
        // atguigu
        String[] words = line.split(" ");

        // 3 循環寫出
        for (String word : words) {
            // 封裝outk
            outK.set(word);
            // 寫出
            context.write(outK, outV);
        }
    }
}

(2)編寫Reducer類

/**
 * KEYIN, reduce階段輸入的key的類型:Text
 * VALUEIN,reduce階段輸入value類型:IntWritable
 * KEYOUT,reduce階段輸出的Key類型:Text
 * VALUEOUT,reduce階段輸出的value類型:IntWritable
 */
public class WordCountReducer extends Reducer<Text, IntWritable,Text,IntWritable> {
    private IntWritable outV = new IntWritable();

    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {

        int sum = 0;
        // atguigu, (1,1)
        // 累加
        for (IntWritable value : values) {
            sum += value.get();
        }

        outV.set(sum);

        // 寫出
        context.write(key,outV);
    }
}

(3)編寫Driver驅動類

public class WordCountDriver {

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {

        // 1 獲取job
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);

        // 2 設置jar包路徑
        job.setJarByClass(WordCountDriver.class);

        // 3 關聯mapper和reducer
        job.setMapperClass(WordCountMapper.class);
        job.setReducerClass(WordCountReducer.class);

        // 4 設置map輸出的kv類型
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);

        // 5 設置最終輸出的kV類型
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

//        job.setNumReduceTasks(2);

        // 6 設置輸入路徑和輸出路徑
        FileInputFormat.setInputPaths(job, new Path("D:\\input\\inputword"));
        FileOutputFormat.setOutputPath(job, new Path("D:\\hadoop\\output2222"));

        // 7 提交job
        boolean result = job.waitForCompletion(true);

        System.exit(result ? 0 : 1);
    }
}

5)本地測試
(1)需要首先配置好HADOOP_HOME變量以及Windows運行依賴
(2)在IDEA/Eclipse上運行程序

1.8.2 提交到集群測試

集群上測試
(1)用maven打jar包,需要添加的打包插件依賴

<build>
    <plugins>
        <plugin>
            <artifactId>maven-compiler-plugin</artifactId>
            <version>3.6.1</version>
            <configuration>
                <source>1.8</source>
                <target>1.8</target>
            </configuration>
        </plugin>
        <plugin>
            <artifactId>maven-assembly-plugin</artifactId>
            <configuration>
                <descriptorRefs>
                    <descriptorRef>jar-with-dependencies</descriptorRef>
                </descriptorRefs>
            </configuration>
            <executions>
                <execution>
                    <id>make-assembly</id>
                    <phase>package</phase>
                    <goals>
                        <goal>single</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>

注意:如果工程上顯示紅叉。在項目上右鍵->maven->Reimport刷新即可。
(2)將程序打成jar包

(3)修改不帶依賴的jar包名稱為wc.jar,並拷貝該jar包到Hadoop集群的/opt/module/hadoop-3.1.3路徑。
(4)啟動Hadoop集群

[atguigu@hadoop102 hadoop-3.1.3]sbin/start-dfs.sh
[atguigu@hadoop103 hadoop-3.1.3]$ sbin/start-yarn.sh

(5)執行WordCount程序

[atguigu@hadoop102 hadoop-3.1.3]$ hadoop jar wc.jar com.atguigu.mapreduce.wordcount.WordCountDriver /user/atguigu/input /user/atguigu/output

第二章 Hadoop序列化

2.1 序列化概述

1)什么是序列化

序列化就是把內存中的對象,轉換成字節序列(或其他數據傳輸協議)以便於存儲到磁盤(持久化)和網絡傳輸。

反序列化就是將收到字節序列(或其他數據傳輸協議)或者是磁盤的持久化數據,轉換成內存中的對象。

2)為什么要序列化
一般來說,“活的”對象只生存在內存里,關機斷電就沒有了。而且“活的”對象只能由本地的進程使用,不能被發送到網絡上的另外一台計算機。 然而序列化可以存儲“活的”對象,可以將“活的”對象發送到遠程計算機

3)為什么不用Java的序列化
Java的序列化是一個重量級序列化框架(Serializable),一個對象被序列化后,會附帶很多額外的信息(各種校驗信息,Header,繼承體系等),不便於在網絡中高效傳輸。所以,Hadoop自己開發了一套序列化機制(Writable)。

4)Hadoop序列化特點:
(1)緊湊 :高效使用存儲空間。
(2)快速:讀寫數據的額外開銷小。
(3)互操作:支持多語言的交互

2.2 自定義bean對象實現序列化接口(Writable)

在企業開發中往往常用的基本序列化類型不能滿足所有需求,比如在Hadoop框架內部傳遞一個bean對象,那么該對象就需要實現序列化接口。
具體實現bean對象序列化步驟如下7步。
(1)必須實現Writable接口
(2)反序列化時,需要反射調用空參構造函數,所以必須有空參構造

public FlowBean() {
    super();
}

 (3)重寫序列化方法

@Override public void write(DataOutput out) throws IOException { out.writeLong(upFlow); out.writeLong(downFlow); out.writeLong(sumFlow); }

(4)重寫反序列化方法

@Override public void readFields(DataInput in) throws IOException { upFlow = in.readLong(); downFlow = in.readLong(); sumFlow = in.readLong(); }

(5)注意反序列化的順序和序列化的順序完全一致
(6)要想把結果顯示在文件中,需要重寫toString(),可用"\t"分開,方便后續用。
(7)如果需要將自定義的bean放在key中傳輸,則還需要實現Comparable接口,因為MapReduce框中的Shuffle過程要求對key必須能排序。詳見后面排序案例。

@Override public int compareTo(FlowBean o) { // 倒序排列,從大到小
    return this.sumFlow > o.getSumFlow() ? -1 : 1; }

2.3 序列化案例實操   

1)需求

統計每一個手機號耗費的總上行流量、總下行流量、總流量

(1)輸入數據

1    13736230513    192.196.100.1    www.atguigu.com    2481    24681    200
2    13846544121    192.196.100.2            264    0    200
3     13956435636    192.196.100.3            132    1512    200
4     13966251146    192.168.100.1            240    0    404
5     18271575951    192.168.100.2    www.atguigu.com    1527    2106    200
6     84188413       192.168.100.3    www.atguigu.com    4116    1432    200
7     13590439668    192.168.100.4            1116    954    200
8     15910133277    192.168.100.5    www.hao123.com    3156    2936    200
9     13729199489    192.168.100.6            240    0    200
10     13630577991    192.168.100.7    www.shouhu.com    6960    690    200
11     15043685818    192.168.100.8    www.baidu.com    3659    3538    200
12     15959002129    192.168.100.9    www.atguigu.com    1938    180    500
13     13560439638    192.168.100.10            918    4938    200
14     13470253144    192.168.100.11            180    180    200
15     13682846555    192.168.100.12    www.qq.com    1938    2910    200
16     13992314666    192.168.100.13    www.gaga.com    3008    3720    200
17     13509468723    192.168.100.14    www.qinghua.com    7335    110349    404
18     18390173782    192.168.100.15    www.sogou.com    9531    2412    200
19     13975057813    192.168.100.16    www.baidu.com    11058    48243    200
20     13768778790    192.168.100.17            120    120    200
21     13568436656    192.168.100.18    www.alibaba.com    2481    24681    200
22     13568436656    192.168.100.19            1116    954    200

(2)輸入數據格式:

7 13560436666120.196.100.991116 954200

id手機號碼網絡ip上行流量  下行流量     網絡狀態

3)期望輸出數據格式

13560436666 1116      954 2070

手機號碼    上行流量        下行流量流量

2)需求分析

3)編寫MapReduce程序

(1)編寫流量統計的Bean對象

//1 繼承Writable接口
public class FlowBean implements Writable { private long upFlow; //上行流量
    private long downFlow; //下行流量
    private long sumFlow; //總流量 //2 提供無參構造
    public FlowBean() { } //3 提供三個參數的getter和setter方法
    public long getUpFlow() { return upFlow; } public void setUpFlow(long upFlow) { this.upFlow = upFlow; } public long getDownFlow() { return downFlow; } public void setDownFlow(long downFlow) { this.downFlow = downFlow; } public long getSumFlow() { return sumFlow; } public void setSumFlow(long sumFlow) { this.sumFlow = sumFlow; } public void setSumFlow() { this.sumFlow = this.upFlow + this.downFlow; } //4 實現序列化和反序列化方法,注意順序一定要保持一致
 @Override public void write(DataOutput dataOutput) throws IOException { dataOutput.writeLong(upFlow); dataOutput.writeLong(downFlow); dataOutput.writeLong(sumFlow); } @Override public void readFields(DataInput dataInput) throws IOException { this.upFlow = dataInput.readLong(); this.downFlow = dataInput.readLong(); this.sumFlow = dataInput.readLong(); } //5 重寫ToString
 @Override public String toString() { return upFlow + "\t" + downFlow + "\t" + sumFlow; } }

(2)編寫Mapper類

public class FlowMapper extends Mapper<LongWritable, Text, Text, FlowBean> { private Text outK = new Text(); private FlowBean outV = new FlowBean(); @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { //1 獲取一行數據,轉成字符串
                String line = value.toString(); //2 切割數據
                String[] split = line.split("\t"); //3 抓取我們需要的數據:手機號,上行流量,下行流量
                String phone = split[1]; String up = split[split.length - 3]; String down = split[split.length - 2]; //4 封裝outK outV
 outK.set(phone); outV.setUpFlow(Long.parseLong(up)); outV.setDownFlow(Long.parseLong(down)); outV.setSumFlow(); //5 寫出outK outV
 context.write(outK, outV); } }

(3)編寫Reducer類

public class FlowReducer extends Reducer<Text, FlowBean, Text, FlowBean> { private FlowBean outV = new FlowBean(); @Override protected void reduce(Text key, Iterable<FlowBean> values, Context context) throws IOException, InterruptedException { long totalUp = 0; long totalDown = 0; //1 遍歷values,將其中的上行流量,下行流量分別累加
                for (FlowBean flowBean : values) { totalUp += flowBean.getUpFlow(); totalDown += flowBean.getDownFlow(); } //2 封裝outKV
 outV.setUpFlow(totalUp); outV.setDownFlow(totalDown); outV.setSumFlow(); //3 寫出outK outV
 context.write(key,outV); } }

(4)編寫Driver驅動類

 

public class FlowDriver {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {

        //1 獲取job對象
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);

        //2 關聯本Driver類
        job.setJarByClass(FlowDriver.class);

        //3 關聯Mapper和Reducer
        job.setMapperClass(FlowMapper.class);
        job.setReducerClass(FlowReducer.class);
        
       //4 設置Map端輸出KV類型
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(FlowBean.class);
        
        //5 設置程序最終輸出的KV類型
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(FlowBean.class);
        
        //6 設置程序的輸入輸出路徑
        FileInputFormat.setInputPaths(job, new Path("D:\\inputflow"));
        FileOutputFormat.setOutputPath(job, new Path("D:\\flowoutput"));
        
        //7 提交Job
        boolean b = job.waitForCompletion(true);
        System.exit(b ? 0 : 1);
    }
}

 

 

 

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM