Hadoop:Java API實現


Hadoop入門教程:Java API實現,對Java程序員來講,直接調用Hadoop的Java API來實現是最為方便的,要使用Java API至少需要實現三個重要組件:Map類、Reduce類、驅動Driver。下面將具體實現Java API的詞頻統計程序。

(1)實現Map類:WordcountMapper.java,核心代碼如下:

import java.io.IOException;
import java.util.StringTokenizer;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.Hadoop.MapReduce.Mapper;
 
public class WordcountMapper
        extends Mapper<object, intwritable= "" >{
     private f?inal static IntWritable one = new IntWritable( 1 );
     private Text word = new Text();
     public void map(Object key, Text value, Context context)
                     throws IOException, InterruptedException {
       StringTokenizer itr = new StringTokenizer(value.toString());
       while (itr.hasMoreTokens()) {
         word.set(itr.nextToken());
         context.write(word, one);
       }
     }
   }
 
</object,>

首先要實現Map需要繼承Hadoop的Mapper類,至少需要實現其中的map方法,其中Mapper中的map方法通過指定的輸入文件格式一次處理一行,value就是map函數接收到的輸入行,然后通過StringTokenizer以空格為分隔符將一行切分為若干tokens,之后,輸出形式的鍵值對並將它寫入org.apache.hadoop.mapred.OutputCollector中。為了更加清晰地認識Map階段的處理,我們假設有三個文本a、b、c,使用上述實現的處理流程如圖2-5所示。

從圖2-5中可以看到對於文件A的輸入,相應的Map處理之后還會進行sort,最終Map輸出如下:

<Hello, 1 >
<nuoline, 1 >
<nuoline, 1 >
<Welcome, 1 >

對於文件B,執行相應的sort之后最終Map輸出如下:

<hadoop, 1 >
<hadoop, 1 >
<Hello, 1 >
<Welcome, 1 >

對於文件C,執行相應的sort之后最終Map輸出如下:

<cloud, 1 >
<cloud, 1 >
<Hello, 1 >
<Welcome, 1 >
<welcome, 1 ></welcome, 1 >

 

 

(2)實現Reduce類:WordcountReducer.java,核心代碼如下:

import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
 
public class WordcountReducer
        extends Reducer<text,intwritable,text,intwritable> {
     private IntWritable result = new IntWritable();
     public void reduce(Text key, Iterable<intwritable> values,
                        Context context)
                          throws IOException, InterruptedException {
       int sum = 0 ;
       for (IntWritable val : values) {
         sum += val.get();
       }
       result.set(sum);
       context.write(key, result);
     }
   }
</intwritable></text,intwritable,text,intwritable>

實現WordcountReducer類需要繼承Reducer,至少需要實現其中的reduce方法,輸入參數中的key和values是由Map任務輸出的中間結果,values是一個Iterator,遍歷這個Iterator就可以得到屬於同一個key的所有value。此處key是一個單詞,values是詞頻。只需要將所有的values相加,就可以得到這個單詞總的出現次數。

對於圖2-5的Map輸出,Reduce處理的示意圖如圖2-6所示。

 

 

從圖2-6中可以看出,Reduce的輸入就是Map的輸出,然后會進行sort group,將Reduce的輸入變為>的形式,接着Hadoop框架會使用用戶指定的Reduce類處理數據,並最終輸出。當然用戶還可以指定combiner,每次Map運行之后,會按照key對輸出進行排序,然后把輸出傳遞給本地的combiner(可以指定和Reducer一樣),進行本地聚合。運行combiner能減少數據的通信量並降低Reduce的負載。

(3)實現運行驅動

運行驅動的目的就是在程序中指定用戶的Map類和Reduce類,並配置提交給Hadoop時的相關參數。例如實現一個詞頻統計的wordcount驅動類:MyWordCount.java,其核心代碼如下:

import org.apache.hadoop.conf.Conf?iguration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class MyWordCount {
    public static void main(String[] args) throws Exception {
      Conf?iguration conf = new Conf?iguration();
      Job job = new Job(conf, "word count" );
      job.setJarByClass(MyWordCount. class );
      job.setMapperClass(WordcountMapper. class );
      job.setCombinerClass(WordcountReducer. class );
      job.setReducerClass(WordcountReducer. class );
      job.setOutputKeyClass(Text. class );
      job.setOutputValueClass(IntWritable. class );
      FileInputFormat.addInputPath(job, new Path(args[ 0 ]));
      FileOutputFormat.setOutputPath(job, new Path(args[ 1 ]));
      System.exit(job.waitForCompletion( true ) ? 0 : 1 );
    }
}

從上述核心代碼中可以看出,需要在main函數中設置輸入/輸出路徑的參數,同時為了提交作業,需要job對象,並在job對象中指定作業名稱、Map類、Reduce類,以及鍵值的類型等參數。來源:CUUG官網


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM