一個完整的hadoop程序開發過程


目的

說明hadoop程序開發過程

前提條件

ubuntu或同類OS

java1.6.0_45

eclipse-indigo

hadoop-0.20.2

hadoop-0.20.2-eclipse-plugin.jar

各項版本一定要匹配,否則出了問題都不知道是什么原因。

配置

配置Java

詳見:Ubuntu下搭建JAVA開發環境及卸載

配置分布式Hadoop

詳見:hadoop 0.20.2偽分布式安裝詳解

偽分布式與分布式有兩點主要區別:

  1. 在namenode節點配置完成hadoop以后,需要用scp把hadoop復制到datanode節點,為了方便,最好全部機器的路徑都是一樣的,比如都在/opt/hadoop-0.20.2中。
  2. conf目錄下的masters文件要把默認的localhost改成namenode節點的主機名或IP地址,Slaves文件中,要把localhost改成datanode節點的主機名或IP

 

eclipse的hadoop插件配置

hadoop-0.20.2-eclipse-plugin.jar是一個 eclipse中的hadoop插件。

它的作用是實現了HDFS的可視化操作,如果沒有它,就要在大量地在終端輸入命令,每個命令都是以bin/hadoop dfs開頭。

如果你是新手,可能還覺得很新鮮,如果很熟悉命令的話,就會覺得很煩。新手總會變成老手,所以這個插件還是有必要的。

下面簡單說一下配置過程:

eclipse和hadoop-eclipse-plugin這套插件的版本要求非常高,一定要高度匹配才能用。另一篇博文寫了一部分對應關系:https://www.cnblogs.com/Sabre/p/10621064.html

1.下載hadoop-0.20.2-eclipse-plugin.jar,自行搜索。官網不太容易找舊版本。

2.把此jar放到eclipse插件目錄下,一般是plugins目錄

重新啟動eclipse,如果版本正確,此時在eclipse中的project exporer中應該可以看到DFS Locations項。如果沒有出現,很可能是版本的問題。

 

3.配置Hadoop所在目錄。eclipse-->window菜單-->Preferences-->Hadoop Map/Reduce,右側輸入或選擇你的Hadoop目錄

4.顯示Map/Reduce Locations窗口。eclipse-->window菜單-->Open Perspective-->Other,選擇藍色的小象圖標Map/Reduce,會在下面出黃色的小象窗口,Map/Reduce Locations

5.配置Hadoop LocationMap/Reduce Locations中右鍵,New Hadoop Location,出現配置窗口,location name隨便你寫。下面的Map/Reduce Master框中的host,如果是分布式就用IP或主機名,不要用默認的localhost。port改成9000。DFS Master框中的Use M/R Master host默認打勾保持不變,下面的Port改成9001 。user name 一般默認中不中 ,    

 至此,eclipse的hadoop插件就配置完成了。

編寫程序

以下的程序是從《hadoop實戰》中脫胎出來的,之所以說脫胎,是因為原書中的代碼缺少很多條件,不加以完善是無法運行的。這本書寫得不好,感覺是為了評職稱之類的事情,讓學生給湊的,里面很多硬傷。之所以還在硬着頭皮看下去,是因為多少還是講了一些東西,同時也挑戰一下自己,面對不那么完善的環境時,能否解決問題,而不是一味地尋找更好的教材,這是在豆瓣上寫的一篇書評:https://book.douban.com/review/10071283/

 1.打開eclipse,新建java項目。右鍵項目,properties,Java Builder Path,Libraries,Add External JARS,找到hadoop的目錄,把根目錄下的幾個jar包都添加進來。

2.新建類,Score_process.java,復制粘貼以下代碼:

package pkg1;

import java.net.URI;
import java.util.Iterator;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem; 
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;


public  class Score_process extends Configured implements Tool {
    
    //內部類Map
    public static class Map extends Mapper<LongWritable, Text, Text, IntWritable> {
        
        //map方法
        public void map(LongWritable key, Text value, Context context) throws java.io.IOException ,InterruptedException {
            
            System.out.println("key值:" + key);
            String line = value.toString();//將輸入的純文本文件的數據轉化為string
            
            //將輸入的數據按行分割
            StringTokenizer tokenizerArticle = new StringTokenizer(line, "\n");
            
            //分別對每一行進行處理
            while (tokenizerArticle.hasMoreTokens()) {
                
                //每行按空格划分
                StringTokenizer tokenizerLine = new StringTokenizer(tokenizerArticle.nextToken());
                String nameString = tokenizerLine.nextToken();
                String scoreString = tokenizerLine.nextToken();
                Text name = new Text(nameString); 
                int scoreInt = Integer.parseInt(scoreString);
                context.write(name, new IntWritable(scoreInt));//輸出姓名和成績
            }
        };
    }
    
    //內部類Reduce
    public static class Reduce extends Reducer<Text, IntWritable, Text, IntWritable> {
        
        //reduce方法
        public void reduce(Text key, java.lang.Iterable<IntWritable> values, Context context) throws java.io.IOException ,InterruptedException {
            
            int sum=0;
            int count=0;
            Iterator<IntWritable> iterator = values.iterator();
            
            while (iterator.hasNext()) {
                sum += iterator.next().get();
                count++;
            }
            
            int average = (int)sum/count;
            context.write(key, new IntWritable(average));
        };
        
    }    
    
    public int run(String[] args) throws Exception {
        
        Configuration configuration = getConf();
        
        //configuration.set("mapred", "Score_Process.jar");
        
        //准備環境,刪除已經存在的output2目錄,保證輸出目錄不存在**開始************
        final String uri = "hdfs://192.168.1.8:9000/";
        FileSystem fs = FileSystem.get(URI.create(uri),configuration);
        final String path = "/user/grid/output2";
        boolean exists = fs.exists(new Path(path));
        if(exists){
            fs.delete(new Path(path),true);
        }
        //准備環境,刪除已經存在的output2目錄,保證輸出目錄不存在**結束************
        
        Job job= new Job(configuration);
        
        job.setJobName("Score_process");
        job.setJarByClass(Score_process.class);
        
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        
        job.setMapperClass(Map.class);
        
        job.setCombinerClass(Reduce.class);
        job.setReducerClass(Reduce.class);
        
        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);
        
        FileInputFormat.setInputPaths(job, new Path(args[0]));
//        System.out.println(new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        
        boolean success = job.waitForCompletion(true);    
        
        return success ? 0:1;

    }
    
    
    public static void main(String[] args) throws Exception {
        
        int ret = ToolRunner.run(new Score_process1(), args);
        
        System.exit(ret);
    }
}

 

以上的代碼中,有不少是套路,固定的模板。

Map是處理輸入參數中給定的文本文件,處理完畢后,輸出到HDFS,供reduce調用。 context.write(name, new IntWritable(scoreInt));這一句是關鍵。

Reduce調用map方法的結果,reduce后,寫到OS文件系統。context.write(key, new IntWritable(average));這一句是關鍵。

整個run方法,需要改的只有setJobName和setJarByClass類的名字,其他的不用動。

整個main方法,不用動。

程序部分基本上就是這樣。

 編譯

終端中輸入

javac -classpath /opt/hadoop-0.20.2/hadoop-0.20.2-core.jar -d ~/allTest/ScoreProcessFinal/class ~/workspace-indigo/test5/src/pkg1/Score_process.java

如果沒有報錯,就說明編譯成功。

打包

jar -cvf ~/allTest/ScoreProcessFinal/ScoreProcessFinal.jar -C ~/allTest/ScoreProcessFinal/class .

可以用以下命令查看包里的文件:
jar vtf ~/allTest/ScoreProcessFinal/ScoreProcessFinal.jar

執行

執行可以分為兩種方式,一種在eclipse中,另一種在終端。

eclipse中運行

配置運行參數。run configurations,arguments,Program arguments:

文本框中輸入:hdfs://host-thinkpad:9000/user/grid/input2 hdfs://host-thinkpad:9000/user/grid/output2

就是輸入目錄和輸出目錄,注意中間有個空格。

 

終端中運行

/opt/hadoop-0.20.2/bin/hadoop jar ~/allTest/ScoreProcessFinal/ScoreProcessFinal.jar pkg1.Score_process1 input2 output2

 

這就是hadoop開發的全過程框架。

 

 

其實在此期間發生了很多各種各樣的問題,分別記錄在各個博文中了。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM