Running WordCount in IDEA


1. Create a new Maven project

2. pom.xml contents

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>1</groupId>
    <artifactId>1</artifactId>
    <version>1.0-SNAPSHOT</version>

    <repositories>
        <repository>
            <id>apache</id>
            <url>https://repo.maven.apache.org/maven2</url>
        </repository>
    </repositories>

    <dependencies>
        <!--<dependency>-->
            <!--<groupId>org.apache.hadoop</groupId>-->
            <!--<artifactId>hadoop-core</artifactId>-->
            <!--<version>2.7.2</version>-->
        <!--</dependency>-->
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-common</artifactId>
            <version>2.7.2</version>
        </dependency>

        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-hdfs</artifactId>
            <version>2.7.2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>2.7.2</version>
        </dependency>

        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>3.8.1</version>
            <scope>test</scope>
        </dependency>
    </dependencies>


    <build>
        <plugins>
            <plugin>
                <artifactId>maven-dependency-plugin</artifactId>
                <configuration>
                    <excludeTransitive>false</excludeTransitive>
                    <stripVersion>true</stripVersion>
                    <outputDirectory>./lib</outputDirectory>
                </configuration>

            </plugin>
        </plugins>
    </build>
</project>
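
The plugin configured above copies the project's dependency jars into ./lib when you run its copy-dependencies goal (excludeTransitive=false also copies transitive dependencies; stripVersion=true drops version numbers from the jar file names):

mvn dependency:copy-dependencies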

3. Prepare the data file

Note: because the current Windows user is Administrator, you need to create an Administrator folder under hdfs://master:8020/user/ ; all local tests from here on use this folder.

After creating the folder, you also need to grant write permission on it. Here we simply grant full permissions (777).

su hdfs
hdfs dfs -mkdir -p /user/Administrator/input
hdfs dfs -chmod -R 777 /user/Administrator
hdfs dfs -put ./wordCountData.txt /user/Administrator/input
exit
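wordCountData.txt can be any plain-text file. As an illustrative example (matching the map output shown in the code below), it might contain:

hello word hello hadoop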

4. Create WordCount.java
Note: because the MapReduce job is submitted from Windows, the following must be set in conf:
  conf.set("mapreduce.app-submission.cross-platform", "true"); // cross-platform: allows submitting an MR job from Windows

Otherwise the job fails with the error: /bin/bash: line 0: fg: no job control

package com.zjc.mr;

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class WordCount {

    public static class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable> {

        // IntWritable and Text are Hadoop's own types, roughly equivalent to Java's int and String.
        // Values passed between MapReduce stages must use these writable types.
        private final static IntWritable one = new IntWritable(1);
        private Text word = new Text();

        public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
            StringTokenizer itr = new StringTokenizer(value.toString()); // Java's built-in string tokenizer
            while (itr.hasMoreTokens()) {
                word.set(itr.nextToken());
                context.write(word, one);
                /*
                 * e.g. map output:
                 * hello 1
                 * word 1
                 * hello 1
                 * hadoop 1
                 */
            }
        }
    }

    /*
     * Reduce input:
     * key: hello
     * value: [1, 1]
     *
     * Hadoop groups the <key, value> pairs produced by map into {key, collection of values
     * sharing that key} and passes them to the Reducer.
     * Input: <key, (list of values)>
     * Output: <key, value>
     * The reduce function (it must have this name) takes (input key, the collection of values
     * sharing that key, Context). Its input key/value types must match the map output
     * <key, value> types; this applies to both the map and reduce classes and functions.
     */
    public static class IntSumReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
        private IntWritable result = new IntWritable();

        public void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            int sum = 0;
            System.out.println("-----------------------------------------");
            System.out.println("key: " + key);
            for (IntWritable val : values) {
                System.out.println("val: " + val);
                sum += val.get();
            }
            result.set(sum);
            System.out.println("result: " + result.toString());
            context.write(key, result);
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        conf.set("mapreduce.app-submission.cross-platform", "true"); // cross-platform: allows submitting an MR job from Windows
        Job job = Job.getInstance(conf, "word count");     // job name
        job.setJarByClass(WordCount.class);                // main class
        job.setMapperClass(TokenizerMapper.class);         // Mapper class
        job.setCombinerClass(IntSumReducer.class);         // Combiner class; same logic as the reducer
        job.setReducerClass(IntSumReducer.class);          // Reducer class
        job.setOutputKeyClass(Text.class);                 // output key type
        job.setOutputValueClass(IntWritable.class);        // output value type
        job.setNumReduceTasks(1);                          // number of reducers (default is 1)
        // The Mapper<Object, Text, Text, IntWritable> output types must match the
        // last two type parameters of the class it extends.
        String args_0 = "hdfs://master:8020/user/Administrator/input";
        String args_1 = "hdfs://master:8020/user/Administrator/output";
        FileInputFormat.addInputPath(job, new Path(args_0));   // input path
        FileOutputFormat.setOutputPath(job, new Path(args_1)); // output path
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}

Note: before each run you must delete the output folder created by the previous run: hdfs dfs -rm -R /user/Administrator/output
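
Alternatively, the stale output directory can be removed programmatically before the job is submitted. Below is a minimal sketch using Hadoop's FileSystem API; the OutputCleaner class name is hypothetical, and it assumes the same namenode address and output path as above. Call OutputCleaner.clean(conf) in main() before waitForCompletion.

package com.zjc.mr;

import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class OutputCleaner {
    // Deletes the job's previous output directory, if present, so the job can run again.
    public static void clean(Configuration conf) throws Exception {
        // Connect to the same HDFS namenode the job uses (assumption: hdfs://master:8020).
        FileSystem fs = FileSystem.get(new URI("hdfs://master:8020"), conf);
        Path outputPath = new Path("/user/Administrator/output");
        if (fs.exists(outputPath)) {
            fs.delete(outputPath, true); // true = recursive delete
        }
    }
}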

5. View the results
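
After the job finishes, read the result files from the output directory (with a single reducer, the result is in part-r-00000):

hdfs dfs -cat /user/Administrator/output/part-r-00000

Assuming the illustrative input above (hello word hello hadoop), this would print:

hadoop	1
hello	2
word	1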