大數據學習之七——MapReduce簡單代碼實例


1.關於MapReduce

MapReduce是一種可用於數據處理的編程模型,能夠支持java、Python、C++等語言。MapReduce程序本質上是並行運行的,因此可以處理大規模數據集,這也是它的優勢。

2.使用hadoop分析數據

hadoop提供了並行處理,我們將查詢表示成MapReduce作業。

MapReduce任務過程分為兩個處理階段:map階段和reduce階段。每個階段都以鍵/值作為輸入和輸出,並選擇它們的類型。程序員還需要定義兩個函數:map函數和reduce函數。

Java  MapReduce

我們需要三個東西:一個map函數,一個reduce函數和一些用來運行作業的代碼。map函數由mapper接口實現。

Mapper接口是一個泛型類型,有四個形參,分別指定map函數的輸入鍵、輸入值、輸出鍵和輸出值的類型。這些類型均可在org.apache.hadoop.io包中找到。其中,LongWritable類型相當於java中的Long類型、Text類型相當於java中的String類型、IntWritable類型相當於java中的Integer類型。

在主函數中經常使用的類有:

FileOutputFormat類中的靜態函數setOutputPath()來指定輸出路徑,該函數指定了reduce函數輸出文件的寫入目錄。在運行任務前該目錄不應該存在。接着通過setMapperClass()和setReducerClass()指定map和reduce類型。setOutputKeyClass()和setOutputValueClass()控制map和reduce函數的輸出類型。輸入的類型通過InputFormat類來控制,在設置定義map和reduce函數的類之后,JobClient類的靜態函數runJob()會提交作業並等待完成,最后將其進展情況寫到控制台。

3.統計單詞數量代碼實例

package mapreduce01;  //MapReduce工程名字

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;

import org.apache.hadoop.fs.Path;

import org.apache.hadoop.io.IntWritable;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.Job;

import org.apache.hadoop.mapreduce.Mapper;

import org.apache.hadoop.mapreduce.Reducer;

import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

//單詞計數

public class mytest {    

static String INPUT_PATH="hdfs://master:9000/input/mr.txt";   //待統計的文件路徑

static String OUTPUT_PATH="hdfs://master:9000/output/mr.txt";    //統計結果存放的路徑

static class MyMapper extends Mapper <Object,Object,Text,IntWritable> {     //定義繼承mapper類

protected void map(Object key, Object value, Context context) throws IOException, InterruptedException{    //定義map方法

String[] arr=value.toString().split(",");      //文件中的單詞是以“,”分割的,並將每一行定義為一個數組

for(int i=0;i<arr.length;i++){      //遍歷循環每一行,統計單詞出現的數量

context.write(new Text(arr[i]),new IntWritable(1));  

  }

 }  

}    

static class MyReduce extends Reducer<Text,IntWritable,Text,IntWritable>{     //定義繼承reducer類

protected void reduce(Text key,Iterable<IntWritable> values,Context context) throws IOException,InterruptedException{      //定義reduce方法

int count=0;    

for(IntWritable c:values){     //統計同一個單詞的數量

count+=c.get();    

}    

IntWritable outValue=new IntWritable(count);    

context.write(key,outValue);    

}   

}  

 public static void main(String[] args) throws Exception{    //main函數

 Path outputpath=new Path(OUTPUT_PATH);    //輸出路徑

Configuration conf=new Configuration();   

Job job=Job.getInstance(conf);     //定義一個job,啟動任務

FileInputFormat.setInputPaths(job, INPUT_PATH);  

 FileOutputFormat.setOutputPath(job,outputpath);     

 job.setMapperClass(MyMapper.class);  

 job.setReducerClass(MyReduce.class);     

 job.setOutputKeyClass(Text.class);  

 job.setOutputValueClass(IntWritable.class);     

 job.waitForCompletion(true);  

}

}

4.統計去重代碼實例

package mapreduce01;  //MapReduce工程名字

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;

import org.apache.hadoop.fs.Path;

import org.apache.hadoop.io.IntWritable;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.Job;

import org.apache.hadoop.mapreduce.Mapper;

import org.apache.hadoop.mapreduce.Reducer;

import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

//單詞去重

public class testquchong {    

static String INPUT_PATH="hdfs://master:9000/quchong";   //待統計的文件

static String OUTPUT_PATH="hdfs://master:9000/quchong/qc";    //統計結果存放的路徑

static class MyMapper extends Mapper<Object,Text,Text,Text>{   

private static Text line=new Text();      //text相當於string

protected void map(Object key, Text value, Context context) throws IOException, InterruptedException{    

line=value;   

 context.write(line,new Text(","));     //以“,”規定格式,空格不容易控制,統計key,因為key值是唯一的

 }

 }    

static class MyReduce extends Reducer<Text,Text,Text,Text>{

protected void reduce(Text key,Iterable<Text> values,Context context) throws IOException,InterruptedException{       

context.write(key,new Text(""));    

 }  

 public static void main(String[] args) throws Exception{   

Path outputpath=new Path(OUTPUT_PATH);   

Configuration conf=new Configuration();   

Job job=Job.getInstance(conf);     

 job.setMapperClass(MyMapper.class);  

 job.setReducerClass(MyReduce.class);   

job.setCombinerClass(MyReduce.class);     

 job.setOutputKeyClass(Text.class);  

 job.setOutputValueClass(Text.class);     

 FileInputFormat.setInputPaths(job, INPUT_PATH);   

FileOutputFormat.setOutputPath(job,outputpath);  

 job.waitForCompletion(true);

 }

}

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM