Java 實現MapReduce函數


明白了MapReduce程序的工作原理之后,下一步就是寫代碼來實現它。我們需要三樣東西:一個map函數、一個reduce函數和一些用來運行作業的代碼。map函數由Mapper類來表示,后者聲明一個map()虛方法。范例2-3顯示了我們的map函數實現。

范例2-3 查找最高氣溫的Mapper類

Import java.Io.IOException;
import org.apahce.hadoop.io.IntWritable;
import org.apahce.hadoop.io.LongWritable;
import org.apahce.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class MaxTemperatureMapper extends MapReduceBase implements Mapper<LongWritable,Text,Text,IntWritable>{

    private static final int MISSING = 9999;
            
    @Override
    public void map(LongWritable key,Text value,Context context) throws IOException,InterruptedException{
    String line = value.toString();
    String year = line.substring(15,19);
    int airTemperature;
    
    if(line.charAt(87) =='+'){
     airTemperature = Integer.parseInt(line.substring(88,92));
    else{
     airTemperature = Integer.parseInt(line.subtring(87,92));
}
    String quality = line.substring(92,93);
    if(airTemperature != MISSING &&quality.matches("[01459]")){
    context.write(new Text(year),new IntWritetable(airTemperature));
}
    
}
    

}    



}

 

這個Mapper類是一個泛型類型,他有四個行參類型,分別指定:map函數的輸入鍵,輸入值,輸出鍵和輸出值的類型。就現在的例子來說,輸入鍵是一個長整數偏移量,輸入值是一行文本,輸出鍵是年份,輸出值是氣溫(整數)。Hadoop本身提供了一套可優化網絡序列化傳輸的基本類型,而不直接使用Java內嵌的類型。這些類型都在org.apache.hadoop.io包中。這里使用LongWritable類型(相當於Java的Long類型)、Text類型(相當於Java的String類型)和IntWritable類型(相當於Java的Integer類型)。

map()方法的輸入時一個鍵和一個值。我們首先將包含有一行輸入的Text值轉換成Java的String類型,之后使用substring()方法提取我們感興趣的列。

map()方法還提供了Context實例用於輸出內容的寫入。在這種情況下,我們將年份按Text對象進行讀/寫(因為我們把年份當作鍵),將氣溫值封裝在IntWritable類型中。只有氣溫數據不缺並且對應質量代碼顯示為正確的氣溫讀數時,這些數據才會被寫入輸出記錄中。

 

以類似方法用Reducer來定義reduce函數,如范例2-4所示。

范例2-4.查找最高氣溫的Reducer類

import  java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apche.hadoop.io.Text;
import org.apche.hadoop.mapreduce.Reducer;

public class MaxTemperatureReducer extends Reducer<Text,IntWritable,Text,IntWritable>{
    
@Overide

public void reduce(Text key,Interable<IntWritable> values,Context context){
    int maxValue = Integer.MIN_VALUE;
    for(IntWritable value:values){
     maxValue = Max.max(maxValue,value.get());
}
context.write(key,new IntWritable(maxValue));

}

}

同樣,reduce函數也有四個形式參數類型用於指定輸入和輸出類型。reduce函數的輸入類型必須匹配map函數的輸出類型:即Text類型和IntWritable類型。在這種情況下,reduce函數的輸出類型也必須是Text和IntWritable類型,分別輸出年份及其最高氣溫。這個最高氣溫是通過循環比較每個氣溫與當前所知最高氣溫所得到的。

第三部分代碼負責運行MapReduce作業(請參見范例2-5)

范例2-5 這個應用程序在氣象數據集中找出最高氣溫

import java.oo.IOException;
import org.apache.hadoop.fs.Path;
import org.apahce.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.input.FileOutputFormat;
import org.apache.hadoop.mapreduce.input.FileOutputFormat;

public class MaxTemperature{

    public static void main(String[] args) throws Exception{
      
      if(args.length !=2){
        System.err.printlin("Usage:MaxTemperature<input path> <output path>");
System.exit(-1);

}

Job job  = new Job();
job.serJarByClass(MaxTemperature.class);
job.setName("Max temperature");

FileInputFormat.addInputPath(job,new Path(args[0]));
FileOutputFormat.setOutputPath(job,new Path([args[1]));

job.setMapperClass(MaxTemperatureMapper,class);
job.setReducerClass(MaxTemperatureReducer.class);

job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);

System.exit(job.waitForCompletion(true)?0:1);


}

}

Job對象可以指定作業執行規范。我們可以用它來控制整個作業的運行。我們在Hadoop集群上運行這個作業時,要把代碼打包成一個Jar文件(Haoop在集群上發布在合格文件)。

不必明確指定JAR文件的名稱,在Job對象的setJarByClass()方法中傳遞一個類即可,Hadoop利用這個類來查找包含它的JAR文件,進而找到相關的Jar文件。

構造Job對象之后,需要指定輸入和輸出數據的路徑。調用FileInputFormat類的靜態方法addInputPath()來定義輸入數據的路徑,這個路徑可以是單個的文件、一個目錄(此時,將目錄下所有文件當做輸入)或符合特定文件模式的一系列文件。由函數名可知,可以多次調用addInputPath()來實現多路徑的輸入。

調用FileOutputFormat類中的靜態方法setOutputPath()來指定輸出路徑(只能有一個輸出路徑)。這個方法指定的是reduce函數輸出文件的寫入目錄。在運行作業前該目錄是不應該存在的,否則Hadoop會報錯並拒絕運行作業。這種預防措施的目的是放置數據丟失(長時間運行的作業如果結果被意外覆蓋,肯定是非常惱人的)。

接着,調用setMapperClas()和setReducerClass()指定map類型和reduce類型。
setOutputKeyClass()和setOutputValueClass()控制map和reduce函數的輸出類型,正如本例所示,這兩個輸出類型一般都是相同的。如果不同,則通過setMapOutputKeyClass()和setMapOutputValueClass()來設置mao函數的輸出類型。

輸入的類型通過InputFormat類來控制,我們的例子中沒有設置,因為使用的是默認的TextInputFormat(文本輸入格式)。

在設置定義map和reduce函數的類之后,可以開始運行作業。Job中的waitForCompletion()方法提交作業並等待執行完成。該方法中的布爾參數是個詳細標識,所以作業會把進度寫到控制台。

waitForCompletion()方法返回一個布爾值,表示執行的成(true)敗(false)。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM