Hbase(七)hbase高級編程


一、Hbase結合mapreduce    

     為什么需要用 mapreduce 去訪問 hbase 的數據?
     ——加快分析速度和擴展分析能力
     Mapreduce 訪問 hbase 數據作分析一定是在離線分析的場景下應用

       

      1、HbaseToHDFS

         從 hbase 中讀取數據,分析之后然后寫入 hdfs,代碼實現:

package com.ghgj.hbase.hbase2hdfsmr;

import java.io.IOException;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

/**
 * 作用:從hbase中讀取user_info這個表的數據,然后寫出到hdfs
 */
public class HBaseToHDFSMR {
	
	private static final String ZK_CONNECT = "hadoop03:2181,hadoop04:2181,hadoop05:2181";

	public static void main(String[] args) throws Exception {
		
		Configuration conf = HBaseConfiguration.create();
		conf.set("hbase.zookeeper.quorum", ZK_CONNECT);
		System.setProperty("HADOOP_USER_NAME", "hadoop");
//		conf.set("fs.defaultFS", "hdfs://myha01/");
		
		Job job = Job.getInstance(conf);
		job.setJarByClass(HBaseToHDFSMR.class);
		
		Scan scan = new Scan();
		scan.addColumn(Bytes.toBytes("base_info"), Bytes.toBytes("name"));
		/**
		 * TableMapReduceUtil:以util結尾:工具
		 * MapReduceFactory:以factory結尾,它是工廠類,最大作用就是管理對象的生成
		 */
		TableMapReduceUtil.initTableMapperJob("user_info", scan, 
				HBaseToHDFSMRMapper.class, Text.class, NullWritable.class, job);
		job.setReducerClass(HBaseToHDFSMRReducer.class);
		
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(NullWritable.class);
		
		Path outputPath = new Path("/hbase2hdfs/output");
		FileSystem fs = FileSystem.get(conf);
		if(fs.exists(outputPath)){
			fs.delete(outputPath);
		}
		FileOutputFormat.setOutputPath(job, outputPath);
		
		boolean waitForCompletion = job.waitForCompletion(true);
		System.exit(waitForCompletion ? 0 : 1);
	}
	
	static class HBaseToHDFSMRMapper extends TableMapper<Text, NullWritable>{
		/**
		 * key:rowkey
		 * value:map方法每執行一次接收到的一個參數,這個參數就是一個Result實例
		 * 這個Result里面存的東西就是rowkey, family, qualifier, value, timestamp
		 */
		@Override
		protected void map(ImmutableBytesWritable key, Result value, Mapper<ImmutableBytesWritable, Result, Text, NullWritable>.Context context) throws IOException, InterruptedException {
			String rowkey = Bytes.toString(key.copyBytes());
			System.out.println(rowkey);
			List<Cell> cells = value.listCells();
			for (int i = 0; i < cells.size(); i++) {
				Cell cell = cells.get(i);
				String rowkey_result = Bytes.toString(cell.getRow()) + "\t"
						+ Bytes.toString(cell.getFamily()) + "\t"
						+ Bytes.toString(cell.getQualifier()) + "\t"
						+ Bytes.toString(cell.getValue()) + "\t"
						+ cell.getTimestamp();
				context.write(new Text(rowkey_result), NullWritable.get());
			}
		}
	}
	
	static class HBaseToHDFSMRReducer extends Reducer<Text, NullWritable, Text, NullWritable>{
		@Override
		protected void reduce(Text key, Iterable<NullWritable> arg1, Reducer<Text, NullWritable, Text, NullWritable>.Context context) throws IOException, InterruptedException {
			context.write(key, NullWritable.get());
		}
	}
}

  2、HDFSToHbase

        從 hdfs 從讀入數據,處理之后寫入 hbase,代碼實現:

package com.ghgj.hbase.hbase2hdfsmr;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

public class HDFSToHBaseMR {
	private static final String ZK_CONNECT = "hadoop03:2181,hadoop04:2181,hadoop05:2181";
	private static final String TABLE_NAME = "person_info";

	public static void main(String[] args) throws Exception {

		Configuration conf = HBaseConfiguration.create();
		conf.set("hbase.zookeeper.quorum", ZK_CONNECT);
		System.setProperty("HADOOP_USER_NAME", "hadoop");
		Job job = Job.getInstance(conf);
		job.setJarByClass(HDFSToHBaseMR.class);

		// 以下這一段代碼是為了創建一張hbase表叫做 person_info
		HBaseAdmin admin = new HBaseAdmin(conf);
		HTableDescriptor htd = new HTableDescriptor(TableName.valueOf(TABLE_NAME));
		htd.addFamily(new HColumnDescriptor("base_info"));
		if (admin.tableExists(TABLE_NAME)) {
			admin.disableTable(TABLE_NAME);
			admin.deleteTable(TABLE_NAME);
		}
		admin.createTable(htd);

		// 給job指定mapperclass 和  reducerclass
		job.setMapperClass(HDFSToHBaseMRMapper.class);
		TableMapReduceUtil.initTableReducerJob(TABLE_NAME, HDFSToHBaseMRReducer.class, job);
		
		// 給mapper和reducer指定輸出的key-value的類型
		job.setMapOutputKeyClass(Text.class);
		job.setMapOutputValueClass(NullWritable.class);
		job.setOutputKeyClass(ImmutableBytesWritable.class);
		job.setOutputValueClass(Mutation.class);

		// 指定輸入數據的路徑
		FileInputFormat.setInputPaths(job, new Path("/hbase2hdfs/output"));
		
		// job提交
		boolean boo = job.waitForCompletion(true);
		System.exit(boo ? 0 :1);
	}

	static class HDFSToHBaseMRMapper extends Mapper<LongWritable, Text, Text, NullWritable> {
		@Override
		protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, NullWritable>.Context context) throws IOException, InterruptedException {
			context.write(value, NullWritable.get());
		}
	}

	/**
	 * TableReducer extends Reducer 這么做的唯一效果就是把valueout的類型確定為Mutation
	 */
	static class HDFSToHBaseMRReducer extends TableReducer<Text, NullWritable, ImmutableBytesWritable> {

		/**
		 * baiyc_20150716_0001 base_info name baiyc1 1488348387443
		 */
		@Override
		protected void reduce(Text key, Iterable<NullWritable> values, Reducer<Text, NullWritable, ImmutableBytesWritable, Mutation>.Context context) throws IOException, InterruptedException {

			String[] splits = key.toString().split("\t");
			String rowkeyStr = splits[0];
			ImmutableBytesWritable rowkey = new ImmutableBytesWritable(Bytes.toBytes(rowkeyStr));

			Put put = new Put(Bytes.toBytes(rowkeyStr));

			String family = splits[1];
			String qualifier = splits[2];
			String value = splits[3];
			String ts = splits[4];

			put.add(Bytes.toBytes(family), Bytes.toBytes(qualifier), Long.parseLong(ts), Bytes.toBytes(value));

			context.write(rowkey, put);
		}
	}

}

二、Hbase和mysql數據庫數據進行互導

      1、mysql數據導入到hbase(用sqoop)

  命令:

sqoop import --connect jdbc:mysql://hadoop01/mytest --username root --password root
--table student --hbase-create-table --hbase-table studenttest --column-family name
--hbase-row-key id

 

其 中 會 報 錯 , 說 Exception in thread "main" java.lang.NoSuchMethodError: org.apache.hadoop.hbase.HTableDescriptor.addFamily(Lorg/apache/hadoop/hbase/HColumnDescriptor;)V 是由於版本不兼容引起,我們可以通過事先創建好表就可以使用了。
請使用下面的命令:

sqoop import --connect jdbc:mysql://hadoop01/mytest --username root --password root
--table student --hbase-table studenttest1 --column-family name --hbase-row-key id

 

--hbase-create-table 自動在 hbase 中創建表
--column-family name 指定列簇名字
--hbase-row-key id 指定 rowkey 對應的 mysql 當中的鍵

    2、hbase數據導入到mysql

目前沒有直接的命令將 Hbase 中的數據導出到 mysql,但是可以先將 hbase 中的數據導 出到 hdfs 中,再將數據導出 mysql

替代方案:
先將 hbase 的數據導入到 hdfs 或者 hive,然后再將數據導入到 mysql

三、hbase整合hive

     原理:

Hive 與 HBase 利用兩者本身對外的 API 來實現整合,主要是靠 HBaseStorageHandler 進 行通信,利用 HBaseStorageHandler, Hive 可以獲取到 Hive 表對應的 HBase 表名,列簇以及 列, InputFormat 和 OutputFormat 類,創建和刪除 HBase 表等。

Hive 訪問 HBase 中表數據,實質上是通過 MapReduce 讀取 HBase 表數據,其實現是在 MR 中,使用 HiveHBaseTableInputFormat 完成對 HBase 表的切分,獲取 RecordReader 對象來讀 取數據。

對 HBase 表的切分原則是一個 Region 切分成一個 Split,即表中有多少個 Regions,MR 中就有多 少個 Map。

讀取 HBase 表數據都是通過構建 Scanner,對表進行全表掃描,如果有過濾條件,則轉化為 Filter。當過濾條件為 rowkey 時,則轉化為對 rowkey 的過濾, Scanner 通過 RPC 調用  RegionServer 的 next()來獲取數據;

 1、准備hbase表 數據

create 'mingxing',{NAME => 'base_info',VERSIONS => 1},{NAME => 'extra_info',VERSIONS => 1}

插入數據:

put 'mingxing','rk001','base_info:name','huangbo'
put 'mingxing','rk001','base_info:age','33'
put 'mingxing','rk001','extra_info:math','44'
put 'mingxing','rk001','extra_info:province','beijing'
put 'mingxing','rk002','base_info:name','xuzheng'
put 'mingxing','rk002','base_info:age','44'
put 'mingxing','rk003','base_info:name','wangbaoqiang'
put 'mingxing','rk003','base_info:age','55'
put 'mingxing','rk003','base_info:gender','male'
put 'mingxing','rk004','extra_info:math','33'
put 'mingxing','rk004','extra_info:province','tianjin'
put 'mingxing','rk004','extra_info:children','3'
put 'mingxing','rk005','base_info:name','liutao'
put 'mingxing','rk006','extra_info:name','liujialing'

   2、hive端操作

 

三、hbasetohbase   byMR

package com.ghgj.hbase.hbase2hdfsmr;

import java.io.IOException;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;

public class HBaseToHBaseByMR {

	private static final String ZK_CONNECT = "hadoop03:2181,hadoop04:2181,hadoop05:2181";
	private static final String OLD_TABLE_NAME = "user_info";
	private static final String NEW_TABLE_NAME = "person_info2";
	private static final String FAMILY = "base_info";
	private static final String QUALIFIER = "age";

	public static void main(String[] args) throws Exception {

		Configuration conf = HBaseConfiguration.create();
		conf.set("hbase.zookeeper.quorum", ZK_CONNECT);
		System.setProperty("HADOOP_USER_NAME", "hadoop");
		// conf.set("fs.defaultFS", "hdfs://myha01/");

		Job job = Job.getInstance(conf);
		job.setJarByClass(HBaseToHDFSMR.class);

		// 以下這一段代碼是為了創建一張hbase表叫做 person_info
		HBaseAdmin admin = new HBaseAdmin(conf);
		HTableDescriptor htd = new HTableDescriptor(TableName.valueOf(NEW_TABLE_NAME));
		htd.addFamily(new HColumnDescriptor(FAMILY));
		if (admin.tableExists(NEW_TABLE_NAME)) {
			admin.disableTable(NEW_TABLE_NAME);
			admin.deleteTable(NEW_TABLE_NAME);
		}
		admin.createTable(htd);

		Scan scan = new Scan();
		scan.addColumn(Bytes.toBytes(FAMILY), Bytes.toBytes(QUALIFIER));
		/**
		 * TableMapReduceUtil:以util結尾:工具
		 * MapReduceFactory:以factory結尾,它是工廠類,最大作用就是管理對象的生成
		 */
		TableMapReduceUtil.initTableMapperJob(OLD_TABLE_NAME, scan, HBaseToHBaseByMRMapper.class, Text.class, NullWritable.class, job);
		TableMapReduceUtil.initTableReducerJob(NEW_TABLE_NAME, HBaseToHBaseByMRReducer.class, job);

		// 給mapper和reducer指定輸出的key-value的類型
		job.setMapOutputKeyClass(Text.class);
		job.setMapOutputValueClass(NullWritable.class);
		job.setOutputKeyClass(ImmutableBytesWritable.class);
		job.setOutputValueClass(Mutation.class);

		boolean waitForCompletion = job.waitForCompletion(true);
		System.exit(waitForCompletion ? 0 : 1);
	}

	static class HBaseToHBaseByMRMapper extends TableMapper<Text, NullWritable> {
		/**
		 * key:rowkey value:map方法每執行一次接收到的一個參數,這個參數就是一個Result實例
		 * 這個Result里面存的東西就是rowkey, family, qualifier, value, timestamp
		 */
		@Override
		protected void map(ImmutableBytesWritable key, Result value, Mapper<ImmutableBytesWritable, Result, Text, NullWritable>.Context context) throws IOException, InterruptedException {
			String rowkey = Bytes.toString(key.copyBytes());
			System.out.println(rowkey);
			List<Cell> cells = value.listCells();
			for (int i = 0; i < cells.size(); i++) {
				Cell cell = cells.get(i);
				String rowkey_result = Bytes.toString(cell.getRow()) + "\t" + Bytes.toString(cell.getFamily()) + "\t" + Bytes.toString(cell.getQualifier()) + "\t" + Bytes.toString(cell.getValue()) + "\t" + cell.getTimestamp();
				context.write(new Text(rowkey_result), NullWritable.get());
			}
		}
	}

	/**
	 * TableReducer extends Reducer 這么做的唯一效果就是把valueout的類型確定為Mutation
	 */
	static class HBaseToHBaseByMRReducer extends TableReducer<Text, NullWritable, ImmutableBytesWritable> {

		/**
		 * baiyc_20150716_0001 base_info name baiyc1 1488348387443
		 */
		@Override
		protected void reduce(Text key, Iterable<NullWritable> values, Reducer<Text, NullWritable, ImmutableBytesWritable, Mutation>.Context context) throws IOException, InterruptedException {

			String[] splits = key.toString().split("\t");
			String rowkeyStr = splits[0];
			ImmutableBytesWritable rowkey = new ImmutableBytesWritable(Bytes.toBytes(rowkeyStr));

			Put put = new Put(Bytes.toBytes(rowkeyStr));

			String family = splits[1];
			String qualifier = splits[2];
			String value = splits[3];
			String ts = splits[4];

			put.add(Bytes.toBytes(family), Bytes.toBytes(qualifier), Long.parseLong(ts), Bytes.toBytes(value));

			context.write(rowkey, put);
		}
	}
}

  



 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM