Mapreduce實例——MapReduce自定義輸出格式


原理

1.輸出格式:提供給OutputCollector的鍵值對會被寫到輸出文件中,寫入的方式由輸出格式控制。OutputFormat的功能跟前面描述的InputFormat類很像,Hadoop提供的OutputFormat的實例會把文件寫在本地磁盤或HDFS上。在不做設置的情況下,計算結果會以part-000*輸出成多個文件,並且輸出的文件數量和reduce數量一樣,文件內容格式也不能隨心所欲。每一個reducer會把結果輸出寫在公共文件夾中一個單獨的文件內,這些文件的命名一般是part-nnnnnnnnnn是關聯到某個reduce任務的partitionid,輸出文件夾通過FileOutputFormat.setOutputPath() 來設置。你可以通過具體MapReduce作業的JobConf對象的setOutputFormat()方法來設置具體用到的輸出格式。下表給出了已提供的輸出格式:

Hadoop提供了一些OutputFormat實例用於寫入文件,基本的(默認的)實例是TextOutputFormat,它會以一行一個鍵值對的方式把數據寫入一個文本文件里。這樣后面的MapReduce任務就可以通過KeyValueInputFormat類簡單的重新讀取所需的輸入數據了,而且也適合於人的閱讀。還有一個更適合於在MapReduce作業間使用的中間格式,那就是SequenceFileOutputFormat,它可以快速的序列化任意的數據類型到文件中,而對應SequenceFileInputFormat則會把文件反序列化為相同的類型並提交為下一個Mapper的輸入數據,方式和前一個Reducer的生成方式一樣。NullOutputFormat不會生成輸出文件並丟棄任何通過OutputCollector傳遞給它的鍵值對,如果你在要reduce()方法中顯式的寫你自己的輸出文件並且不想Hadoop框架輸出額外的空輸出文件,那這個類是很有用的。

RecordWriter:這個跟InputFormat中通過RecordReader讀取單個記錄的實現很相似,OutputFormat類是RecordWriter對象的工廠方法,用來把單個的記錄寫到文件中,就像是OuputFormat直接寫入的一樣。

2.IntputFormat相似,當面對一些特殊情況時,如想要Reduce支持多個輸出,這時Hadoop本身提供的TextOutputFormatSequenceFileOutputFormatNullOutputFormat等肯定是無法滿足我們的需求,這時我們需要自定義輸出數據格式。類似輸入數據格式,自定義輸出數據格式同樣可以參考下面的步驟:

1自定義一個繼承OutputFormat的類,不過一般繼承FileOutputFormat即可;

2)實現其getRecordWriter方法,返回一個RecordWriter類型;

3)自定義一個繼承RecordWriter的類,定義其write方法,針對每個<key,value>寫入文件數據;

環境

Linux Ubuntu 14.04

jdk-7u75-linux-x64

hadoop-2.6.0-cdh5.4.5

hadoop-2.6.0-eclipse-cdh5.4.5.jar

eclipse-java-juno-SR2-linux-gtk-x86_64

 

 

內容

當面對一些特殊的<key,value>鍵值對時,要求開發人員繼承FileOutputFormat,用於實現一種新的輸出格式。同時還需繼承RecordWriter,用於實現新輸出格式keyvalue的寫入方法。現在我們有某電商數據表cat_group1,包含(分組id,分組名稱,分組碼,奢侈品標記)四個字段cat_group1的數據內容如下:

cat_group1group_idgroup_namegroup_codeflag

  1. 分組id 分組名稱 分組碼 奢侈品標記  
  2. 512     奢侈品    c        1  
  3. 675     箱包      1       1  
  4. 676     化妝品    2        1  
  5. 677     家電        3     1  
  6. 501     有機食品      1     0  
  7. 502     蔬菜水果      2     0  
  8. 503     肉禽蛋奶      3     0  
  9. 504     深海水產      4     0  
  10. 505     地方特產      5     0  
  11. 506     進口食品      6     0  

要求把相同奢侈品標記(flag)的數據放入到一個文件里,並且以該字段來命名文件的名稱,輸出時keyvalue ""分割,形如"keyvalue"

結果輸出0.txt1.txt兩文件。

0.txt

  1. 奢侈品標記:分組ID 分組名稱 分組碼  
  2.         0:506     進口食品  6  
  3.         0:505     地方特產  5  
  4.         0:504     深海水產  4  
  5.         0:503     肉禽蛋奶  3  
  6.         0:502     蔬菜水果  2  
  7.         0:501     有機食品  1  

1.txt

  1. 奢侈品標記:分組ID 分組名稱 分組碼  
  2.         1:677      家電     3  
  3.         1:676     化妝品    2  
  4.         1:675      箱包     1  
  5.         1:512     奢侈品    c  

實驗步驟

1.切換到/apps/hadoop/sbin目錄下,開啟Hadoop

  1. cd /apps/hadoop/sbin  
  2. ./start-all.sh  

2.Linux本地新建/data/mapreduce12目錄。

  1. mkdir -p /data/mapreduce12  

3.Linux中切換到/data/mapreduce12目錄下,用wget命令從http://192.168.1.100:60000/allfiles/mapreduce12/cat_group1網址上下載文本文件cat_group1

  1. cd /data/mapreduce12  
  2. wget http://192.168.1.100:60000/allfiles/mapreduce12/cat_group1  

然后在當前目錄下用wget命令從http://192.168.1.100:60000/allfiles/mapreduce12/hadoop2lib.tar.gz網址上下載項目用到的依賴包。

  1. wget http://192.168.1.100:60000/allfiles/mapreduce12/hadoop2lib.tar.gz  

hadoop2lib.tar.gz解壓到當前目錄下。

  1. tar zxvf hadoop2lib.tar.gz  

4.首先在HDFS上新建/mymapreduce12/in目錄,然后將Linux本地/data/mapreduce12目錄下的cat_group1文件導入到HDFS/mymapreduce12/in目錄中。

  1. hadoop fs -mkdir -p /mymapreduce12/in  
  2. hadoop fs -put /data/mapreduce12/cat_group1 /mymapreduce12/in  

5.新建Java Project項目,項目名為mapreduce12

mapreduce12項目下新建包,包名為mapreduce

mapredcue包下新建名為MyMultipleOutputFormat的類。

mapredcue包下新建名為FileOutputMR的類。

6.添加項目所需依賴的jar包,右鍵單擊項目名,新建一個文件夾hadoop2lib,用於存放項目所需的jar包。

/data/mapreduce12目錄下,hadoop2lib目錄中的jar包,拷貝到eclipsemapreduce12項目的hadoop2lib目錄下。

選中所有項目hadoop2lib目錄下所有jar包,並添加到Build Path中。

7.編寫程序代碼,並描述設計思路

自定義FileRecordWriter類命名為MyMultipleOutputFormat,它繼承了FileRecordWriter類,並且它里面主要包含三部分:類中的getRecordWritergetTaskOutputPathgenerateFileNameForKayValue方法和兩個內部類LineRecordWriterMutiRecordWriter

類中的方法代碼:

  1. private MultiRecordWriter writer=null;  
  2.     public RecordWriter<K,V> getRecordWriter(TaskAttemptContext job) throws IOException{  
  3.         if(writer==null){  
  4.             writer=new MultiRecordWriter(job,getTaskOutputPath(job));  
  5.         }  
  6.         return writer;  
  7.     }  
  8.     private Path getTaskOutputPath(TaskAttemptContext conf) throws IOException{  
  9.         Path workPath=null;  
  10.         OutputCommitter committer=super.getOutputCommitter(conf);  
  11.         if(committer instanceof FileOutputCommitter){  
  12.             workPath=((FileOutputCommitter) committer).getWorkPath();  
  13.         }else{  
  14.             Path outputPath=super.getOutputPath(conf);  
  15.             if(outputPath==null){  
  16.                 throw new IOException("Undefined job output-path");  
  17.             }  
  18.             workPath=outputPath;  
  19.         }  
  20.         return workPath;  
  21.     }  
  22.     protected abstract String generateFileNameForKayValue(K key,V value,Configuration conf);  

getRecordWriter()方法判斷該類實例是否存在,若不存在則創建一個實例。getTaskOutputPath()方法獲取工作任務的輸出路徑。generateFileNameForKayValue()方法是抽象的,通過keyvalue conf三個參數確定key/value輸出的文件名,並將其返回。

LineRecordWriter類代碼:

  1. protected static class LineRecordWriter<K,V> extends RecordWriter<K, V> {  
  2.         private static final String utf8 = "UTF-8";  
  3.         private static final byte[] newline;  
  4.         private PrintWriter tt;  
  5.         static {  
  6.           try {  
  7.             newline = "\n".getBytes(utf8);  
  8.           } catch (UnsupportedEncodingException uee) {  
  9.             throw new IllegalArgumentException("can't find " + utf8 + " encoding");  
  10.           }  
  11.         }  
  12.     
  13.         protected DataOutputStream out;  
  14.         private final byte[] keyValueSeparator;  
  15.     
  16.         public LineRecordWriter(DataOutputStream out, String keyValueSeparator) {  
  17.           this.out = out;  
  18.           try {  
  19.             this.keyValueSeparator = keyValueSeparator.getBytes(utf8);  
  20.           } catch (UnsupportedEncodingException uee) {  
  21.             throw new IllegalArgumentException("can't find " + utf8 + " encoding");  
  22.           }  
  23.         }  
  24.     
  25.         public LineRecordWriter(DataOutputStream out) {  
  26.           this(out, ":");  
  27.         }  
  28.         private void writeObject(Object o) throws IOException {  
  29.           if (o instanceof Text) {  
  30.             Text to = (Text) o;  
  31.             out.write(to.getBytes(), 0, to.getLength());  
  32.           } else {  
  33.             out.write(o.toString().getBytes(utf8));  
  34.           }  
  35.         }  
  36.     
  37.         public synchronized void write(K key, V value)  
  38.           throws IOException {  
  39.           boolean nullKey = key == null || key instanceof NullWritable;  
  40.           boolean nullValue = value == null || value instanceof NullWritable;  
  41.           if (nullKey && nullValue) {//  
  42.             return;  
  43.           }  
  44.           if (!nullKey) {  
  45.             writeObject(key);  
  46.           }  
  47.           if (!(nullKey || nullValue)) {  
  48.             out.write(keyValueSeparator);  
  49.           }  
  50.           if (!nullValue) {  
  51.             writeObject(value);  
  52.           }  
  53.           out.write(newline);  
  54.     
  55.         }  
  56.         public synchronized  
  57.         void close(TaskAttemptContext context) throws IOException {  
  58.           out.close();  
  59.         }  
  60.       }  

LineRecordWriter類主要是為<key,value>輸出時定義它的輸出格式。通過加線程同步關鍵字 synchronizedwrite()方法上鎖。write()方法首先從輸出流中寫入key-value,然后判斷鍵值對是否為空,如果k-v為空,則操作失敗返回空,如果key不為空,則寫入key,如果keyvalue 都不為空則,在中間寫入k-v分隔符,如果value不為空,則寫入value,最后寫入換行符。

MutiRecordWriter類代碼:

  1. public class MultiRecordWriter extends RecordWriter<K,V>{  
  2.         private HashMap<String,RecordWriter<K,V> >recordWriters=null;  
  3.         private TaskAttemptContext job=null;  
  4.         private Path workPath=null;  
  5.         public MultiRecordWriter(TaskAttemptContext job,Path workPath){  
  6.             super();  
  7.             this.job=job;  
  8.             this.workPath=workPath;  
  9.             recordWriters=new HashMap<String,RecordWriter<K,V>>();  
  10.     
  11.         }  
  12.         public void close(TaskAttemptContext context) throws IOException, InterruptedException{  
  13.             Iterator<RecordWriter<K,V>> values=this.recordWriters.values().iterator();  
  14.             while(values.hasNext()){  
  15.                 values.next().close(context);  
  16.             }  
  17.             this.recordWriters.clear();  
  18.         }  
  19.         public void write(K key,V value) throws IOException, InterruptedException{  
  20.             String baseName=generateFileNameForKayValue(key ,value,job.getConfiguration());  
  21.             RecordWriter<K,V> rw=this.recordWriters.get(baseName);  
  22.             if(rw==null){  
  23.                 rw=getBaseRecordWriter(job,baseName);  
  24.                 this.recordWriters.put(baseName,rw);  
  25.             }  
  26.             rw.write(key, value);  
  27.         }  
  28.     
  29.     
  30.         private RecordWriter<K,V> getBaseRecordWriter(TaskAttemptContext job,String baseName)throws IOException,InterruptedException{  
  31.             Configuration conf=job.getConfiguration();  
  32.             boolean isCompressed=getCompressOutput(job);  
  33.             String keyValueSeparator= ":";  
  34.             RecordWriter<K,V> recordWriter=null;  
  35.             if(isCompressed){  
  36.                 Class<? extends CompressionCodec> codecClass=getOutputCompressorClass(job,(Class<? extends CompressionCodec>) GzipCodec.class);  
  37.                 CompressionCodec codec=ReflectionUtils.newInstance(codecClass,conf);  
  38.                 Path file=new Path(workPath,baseName+codec.getDefaultExtension());  
  39.                 FSDataOutputStream fileOut=file.getFileSystem(conf).create(file,false);  
  40.                 recordWriter=new LineRecordWriter<K,V>(new DataOutputStream(codec.createOutputStream(fileOut)),keyValueSeparator);  
  41.             }else{  
  42.                 Path file=new Path(workPath,baseName);  
  43.                 FSDataOutputStream fileOut=file.getFileSystem(conf).create(file,false);  
  44.                 recordWriter =new LineRecordWriter<K,V>(fileOut,keyValueSeparator);  
  45.             }  
  46.             return recordWriter;  
  47.         }  
  48.     }  

write()方法得到輸出的文件名0.txt1.txt並將兩文件寫到hdfs上,close()方法關閉輸出文件的數據流。getBaseRecordWriter()方法首先用getCompressOutput(job) 從配置判斷輸出是否壓縮,根據是否壓縮獲取相應的LineRecordWriter

MyMultipleOutputFormat完整代碼:

  1. package mapreduce;  
  2. import java.io.DataOutputStream;  
  3. import java.io.IOException;  
  4. import java.io.PrintWriter;  
  5. import java.io.UnsupportedEncodingException;  
  6. import java.util.HashMap;  
  7. import java.util.Iterator;  
  8. import org.apache.hadoop.conf.Configuration;  
  9. import org.apache.hadoop.fs.FSDataOutputStream;  
  10. import org.apache.hadoop.fs.Path;  
  11. import org.apache.hadoop.io.NullWritable;  
  12. import org.apache.hadoop.io.Text;  
  13. import org.apache.hadoop.io.Writable;  
  14. import org.apache.hadoop.io.WritableComparable;  
  15. import org.apache.hadoop.io.compress.CompressionCodec;  
  16. import org.apache.hadoop.io.compress.GzipCodec;  
  17. import org.apache.hadoop.mapreduce.OutputCommitter;  
  18. import org.apache.hadoop.mapreduce.RecordWriter;  
  19. import org.apache.hadoop.mapreduce.TaskAttemptContext;  
  20. import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;  
  21. import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;  
  22. import org.apache.hadoop.util.ReflectionUtils;  
  23. public abstract class MyMultipleOutputFormat <K extends WritableComparable<?>,V extends Writable> extends FileOutputFormat<K,V>{  
  24.     private MultiRecordWriter writer=null;  
  25.     public RecordWriter<K,V> getRecordWriter(TaskAttemptContext job) throws IOException{  
  26.         if(writer==null){  
  27.             writer=new MultiRecordWriter(job,getTaskOutputPath(job));  
  28.         }  
  29.         return writer;  
  30.     }  
  31.     private Path getTaskOutputPath(TaskAttemptContext conf) throws IOException{  
  32.         Path workPath=null;  
  33.         OutputCommitter committer=super.getOutputCommitter(conf);  
  34.         if(committer instanceof FileOutputCommitter){  
  35.             workPath=((FileOutputCommitter) committer).getWorkPath();  
  36.         }else{  
  37.             Path outputPath=super.getOutputPath(conf);  
  38.             if(outputPath==null){  
  39.                 throw new IOException("Undefined job output-path");  
  40.             }  
  41.             workPath=outputPath;  
  42.         }  
  43.         return workPath;  
  44.     }  
  45.     protected abstract String generateFileNameForKayValue(K key,V value,Configuration conf);  
  46.     protected static class LineRecordWriter<K,V> extends RecordWriter<K, V> {  
  47.         private static final String utf8 = "UTF-8";  
  48.         private static final byte[] newline;  
  49.         private PrintWriter tt;  
  50.         static {  
  51.           try {  
  52.             newline = "\n".getBytes(utf8);  
  53.           } catch (UnsupportedEncodingException uee) {  
  54.             throw new IllegalArgumentException("can't find " + utf8 + " encoding");  
  55.           }  
  56.         }  
  57.     
  58.         protected DataOutputStream out;  
  59.         private final byte[] keyValueSeparator;  
  60.     
  61.         public LineRecordWriter(DataOutputStream out, String keyValueSeparator) {  
  62.           this.out = out;  
  63.           try {  
  64.             this.keyValueSeparator = keyValueSeparator.getBytes(utf8);  
  65.           } catch (UnsupportedEncodingException uee) {  
  66.             throw new IllegalArgumentException("can't find " + utf8 + " encoding");  
  67.           }  
  68.         }  
  69.     
  70.         public LineRecordWriter(DataOutputStream out) {  
  71.           this(out, ":");  
  72.         }  
  73.         private void writeObject(Object o) throws IOException {  
  74.           if (o instanceof Text) {  
  75.             Text to = (Text) o;  
  76.             out.write(to.getBytes(), 0, to.getLength());  
  77.           } else {  
  78.             out.write(o.toString().getBytes(utf8));  
  79.           }  
  80.         }  
  81.     
  82.         public synchronized void write(K key, V value)  
  83.           throws IOException {  
  84.           boolean nullKey = key == null || key instanceof NullWritable;  
  85.           boolean nullValue = value == null || value instanceof NullWritable;  
  86.           if (nullKey && nullValue) {//  
  87.             return;  
  88.           }  
  89.           if (!nullKey) {  
  90.             writeObject(key);  
  91.           }  
  92.           if (!(nullKey || nullValue)) {  
  93.             out.write(keyValueSeparator);  
  94.           }  
  95.           if (!nullValue) {  
  96.             writeObject(value);  
  97.           }  
  98.           out.write(newline);  
  99.     
  100.         }  
  101.     
  102.         public synchronized  
  103.         void close(TaskAttemptContext context) throws IOException {  
  104.           out.close();  
  105.         }  
  106.       }  
  107.     public class MultiRecordWriter extends RecordWriter<K,V>{  
  108.         private HashMap<String,RecordWriter<K,V> >recordWriters=null;  
  109.         private TaskAttemptContext job=null;  
  110.         private Path workPath=null;  
  111.         public MultiRecordWriter(TaskAttemptContext job,Path workPath){  
  112.             super();  
  113.             this.job=job;  
  114.             this.workPath=workPath;  
  115.             recordWriters=new HashMap<String,RecordWriter<K,V>>();  
  116.     
  117.         }  
  118.         public void close(TaskAttemptContext context) throws IOException, InterruptedException{  
  119.             Iterator<RecordWriter<K,V>> values=this.recordWriters.values().iterator();  
  120.             while(values.hasNext()){  
  121.                 values.next().close(context);  
  122.             }  
  123.             this.recordWriters.clear();  
  124.         }  
  125.         public void write(K key,V value) throws IOException, InterruptedException{  
  126.             String baseName=generateFileNameForKayValue(key ,value,job.getConfiguration());  
  127.             RecordWriter<K,V> rw=this.recordWriters.get(baseName);  
  128.             if(rw==null){  
  129.                 rw=getBaseRecordWriter(job,baseName);  
  130.                 this.recordWriters.put(baseName,rw);  
  131.             }  
  132.             rw.write(key, value);  
  133.         }  
  134.     
  135.     
  136.         private RecordWriter<K,V> getBaseRecordWriter(TaskAttemptContext job,String baseName)throws IOException,InterruptedException{  
  137.             Configuration conf=job.getConfiguration();  
  138.             boolean isCompressed=getCompressOutput(job);  
  139.             String keyValueSeparator= ":";  
  140.             RecordWriter<K,V> recordWriter=null;  
  141.             if(isCompressed){  
  142.                 Class<?extends CompressionCodec> codecClass=getOutputCompressorClass(job,(Class<?extends CompressionCodec>) GzipCodec.class);  
  143.                 CompressionCodec codec=ReflectionUtils.newInstance(codecClass,conf);  
  144.                 Path file=new Path(workPath,baseName+codec.getDefaultExtension());  
  145.                 FSDataOutputStream fileOut=file.getFileSystem(conf).create(file,false);  
  146.                 recordWriter=new LineRecordWriter<K,V>(new DataOutputStream(codec.createOutputStream(fileOut)),keyValueSeparator);  
  147.             }else{  
  148.                 Path file=new Path(workPath,baseName);  
  149.                 FSDataOutputStream fileOut=file.getFileSystem(conf).create(file,false);  
  150.                 recordWriter =new LineRecordWriter<K,V>(fileOut,keyValueSeparator);  
  151.             }  
  152.             return recordWriter;  
  153.         }  
  154.     }  
  155. }  

測試程序代碼也分為三部分Mapper部分reducer部分還有在里面添加一個靜態類AlphabetOutputFormat。另外要注意在主函數里面把job的輸出格式類設置為AlphabetOutputFormat類。

Mapper代碼:

  1. public static class TokenizerMapper extends Mapper<Object,Text,Text,Text>{  
  2.         private Text val=new Text();  
  3.         public void map(Object key,Text value,Context context)throws IOException,InterruptedException{  
  4.             String str[]=value.toString().split("\t");  
  5.             val.set(str[0]+" "+str[1]+" "+str[2]);  
  6.                 context.write(new Text(str[3]), val);  
  7.         }  
  8.     }  

split("\t")把數據截取出來,把代表flag的字段作為key,剩下的字段作為value,用contextwrite()方法將<key,value>直接輸出。

reducer代碼:

  1. public static class IntSumReducer extends Reducer<Text,Text,Text,Text>{  
  2.         public void reduce(Text key,Iterable<Text> values,Context context)  
  3.     throws IOException,InterruptedException{  
  4.     for(Text val:values){  
  5.     context.write(key,val);  
  6.     }  
  7.     }  
  8.     }  

map輸出的<key,value>鍵值對先經過shuffle,把key相同的value值放到一個迭代器中形成values,在將<key,values>傳遞給reduce函數,reduce函數將輸入的key直接復制給輸出的key,將輸入的values通過增強版for循環遍歷,並把里面的每個元素賦值給輸出的value,再用contextwrite()方法進行逐一輸出<key,value>,輸出的次數為循環的次數。

AlphabetOutputFormat代碼:

  1. public static class AlphabetOutputFormat extends MyMultipleOutputFormat<Text,Text>{  
  2.         protected String generateFileNameForKayValue(Text key,Text value,Configuration conf){  
  3.             return key+".txt";  
  4.         }  
  5.     }  

該類繼承MyMultipleOutputFormat類並重寫generateFileNameForKayValue()抽象方法,令其返回值為key+".txt"

測試類完整代碼:

  1. package mapreduce;  
  2. import java.io.IOException;  
  3. import org.apache.hadoop.conf.Configuration;  
  4. import org.apache.hadoop.fs.Path;  
  5. import org.apache.hadoop.io.Text;  
  6. import org.apache.hadoop.mapreduce.Job;  
  7. import org.apache.hadoop.mapreduce.Mapper;  
  8. import org.apache.hadoop.mapreduce.Reducer;  
  9. import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;  
  10. import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;  
  11. public class FileOutputMR {  
  12.     public static class TokenizerMapper extends Mapper<Object,Text,Text,Text>{  
  13.         private Text val=new Text();  
  14.         public void map(Object key,Text value,Context context)throws IOException,InterruptedException{  
  15.             String str[]=value.toString().split("\t");  
  16.             val.set(str[0]+" "+str[1]+" "+str[2]);  
  17.                 context.write(new Text(str[3]), val);  
  18.         }  
  19.     }  
  20.     public static class IntSumReducer extends Reducer<Text,Text,Text,Text>{  
  21.         public void reduce(Text key,Iterable<Text> values,Context context)  
  22.     throws IOException,InterruptedException{  
  23.     for(Text val:values){  
  24.     context.write(key,val);  
  25.     }  
  26.     }  
  27.     }  
  28.     public static class AlphabetOutputFormat extends MyMultipleOutputFormat<Text,Text>{  
  29.     protected String generateFileNameForKayValue(Text key,Text value,Configuration conf){  
  30.     return key+".txt";  
  31.     }  
  32.     }  
  33.     public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException{  
  34.     Configuration conf=new Configuration();  
  35.     Job job=new Job(conf,"FileOutputMR");  
  36.     job.setJarByClass(FileOutputMR.class);  
  37.     job.setMapperClass(TokenizerMapper.class);  
  38.     job.setCombinerClass(IntSumReducer.class);  
  39.     job.setReducerClass(IntSumReducer.class);  
  40.     job.setOutputKeyClass(Text.class);  
  41.     job.setOutputValueClass(Text.class);  
  42.     job.setOutputFormatClass(AlphabetOutputFormat.class);  
  43.     FileInputFormat.addInputPath(job,new Path("hdfs://localhost:9000/mymapreduce12/in/cat_group1"));  
  44.     FileOutputFormat.setOutputPath(job,new Path("hdfs://localhost:9000/mymapreduce12/out"));  
  45.     System.exit(job.waitForCompletion(true)?0:1);  
  46.     }  
  47.     }  

8.FileOutputMR類文件中,右鍵並點擊=>Run As=>Run on Hadoop選項,將MapReduce任務提交到Hadoop中。

9.待執行完畢后,進入命令模式,在HDFS上從mymapreduce12/out中查看實驗結果。

  1. hadoop fs -ls /mymapreduce12/out  
  2. hadoop fs -cat /mymapreduce12/out/0.txt  
  3. hadoop fs -cat /mymapreduce12/out/1.txt  


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM