mapreduce 只使用Mapper往多個hbase表中寫數據


 

只使用Mapper不使用reduce會大大減少mapreduce程序的運行時間。

有時候程序會往多張hbase表寫數據。

所以有如題的需求。

下面給出的代碼,不是可以運行的代碼,只是展示driver中需要進行的必要項設置,mapper類需要實現的接口,map函數需要的參數以及函數內部的處理方式。

實現過程比較曲折,只貼代碼:

class Qos2HbaseDriver extends Configured implements Tool
{
    private static Logger logger = LoggerFactory
            .getLogger(Qos2HbaseDriver.class);
    private static final int DEFAULT_NUM_REDUCE = 0;

    /**
     * args[0]輸入hdfs文件路徑,args[1]輸出表
     */

    @Override
    public int run(String[] args) throws Exception
    {
        Configuration conf = HBaseConfiguration.create();
        conf.set("output", args[1]);//輸出表1
        conf.set("output2", args[2]);//輸出表2

        Job job = Job.getInstance(conf);
        job.setJobName("iplane_Qos2Hbase");
        job.setMapperClass(Qos2HbaseMapper.class);
        FileInputFormat.setInputPaths(job, args[0]);
        job.setMapOutputKeyClass(ImmutableBytesWritable.class);
        job.setMapOutputValueClass(Put.class);
        job.setOutputFormatClass(MultiTableOutputFormat.class);

        TableMapReduceUtil.addDependencyJars(job);
        TableMapReduceUtil.addDependencyJars(job.getConfiguration());
        job.setJarByClass(Qos2Hbase.class);

        // 設置reduce個數,可調節
        int numberReduceTasks = 0;       
        job.setNumReduceTasks(numberReduceTasks);
        boolean b = job.waitForCompletion(true);
        if (!b)
        {
            logger.error("工作錯誤!");
            return -1;
        }
        return 0;
    }
}

/**
 * @ClassName: Qos2HbaseMapper
 * @Description: 將結果入Hbase庫的mapper類
 * @author xxx
 * @date 2014-9-16 下午1:18:49
 * 
 */
class Qos2HbaseMapper extends
        Mapper<LongWritable, Text, ImmutableBytesWritable, Put>
{
    private static Logger logger = LoggerFactory
            .getLogger(Qos2HbaseMapper.class);

    @Override
    public void map(LongWritable key, Text line, Context context)
            throws IOException, InterruptedException
    {
        String output = context.getConfiguration().get("output");
        String output2 = context.getConfiguration().get("output2");

        // 組裝rowkey:ip_ip
        StringBuffer rowkeySb = "aaaa";

        Put put = null;
        String family = "d";
        String qualifier = "";
        // 直接將結果存入hbase
        long ts = System.currentTimeMillis();
        put = new Put(Bytes.toBytes(rowkeySb.toString()));

        qualifier = "del";
        put.add(Bytes.toBytes(family), Bytes.toBytes(qualifier), ts,
                Bytes.toBytes(values[6]));// 組裝一條數據    
            if (!put.isEmpty())
            {
                ImmutableBytesWritable ib = new ImmutableBytesWritable();
                ib.set(Bytes.toBytes(output));
                context.write(ib, put);// 將結果存入hbase表
            }      
            
        // 存歷史表
        rowkeySb.append(rowkeySeparator).append(myDate);
        put = new Put(Bytes.toBytes(rowkeySb.toString()));
        qualifier = "del";
        put.add(Bytes.toBytes(family), Bytes.toBytes(qualifier), ts,
                Bytes.toBytes(values[6]));// 組裝一條數據
            if (!put.isEmpty())
            {
                ImmutableBytesWritable ib = new ImmutableBytesWritable();
                ib.set(Bytes.toBytes(output2));
                context.write(ib, put);// 將結果存入hbase表
            }
      
    }
}

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM