Changing MapReduce Output File Names


1. By default, output files are named in the part-r-00000 pattern. To customize the output file names, use the org.apache.hadoop.mapreduce.lib.output.MultipleOutputs class to do the writing.

2. The MultipleOutputs instance should be created in the Reducer's setup() method and closed in cleanup().
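For reference, a minimal reducer sketch showing that lifecycle (the MyReducer class and the Text/NullWritable types here are illustrative, and the Hadoop imports are the same as in the full code at the end of the post):

private static class MyReducer extends Reducer<Text, Text, Text, NullWritable> {

    private MultipleOutputs<Text, NullWritable> mos;

    @Override
    protected void setup(Context context) {
        // create the MultipleOutputs instance once per reduce task
        mos = new MultipleOutputs<>(context);
    }

    @Override
    protected void reduce(Text key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        // write through mos instead of context.write(); the last argument is the baseOutputPath
        mos.write(key, NullWritable.get(), key.toString());
    }

    @Override
    protected void cleanup(Context context) throws IOException, InterruptedException {
        // closing flushes and closes all underlying record writers
        mos.close();
    }
}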

3. Even with MultipleOutputs, empty part-r-00000 files are still produced by default. To suppress them, register the output format through LazyOutputFormat.setOutputFormatClass(job, TextOutputFormat.class).
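In the driver this is a single call that takes the place of the usual job.setOutputFormatClass(TextOutputFormat.class):

// Register the output format lazily so empty part-r-xxxxx files are only
// created when something is actually written to them
LazyOutputFormat.setOutputFormatClass(job, TextOutputFormat.class);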

4. The MultipleOutputs write() method has several overloads.

write(KEYOUT key, VALUEOUT value, String baseOutputPath)
If baseOutputPath starts with /, the output path is baseOutputPath + "-r-00000". For example, with baseOutputPath = "/PUFA/" + key.toString(), the output file ends up at /PUFA/0000000001-r-00000.
Without a leading /, the output is written under the directory set by FileOutputFormat.setOutputPath(job, outPutPath). For example, with baseOutputPath = key.toString() and an output path of /trx, the output file ends up at /trx/0000000001-r-00000.
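For illustration, using the mos, journalTrxDataSet and key from the full reducer code at the end of the post, the two cases look like this:

// baseOutputPath starts with "/": written under that absolute path,
// e.g. /PUFA/0000000001-r-00000
mos.write(journalTrxDataSet, NullWritable.get(), "/PUFA/" + key.toString());

// no leading "/": written relative to FileOutputFormat.setOutputPath(),
// e.g. /trx/0000000001-r-00000 when the job output path is /trx
mos.write(journalTrxDataSet, NullWritable.get(), key.toString());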
 
        
The write() overloads that take a namedOutput parameter require the named outputs to be registered in the driver class first:
MultipleOutputs.addNamedOutput(job, "PFBANK", TextOutputFormat.class, JournalTrxDataSet.class, NullWritable.class);
MultipleOutputs.addNamedOutput(job, "ZSBANK", TextOutputFormat.class, JournalTrxDataSet.class, NullWritable.class);
write(String namedOutput, K key, V value, String baseOutputPath)
write(String namedOutput, K key, V value)
These two overloads behave much like the one above; the namedOutput lets the results of a single reduce be routed to different directories. When no baseOutputPath is given, the output goes to the directory set by FileOutputFormat.setOutputPath(). For example:
  if (Integer.parseInt(key.toString()) >= 500000){
       mos.write("PFBANK", journalTrxDataSet, NullWritable.get(), "/PUFA/" + key.toString());
  }else if(Integer.parseInt(key.toString()) < 500000){
       mos.write("ZSBANK", journalTrxDataSet, NullWritable.get(), "/ZHAOSHANG/" + key.toString());
  }

Note: sometimes the _SUCCESS file and the reducer output end up in different directories. This happens when the baseOutputPath passed to MultipleOutputs.write() differs from the path set by FileOutputFormat.setOutputPath(); the _SUCCESS file is always written to the FileOutputFormat.setOutputPath() directory.
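For example (the paths here are hypothetical), with FileOutputFormat.setOutputPath(job, new Path("/output_20200101")) and mos.write(..., "/PUFA/" + key.toString()) in the reducer, the resulting layout would look like:

/output_20200101/_SUCCESS
/PUFA/0000000001-r-00000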

If you run into puzzling errors, check that the format class passed to LazyOutputFormat.setOutputFormatClass() matches the one passed to MultipleOutputs.addNamedOutput().
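In other words, keep the format class consistent between the two driver calls, as in the full code below:

MultipleOutputs.addNamedOutput(job, "PFBANK", TextOutputFormat.class, JournalTrxDataSet.class, NullWritable.class);
LazyOutputFormat.setOutputFormatClass(job, TextOutputFormat.class);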

 

Finally, the complete code.

Our requirement at the time was to analyze and aggregate transaction statistics for each of the bank's terminals. The data volume was small and my manager wanted the simplest possible solution. There were two statistical dimensions, one by terminal and one by branch, so a single MapReduce job handled both; the code below only covers the terminal dimension, the branch dimension had not been written yet. The MR program was a standalone module: after data collection finished, the jar was invoked directly through an sh or cmd command, with the file name passed in as a startup argument, and the output file names had to be customized.

package com.xhy.xgg.mapreduce;

import com.xhy.xgg.common.HDFSCommand;
import com.xhy.xgg.common.enums.StatisticDemensionEnums;
import com.xhy.xgg.entity.JournalTrxDataSet;
import lombok.extern.slf4j.Slf4j;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.LazyOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.SpringApplication;
import org.springframework.stereotype.Service;

import javax.annotation.PostConstruct;
import java.io.File;
import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.Date;

@Slf4j
@Service
public class MapReduceTest {


    @Value("${data-set.output-path}")
    private String dataCollectionOutputPath;
    @Value("${data-collection.output-path}")
    private String dataCollectionInputPath;

    @Value("${data-collection.hdfsURI}")
    private String hdfsURI;

    @Value("${mr-run-mode.value}")
    private String mrRunMode;

    public void executeMRJob(StatisticDemensionEnums statisticDemensionEnums)
            throws IOException, ClassNotFoundException, InterruptedException {

        log.info(">> executeMRJob start execute");
        Configuration conf = new Configuration();
        conf.set("dfs.blocksize", "67108864");
        conf.set("dfs.replication", "1");
        conf.set("mapreduce.framework.name", mrRunMode);
        conf.set("fs.defaultFS", hdfsURI);
        Job job = Job.getInstance(conf, "JournalDataProcessService");
        job.setJarByClass(com.xhy.xgg.mapreduce.TerminalJournalDataService.class);
        job.getConfiguration().set("statisticDemensionEnums", String.valueOf(statisticDemensionEnums.getIndex()));
        job.setMapperClass(com.xhy.xgg.mapreduce.MapReduceTest.JournalDataMapper.class);
        job.setMapOutputKeyClass(Text.class);

        job.setMapOutputValueClass(JournalTrxDataSet.class);
        job.setReducerClass(com.xhy.xgg.mapreduce.MapReduceTest.JournalDataReducer.class);
        job.setOutputKeyClass(JournalTrxDataSet.class);
        job.setOutputValueClass(NullWritable.class);
        job.setPartitionerClass(com.xhy.xgg.mapreduce.MapReduceTest.JournalDataPartitioner.class);
        // job.setSortComparatorClass(JournalTrxDataComparator.class);
        job.setNumReduceTasks(3);
        Path inputPath = new Path(dataCollectionInputPath + File.separator + "esbTrx" + File.separator
                + hdfsFileName);

        log.info("-- executeMRJob inputPath = {}", inputPath.toString());

        FileInputFormat.addInputPath(job, inputPath);

        String outPutPathData = dataCollectionOutputPath + "_" + new SimpleDateFormat("yyyyMMdd").format(new Date());
        try {
            if (HDFSCommand.exists(conf, hdfsURI, outPutPathData)) {
                HDFSCommand.delete(conf, hdfsURI, outPutPathData);
            }
        } catch (Exception e) {
            log.error("<< executeMRJob exception, message {}", e.getMessage());
        }
        Path outPutPath = new Path(outPutPathData);
        FileOutputFormat.setOutputPath(job, outPutPath);
        MultipleOutputs.addNamedOutput(job, "PFBANK", TextOutputFormat.class, JournalTrxDataSet.class, NullWritable.class);
        MultipleOutputs.addNamedOutput(job, "ZSBANK", TextOutputFormat.class, JournalTrxDataSet.class, NullWritable.class);

        LazyOutputFormat.setOutputFormatClass(job, TextOutputFormat.class);

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }

    @PostConstruct
    public void start() throws Exception {
        executeMRJob(StatisticDemensionEnums.TERMINAL);
    }

    private String hdfsFileName = "journal_test.txt";

    private static class JournalDataReducer extends Reducer<Text, JournalTrxDataSet, JournalTrxDataSet, NullWritable> {

        private String statisticDemensionEnums = null;
        JournalTrxDataSet journalTrxDataSet = new JournalTrxDataSet();

        private MultipleOutputs<JournalTrxDataSet, NullWritable> mos;

        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
            super.setup(context);
            mos = new MultipleOutputs<>(context);
            statisticDemensionEnums = context.getConfiguration().get("statisticDemensionEnums");

        }

        @Override
        protected void reduce(Text key, Iterable<JournalTrxDataSet> values,
                              Reducer<Text, JournalTrxDataSet, JournalTrxDataSet, NullWritable>.Context context)
                throws IOException, InterruptedException {

            String branchId = "";
            String branchName = "";
            String trxType = "";
            double trxAmt = 0.0;
            int trxCount = 0;

            for (JournalTrxDataSet rw : values) {
                branchId = rw.getBranchId();
                branchName = rw.getBranchName();
                trxType = rw.getTrxType();
                trxAmt += rw.getAmount();
                trxCount += rw.getCount();

            }
            journalTrxDataSet.Set(key.toString(), branchId, branchName, trxType, trxAmt, trxCount);
            if (Integer.parseInt(key.toString()) >= 500000) {
                mos.write("PFBANK", journalTrxDataSet, NullWritable.get(), "/PUFA/" + key.toString());
            } else if (Integer.parseInt(key.toString()) < 500000) {
                mos.write("ZSBANK", journalTrxDataSet, NullWritable.get(), "/ZHAOSHANG/" + key.toString());
            }
            //mos.write(journalTrxDataSet, NullWritable.get(), "/PUFA/"+  key.toString());
            //mos.write(journalTrxDataSet, NullWritable.get(), key.toString());
        }

        @Override
        protected void cleanup(Context context) throws IOException, InterruptedException {
            super.cleanup(context);
            mos.close();
        }

    }

    private static class JournalDataPartitioner extends Partitioner<Text, JournalTrxDataSet> {
        @Override
        public int getPartition(Text key, JournalTrxDataSet value, int arg2) {

            if ("706010101".equals(value.getBranchId())) {
                return 0;
            } else if ("706010106".equals(value.getBranchId())) {
                return 1;
            }
            return 2;
        }

    }

    private static class JournalDataMapper extends Mapper<Object, Text, Text, JournalTrxDataSet> {

        private String statisticDemensionEnums = null;

        JournalTrxDataSet journalTrxDataSet = new JournalTrxDataSet();

        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
            super.setup(context);
            // The statistic dimension passed in from the driver: one option aggregates by terminal, the other by branch
            statisticDemensionEnums = context.getConfiguration().get("statisticDemensionEnums");

        }

        @Override
        protected void map(Object key, Text value, Mapper<Object, Text, Text, JournalTrxDataSet>.Context context)
                throws IOException, InterruptedException {

            String strContent = value.toString();
            String[] result = strContent.split("\\|");
            String terminalId = result[0]; // terminal id
            String branchId = result[1]; // branch id
            String branchName = result[2]; // branch name
            double amount = Double.parseDouble(result[4]); // transaction amount
            String trxType = result[5];
            journalTrxDataSet.Set(terminalId, branchId, branchName, trxType, amount, 1);
            context.write(new Text(terminalId), journalTrxDataSet);
        }

    }

    private static class JournalTrxDataComparator extends WritableComparator {


        protected JournalTrxDataComparator() {
            super(JournalTrxDataSet.class, true);
        }

        @Override
        public int compare(WritableComparable w1, WritableComparable w2) {

            JournalTrxDataSet j1 = (JournalTrxDataSet) w1;
            JournalTrxDataSet j2 = (JournalTrxDataSet) w2;
            int resultCompare = 0;
            /*
             * if (j1.getAmount() == j2.getAmount()) { resultCompare = 0; }
             * else if (j1.getAmount() < j2.getAmount()) { resultCompare = -1; }
             * else { resultCompare = 1; }
             */
            return resultCompare; // returns -1, 0 or 1
        }

    }

    public static void main(String[] args) {
        SpringApplication.run(com.xhy.xgg.mapreduce.TerminalJournalDataService.class, args);
    }
}

 

