Integrating HBase with MapReduce
Why use MapReduce to access data in HBase?
To speed up analysis and to scale analytical capacity beyond a single client.
Using MapReduce to analyze HBase data is strictly an offline (batch) analysis scenario.
Case 1: Transferring data between HBase tables
In plain Hadoop, the MR jobs we wrote extended the Mapper and Reducer classes; for HBase, we instead extend TableMapper and TableReducer.
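For reference, here are the abridged declarations of these two base classes, as quoted in the Javadoc comments of the later cases in this section:

// Abridged declarations: the map input types are fixed to ImmutableBytesWritable (row key)
// and Result (the row); the reducer's output value is a Writable mutation (typically a Put)
// destined for the target table.
public abstract class TableMapper<KEYOUT, VALUEOUT>
        extends Mapper<ImmutableBytesWritable, Result, KEYOUT, VALUEOUT> { }

public abstract class TableReducer<KEYIN, VALUEIN, KEYOUT>
        extends Reducer<KEYIN, VALUEIN, KEYOUT, Writable> { }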
Goal: migrate part of the data in the fruit table into the fruit_mr table via MR. The MR job does not create the target table, so fruit_mr must already exist with the same column family (see the sketch below).
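A minimal sketch of creating the target table in Java, using the same older HBaseAdmin/HTableDescriptor API that Case 3 below uses; the class name CreateFruitMrTable is only for illustration:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.HBaseAdmin;

public class CreateFruitMrTable {
    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        HBaseAdmin admin = new HBaseAdmin(conf);
        // Create fruit_mr with column family "info" if it does not exist yet
        if (!admin.tableExists("fruit_mr")) {
            HTableDescriptor desc = new HTableDescriptor(TableName.valueOf("fruit_mr"));
            desc.addFamily(new HColumnDescriptor("info".getBytes()));
            admin.createTable(desc);
        }
        admin.close();
    }
}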
Step 1: Build the ReadFruitMapper class, which reads data from the fruit table
import java.io.IOException;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.util.Bytes;

public class ReadFruitMapper extends TableMapper<ImmutableBytesWritable, Put> {
    @Override
    protected void map(ImmutableBytesWritable key, Result value, Context context)
            throws IOException, InterruptedException {
        // Extract the name and color of the fruit, i.e. copy each row that is read into a Put object.
        Put put = new Put(key.get());
        // Iterate over the cells of the row
        for (Cell cell : value.rawCells()) {
            // Keep only cells from the column family "info"
            if ("info".equals(Bytes.toString(CellUtil.cloneFamily(cell)))) {
                // Column "name"
                if ("name".equals(Bytes.toString(CellUtil.cloneQualifier(cell)))) {
                    // Add this cell to the Put object
                    put.add(cell);
                // Column "color"
                } else if ("color".equals(Bytes.toString(CellUtil.cloneQualifier(cell)))) {
                    // Add this cell to the Put object
                    put.add(cell);
                }
            }
        }
        // Emit each row read from fruit as the map output
        context.write(key, put);
    }
}
Step 2: Build the WriteFruitMRReducer class, which writes the rows read from fruit into the fruit_mr table
import java.io.IOException;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.io.NullWritable;

public class WriteFruitMRReducer extends TableReducer<ImmutableBytesWritable, Put, NullWritable> {
    @Override
    protected void reduce(ImmutableBytesWritable key, Iterable<Put> values, Context context)
            throws IOException, InterruptedException {
        // Write every row that was read into the fruit_mr table
        for (Put put : values) {
            context.write(NullWritable.get(), put);
        }
    }
}
Step 3: Build Fruit2FruitMRJob extends Configured implements Tool, which assembles and runs the Job
// Assemble the Job
public int run(String[] args) throws Exception {
    // Get the Configuration
    Configuration conf = this.getConf();
    // Create the Job
    Job job = Job.getInstance(conf, this.getClass().getSimpleName());
    job.setJarByClass(Fruit2FruitMRJob.class);

    // Configure the Job
    Scan scan = new Scan();
    scan.setCacheBlocks(false);
    scan.setCaching(500);

    // Set the Mapper; note that the classes come from the mapreduce package, not the old mapred package
    TableMapReduceUtil.initTableMapperJob(
            "fruit",                       // source table name
            scan,                          // scan controller
            ReadFruitMapper.class,         // Mapper class
            ImmutableBytesWritable.class,  // Mapper output key type
            Put.class,                     // Mapper output value type
            job                            // the Job to configure
    );
    // Set the Reducer
    TableMapReduceUtil.initTableReducerJob("fruit_mr", WriteFruitMRReducer.class, job);
    // Set the number of reduce tasks (at least 1)
    job.setNumReduceTasks(1);

    boolean isSuccess = job.waitForCompletion(true);
    if (!isSuccess) {
        throw new IOException("Job running with error");
    }
    return isSuccess ? 0 : 1;
}
Step 4: Run the Job from the main method
public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    int status = ToolRunner.run(conf, new Fruit2FruitMRJob(), args);
    System.exit(status);
}
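The run() and main() snippets above live in one driver class. A sketch of the surrounding class declaration and the imports they assume (nothing beyond what the two steps already reference):

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class Fruit2FruitMRJob extends Configured implements Tool {
    // ... the run(String[] args) method from Step 3 goes here ...
    // ... the main(String[] args) method from Step 4 goes here ...
}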
Case 2: Read data from HBase, analyze it, and write the results to HDFS
/**
 * public abstract class TableMapper<KEYOUT, VALUEOUT>
 *     extends Mapper<ImmutableBytesWritable, Result, KEYOUT, VALUEOUT> { }
 *
 * @author duanhaitao@gec.cn
 */
public class HbaseReader {

    public static String flow_fields_import = "flow_fields_import";

    static class HdfsSinkMapper extends TableMapper<Text, NullWritable> {
        @Override
        protected void map(ImmutableBytesWritable key, Result value, Context context)
                throws IOException, InterruptedException {
            // The row key holds the phone number
            byte[] bytes = key.copyBytes();
            String phone = new String(bytes);
            // Read the url column from column family f1
            byte[] urlbytes = value.getValue("f1".getBytes(), "url".getBytes());
            String url = new String(urlbytes);
            context.write(new Text(phone + "\t" + url), NullWritable.get());
        }
    }

    static class HdfsSinkReducer extends Reducer<Text, NullWritable, Text, NullWritable> {
        @Override
        protected void reduce(Text key, Iterable<NullWritable> values, Context context)
                throws IOException, InterruptedException {
            context.write(key, NullWritable.get());
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum", "spark01");

        Job job = Job.getInstance(conf);
        job.setJarByClass(HbaseReader.class);

        // job.setMapperClass(HdfsSinkMapper.class);  // not needed: initTableMapperJob sets the mapper
        Scan scan = new Scan();
        TableMapReduceUtil.initTableMapperJob(flow_fields_import, scan,
                HdfsSinkMapper.class, Text.class, NullWritable.class, job);
        job.setReducerClass(HdfsSinkReducer.class);

        FileOutputFormat.setOutputPath(job, new Path("c:/hbasetest/output"));
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(NullWritable.class);

        job.waitForCompletion(true);
    }
}
Case 3: Read data from HDFS and write it into HBase
/**
 * public abstract class TableReducer<KEYIN, VALUEIN, KEYOUT>
 *     extends Reducer<KEYIN, VALUEIN, KEYOUT, Writable> { }
 *
 * @author duanhaitao@gec.cn
 */
public class HbaseSinker {

    public static String flow_fields_import = "flow_fields_import";

    static class HbaseSinkMrMapper extends Mapper<LongWritable, Text, FlowBean, NullWritable> {
        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            // Each input line is "phone<TAB>url"
            String line = value.toString();
            String[] fields = line.split("\t");
            String phone = fields[0];
            String url = fields[1];

            FlowBean bean = new FlowBean(phone, url);
            context.write(bean, NullWritable.get());
        }
    }

    static class HbaseSinkMrReducer extends TableReducer<FlowBean, NullWritable, ImmutableBytesWritable> {
        @Override
        protected void reduce(FlowBean key, Iterable<NullWritable> values, Context context)
                throws IOException, InterruptedException {
            // Use the phone number as the row key and store the url in f1:url
            Put put = new Put(key.getPhone().getBytes());
            put.add("f1".getBytes(), "url".getBytes(), key.getUrl().getBytes());
            context.write(new ImmutableBytesWritable(key.getPhone().getBytes()), put);
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum", "spark01");

        // (Re)create the target table with column family f1
        HBaseAdmin hBaseAdmin = new HBaseAdmin(conf);
        boolean tableExists = hBaseAdmin.tableExists(flow_fields_import);
        if (tableExists) {
            hBaseAdmin.disableTable(flow_fields_import);
            hBaseAdmin.deleteTable(flow_fields_import);
        }
        HTableDescriptor desc = new HTableDescriptor(TableName.valueOf(flow_fields_import));
        HColumnDescriptor hColumnDescriptor = new HColumnDescriptor("f1".getBytes());
        desc.addFamily(hColumnDescriptor);
        hBaseAdmin.createTable(desc);

        Job job = Job.getInstance(conf);
        job.setJarByClass(HbaseSinker.class);

        job.setMapperClass(HbaseSinkMrMapper.class);
        TableMapReduceUtil.initTableReducerJob(flow_fields_import, HbaseSinkMrReducer.class, job);

        FileInputFormat.setInputPaths(job, new Path("c:/hbasetest/data"));

        job.setMapOutputKeyClass(FlowBean.class);
        job.setMapOutputValueClass(NullWritable.class);
        job.setOutputKeyClass(ImmutableBytesWritable.class);
        job.setOutputValueClass(Mutation.class);

        job.waitForCompletion(true);
    }
}
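The FlowBean class used in this case is not shown in the original code. Since it serves as the map output key, it must implement WritableComparable. Below is a minimal sketch; the field names, getters, and constructor are taken from the usage above, everything else is an assumption:

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.WritableComparable;

// Minimal sketch of the FlowBean referenced above (assumption: only phone and url fields).
public class FlowBean implements WritableComparable<FlowBean> {

    private String phone;
    private String url;

    // Hadoop requires a no-arg constructor for deserialization
    public FlowBean() { }

    public FlowBean(String phone, String url) {
        this.phone = phone;
        this.url = url;
    }

    public String getPhone() { return phone; }
    public String getUrl()   { return url; }

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(phone);
        out.writeUTF(url);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        this.phone = in.readUTF();
        this.url = in.readUTF();
    }

    @Override
    public int compareTo(FlowBean other) {
        // Sort/group by phone number during the shuffle
        return this.phone.compareTo(other.phone);
    }

    @Override
    public String toString() {
        return phone + "\t" + url;
    }
}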