Big Data Technology: Hadoop (MapReduce)


Chapter 1 MapReduce Overview

1.1 MapReduce Definition

1.2 Advantages and Disadvantages of MapReduce

1.2.1 Advantages

1.2.2 Disadvantages

1.3 The Core Idea of MapReduce

The core programming idea of MapReduce is shown in Figure 4-1.

Figure 4-1 The core programming idea of MapReduce

1) A distributed computation program usually needs to be divided into at least two stages.

2) The concurrent MapTask instances of the first stage run fully in parallel and are independent of one another.

3) The concurrent ReduceTask instances of the second stage are also independent of one another, but their input depends on the output of all MapTask instances of the previous stage.

4) The MapReduce programming model contains only one Map stage and one Reduce stage. If the user's business logic is very complex, the only option is to chain several MapReduce jobs and run them serially (a driver sketch follows below).

Summary: analyze the data flow of WordCount to deepen your understanding of the core idea of MapReduce.
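Chaining jobs is done in the driver: run one job to completion, then submit the next, using the first job's output directory as the second job's input. The following is a minimal, illustrative sketch (FirstMapper, FirstReducer, SecondMapper, and SecondReducer are hypothetical placeholder classes, not part of this tutorial's code):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class ChainedJobsDriver {

    public static void main(String[] args) throws Exception {

        Configuration conf = new Configuration();

        // Stage 1: the first MapReduce job reads the raw input
        Job job1 = Job.getInstance(conf, "stage-1");
        job1.setJarByClass(ChainedJobsDriver.class);
        job1.setMapperClass(FirstMapper.class);    // hypothetical placeholder
        job1.setReducerClass(FirstReducer.class);  // hypothetical placeholder
        FileInputFormat.setInputPaths(job1, new Path(args[0]));
        FileOutputFormat.setOutputPath(job1, new Path(args[1]));

        // Run the first job and stop if it fails
        if (!job1.waitForCompletion(true)) {
            System.exit(1);
        }

        // Stage 2: the second MapReduce job reads the first job's output
        Job job2 = Job.getInstance(conf, "stage-2");
        job2.setJarByClass(ChainedJobsDriver.class);
        job2.setMapperClass(SecondMapper.class);   // hypothetical placeholder
        job2.setReducerClass(SecondReducer.class); // hypothetical placeholder
        FileInputFormat.setInputPaths(job2, new Path(args[1]));
        FileOutputFormat.setOutputPath(job2, new Path(args[2]));

        System.exit(job2.waitForCompletion(true) ? 0 : 1);
    }
}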

1.4 MapReduce Processes

1.5 Official WordCount Source Code

Decompiling the source with a decompiler shows that the WordCount example consists of a Map class, a Reduce class, and a driver class, and that the data types are Hadoop's own serialization types.

1.6 Common Data Serialization Types

Table 4-1 Common data types and their corresponding Hadoop serialization types

Java type        Hadoop Writable type
boolean          BooleanWritable
byte             ByteWritable
int              IntWritable
float            FloatWritable
long             LongWritable
double           DoubleWritable
String           Text
map              MapWritable
array            ArrayWritable
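The Writable types above are thin wrappers around the corresponding Java values. The following is a small illustrative sketch (not part of the original tutorial) of wrapping and unwrapping them; the Mapper and Reducer examples later in this chapter rely on exactly these calls:

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;

public class WritableTypesDemo {

    public static void main(String[] args) {

        // Wrap ordinary Java values in Hadoop's Writable wrappers
        Text word = new Text("hadoop");              // String -> Text
        IntWritable count = new IntWritable(1);      // int    -> IntWritable
        LongWritable offset = new LongWritable(0L);  // long   -> LongWritable

        // Unwrap them back into Java values
        String s = word.toString();
        int c = count.get();
        long o = offset.get();

        // Writables are mutable and reusable, which is why the Mapper/Reducer
        // examples in this chapter create them once as fields and call set()
        count.set(c + 1);

        System.out.println(s + "\t" + count.get() + "\t" + o);
    }
}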

1.7 MapReduce Programming Conventions

A user program consists of three parts: a Mapper, a Reducer, and a Driver (a generic skeleton is sketched below).
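The three parts follow a fixed shape. The sketch below is a generic skeleton (not the WordCount code of section 1.8) showing where each part's logic goes and how the generic type parameters line up: the Reducer's input key/value types must match the Mapper's output key/value types.

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class SkeletonDriver {

    // Mapper<input key, input value, output key, output value>
    public static class SkeletonMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            // business logic for a single input record goes here
        }
    }

    // Reducer<input key, input value, output key, output value>
    public static class SkeletonReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
        @Override
        protected void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            // business logic for one key and all of its values goes here
        }
    }

    // Driver: describes the job and submits it to the framework
    public static void main(String[] args) throws Exception {
        Job job = Job.getInstance(new Configuration());
        job.setJarByClass(SkeletonDriver.class);
        job.setMapperClass(SkeletonMapper.class);
        job.setReducerClass(SkeletonReducer.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}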

1.8 WordCount Case Study

1. Requirement

Count and output the total number of occurrences of each word in a given text file.

(1) Input data

(2) Expected output data

atguigu    2

banzhang    1

cls    2

hadoop    1

jiao    1

ss    2

xue    1

2. Requirement Analysis

Following the MapReduce programming conventions, write the Mapper, Reducer, and Driver separately, as shown in Figure 4-2.

Figure 4-2 Requirement analysis

3. Environment Setup

(1) Create a Maven project

(2) Add the following dependencies to pom.xml

<dependencies>

        <dependency>

            <groupId>junit</groupId>

            <artifactId>junit</artifactId>

            <version>RELEASE</version>

        </dependency>

        <dependency>

            <groupId>org.apache.logging.log4j</groupId>

            <artifactId>log4j-core</artifactId>

            <version>2.8.2</version>

        </dependency>

        <dependency>

            <groupId>org.apache.hadoop</groupId>

            <artifactId>hadoop-common</artifactId>

            <version>2.7.2</version>

        </dependency>

        <dependency>

            <groupId>org.apache.hadoop</groupId>

            <artifactId>hadoop-client</artifactId>

            <version>2.7.2</version>

        </dependency>

        <dependency>

            <groupId>org.apache.hadoop</groupId>

            <artifactId>hadoop-hdfs</artifactId>

            <version>2.7.2</version>

        </dependency>

</dependencies>

(3) In the project's src/main/resources directory, create a new file named "log4j.properties" and fill it with the following content.

log4j.rootLogger=INFO, stdout

log4j.appender.stdout=org.apache.log4j.ConsoleAppender

log4j.appender.stdout.layout=org.apache.log4j.PatternLayout

log4j.appender.stdout.layout.ConversionPattern=%d %p [%c] - %m%n

log4j.appender.logfile=org.apache.log4j.FileAppender

log4j.appender.logfile.File=target/spring.log

log4j.appender.logfile.layout=org.apache.log4j.PatternLayout

log4j.appender.logfile.layout.ConversionPattern=%d %p [%c] - %m%n

4. Write the Program

(1) Write the Mapper class

package com.atguigu.mapreduce.wordcount;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;

import org.apache.hadoop.io.LongWritable;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.Mapper;

 

public class WordcountMapper extends Mapper<LongWritable, Text, Text, IntWritable>{

    

    Text k = new Text();

    IntWritable v = new IntWritable(1);

    

    @Override

    protected void map(LongWritable key, Text value, Context context)    throws IOException, InterruptedException {

        

        // 1 獲取一行

        String line = value.toString();

        

        // 2 切割

        String[] words = line.split(" ");

        

        // 3 輸出

        for (String word : words) {

            

            k.set(word);

            context.write(k, v);

        }

    }

}

(2) Write the Reducer class

package com.atguigu.mapreduce.wordcount;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.Reducer;

 

public class WordcountReducer extends Reducer<Text, IntWritable, Text, IntWritable>{

 

int sum;

IntWritable v = new IntWritable();

 

    @Override

    protected void reduce(Text key, Iterable<IntWritable> values,Context context) throws IOException, InterruptedException {

        

        // 1 累加求和

        sum = 0;

        for (IntWritable count : values) {

            sum += count.get();

        }

        

        // 2 輸出

v.set(sum);

        context.write(key,v);

    }

}

(3) Write the Driver class

package com.atguigu.mapreduce.wordcount;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;

import org.apache.hadoop.fs.Path;

import org.apache.hadoop.io.IntWritable;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.Job;

import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

 

public class WordcountDriver {

 

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {

 

        // 1 獲取配置信息以及封裝任務

        Configuration configuration = new Configuration();

        Job job = Job.getInstance(configuration);

 

        // 2 設置jar加載路徑

        job.setJarByClass(WordcountDriver.class);

 

        // 3 設置mapreduce

        job.setMapperClass(WordcountMapper.class);

        job.setReducerClass(WordcountReducer.class);

 

        // 4 設置map輸出

        job.setMapOutputKeyClass(Text.class);

        job.setMapOutputValueClass(IntWritable.class);

 

        // 5 設置最終輸出kv類型

        job.setOutputKeyClass(Text.class);

        job.setOutputValueClass(IntWritable.class);

        

        // 6 設置輸入和輸出路徑

        FileInputFormat.setInputPaths(job, new Path(args[0]));

        FileOutputFormat.setOutputPath(job, new Path(args[1]));

 

        // 7 提交

        boolean result = job.waitForCompletion(true);

 

        System.exit(result ? 0 : 1);

    }

}

5. Local Testing

(1) If your computer runs Windows 7, extract the Windows 7 Hadoop dependency package to a path without Chinese characters and configure the HADOOP_HOME environment variable on Windows. If it runs Windows 10, extract the Windows 10 Hadoop dependency package and configure HADOOP_HOME in the same way.

Note: Windows 8 machines and Windows 10 Home edition may have problems; you may need to recompile the native libraries or change the operating system.

(2) Run the program in Eclipse/IDEA

6. Testing on the Cluster

(0) To build the jar with Maven, add the following packaging plugin configuration

Note: replace the main class in the <mainClass> element with the main class of your own project.

<build>

        <plugins>

            <plugin>

                <artifactId>maven-compiler-plugin</artifactId>

                <version>2.3.2</version>

                <configuration>

                    <source>1.8</source>

                    <target>1.8</target>

                </configuration>

            </plugin>

            <plugin>

                <artifactId>maven-assembly-plugin </artifactId>

                <configuration>

                    <descriptorRefs>

                        <descriptorRef>jar-with-dependencies</descriptorRef>

                    </descriptorRefs>

                    <archive>

                        <manifest>

                            <mainClass>com.atguigu.mr.WordcountDriver</mainClass>

                        </manifest>

                    </archive>

                </configuration>

                <executions>

                    <execution>

                        <id>make-assembly</id>

                        <phase>package</phase>

                        <goals>

                            <goal>single</goal>

                        </goals>

                    </execution>

                </executions>

            </plugin>

        </plugins>

    </build>

Note: if the project shows a red error marker, right-click the project -> Maven -> Update Project.

(1) Package the program into a jar and copy it to the Hadoop cluster

Steps: right-click the project -> Run As -> Maven install. When the build finishes, the jar appears in the project's target folder. If you cannot see it, right-click the project -> Refresh. Rename the jar built without dependencies to wc.jar and copy it to the Hadoop cluster.

(2) Start the Hadoop cluster

(3) Run the WordCount program

[atguigu@hadoop102 software]$ hadoop jar wc.jar

com.atguigu.wordcount.WordcountDriver /user/atguigu/input /user/atguigu/output

Chapter 2 Hadoop Serialization

2.1 Serialization Overview

 

2.2 Implementing the Serialization Interface (Writable) for a Custom Bean

In enterprise development the built-in serialization types often cannot satisfy every requirement. For example, to pass a bean object around inside the Hadoop framework, that object must implement the serialization interface.

Implementing serialization for a bean takes the following seven steps.

(1) The class must implement the Writable interface

(2) Deserialization uses reflection to call the no-argument constructor, so the class must provide one

public FlowBean() {

    super();

}

(3) Override the serialization method

@Override

public void write(DataOutput out) throws IOException {

    out.writeLong(upFlow);

    out.writeLong(downFlow);

    out.writeLong(sumFlow);

}

(4) Override the deserialization method

@Override

public void readFields(DataInput in) throws IOException {

    upFlow = in.readLong();

    downFlow = in.readLong();

    sumFlow = in.readLong();

}

(5) Note that the deserialization order must exactly match the serialization order

(6) To make the results readable in the output file, override toString(); separating fields with "\t" makes later processing convenient, for example:
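// A minimal sketch of step (6) for the FlowBean used in this chapter,
// assuming the same three long fields written by the serialization methods above:
@Override
public String toString() {
    return upFlow + "\t" + downFlow + "\t" + sumFlow;
}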

(7) If the custom bean is transmitted as a key, it must also implement the Comparable interface, because the Shuffle process of the MapReduce framework requires that keys can be sorted. See the sorting case study later.

@Override

public int compareTo(FlowBean o) {

    // 倒序排列,從大到小

    return this.sumFlow > o.getSumFlow() ? -1 : 1;

}

2.3 Serialization Case Study

1. Requirement

For each phone number, compute the total upstream traffic, downstream traffic, and total traffic.

(1) Input data

(2) Input data format:

7     13560436666    120.196.100.99        1116         954            200

id    phone number   network IP            upstream     downstream     network status code

(3) Expected output data format

13560436666         1116         954             2070

phone number        upstream     downstream      total traffic

2. Requirement Analysis

3. Write the MapReduce Program

(1) Write the traffic-statistics bean

package com.atguigu.mapreduce.flowsum;

import java.io.DataInput;

import java.io.DataOutput;

import java.io.IOException;

import org.apache.hadoop.io.Writable;

 

// 1 實現writable接口

public class FlowBean implements Writable{

 

    private long upFlow;

    private long downFlow;

    private long sumFlow;

    

    //2 反序列化時,需要反射調用空參構造函數,所以必須有

    public FlowBean() {

        super();

    }

 

    public FlowBean(long upFlow, long downFlow) {

        super();

        this.upFlow = upFlow;

        this.downFlow = downFlow;

        this.sumFlow = upFlow + downFlow;

    }

    // Used by the Mapper below, which reuses a single FlowBean instance per record
    public void set(long upFlow, long downFlow) {
        this.upFlow = upFlow;
        this.downFlow = downFlow;
        this.sumFlow = upFlow + downFlow;
    }

    

    //3 寫序列化方法

    @Override

    public void write(DataOutput out) throws IOException {

        out.writeLong(upFlow);

        out.writeLong(downFlow);

        out.writeLong(sumFlow);

    }

    

    //4 反序列化方法

    //5 反序列化方法讀順序必須和寫序列化方法的寫順序必須一致

    @Override

    public void readFields(DataInput in) throws IOException {

        this.upFlow = in.readLong();

        this.downFlow = in.readLong();

        this.sumFlow = in.readLong();

    }

 

    // 6 編寫toString方法,方便后續打印到文本

    @Override

    public String toString() {

        return upFlow + "\t" + downFlow + "\t" + sumFlow;

    }

 

    public long getUpFlow() {

        return upFlow;

    }

 

    public void setUpFlow(long upFlow) {

        this.upFlow = upFlow;

    }

 

    public long getDownFlow() {

        return downFlow;

    }

 

    public void setDownFlow(long downFlow) {

        this.downFlow = downFlow;

    }

 

    public long getSumFlow() {

        return sumFlow;

    }

 

    public void setSumFlow(long sumFlow) {

        this.sumFlow = sumFlow;

    }

}

(2) Write the Mapper class

package com.atguigu.mapreduce.flowsum;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.Mapper;

 

public class FlowCountMapper extends Mapper<LongWritable, Text, Text, FlowBean>{

    

    FlowBean v = new FlowBean();

    Text k = new Text();

    

    @Override

    protected void map(LongWritable key, Text value, Context context)    throws IOException, InterruptedException {

        

        // 1 獲取一行

        String line = value.toString();

        

        // 2 切割字段

        String[] fields = line.split("\t");

        

        // 3 封裝對象

        // 取出手機號碼

        String phoneNum = fields[1];

 

        // 取出上行流量和下行流量

        long upFlow = Long.parseLong(fields[fields.length - 3]);

        long downFlow = Long.parseLong(fields[fields.length - 2]);

 

        k.set(phoneNum);

        v.set(upFlow, downFlow);

        

        // 4 寫出

        context.write(k, v);

    }

}

(3) Write the Reducer class

package com.atguigu.mapreduce.flowsum;

import java.io.IOException;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.Reducer;

 

public class FlowCountReducer extends Reducer<Text, FlowBean, Text, FlowBean> {

 

    @Override

    protected void reduce(Text key, Iterable<FlowBean> values, Context context)throws IOException, InterruptedException {

 

        long sum_upFlow = 0;

        long sum_downFlow = 0;

 

        // 1 遍歷所用bean,將其中的上行流量,下行流量分別累加

        for (FlowBean flowBean : values) {

            sum_upFlow += flowBean.getUpFlow();

            sum_downFlow += flowBean.getDownFlow();

        }

 

        // 2 封裝對象

        FlowBean resultBean = new FlowBean(sum_upFlow, sum_downFlow);

        

        // 3 寫出

        context.write(key, resultBean);

    }

}

(4) Write the Driver class

package com.atguigu.mapreduce.flowsum;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;

import org.apache.hadoop.fs.Path;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.Job;

import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

 

public class FlowsumDriver {

 

    public static void main(String[] args) throws IllegalArgumentException, IOException, ClassNotFoundException, InterruptedException {

        

// 輸入輸出路徑需要根據自己電腦上實際的輸入輸出路徑設置

args = new String[] { "e:/input/inputflow", "e:/output1" };

 

        // 1 獲取配置信息,或者job對象實例

        Configuration configuration = new Configuration();

        Job job = Job.getInstance(configuration);

 

        // 6 指定本程序的jar包所在的本地路徑

        job.setJarByClass(FlowsumDriver.class);

 

        // 2 指定本業務job要使用的mapper/Reducer業務類

        job.setMapperClass(FlowCountMapper.class);

        job.setReducerClass(FlowCountReducer.class);

 

        // 3 指定mapper輸出數據的kv類型

        job.setMapOutputKeyClass(Text.class);

        job.setMapOutputValueClass(FlowBean.class);

 

        // 4 指定最終輸出的數據的kv類型

        job.setOutputKeyClass(Text.class);

        job.setOutputValueClass(FlowBean.class);

        

        // 5 指定job的輸入原始文件所在目錄

        FileInputFormat.setInputPaths(job, new Path(args[0]));

        FileOutputFormat.setOutputPath(job, new Path(args[1]));

 

        // 7 job中配置的相關參數,以及job所用的java類所在的jar包, 提交給yarn去運行

        boolean result = job.waitForCompletion(true);

        System.exit(result ? 0 : 1);

    }

}

Chapter 3 How the MapReduce Framework Works

3.1 InputFormat Data Input

3.1.1 Splits and the Mechanism That Determines MapTask Parallelism

1. Motivation

The parallelism of MapTasks determines the concurrency of the Map phase and therefore affects the processing speed of the whole job.

Think about it: for 1 GB of data, launching 8 MapTasks improves the cluster's concurrent processing capability. For 1 KB of data, would launching 8 MapTasks also improve performance? Is more MapTask parallelism always better? Which factors determine MapTask parallelism?

2. The Mechanism That Determines MapTask Parallelism

Data block: a block is the unit in which HDFS physically divides data into chunks.

Data split: a split divides the input only logically; it does not cut the data into pieces on disk (a sketch of the default split-size formula follows below).

Figure 4-11 The mechanism that determines MapTask parallelism
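How a split's size relates to the block size can be seen from the split-size formula in the Hadoop 2.x FileInputFormat source, sketched below; minSize and maxSize correspond to the properties mapreduce.input.fileinputformat.split.minsize and mapreduce.input.fileinputformat.split.maxsize.

// Sketch of FileInputFormat's split-size computation (Hadoop 2.x).
// With the defaults (minSize = 1, maxSize = Long.MAX_VALUE) the split size equals the block size,
// so by default one split corresponds to one HDFS block.
long splitSize = Math.max(minSize, Math.min(maxSize, blockSize));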

3.1.2 Job Submission Flow and Split Source Code Walkthrough

1. Job submission flow source walkthrough, as shown in Figure 4-8

waitForCompletion()

 

submit();

 

// 1 Establish the connection
    connect();
        // 1) Create the proxy used to submit the job
        new Cluster(getConfiguration());
            // 1) Determine whether the cluster is local, YARN, or remote
            initialize(jobTrackAddr, conf);

// 2 Submit the job
submitter.submitJobInternal(Job.this, cluster)
    // 1) Create the staging path used to submit data to the cluster
    Path jobStagingArea = JobSubmissionFiles.getStagingDir(cluster, conf);

    // 2) Get the job id and create the job path
    JobID jobId = submitClient.getNewJobID();

    // 3) Copy the jar to the cluster
copyAndConfigureFiles(job, submitJobDir);
    rUploader.uploadFiles(job, jobSubmitDir);

// 4) Compute the splits and generate the split plan files
writeSplits(job, submitJobDir);
        maps = writeNewSplits(job, jobSubmitDir);
        input.getSplits(job);

// 5) Write the XML configuration file to the staging path
writeConf(conf, submitJobFile);
    conf.writeXml(out);

// 6) Submit the job and return the submission status
status = submitClient.submitJob(jobId, submitJobDir.toString(), job.getCredentials());

Figure 4-8 Job submission flow source analysis

2. FileInputFormat split source analysis (input.getSplits(job))

3.1.3 FileInputFormat Split Mechanism

3.1.4 CombineTextInputFormat Split Mechanism

The framework's default TextInputFormat plans splits per file: no matter how small a file is, it becomes its own split and is handed to its own MapTask. With a large number of small files this produces a large number of MapTasks and extremely poor efficiency.

1. Application scenario:

CombineTextInputFormat is designed for scenarios with many small files. It logically groups multiple small files into a single split, so that one MapTask can process several small files together.

2. Setting the maximum virtual-storage split size

CombineTextInputFormat.setMaxInputSplitSize(job, 4194304);// 4m

Note: the maximum virtual-storage split size is best set according to the actual sizes of the small files.

3. Split mechanism

Generating splits consists of two phases: a virtual-storage phase and a split phase.

(1) Virtual-storage phase:

Compare the size of every file in the input directory, in turn, with the configured setMaxInputSplitSize value. If a file is no larger than the maximum, it logically becomes one block. If a file is larger than the maximum and more than twice the maximum, cut off one block of the maximum size; once the remaining data is larger than the maximum but not more than twice the maximum, split it evenly into two virtual-storage blocks (to avoid splits that are too small). A sketch of this rule follows at the end of this subsection.

For example, with setMaxInputSplitSize set to 4 MB and an input file of 8.02 MB, the file is first logically divided into one 4 MB block. The remaining 4.02 MB, if divided by 4 MB again, would leave a tiny 0.02 MB virtual-storage file, so the remaining 4.02 MB is instead split into two 2.01 MB files.

(2) Split phase:

(a) If a virtual-storage block is greater than or equal to the setMaxInputSplitSize value, it forms a split on its own.

(b) Otherwise it is merged with the next virtual-storage block to form one split together.

(c) Example: given four small files of 1.7 MB, 5.1 MB, 3.4 MB, and 6.8 MB, virtual storage produces 6 blocks with sizes:

1.7 MB, (2.55 MB, 2.55 MB), 3.4 MB, and (3.4 MB, 3.4 MB)

which finally form 3 splits of sizes:

(1.7 + 2.55) MB, (2.55 + 3.4) MB, and (3.4 + 3.4) MB
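The virtual-storage rule described above can be sketched in plain Java. This is an illustrative re-implementation for understanding only, not the CombineTextInputFormat source:

import java.util.ArrayList;
import java.util.List;

public class VirtualStorageSketch {

    // Returns the virtual-storage block sizes (in bytes) for one input file.
    static List<Long> virtualBlocks(long fileSize, long maxSplitSize) {
        List<Long> blocks = new ArrayList<>();
        long remaining = fileSize;
        while (remaining > maxSplitSize) {
            if (remaining > 2 * maxSplitSize) {
                // more than twice the maximum: cut off one block of maxSplitSize
                blocks.add(maxSplitSize);
                remaining -= maxSplitSize;
            } else {
                // between 1x and 2x the maximum: split the rest evenly in two
                blocks.add(remaining / 2);
                blocks.add(remaining - remaining / 2);
                remaining = 0;
            }
        }
        if (remaining > 0) {
            blocks.add(remaining);
        }
        return blocks;
    }

    public static void main(String[] args) {
        long maxSplitSize = 4L * 1024 * 1024; // 4 MB, as in setMaxInputSplitSize above
        // An 8.02 MB file yields roughly [4 MB, 2.01 MB, 2.01 MB], matching the example above
        System.out.println(virtualBlocks((long) (8.02 * 1024 * 1024), maxSplitSize));
    }
}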

3.1.5 CombineTextInputFormat Case Study

1. Requirement

Combine a large number of small input files into a single split for unified processing.

(1) Input data

Prepare 4 small files.

(2) Expectation

One split should handle all 4 files.

2. Implementation

(1) Without any changes, run the WordCount program from section 1.8 and observe that the number of splits is 4.

(2) Add the following code to WordcountDriver, run the program, and observe that the number of splits is 3.

(a) Add the following to the driver class:

// 如果不設置InputFormat,它默認用的是TextInputFormat.class

job.setInputFormatClass(CombineTextInputFormat.class);

 

//虛擬存儲切片最大值設置4m

CombineTextInputFormat.setMaxInputSplitSize(job, 4194304);

(b) The run results in 3 splits.

(3) Add the following code to WordcountDriver, run the program, and observe that the number of splits is 1.

(a) Add the following to the driver:

// 如果不設置InputFormat,它默認用的是TextInputFormat.class

job.setInputFormatClass(CombineTextInputFormat.class);

 

//虛擬存儲切片最大值設置20m

CombineTextInputFormat.setMaxInputSplitSize(job, 20971520);

(b) The run results in 1 split.

3.1.6 FileInputFormat Implementation Classes

3.1.7 KeyValueTextInputFormat Case Study

1. Requirement

For each distinct first word, count how many lines of the input file begin with that word.

(1) Input data

banzhang ni hao

xihuan hadoop banzhang

banzhang ni hao

xihuan hadoop banzhang

(2) Expected output data

banzhang    2

xihuan    2

2. Requirement Analysis

3. Code Implementation

(1) Write the Mapper class

package com.atguigu.mapreduce.KeyValueTextInputFormat;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.Mapper;

 

public class KVTextMapper extends Mapper<Text, Text, Text, LongWritable>{

    

// 1 設置value

LongWritable v = new LongWritable(1);

 

    @Override

    protected void map(Text key, Text value, Context context)

            throws IOException, InterruptedException {

 

// banzhang ni hao

 

// 2 寫出

context.write(key, v);

    }

}

(2) Write the Reducer class

package com.atguigu.mapreduce.KeyValueTextInputFormat;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.Reducer;

 

public class KVTextReducer extends Reducer<Text, LongWritable, Text, LongWritable>{

      

LongWritable v = new LongWritable();

 

    @Override

    protected void reduce(Text key, Iterable<LongWritable> values,    Context context) throws IOException, InterruptedException {

        

         long sum = 0L;

 

         // 1 匯總統計

for (LongWritable value : values) {

sum += value.get();

}

 

v.set(sum);

 

// 2 輸出

context.write(key, v);

    }

}

(3) Write the Driver class

package com.atguigu.mapreduce.KeyValueTextInputFormat;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;

import org.apache.hadoop.fs.Path;

import org.apache.hadoop.io.LongWritable;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.Job;

import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

import org.apache.hadoop.mapreduce.lib.input.KeyValueLineRecordReader;

import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;

import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

 

public class KVTextDriver {

 

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {

        

        Configuration conf = new Configuration();

        // 設置切割符

    conf.set(KeyValueLineRecordReader.KEY_VALUE_SEPERATOR, " ");

        // 1 獲取job對象

        Job job = Job.getInstance(conf);

        

        // 2 設置jar包位置,關聯mapperreducer

        job.setJarByClass(KVTextDriver.class);

        job.setMapperClass(KVTextMapper.class);

job.setReducerClass(KVTextReducer.class);

                

        // 3 設置map輸出kv類型

        job.setMapOutputKeyClass(Text.class);

        job.setMapOutputValueClass(LongWritable.class);

 

        // 4 設置最終輸出kv類型

        job.setOutputKeyClass(Text.class);

job.setOutputValueClass(LongWritable.class);

        

        // 5 設置輸入輸出數據路徑

        FileInputFormat.setInputPaths(job, new Path(args[0]));

        

        // 設置輸入格式

    job.setInputFormatClass(KeyValueTextInputFormat.class);

        

        // 6 設置輸出數據路徑

        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        

        // 7 提交job

        job.waitForCompletion(true);

    }

}

3.1.8 NLineInputFormat Case Study

1. Requirement

Count the occurrences of each word, with the number of splits determined by the number of lines in each input file. In this case every three lines go into one split.

(1) Input data

banzhang ni hao

xihuan hadoop banzhang

banzhang ni hao

xihuan hadoop banzhang

banzhang ni hao

xihuan hadoop banzhang

banzhang ni hao

xihuan hadoop banzhang

banzhang ni hao

xihuan hadoop banzhang banzhang ni hao

xihuan hadoop banzhang

(2) Expected output data

Number of splits:4

2. Requirement Analysis

3. Code Implementation

(1) Write the Mapper class

package com.atguigu.mapreduce.nline;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.Mapper;

 

public class NLineMapper extends Mapper<LongWritable, Text, Text, LongWritable>{

    

    private Text k = new Text();

    private LongWritable v = new LongWritable(1);

    

    @Override

    protected void map(LongWritable key, Text value, Context context)    throws IOException, InterruptedException {

        

         // 1 獲取一行

String line = value.toString();

 

// 2 切割

String[] splited = line.split(" ");

 

// 3 循環寫出

for (int i = 0; i < splited.length; i++) {

      

    k.set(splited[i]);

      

context.write(k, v);

}

    }

}

(2) Write the Reducer class

package com.atguigu.mapreduce.nline;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.Reducer;

 

public class NLineReducer extends Reducer<Text, LongWritable, Text, LongWritable>{

    

    LongWritable v = new LongWritable();

    

    @Override

    protected void reduce(Text key, Iterable<LongWritable> values,    Context context) throws IOException, InterruptedException {

          

long sum = 0l;

 

// 1 匯總

for (LongWritable value : values) {

sum += value.get();

}

 

v.set(sum);

 

// 2 輸出

context.write(key, v);

    }

}

(3) Write the Driver class

package com.atguigu.mapreduce.nline;

import java.io.IOException;

import java.net.URISyntaxException;

import org.apache.hadoop.conf.Configuration;

import org.apache.hadoop.fs.Path;

import org.apache.hadoop.io.LongWritable;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.Job;

import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

import org.apache.hadoop.mapreduce.lib.input.NLineInputFormat;

import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

 

public class NLineDriver {

    

    public static void main(String[] args) throws IOException, URISyntaxException, ClassNotFoundException, InterruptedException {

        

// 輸入輸出路徑需要根據自己電腦上實際的輸入輸出路徑設置

args = new String[] { "e:/input/inputword", "e:/output1" };

 

         // 1 獲取job對象

         Configuration configuration = new Configuration();

Job job = Job.getInstance(configuration);

 

// 7設置每個切片InputSplit中划分三條記錄

NLineInputFormat.setNumLinesPerSplit(job, 3);

 

// 8使用NLineInputFormat處理記錄數

job.setInputFormatClass(NLineInputFormat.class);

 

// 2設置jar包位置,關聯mapperreducer

job.setJarByClass(NLineDriver.class);

job.setMapperClass(NLineMapper.class);

job.setReducerClass(NLineReducer.class);

 

// 3設置map輸出kv類型

job.setMapOutputKeyClass(Text.class);

job.setMapOutputValueClass(LongWritable.class);

 

// 4設置最終輸出kv類型

job.setOutputKeyClass(Text.class);

job.setOutputValueClass(LongWritable.class);

 

// 5設置輸入輸出數據路徑

FileInputFormat.setInputPaths(job, new Path(args[0]));

FileOutputFormat.setOutputPath(job, new Path(args[1]));

 

// 6提交job

job.waitForCompletion(true);

    }

}

4. Testing

(1) Input data

banzhang ni hao

xihuan hadoop banzhang

banzhang ni hao

xihuan hadoop banzhang

banzhang ni hao

xihuan hadoop banzhang

banzhang ni hao

xihuan hadoop banzhang

banzhang ni hao

xihuan hadoop banzhang banzhang ni hao

xihuan hadoop banzhang

(2) The number of splits in the output, as shown in Figure 4-10:

Figure 4-10 Number of splits in the output

3.1.9 Custom InputFormat

3.1.10 Custom InputFormat Case Study

Both HDFS and MapReduce are very inefficient at handling small files, yet processing large numbers of small files is often unavoidable, so a solution is needed. A custom InputFormat can be used to merge small files.

1. Requirement

Merge multiple small files into one SequenceFile (a Hadoop file format that stores binary key-value pairs). The SequenceFile stores several files, with each file's path plus name as the key and the file's content as the value.

(1) Input data

(2) Expected output file format

2. Requirement Analysis

3. Implementation

(1) Custom InputFormat

package com.atguigu.mapreduce.inputformat;

import java.io.IOException;

import org.apache.hadoop.fs.Path;

import org.apache.hadoop.io.BytesWritable;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.InputSplit;

import org.apache.hadoop.mapreduce.JobContext;

import org.apache.hadoop.mapreduce.RecordReader;

import org.apache.hadoop.mapreduce.TaskAttemptContext;

import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

 

// 定義類繼承FileInputFormat

public class WholeFileInputformat extends FileInputFormat<Text, BytesWritable>{

    

    @Override

    protected boolean isSplitable(JobContext context, Path filename) {

        return false;

    }

 

    @Override

    public RecordReader<Text, BytesWritable> createRecordReader(InputSplit split, TaskAttemptContext context)    throws IOException, InterruptedException {

        

        WholeRecordReader recordReader = new WholeRecordReader();

        recordReader.initialize(split, context);

        

        return recordReader;

    }

}

(2) Write a custom RecordReader class

package com.atguigu.mapreduce.inputformat;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;

import org.apache.hadoop.fs.FSDataInputStream;

import org.apache.hadoop.fs.FileSystem;

import org.apache.hadoop.fs.Path;

import org.apache.hadoop.io.BytesWritable;

import org.apache.hadoop.io.IOUtils;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.InputSplit;

import org.apache.hadoop.mapreduce.RecordReader;

import org.apache.hadoop.mapreduce.TaskAttemptContext;

import org.apache.hadoop.mapreduce.lib.input.FileSplit;

 

public class WholeRecordReader extends RecordReader<Text, BytesWritable>{

 

    private Configuration configuration;

    private FileSplit split;

    

    private boolean isProgress= true;

    private BytesWritable value = new BytesWritable();

    private Text k = new Text();

 

    @Override

    public void initialize(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {

        

        this.split = (FileSplit)split;

        configuration = context.getConfiguration();

    }

 

    @Override

    public boolean nextKeyValue() throws IOException, InterruptedException {

        

        if (isProgress) {

 

            // 1 定義緩存區

            byte[] contents = new byte[(int)split.getLength()];

            

            FileSystem fs = null;

            FSDataInputStream fis = null;

            

            try {

                // 2 獲取文件系統

                Path path = split.getPath();

                fs = path.getFileSystem(configuration);

                

                // 3 讀取數據

                fis = fs.open(path);

                

                // 4 讀取文件內容

                IOUtils.readFully(fis, contents, 0, contents.length);

                

                // 5 輸出文件內容

                value.set(contents, 0, contents.length);

 

// 6 獲取文件路徑及名稱

String name = split.getPath().toString();

 

// 7 設置輸出的key

k.set(name);

 

            } catch (Exception e) {

                

            }finally {

                IOUtils.closeStream(fis);

            }

            

            isProgress = false;

            

            return true;

        }

        

        return false;

    }

 

    @Override

    public Text getCurrentKey() throws IOException, InterruptedException {

        return k;

    }

 

    @Override

    public BytesWritable getCurrentValue() throws IOException, InterruptedException {

        return value;

    }

 

    @Override

    public float getProgress() throws IOException, InterruptedException {

        return 0;

    }

 

    @Override

    public void close() throws IOException {

    }

}

(3) Write the SequenceFileMapper class

package com.atguigu.mapreduce.inputformat;

import java.io.IOException;

import org.apache.hadoop.io.BytesWritable;

import org.apache.hadoop.io.NullWritable;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.Mapper;

import org.apache.hadoop.mapreduce.lib.input.FileSplit;

 

public class SequenceFileMapper extends Mapper<Text, BytesWritable, Text, BytesWritable>{

    

    @Override

    protected void map(Text key, BytesWritable value,            Context context)        throws IOException, InterruptedException {

 

        context.write(key, value);

    }

}

(4) Write the SequenceFileReducer class

package com.atguigu.mapreduce.inputformat;

import java.io.IOException;

import org.apache.hadoop.io.BytesWritable;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.Reducer;

 

public class SequenceFileReducer extends Reducer<Text, BytesWritable, Text, BytesWritable> {

 

    @Override

    protected void reduce(Text key, Iterable<BytesWritable> values, Context context)        throws IOException, InterruptedException {

 

        context.write(key, values.iterator().next());

    }

}

(5) Write the SequenceFileDriver class

package com.atguigu.mapreduce.inputformat;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;

import org.apache.hadoop.fs.Path;

import org.apache.hadoop.io.BytesWritable;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.Job;

import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;

 

public class SequenceFileDriver {

 

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {

          

// 輸入輸出路徑需要根據自己電腦上實際的輸入輸出路徑設置

        args = new String[] { "e:/input/inputinputformat", "e:/output1" };

 

// 1 獲取job對象

        Configuration conf = new Configuration();

        Job job = Job.getInstance(conf);

 

// 2 設置jar包存儲位置、關聯自定義的mapperreducer

        job.setJarByClass(SequenceFileDriver.class);

        job.setMapperClass(SequenceFileMapper.class);

        job.setReducerClass(SequenceFileReducer.class);

 

// 7設置輸入的inputFormat

        job.setInputFormatClass(WholeFileInputformat.class);

 

// 8設置輸出的outputFormat

     job.setOutputFormatClass(SequenceFileOutputFormat.class);

 

// 3 設置map輸出端的kv類型

        job.setMapOutputKeyClass(Text.class);

        job.setMapOutputValueClass(BytesWritable.class);

          

// 4 設置最終輸出端的kv類型

        job.setOutputKeyClass(Text.class);

        job.setOutputValueClass(BytesWritable.class);

 

// 5 設置輸入輸出路徑

        FileInputFormat.setInputPaths(job, new Path(args[0]));

        FileOutputFormat.setOutputPath(job, new Path(args[1]));

 

// 6 提交job

        boolean result = job.waitForCompletion(true);

        System.exit(result ? 0 : 1);

    }

}

3.2 MapReduce Workflow

1. Workflow diagrams, as shown in Figures 4-6 and 4-7

Figure 4-6 Detailed MapReduce workflow (part 1)

Figure 4-7 Detailed MapReduce workflow (part 2)

2. Workflow Details

The flow above is the complete MapReduce workflow, but the Shuffle process only covers step 7 through step 16. The Shuffle process in detail:

1) The MapTask collects the key-value pairs emitted by our map() method and puts them into an in-memory buffer.

2) The in-memory buffer repeatedly spills to local disk files; several spill files may be produced.

3) Multiple spill files are merged into larger spill files.

4) During both spilling and merging, the Partitioner is called to partition the data, and the data is sorted by key.

5) Each ReduceTask fetches the data for its own partition number from every MapTask machine.

6) A ReduceTask gathers the result files for the same partition from the different MapTasks and merges them again (merge sort).

7) Once the files have been merged into one large file, the Shuffle process is over; the ReduceTask then enters its logic phase, reading key-value groups from the file one by one and calling the user-defined reduce() method.

3. Note

The size of the buffer used in Shuffle affects the efficiency of a MapReduce program: in principle, the larger the buffer, the fewer the disk I/O operations and the faster the execution.

The buffer size can be tuned via a parameter, io.sort.mb, whose default is 100 MB.
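A minimal sketch of changing it in the driver (io.sort.mb is the legacy property name; in Hadoop 2.x the same setting is exposed as mapreduce.task.io.sort.mb and can also be placed in mapred-site.xml):

// Sketch: raise the Shuffle sort buffer from the default 100 MB to 200 MB
Configuration conf = new Configuration();
conf.set("mapreduce.task.io.sort.mb", "200");
Job job = Job.getInstance(conf);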

4. Source Code Walkthrough

context.write(k, NullWritable.get());

output.write(key, value);

collector.collect(key, value,partitioner.getPartition(key, value, partitions));

    HashPartitioner();

collect()

    close()

    collect.flush()

sortAndSpill()

    sort() QuickSort

mergeParts();

    

collector.close();

3.3 The Shuffle Mechanism

3.3.1 The Shuffle Mechanism

The data processing that happens after the map() method and before the reduce() method is called Shuffle, as shown in Figure 4-14.

Figure 4-14 The Shuffle mechanism

3.3.2 Partitioning

3.3.3 Partitioning Case Study

1. Requirement

Output the statistics to different files (partitions) according to the province a phone number belongs to.

(1) Input data

(2) Expected output data

Phone numbers starting with 136, 137, 138, and 139 each go into their own file (four files), and numbers with any other prefix go into one additional file.

2. Requirement Analysis

3. Building on the case in section 2.3, add a partitioner class

package com.atguigu.mapreduce.flowsum;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.Partitioner;

 

public class ProvincePartitioner extends Partitioner<Text, FlowBean> {

 

    @Override

    public int getPartition(Text key, FlowBean value, int numPartitions) {

 

        // 1 獲取電話號碼的前三位

        String preNum = key.toString().substring(0, 3);

        

        int partition = 4;

        

        // 2 判斷是哪個省

        if ("136".equals(preNum)) {

            partition = 0;

        }else if ("137".equals(preNum)) {

            partition = 1;

        }else if ("138".equals(preNum)) {

            partition = 2;

        }else if ("139".equals(preNum)) {

            partition = 3;

        }

 

        return partition;

    }

}

4. In the driver, register the custom partitioner and set the number of ReduceTasks

package com.atguigu.mapreduce.flowsum;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;

import org.apache.hadoop.fs.Path;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.Job;

import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

 

public class FlowsumDriver {

 

    public static void main(String[] args) throws IllegalArgumentException, IOException, ClassNotFoundException, InterruptedException {

 

        // 輸入輸出路徑需要根據自己電腦上實際的輸入輸出路徑設置

        args = new String[]{"e:/output1","e:/output2"};

 

        // 1 獲取配置信息,或者job對象實例

        Configuration configuration = new Configuration();

        Job job = Job.getInstance(configuration);

 

        // 2 指定本程序的jar包所在的本地路徑

        job.setJarByClass(FlowsumDriver.class);

 

        // 3 指定本業務job要使用的mapper/Reducer業務類

        job.setMapperClass(FlowCountMapper.class);

        job.setReducerClass(FlowCountReducer.class);

 

        // 4 指定mapper輸出數據的kv類型

        job.setMapOutputKeyClass(Text.class);

        job.setMapOutputValueClass(FlowBean.class);

 

        // 5 指定最終輸出的數據的kv類型

        job.setOutputKeyClass(Text.class);

        job.setOutputValueClass(FlowBean.class);

 

        // 8 指定自定義數據分區

        job.setPartitionerClass(ProvincePartitioner.class);

 

        // 9 同時指定相應數量的reduce task

        job.setNumReduceTasks(5);

        

        // 6 指定job的輸入原始文件所在目錄

        FileInputFormat.setInputPaths(job, new Path(args[0]));

        FileOutputFormat.setOutputPath(job, new Path(args[1]));

 

        // 7 job中配置的相關參數,以及job所用的java類所在的jar包, 提交給yarn去運行

        boolean result = job.waitForCompletion(true);

        System.exit(result ? 0 : 1);

    }

}

3.3.4 WritableComparable Sorting

1. Types of Sorting

2. Custom Sorting with WritableComparable

(1) How it works

When a bean object is transmitted as a key, implement the WritableComparable interface and override the compareTo() method to enable sorting.

@Override

public int compareTo(FlowBean o) {

 

    int result;

          

    // 按照總流量大小,倒序排列

    if (sumFlow > o.getSumFlow()) {

        result = -1;

    }else if (sumFlow < o.getSumFlow()) {

        result = 1;

    }else {

        result = 0;

    }

 

    return result;

}

3.3.5 WritableComparable Sorting Case Study (Total Order)

1. Requirement

Sort the output produced by the case in section 2.3 again, by total traffic.

(1) Input data

Raw data / data after the first pass

(2) Expected output data

13509468723    7335    110349    117684

13736230513    2481    24681    27162

13956435636    132        1512    1644

13846544121    264        0        264

。。。 。。。

2. Requirement Analysis

3. Code Implementation

(1) The FlowBean class, extended from the previous requirement with comparison support

package com.atguigu.mapreduce.sort;

import java.io.DataInput;

import java.io.DataOutput;

import java.io.IOException;

import org.apache.hadoop.io.WritableComparable;

 

public class FlowBean implements WritableComparable<FlowBean> {

 

    private long upFlow;

    private long downFlow;

    private long sumFlow;

 

    // 反序列化時,需要反射調用空參構造函數,所以必須有

    public FlowBean() {

        super();

    }

 

    public FlowBean(long upFlow, long downFlow) {

        super();

        this.upFlow = upFlow;

        this.downFlow = downFlow;

        this.sumFlow = upFlow + downFlow;

    }

 

    public void set(long upFlow, long downFlow) {

        this.upFlow = upFlow;

        this.downFlow = downFlow;

        this.sumFlow = upFlow + downFlow;

    }

 

    public long getSumFlow() {

        return sumFlow;

    }

 

    public void setSumFlow(long sumFlow) {

        this.sumFlow = sumFlow;

    }    

 

    public long getUpFlow() {

        return upFlow;

    }

 

    public void setUpFlow(long upFlow) {

        this.upFlow = upFlow;

    }

 

    public long getDownFlow() {

        return downFlow;

    }

 

    public void setDownFlow(long downFlow) {

        this.downFlow = downFlow;

    }

 

    /**

     * 序列化方法

     * @param out

     * @throws IOException

     */

    @Override

    public void write(DataOutput out) throws IOException {

        out.writeLong(upFlow);

        out.writeLong(downFlow);

        out.writeLong(sumFlow);

    }

 

    /**

     * 反序列化方法 注意反序列化的順序和序列化的順序完全一致

     * @param in

     * @throws IOException

     */

    @Override

    public void readFields(DataInput in) throws IOException {

        upFlow = in.readLong();

        downFlow = in.readLong();

        sumFlow = in.readLong();

    }

 

    @Override

    public String toString() {

        return upFlow + "\t" + downFlow + "\t" + sumFlow;

    }

 

    @Override

    public int compareTo(FlowBean o) {

        

        int result;

        

        // 按照總流量大小,倒序排列

        if (sumFlow > o.getSumFlow()) {

            result = -1;

        }else if (sumFlow < o.getSumFlow()) {

            result = 1;

        }else {

            result = 0;

        }

 

        return result;

    }

}

(2) Write the Mapper class

package com.atguigu.mapreduce.sort;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.Mapper;

 

public class FlowCountSortMapper extends Mapper<LongWritable, Text, FlowBean, Text>{

 

    FlowBean bean = new FlowBean();

    Text v = new Text();

 

    @Override

    protected void map(LongWritable key, Text value, Context context)    throws IOException, InterruptedException {

 

        // 1 獲取一行

        String line = value.toString();

        

        // 2 截取

        String[] fields = line.split("\t");

        

        // 3 封裝對象

        String phoneNbr = fields[0];

        long upFlow = Long.parseLong(fields[1]);

        long downFlow = Long.parseLong(fields[2]);

        

        bean.set(upFlow, downFlow);

        v.set(phoneNbr);

        

        // 4 輸出

        context.write(bean, v);

    }

}

(3) Write the Reducer class

package com.atguigu.mapreduce.sort;

import java.io.IOException;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.Reducer;

 

public class FlowCountSortReducer extends Reducer<FlowBean, Text, Text, FlowBean>{

 

    @Override

    protected void reduce(FlowBean key, Iterable<Text> values, Context context)    throws IOException, InterruptedException {

        

        // 循環輸出,避免總流量相同情況

        for (Text text : values) {

            context.write(text, key);

        }

    }

}

(4) Write the Driver class

package com.atguigu.mapreduce.sort;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;

import org.apache.hadoop.fs.Path;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.Job;

import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

 

public class FlowCountSortDriver {

 

    public static void main(String[] args) throws ClassNotFoundException, IOException, InterruptedException {

 

        // 輸入輸出路徑需要根據自己電腦上實際的輸入輸出路徑設置

        args = new String[]{"e:/output1","e:/output2"};

 

        // 1 獲取配置信息,或者job對象實例

        Configuration configuration = new Configuration();

        Job job = Job.getInstance(configuration);

 

        // 2 指定本程序的jar包所在的本地路徑

        job.setJarByClass(FlowCountSortDriver.class);

 

        // 3 指定本業務job要使用的mapper/Reducer業務類

        job.setMapperClass(FlowCountSortMapper.class);

        job.setReducerClass(FlowCountSortReducer.class);

 

        // 4 指定mapper輸出數據的kv類型

        job.setMapOutputKeyClass(FlowBean.class);

        job.setMapOutputValueClass(Text.class);

 

        // 5 指定最終輸出的數據的kv類型

        job.setOutputKeyClass(Text.class);

        job.setOutputValueClass(FlowBean.class);

 

        // 6 指定job的輸入原始文件所在目錄

        FileInputFormat.setInputPaths(job, new Path(args[0]));

        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        

        // 7 job中配置的相關參數,以及job所用的java類所在的jar包, 提交給yarn去運行

        boolean result = job.waitForCompletion(true);

        System.exit(result ? 0 : 1);

    }

}

3.3.6 WritableComparable Sorting Case Study (Within-Partition Order)

1. Requirement

Within each province's output file, sort the phone numbers by total traffic.

2. Requirement Analysis

Building on the previous requirement, add a custom partitioner class that partitions by the province of the phone number.

3. Implementation

(1) Add a custom partitioner class

package com.atguigu.mapreduce.sort;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.Partitioner;

 

public class ProvincePartitioner extends Partitioner<FlowBean, Text> {

 

    @Override

    public int getPartition(FlowBean key, Text value, int numPartitions) {

        

        // 1 獲取手機號碼前三位

        String preNum = value.toString().substring(0, 3);

        

        int partition = 4;

        

        // 2 根據手機號歸屬地設置分區

        if ("136".equals(preNum)) {

            partition = 0;

        }else if ("137".equals(preNum)) {

            partition = 1;

        }else if ("138".equals(preNum)) {

            partition = 2;

        }else if ("139".equals(preNum)) {

            partition = 3;

        }

 

        return partition;

    }

}

(2) Register the partitioner in the driver class

// 加載自定義分區類

job.setPartitionerClass(ProvincePartitioner.class);

 

// 設置Reducetask個數

job.setNumReduceTasks(5);

3.3.7 Combiner

(6) Steps to implement a custom Combiner

(a) Define a Combiner class that extends Reducer and override the reduce method

public class WordcountCombiner extends Reducer<Text, IntWritable, Text,IntWritable>{

 

    @Override

    protected void reduce(Text key, Iterable<IntWritable> values,Context context) throws IOException, InterruptedException {

 

// 1 匯總操作

        int count = 0;

        for(IntWritable v :values){

            count += v.get();

        }

 

// 2 寫出

        context.write(key, new IntWritable(count));

    }

}

(b) Register it in the job driver class:

job.setCombinerClass(WordcountCombiner.class);

3.3.8 Combiner Case Study

1. Requirement

During counting, locally aggregate the output of each MapTask to reduce the amount of data transferred over the network, i.e., use a Combiner.

(1) Input data

(2) Expected output data

Expectation: the Combiner receives many input records and, after merging, emits far fewer output records.

2. Requirement Analysis

Figure 4-15 Combiner merging case

3. Implementation - Option 1

1) Add a WordcountCombiner class that extends Reducer

package com.atguigu.mr.combiner;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.Reducer;

 

public class WordcountCombiner extends Reducer<Text, IntWritable, Text, IntWritable>{

 

IntWritable v = new IntWritable();

 

    @Override

    protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {

 

// 1 匯總

        int sum = 0;

 

        for(IntWritable value :values){

            sum += value.get();

        }

 

        v.set(sum);

 

        // 2 寫出

        context.write(key, v);

    }

}

2) Register the Combiner in the WordcountDriver class

// 指定需要使用combiner,以及用哪個類作為combiner的邏輯

job.setCombinerClass(WordcountCombiner.class);

4. Implementation - Option 2

1) Use WordcountReducer itself as the Combiner by registering it in the WordcountDriver class

// 指定需要使用Combiner,以及用哪個類作為Combiner的邏輯

job.setCombinerClass(WordcountReducer.class);

Run the program; see Figures 4-16 and 4-17.

Figure 4-16 Before using the Combiner

Figure 4-17 After using the Combiner

3.3.9 GroupingComparator Grouping (Secondary Sort)

Group the data in the Reduce phase by one or more fields.

Grouping steps:

(1) Define a class that extends WritableComparator

(2) Override the compare() method

@Override

public int compare(WritableComparable a, WritableComparable b) {

        // 比較的業務邏輯

        return result;

}

(3) Add a constructor that passes the class of the compared objects to the superclass

protected OrderGroupingComparator() {

        super(OrderBean.class, true);

}

3.3.10 GroupingComparator Grouping Case Study

1. Requirement

Given the following order data

Table 4-2 Order data

Order id    Product id    Amount
0000001     Pdt_01        222.8
            Pdt_02        33.8
0000002     Pdt_03        522.8
            Pdt_04        122.4
            Pdt_05        722.4
0000003     Pdt_06        232.8
            Pdt_02        33.8

We now need to find the most expensive product in each order.

(1) Input data

(2) Expected output data

1    222.8

2    722.4

3    232.8

2. Requirement Analysis

(1) Use the combination of order id and transaction amount as the key. All order records read in the Map phase are then sorted by order id in ascending order and, for equal ids, by amount in descending order before being sent to Reduce.

(2) On the Reduce side, use a GroupingComparator to group key-value pairs with the same order id; the first record of each group is then the most expensive product of that order, as shown in Figure 4-18.

Figure 4-18 Process analysis

3. Code Implementation

(1) Define the OrderBean class for order information

package com.atguigu.mapreduce.order;

import java.io.DataInput;

import java.io.DataOutput;

import java.io.IOException;

import org.apache.hadoop.io.WritableComparable;

 

public class OrderBean implements WritableComparable<OrderBean> {

 

    private int order_id; // 訂單id

    private double price; // 價格

 

    public OrderBean() {

        super();

    }

 

    public OrderBean(int order_id, double price) {

        super();

        this.order_id = order_id;

        this.price = price;

    }

 

    @Override

    public void write(DataOutput out) throws IOException {

        out.writeInt(order_id);

        out.writeDouble(price);

    }

 

    @Override

    public void readFields(DataInput in) throws IOException {

        order_id = in.readInt();

        price = in.readDouble();

    }

 

    @Override

    public String toString() {

        return order_id + "\t" + price;

    }

 

    public int getOrder_id() {

        return order_id;

    }

 

    public void setOrder_id(int order_id) {

        this.order_id = order_id;

    }

 

    public double getPrice() {

        return price;

    }

 

    public void setPrice(double price) {

        this.price = price;

    }

 

    // 二次排序

    @Override

    public int compareTo(OrderBean o) {

 

        int result;

 

        if (order_id > o.getOrder_id()) {

            result = 1;

        } else if (order_id < o.getOrder_id()) {

            result = -1;

        } else {

            // 價格倒序排序

            result = price > o.getPrice() ? -1 : 1;

        }

 

        return result;

    }

}

(2) Write the OrderMapper class

package com.atguigu.mapreduce.order;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;

import org.apache.hadoop.io.NullWritable;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.Mapper;

 

public class OrderMapper extends Mapper<LongWritable, Text, OrderBean, NullWritable> {

 

    OrderBean k = new OrderBean();

    

    @Override

    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {

        

        // 1 獲取一行

        String line = value.toString();

        

        // 2 截取

        String[] fields = line.split("\t");

        

        // 3 封裝對象

        k.setOrder_id(Integer.parseInt(fields[0]));

        k.setPrice(Double.parseDouble(fields[2]));

        

        // 4 寫出

        context.write(k, NullWritable.get());

    }

}

(3) Write the OrderGroupingComparator class

package com.atguigu.mapreduce.order;

import org.apache.hadoop.io.WritableComparable;

import org.apache.hadoop.io.WritableComparator;

 

public class OrderGroupingComparator extends WritableComparator {

 

    protected OrderGroupingComparator() {

        super(OrderBean.class, true);

    }

 

    @Override

    public int compare(WritableComparable a, WritableComparable b) {

 

        OrderBean aBean = (OrderBean) a;

        OrderBean bBean = (OrderBean) b;

 

        int result;

        if (aBean.getOrder_id() > bBean.getOrder_id()) {

            result = 1;

        } else if (aBean.getOrder_id() < bBean.getOrder_id()) {

            result = -1;

        } else {

            result = 0;

        }

 

        return result;

    }

}

(4) Write the OrderReducer class

package com.atguigu.mapreduce.order;

import java.io.IOException;

import org.apache.hadoop.io.NullWritable;

import org.apache.hadoop.mapreduce.Reducer;

 

public class OrderReducer extends Reducer<OrderBean, NullWritable, OrderBean, NullWritable> {

 

    @Override

    protected void reduce(OrderBean key, Iterable<NullWritable> values, Context context)        throws IOException, InterruptedException {

        

        context.write(key, NullWritable.get());

    }

}

(5) Write the OrderDriver class

package com.atguigu.mapreduce.order;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;

import org.apache.hadoop.fs.Path;

import org.apache.hadoop.io.NullWritable;

import org.apache.hadoop.mapreduce.Job;

import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

 

public class OrderDriver {

 

    public static void main(String[] args) throws Exception, IOException {

 

// 輸入輸出路徑需要根據自己電腦上實際的輸入輸出路徑設置

        args = new String[]{"e:/input/inputorder" , "e:/output1"};

 

        // 1 獲取配置信息

        Configuration conf = new Configuration();

        Job job = Job.getInstance(conf);

 

        // 2 設置jar包加載路徑

        job.setJarByClass(OrderDriver.class);

 

        // 3 加載map/reduce

        job.setMapperClass(OrderMapper.class);

        job.setReducerClass(OrderReducer.class);

 

        // 4 設置map輸出數據keyvalue類型

        job.setMapOutputKeyClass(OrderBean.class);

        job.setMapOutputValueClass(NullWritable.class);

 

        // 5 設置最終輸出數據的keyvalue類型

        job.setOutputKeyClass(OrderBean.class);

        job.setOutputValueClass(NullWritable.class);

 

        // 6 設置輸入數據和輸出數據路徑

        FileInputFormat.setInputPaths(job, new Path(args[0]));

        FileOutputFormat.setOutputPath(job, new Path(args[1]));

 

        // 8 設置reduce端的分組

    job.setGroupingComparatorClass(OrderGroupingComparator.class);

 

        // 7 提交

        boolean result = job.waitForCompletion(true);

        System.exit(result ? 0 : 1);

    }

}

3.4 How MapTask Works

The MapTask working mechanism is shown in Figure 4-12.

Figure 4-12 MapTask working mechanism

(1) Read phase: the MapTask uses the user-supplied RecordReader to parse individual key/value pairs from the input InputSplit.

(2) Map phase: the parsed key/value pairs are handed to the user-written map() function, which produces a series of new key/value pairs.

(3) Collect phase: inside the user-written map() function, OutputCollector.collect() is normally called when a record has been processed. Internally, it partitions the generated key/value pairs (by calling the Partitioner) and writes them into a circular in-memory buffer.

(4) Spill phase: when the circular buffer fills up, MapReduce writes the data to local disk, producing a temporary file. Note that before the data is written to disk, it is first sorted locally and, where necessary, combined and compressed.

Spill phase details:

Step 1: the data in the buffer is sorted with quicksort, first by partition number and then by key within each partition. After sorting, the data is clustered by partition, and within each partition all records are ordered by key.

Step 2: the data of each partition is written, in increasing partition order, to a temporary file output/spillN.out in the task's working directory (N is the current spill count). If the user has configured a Combiner, each partition's data is aggregated once before being written.

Step 3: metadata about each partition is recorded in the in-memory index structure SpillRecord, including each partition's offset in the temporary file and its data sizes before and after compression. If the in-memory index grows beyond 1 MB, it is written to the file output/spillN.out.index.

(5) Combine phase: after all data has been processed, the MapTask merges all temporary files once so that only a single data file is ultimately produced.

When all data processing finishes, the MapTask merges all temporary files into one large file saved as output/file.out, together with the corresponding index file output/file.out.index.

During the merge, the MapTask works partition by partition. For a given partition it uses multiple rounds of recursive merging: each round merges io.sort.factor (default 10) files, adds the resulting file back to the list of files to merge, and repeats until a single large file remains (a configuration sketch follows below).

Having each MapTask produce only one final data file avoids the overhead of opening many files at once and of the random reads caused by reading many small files at the same time.
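A minimal sketch of tuning the merge factor in the driver (io.sort.factor is the legacy property name; in Hadoop 2.x the same setting is exposed as mapreduce.task.io.sort.factor and can also be placed in mapred-site.xml):

// Sketch: let each merge round combine up to 20 spill files instead of the default 10
Configuration conf = new Configuration();
conf.set("mapreduce.task.io.sort.factor", "20");
Job job = Job.getInstance(conf);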

3.5 How ReduceTask Works

1. ReduceTask Working Mechanism

The ReduceTask working mechanism is shown in Figure 4-19.

Figure 4-19 ReduceTask working mechanism

(1) Copy phase: the ReduceTask remotely copies a piece of data from each MapTask; if a piece exceeds a certain threshold it is written to disk, otherwise it is kept in memory.

(2) Merge phase: while copying, the ReduceTask runs two background threads that merge the files in memory and on disk, to prevent excessive memory usage or too many files on disk.

(3) Sort phase: by MapReduce semantics, the input to the user-written reduce() function is a set of data grouped by key, and Hadoop groups identical keys by sorting. Since each MapTask has already locally sorted its own output, the ReduceTask only needs a single merge sort over all the data.

(4) Reduce phase: the reduce() function writes the computed results to HDFS.

2. Setting ReduceTask Parallelism (Count)

ReduceTask parallelism also affects the overall concurrency and efficiency of the job, but unlike the number of MapTasks, which is determined by the number of splits, the number of ReduceTasks can be set directly by hand:

// The default is 1; set it to 4 manually
job.setNumReduceTasks(4);

3. Experiment: How Many ReduceTasks Are Appropriate

(1) Experiment environment: 1 master node and 16 slave nodes; CPU: 8 GHz, memory: 2 GB.

(2) Experimental results:

Table 4-3 Varying the number of ReduceTasks (data size 1 GB, MapTask count = 16)

ReduceTask count   1     5     10    15    16    20    25    30    45    60
Total time          892   146   110   92    88    100   128   101   145   104

4. Notes

3.6 OutputFormat Data Output

3.6.1 OutputFormat Implementation Classes

3.6.2 Custom OutputFormat

3.6.3 Custom OutputFormat Case Study

1. Requirement

Filter the input log: lines containing atguigu go to e:/atguigu.log, and lines that do not contain atguigu go to e:/other.log.

(1) Input data

(2) Expected output data

2. Requirement Analysis

3. Implementation

(1) Write the FilterMapper class

package com.atguigu.mapreduce.outputformat;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;

import org.apache.hadoop.io.NullWritable;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.Mapper;

 

public class FilterMapper extends Mapper<LongWritable, Text, Text, NullWritable>{

    

    @Override

    protected void map(LongWritable key, Text value, Context context)    throws IOException, InterruptedException {

 

        // 寫出

        context.write(value, NullWritable.get());

    }

}

(2) Write the FilterReducer class

package com.atguigu.mapreduce.outputformat;

import java.io.IOException;

import org.apache.hadoop.io.NullWritable;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.Reducer;

 

public class FilterReducer extends Reducer<Text, NullWritable, Text, NullWritable> {

 

Text k = new Text();

 

    @Override

    protected void reduce(Text key, Iterable<NullWritable> values, Context context)        throws IOException, InterruptedException {

 

// 1 獲取一行

        String line = key.toString();

 

// 2 拼接

        line = line + "\r\n";

 

// 3 設置key

k.set(line);

 

// 4 輸出

        context.write(k, NullWritable.get());

    }

}

(3) Define a custom OutputFormat class

package com.atguigu.mapreduce.outputformat;

import java.io.IOException;

import org.apache.hadoop.io.NullWritable;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.RecordWriter;

import org.apache.hadoop.mapreduce.TaskAttemptContext;

import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

 

public class FilterOutputFormat extends FileOutputFormat<Text, NullWritable>{

 

    @Override

    public RecordWriter<Text, NullWritable> getRecordWriter(TaskAttemptContext job)            throws IOException, InterruptedException {

 

        // 創建一個RecordWriter

        return new FilterRecordWriter(job);

    }

}

(4) Write the RecordWriter class

package com.atguigu.mapreduce.outputformat;

import java.io.IOException;

import org.apache.hadoop.fs.FSDataOutputStream;

import org.apache.hadoop.fs.FileSystem;

import org.apache.hadoop.fs.Path;

import org.apache.hadoop.io.IOUtils;

import org.apache.hadoop.io.NullWritable;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.RecordWriter;

import org.apache.hadoop.mapreduce.TaskAttemptContext;

 

public class FilterRecordWriter extends RecordWriter<Text, NullWritable> {

 

    FSDataOutputStream atguiguOut = null;

    FSDataOutputStream otherOut = null;

 

    public FilterRecordWriter(TaskAttemptContext job) {

 

        // 1 獲取文件系統

        FileSystem fs;

 

        try {

            fs = FileSystem.get(job.getConfiguration());

 

            // 2 創建輸出文件路徑

            Path atguiguPath = new Path("e:/atguigu.log");

            Path otherPath = new Path("e:/other.log");

 

            // 3 創建輸出流

            atguiguOut = fs.create(atguiguPath);

            otherOut = fs.create(otherPath);

        } catch (IOException e) {

            e.printStackTrace();

        }

    }

 

    @Override

    public void write(Text key, NullWritable value) throws IOException, InterruptedException {

 

        // 判斷是否包含"atguigu"輸出到不同文件

        if (key.toString().contains("atguigu")) {

            atguiguOut.write(key.toString().getBytes());

        } else {

            otherOut.write(key.toString().getBytes());

        }

    }

 

    @Override

    public void close(TaskAttemptContext context) throws IOException, InterruptedException {

 

        // Close the output streams
        IOUtils.closeStream(atguiguOut);
        IOUtils.closeStream(otherOut);
    }

}

(5) Write the FilterDriver class

package com.atguigu.mapreduce.outputformat;

import org.apache.hadoop.conf.Configuration;

import org.apache.hadoop.fs.Path;

import org.apache.hadoop.io.NullWritable;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.Job;

import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

 

public class FilterDriver {

 

    public static void main(String[] args) throws Exception {

 

// 輸入輸出路徑需要根據自己電腦上實際的輸入輸出路徑設置

args = new String[] { "e:/input/inputoutputformat", "e:/output2" };

 

        Configuration conf = new Configuration();

        Job job = Job.getInstance(conf);

 

        job.setJarByClass(FilterDriver.class);

        job.setMapperClass(FilterMapper.class);

        job.setReducerClass(FilterReducer.class);

 

        job.setMapOutputKeyClass(Text.class);

        job.setMapOutputValueClass(NullWritable.class);

        

        job.setOutputKeyClass(Text.class);

        job.setOutputValueClass(NullWritable.class);

 

        // 要將自定義的輸出格式組件設置到job

        job.setOutputFormatClass(FilterOutputFormat.class);

 

        FileInputFormat.setInputPaths(job, new Path(args[0]));

 

        // 雖然我們自定義了outputformat,但是因為我們的outputformat繼承自fileoutputformat

        // fileoutputformat要輸出一個_SUCCESS文件,所以,在這還得指定一個輸出目錄

        FileOutputFormat.setOutputPath(job, new Path(args[1]));

 

        boolean result = job.waitForCompletion(true);

        System.exit(result ? 0 : 1);

    }

}

3.7 Join多種應用

3.7.1 Reduce Join

3.7.2 Reduce Join案例實操

1.需求

4-4 訂單數據表t_order

id	pid	amount
1001	01	1
1002	02	2
1003	03	3
1004	01	4
1005	02	5
1006	03	6

4-5 商品信息表t_product

pid	pname
01	小米
02	華為
03	格力

    將商品信息表中數據根據商品pid合並到訂單數據表中。

4-6 最終數據形式

id	pname	amount
1001	小米	1
1004	小米	4
1002	華為	2
1005	華為	5
1003	格力	3
1006	格力	6

2.需求分析

通過將關聯條件作為Map輸出的key,把兩張表中滿足Join條件的數據連同其來源文件的標記一起發往同一個ReduceTask,在Reduce端完成數據的串聯,如圖4-20所示。

圖4-20 Reduce端表合並

3.代碼實現

1)創建商品和訂單合並后的Bean類

package com.atguigu.mapreduce.table;

import java.io.DataInput;

import java.io.DataOutput;

import java.io.IOException;

import org.apache.hadoop.io.Writable;

 

public class TableBean implements Writable {

 

    private String order_id; // 訂單id

    private String p_id; // 產品id

    private int amount; // 產品數量

    private String pname; // 產品名稱

    private String flag; // 表的標記

 

    public TableBean() {

        super();

    }

 

    public TableBean(String order_id, String p_id, int amount, String pname, String flag) {

 

        super();

 

        this.order_id = order_id;

        this.p_id = p_id;

        this.amount = amount;

        this.pname = pname;

        this.flag = flag;

    }

 

    public String getFlag() {

        return flag;

    }

 

    public void setFlag(String flag) {

        this.flag = flag;

    }

 

    public String getOrder_id() {

        return order_id;

    }

 

    public void setOrder_id(String order_id) {

        this.order_id = order_id;

    }

 

    public String getP_id() {

        return p_id;

    }

 

    public void setP_id(String p_id) {

        this.p_id = p_id;

    }

 

    public int getAmount() {

        return amount;

    }

 

    public void setAmount(int amount) {

        this.amount = amount;

    }

 

    public String getPname() {

        return pname;

    }

 

    public void setPname(String pname) {

        this.pname = pname;

    }

 

    @Override

    public void write(DataOutput out) throws IOException {

        out.writeUTF(order_id);

        out.writeUTF(p_id);

        out.writeInt(amount);

        out.writeUTF(pname);

        out.writeUTF(flag);

    }

 

    @Override

    public void readFields(DataInput in) throws IOException {

        this.order_id = in.readUTF();

        this.p_id = in.readUTF();

        this.amount = in.readInt();

        this.pname = in.readUTF();

        this.flag = in.readUTF();

    }

 

    @Override

    public String toString() {

        return order_id + "\t" + pname + "\t" + amount + "\t" ;

    }

}

2)編寫TableMapper類

package com.atguigu.mapreduce.table;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.Mapper;

import org.apache.hadoop.mapreduce.lib.input.FileSplit;

 

public class TableMapper extends Mapper<LongWritable, Text, Text, TableBean>{

 

String name;

    TableBean bean = new TableBean();

    Text k = new Text();

    

    @Override

    protected void setup(Context context) throws IOException, InterruptedException {

 

        // 1 獲取輸入文件切片

        FileSplit split = (FileSplit) context.getInputSplit();

 

        // 2 獲取輸入文件名稱

        name = split.getPath().getName();

    }

 

    @Override

    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {

        

        // 1 獲取輸入數據

        String line = value.toString();

        

        // 2 不同文件分別處理

        if (name.startsWith("order")) {// 訂單表處理

 

            // 2.1 切割

            String[] fields = line.split("\t");

            

            // 2.2 封裝bean對象

            bean.setOrder_id(fields[0]);

            bean.setP_id(fields[1]);

            bean.setAmount(Integer.parseInt(fields[2]));

            bean.setPname("");

            bean.setFlag("order");

            

            k.set(fields[1]);

        }else {// 產品表處理

 

            // 2.3 切割

            String[] fields = line.split("\t");

            

            // 2.4 封裝bean對象

            bean.setP_id(fields[0]);

            bean.setPname(fields[1]);

            bean.setFlag("pd");

            bean.setAmount(0);

            bean.setOrder_id("");

            

            k.set(fields[0]);

        }

 

        // 3 寫出

        context.write(k, bean);

    }

}

3)編寫TableReducer類

package com.atguigu.mapreduce.table;

import java.io.IOException;

import java.util.ArrayList;

import org.apache.commons.beanutils.BeanUtils;

import org.apache.hadoop.io.NullWritable;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.Reducer;

 

public class TableReducer extends Reducer<Text, TableBean, TableBean, NullWritable> {

 

    @Override

    protected void reduce(Text key, Iterable<TableBean> values, Context context)    throws IOException, InterruptedException {

 

        // 1准備存儲訂單的集合

        ArrayList<TableBean> orderBeans = new ArrayList<>();

        

// 2 准備bean對象

        TableBean pdBean = new TableBean();

 

        for (TableBean bean : values) {

 

            if ("order".equals(bean.getFlag())) {// 訂單表

 

                // 拷貝傳遞過來的每條訂單數據到集合中

                TableBean orderBean = new TableBean();

 

                try {

                    BeanUtils.copyProperties(orderBean, bean);

                } catch (Exception e) {

                    e.printStackTrace();

                }

 

                orderBeans.add(orderBean);

            } else {// 產品表

 

                try {

                    // 拷貝傳遞過來的產品表到內存中

                    BeanUtils.copyProperties(pdBean, bean);

                } catch (Exception e) {

                    e.printStackTrace();

                }

            }

        }

 

        // 3 表的拼接

        for(TableBean bean:orderBeans){

 

            bean.setPname (pdBean.getPname());

            

            // 4 數據寫出去

            context.write(bean, NullWritable.get());

        }

    }

}

4)編寫TableDriver類

package com.atguigu.mapreduce.table;

import org.apache.hadoop.conf.Configuration;

import org.apache.hadoop.fs.Path;

import org.apache.hadoop.io.NullWritable;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.Job;

import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

 

public class TableDriver {

 

    public static void main(String[] args) throws Exception {

        

// 0 根據自己電腦路徑重新配置

args = new String[]{"e:/input/inputtable","e:/output1"};

 

// 1 獲取配置信息,或者job對象實例

        Configuration configuration = new Configuration();

        Job job = Job.getInstance(configuration);

 

        // 2 指定本程序的jar包所在的本地路徑

        job.setJarByClass(TableDriver.class);

 

        // 3 指定本業務job要使用的Mapper/Reducer業務類

        job.setMapperClass(TableMapper.class);

        job.setReducerClass(TableReducer.class);

 

        // 4 指定Mapper輸出數據的kv類型

        job.setMapOutputKeyClass(Text.class);

        job.setMapOutputValueClass(TableBean.class);

 

        // 5 指定最終輸出的數據的kv類型

        job.setOutputKeyClass(TableBean.class);

        job.setOutputValueClass(NullWritable.class);

 

        // 6 指定job的輸入原始文件所在目錄

        FileInputFormat.setInputPaths(job, new Path(args[0]));

        FileOutputFormat.setOutputPath(job, new Path(args[1]));

 

        // 7 job中配置的相關參數,以及job所用的java類所在的jar包, 提交給yarn去運行

        boolean result = job.waitForCompletion(true);

        System.exit(result ? 0 : 1);

    }

}

4.測試

運行程序查看結果

1001	小米	1	

1004	小米	4	

1002	華為	2	

1005	華為	5	

1003	格力	3	

1006	格力	6	

5.總結

3.7.3 Map Join

1.使用場景

Map Join適用於一張表十分小、一張表很大的場景。

2.優點

思考:在Reduce端處理過多的表,非常容易產生數據傾斜。怎么辦?

在Map端緩存多張表,提前處理業務邏輯,這樣增加Map端業務,減少Reduce端數據的壓力,盡可能的減少數據傾斜。

3.具體辦法:采用DistributedCache

    (1)在Mapper的setup階段,將文件讀取到緩存集合中。

    (2)在驅動函數中加載緩存。

// 緩存普通文件到Task運行節點。

job.addCacheFile(new URI("file:///e:/cache/pd.txt"));

3.7.4 Map Join案例實操

1.需求

4-4 訂單數據表t_order

id	pid	amount
1001	01	1
1002	02	2
1003	03	3
1004	01	4
1005	02	5
1006	03	6

4-5 商品信息表t_product

pid	pname
01	小米
02	華為
03	格力

    將商品信息表中數據根據商品pid合並到訂單數據表中。

4-6 最終數據形式

id	pname	amount
1001	小米	1
1004	小米	4
1002	華為	2
1005	華為	5
1003	格力	3
1006	格力	6

2.需求分析

MapJoin適用於關聯表中有小表的情形。

圖4-21 Map端表合並

3.實現代碼

(1)先在驅動模塊中添加緩存文件

package test;

import java.net.URI;

import org.apache.hadoop.conf.Configuration;

import org.apache.hadoop.fs.Path;

import org.apache.hadoop.io.NullWritable;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.Job;

import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

 

public class DistributedCacheDriver {

 

    public static void main(String[] args) throws Exception {

        

// 0 根據自己電腦路徑重新配置

args = new String[]{"e:/input/inputtable2", "e:/output1"};

 

// 1 獲取job信息

        Configuration configuration = new Configuration();

        Job job = Job.getInstance(configuration);

 

        // 2 設置加載jar包路徑

        job.setJarByClass(DistributedCacheDriver.class);

 

        // 3 關聯map

        job.setMapperClass(DistributedCacheMapper.class);

        

// 4 設置最終輸出數據類型

        job.setOutputKeyClass(Text.class);

        job.setOutputValueClass(NullWritable.class);

 

        // 5 設置輸入輸出路徑

        FileInputFormat.setInputPaths(job, new Path(args[0]));

        FileOutputFormat.setOutputPath(job, new Path(args[1]));

 

        // 6 加載緩存數據

        job.addCacheFile(new URI("file:///e:/input/inputcache/pd.txt"));

        

        // 7 MapJoin的邏輯不需要Reduce階段,設置reduceTask數量為0

        job.setNumReduceTasks(0);

 

        // 8 提交

        boolean result = job.waitForCompletion(true);

        System.exit(result ? 0 : 1);

    }

}

(2)讀取緩存的文件數據

package test;

import java.io.BufferedReader;

import java.io.FileInputStream;

import java.io.IOException;

import java.io.InputStreamReader;

import java.net.URI;

import java.util.HashMap;

import java.util.Map;

import org.apache.commons.lang.StringUtils;

import org.apache.hadoop.io.LongWritable;

import org.apache.hadoop.io.NullWritable;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.Mapper;

 

public class DistributedCacheMapper extends Mapper<LongWritable, Text, Text, NullWritable>{

 

    Map<String, String> pdMap = new HashMap<>();

    

    @Override

    protected void setup(Mapper<LongWritable, Text, Text, NullWritable>.Context context) throws IOException, InterruptedException {

 

        // 1 獲取緩存的文件

        URI[] cacheFiles = context.getCacheFiles();

        String path = cacheFiles[0].getPath().toString();

        

        BufferedReader reader = new BufferedReader(new InputStreamReader(new FileInputStream(path), "UTF-8"));

        

        String line;

        while(StringUtils.isNotEmpty(line = reader.readLine())){

 

            // 2 切割

            String[] fields = line.split("\t");

            

            // 3 緩存數據到集合

            pdMap.put(fields[0], fields[1]);

        }

        

        // 4 關流

        reader.close();

    }

    

    Text k = new Text();

    

    @Override

    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {

 

        // 1 獲取一行

        String line = value.toString();

        

        // 2 截取

        String[] fields = line.split("\t");

        

        // 3 獲取產品id

        String pId = fields[1];

        

        // 4 獲取商品名稱

        String pdName = pdMap.get(pId);

        

        // 5 拼接

        k.set(line + "\t"+ pdName);

        

        // 6 寫出

        context.write(k, NullWritable.get());

    }

}

3.8 計數器應用

3.9 數據清洗(ETL

在運行核心業務MapReduce程序之前,往往要先對數據進行清洗,清理掉不符合用戶要求的數據。清理的過程往往只需要運行Mapper程序,不需要運行Reduce程序。

3.9.1 數據清洗案例實操-簡單解析版

1.需求

去除日志中字段長度小於等於11的日志。

(1)輸入數據

(2)期望輸出數據

每行字段長度都大於11。

2.需求分析

    需要在Map階段對輸入的數據根據規則進行過濾清洗。

3.實現代碼

(1)編寫LogMapper類

package com.atguigu.mapreduce.weblog;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;

import org.apache.hadoop.io.NullWritable;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.Mapper;

 

public class LogMapper extends Mapper<LongWritable, Text, Text, NullWritable>{

    

    Text k = new Text();

    

    @Override

    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {

        

        // 1 獲取1行數據

        String line = value.toString();

        

        // 2 解析日志

        boolean result = parseLog(line,context);

        

        // 3 日志不合法退出

        if (!result) {

            return;

        }

        

        // 4 設置key

        k.set(line);

        

        // 5 寫出數據

        context.write(k, NullWritable.get());

    }

 

    // 2 解析日志

    private boolean parseLog(String line, Context context) {

 

        // 1 截取

        String[] fields = line.split(" ");

        

        // 2 日志長度大於11的為合法

        if (fields.length > 11) {

 

            // 系統計數器

            context.getCounter("map", "true").increment(1);

            return true;

        }else {

            context.getCounter("map", "false").increment(1);

            return false;

        }

    }

}

 

(2)編寫LogDriver類

package com.atguigu.mapreduce.weblog;

import org.apache.hadoop.conf.Configuration;

import org.apache.hadoop.fs.Path;

import org.apache.hadoop.io.NullWritable;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.Job;

import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

 

public class LogDriver {

 

    public static void main(String[] args) throws Exception {

 

// 輸入輸出路徑需要根據自己電腦上實際的輸入輸出路徑設置

args = new String[] { "e:/input/inputlog", "e:/output1" };

 

        // 1 獲取job信息

        Configuration conf = new Configuration();

        Job job = Job.getInstance(conf);

 

        // 2 加載jar

        job.setJarByClass(LogDriver.class);

 

        // 3 關聯map

        job.setMapperClass(LogMapper.class);

 

        // 4 設置最終輸出類型

        job.setOutputKeyClass(Text.class);

        job.setOutputValueClass(NullWritable.class);

 

        // 設置reducetask個數為0

        job.setNumReduceTasks(0);

 

        // 5 設置輸入和輸出路徑

        FileInputFormat.setInputPaths(job, new Path(args[0]));

        FileOutputFormat.setOutputPath(job, new Path(args[1]));

 

        // 6 提交

        job.waitForCompletion(true);

    }

}

3.9.2 數據清洗案例實操-復雜解析版

1.需求

對Web訪問日志中的各字段識別切分,去除日志中不合法的記錄。根據清洗規則,輸出過濾后的數據。

(1)輸入數據

(2)期望輸出數據

都是合法的數據

2.實現代碼

(1)定義一個bean,用來記錄日志數據中的各數據字段

package com.atguigu.mapreduce.log;

 

public class LogBean {

    private String remote_addr;// 記錄客戶端的ip地址

    private String remote_user;// 記錄客戶端用戶名稱,忽略屬性"-"

    private String time_local;// 記錄訪問時間與時區

    private String request;// 記錄請求的url與http協議

    private String status;// 記錄請求狀態;成功是200

    private String body_bytes_sent;// 記錄發送給客戶端文件主體內容大小

    private String http_referer;// 用來記錄從那個頁面鏈接訪問過來的

    private String http_user_agent;// 記錄客戶瀏覽器的相關信息

 

    private boolean valid = true;// 判斷數據是否合法

 

    public String getRemote_addr() {

        return remote_addr;

    }

 

    public void setRemote_addr(String remote_addr) {

        this.remote_addr = remote_addr;

    }

 

    public String getRemote_user() {

        return remote_user;

    }

 

    public void setRemote_user(String remote_user) {

        this.remote_user = remote_user;

    }

 

    public String getTime_local() {

        return time_local;

    }

 

    public void setTime_local(String time_local) {

        this.time_local = time_local;

    }

 

    public String getRequest() {

        return request;

    }

 

    public void setRequest(String request) {

        this.request = request;

    }

 

    public String getStatus() {

        return status;

    }

 

    public void setStatus(String status) {

        this.status = status;

    }

 

    public String getBody_bytes_sent() {

        return body_bytes_sent;

    }

 

    public void setBody_bytes_sent(String body_bytes_sent) {

        this.body_bytes_sent = body_bytes_sent;

    }

 

    public String getHttp_referer() {

        return http_referer;

    }

 

    public void setHttp_referer(String http_referer) {

        this.http_referer = http_referer;

    }

 

    public String getHttp_user_agent() {

        return http_user_agent;

    }

 

    public void setHttp_user_agent(String http_user_agent) {

        this.http_user_agent = http_user_agent;

    }

 

    public boolean isValid() {

        return valid;

    }

 

    public void setValid(boolean valid) {

        this.valid = valid;

    }

 

    @Override

    public String toString() {

 

        StringBuilder sb = new StringBuilder();

        sb.append(this.valid);

        sb.append("\001").append(this.remote_addr);

        sb.append("\001").append(this.remote_user);

        sb.append("\001").append(this.time_local);

        sb.append("\001").append(this.request);

        sb.append("\001").append(this.status);

        sb.append("\001").append(this.body_bytes_sent);

        sb.append("\001").append(this.http_referer);

        sb.append("\001").append(this.http_user_agent);

        

        return sb.toString();

    }

}

(2)編寫LogMapper類

package com.atguigu.mapreduce.log;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;

import org.apache.hadoop.io.NullWritable;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.Mapper;

 

public class LogMapper extends Mapper<LongWritable, Text, Text, NullWritable>{

    Text k = new Text();

    

    @Override

    protected void map(LongWritable key, Text value, Context context)    throws IOException, InterruptedException {

 

        // 1 獲取1行

        String line = value.toString();

        

        // 2 解析日志是否合法

        LogBean bean = parseLog(line);

        

        if (!bean.isValid()) {

            return;

        }

        

        k.set(bean.toString());

        

        // 3 輸出

        context.write(k, NullWritable.get());

    }

 

    // 解析日志

    private LogBean parseLog(String line) {

 

        LogBean logBean = new LogBean();

        

        // 1 截取

        String[] fields = line.split(" ");

        

        if (fields.length > 11) {

 

            // 2封裝數據

            logBean.setRemote_addr(fields[0]);

            logBean.setRemote_user(fields[1]);

            logBean.setTime_local(fields[3].substring(1));

            logBean.setRequest(fields[6]);

            logBean.setStatus(fields[8]);

            logBean.setBody_bytes_sent(fields[9]);

            logBean.setHttp_referer(fields[10]);

            

            if (fields.length > 12) {

                logBean.setHttp_user_agent(fields[11] + " "+ fields[12]);

            }else {

                logBean.setHttp_user_agent(fields[11]);

            }

            

            // 狀態碼大於等於400的為HTTP錯誤

            if (Integer.parseInt(logBean.getStatus()) >= 400) {

                logBean.setValid(false);

            }

        }else {

            logBean.setValid(false);

        }

        

        return logBean;

    }

}

(3)編寫LogDriver類

package com.atguigu.mapreduce.log;

import org.apache.hadoop.conf.Configuration;

import org.apache.hadoop.fs.Path;

import org.apache.hadoop.io.NullWritable;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.Job;

import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

 

public class LogDriver {

    public static void main(String[] args) throws Exception {

        

// 1 獲取job信息

        Configuration conf = new Configuration();

        Job job = Job.getInstance(conf);

 

        // 2 加載jar

        job.setJarByClass(LogDriver.class);

 

        // 3 關聯map

        job.setMapperClass(LogMapper.class);

 

        // 4 設置最終輸出類型

        job.setOutputKeyClass(Text.class);

        job.setOutputValueClass(NullWritable.class);

 

        // 5 設置輸入和輸出路徑

        FileInputFormat.setInputPaths(job, new Path(args[0]));

        FileOutputFormat.setOutputPath(job, new Path(args[1]));

 

        // 6 提交

        job.waitForCompletion(true);

    }

}

3.10 MapReduce開發總結

在編寫MapReduce程序時,需要考慮如下幾個方面:

第4章 Hadoop數據壓縮

4.1 概述

4.2 MR支持的壓縮編碼

4-7

壓縮格式	Hadoop自帶?	算法	文件擴展名	是否可切分	換成壓縮格式后,原來的程序是否需要修改
DEFLATE	是,直接使用	DEFLATE	.deflate	否	和文本處理一樣,不需要修改
Gzip	是,直接使用	DEFLATE	.gz	否	和文本處理一樣,不需要修改
bzip2	是,直接使用	bzip2	.bz2	是	和文本處理一樣,不需要修改
LZO	否,需要安裝	LZO	.lzo	是	需要建索引,還需要指定輸入格式
Snappy	否,需要安裝	Snappy	.snappy	否	和文本處理一樣,不需要修改

為了支持多種壓縮/解壓縮算法,Hadoop引入了編碼/解碼器,如下表所示。

4-8

壓縮格式	對應的編碼/解碼器
DEFLATE	org.apache.hadoop.io.compress.DefaultCodec
gzip	org.apache.hadoop.io.compress.GzipCodec
bzip2	org.apache.hadoop.io.compress.BZip2Codec
LZO	com.hadoop.compression.lzo.LzopCodec
Snappy	org.apache.hadoop.io.compress.SnappyCodec

壓縮性能的比較

4-9

壓縮算法	原始文件大小	壓縮文件大小	壓縮速度	解壓速度
gzip	8.3GB	1.8GB	17.5MB/s	58MB/s
bzip2	8.3GB	1.1GB	2.4MB/s	9.5MB/s
LZO	8.3GB	2.9GB	49.3MB/s	74.6MB/s

http://google.github.io/snappy/

On a single core of a Core i7 processor in 64-bit mode, Snappy compresses at about 250 MB/sec or more and decompresses at about 500 MB/sec or more.

4.3 壓縮方式選擇

4.3.1 Gzip壓縮

4.3.2 Bzip2壓縮

4.3.3 Lzo壓縮

4.3.4 Snappy壓縮

4.4 壓縮位置選擇

壓縮可以在MapReduce作業的任意階段啟用,如圖4-22所示。

圖4-22 MapReduce數據壓縮

4.5 壓縮參數配置

要在Hadoop中啟用壓縮,可以配置如下參數:

4-10 配置參數

參數	默認值	階段	建議
io.compression.codecs(在core-site.xml中配置)	org.apache.hadoop.io.compress.DefaultCodec, org.apache.hadoop.io.compress.GzipCodec, org.apache.hadoop.io.compress.BZip2Codec	輸入壓縮	Hadoop使用文件擴展名判斷是否支持某種編解碼器
mapreduce.map.output.compress(在mapred-site.xml中配置)	false	mapper輸出	這個參數設為true啟用壓縮
mapreduce.map.output.compress.codec(在mapred-site.xml中配置)	org.apache.hadoop.io.compress.DefaultCodec	mapper輸出	企業多使用LZO或Snappy編解碼器在此階段壓縮數據
mapreduce.output.fileoutputformat.compress(在mapred-site.xml中配置)	false	reducer輸出	這個參數設為true啟用壓縮
mapreduce.output.fileoutputformat.compress.codec(在mapred-site.xml中配置)	org.apache.hadoop.io.compress.DefaultCodec	reducer輸出	使用標准工具或者編解碼器,如gzip和bzip2
mapreduce.output.fileoutputformat.compress.type(在mapred-site.xml中配置)	RECORD	reducer輸出	SequenceFile輸出使用的壓縮類型:NONE和BLOCK
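上表中的參數除了寫在配置文件里,也可以在提交作業前通過Configuration按參數名直接設置。下面是一個最小示意(假設使用Hadoop自帶的BZip2Codec,片段放在驅動類的main方法中,取值僅供參考):

// 示意:按表4-10中的參數名開啟Map輸出和Reduce輸出壓縮
Configuration conf = new Configuration();

// Map輸出壓縮
conf.setBoolean("mapreduce.map.output.compress", true);
conf.set("mapreduce.map.output.compress.codec", "org.apache.hadoop.io.compress.BZip2Codec");

// Reduce輸出壓縮
conf.setBoolean("mapreduce.output.fileoutputformat.compress", true);
conf.set("mapreduce.output.fileoutputformat.compress.codec", "org.apache.hadoop.io.compress.BZip2Codec");

Job job = Job.getInstance(conf);

4.6.2節和4.6.3節分別給出了用API方法(setBoolean/setClass、FileOutputFormat.setCompressOutput)實現同樣效果的完整驅動代碼。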

4.6 壓縮實操案例

4.6.1 數據流的壓縮和解壓縮

測試一下如下壓縮方式:

4-11

DEFLATE	org.apache.hadoop.io.compress.DefaultCodec
gzip	org.apache.hadoop.io.compress.GzipCodec
bzip2	org.apache.hadoop.io.compress.BZip2Codec

package com.atguigu.mapreduce.compress;

import java.io.File;

import java.io.FileInputStream;

import java.io.FileNotFoundException;

import java.io.FileOutputStream;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;

import org.apache.hadoop.fs.Path;

import org.apache.hadoop.io.IOUtils;

import org.apache.hadoop.io.compress.CompressionCodec;

import org.apache.hadoop.io.compress.CompressionCodecFactory;

import org.apache.hadoop.io.compress.CompressionInputStream;

import org.apache.hadoop.io.compress.CompressionOutputStream;

import org.apache.hadoop.util.ReflectionUtils;

 

public class TestCompress {

 

    public static void main(String[] args) throws Exception {

        compress("e:/hello.txt","org.apache.hadoop.io.compress.BZip2Codec");

//        decompress("e:/hello.txt.bz2");

    }

 

    // 1壓縮

    private static void compress(String filename, String method) throws Exception {

        

        // 1)獲取輸入流

        FileInputStream fis = new FileInputStream(new File(filename));

        

        Class codecClass = Class.forName(method);

        

        CompressionCodec codec = (CompressionCodec) ReflectionUtils.newInstance(codecClass, new Configuration());

        

        // 2)獲取輸出流

        FileOutputStream fos = new FileOutputStream(new File(filename + codec.getDefaultExtension()));

        CompressionOutputStream cos = codec.createOutputStream(fos);

        

        // 3)流的對拷

        IOUtils.copyBytes(fis, cos, 1024*1024*5, false);

        

        // 4)關閉資源

        cos.close();

        fos.close();

fis.close();

    }

 

    // 2解壓縮

    private static void decompress(String filename) throws FileNotFoundException, IOException {

        

        // 0)校驗是否能解壓縮

        CompressionCodecFactory factory = new CompressionCodecFactory(new Configuration());

 

        CompressionCodec codec = factory.getCodec(new Path(filename));

        

        if (codec == null) {

            System.out.println("cannot find codec for file " + filename);

            return;

        }

        

        // 1)獲取輸入流

        CompressionInputStream cis = codec.createInputStream(new FileInputStream(new File(filename)));

        

        // 2)獲取輸出流

        FileOutputStream fos = new FileOutputStream(new File(filename + ".decoded"));

        

        // 3)流的對拷

        IOUtils.copyBytes(cis, fos, 1024*1024*5, false);

        

        // 4)關閉資源

        cis.close();

        fos.close();

    }

}

4.6.2 Map輸出端采用壓縮

即使你的MapReduce的輸入輸出文件都是未壓縮的文件,你仍然可以對Map任務的中間結果輸出做壓縮,因為它要寫入硬盤並且通過網絡傳輸到Reduce節點,對其壓縮可以明顯提高性能。這些工作只要設置兩個屬性即可,我們來看下代碼怎么設置。

1.給大家提供的Hadoop源碼支持的壓縮格式有:BZip2Codec、DefaultCodec

package com.atguigu.mapreduce.compress;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;

import org.apache.hadoop.fs.Path;

import org.apache.hadoop.io.IntWritable;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.io.compress.BZip2Codec;    

import org.apache.hadoop.io.compress.CompressionCodec;

import org.apache.hadoop.io.compress.GzipCodec;

import org.apache.hadoop.mapreduce.Job;

import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

 

public class WordCountDriver {

 

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {

 

        Configuration configuration = new Configuration();

 

        // 開啟map端輸出壓縮

    configuration.setBoolean("mapreduce.map.output.compress", true);

        // 設置map端輸出壓縮方式

    configuration.setClass("mapreduce.map.output.compress.codec", BZip2Codec.class, CompressionCodec.class);

 

        Job job = Job.getInstance(configuration);

 

        job.setJarByClass(WordCountDriver.class);

 

        job.setMapperClass(WordCountMapper.class);

        job.setReducerClass(WordCountReducer.class);

 

        job.setMapOutputKeyClass(Text.class);

        job.setMapOutputValueClass(IntWritable.class);

 

        job.setOutputKeyClass(Text.class);

        job.setOutputValueClass(IntWritable.class);

 

        FileInputFormat.setInputPaths(job, new Path(args[0]));

        FileOutputFormat.setOutputPath(job, new Path(args[1]));

 

        boolean result = job.waitForCompletion(true);

 

        System.exit(result ? 0 : 1);

    }

}

2.Mapper保持不變

package com.atguigu.mapreduce.compress;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;

import org.apache.hadoop.io.LongWritable;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.Mapper;

 

public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable>{

 

Text k = new Text();

    IntWritable v = new IntWritable(1);

 

    @Override

    protected void map(LongWritable key, Text value, Context context)throws IOException, InterruptedException {

 

        // 1 獲取一行

        String line = value.toString();

 

        // 2 切割

        String[] words = line.split(" ");

 

        // 3 循環寫出

        for(String word:words){

k.set(word);

            context.write(k, v);

        }

    }

}

3.Reducer保持不變

package com.atguigu.mapreduce.compress;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.Reducer;

 

public class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable>{

 

    IntWritable v = new IntWritable();

 

    @Override

    protected void reduce(Text key, Iterable<IntWritable> values,

            Context context) throws IOException, InterruptedException {

        

        int sum = 0;

 

        // 1 匯總

        for(IntWritable value:values){

            sum += value.get();

        }

          

v.set(sum);

 

// 2 輸出

        context.write(key, v);

    }

}

4.6.3 Reduce輸出端采用壓縮

基於WordCount案例處理。

1.修改驅動

package com.atguigu.mapreduce.compress;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;

import org.apache.hadoop.fs.Path;

import org.apache.hadoop.io.IntWritable;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.io.compress.BZip2Codec;

import org.apache.hadoop.io.compress.DefaultCodec;

import org.apache.hadoop.io.compress.GzipCodec;

import org.apache.hadoop.io.compress.Lz4Codec;

import org.apache.hadoop.io.compress.SnappyCodec;

import org.apache.hadoop.mapreduce.Job;

import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

 

public class WordCountDriver {

 

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {

        

        Configuration configuration = new Configuration();

        

        Job job = Job.getInstance(configuration);

        

        job.setJarByClass(WordCountDriver.class);

        

        job.setMapperClass(WordCountMapper.class);

        job.setReducerClass(WordCountReducer.class);

        

        job.setMapOutputKeyClass(Text.class);

        job.setMapOutputValueClass(IntWritable.class);

        

        job.setOutputKeyClass(Text.class);

        job.setOutputValueClass(IntWritable.class);

        

        FileInputFormat.setInputPaths(job, new Path(args[0]));

        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        

        // 設置reduce端輸出壓縮開啟

        FileOutputFormat.setCompressOutput(job, true);

        

        // 設置壓縮的方式

     FileOutputFormat.setOutputCompressorClass(job, BZip2Codec.class);

//     FileOutputFormat.setOutputCompressorClass(job, GzipCodec.class);

//     FileOutputFormat.setOutputCompressorClass(job, DefaultCodec.class);

      

        boolean result = job.waitForCompletion(true);

        

        System.exit(result ? 0 : 1);

    }

}

2.Mapper和Reducer保持不變(詳見4.6.2節)

第5章 Yarn資源調度器

Yarn是一個資源調度平台,負責為運算程序提供服務器運算資源,相當於一個分布式的操作系統平台,而MapReduce等運算程序則相當於運行於操作系統之上的應用程序

5.1 Yarn基本架構

     YARN主要由ResourceManager、NodeManager、ApplicationMaster和Container等組件構成,如圖4-23所示。

圖4-23 Yarn基本架構

5.3 Yarn工作機制

1.Yarn運行機制,如圖4-24所示。

圖4-24 Yarn工作機制

2.工作機制詳解

    (1)MR程序提交到客戶端所在的節點。

    (2)YarnRunner向ResourceManager申請一個Application。

    (3)RM將該應用程序的資源路徑返回給YarnRunner。

    (4)該程序將運行所需資源提交到HDFS上。

    (5)程序資源提交完畢后,申請運行mrAppMaster。

    (6)RM將用戶的請求初始化成一個Task。

    (7)其中一個NodeManager領取到Task任務。

    (8)該NodeManager創建容器Container,並產生MRAppmaster。

    (9)Container從HDFS上拷貝資源到本地。

    (10)MRAppmaster向RM 申請運行MapTask資源。

    (11)RM將運行MapTask任務分配給另外兩個NodeManager,另兩個NodeManager分別領取任務並創建容器。

    (12)MrAppMaster向兩個接收到任務的NodeManager發送程序啟動腳本,這兩個NodeManager分別啟動MapTask,MapTask對數據分區排序。

    (13)MrAppMaster等待所有MapTask運行完畢后,向RM申請容器,運行ReduceTask。

    (14)ReduceTask向MapTask獲取相應分區的數據。

    (15)程序運行完畢后,MR會向RM申請注銷自己。

5.4 作業提交全過程

1.作業提交過程之YARN,如圖4-25所示。

圖4-25 作業提交過程之Yarn

作業提交全過程詳解

(1)作業提交

第1步:Client調用job.waitForCompletion方法,向整個集群提交MapReduce作業。

第2步:Client向RM申請一個作業id。

第3步:RM給Client返回該job資源的提交路徑和作業id。

第4步:Client提交jar包、切片信息和配置文件到指定的資源提交路徑。

第5步:Client提交完資源后,向RM申請運行MrAppMaster。

(2)作業初始化

第6步:當RM收到Client的請求后,將該job添加到容量調度器中。

第7步:某一個空閑的NM領取到該Job。

第8步:該NM創建Container,並產生MRAppmaster

第9步:下載Client提交的資源到本地。

(3)任務分配

第10步:MrAppMaster向RM申請運行多個MapTask任務資源。

第11步:RM將運行MapTask任務分配給另外兩個NodeManager,另兩個NodeManager分別領取任務並創建容器。

(4)任務運行

第12步:MrAppMaster向兩個接收到任務的NodeManager發送程序啟動腳本,這兩個NodeManager分別啟動MapTask,MapTask對數據分區排序。

第13步:MrAppMaster等待所有MapTask運行完畢后,向RM申請容器,運行ReduceTask。

第14步:ReduceTask向MapTask獲取相應分區的數據。

第15步:程序運行完畢后,MR會向RM申請注銷自己。

(5)進度和狀態更新

YARN中的任務將其進度和狀態(包括counter)返回給應用管理器, 客戶端每秒(通過mapreduce.client.progressmonitor.pollinterval設置)向應用管理器請求進度更新, 展示給用戶。

(6)作業完成

除了向應用管理器請求作業進度外, 客戶端每5秒都會通過調用waitForCompletion()來檢查作業是否完成。時間間隔可以通過mapreduce.client.completion.pollinterval來設置。作業完成之后, 應用管理器和Container會清理工作狀態。作業的信息會被作業歷史服務器存儲以備之后用戶核查。
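(5)和(6)中提到的兩個輪詢間隔也可以在客戶端按需調整,下面是一個示意片段(所寫取值即為默認值,單位毫秒,僅作演示):

// 示意:調整客戶端向應用管理器輪詢進度和完成狀態的間隔
Configuration conf = new Configuration();
conf.setInt("mapreduce.client.progressmonitor.pollinterval", 1000);
conf.setInt("mapreduce.client.completion.pollinterval", 5000);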

2.作業提交過程之MapReduce,如圖4-26所示

圖4-26 作業提交過程之MapReduce

5.5 資源調度器

目前,Hadoop作業調度器主要有三種:FIFO、Capacity Scheduler和Fair Scheduler。Hadoop2.7.2默認的資源調度器是Capacity Scheduler。

具體設置詳見:yarn-default.xml文件

<property>

<description>The class to use as the resource scheduler.</description>

<name>yarn.resourcemanager.scheduler.class</name>

<value>org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler</value>

</property>

1.先進先出調度器(FIFO),如圖4-27所示

    圖4-27 FIFO調度器

2.容量調度器(Capacity Scheduler),如圖4-28所示

    圖4-28容量調度器

3.公平調度器(Fair Scheduler),如圖4-29所示

圖4-29公平調度器

5.6 任務的推測執行

1.作業完成時間取決於最慢的任務完成時間

一個作業由若干個Map任務和Reduce任務構成。因硬件老化、軟件Bug等,某些任務可能運行非常慢。

思考:系統中有99%的Map任務都完成了,只有少數幾個Map老是進度很慢,完不成,怎么辦?

2.推測執行機制

發現拖后腿的任務,比如某個任務運行速度遠慢於任務平均速度。為拖后腿任務啟動一個備份任務,同時運行。誰先運行完,則采用誰的結果。

3.執行推測任務的前提條件

(1)每個Task只能有一個備份任務

(2)當前Job已完成的Task占比必須不小於0.05(5%)

(3)開啟推測執行參數設置。mapred-site.xml文件中默認是打開的。

<property>

    <name>mapreduce.map.speculative</name>

    <value>true</value>

    <description>If true, then multiple instances of some map tasks may be executed in parallel.</description>

</property>

 

<property>

    <name>mapreduce.reduce.speculative</name>

    <value>true</value>

    <description>If true, then multiple instances of some reduce tasks may be executed in parallel.</description>

</property>

4.不能啟用推測執行機制情況

(1)任務間存在嚴重的負載傾斜;

(2)特殊任務,比如任務向數據庫中寫數據。

5.算法原理,如圖4-30所示

圖4-30 推測執行算法原理

第6章 Hadoop企業優化

6.1 MapReduce 跑的慢的原因

6.2 MapReduce優化方法

MapReduce優化方法主要從六個方面考慮:數據輸入、Map階段、Reduce階段、IO傳輸、數據傾斜問題和常用的調優參數。

6.2.1 數據輸入

6.2.2 Map階段

6.2.3 Reduce階段

6.2.4 I/O傳輸

6.2.5 數據傾斜問題

6.2.6 常用的調優參數

1.資源相關參數

(1)以下參數是在用戶自己的MR應用程序中配置就可以生效(mapred-default.xml)

4-12

配置參數	參數說明
mapreduce.map.memory.mb	一個MapTask可使用的資源上限(單位:MB),默認為1024。如果MapTask實際使用的資源量超過該值,則會被強制殺死。
mapreduce.reduce.memory.mb	一個ReduceTask可使用的資源上限(單位:MB),默認為1024。如果ReduceTask實際使用的資源量超過該值,則會被強制殺死。
mapreduce.map.cpu.vcores	每個MapTask可使用的最多cpu core數目,默認值:1
mapreduce.reduce.cpu.vcores	每個ReduceTask可使用的最多cpu core數目,默認值:1
mapreduce.reduce.shuffle.parallelcopies	每個Reduce去Map中取數據的並行數。默認值是5
mapreduce.reduce.shuffle.merge.percent	Buffer中的數據達到多少比例開始寫入磁盤。默認值0.66
mapreduce.reduce.shuffle.input.buffer.percent	Buffer大小占Reduce可用內存的比例。默認值0.7
mapreduce.reduce.input.buffer.percent	指定多少比例的內存用來存放Buffer中的數據,默認值是0.0

(2)應該在YARN啟動之前就配置在服務器的配置文件中才能生效(yarn-default.xml)

4-13

配置參數	參數說明
yarn.scheduler.minimum-allocation-mb	給應用程序Container分配的最小內存,默認值:1024
yarn.scheduler.maximum-allocation-mb	給應用程序Container分配的最大內存,默認值:8192
yarn.scheduler.minimum-allocation-vcores	每個Container申請的最小CPU核數,默認值:1
yarn.scheduler.maximum-allocation-vcores	每個Container申請的最大CPU核數,默認值:32
yarn.nodemanager.resource.memory-mb	給Containers分配的最大物理內存,默認值:8192

(3)Shuffle性能優化的關鍵參數,應在YARN啟動之前就配置好(mapred-default.xml)

4-14

配置參數	參數說明
mapreduce.task.io.sort.mb	Shuffle的環形緩沖區大小,默認100m
mapreduce.map.sort.spill.percent	環形緩沖區溢出的閾值,默認80%

2.容錯相關參數(MapReduce性能優化)

4-15

配置參數	參數說明
mapreduce.map.maxattempts	每個Map Task最大重試次數,一旦重試次數超過該值,則認為Map Task運行失敗,默認值:4。
mapreduce.reduce.maxattempts	每個Reduce Task最大重試次數,一旦重試次數超過該值,則認為Reduce Task運行失敗,默認值:4。
mapreduce.task.timeout	Task超時時間,經常需要設置的一個參數。該參數的含義是:如果一個Task在一定時間內沒有任何進展,既不會讀取新的數據,也沒有輸出數據,則認為該Task處於Block狀態,可能是卡住了,也許會永遠卡住。為了防止用戶程序永遠Block住不退出,框架強制設置了一個超時時間(單位:毫秒),默認是600000。如果你的程序對每條輸入數據的處理時間過長(比如會訪問數據庫、通過網絡拉取數據等),建議將該參數調大。該參數過小時常出現的錯誤提示是"AttemptID:attempt_14267829456721_123456_m_000224_0 Timed out after 300 secs Container killed by the ApplicationMaster."。
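例如,當每條記錄的處理確實很耗時(訪問數據庫、網絡拉取數據等)時,可以在提交作業前調大超時時間。下面是一個示意片段(取值僅為示例,需結合實際處理耗時調整):

// 示意:調大Task超時時間,重試次數保持默認值
Configuration conf = new Configuration();
conf.setInt("mapreduce.task.timeout", 1200000);    // 20分鍾,單位毫秒
conf.setInt("mapreduce.map.maxattempts", 4);
conf.setInt("mapreduce.reduce.maxattempts", 4);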

6.3 HDFS小文件優化方法

6.3.1 HDFS小文件弊端

HDFS上每個文件都要在NameNode上建立一個索引,這個索引的大小約為150byte,這樣當小文件比較多的時候,就會產生很多的索引文件,一方面會大量占用NameNode的內存空間,另一方面索引文件過大使得索引速度變慢。

6.3.2 HDFS小文件解決方案

小文件的優化無非以下幾種方式:

1)在數據采集的時候,就將小文件或小批數據合成大文件再上傳HDFS

2)在業務處理之前,在HDFS上使用MapReduce程序對小文件進行合並。

3)在MapReduce處理時,可采用CombineTextInputFormat提高效率,驅動類中的設置可參考下面的示意代碼。
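下面給出第3)種方式在驅動類中的最小示意(需要導入org.apache.hadoop.mapreduce.lib.input.CombineTextInputFormat,切片大小取值僅為示例):

// 示意:用CombineTextInputFormat把多個小文件合並到同一個切片中處理
job.setInputFormatClass(CombineTextInputFormat.class);

// 虛擬存儲切片最大值設置為4M(4194304字節,需結合實際小文件大小調整)
CombineTextInputFormat.setMaxInputSplitSize(job, 4194304);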

第7章 MapReduce擴展案例

7.1 倒排索引案例(多job串聯)

1.需求

有大量的文本(文檔、網頁),需要建立搜索索引,如圖4-31所示。

(1)數據輸入

(2)期望輸出數據

atguigu    c.txt-->2    b.txt-->2    a.txt-->3    

pingping    c.txt-->1    b.txt-->3    a.txt-->1    

ss    c.txt-->1    b.txt-->1    a.txt-->2    

2.需求分析

3.第一次處理

(1)第一次處理,編寫OneIndexMapper類

package com.atguigu.mapreduce.index;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;

import org.apache.hadoop.io.LongWritable;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.Mapper;

import org.apache.hadoop.mapreduce.lib.input.FileSplit;

 

public class OneIndexMapper extends Mapper<LongWritable, Text, Text, IntWritable>{

    

    String name;

    Text k = new Text();

    IntWritable v = new IntWritable();

    

    @Override

    protected void setup(Context context)throws IOException, InterruptedException {

 

        // 獲取文件名稱

        FileSplit split = (FileSplit) context.getInputSplit();

        

        name = split.getPath().getName();

    }

    

    @Override

    protected void map(LongWritable key, Text value, Context context)    throws IOException, InterruptedException {

 

        // 1 獲取1行

        String line = value.toString();

        

        // 2 切割

        String[] fields = line.split(" ");

        

        for (String word : fields) {

 

            // 3 拼接

            k.set(word+"--"+name);

            v.set(1);

            

            // 4 寫出

            context.write(k, v);

        }

    }

}

(2)第一次處理,編寫OneIndexReducer類

package com.atguigu.mapreduce.index;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.Reducer;

 

public class OneIndexReducer extends Reducer<Text, IntWritable, Text, IntWritable>{

    

IntWritable v = new IntWritable();

 

    @Override

    protected void reduce(Text key, Iterable<IntWritable> values,Context context) throws IOException, InterruptedException {

        

        int sum = 0;

 

        // 1 累加求和

        for(IntWritable value: values){

            sum +=value.get();

        }

          

v.set(sum);

 

        // 2 寫出

        context.write(key, v);

    }

}

(3)第一次處理,編寫OneIndexDriver類

package com.atguigu.mapreduce.index;

import org.apache.hadoop.conf.Configuration;

import org.apache.hadoop.fs.Path;

import org.apache.hadoop.io.IntWritable;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.Job;

import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

 

public class OneIndexDriver {

 

    public static void main(String[] args) throws Exception {

 

// 輸入輸出路徑需要根據自己電腦上實際的輸入輸出路徑設置

        args = new String[] { "e:/input/inputoneindex", "e:/output5" };

 

        Configuration conf = new Configuration();

 

        Job job = Job.getInstance(conf);

        job.setJarByClass(OneIndexDriver.class);

 

        job.setMapperClass(OneIndexMapper.class);

        job.setReducerClass(OneIndexReducer.class);

 

        job.setMapOutputKeyClass(Text.class);

        job.setMapOutputValueClass(IntWritable.class);

        

        job.setOutputKeyClass(Text.class);

        job.setOutputValueClass(IntWritable.class);

 

        FileInputFormat.setInputPaths(job, new Path(args[0]));

        FileOutputFormat.setOutputPath(job, new Path(args[1]));

 

        job.waitForCompletion(true);

    }

}

(4)查看第一次輸出結果

atguigu--a.txt    3

atguigu--b.txt    2

atguigu--c.txt    2

pingping--a.txt    1

pingping--b.txt    3

pingping--c.txt    1

ss--a.txt    2

ss--b.txt    1

ss--c.txt    1

4.第二次處理

(1)第二次處理,編寫TwoIndexMapper類

package com.atguigu.mapreduce.index;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.Mapper;

 

public class TwoIndexMapper extends Mapper<LongWritable, Text, Text, Text>{

 

    Text k = new Text();

    Text v = new Text();

    

    @Override

    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {

        

        // 1 獲取1行數據

        String line = value.toString();

        

        // 2用"--"切割

        String[] fields = line.split("--");

        

        k.set(fields[0]);

        v.set(fields[1]);

        

        // 3 輸出數據

        context.write(k, v);

    }

}

(2)第二次處理,編寫TwoIndexReducer類

package com.atguigu.mapreduce.index;

import java.io.IOException;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.Reducer;

public class TwoIndexReducer extends Reducer<Text, Text, Text, Text> {

 

Text v = new Text();

 

    @Override

    protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {

        // atguigu a.txt 3

        // atguigu b.txt 2

        // atguigu c.txt 2

 

        // atguigu c.txt-->2 b.txt-->2 a.txt-->3

 

        StringBuilder sb = new StringBuilder();

 

// 1 拼接

        for (Text value : values) {

            sb.append(value.toString().replace("\t", "-->") + "\t");

        }

 

v.set(sb.toString());

 

        // 2 寫出

        context.write(key, v);

    }

}

(3)第二次處理,編寫TwoIndexDriver類

package com.atguigu.mapreduce.index;

import org.apache.hadoop.conf.Configuration;

import org.apache.hadoop.fs.Path;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.Job;

import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

 

public class TwoIndexDriver {

 

    public static void main(String[] args) throws Exception {

 

// 輸入輸出路徑需要根據自己電腦上實際的輸入輸出路徑設置

args = new String[] { "e:/input/inputtwoindex", "e:/output6" };

 

        Configuration config = new Configuration();

        Job job = Job.getInstance(config);

 

job.setJarByClass(TwoIndexDriver.class);

        job.setMapperClass(TwoIndexMapper.class);

        job.setReducerClass(TwoIndexReducer.class);

 

        job.setMapOutputKeyClass(Text.class);

        job.setMapOutputValueClass(Text.class);

        

        job.setOutputKeyClass(Text.class);

        job.setOutputValueClass(Text.class);

 

        FileInputFormat.setInputPaths(job, new Path(args[0]));

        FileOutputFormat.setOutputPath(job, new Path(args[1]));

 

        boolean result = job.waitForCompletion(true);

System.exit(result?0:1);

    }

}

(4)第二次查看最終結果

atguigu    c.txt-->2    b.txt-->2    a.txt-->3    

pingping    c.txt-->1    b.txt-->3    a.txt-->1    

ss    c.txt-->1    b.txt-->1    a.txt-->2    

7.2 TopN案例

1.需求

對需求2.3輸出結果進行加工,輸出流量使用量在前10的用戶信息

(1)輸入數據            (2)輸出數據

            

2.需求分析

3.實現代碼

(1)編寫FlowBean類

package com.atguigu.mr.top;

 

import java.io.DataInput;

import java.io.DataOutput;

import java.io.IOException;

 

import org.apache.hadoop.io.WritableComparable;

 

public class FlowBean implements WritableComparable<FlowBean>{

 

    private long upFlow;

    private long downFlow;

    private long sumFlow;

      

    

    public FlowBean() {

        super();

    }

 

    public FlowBean(long upFlow, long downFlow) {

        super();

        this.upFlow = upFlow;

        this.downFlow = downFlow;

    }

 

    @Override

    public void write(DataOutput out) throws IOException {

        out.writeLong(upFlow);

        out.writeLong(downFlow);

        out.writeLong(sumFlow);

    }

 

    @Override

    public void readFields(DataInput in) throws IOException {

        upFlow = in.readLong();

        downFlow = in.readLong();

        sumFlow = in.readLong();

    }

 

    public long getUpFlow() {

        return upFlow;

    }

 

    public void setUpFlow(long upFlow) {

        this.upFlow = upFlow;

    }

 

    public long getDownFlow() {

        return downFlow;

    }

 

    public void setDownFlow(long downFlow) {

        this.downFlow = downFlow;

    }

 

    public long getSumFlow() {

        return sumFlow;

    }

 

    public void setSumFlow(long sumFlow) {

        this.sumFlow = sumFlow;

    }

 

    @Override

    public String toString() {

        return upFlow + "\t" + downFlow + "\t" + sumFlow;

    }

 

    public void set(long downFlow2, long upFlow2) {

        downFlow = downFlow2;

        upFlow = upFlow2;

        sumFlow = downFlow2 + upFlow2;

    }

 

    @Override

    public int compareTo(FlowBean bean) {

        

        int result;

        

        if (this.sumFlow > bean.getSumFlow()) {

            result = -1;

        }else if (this.sumFlow < bean.getSumFlow()) {

            result = 1;

        }else {

            result = 0;

        }

        

        return result;

    }

}

(2)編寫TopNMapper類

package com.atguigu.mr.top;

 

import java.io.IOException;

import java.util.Iterator;

import java.util.TreeMap;

import org.apache.hadoop.io.LongWritable;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.Mapper;

 

public class TopNMapper extends Mapper<LongWritable, Text, FlowBean, Text>{

    

    // 定義一個TreeMap作為存儲數據的容器(天然按key排序)

    private TreeMap<FlowBean, Text> flowMap = new TreeMap<FlowBean, Text>();

    private FlowBean kBean;

    

    @Override

    protected void map(LongWritable key, Text value, Context context)    throws IOException, InterruptedException {

        

        kBean = new FlowBean();

        Text v = new Text();

        

        // 1 獲取一行

        String line = value.toString();

        

        // 2 切割

        String[] fields = line.split("\t");

        

        // 3 封裝數據

        String phoneNum = fields[0];

        long upFlow = Long.parseLong(fields[1]);

        long downFlow = Long.parseLong(fields[2]);

        long sumFlow = Long.parseLong(fields[3]);

        

        kBean.setDownFlow(downFlow);

        kBean.setUpFlow(upFlow);

        kBean.setSumFlow(sumFlow);

        

        v.set(phoneNum);

        

        // 4 TreeMap中添加數據

        flowMap.put(kBean, v);

        

        // 5 限制TreeMap的數據量,超過10條就刪除掉流量最小的一條數據

        if (flowMap.size() > 10) {
            // flowMap.remove(flowMap.firstKey());
            flowMap.remove(flowMap.lastKey());
        }
    }

    

    @Override

    protected void cleanup(Context context) throws IOException, InterruptedException {

        

        // 6 遍歷treeMap集合,輸出數據

        Iterator<FlowBean> bean = flowMap.keySet().iterator();

 

        while (bean.hasNext()) {

 

            FlowBean k = bean.next();

 

            context.write(k, flowMap.get(k));

        }

    }

}

(3)編寫TopNReducer類

package com.atguigu.mr.top;

 

import java.io.IOException;

import java.util.Iterator;

import java.util.TreeMap;

 

import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.Reducer;

 

public class TopNReducer extends Reducer<FlowBean, Text, Text, FlowBean> {

 

    // 定義一個TreeMap作為存儲數據的容器(天然按key排序)

    TreeMap<FlowBean, Text> flowMap = new TreeMap<FlowBean, Text>();

 

    @Override

    protected void reduce(FlowBean key, Iterable<Text> values, Context context)throws IOException, InterruptedException {

 

        for (Text value : values) {

 

             FlowBean bean = new FlowBean();

             bean.set(key.getDownFlow(), key.getUpFlow());

 

             // 1 treeMap集合中添加數據

            flowMap.put(bean, new Text(value));

 

            // 2 限制TreeMap數據量,超過10條就刪除掉流量最小的一條數據

            if (flowMap.size() > 10) {

                // flowMap.remove(flowMap.firstKey());

flowMap.remove(flowMap.lastKey());

            }

        }

    }

 

    @Override

    protected void cleanup(Reducer<FlowBean, Text, Text, FlowBean>.Context context) throws IOException, InterruptedException {

 

        // 3 遍歷集合,輸出數據

        Iterator<FlowBean> it = flowMap.keySet().iterator();

 

        while (it.hasNext()) {

 

            FlowBean v = it.next();

 

            context.write(new Text(flowMap.get(v)), v);

        }

    }

}

(4)編寫TopNDriver類

package com.atguigu.mr.top;

 

import org.apache.hadoop.conf.Configuration;

import org.apache.hadoop.fs.Path;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.Job;

import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

 

public class TopNDriver {

 

    public static void main(String[] args) throws Exception {

        

        args = new String[]{"e:/output1","e:/output3"};

        

        // 1 獲取配置信息,或者job對象實例

        Configuration configuration = new Configuration();

        Job job = Job.getInstance(configuration);

 

        // 6 指定本程序的jar包所在的本地路徑

        job.setJarByClass(TopNDriver.class);

 

        // 2 指定本業務job要使用的mapper/Reducer業務類

        job.setMapperClass(TopNMapper.class);

        job.setReducerClass(TopNReducer.class);

 

        // 3 指定mapper輸出數據的kv類型

        job.setMapOutputKeyClass(FlowBean.class);

        job.setMapOutputValueClass(Text.class);

 

        // 4 指定最終輸出的數據的kv類型

        job.setOutputKeyClass(Text.class);

        job.setOutputValueClass(FlowBean.class);

 

        // 5 指定job的輸入原始文件所在目錄

        FileInputFormat.setInputPaths(job, new Path(args[0]));

        FileOutputFormat.setOutputPath(job, new Path(args[1]));

 

        // 7 job中配置的相關參數,以及job所用的java類所在的jar包, 提交給yarn去運行

        boolean result = job.waitForCompletion(true);

        System.exit(result ? 0 : 1);

    }

}

7.3 找博客共同好友案例

1.需求

以下是博客的好友列表數據,冒號前是一個用戶,冒號后是該用戶的所有好友(數據中的好友關系是單向的)

求出哪些人兩兩之間有共同好友,及他倆的共同好友都有誰?

(1)數據輸入

2.需求分析

先求出A、B、C、….等是誰的好友

第一次輸出結果

A    I,K,C,B,G,F,H,O,D,

B    A,F,J,E,

C    A,E,B,H,F,G,K,

D    G,C,K,A,L,F,E,H,

E    G,M,L,H,A,F,B,D,

F    L,M,D,C,G,A,

G    M,

H    O,

I    O,C,

J    O,

K    B,

L    D,E,

M    E,F,

O    A,H,I,J,F,

第二次輸出結果

A-B    E C

A-C    D F

A-D    E F

A-E    D B C

A-F    O B C D E

A-G    F E C D

A-H    E C D O

A-I    O

A-J    O B

A-K    D C

A-L    F E D

A-M    E F

B-C    A

B-D    A E

B-E    C

B-F    E A C

B-G    C E A

B-H    A E C

B-I    A

B-K    C A

B-L    E

B-M    E

B-O    A

C-D    A F

C-E    D

C-F    D A

C-G    D F A

C-H    D A

C-I    A

C-K    A D

C-L    D F

C-M    F

C-O    I A

D-E    L

D-F    A E

D-G    E A F

D-H    A E

D-I    A

D-K    A

D-L    E F

D-M    F E

D-O    A

E-F    D M C B

E-G    C D

E-H    C D

E-J    B

E-K    C D

E-L    D

F-G    D C A E

F-H    A D O E C

F-I    O A

F-J    B O

F-K    D C A

F-L    E D

F-M    E

F-O    A

G-H    D C E A

G-I    A

G-K    D A C

G-L    D F E

G-M    E F

G-O    A

H-I    O A

H-J    O

H-K    A C D

H-L    D E

H-M    E

H-O    A

I-J    O

I-K    A

I-O    A

K-L    D

K-O    A

L-M    E F

3.代碼實現

(1)第一次Mapper類

package com.atguigu.mapreduce.friends;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.Mapper;

 

public class OneShareFriendsMapper extends Mapper<LongWritable, Text, Text, Text>{

    

    @Override

    protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, Text>.Context context)

            throws IOException, InterruptedException {

 

        // 1 獲取一行 A:B,C,D,F,E,O

        String line = value.toString();

        

        // 2 切割

        String[] fields = line.split(":");

        

        // 3 獲取person和好友

        String person = fields[0];

        String[] friends = fields[1].split(",");

        

        // 4寫出去

        for(String friend: friends){

 

            // 輸出 <好友,人>

            context.write(new Text(friend), new Text(person));

        }

    }

}

(2)第一次Reducer類

package com.atguigu.mapreduce.friends;

import java.io.IOException;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.Reducer;

 

public class OneShareFriendsReducer extends Reducer<Text, Text, Text, Text>{

    

    @Override

    protected void reduce(Text key, Iterable<Text> values, Context context)throws IOException, InterruptedException {

        

        StringBuffer sb = new StringBuffer();

 

        //1 拼接

        for(Text person: values){

            sb.append(person).append(",");

        }

        

        //2 寫出

        context.write(key, new Text(sb.toString()));

    }

}

(3)第一次Driver類

package com.atguigu.mapreduce.friends;

import org.apache.hadoop.conf.Configuration;

import org.apache.hadoop.fs.Path;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.Job;

import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

 

public class OneShareFriendsDriver {

 

    public static void main(String[] args) throws Exception {

        

// 1 獲取job對象

        Configuration configuration = new Configuration();

        Job job = Job.getInstance(configuration);

        

        // 2 指定jar包運行的路徑

        job.setJarByClass(OneShareFriendsDriver.class);

 

        // 3 指定map/reduce使用的類

        job.setMapperClass(OneShareFriendsMapper.class);

        job.setReducerClass(OneShareFriendsReducer.class);

        

        // 4 指定map輸出的數據類型

        job.setMapOutputKeyClass(Text.class);

        job.setMapOutputValueClass(Text.class);

        

        // 5 指定最終輸出的數據類型

        job.setOutputKeyClass(Text.class);

        job.setOutputValueClass(Text.class);

        

        // 6 指定job的輸入原始所在目錄

        FileInputFormat.setInputPaths(job, new Path(args[0]));

        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        

        // 7 提交

        boolean result = job.waitForCompletion(true);

        

        System.exit(result?0:1);

    }

}

(4)第二次Mapper類

package com.atguigu.mapreduce.friends;

import java.io.IOException;

import java.util.Arrays;

import org.apache.hadoop.io.LongWritable;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.Mapper;

 

public class TwoShareFriendsMapper extends Mapper<LongWritable, Text, Text, Text>{

    

    @Override

    protected void map(LongWritable key, Text value, Context context)

            throws IOException, InterruptedException {

 

        // A I,K,C,B,G,F,H,O,D,

        // 人,人,人

        String line = value.toString();

        String[] friend_persons = line.split("\t");

 

        String friend = friend_persons[0];

        String[] persons = friend_persons[1].split(",");

 

        Arrays.sort(persons);

 

        for (int i = 0; i < persons.length - 1; i++) {

            

            for (int j = i + 1; j < persons.length; j++) {

                // 發出 <人-人,好友>,這樣,相同的"人-人"對的所有好友就會到同一個reduce中去

                context.write(new Text(persons[i] + "-" + persons[j]), new Text(friend));

            }

        }

    }

}

(5)第二次Reducer類

package com.atguigu.mapreduce.friends;

import java.io.IOException;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.Reducer;

 

public class TwoShareFriendsReducer extends Reducer<Text, Text, Text, Text>{

    

    @Override

    protected void reduce(Text key, Iterable<Text> values, Context context)    throws IOException, InterruptedException {

        

        StringBuffer sb = new StringBuffer();

 

        for (Text friend : values) {

            sb.append(friend).append(" ");

        }

        

        context.write(key, new Text(sb.toString()));

    }

}

(6)第二次Driver類

package com.atguigu.mapreduce.friends;

import org.apache.hadoop.conf.Configuration;

import org.apache.hadoop.fs.Path;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.Job;

import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

 

public class TwoShareFriendsDriver {

 

    public static void main(String[] args) throws Exception {

        

// 1 獲取job對象

        Configuration configuration = new Configuration();

        Job job = Job.getInstance(configuration);

        

        // 2 指定jar包運行的路徑

        job.setJarByClass(TwoShareFriendsDriver.class);

 

        // 3 指定map/reduce使用的類

        job.setMapperClass(TwoShareFriendsMapper.class);

        job.setReducerClass(TwoShareFriendsReducer.class);

        

        // 4 指定map輸出的數據類型

        job.setMapOutputKeyClass(Text.class);

        job.setMapOutputValueClass(Text.class);

        

        // 5 指定最終輸出的數據類型

        job.setOutputKeyClass(Text.class);

        job.setOutputValueClass(Text.class);

        

        // 6 指定job的輸入原始所在目錄

        FileInputFormat.setInputPaths(job, new Path(args[0]));

        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        

        // 7 提交

        boolean result = job.waitForCompletion(true);

        System.exit(result?0:1);

    }

}

第8章 常見錯誤及解決方案

1)導包容易出錯。尤其Text和CombineTextInputFormat。

2)Mapper中第一個輸入的參數必須是LongWritable或者NullWritable,不可以是IntWritable. 報的錯誤是類型轉換異常。

3)java.lang.Exception: java.io.IOException: Illegal partition for 13926435656 (4),說明Partition和ReduceTask個數沒對上,調整ReduceTask個數。

4)如果分區數不是1,但是reducetask為1,是否執行分區過程。答案是:不執行分區過程。因為在MapTask的源碼中,執行分區的前提是先判斷ReduceNum個數是否大於1。不大於1肯定不執行。

5)在Windows環境編譯的jar包導入到Linux環境中運行,

hadoop jar wc.jar com.atguigu.mapreduce.wordcount.WordCountDriver /user/atguigu/ /user/atguigu/output

報如下錯誤:

Exception in thread "main" java.lang.UnsupportedClassVersionError: com/atguigu/mapreduce/wordcount/WordCountDriver : Unsupported major.minor version 52.0

原因是Windows環境用的jdk1.7,Linux環境用的jdk1.8。

解決方案:統一jdk版本。

6)緩存pd.txt小文件案例中,報找不到pd.txt文件

原因:大部分為路徑書寫錯誤。還有就是要檢查pd.txt.txt的問題。還有個別電腦寫相對路徑找不到pd.txt,可以修改為絕對路徑。

7)報類型轉換異常。

通常都是在驅動函數中設置Map輸出和最終輸出時編寫錯誤。

Map輸出的key如果沒有排序,也會報類型轉換異常。

8)集群中運行wc.jar時出現了無法獲得輸入文件。

原因:WordCount案例的輸入文件不能放在HDFS集群的根目錄。

9)出現了如下相關異常

Exception in thread "main" java.lang.UnsatisfiedLinkError: org.apache.hadoop.io.nativeio.NativeIO$Windows.access0(Ljava/lang/String;I)Z

    at org.apache.hadoop.io.nativeio.NativeIO$Windows.access0(Native Method)

    at org.apache.hadoop.io.nativeio.NativeIO$Windows.access(NativeIO.java:609)

    at org.apache.hadoop.fs.FileUtil.canRead(FileUtil.java:977)

java.io.IOException: Could not locate executable null\bin\winutils.exe in the Hadoop binaries.

    at org.apache.hadoop.util.Shell.getQualifiedBinPath(Shell.java:356)

    at org.apache.hadoop.util.Shell.getWinUtilsPath(Shell.java:371)

    at org.apache.hadoop.util.Shell.<clinit>(Shell.java:364)

解決方案:拷貝hadoop.dll文件到Windows目錄C:\Windows\System32。個別同學電腦還需要修改Hadoop源碼。

方案二:創建如下包名,並將NativeIO.java拷貝到該包名下

10)自定義OutputFormat時,注意在RecordWriter中的close方法必須關閉流資源。否則輸出的文件內容中數據為空。

@Override

public void close(TaskAttemptContext context) throws IOException, InterruptedException {

        if (atguigufos != null) {

            atguigufos.close();

        }

        if (otherfos != null) {

            otherfos.close();

        }

}

