Hadoop 用Java編寫MapReduce詞頻統計程序並提交到Hadoop集群運行


一、MapReduce介紹

MapReduce是一個分布式計算框架,可以部署在Hadoop、Spark等大數據平台上,實現海量數據的並行計算。它采用“分而治之”的思想,將一個計算任務交給集群中的多台機器共同完成,之后再匯總成最終結果。

一般來說讀取一個TB,PB級的文件,普通計算機的速度是比較慢的,而要想提高速度就要提高計算機的硬件配置,這對於普通用戶來說是很難做到的,也提高了這一領域的門檻。而采用廉價的機器組成分布式系統,只要集群的機器數量足夠多,那么計算的速度就會足夠快。

二、MapReduce特點

優點

(1)易於編程

完全獨立完成一個MapReduce程序是一個很困難的事情,這需要很強的編程能力。好在MapReduce給我們提供了大量的方便開發的接口,我們只需要繼承一些接口,實現一些特定的函數就能完成一個MapReduce程序。

(2)高拓展性

這是一個分布式計算框架,我們可以簡單粗暴的,通過增加機器來提高計算性能。

(3)高容錯性

由於MapReduce集群采用的大多是廉價的機器,宕機,BUG等都是家常便飯。但MapReduce框架提供多種有效的錯誤檢測和恢復機制。如果一個結點出現了問題,其他結點會接替這個結點的工作,等結點恢復正常后,又可以繼續工作,這些都由Hadoop內部完成。

(4)高吞吐量

MapReduce可以對PB級以上也就是1024TB的數據進行離線計算。

缺點

(1)難以實時計算

MapReduce處理的是磁盤上的數據。

(2)不能流式計算

MapReduce處理的是磁盤上的靜態數據,而流式計算的輸入數據的動態的。

(3)難以用於DAG計算

DAG(有向無環圖)多個任務間存在依賴關系,后一個應用的輸入可能是前一個應用的輸出。而MapReduce的輸出結果都會寫在磁盤上,這會造成大量的磁盤IO,降低集群的性能。

三、MapReduce編程

以詞頻統計程序為例

pom依賴

https://mvnrepository.com/尋找以下幾個依賴
hadoop-common、hadoop-hdfs、hadoop-mapreduce-client-core、junit
插件:maven-compiler-plugin、maven-shade-plugin
下面是我的pom文件:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>cn.pineapple</groupId>
    <artifactId>MapReduceTest</artifactId>
<!--    <packaging>pom</packaging>-->
    <version>1.0-SNAPSHOT</version>
    <properties>
        <hadoop.version>2.6.0</hadoop.version>
    </properties>
    <dependencies>
        <!-- https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-common -->
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-common</artifactId>
            <version>${hadoop.version}</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-hdfs -->
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-hdfs</artifactId>
            <version>${hadoop.version}</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-mapreduce-client-core -->
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-mapreduce-client-core</artifactId>
            <version>${hadoop.version}</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/junit/junit -->
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>RELEASE</version>
<!--            <scope>test</scope>-->
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.maven.plugins/maven-shade-plugin -->
<!--        <dependency>-->
<!--            <groupId>org.apache.maven.plugins</groupId>-->
<!--            <artifactId>maven-shade-plugin</artifactId>-->
<!--            <version>2.4.3</version>-->
<!--        </dependency>-->
    </dependencies>
    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.1</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                    <encoding>UTF-8</encoding>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>2.4.3</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <minimizeJar>true</minimizeJar>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>

要做統計的文件內容為:

Hello,Hadoop,BigData
Hello,Hadoop,MapReduce
Hello,Hadoop,HDFS
BigData,Perfect

MapReduce由Map和Reduce兩個階段組成。

Map

在此之前還有一個讀取文件的操作,這個只要在主類中指定一下就好,不需要寫。

讀取后文件的內容為:

//K1	V1
0 		Hello,Hadoop,BigData
21 		Hello,Hadoop,MapReduce
44 		Hello,Hadoop,HDFS
62 		BigData,Perfect

Map后的文件內容為:

//K2		V2
Hello 		1
Hadoop 		1
BigData 	1
Hello 		1
Hadoop 		1
MapReduce 	1
Hello 		1
Hadoop 		1
HDFS 		1
BigData 	1
Perfect 	1

Map表示“映射”,將文件拆分成多個塊,然后發給集群上的機器統一計算
要繼承Mapper類並重寫map()函數,將K1,V1轉換成下面的K2,V2

package cn.pineapple.day1;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

/**
 * 四個泛型的解釋:
 * <p>
 * KEYIN:K1的類型
 * <p>
 * VALUEIN:V1的類型
 * <p>
 * KEYOUT:K2的類型
 * <p>
 * VALUEOUT:V2的類型
 */

public class WordCountMapper extends Mapper<LongWritable, Text, Text, LongWritable> {

    /**
     * 將K1,V1轉換成K2,V2
     *
     * @param key:     行偏移量
     * @param value:   一行文本內容
     * @param context: 上下文對象
     * @throws IOException:
     * @throws InterruptedException:
     */
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        Text text = new Text();
        LongWritable longWritable = new LongWritable();
        //1:對一行的文本數據進行拆分
        String[] split = value.toString().split(",");
        //2:遍歷數組,組裝K2和V2
        for (String word : split) {
            //3:將K2和V2寫入上下文中
            text.set(word);
            longWritable.set(1);
            context.write(text, longWritable);
        }
    }
}

map()函數內的代碼,只需要針對文件一行考慮。所以首先要提取每個單詞,用value.toString()將value也就是這一行的內容轉換成String類型,再用split()方法進行拆分。K2的值默認為1。在寫入的時候,還要將K2和V2轉換成Text類型和LongWritable類型。

Reduce

在Map和Reduce中間還有一個shuffle,目前用不到這個shuffle,可以采取默認的方式,它會把K2,V2轉換成:

//K2		V2
Hello 		<1,1,1>
Hadoop 		<1,1,1>
BigData 	<1,1>
HDFS 		<1>
MapReduce 	<1>
Perfect 	<1>

接着Reduce的最終結果是:

//K3 		V3
BigData		2
HDFS		1
Hadoop		3
Hello		3
MapReduce	1
Perfect		1

Reduce表示“歸約”,將所有結果都統一起來,繼承Reducer類並重寫reduce()函數,將K2和V2轉換為K3和V3,也就是最終結果。

package cn.pineapple.day1;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;


/**
 * 四個泛型的解釋:
 * <p>
 * KEYIN:K2類型
 * <p>
 * VALUEIN:V2類型
 * <p>
 * KEYOUT:K3類型
 * <p>
 * VALUEOUT:V3類型
 */
public class WordCountReducer extends Reducer<Text, LongWritable, Text, LongWritable> {

    /**
     * 將K2和V2轉換為K3和V3,將K3和V3寫入上下文中
     *
     * @param key:     新K2
     * @param values:  新V2
     * @param context: 上下文對象
     * @throws IOException:
     * @throws InterruptedException:
     */
    @Override
    protected void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException {
        long count = 0;
        //1:遍歷集合,將集合中的數字相加,得到V3
        for (LongWritable value : values) {
            count += value.get();
        }
        //2:將K3V3寫入上下文中
        context.write(key, new LongWritable(count));
    }
}

為了實現這個最終結果,K3和K2是一樣的,我們只需要遍歷這個集合,然后相加就能得到V3。value.get()可以將LongWritable類型轉換成long類型,最后在寫入的時候再進行一次轉換。

Main方法

光有這兩個類是不行的,我們還要寫一個Main方法,詳細的列一下任務流程,指定一下任務配置等。
要繼承Configured類並實現Tool接口。

package cn.pineapple.day1;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class JobMain extends Configured implements Tool {

    /**
     * 指定一個job任務
     *
     * @param strings:
     * @return :
     * @throws Exception:
     */
    public int run(String[] strings) throws Exception {
        //1:創建一個job任務對象
        Job wordCount = Job.getInstance(super.getConf(), "WordCount");
        //如果打包運行出錯,則需要加改配置
        wordCount.setJarByClass(JobMain.class);

        //2:配置job任務對象(八個步驟)

        //第一步:指定文件的讀取方式和
        wordCount.setInputFormatClass(TextInputFormat.class);
        //      讀取路徑
        TextInputFormat.addInputPath(wordCount, new Path("hdfs://nsv:8020/input/wordcount"));

        //第二步:指定Map階段的處理方式和數據類型
        wordCount.setMapperClass(WordCountMapper.class);
        //      設置K2的類型
        wordCount.setMapOutputKeyClass(Text.class);
        //      設置V2的類型
        wordCount.setMapOutputValueClass(LongWritable.class);

        //第三、四、五、六shuffle階段采用默認的方式

        //第七步:指定Reduce階段的處理方式和數據類型
        wordCount.setReducerClass(WordCountReducer.class);
        //      設置K3的類型
        wordCount.setOutputKeyClass(Text.class);
        //      設置V3的類型
        wordCount.setOutputValueClass(LongWritable.class);

        //第八步:指定輸出類型
        wordCount.setOutputFormatClass(TextOutputFormat.class);
        //      輸出路徑
        TextOutputFormat.setOutputPath(wordCount, new Path("hdfs://nsv:8020/output/wordcount"));

        //等待任務結束
        boolean bl = wordCount.waitForCompletion(true);

        return bl ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        Configuration configuration = new Configuration();
        //啟動job任務
        int run = ToolRunner.run(configuration, new JobMain(), args);
        System.exit(run);
    }
}

run()方法里就是本次任務的詳細流程和配置,然后main()方法里要調用上面寫的run方法。它接受一個int類型作為退出代碼,0或1。

四、打包運行

雙擊package打包,出現BUILD SUCCESS表示打包成功

在這里插入圖片描述
出現了兩個jar包,origin包是因為用了maven-shade-plugin打包插件,不會包含依賴jar包,所有體積較小,可以選擇這個jar包運行。

將要統計的文件放到HDFS上
hdfs dfs -mkdir -p /input/wordcount
hdfs dfs -put wordcount.txt /input/wordcount

將jar包上傳到集群上后,跑一下嘍
hadoop jar original-MapReduceTest-1.0-SNAPSHOT.jar cn.pineapple.day1.JobMain,cn.pineapple.day1.JobMain是主類的全路徑

在這里插入圖片描述
completed successfully看來是成功了,去Web上看一下實際結果

在這里插入圖片描述
點擊Download可以查看下載結果

在這里插入圖片描述


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM