Test topic: creating and debugging a MapReduce project in IDEA against a local Hadoop environment


Operating system: Win7 64-bit

Hadoop: 2.7.4

Chinese word-segmentation toolkit IKAnalyzer: 5.1.0

IDE: IntelliJ IDEA 2017 Community

 

Preparing the Chinese word-segmentation toolkit

The project needs the Chinese word-segmentation toolkit IKAnalyzer, so the first step is to build the toolkit and install it into the local Maven repository. The following articles and blog posts were very helpful during this process, many thanks:

http://blog.csdn.net/zhu_tianwei/article/details/46607421

http://blog.csdn.net/cyxlzzs/article/details/7999212

http://blog.csdn.net/cyxlzzs/article/details/8000385

https://my.oschina.net/twosnail/blog/370744

1. Download the word-segmentation toolkit; source code: https://github.com/linvar/IKAnalyzer

2. The pom.xml in the downloaded source has a small problem: the dictionary files are not packaged into the jar, which causes errors at runtime later, so a few modifications are needed.

Add a properties node:

<properties>
   <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
   <jdk.version>1.8</jdk.version>
</properties>

Add a dependency node for the lucene-analyzers-common library:

<dependency>
   <groupId>org.apache.lucene</groupId>
   <artifactId>lucene-analyzers-common</artifactId>
   <version>5.1.0</version>
</dependency>

Modify the build node, adding resources and the maven-jar-plugin:

<build>
   <resources>
      <resource>
         <directory>src/main/java</directory>
         <includes>
            <include>**/*.dic</include>
         </includes>
      </resource>
   </resources>
   <plugins>
      <plugin>
         <groupId>org.apache.maven.plugins</groupId>
         <artifactId>maven-compiler-plugin</artifactId>
         <version>3.1</version>
         <configuration>
            <source>${jdk.version}</source>
            <target>${jdk.version}</target>
         </configuration>
      </plugin>
      <plugin>
         <groupId>org.apache.maven.plugins</groupId>
         <artifactId>maven-jar-plugin</artifactId>
         <version>2.4</version>
         <configuration>
            <archive>
               <manifest>
                  <addClasspath>true</addClasspath>
                  <classpathPrefix>lib/</classpathPrefix>
               </manifest>
            </archive>
            <!-- Exclude files that should not be packaged into the jar -->
            <excludes>
               <exclude>${project.basedir}/xml/*</exclude>
            </excludes>
         </configuration>
      </plugin>
   </plugins>
</build>

After these changes the toolkit can be packaged and installed into the local repository with mvn install; the artifact then appears in the local repository.

Chinese word-frequency counting and sorting:

1. Create a Maven project hdfstest and copy the toolkit's configuration files into the resources directory; the structure looks like this:


The extension dictionary ext.dic holds the Chinese phrases to be segmented. Create an input directory at the same level as src to hold local test input files. A log4j.properties file must also be added under resources; without it a warning appears and no MapReduce output is printed to the console.


After adding the following lines to log4j.properties, debug and MapReduce output appears in IDEA's bottom Console window while debugging:

log4j.rootLogger=INFO, stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.Target=System.out
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d{ISO8601} %-5p %c{1} - %m%n

 

2. Modify pom.xml to pull in the segmentation toolkit and the Hadoop libraries:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>hadoop.mapreduce</groupId>
    <artifactId>hdfstest</artifactId>
    <version>1.0</version>

    <repositories>
        <repository>
            <id>apache</id>
            <url>https://repo.maven.apache.org/maven2</url>
        </repository>
    </repositories>

    <dependencies>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-common</artifactId>
            <version>2.7.4</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-hdfs</artifactId>
            <version>2.7.4</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-mapreduce-client-core</artifactId>
            <version>2.7.4</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-mapreduce-client-jobclient</artifactId>
            <version>2.7.4</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-mapreduce-client-common</artifactId>
            <version>2.7.4</version>
        </dependency>
        <dependency>
            <groupId>org.wltea.analyzer</groupId>
            <artifactId>IKAnalyzer</artifactId>
            <version>5.1.0</version>
        </dependency>
    </dependencies>

    <build>
        <resources>
            <resource>
                <directory>src/main/resources</directory>
                <includes>
                    <include>**/*</include>
                </includes>
            </resource>
        </resources>
        <plugins>
            <plugin>  
                <artifactId>maven-dependency-plugin</artifactId>  
                <executions>  
                    <execution>  
                        <id>copy-dependencies</id>  
                        <phase>prepare-package</phase>  
                        <goals>  
                            <goal>copy-dependencies</goal>  
                        </goals>  
                        <configuration>  
                            <!-- ${project.build.directory} is a built-in Maven variable; its default is target -->
                            <outputDirectory>${project.build.directory}/classes/lib</outputDirectory>
                            <!-- Whether to exclude transitive dependencies -->
                            <excludeTransitive>false</excludeTransitive>  
                            <!-- Strip version info from the copied jar file names -->
                            <stripVersion>true</stripVersion>  
                        </configuration>  
                    </execution>  
                </executions>  
            </plugin> 
        </plugins>
    </build>
</project>

3. Add the Java class ChineseWordSplit

  • Import the Hadoop and word-segmentation packages:
package examples;

import java.io.IOException;
import java.io.StringReader;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
import org.apache.hadoop.mapreduce.lib.map.InverseMapper;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
import org.wltea.analyzer.core.IKSegmenter;
import org.wltea.analyzer.core.Lexeme;

 

  • Add an inner mapper class TokenizerMapper to ChineseWordSplit; it extends Hadoop's Mapper class and performs the Chinese word segmentation:
    public static class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable> {
        private final static IntWritable one = new IntWritable(1);
        private Text word = new Text();

        public void map(Object key, Text value, Mapper<Object, Text, Text, IntWritable>.Context context)
                throws IOException, InterruptedException
        {
            StringReader input = new StringReader(value.toString());
            IKSegmenter ikSeg = new IKSegmenter(input, true);
            for (Lexeme lexeme = ikSeg.next(); lexeme != null; lexeme = ikSeg.next()) {
                this.word.set(lexeme.getLexemeText());
                context.write(this.word, one);
            }
        }
    }
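Stripped of the Hadoop machinery, the mapper's core loop is simply "tokenize the line, then emit a (word, 1) pair per token". A minimal plain-Java sketch of that pattern, using a whitespace split as a stand-in for IKSegmenter (the real mapper feeds each lexeme from ikSeg.next() through the same emit step):

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.List;
import java.util.Map.Entry;

public class MapperSketch {
    // Emulates TokenizerMapper.map(): split the line into tokens and
    // emit a (token, 1) pair for each one. A whitespace split stands in
    // for IKSegmenter here; the emit pattern is the same.
    static List<Entry<String, Integer>> map(String line) {
        List<Entry<String, Integer>> out = new ArrayList<>();
        for (String token : line.trim().split("\\s+")) {
            if (!token.isEmpty()) {
                out.add(new SimpleEntry<>(token, 1));
            }
        }
        return out;
    }

    public static void main(String[] args) {
        for (Entry<String, Integer> kv : map("hadoop map reduce map")) {
            System.out.println(kv.getKey() + "\t" + kv.getValue());
        }
    }
}
```

Note that "map" is emitted twice with a count of 1 each time; summing duplicates is the reducer's job, not the mapper's.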
  • Add an inner reducer class IntSumReducer to ChineseWordSplit, extending Hadoop's Reducer class:
    public static class IntSumReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
        private IntWritable result = new IntWritable();

        public void reduce(Text key, Iterable<IntWritable> values,
                           Reducer<Text, IntWritable, Text, IntWritable>.Context context)
                throws IOException, InterruptedException
        {
            int sum = 0;
            for (IntWritable val : values) {
                sum += val.get();
            }
            this.result.set(sum);
            context.write(key, this.result);
        }
    }
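The reducer just sums the 1s grouped under each key. What shuffle plus IntSumReducer computes over the whole input can be sketched in plain Java with a map-based aggregation (a simplified stand-in, not the distributed execution):

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class ReducerSketch {
    // Emulates shuffle + IntSumReducer.reduce(): group identical tokens
    // and sum their counts (each mapper emission carried a count of 1).
    static Map<String, Integer> reduce(List<String> tokens) {
        Map<String, Integer> counts = new LinkedHashMap<>();
        for (String t : tokens) {
            counts.merge(t, 1, Integer::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        System.out.println(reduce(Arrays.asList("map", "reduce", "map")));
    }
}
```

Because IntSumReducer is also registered as the combiner, the same summation runs once on each mapper's local output before the shuffle, which shrinks the data transferred between phases.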
  • Create the program entry point: add a main function to ChineseWordSplit:
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        
        // The following 3 settings submit the job to the local MapReduce cluster;
        // map and reduce cannot be debugged in that mode
        //conf.set("mapreduce.framework.name", "yarn");
        //conf.set("yarn.resourcemanager.hostname", "localhost");
        //conf.set("mapreduce.job.jar", "D:\\temp\\hadooptest\\hdfstest\\target\\hdfstest-1.0.jar");

        String inputFile = args[0];
        Path outDir = new Path(args[1]);

        // Temporary directory: holds the first job's output, which becomes the second job's input
        Path tempDir = new Path(args[2] + System.currentTimeMillis());

        // first job
        System.out.println("start task...");
        Job job = Job.getInstance(conf, "word count");
        job.setJarByClass(ChineseWordSplit.class);
        job.setMapperClass(TokenizerMapper.class);
        job.setCombinerClass(IntSumReducer.class);
        job.setReducerClass(IntSumReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        FileInputFormat.addInputPath(job, new Path(inputFile));
        FileOutputFormat.setOutputPath(job, tempDir);


        // The first job writes sequence files so the second job can read them directly
        job.setOutputFormatClass(SequenceFileOutputFormat.class);

        // second job: the first job's output becomes its input
        if (job.waitForCompletion(true)) {
            System.out.println("start sort...");
            Job sortJob = Job.getInstance(conf, "word sort");
            sortJob.setJarByClass(ChineseWordSplit.class);
            /* InverseMapper is provided by the Hadoop library; it swaps the key and value of each pair emitted by map() */
            sortJob.setMapperClass(InverseMapper.class);
            sortJob.setInputFormatClass(SequenceFileInputFormat.class);

            // After the key/value swap, sort by word count in descending order
            sortJob.setMapOutputKeyClass(IntWritable.class);
            sortJob.setMapOutputValueClass(Text.class);
            sortJob.setSortComparatorClass(IntWritableDecreasingComparator.class);
            sortJob.setNumReduceTasks(1); // use a single reducer so the output is one file

            sortJob.setOutputKeyClass(IntWritable.class);
            sortJob.setOutputValueClass(Text.class);

            // input and output paths
            FileInputFormat.addInputPath(sortJob, tempDir);
            FileSystem fileSystem = outDir.getFileSystem(conf);
            if (fileSystem.exists(outDir)) {
                fileSystem.delete(outDir, true);
            }
            FileOutputFormat.setOutputPath(sortJob, outDir);

            if (sortJob.waitForCompletion(true)) {
                System.out.println("finish job");
                System.exit(0);
            }
        }
    }
  • Add the descending-order comparator class to ChineseWordSplit. The main function chains two MapReduce jobs: the first segments the Chinese text and counts the words, writing its result to the intermediate directory tempDir; the second takes that output as input, swaps key and value, and sorts in descending order. It uses Hadoop's built-in InverseMapper as its mapper, has no reducer class, and needs a sort comparator:
    private static class IntWritableDecreasingComparator extends IntWritable.Comparator {
        public int compare(WritableComparable a, WritableComparable b) {
            return -super.compare(a, b);
        }

        public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
            return -super.compare(b1, s1, l1, b2, s2, l2);
        }
    }
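Negating a comparator's result reverses the sort order; that is all IntWritableDecreasingComparator does on top of IntWritable's natural ascending comparison. The same trick in plain Java:

```java
import java.util.Arrays;

public class DescendingSortSketch {
    // Same idea as IntWritableDecreasingComparator: negate the natural
    // comparison so an ascending sort becomes a descending one.
    static Integer[] sortDescending(Integer[] values) {
        Arrays.sort(values, (a, b) -> -Integer.compare(a, b));
        return values;
    }

    public static void main(String[] args) {
        Integer[] freqs = {3, 10, 1, 7};
        System.out.println(Arrays.toString(sortDescending(freqs)));
        // [10, 7, 3, 1]
    }
}
```

In the sort job this comparator is applied to the swapped keys (the counts), so the highest-frequency words come out first.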

4. Run:

Package the project as a jar and copy it to the mapreduce directory D:\Application\hadoop-2.7.4\share\hadoop\mapreduce. Then, from the bin directory, run the command below; the three arguments are the input, output, and intermediate directories:

hadoop jar D:\Application\hadoop-2.7.4\share\hadoop\mapreduce\hdfstest-1.0.jar examples.ChineseWordSplit hdfs://localhost:9000/input/people.txt hdfs://localhost:9000/output hdfs://localhost:9000/tmp

Check the run status in the browser; two jobs are visible: "word count" and "word sort". After the second job completes, the output file appears in the HDFS output directory.

5. Debugging:

  • Method 1: local MapReduce debugging, with local directories as input and output

Open the menu Run -> Edit Configurations and add an Application named "WordSplit_local", as shown below. You can then click the Run or Debug button directly in IDEA without starting Hadoop MapReduce.

  • Method 2: local MapReduce debugging, with local HDFS directories as input and output

Similar to the above: create a new Application and change only the Program arguments, pointing them at HDFS paths. Before running or debugging, start the local Hadoop by executing start-all.cmd from hadoop's sbin directory, so the program can read from and write to HDFS.

Set a breakpoint in the Mapper class; the debugger then steps into the map function, as shown below. (Note: set the breakpoint on the IntWritable line of the class. In my tests, without that breakpoint the debugger would not step into the map function.)

With these two methods the MapReduce job status is not visible in the browser; you can only debug map and reduce and inspect the results in the output directory. The console shows the job URL as: Job - The url to track the job: http://localhost:8080/. To submit the job to the local MapReduce cluster instead, use method 3 below.

  • Method 3: submit MapReduce locally, with HDFS directories as input and output

To see the job status in the MapReduce UI, add the code below; it must specify the path of the jar to run. Clicking the Run button then shows the job status in the MapReduce UI:

        Configuration conf = new Configuration();

        // The following 3 settings submit the job to the local MapReduce cluster;
        // map and reduce cannot be debugged in that mode
        conf.set("mapreduce.framework.name", "yarn");
        conf.set("yarn.resourcemanager.hostname", "localhost");
        conf.set("mapreduce.job.jar", "D:\\temp\\hadooptest\\hdfstest\\target\\hdfstest-1.0.jar");

 

