一,介紹
1,舊API中有 org.apache.hadoop.mapred.lib.MultipleOutputFormat和org.apache.hadoop.mapred.lib.MultipleOutputs
MultipleOutputFormat allowing to write the output data to different output files.
MultipleOutputs creates multiple OutputCollectors. Each OutputCollector can have its own OutputFormat and types for the key/value pair. Your MapReduce program will decide what to output to each OutputCollector.
2,新API中 org.apache.hadoop.mapreduce.lib.output.MultipleOutputs
整合了上面舊API兩個的功能,沒有了MultipleOutputFormat。
The MultipleOutputs class simplifies writing output data to multiple outputs
Case one: writing to additional outputs other than the job default output. Each additional output, or named output, may be configured with its own OutputFormat, with its own key class and with its own value class.
Case two: to write data to different files provided by user
下面這段話來自Hadoop:The.Definitive.Guide(3rd,Early.Release)P251
“In the old MapReduce API there are two classes for producing multiple outputs: MultipleOutputFormat and MultipleOutputs. In a nutshell, MultipleOutputs is more fully featured, but MultipleOutputFormat has more control over the output directory structure and file naming. MultipleOutputs in the new API combines the best features of the two multiple output classes in the old API.”
二,應用
1,輸出到多個文件或多個文件夾:
驅動中不需要額外改變,只需要在MapClass或Reduce類中加入如下代碼
private MultipleOutputs<Text,IntWritable> mos;
public void setup(Context context) throws IOException,InterruptedException {
mos = new MultipleOutputs(context);
}
public void cleanup(Context context) throws IOException,InterruptedException {
mos.close();
}
然后就可以用mos.write(Key key,Value value,String baseOutputPath)代替context.write(key, value);
在MapClass或Reduce中使用,輸出時也會有默認的文件part-m-00*或part-r-00*,不過這些文件是無內容的,大小為0. 而且只有part-m-00*會傳給Reduce。
2,以多種格式輸出:
public class TestwithMultipleOutputs extends Configured implements Tool {
public static class MapClass extends Mapper<LongWritable,Text,Text,IntWritable> {
private MultipleOutputs<Text,IntWritable> mos;
protected void setup(Context context) throws IOException,InterruptedException {
mos = new MultipleOutputs<Text,IntWritable>(context);
}
public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException{
String line = value.toString();
String[] tokens = line.split("-");
mos.write("MOSInt",new Text(tokens[0]), new IntWritable(Integer.parseInt(tokens[1]))); //(第一處)
mos.write("MOSText", new Text(tokens[0]),tokens[2]); //(第二處)
mos.write("MOSText", new Text(tokens[0]),line,tokens[0]+"/"); //(第三處)同時也可寫到指定的文件或文件夾中
}
protected void cleanup(Context context) throws IOException,InterruptedException {
mos.close();
}
}
public int run(String[] args) throws Exception {
Configuration conf = getConf();
Job job = new Job(conf,"word count with MultipleOutputs");
job.setJarByClass(TestwithMultipleOutputs.class);
Path in = new Path(args[0]);
Path out = new Path(args[1]);
FileInputFormat.setInputPaths(job, in);
FileOutputFormat.setOutputPath(job, out);
job.setMapperClass(MapClass.class);
job.setNumReduceTasks(0);
MultipleOutputs.addNamedOutput(job,"MOSInt",TextOutputFormat.class,Text.class,IntWritable.class);
MultipleOutputs.addNamedOutput(job,"MOSText",TextOutputFormat.class,Text.class,Text.class);
System.exit(job.waitForCompletion(true)?0:1);
return 0;
}
public static void main(String[] args) throws Exception {
int res = ToolRunner.run(new Configuration(), new TestwithMultipleOutputs(), args);
System.exit(res);
}
}
測試的數據:
abc-1232-hdf
abc-123-rtd
ioj-234-grjth
ntg-653-sdgfvd
kju-876-btyun
bhm-530-bhyt
hfter-45642-bhgf
bgrfg-8956-fmgh
jnhdf-8734-adfbgf
ntg-68763-nfhsdf
ntg-98634-dehuy
hfter-84567-drhuk
結果截圖:(結果輸出到/test/testMOSout)
遇到的一個問題:
如果沒有mos.close(), 程序運行中會出現異常:
12/05/21 20:12:47 WARN hdfs.DFSClient: DataStreamer Exception:
org.apache.hadoop.ipc.RemoteException:org.apache.hadoop.hdfs.server.namenode.LeaseExpiredException: No lease on
/test/mosreduce/_temporary/_attempt_local_0001_r_000000_0/h-r-00000 File does not exist. [Lease. Holder: DFSClient_-352105532, pendingcreates: 5]