hadoop的第一個hello world程序(wordcount)


在hadoop生態中,wordcount是hadoop世界的第一個hello world程序。

wordcount程序是用於對文本中出現的詞計數,從而得到詞頻,本例中的詞以空格分隔。

關於mapper、combiner、shuffler、reducer等含義請參照Hadoop權威指南里的說明。

代碼參考:https://github.com/asker124143222/wordcount

1、hadoop平台搭建

參照之前的帖子搭一個偽分布式的hadoop就可以。鏈接:https://www.cnblogs.com/asker009/p/9126354.html

2、新建一個普通console程序,引入maven框架。

引入hadoop核心依賴,注意hadoop平台用的3.1版本,引入的依賴盡量使用這個版本,以免出現版本兼容問題

<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-core</artifactId>
<version>1.2.1</version>
</dependency>

<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>3.1.0</version>
</dependency>

<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs</artifactId>
<version>3.1.0</version>
</dependency>

<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs-client</artifactId>
<version>3.1.0</version>
<scope>provided</scope>
</dependency>

檢查版本

[hadoop@hp4411s ~]$ hadoop version
Hadoop 3.1.0
Source code repository https://github.com/apache/hadoop -r 16b70619a24cdcf5d3b0fcf4b58ca77238ccbe6d
Compiled by centos on 2018-03-30T00:00Z
Compiled with protoc 2.5.0
From source with checksum 14182d20c972b3e2105580a1ad6990
This command was run using /opt/hadoop/hadoop-3.1.0/share/hadoop/common/hadoop-common-3.1.0.jar

3、編寫mapper

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

/**
 * @Author: xu.dm
 * @Date: 2019/1/29 16:44
 * @Description: 讀取采用空格分隔的字符,並且每個詞計數為1
 */
public class WordCountMapper extends Mapper<Object, Text, Text, IntWritable> {
    @Override
    protected void map(Object key, Text value, Context context) throws IOException, InterruptedException {
        String line = value.toString();
        String[] words = line.split(" ");
        for (String word : words) {
            System.out.println(word);
            context.write(new Text(word), new IntWritable(1));
        }
    }
}
 
        
Mapper類是一個泛型類型,它有四個形參類型,分別指定map函數的輸入鍵、輸入值、輸出鍵、輸出值的類型。hadoop沒有直接使用Java內嵌的類型,而是自己開發了一套可以優化網絡序列化傳輸的基本類型。這些類型都在org.apache.hadoop.io包中。
Object類型,適用於字段需要使用多種類型的時候,Text類型相當於Java中的String類型,IntWritable類型相當於Java中的Integer類型
context它是mapper的一個內部類繼承自MapContext,是為了在map或是reduce任務中跟蹤task的狀態,記錄map執行的上下文。
在mapper類中,這個context可以存儲一些job conf的信息,比如job運行時參數等,程序可以在map函數中處理這個信息,這種方式是Hadoop中參數傳遞中很常見,context作為了map和reduce執行中各個函數的一個橋梁。

4、編寫reducer

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

/**
 * @Author: xu.dm
 * @Date: 2019/1/29 16:44
 * @Description:累加由map傳遞過來的計數
 */
public class WordCountReducer extends Reducer<Text,IntWritable,Text,IntWritable> {
    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
        int sum = 0;
        for(IntWritable val:values)
        {
            sum+=val.get();
        }
        context.write(key,new IntWritable(sum));
    }
}

5、關於shuffle過程,shuffle過程是由hadoop系統內部完成,shuffle是在map和reduce之間,對map的結果進行清洗、組合的過程。

借用hadoop權威指南里的一個圖來類比說明

假設我們的數據樣本是:

那么在map階段形成的數據是:

hadoop 1
hadoop 1
abc 1
abc 1
test 1
test 1
wow 1
wow 1
wow 1
... ...

經過shuffle后大概是這樣:

abc [1,1]
hadoop [1,1] test [1,1] wow [1,1,1] ... ...

shuffle對Map的結果進行排序並傳輸到Reduce進行處理 Map的結果並直接存放到硬盤,而是利用緩存做一些預排序處理 Map會調用Combiner,壓縮,按key進行分區、排序等,盡量減少結果的大小 每個Map完成后都會通知Task,

然后Reduce就可以進行處理,shuffle其實就是性能關鍵點。shuffle的結果傳遞給reduce,reduce根據需求決定如何處理這些數據,本例中就是簡單的求和。

6、程序入口,任務調度執行等

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class WordCount {

    public static void main(String[] args) throws Exception {
        if(args.length!=2)
        {
            System.err.println("使用格式:WordCount <input path> <output path>");
            System.exit(-1);
        }
        //Configuration類代表作業的配置,該類會加載mapred-site.xml、hdfs-site.xml、core-site.xml等配置文件。
        Configuration conf =new Configuration();

        Path outPath = new Path(args[1]);
        //FileSystem里面包括很多系統,不局限於hdfs
        FileSystem fileSystem = outPath.getFileSystem(conf);
        //刪除輸出路徑
        if(fileSystem.exists(outPath))
        {
            fileSystem.delete(outPath,true);
        }

        Job job = Job.getInstance(conf,"word count"); // new Job(conf, "word count");
        job.setJarByClass(WordCount.class);

        job.setMapperClass(WordCountMapper.class);
        //Combiner最終不能影響reduce輸出的結果
//        job.setCombinerClass(WordCountReducer.class);
        job.setReducerClass(WordCountReducer.class);

        //一般情況下mapper和reducer的輸出的數據類型是一樣的,如果不一樣,可以單獨指定mapper的輸出key、value的數據類型
        //job.setMapOutputKeyClass(Text.class);
        //job.setMapOutputValueClass(IntWritable.class);
//輸入類型通過InputFormat類來控制
//hadoop默認的是TextInputFormat和TextOutputFormat,本例就是對文本進行處理所以可以不用配置。 //job.setInputFormatClass(TextInputFormat.class); //job.setOutputFormatClass(TextOutputFormat.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); //指定的這個路徑可以是單個文件、一個目錄或符合特定文件模式的一系列文件。 //從方法名稱可以看出,可以通過多次調用這個方法來實現多路徑的輸入。 FileInputFormat.addInputPath(job,new Path(args[0]));

     //在運行job前,這個目錄不應該存在,如果存在hadoop會拒絕執行。這種預防措施的目的是防止數據丟失(長時間的job被意外覆蓋) FileOutputFormat.setOutputPath(job,
new Path(args[1])); System.exit(job.waitForCompletion(true) ? 0 : 1); } }

 

7、放入hadoop平台中執行

1、打成wordcount.jar包
2、上傳jar包到hadoop用戶目錄下
3、在hadoop用戶目錄下,用vi生成一個測試文檔wc.input,里面隨意填入一些詞,用空格分隔詞。本例中是:
[hadoop@hp4411s ~]$ cat wc.input
hadoop hadoop abc abc test test wow
wow wow
dnf dnf dnf dnf
wow
hd cd
ef hs
xudemin wow wow
xudemin dnf dnf
dnf mytest
4、將wc.input上傳到hdfs文件系統中的/demo/input
hadoop fs -mkdir -p /demo/input
hadoop fs -put wc.input /demo/input
hadoop fs -ls /demo/input

5、用hadoop執行jar包,輸出結果到/demo/output,注意output目錄不能存在,hadoop會自己建立這個目錄,這是hadoop內部的一個機制,如果有這個目錄,程序無法執行。
hadoop jar wordcount.jar /demo/input /demo/output

6、查看運行結果,目錄下有_SUCCESS文件,表示執行成功,結果在part-r-00000中
[hadoop@hp4411s ~]$ hadoop fs -ls /demo/output
Found 2 items
-rw-r--r--   1 hadoop supergroup          0 2019-01-30 03:42 /demo/output/_SUCCESS
-rw-r--r--   1 hadoop supergroup         73 2019-01-30 03:42 /demo/output/part-r-00000

7、查看part-r-00000
[hadoop@hp4411s ~]$ hadoop fs -cat /demo/output/part-r-00000
abc    2
cd    1
dnf    7
ef    1
hadoop    2
hd    1
hs    1
mytest    1
test    2
wow    6
xudemin    2

8、關於combiner,上述執行job的時候,程序注釋了一段代碼// job.setCombinerClass(WordCountReducer.class);

在Hadoop中,有一種處理過程叫Combiner,與Mapper和Reducer在處於同等地位,但其執行的時間介於Mapper和Reducer之間,其實就是Mapper和Reducer的中間處理過程,Mapper的輸出是Combiner的輸入,Combiner的輸出是Reducer的輸入。

combiner是什么作用?

因為hadoop的數據實際上是分布在各個不同的datanode,在mapper后,數據需要在從datanode上傳輸,如果數據很大很多,則會在網絡上花費不少時間,而combiner可以先對數據進行處理,減少傳輸量。
處理的方式是自定義的,本例中,每次map運行之后,會對輸出按照key進行排序,然后把輸出傳遞給本地的combiner(按照作業的配置與Reducer一樣),進行本地聚合。
combiner可以先對數據累加,實際上是執行了WordCountReducer類的內容,但是combine因為不是最后階段,所以它只是幫組程序先累加了部分數據(本地的),並沒有累加所有數據。
實際已經減少了mapper傳遞的kv數據量,最終到reducer階段需要累加的數據已經減少了。

注意:combine是不會改變最終的reducer的結果,它是一個優化手段

用hadoop權威指南里天氣數據的例子更深入解釋:

例如獲取歷年的最高溫度例子,以書中所說的1950年為例,在兩個不同分區上的Mapper計算獲得的結果分別如下:

第一個Mapper結果:(1950, [0, 10, 20])

第二個Mapper結果:(1950, [25, 15])

如果不考慮Combiner,按照正常思路,這兩個Mapper的結果將直接輸入到Reducer中處理,如下所示:

MaxTemperature:(1950, [0, 10, 20, 25, 15])

最終獲取的結果是25。

如果考慮Combiner,按照正常思路,這兩個Mapper的結果將分別輸入到兩個不同的Combiner中處理,獲得的結果分別如下所示:

第一個Combiner結果:(1950, [20])

第二個Combiner結果:(1950, [25])

然后這兩個Combiner的結果會輸出到Reducer中處理,如下所示

MaxTemperature:(1950, [20, 25])

最終獲取的結果是25。

由上可知:這兩種方法的結果是一致的,使用Combiner最大的好處是節省網絡傳輸的數據,這對於提高整體的效率是非常有幫助的。

但是,並非任何時候都可以使用Combiner處理機制,例如不是求歷年的最高溫度,而是求平均溫度,則會有另一種結果。同樣,過程如下,

如果不考慮Combiner,按照正常思路,這兩個Mapper的結果將直接輸入到Reducer中處理,如下所示:

AvgTemperature:(1950, [0, 10, 20, 25, 15])

最終獲取的結果是14。

如果考慮Combiner,按照正常思路,這兩個Mapper的結果將分別輸入到兩個不同的Combiner中處理,獲得的結果分別如下所示:

第一個Combiner結果:(1950, [10])

第二個Combiner結果:(1950, [20])

然后這兩個Combiner的結果會輸出到Reducer中處理,如下所示

AvgTemperature:(1950, [10, 20])

最終獲取的結果是15。

由上可知:這兩種方法的結果是不一致的,所以在使用Combiner時,一定是優化的思路,但是不能影響到最終結果。

 

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM