Overview: this post walks through several ways of moving data into and between HBase tables with MapReduce: the ImportTsv and rowcounter tools bundled with HBase, importing MySQL data with Sqoop, copying data between HBase tables with a custom MapReduce job, loading HDFS files into HBase, and BulkLoad via HFiles.
Configure the environment variables for Hadoop and HBase:
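A minimal sketch of the exports, e.g. in ~/.bashrc (the installation paths are assumptions, adjust them to your environment):

    export HADOOP_HOME=/opt/modules/hadoop   # assumed install path
    export HBASE_HOME=/opt/modules/hbase     # assumed install path
    export PATH=$PATH:$HADOOP_HOME/bin:$HBASE_HOME/bin

Reload the shell configuration with source ~/.bashrc afterwards.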
To run the MapReduce programs bundled with HBase, you need the following command, which can be found on the official website:
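The form given in the HBase reference guide is roughly the following; the hbase-server jar version must match your installation (VERSION is a placeholder):

    HADOOP_CLASSPATH=`${HBASE_HOME}/bin/hbase classpath` \
    ${HADOOP_HOME}/bin/hadoop jar ${HBASE_HOME}/lib/hbase-server-VERSION.jar

Run without a program name, it should print the list of bundled programs (importtsv, rowcounter, CellCounter, export, import, and so on).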
If you run into the following error, it means Hadoop's MapReduce jobs cannot see the HBase jars on their classpath:
The fix described on the official website solves it:
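One common approach is to put the HBase jars on Hadoop's classpath, either by setting HADOOP_CLASSPATH on the command line as above or with a line such as the following in ${HADOOP_HOME}/etc/hadoop/hadoop-env.sh (treat this as a sketch; the exact fix depends on your versions):

    export HADOOP_CLASSPATH=$HADOOP_CLASSPATH:${HBASE_HOME}/lib/*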
After running it again, the problem is resolved:
The command for importing data uses the bundled importtsv program:
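Its general form is roughly as follows (the column mapping, table name, and input directory are placeholders):

    HADOOP_CLASSPATH=`${HBASE_HOME}/bin/hbase classpath` \
    ${HADOOP_HOME}/bin/hadoop jar ${HBASE_HOME}/lib/hbase-server-VERSION.jar importtsv \
    -Dimporttsv.columns=HBASE_ROW_KEY,colfam:qualifier <tablename> <hdfs-inputdir>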
A tsv file is a file whose fields are separated by tab characters.
First create some test data by creating a user file:
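A small user.tsv matching the info:name and info:age columns used below might look like this (tab-separated; the rows are made-up sample data):

    rk0001	zhangsan	33
    rk0002	lisi	44
    rk0003	wangwu	55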
Upload it to HDFS, then start the hbase shell:
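For example (the HDFS directory is an assumption):

    hdfs dfs -mkdir -p /user/hbase/importtsv
    hdfs dfs -put user.tsv /user/hbase/importtsv/
    hbase shell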
Create the table:
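In the hbase shell, create a user table with an info column family:

    create 'user', 'info'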
Then import the data:
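A concrete importtsv invocation for the user table and the two columns in user.tsv (the HDFS path is the assumed one from the upload step):

    HADOOP_CLASSPATH=`${HBASE_HOME}/bin/hbase classpath` \
    ${HADOOP_HOME}/bin/hadoop jar ${HBASE_HOME}/lib/hbase-server-VERSION.jar importtsv \
    -Dimporttsv.columns=HBASE_ROW_KEY,info:name,info:age user /user/hbase/importtsv/user.tsv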
There are some other programs as well, for example rowcounter, which counts the rows of a table:
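For instance, counting the rows of the user table that was just loaded:

    HADOOP_CLASSPATH=`${HBASE_HOME}/bin/hbase classpath` \
    ${HADOOP_HOME}/bin/hadoop jar ${HBASE_HOME}/lib/hbase-server-VERSION.jar rowcounter user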
Next, demonstrate importing MySQL data into HBase with Sqoop. Build the test data:
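A hypothetical MySQL source table could be created like this (the database, table, and columns are assumptions for illustration):

    CREATE TABLE user (id INT PRIMARY KEY, name VARCHAR(32), age INT);
    INSERT INTO user VALUES (1, 'zhangsan', 33), (2, 'lisi', 44);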
To use sqoop import, the HBase environment variable has to be configured first:
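A sketch assuming Sqoop 1.4.x and the MySQL table above (the connection string, credentials, target table name, and row key column are placeholders):

    # in ${SQOOP_HOME}/conf/sqoop-env.sh
    export HBASE_HOME=/opt/modules/hbase   # assumed install path

    sqoop import \
      --connect jdbc:mysql://localhost:3306/test \
      --username root --password 123456 \
      --table user \
      --hbase-table user_from_mysql \
      --column-family info \
      --hbase-row-key id \
      --hbase-create-table \
      -m 1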
Migrating data between HBase tables:
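The job below reads from tab1 and writes to tab2, both with an info column family, so they need to exist first; a minimal setup in the hbase shell might be (the put rows are sample data):

    create 'tab1', 'info'
    put 'tab1', 'rk0001', 'info:name', 'zhangsan'
    put 'tab1', 'rk0001', 'info:age', '33'
    create 'tab2', 'info'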
Then write the MapReduce program; the code is as follows:
package com.tyx.hbase.mr;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class Tab2TabMapReduce extends Configured implements Tool {

    // mapper class: reads rows from the source table and keeps only the info:name cells
    public static class TabMapper extends TableMapper<Text, Put> {
        private Text rowkey = new Text();

        @Override
        protected void map(ImmutableBytesWritable key, Result value, Context context)
                throws IOException, InterruptedException {
            byte[] bytes = key.get();
            rowkey.set(Bytes.toString(bytes));
            Put put = new Put(bytes);
            for (Cell cell : value.rawCells()) {
                // add the cell only if it belongs to the info:name column
                if ("info".equals(Bytes.toString(CellUtil.cloneFamily(cell)))) {
                    if ("name".equals(Bytes.toString(CellUtil.cloneQualifier(cell)))) {
                        put.add(cell);
                    }
                }
            }
            // skip rows without an info:name cell to avoid writing an empty Put
            if (!put.isEmpty()) {
                context.write(rowkey, put);
            }
        }
    }

    // reducer class: writes the Puts into the target table
    public static class TabReduce extends TableReducer<Text, Put, ImmutableBytesWritable> {
        @Override
        protected void reduce(Text key, Iterable<Put> values, Context context)
                throws IOException, InterruptedException {
            for (Put put : values) {
                // TableOutputFormat ignores the key, so null is fine here
                context.write(null, put);
            }
        }
    }

    @Override
    public int run(String[] args) throws Exception {
        // create job
        Job job = Job.getInstance(this.getConf(), this.getClass().getSimpleName());
        // set run class
        job.setJarByClass(this.getClass());

        Scan scan = new Scan();
        scan.setCaching(500);
        scan.setCacheBlocks(false);

        // set mapper
        TableMapReduceUtil.initTableMapperJob(
                "tab1",          // input table
                scan,            // scan instance
                TabMapper.class, // mapper class
                Text.class,      // mapper output key
                Put.class,       // mapper output value
                job);
        // set reducer
        TableMapReduceUtil.initTableReducerJob(
                "tab2",          // output table
                TabReduce.class, // reducer class
                job);
        job.setNumReduceTasks(1);

        boolean b = job.waitForCompletion(true);
        if (!b) {
            System.err.println("error with job!!!");
        }
        // return a non-zero exit code when the job fails
        return b ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        // create config
        Configuration config = HBaseConfiguration.create();
        // submit job
        int status = ToolRunner.run(config, new Tab2TabMapReduce(), args);
        // exit
        System.exit(status);
    }
}
Command to run it:
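A sketch of the submit command (hbase-mr.jar is a placeholder for whatever jar you package the class into):

    HADOOP_CLASSPATH=`${HBASE_HOME}/bin/hbase classpath` \
    ${HADOOP_HOME}/bin/hadoop jar hbase-mr.jar com.tyx.hbase.mr.Tab2TabMapReduce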
Next, import a file from HDFS into HBase:
Construct the data:
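The mapper splits each line on tabs into row key, name, and age, so the input file should look like this (sample rows; the first matches the comment in the code). Upload it to HDFS and make sure the target table user with an info column family exists, since both are hardcoded in the job:

    rk0001	zhangsan	33
    rk0002	lisi	44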
Then write the MapReduce program:
package com.jkxy.hbase.mr;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class HDFS2TabMapReduce extends Configured implements Tool {

    public static class HDFS2TabMapper extends Mapper<LongWritable, Text, ImmutableBytesWritable, Put> {
        ImmutableBytesWritable rowkey = new ImmutableBytesWritable();

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            // each line looks like: rk0001	zhangsan	33 (tab-separated)
            String[] words = value.toString().split("\t");
            Put put = new Put(Bytes.toBytes(words[0]));
            put.add(Bytes.toBytes("info"), Bytes.toBytes("name"), Bytes.toBytes(words[1]));
            put.add(Bytes.toBytes("info"), Bytes.toBytes("age"), Bytes.toBytes(words[2]));
            rowkey.set(Bytes.toBytes(words[0]));
            context.write(rowkey, put);
        }
    }

    @Override
    public int run(String[] args) throws Exception {
        // create job
        Job job = Job.getInstance(this.getConf(), this.getClass().getSimpleName());
        // set class
        job.setJarByClass(this.getClass());
        // set input path
        FileInputFormat.addInputPath(job, new Path(args[0]));
        // set mapper
        job.setMapperClass(HDFS2TabMapper.class);
        job.setMapOutputKeyClass(ImmutableBytesWritable.class);
        job.setMapOutputValueClass(Put.class);
        // set reducer: write directly into the "user" table, map-only job
        TableMapReduceUtil.initTableReducerJob(
                "user", // output table
                null,
                job);
        job.setNumReduceTasks(0);

        boolean b = job.waitForCompletion(true);
        if (!b) {
            throw new IOException("error with job!!!");
        }
        return 0;
    }

    public static void main(String[] args) throws Exception {
        // get configuration
        Configuration conf = HBaseConfiguration.create();
        // submit job
        int status = ToolRunner.run(conf, new HDFS2TabMapReduce(), args);
        // exit
        System.exit(status);
    }
}
Command to run it:
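A sketch (the jar name and input path are placeholders; the only argument is the HDFS input path, and the target table user is hardcoded):

    HADOOP_CLASSPATH=`${HBASE_HOME}/bin/hbase classpath` \
    ${HADOOP_HOME}/bin/hadoop jar hbase-mr.jar com.jkxy.hbase.mr.HDFS2TabMapReduce /user/hbase/input/user.tsv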
Next, demonstrate using BulkLoad to import data from HDFS into HBase. This approach writes HFiles directly and bypasses the WAL and MemStore, which speeds up loading large volumes of data. The code is as follows:
package com.jkxy.hbase.mr;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2;
import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;
import org.apache.hadoop.hbase.mapreduce.PutSortReducer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class HFile2TabMapReduce extends Configured implements Tool {

    public static class HFile2TabMapper extends Mapper<LongWritable, Text, ImmutableBytesWritable, Put> {
        ImmutableBytesWritable rowkey = new ImmutableBytesWritable();

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            // each line: rowkey, name, age, separated by tabs
            String[] words = value.toString().split("\t");
            Put put = new Put(Bytes.toBytes(words[0]));
            put.add(Bytes.toBytes("info"), Bytes.toBytes("name"), Bytes.toBytes(words[1]));
            put.add(Bytes.toBytes("info"), Bytes.toBytes("age"), Bytes.toBytes(words[2]));
            rowkey.set(Bytes.toBytes(words[0]));
            context.write(rowkey, put);
        }
    }

    @Override
    public int run(String[] args) throws Exception {
        // create job
        Job job = Job.getInstance(getConf(), this.getClass().getSimpleName());
        // set run jar class
        job.setJarByClass(this.getClass());
        // set input / output paths (args[0] is the table name)
        FileInputFormat.addInputPath(job, new Path(args[1]));
        FileOutputFormat.setOutputPath(job, new Path(args[2]));
        // set map
        job.setMapperClass(HFile2TabMapper.class);
        job.setMapOutputKeyClass(ImmutableBytesWritable.class);
        job.setMapOutputValueClass(Put.class);
        // set reduce
        job.setReducerClass(PutSortReducer.class);

        HTable table = new HTable(getConf(), args[0]);
        // configure the job to write HFiles for the target table
        HFileOutputFormat2.configureIncrementalLoad(job, table);

        // submit job
        boolean b = job.waitForCompletion(true);
        if (!b) {
            throw new IOException(" error with job !!!");
        }

        // load the generated HFiles into the table
        LoadIncrementalHFiles loader = new LoadIncrementalHFiles(getConf());
        loader.doBulkLoad(new Path(args[2]), table);

        return 0;
    }

    public static void main(String[] args) throws Exception {
        // get configuration
        Configuration conf = HBaseConfiguration.create();
        // run job
        int status = ToolRunner.run(conf, new HFile2TabMapReduce(), args);
        // exit
        System.exit(status);
    }
}
Run it with the following command:
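A sketch matching the argument order in run(): args[0] is the table name, args[1] the HDFS input path, args[2] the HFile output directory (the jar name and paths are placeholders; the output directory must not already exist):

    HADOOP_CLASSPATH=`${HBASE_HOME}/bin/hbase classpath` \
    ${HADOOP_HOME}/bin/hadoop jar hbase-mr.jar com.jkxy.hbase.mr.HFile2TabMapReduce user /user/hbase/input/user.tsv /user/hbase/hfileoutput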