Principles
1. Output format: the key/value pairs handed to the OutputCollector are written to output files, and how they are written is controlled by the output format. OutputFormat plays the same role for output that the previously described InputFormat plays for input; the OutputFormat implementations that ship with Hadoop write files to the local disk or to HDFS. With no extra configuration, the results are written as multiple files, one per reduce task, and the record layout inside them is fixed rather than freely chosen. Each reducer writes its results to a separate file inside a common output directory; the files are conventionally named part-nnnnn, where nnnnn is the partition id of the reduce task that produced the file, and the output directory is set with FileOutputFormat.setOutputPath(). The output format for a particular MapReduce job is chosen with setOutputFormatClass() on the Job object (setOutputFormat() on the JobConf object in the old API). The formats Hadoop provides are described below:
Hadoop provides several OutputFormat implementations for writing files. The basic (default) one is TextOutputFormat, which writes one key/value pair per line to a text file. A downstream MapReduce job can then simply re-read that data with the KeyValueInputFormat class, and the files are also human-readable. A format better suited to passing data between MapReduce jobs is SequenceFileOutputFormat, which rapidly serializes arbitrary data types to a file; the matching SequenceFileInputFormat deserializes the file back into the same types and presents the data to the next Mapper exactly as the previous Reducer emitted it. NullOutputFormat produces no output files and discards every key/value pair passed to it through the OutputCollector; it is useful when your reduce() method writes its own output files explicitly and you do not want the Hadoop framework to emit extra, empty ones.
RecordWriter: much as InputFormat reads individual records through a RecordReader, an OutputFormat class acts as a factory for RecordWriter objects, which write individual records to the files, as though the OutputFormat itself were doing the writing.
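To make the configuration side concrete, here is a minimal driver sketch (OutputFormatDemo is a hypothetical class written for this illustration; the input and output paths are placeholders, and with no Mapper or Reducer set, Hadoop's identity classes pass <LongWritable offset, Text line> records straight through):

package mapreduce;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

public class OutputFormatDemo {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "OutputFormatDemo");
        job.setJarByClass(OutputFormatDemo.class);
        // TextOutputFormat is the default; the explicit call marks the spot
        // where a different format (e.g. SequenceFileOutputFormat) would be set.
        job.setOutputFormatClass(TextOutputFormat.class);
        job.setOutputKeyClass(LongWritable.class); // identity map emits the line offset
        job.setOutputValueClass(Text.class);       // and the line itself
        FileInputFormat.addInputPath(job, new Path("/demo/in"));    // placeholder path
        FileOutputFormat.setOutputPath(job, new Path("/demo/out")); // placeholder path
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}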
2. As with InputFormat, there are special cases the stock formats cannot handle: if, for example, you want a reduce task to produce multiple output files, the TextOutputFormat, SequenceFileOutputFormat, and NullOutputFormat classes that Hadoop provides will not meet the requirement, and you must define a custom output data format. As with custom input formats, you can follow these steps (a bare skeleton is sketched after the list):
(1) Define a class that extends OutputFormat; in practice, extending FileOutputFormat is usually sufficient.
(2) Implement its getRecordWriter method so that it returns a RecordWriter.
(3) Define a class that extends RecordWriter and implement its write method, which writes each <key, value> pair to the output file.
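Putting the three steps together, a bare skeleton might look like the following (SkeletonOutputFormat and SkeletonRecordWriter are illustrative names invented here; the full working version, MyMultipleOutputFormat, is developed later in this experiment):

package mapreduce;

import java.io.IOException;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

// Step (1): extend FileOutputFormat rather than OutputFormat directly,
// so output-path handling and commit logic come for free.
public class SkeletonOutputFormat extends FileOutputFormat<Text, Text> {

    // Step (2): return a RecordWriter from getRecordWriter().
    @Override
    public RecordWriter<Text, Text> getRecordWriter(TaskAttemptContext job)
            throws IOException, InterruptedException {
        Path file = getDefaultWorkFile(job, ".txt"); // task-local output file
        FSDataOutputStream out =
                file.getFileSystem(job.getConfiguration()).create(file, false);
        return new SkeletonRecordWriter(out);
    }

    // Step (3): extend RecordWriter and define write() for each <key, value>.
    private static class SkeletonRecordWriter extends RecordWriter<Text, Text> {
        private final FSDataOutputStream out;

        SkeletonRecordWriter(FSDataOutputStream out) { this.out = out; }

        @Override
        public void write(Text key, Text value) throws IOException {
            out.write((key + ":" + value + "\n").getBytes("UTF-8"));
        }

        @Override
        public void close(TaskAttemptContext context) throws IOException {
            out.close();
        }
    }
}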
Environment
Linux Ubuntu 14.04
jdk-7u75-linux-x64
hadoop-2.6.0-cdh5.4.5
hadoop-2.6.0-eclipse-cdh5.4.5.jar
eclipse-java-juno-SR2-linux-gtk-x86_64
Task
When a job must emit unusual <key, value> pairs, the developer is expected to subclass FileOutputFormat to implement a new output format, and to subclass RecordWriter to implement how that format writes its keys and values. Suppose we have an e-commerce table cat_group1 with four fields (group id, group name, group code, luxury flag). Its contents are as follows:
cat_group1(group_id, group_name, group_code, flag)

group_id  group_name  group_code  flag
512       奢侈品      c           1
675       箱包        1           1
676       化妝品      2           1
677       家電        3           1
501       有機食品    1           0
502       蔬菜水果    2           0
503       肉禽蛋奶    3           0
504       深海水產    4           0
505       地方特產    5           0
506       進口食品    6           0
The requirement: write all records that share the same luxury flag (flag) into one file, name each file after that flag value, and separate key and value with ":" on output, in the form "key:value".
The result is two output files, 0.txt and 1.txt.
0.txt

flag:group_id group_name group_code
0:506 進口食品 6
0:505 地方特產 5
0:504 深海水產 4
0:503 肉禽蛋奶 3
0:502 蔬菜水果 2
0:501 有機食品 1

1.txt

flag:group_id group_name group_code
1:677 家電 3
1:676 化妝品 2
1:675 箱包 1
1:512 奢侈品 c
Experiment Steps
1. Switch to the /apps/hadoop/sbin directory and start Hadoop.

cd /apps/hadoop/sbin
./start-all.sh
2. Create the /data/mapreduce12 directory on the local Linux file system.

mkdir -p /data/mapreduce12
3. Switch to the /data/mapreduce12 directory on Linux and download the text file cat_group1 from http://192.168.1.100:60000/allfiles/mapreduce12/cat_group1 with wget.

cd /data/mapreduce12
wget http://192.168.1.100:60000/allfiles/mapreduce12/cat_group1

Then, still in the current directory, download the dependency bundle the project needs from http://192.168.1.100:60000/allfiles/mapreduce12/hadoop2lib.tar.gz.

wget http://192.168.1.100:60000/allfiles/mapreduce12/hadoop2lib.tar.gz

Unpack hadoop2lib.tar.gz into the current directory.

tar zxvf hadoop2lib.tar.gz
4. Create the /mymapreduce12/in directory on HDFS, then upload the cat_group1 file from the local /data/mapreduce12 directory into HDFS under /mymapreduce12/in.

hadoop fs -mkdir -p /mymapreduce12/in
hadoop fs -put /data/mapreduce12/cat_group1 /mymapreduce12/in
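To confirm the upload succeeded, you can list the directory and view the file with the standard HDFS shell commands:

hadoop fs -ls /mymapreduce12/in
hadoop fs -cat /mymapreduce12/in/cat_group1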
5. Create a new Java Project named mapreduce12.
Inside the mapreduce12 project, create a package named mapreduce.
In the mapreduce package, create a class named MyMultipleOutputFormat.
In the mapreduce package, create a class named FileOutputMR.
6. Add the jar dependencies the project needs. Right-click the project name and create a folder named hadoop2lib to hold them.
Copy the jars from the hadoop2lib directory under /data/mapreduce12 into the hadoop2lib folder of the mapreduce12 project in Eclipse.
Select all the jars in the project's hadoop2lib folder and add them to the Build Path.
7. Write the program code and walk through the design.
Define a custom output format class named MyMultipleOutputFormat that extends FileOutputFormat. It consists of three main parts: the getRecordWriter, getTaskOutputPath, and generateFileNameForKayValue methods, plus two nested classes, LineRecordWriter and MultiRecordWriter.
The methods of the class:

private MultiRecordWriter writer = null;

public RecordWriter<K, V> getRecordWriter(TaskAttemptContext job) throws IOException {
    if (writer == null) {
        writer = new MultiRecordWriter(job, getTaskOutputPath(job));
    }
    return writer;
}

private Path getTaskOutputPath(TaskAttemptContext conf) throws IOException {
    Path workPath = null;
    OutputCommitter committer = super.getOutputCommitter(conf);
    if (committer instanceof FileOutputCommitter) {
        workPath = ((FileOutputCommitter) committer).getWorkPath();
    } else {
        Path outputPath = super.getOutputPath(conf);
        if (outputPath == null) {
            throw new IOException("Undefined job output-path");
        }
        workPath = outputPath;
    }
    return workPath;
}

protected abstract String generateFileNameForKayValue(K key, V value, Configuration conf);
getRecordWriter() checks whether a MultiRecordWriter instance already exists and creates one if not. getTaskOutputPath() obtains the task's output path. generateFileNameForKayValue() is abstract: given the key, the value, and the conf, it decides which output file name the pair is written to and returns it (the AlphabetOutputFormat class defined later provides a concrete override).
LineRecordWriter class code:

protected static class LineRecordWriter<K, V> extends RecordWriter<K, V> {
    private static final String utf8 = "UTF-8";
    private static final byte[] newline;
    static {
        try {
            newline = "\n".getBytes(utf8);
        } catch (UnsupportedEncodingException uee) {
            throw new IllegalArgumentException("can't find " + utf8 + " encoding");
        }
    }

    protected DataOutputStream out;
    private final byte[] keyValueSeparator;

    public LineRecordWriter(DataOutputStream out, String keyValueSeparator) {
        this.out = out;
        try {
            this.keyValueSeparator = keyValueSeparator.getBytes(utf8);
        } catch (UnsupportedEncodingException uee) {
            throw new IllegalArgumentException("can't find " + utf8 + " encoding");
        }
    }

    public LineRecordWriter(DataOutputStream out) {
        this(out, ":");
    }

    private void writeObject(Object o) throws IOException {
        if (o instanceof Text) {
            Text to = (Text) o;
            out.write(to.getBytes(), 0, to.getLength());
        } else {
            out.write(o.toString().getBytes(utf8));
        }
    }

    public synchronized void write(K key, V value) throws IOException {
        boolean nullKey = key == null || key instanceof NullWritable;
        boolean nullValue = value == null || value instanceof NullWritable;
        if (nullKey && nullValue) {
            return;
        }
        if (!nullKey) {
            writeObject(key);
        }
        if (!(nullKey || nullValue)) {
            out.write(keyValueSeparator);
        }
        if (!nullValue) {
            writeObject(value);
        }
        out.write(newline);
    }

    public synchronized void close(TaskAttemptContext context) throws IOException {
        out.close();
    }
}
LineRecordWriter defines the output format of each <key, value> pair. The write() method is marked synchronized so concurrent writes are serialized. It first checks whether the key and value are null (or NullWritable): if both are null it writes nothing and returns; if the key is non-null it writes the key; if both key and value are non-null it writes the key/value separator between them; if the value is non-null it writes the value; and finally it writes the newline. For example, key "0" and value "506 進口食品 6" produce the line "0:506 進口食品 6".
MultiRecordWriter class code:

public class MultiRecordWriter extends RecordWriter<K, V> {
    private HashMap<String, RecordWriter<K, V>> recordWriters = null;
    private TaskAttemptContext job = null;
    private Path workPath = null;

    public MultiRecordWriter(TaskAttemptContext job, Path workPath) {
        super();
        this.job = job;
        this.workPath = workPath;
        recordWriters = new HashMap<String, RecordWriter<K, V>>();
    }

    public void close(TaskAttemptContext context) throws IOException, InterruptedException {
        Iterator<RecordWriter<K, V>> values = this.recordWriters.values().iterator();
        while (values.hasNext()) {
            values.next().close(context);
        }
        this.recordWriters.clear();
    }

    public void write(K key, V value) throws IOException, InterruptedException {
        String baseName = generateFileNameForKayValue(key, value, job.getConfiguration());
        RecordWriter<K, V> rw = this.recordWriters.get(baseName);
        if (rw == null) {
            rw = getBaseRecordWriter(job, baseName);
            this.recordWriters.put(baseName, rw);
        }
        rw.write(key, value);
    }

    private RecordWriter<K, V> getBaseRecordWriter(TaskAttemptContext job, String baseName)
            throws IOException, InterruptedException {
        Configuration conf = job.getConfiguration();
        boolean isCompressed = getCompressOutput(job);
        String keyValueSeparator = ":";
        RecordWriter<K, V> recordWriter = null;
        if (isCompressed) {
            Class<? extends CompressionCodec> codecClass =
                    getOutputCompressorClass(job, (Class<? extends CompressionCodec>) GzipCodec.class);
            CompressionCodec codec = ReflectionUtils.newInstance(codecClass, conf);
            Path file = new Path(workPath, baseName + codec.getDefaultExtension());
            FSDataOutputStream fileOut = file.getFileSystem(conf).create(file, false);
            recordWriter = new LineRecordWriter<K, V>(
                    new DataOutputStream(codec.createOutputStream(fileOut)), keyValueSeparator);
        } else {
            Path file = new Path(workPath, baseName);
            FSDataOutputStream fileOut = file.getFileSystem(conf).create(file, false);
            recordWriter = new LineRecordWriter<K, V>(fileOut, keyValueSeparator);
        }
        return recordWriter;
    }
}
write() derives the target file name (here 0.txt or 1.txt) for each pair and writes the pair to that file on HDFS, creating one underlying LineRecordWriter per file name on first use; close() closes the output streams of all the files. getBaseRecordWriter() first calls getCompressOutput(job) to learn from the configuration whether output compression is enabled, and returns the LineRecordWriter wrapped in a compression stream or a plain one accordingly.
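Because getBaseRecordWriter() honours the job's compression settings, compressed multi-file output can be switched on entirely from the driver, without touching the format class. A minimal sketch using the standard FileOutputFormat helpers (the choice of GzipCodec is just an example; job is the driver's Job object):

// In the driver, before submitting the job:
FileOutputFormat.setCompressOutput(job, true);
FileOutputFormat.setOutputCompressorClass(job, GzipCodec.class);
// getBaseRecordWriter() will then wrap each file's stream in the codec and
// append its default extension, producing e.g. 0.txt.gz instead of 0.txt.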
Complete MyMultipleOutputFormat code:

package mapreduce;

import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.util.HashMap;
import java.util.Iterator;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.mapreduce.OutputCommitter;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.ReflectionUtils;

public abstract class MyMultipleOutputFormat<K extends WritableComparable<?>, V extends Writable>
        extends FileOutputFormat<K, V> {

    private MultiRecordWriter writer = null;

    // Create (once) and return the writer that fans records out to multiple files.
    public RecordWriter<K, V> getRecordWriter(TaskAttemptContext job) throws IOException {
        if (writer == null) {
            writer = new MultiRecordWriter(job, getTaskOutputPath(job));
        }
        return writer;
    }

    // Resolve the task's working output directory.
    private Path getTaskOutputPath(TaskAttemptContext conf) throws IOException {
        Path workPath = null;
        OutputCommitter committer = super.getOutputCommitter(conf);
        if (committer instanceof FileOutputCommitter) {
            workPath = ((FileOutputCommitter) committer).getWorkPath();
        } else {
            Path outputPath = super.getOutputPath(conf);
            if (outputPath == null) {
                throw new IOException("Undefined job output-path");
            }
            workPath = outputPath;
        }
        return workPath;
    }

    // Subclasses map each <key, value> pair to the name of its output file.
    protected abstract String generateFileNameForKayValue(K key, V value, Configuration conf);

    protected static class LineRecordWriter<K, V> extends RecordWriter<K, V> {
        private static final String utf8 = "UTF-8";
        private static final byte[] newline;
        static {
            try {
                newline = "\n".getBytes(utf8);
            } catch (UnsupportedEncodingException uee) {
                throw new IllegalArgumentException("can't find " + utf8 + " encoding");
            }
        }

        protected DataOutputStream out;
        private final byte[] keyValueSeparator;

        public LineRecordWriter(DataOutputStream out, String keyValueSeparator) {
            this.out = out;
            try {
                this.keyValueSeparator = keyValueSeparator.getBytes(utf8);
            } catch (UnsupportedEncodingException uee) {
                throw new IllegalArgumentException("can't find " + utf8 + " encoding");
            }
        }

        public LineRecordWriter(DataOutputStream out) {
            this(out, ":");
        }

        private void writeObject(Object o) throws IOException {
            if (o instanceof Text) {
                Text to = (Text) o;
                out.write(to.getBytes(), 0, to.getLength());
            } else {
                out.write(o.toString().getBytes(utf8));
            }
        }

        public synchronized void write(K key, V value) throws IOException {
            boolean nullKey = key == null || key instanceof NullWritable;
            boolean nullValue = value == null || value instanceof NullWritable;
            if (nullKey && nullValue) {
                return;
            }
            if (!nullKey) {
                writeObject(key);
            }
            if (!(nullKey || nullValue)) {
                out.write(keyValueSeparator);
            }
            if (!nullValue) {
                writeObject(value);
            }
            out.write(newline);
        }

        public synchronized void close(TaskAttemptContext context) throws IOException {
            out.close();
        }
    }

    public class MultiRecordWriter extends RecordWriter<K, V> {
        // One underlying writer per generated file name.
        private HashMap<String, RecordWriter<K, V>> recordWriters = null;
        private TaskAttemptContext job = null;
        private Path workPath = null;

        public MultiRecordWriter(TaskAttemptContext job, Path workPath) {
            super();
            this.job = job;
            this.workPath = workPath;
            recordWriters = new HashMap<String, RecordWriter<K, V>>();
        }

        public void close(TaskAttemptContext context) throws IOException, InterruptedException {
            Iterator<RecordWriter<K, V>> values = this.recordWriters.values().iterator();
            while (values.hasNext()) {
                values.next().close(context);
            }
            this.recordWriters.clear();
        }

        public void write(K key, V value) throws IOException, InterruptedException {
            String baseName = generateFileNameForKayValue(key, value, job.getConfiguration());
            RecordWriter<K, V> rw = this.recordWriters.get(baseName);
            if (rw == null) {
                rw = getBaseRecordWriter(job, baseName);
                this.recordWriters.put(baseName, rw);
            }
            rw.write(key, value);
        }

        private RecordWriter<K, V> getBaseRecordWriter(TaskAttemptContext job, String baseName)
                throws IOException, InterruptedException {
            Configuration conf = job.getConfiguration();
            boolean isCompressed = getCompressOutput(job);
            String keyValueSeparator = ":";
            RecordWriter<K, V> recordWriter = null;
            if (isCompressed) {
                Class<? extends CompressionCodec> codecClass =
                        getOutputCompressorClass(job, (Class<? extends CompressionCodec>) GzipCodec.class);
                CompressionCodec codec = ReflectionUtils.newInstance(codecClass, conf);
                Path file = new Path(workPath, baseName + codec.getDefaultExtension());
                FSDataOutputStream fileOut = file.getFileSystem(conf).create(file, false);
                recordWriter = new LineRecordWriter<K, V>(
                        new DataOutputStream(codec.createOutputStream(fileOut)), keyValueSeparator);
            } else {
                Path file = new Path(workPath, baseName);
                FSDataOutputStream fileOut = file.getFileSystem(conf).create(file, false);
                recordWriter = new LineRecordWriter<K, V>(fileOut, keyValueSeparator);
            }
            return recordWriter;
        }
    }
}
The test program also has three parts: the Mapper, the Reducer, and a static nested class AlphabetOutputFormat. Note that the main function must set the job's output format class to AlphabetOutputFormat.
Mapper code:

public static class TokenizerMapper extends Mapper<Object, Text, Text, Text> {
    private Text val = new Text();

    public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
        String str[] = value.toString().split("\t");
        val.set(str[0] + " " + str[1] + " " + str[2]);
        context.write(new Text(str[3]), val);
    }
}
Each record is cut apart with split("\t"); the field holding the flag becomes the key, the remaining fields, joined with spaces, become the value, and context.write() emits the <key, value> pair directly, as the sketch below illustrates.
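For one record of cat_group1, the split works out as in this standalone sketch (illustration only, not part of the job code):

public class SplitDemo {
    public static void main(String[] args) {
        String line = "512\t奢侈品\tc\t1";        // one tab-separated input record
        String[] str = line.split("\t");
        String key = str[3];                                  // "1" (the flag field)
        String value = str[0] + " " + str[1] + " " + str[2];  // "512 奢侈品 c"
        System.out.println(key + " -> " + value);
    }
}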
Reducer code:

public static class IntSumReducer extends Reducer<Text, Text, Text, Text> {
    public void reduce(Text key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        for (Text val : values) {
            context.write(key, val);
        }
    }
}
The <key, value> pairs emitted by map first go through the shuffle, which gathers the values sharing a key into an iterator, forming <key, values>, and passes that to the reduce function. reduce copies the input key straight to the output key, iterates over values with an enhanced for loop, assigns each element to the output value, and emits each <key, value> with context.write(), once per loop iteration.
AlphabetOutputFormat code:

public static class AlphabetOutputFormat extends MyMultipleOutputFormat<Text, Text> {
    protected String generateFileNameForKayValue(Text key, Text value, Configuration conf) {
        return key + ".txt";
    }
}
This class extends MyMultipleOutputFormat and overrides the abstract generateFileNameForKayValue() method to return key + ".txt", so each distinct flag value gets its own output file.
Complete test class code:

package mapreduce;

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class FileOutputMR {

    public static class TokenizerMapper extends Mapper<Object, Text, Text, Text> {
        private Text val = new Text();

        public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
            String str[] = value.toString().split("\t");
            val.set(str[0] + " " + str[1] + " " + str[2]);
            context.write(new Text(str[3]), val);
        }
    }

    public static class IntSumReducer extends Reducer<Text, Text, Text, Text> {
        public void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            for (Text val : values) {
                context.write(key, val);
            }
        }
    }

    public static class AlphabetOutputFormat extends MyMultipleOutputFormat<Text, Text> {
        protected String generateFileNameForKayValue(Text key, Text value, Configuration conf) {
            return key + ".txt";
        }
    }

    public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
        Configuration conf = new Configuration();
        Job job = new Job(conf, "FileOutputMR");
        job.setJarByClass(FileOutputMR.class);
        job.setMapperClass(TokenizerMapper.class);
        job.setCombinerClass(IntSumReducer.class);
        job.setReducerClass(IntSumReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        job.setOutputFormatClass(AlphabetOutputFormat.class);
        FileInputFormat.addInputPath(job, new Path("hdfs://localhost:9000/mymapreduce12/in/cat_group1"));
        FileOutputFormat.setOutputPath(job, new Path("hdfs://localhost:9000/mymapreduce12/out"));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
8. In the FileOutputMR class file, right-click and choose Run As => Run on Hadoop to submit the MapReduce job to Hadoop.
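Alternatively, if Eclipse is not at hand, the same job can be submitted from the command line after exporting the project as a jar (the jar name and location below are placeholders, not part of the original steps; no arguments are needed because the input and output paths are hard-coded in main()):

hadoop jar /data/mapreduce12/mapreduce12.jar mapreduce.FileOutputMR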
9. Once the job finishes, open a terminal and inspect the results under /mymapreduce12/out on HDFS.

hadoop fs -ls /mymapreduce12/out
hadoop fs -cat /mymapreduce12/out/0.txt
hadoop fs -cat /mymapreduce12/out/1.txt