只使用Mapper不使用reduce會大大減少mapreduce程序的運行時間。
有時候程序會往多張hbase表寫數據。
所以有如題的需求。
下面給出的代碼,不是可以運行的代碼,只是展示driver中需要進行的必要項設置,mapper類需要實現的接口,map函數需要的參數以及函數內部的處理方式。
實現過程比較曲折,只貼代碼:
class Qos2HbaseDriver extends Configured implements Tool { private static Logger logger = LoggerFactory .getLogger(Qos2HbaseDriver.class); private static final int DEFAULT_NUM_REDUCE = 0; /** * args[0]輸入hdfs文件路徑,args[1]輸出表 */ @Override public int run(String[] args) throws Exception { Configuration conf = HBaseConfiguration.create(); conf.set("output", args[1]);//輸出表1 conf.set("output2", args[2]);//輸出表2 Job job = Job.getInstance(conf); job.setJobName("iplane_Qos2Hbase"); job.setMapperClass(Qos2HbaseMapper.class); FileInputFormat.setInputPaths(job, args[0]); job.setMapOutputKeyClass(ImmutableBytesWritable.class); job.setMapOutputValueClass(Put.class); job.setOutputFormatClass(MultiTableOutputFormat.class); TableMapReduceUtil.addDependencyJars(job); TableMapReduceUtil.addDependencyJars(job.getConfiguration()); job.setJarByClass(Qos2Hbase.class); // 設置reduce個數,可調節 int numberReduceTasks = 0; job.setNumReduceTasks(numberReduceTasks); boolean b = job.waitForCompletion(true); if (!b) { logger.error("工作錯誤!"); return -1; } return 0; } } /** * @ClassName: Qos2HbaseMapper * @Description: 將結果入Hbase庫的mapper類 * @author xxx * @date 2014-9-16 下午1:18:49 * */ class Qos2HbaseMapper extends Mapper<LongWritable, Text, ImmutableBytesWritable, Put> { private static Logger logger = LoggerFactory .getLogger(Qos2HbaseMapper.class); @Override public void map(LongWritable key, Text line, Context context) throws IOException, InterruptedException { String output = context.getConfiguration().get("output"); String output2 = context.getConfiguration().get("output2"); // 組裝rowkey:ip_ip StringBuffer rowkeySb = "aaaa"; Put put = null; String family = "d"; String qualifier = ""; // 直接將結果存入hbase long ts = System.currentTimeMillis(); put = new Put(Bytes.toBytes(rowkeySb.toString())); qualifier = "del"; put.add(Bytes.toBytes(family), Bytes.toBytes(qualifier), ts, Bytes.toBytes(values[6]));// 組裝一條數據 if (!put.isEmpty()) { ImmutableBytesWritable ib = new ImmutableBytesWritable(); ib.set(Bytes.toBytes(output)); context.write(ib, put);// 將結果存入hbase表 } // 存歷史表 rowkeySb.append(rowkeySeparator).append(myDate); put = new Put(Bytes.toBytes(rowkeySb.toString())); qualifier = "del"; put.add(Bytes.toBytes(family), Bytes.toBytes(qualifier), ts, Bytes.toBytes(values[6]));// 組裝一條數據 if (!put.isEmpty()) { ImmutableBytesWritable ib = new ImmutableBytesWritable(); ib.set(Bytes.toBytes(output2)); context.write(ib, put);// 將結果存入hbase表 } } }