使用Java API方式的MapReduce練習


眾所周知,hadoop生態圈的多數組件都是使用java開發的。

那么使用Java API方式實現起來,顯得要比其它語言效率更高,更原生態。

前面有一個Hadoop學習筆記02_MapReduce練習 是在Linux下直接使用的python2.7實現的。這里我試試windows下用 java 來練習實現。

→_→ 確認過眼神~~ 我是新手,感覺IDEA創建maven要比eclipse方便,更加好用。更主要的是,我在eclipse里找了半天沒找到maven  >_<|||

 

練習一:單詞統計,wordcount

1. IntelliJ IDEA中New project, maven,SDK1.8 , Next,  輸入 Groupid : examplemr , ArtifactId : examplemr , Version: 1.0 , Next, 

  Project name : examplemr , Project location: D:\test\examplemr   Finish

  修改pom.xml  引入必要的dependency。 在IDE的提示中, 點擊 Import Changes 等待自動下載相關的依賴包

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>cn.itcast</groupId>
    <artifactId>example-mr</artifactId>
    <version>1.0</version>
    <dependencies>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-common</artifactId>
            <version>2.7.4</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-hdfs</artifactId>
            <version>2.7.4</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>2.7.4</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-mapreduce-client-core</artifactId>
            <version>2.7.4</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <!-- 打jar包插件 -->
                <artifactId>maven-jar-plugin</artifactId>
                <version>2.4</version>
                <configuration>
                    <archive>
                        <manifest>
                            <addClasspath>true</addClasspath>
                            <classpathPrefix>lib/</classpathPrefix>
                            <mainClass>cn.itcast.hadoop.mr.WordCountDriver</mainClass>
                        </manifest>
                    </archive>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.0</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                    <encoding>UTF-8</encoding>
                </configuration>
            </plugin>
        </plugins>
    </build>

</project>


展開src,main,java, 右擊, new,package,輸入cn.abc.hadoop.mr  再 New, Java Class, 輸入WordCountMapper

具體代碼如下, 注意import時的包名正確。

package cn.abc.hadoop.mr;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

/**
 * Created by abc .
 *
 * 這里就是mapreduce程序  mapper階段業務邏輯實現的類
 *
 * Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT>
 *
 * KEYIN:表示mapper數據輸入的時候key的數據類型,在默認的讀取數據組件下,叫InputFormat,它的行為是一行一行的讀取待處理的數據
 *        讀取一行,返回一行給我們的mr程序,這種情況下  keyin就表示每一行的起始偏移量  因此數據類型是Long
 *
 * VALUEIN:表述mapper數據輸入的時候value的數據類型,在默認的讀取數據組件下 valuein就表示讀取的這一行內容  因此數據類型是String
 *
 * KEYOUT 表示mapper數據輸出的時候key的數據類型  在本案例當中 輸出的key是單詞  因此數據類型是 String
 *
 * VALUEOUT表示mapper數據輸出的時候value的數據類型  在本案例當中 輸出的key是單詞的次數  因此數據類型是 Integer
 *
 * 這里所說的數據類型String Long都是jdk自帶的類型   在序列化的時候  效率低下 因此hadoop自己封裝一套數據類型
 *   long---->LongWritable
 *   String-->Text
 *   Integer--->Intwritable
 *   null-->NullWritable
 *
 *
 */
public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable>{

    /**
     *  這里就是mapper階段具體的業務邏輯實現方法  該方法的調用取決於讀取數據的組件有沒有給mr傳入數據
     *  如果有的話  每傳入一個《k,v》對  該方法就會被調用一次
     * @param key
     * @param value
     * @param context
     * @throws IOException
     * @throws InterruptedException
     */
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {

        //拿到傳入進來的一行內容,把數據類型轉化為String
        String line = value.toString();

        //將這一行內容按照分隔符進行一行內容的切割 切割成一個單詞數組
        String[] words = line.split(" ");

        //遍歷數組,每出現一個單詞  就標記一個數字1  <單詞,1>
        for (String word : words) {
            //使用mr程序的上下文context 把mapper階段處理的數據發送出去
            //作為reduce節點的輸入數據
            context.write(new Text(word),new IntWritable(1));
            //hadoop hadoop spark -->   <hadoop,1><hadoop,1><spark,1>
        }
    }
}

 

繼續 New, Java Class, WordCountReducer 代碼如下:

package cn.abc.hadoop.mr;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

/**
 * Created by abc
 *
 * 這里是MR程序 reducer階段處理的類
 *
 * KEYIN:就是reducer階段輸入的數據key類型,對應mapper的輸出key類型  在本案例中  就是單詞  Text
 *
 * VALUEIN就是reducer階段輸入的數據value類型,對應mapper的輸出value類型  在本案例中  就是單詞次數  IntWritable
 * .
 * KEYOUT就是reducer階段輸出的數據key類型 在本案例中  就是單詞  Text
 *
 * VALUEOUTreducer階段輸出的數據value類型 在本案例中  就是單詞的總次數  IntWritable
 */
public class WordCountReducer extends Reducer<Text,IntWritable,Text,IntWritable> {

    /**
     * 這里是reduce階段具體業務類的實現方法
     * @param key
     * @param values
     * @param context
     * @throws IOException
     * @throws InterruptedException
     *
     * reduce接收所有來自map階段處理的數據之后,按照key的字典序進行排序
     * <hello,1><hadoop,1><spark,1><hadoop,1>
     * 排序后:
     * <hadoop,1><hadoop,1><hello,1><spark,1>
     *
     *按照key是否相同作為一組去調用reduce方法
     * 本方法的key就是這一組相同kv對的共同key
     * 把這一組所有的v作為一個迭代器傳入我們的reduce方法
     *
     * <hadoop,[1,1]>
     */
    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
        //定義一個計數器
        int count = 0;
        //遍歷一組迭代器,把每一個數量1累加起來就構成了單詞的總次數

        for(IntWritable value:values){
            count +=value.get();
        }

        //把最終的結果輸出
        context.write(key,new IntWritable(count));
    }
}

 

最后,WordCountDriver 代碼,寫完就可以右擊,Run 一下試試。

package cn.abc.hadoop.mr;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

/**
 * Created by abc .
 *
 * 這個類就是mr程序運行時候的主類,本類中組裝了一些程序運行時候所需要的信息
 * 比如:使用的是那個Mapper類  那個Reducer類  輸入數據在那 輸出數據在什么地方
 */
public class WordCountDriver {
    public static void main(String[] args) throws Exception{
        //通過Job來封裝本次mr的相關信息
        Configuration conf = new Configuration();
        // 即使沒有下面這行,也可以本地運行 因\hadoop-mapreduce-client-core-2.7.4.jar!\mapred-default.xml 中默認的參數就是 local
        //conf.set("mapreduce.framework.name","local");
        Job job = Job.getInstance(conf);

        //指定本次mr job jar包運行主類
        job.setJarByClass(WordCountDriver.class);

        //指定本次mr 所用的mapper reducer類分別是什么
        job.setMapperClass(WordCountMapper.class);
        job.setReducerClass(WordCountReducer.class);

        //指定本次mr mapper階段的輸出  k  v類型
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);

        //指定本次mr 最終輸出的 k v類型
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        // job.setNumReduceTasks(3); //ReduceTask個數

        //如果業務有需求,就可以設置combiner組件
        job.setCombinerClass(WordCountReducer.class);

        //指定本次mr 輸入的數據路徑 和最終輸出結果存放在什么位置
        FileInputFormat.setInputPaths(job,"D:\\wordcount\\input");
        FileOutputFormat.setOutputPath(job,new Path("D:\\wordcount\\output"));
        //如果出現0644錯誤或找不到winutils.exe,則需要設置windows環境和相關文件.

        //上面的路徑是本地測試時使用,如果要打包jar到hdfs上運行時,需要使用下面的路徑。
        //FileInputFormat.setInputPaths(job,"/wordcount/input");
        //FileOutputFormat.setOutputPath(job,new Path("/wordcount/output"));

        // job.submit(); //一般不要這個.
        //提交程序  並且監控打印程序執行情況
        boolean b = job.waitForCompletion(true);
        System.exit(b?0:1);
    }
}

如果出現0644錯誤或找不到winutils.exe,則需要設置windows環境和相關文件.

設置環境變量HADOOP_HOME指向hadoop-2.7.5.tar.gz完整包解壓后的路徑,並且在Path中添加 %HADOOP_HOME%\bin;

然后下載win環境所需文件,解壓,將其復制到環境變量為HADOOP_HOME的真實路徑下的bin目錄中。

如果想要看看日志,可以在 examplemr\src\main\resources\下放入 log4j.properties 文件。運行后,將會看到\examplemr\mapreduce_test.log 日志。

相關文件: hadoop-win相關文件winutils.exe | log4j.properties

 如果要打成jar包,傳到HDFS上運行:

需要在pom.xml中添加 build 部分,詳見上面pom.xml中的 build部分,注意主類入口就是examplemr\src\main\java\cn\abc\hadoop\mr\下的 WordCountDriver.java

<mainClass>cn.abc.hadoop.mr.WordCountDriver</mainClass>

且需要修改 WordCountDriver.java中的路徑信息:

//如果要打包jar到hdfs上運行時,需要使用下面的路徑。
        FileInputFormat.setInputPaths(job,"/wordcount/input");
        FileOutputFormat.setOutputPath(job,new Path("/wordcount/output"));

在IDEA中, View, Tool Windows, Maven Project 顯示出工具,雙擊項目下Lifecycle中的 package 等待打包完成。

然后找到項目中 target下,example-mr-1.0.jar文件,傳入Linux,

然后在hadoop已成功啟動的前提下,執行:

hdfs dfs -mkdir -p /wordcount/input  # 創建對應的文件夾

hdfs dfs -put 1.txt 2.txt /wordcount/input/  # 上傳需要分析的數據文件

hadoop jar example-mr-1.0.jar  # 運行此MR程序。因為打包前在pom.xml中指定了主類入口,所以,這里不再指定入口參數。

等待執行結果,或者上 http://hadoop主機名:8088上查看進程與結果。

 

練習二:簡單流量統計,FlowSum

首先是如下這樣的日志文件:需要統計倒數第2(上行流量)、倒數第3(下行流量) 相加后的總流量,條件是基於手機號碼(第2字段)

1363157985066	13726230503	00-FD-07-A4-72-B8:CMCC	120.196.100.82	i02.c.aliimg.com	24	27	2481	24681	200
1361157912132	13726230513	00-FD-07-A4-72-B8:CMCC	120.196.40.8	4	7	248	0	200
1363157985033	13826230523	00-FD-07-A4-72-B8:CMCC	120.196.100.82	i02.c.aliimg.com	24	27	2481	24681	200
1363157985012	13726230533	00-FD-07-A4-72-B8:CMCC	120.196.100.82	i02.c.aliimg.com	24	27	2481	24681	200
1363157125076	13726230543	00-FD-07-A4-72-B8:CMCC	120.196.100.82	視頻網站	24	27	1527	2106	200
1363157985011	13926230553	00-FD-07-A4-72-B8:CMCC	120.196.100.82	i02.c.aliimg.com	24	27	2481	24681	200
1363157985016	13826230563	00-FD-07-A4-72-B8:CMCC	120.196.100.82	i02.c.aliimg.com	24	27	2481	24681	200
1363157985123	13926230573	00-FD-07-A4-72-B8:CMCC	120.196.100.82	i02.c.aliimg.com	24	27	2481	24681	200
1363157985135	18912688533	00-FD-07-A4-72-B8:CMCC	220.196.100.82	綜合門戶	15	12	1938	2910	200
1363157985432	18912688533	00-FD-07-A4-72-B8:CMCC	220.196.100.82	i02.c.aliimg.com	24	27	3333	21321	200
1363157985321	13726230503	00-FD-07-A4-72-B8:CMCC	120.196.100.82	搜索引擎	24	27	9531	9531	200
1363157985222	13826230523	00-FD-07-A4-72-B8:CMCC	120.196.100.82	i02.c.aliimg.com	24	27	2481	24681	200
1363157983331	13726230503	00-FD-07-A4-72-B8:CMCC	120.196.100.82	i02.c.aliimg.com	24	27	2481	24681	200
View Code

 

pom.xml文件:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>cn.itcast</groupId>
    <artifactId>example-mr</artifactId>
    <version>1.0</version>
    <dependencies>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-common</artifactId>
            <version>2.7.4</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-hdfs</artifactId>
            <version>2.7.4</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>2.7.4</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-mapreduce-client-core</artifactId>
            <version>2.7.4</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <!-- 打jar包插件 -->
                <artifactId>maven-jar-plugin</artifactId>
                <version>2.4</version>
                <configuration>
                    <archive>
                        <manifest>
                            <addClasspath>true</addClasspath>
                            <classpathPrefix>lib/</classpathPrefix>
                            <mainClass>cn.abc.hadoop.mr.WordCountDriver</mainClass>
                        </manifest>
                    </archive>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.0</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                    <encoding>UTF-8</encoding>
                </configuration>
            </plugin>
        </plugins>
    </build>

</project>

 

Map階段:

package cn.abc.hadoop.flowsum;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class FlowSumMapper extends Mapper<LongWritable, Text, Text, FlowBean>{

    Text k = new Text();
    FlowBean v = new FlowBean(); // new一次對象,多次使用. 效率較高
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        String line = value.toString();
        String[] fields = line.split("\t"); //數據文件如果是兩個連續的 \t 會有問題.

        String phoneNum = fields[1];
        long upFlow = Long.parseLong(fields[fields.length-3]); //倒着獲取需要的字段
        long downFlow = Long.parseLong(fields[fields.length-2]);

        // context.write(new Text(phoneNum),new FlowBean(upFlow,downFlow)); //效率太低,每次都產生對象.

        //使用對象中的set方法來寫入數據,避免大量new對象
        k.set(phoneNum);
        v.set(upFlow,downFlow);
        context.write(k,v);
    }
}

 

Reduce階段:

package cn.abc.hadoop.flowsum;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

/**
 * <手機號1, bean><手機號2, bean><手機號2, bean><手機號, bean>
 *
 *     <手機號1, bean><手機號1, bean>
 *     <手機號2, bean><手機號2, bean>
 */
public class FlowSumReducer extends Reducer<Text,FlowBean,Text,FlowBean>{

    FlowBean v = new FlowBean();
    @Override
    protected void reduce(Text key, Iterable<FlowBean> values, Context context) throws IOException, InterruptedException {
        long upFlowCount = 0;
        long downFlowCount = 0;

        for(FlowBean bean:values){
            upFlowCount += bean.getUpFlow();
            downFlowCount += bean.getDownFlow();
        }
        v.set(upFlowCount,downFlowCount);
        context.write(key,v);
    }
}

 

驅動: 還是先在本地測試,寫本地路徑。

package cn.abc.hadoop.flowsum;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class FlowSumDriver {
    public static void main(String[] args) throws Exception{
        //通過Job來封裝本次mr的相關信息
        Configuration conf = new Configuration();
        //conf.set("mapreduce.framework.name","local");
        Job job = Job.getInstance(conf);

        //指定本次mr job jar包運行主類
        job.setJarByClass(FlowSumDriver.class);

        //指定本次mr 所用的mapper reducer類分別是什么
        job.setMapperClass(FlowSumMapper.class);
        job.setReducerClass(FlowSumReducer.class);

        //指定本次mr mapper階段的輸出  k  v類型
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(FlowBean.class);

        //指定本次mr 最終輸出的 k v類型
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(FlowBean.class);

        //指定本次mr 輸入的數據路徑 和最終輸出結果存放在什么位置
        FileInputFormat.setInputPaths(job,"D:\\flowsum\\input");
        FileOutputFormat.setOutputPath(job,new Path("D:\\flowsum\\output"));
        //FileInputFormat.setInputPaths(job,"/wordcount/input");
        //FileOutputFormat.setOutputPath(job,new Path("/wordcount/output"));

        // job.submit(); //一般不要這個.
        //提交程序  並且監控打印程序執行情況
        boolean b = job.waitForCompletion(true);
        System.exit(b?0:1);
    }
}

 

用到的類:

package cn.abc.hadoop.flowsum;

import org.apache.hadoop.io.Writable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

public class FlowBean implements Writable {
    //寫上這句{}之后,點擊Writable,點擊左邊的燈泡,Implement Mothods 選中write和readFields后OK。
    // Hadoop的Writable 比JAVA自帶的要高效很多。
    //手工輸入,定義三個屬性
    private long upFlow;
    private long downFlow;
    private long sumFlow;

    // R click blank, Generator , Constructor, Select None 生成無參構造函數
    public FlowBean() {
    }

    //右擊類名,Generate, Constructor,選中所有子項,OK,生成有參構造函數
    // R click blank, Generator , Constructor, select All, OK
    public FlowBean(long upFlow, long downFlow, long sumFlow) {
        this.upFlow = upFlow;
        this.downFlow = downFlow;
        this.sumFlow = sumFlow;
    }

    //自己構造一個2個參數的方法。
    public FlowBean(long upFlow, long downFlow) {
        this.upFlow = upFlow;
        this.downFlow = downFlow;
        this.sumFlow = upFlow + downFlow;
    }

    //自己構造一個有返回值的 set方法,
    public void set(long upFlow, long downFlow) {
        this.upFlow = upFlow;
        this.downFlow = downFlow;
        this.sumFlow = upFlow + downFlow;
    }

    //這是序列化方法
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeLong(upFlow);
        out.writeLong(downFlow);
        out.writeLong(sumFlow);
    }

    @Override
    public String toString() {
        return upFlow+"\t"+downFlow+"\t"+sumFlow;
    }

    //這是反序列化方法
    //反序列化時候, 注意序列化的順序
    // 先序列化的先出來.
    @Override
    public void readFields(DataInput in) throws IOException {
        this.upFlow = in.readLong();
        this.downFlow = in.readLong();
        this.sumFlow = in.readLong();
    }

    // 右擊類名,Generate, Getter and Setter, 選中全部,OK
    public long getUpFlow() {
        return upFlow;
    }

    public void setUpFlow(long upFlow) {
        this.upFlow = upFlow;
    }

    public long getDownFlow() {
        return downFlow;
    }

    public void setDownFlow(long downFlow) {
        this.downFlow = downFlow;
    }

    public long getSumFlow() {
        return sumFlow;
    }

    public void setSumFlow(long sumFlow) {
        this.sumFlow = sumFlow;
    }
}

 

最后在output文件夾下 part-r-00000 中得到類似於下面的結果:

13726230503	14493	58893	73386
13726230513	248	0	248
13726230533	2481	24681	27162
13726230543	1527	2106	3633
13826230523	4962	49362	54324
13826230563	2481	24681	27162
13926230553	2481	24681	27162
13926230573	2481	24681	27162
18912688533	5271	24231	29502
View Code

 

又碰到改需求的情況了, 現在要把前面得到的統計結果按總流量降序排列

也就是在前一次MR的基礎上,再來一次MR。

在前面flowsum包下再建一個包 sort。

修改FlowBean主類為FlowBean implements WritableComparable<FlowBean> 並且實現public int compareTo 方法

import org.apache.hadoop.io.WritableComparable;


public class FlowBean implements WritableComparable<FlowBean> {
    //寫上WritableComparable,右擊,Generate, Implement Mothods, compareTo, OK。

    // 原有代碼不變... ...

    //這里就是Bean比較大小的方法  默認是如果指定的數與參數相等返回0,指定的數小於參數返回-1, 大於返回1
    @Override
    public int compareTo(FlowBean o) { //來自主類中方法的重寫
        //實現按照總流量的倒序排序
        return this.sumFlow > o.getSumFlow() ? -1 : 1;
        //正常邏輯 return this.sumFlow > o.getSumFlow() ? 1 : -1;
    }

 

在sort包下添加類,FlowSumSort

package cn.abc.hadoop.flowsum.sort;

import cn.abc.hadoop.flowsum.FlowBean;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

/**
 * 又碰到改需求的情況了, 現在要把前面 FlowSum 得到的統計結果按總流量降序排列
 */
public class FlowSumSort {
    public static class FlowSumSortMapper extends Mapper<LongWritable,Text,FlowBean,Text>{

        Text v = new Text();
        FlowBean k = new FlowBean();

        protected void map(LongWritable key,Text value,Context context) throws IOException,InterruptedException{
            String line = value.toString();
            String[] fields = line.split("\t");
            String phoneNum = fields[0];

            long upFlow = Long.parseLong(fields[1]);
            long downFlow = Long.parseLong(fields[2]);

            k.set(upFlow,downFlow);
            v.set(phoneNum);

            context.write(k,v);
        }
    }


    public static class FlowSumSortReducer extends Reducer<FlowBean,Text,Text,FlowBean>{
        @Override
        protected void reduce(FlowBean key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
            context.write(values.iterator().next(),key);
        }
    }


    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);

        //指定這個job所在的jar包位置
        job.setJarByClass(FlowSumSort.class);

        //指定使用的Mapper是哪個類,Reducer是哪個類
        job.setMapperClass(FlowSumSortMapper.class);
        job.setReducerClass(FlowSumSortReducer.class);

        //設置業務邏輯Mapper類的輸出key 和value的數據類型
        job.setMapOutputKeyClass(FlowBean.class);
        job.setMapOutputValueClass(Text.class);

        //設置業務邏輯Reducer類的輸出key 和value的數據類型
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(FlowBean.class);

        FileInputFormat.setInputPaths(job,"D:\\flowsum\\output"); //使用上一次MR的輸出結果為輸入數據
        //指定處理完成之后的結果保存位置
        FileOutputFormat.setOutputPath(job,new Path("D:\\flowsum\\outputsort"));

        //向yarn集群提交這個job
        boolean res = job.waitForCompletion(true);
        System.exit(res ? 0 : 1);
    }
}

最后執行,看看得到的結果outputsort是不是已經按照總流量降序排列。

13726230503	14493	58893	73386
13826230523	4962	49362	54324
18912688533	5271	24231	29502
13726230533	2481	24681	27162
13826230563	2481	24681	27162
13926230553	2481	24681	27162
13926230573	2481	24681	27162
13726230543	1527	2106	3633
13726230513	248	0	248



繼續改需求, 現在要按手機歸屬地進行統計

修改重新來一次MR。

在flowsum下新建包,partitioner, 添加類ProvincePartitioner.java

package cn.abc.hadoop.flowsum.partitioner;

import cn.abc.hadoop.flowsum.FlowBean;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

import java.util.HashMap;


public class ProvincePartitioner extends Partitioner<Text, FlowBean> {
    public static HashMap<String, Integer> provinceMap = new HashMap();

    static {
        provinceMap.put("137", 0); //模擬手機歸屬地
        provinceMap.put("138", 1);
        provinceMap.put("139", 2);
    }

    //這里就是實際分區方法,返回分區編號,分區編號就決定了數據到哪個分區中
    @Override
    public int getPartition(Text key, FlowBean value, int numPartitions) {
        Integer code = provinceMap.get(key.toString().substring(0, 3));

        if (code != null) {
            return code;
        }

        return 3; //不在上方列表中的
    }
}

 

MR程序寫在一個文件中 : FlowSumProvince

package cn.abc.hadoop.flowsum.partitioner;

import cn.abc.hadoop.flowsum.FlowBean;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

/**
 * 改需求:將流量統計結果按手機歸屬地不同省份輸出到不同文件中。
 */
public class FlowSumProvince {
    public static class FlowSumProvinceMapper extends Mapper<LongWritable, Text,Text, FlowBean> {

        Text k = new Text();
        FlowBean v = new FlowBean();

        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            //拿取一行文本轉為String
            String line = value.toString();
            //按照分隔符 \t 進行分隔
            String[] fields = line.split("\t");
            //獲取用戶手機號
            String phoneNum = fields[1];

            long upFlow = Long.parseLong(fields[fields.length - 3]);
            long downFlow = Long.parseLong(fields[fields.length - 2]);

            k.set(phoneNum);
            v.set(upFlow, downFlow);

            context.write(k, v);
        }
    }


    public static class FlowSumProvinceReducer extends Reducer<Text, FlowBean, Text, FlowBean> {

        FlowBean v = new FlowBean();

        @Override
        protected void reduce(Text key, Iterable<FlowBean> flowBeans, Context context) throws IOException, InterruptedException {

            long upFlowCount = 0;
            long downFlowCount = 0;

            for (FlowBean flowBean : flowBeans){
                upFlowCount += flowBean.getUpFlow();
                downFlowCount += flowBean.getDownFlow();
            }

            v.set(upFlowCount,downFlowCount);
            context.write(key,v);
        }
    }


    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);

        //指定這個job所在的jar包位置
        job.setJarByClass(FlowSumProvince.class);

        //指定使用的Mapper是哪個類,Reducer是哪個類
        job.setMapperClass(FlowSumProvinceMapper.class);
        job.setReducerClass(FlowSumProvinceReducer.class);

        //設置業務邏輯Mapper類的輸出key 和value的數據類型
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(FlowBean.class);

        //設置業務邏輯Reducer類的輸出key 和value的數據類型
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(FlowBean.class);

        job.setNumReduceTasks(4); //設定ReduceTask數量 等於ProvincePartitioner中的provinceMap中數量
        // 分區個數 < ReduceTasks個數,正常執行,會有空結果文件產生
        // 分區個數 > ReduceTasks個數,錯誤 Illegal partition


        //這里指定自定義分區組件,如果不指定,默認就是hashcode
        job.setPartitionerClass(ProvincePartitioner.class);

        FileInputFormat.setInputPaths(job, "D:\\flowsum\\input"); //輸入數據
        //指定處理完成之后的結果保存位置
        FileOutputFormat.setOutputPath(job, new Path("D:\\flowsum\\outputProvince"));

        //向yarn集群提交這個job
        boolean res = job.waitForCompletion(true);
        System.exit(res ? 0 : 1);
    }
}

 因為這次指定了分區組件ProvincePartitioner ,所以,得到了4個結果文件。也就是按照ProvincePartitioner里面的"137","138","139","其它"分區的結果。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM