The main options are:
- connecting to HBase with the plain Java API;
- connecting to HBase from Spark;
- connecting to HBase from Flink;
- connecting to HBase through Phoenix.
The first is the fairly low-level but efficient access path that HBase itself provides; the second and third are the ways Spark and Flink integrate with HBase; the last is the JDBC approach offered by the third-party layer Phoenix, and that Phoenix JDBC approach can also be called from Spark and Flink.
Note: the code below was developed against HBase 2.1.2.
1. Connecting to HBase with the Java API
package com.qyb.hbase;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;

/**
 * Add the remote VM's IP mapping to the local hosts file
 * (C:\Windows\System32\drivers\etc\hosts), e.g.:
 * 192.168.126.128 qiaoyanbin
 * @author dell
 */
public class HbaseJavaAPI {

    private static Configuration conf = null;

    static {
        conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum", "192.168.126.128");
        // conf.set("hbase.zookeeper.property.clientPort", "2181");
    }

    /**
     * Scan all rows of an HBase table.
     *
     * @param tableName
     * @throws Exception
     */
    public static void getAllRows(String tableName) throws Exception {
        // In HBase 2.x a Table is obtained from a Connection (HTable constructors are no longer public)
        try (Connection conn = ConnectionFactory.createConnection(conf);
             Table table = conn.getTable(TableName.valueOf(tableName))) {
            Scan scan = new Scan();
            ResultScanner results = table.getScanner(scan);
            // Print the scan results (requires the hosts mapping mentioned above)
            for (Result result : results) {
                for (Cell cell : result.rawCells()) {
                    System.out.print("Rowkey:" + Bytes.toString(CellUtil.cloneRow(cell)) + "\t");
                    System.out.print("Timestamp:" + cell.getTimestamp() + "\t");
                    System.out.print("Column family:" + Bytes.toString(CellUtil.cloneFamily(cell)) + "\t");
                    System.out.print("Qualifier:" + Bytes.toString(CellUtil.cloneQualifier(cell)) + "\t");
                    System.out.println("Value:" + Bytes.toString(CellUtil.cloneValue(cell)) + "\t");
                }
            }
        }
    }

    /**
     * Insert a single cell.
     *
     * @param tableName
     * @param row
     * @param columnFamily
     * @param column
     * @param value
     * @throws Exception
     */
    public static void addRow(String tableName, String row, String columnFamily,
                              String column, String value) throws Exception {
        try (Connection conn = ConnectionFactory.createConnection(conf);
             Table table = conn.getTable(TableName.valueOf(tableName))) {
            Put put = new Put(Bytes.toBytes(row));
            put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes(column), Bytes.toBytes(value));
            table.put(put);
        }
    }

    /**
     * Delete a single row.
     *
     * @param tableName
     * @param row
     * @throws Exception
     */
    public static void delRow(String tableName, String row) throws Exception {
        try (Connection conn = ConnectionFactory.createConnection(conf);
             Table table = conn.getTable(TableName.valueOf(tableName))) {
            Delete delete = new Delete(Bytes.toBytes(row));
            table.delete(delete);
        }
    }

    public static void main(String[] args) {
        try {
            String tableName = "user";
            /*HbaseJavaAPI.addRow(tableName, "004", "u", "name", "zhaozimo");
            HbaseJavaAPI.addRow(tableName, "004", "u", "age", "23");
            HbaseJavaAPI.addRow(tableName, "004", "u", "sex", "boy");
            HbaseJavaAPI.addRow(tableName, "004", "u", "china", "48");
            HbaseJavaAPI.addRow(tableName, "004", "u", "math", "59");*/
            //HbaseJavaAPI.delRow(tableName, "003");
            HbaseJavaAPI.getAllRows(tableName);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
2. Connecting to HBase from Spark
Reading and writing HBase from Spark comes in two flavors, the old and the new Hadoop MapReduce APIs; in addition there is bulk loading into HBase and going through Phoenix.
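Before the examples, a quick note on dependencies. The snippets below assume the HBase client and MapReduce integration jars (and, for section 2.3, the Phoenix connector) are on the classpath. A rough sbt sketch, where the versions are assumptions to be matched to your own cluster:
// build.sbt -- a hedged sketch of the dependencies the Spark examples assume; versions are illustrative
libraryDependencies ++= Seq(
  "org.apache.spark"   %% "spark-sql"       % "2.4.0" % Provided,
  "org.apache.hbase"   %  "hbase-client"    % "2.1.2",
  "org.apache.hbase"   %  "hbase-mapreduce" % "2.1.2",            // TableInputFormat/TableOutputFormat, HFileOutputFormat2
  "commons-codec"      %  "commons-codec"   % "1.11",             // DigestUtils in the BulkLoad example
  "org.apache.phoenix" %  "phoenix-spark"   % "5.0.0-HBase-2.0"   // only needed for the Phoenix example (2.3)
)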
2.1 The old and new APIs for reading and writing HBase from Spark
2.1.1 Writing data to HBase from Spark
Saving data to HBase with the old-API saveAsHadoopDataset.
/**
* saveAsHadoopDataset
*/
def writeToHBase(): Unit ={
// Suppress noisy Spark logging on the console
Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
/* The pre-Spark-2.0 way:
val conf = new SparkConf().setAppName("SparkToHBase").setMaster("local")
val sc = new SparkContext(conf)
*/
val sparkSession = SparkSession.builder().appName("SparkToHBase").master("local[4]").getOrCreate()
val sc = sparkSession.sparkContext
val tableName = "test"
//Create the HBase configuration
val hbaseConf = HBaseConfiguration.create()
hbaseConf.set(HConstants.ZOOKEEPER_QUORUM, "192.168.187.201") //ZooKeeper quorum; you could also put hbase-site.xml on the classpath, but setting it in code is recommended here
hbaseConf.set(HConstants.ZOOKEEPER_CLIENT_PORT, "2181") //ZooKeeper client port, 2181 by default
hbaseConf.set(TableOutputFormat.OUTPUT_TABLE, tableName)
//Initialize the job and set the output format; this TableOutputFormat is the one in the org.apache.hadoop.hbase.mapred package
val jobConf = new JobConf(hbaseConf)
jobConf.setOutputFormat(classOf[TableOutputFormat])
val dataRDD = sc.makeRDD(Array("12,jack,16", "11,Lucy,15", "15,mike,17", "13,Lily,14"))
val data = dataRDD.map{ item =>
val Array(key, name, age) = item.split(",")
val rowKey = key.reverse
val put = new Put(Bytes.toBytes(rowKey))
/*A Put object is one row; the row key is passed to the constructor.
* Every value to insert must be converted with org.apache.hadoop.hbase.util.Bytes.toBytes.
* Put.addColumn takes three arguments: column family, qualifier, value.*/
put.addColumn(Bytes.toBytes("cf1"), Bytes.toBytes("name"), Bytes.toBytes(name))
put.addColumn(Bytes.toBytes("cf1"), Bytes.toBytes("age"), Bytes.toBytes(age))
(new ImmutableBytesWritable(), put)
}
//Save to the HBase table
data.saveAsHadoopDataset(jobConf)
sparkSession.stop()
}
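Both write examples assume the target table test with column family cf1 already exists. A minimal sketch for creating it with the HBase 2.x Admin API (the object name is just for illustration; the ZooKeeper address matches the examples):
import org.apache.hadoop.hbase.{HBaseConfiguration, HConstants, TableName}
import org.apache.hadoop.hbase.client.{ColumnFamilyDescriptorBuilder, ConnectionFactory, TableDescriptorBuilder}

object CreateTestTable {
  def main(args: Array[String]): Unit = {
    val hbaseConf = HBaseConfiguration.create()
    hbaseConf.set(HConstants.ZOOKEEPER_QUORUM, "192.168.187.201")
    hbaseConf.set(HConstants.ZOOKEEPER_CLIENT_PORT, "2181")
    val conn = ConnectionFactory.createConnection(hbaseConf)
    val admin = conn.getAdmin
    val table = TableName.valueOf("test")
    // Create the table only if it is not there yet
    if (!admin.tableExists(table)) {
      val desc = TableDescriptorBuilder.newBuilder(table)
        .setColumnFamily(ColumnFamilyDescriptorBuilder.of("cf1"))
        .build()
      admin.createTable(desc)
    }
    admin.close()
    conn.close()
  }
}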
Saving data to HBase with the new-API saveAsNewAPIHadoopDataset.
The contents of a.txt:
100,hello,20
101,nice,24
102,beautiful,26
/**
* saveAsNewAPIHadoopDataset
*/
def writeToHBaseNewAPI(): Unit ={
// Suppress noisy Spark logging on the console
Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
val sparkSession = SparkSession.builder().appName("SparkToHBase").master("local[4]").getOrCreate()
val sc = sparkSession.sparkContext
val tableName = "test"
val hbaseConf = HBaseConfiguration.create()
hbaseConf.set(HConstants.ZOOKEEPER_QUORUM, "192.168.187.201")
hbaseConf.set(HConstants.ZOOKEEPER_CLIENT_PORT, "2181")
hbaseConf.set(org.apache.hadoop.hbase.mapreduce.TableOutputFormat.OUTPUT_TABLE, tableName)
val jobConf = new JobConf(hbaseConf)
//Set the job's output format; the values written through TableOutputFormat are Puts
val job = Job.getInstance(jobConf)
job.setOutputKeyClass(classOf[ImmutableBytesWritable])
job.setOutputValueClass(classOf[Put])
job.setOutputFormatClass(classOf[org.apache.hadoop.hbase.mapreduce.TableOutputFormat[ImmutableBytesWritable]])
val input = sc.textFile("v2120/a.txt")
val data = input.map{item =>
val Array(key, name, age) = item.split(",")
val rowKey = key.reverse
val put = new Put(Bytes.toBytes(rowKey))
put.addColumn(Bytes.toBytes("cf1"), Bytes.toBytes("name"), Bytes.toBytes(name))
put.addColumn(Bytes.toBytes("cf1"), Bytes.toBytes("age"), Bytes.toBytes(age))
(new ImmutableBytesWritable, put)
}
//Save to the HBase table
data.saveAsNewAPIHadoopDataset(job.getConfiguration)
sparkSession.stop()
}
2.1.2 Reading data from HBase in Spark
Use newAPIHadoopRDD to read data from HBase; the data can be filtered through the Scan (see the filter sketch after the code below).
/**
* scan
*/
def readFromHBaseWithHBaseNewAPIScan(): Unit ={
//Suppress noisy Spark logging on the console
Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
val sparkSession = SparkSession.builder().appName("SparkToHBase").master("local").getOrCreate()
val sc = sparkSession.sparkContext
val tableName = "test"
val hbaseConf = HBaseConfiguration.create()
hbaseConf.set(HConstants.ZOOKEEPER_QUORUM, "192.168.187.201")
hbaseConf.set(HConstants.ZOOKEEPER_CLIENT_PORT, "2181")
hbaseConf.set(org.apache.hadoop.hbase.mapreduce.TableInputFormat.INPUT_TABLE, tableName)
val scan = new Scan()
scan.addFamily(Bytes.toBytes("cf1"))
val proto = ProtobufUtil.toScan(scan)
val scanToString = new String(Base64.getEncoder.encode(proto.toByteArray))
hbaseConf.set(org.apache.hadoop.hbase.mapreduce.TableInputFormat.SCAN, scanToString)
//Read the data and turn it into an RDD; this TableInputFormat is the one in the org.apache.hadoop.hbase.mapreduce package
val hbaseRDD = sc.newAPIHadoopRDD(hbaseConf, classOf[org.apache.hadoop.hbase.mapreduce.TableInputFormat], classOf[ImmutableBytesWritable], classOf[Result])
val dataRDD = hbaseRDD
.map(x => x._2)
.map{result =>
(result.getRow, result.getValue(Bytes.toBytes("cf1"), Bytes.toBytes("name")), result.getValue(Bytes.toBytes("cf1"), Bytes.toBytes("age")))
}.map(row => (new String(row._1), new String(row._2), new String(row._3)))
.collect()
.foreach(r => (println("rowKey:"+r._1 + ", name:" + r._2 + ", age:" + r._3)))
}
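The Scan above only restricts the column family. As a hedged sketch of pushing more filtering down to the region servers before the Scan is serialized (the row-key range and the age threshold are made-up values, purely for illustration):
import org.apache.hadoop.hbase.CompareOperator
import org.apache.hadoop.hbase.client.Scan
import org.apache.hadoop.hbase.filter.SingleColumnValueFilter
import org.apache.hadoop.hbase.util.Bytes

val filteredScan = new Scan()
filteredScan.addFamily(Bytes.toBytes("cf1"))
filteredScan.withStartRow(Bytes.toBytes("00"))   // assumed row-key range for illustration
filteredScan.withStopRow(Bytes.toBytes("99"))
// Values were written as strings, so this comparison is lexicographic
filteredScan.setFilter(new SingleColumnValueFilter(
  Bytes.toBytes("cf1"), Bytes.toBytes("age"),
  CompareOperator.GREATER_OR_EQUAL, Bytes.toBytes("15")))
// Serialize it and hand it to TableInputFormat exactly as above:
// hbaseConf.set(TableInputFormat.SCAN, new String(Base64.getEncoder.encode(ProtobufUtil.toScan(filteredScan).toByteArray)))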
2.2 Bulk inserting into HBase from Spark with BulkLoad
BulkLoad works by first using MapReduce to generate HFiles on HDFS and then importing those HFiles into HBase, which makes batch inserts efficient.
/**
* Bulk insert, multiple columns
*/
def insertWithBulkLoadWithMulti(): Unit ={
val sparkSession = SparkSession.builder().appName("insertWithBulkLoad").master("local[4]").getOrCreate()
val sc = sparkSession.sparkContext
val tableName = "test"
val hbaseConf = HBaseConfiguration.create()
hbaseConf.set(HConstants.ZOOKEEPER_QUORUM, "192.168.187.201")
hbaseConf.set(HConstants.ZOOKEEPER_CLIENT_PORT, "2181")
hbaseConf.set(TableOutputFormat.OUTPUT_TABLE, tableName)
val conn = ConnectionFactory.createConnection(hbaseConf)
val admin = conn.getAdmin
val table = conn.getTable(TableName.valueOf(tableName))
val job = Job.getInstance(hbaseConf)
//Set the job's output format
job.setMapOutputKeyClass(classOf[ImmutableBytesWritable])
job.setMapOutputValueClass(classOf[KeyValue])
job.setOutputFormatClass(classOf[HFileOutputFormat2])
HFileOutputFormat2.configureIncrementalLoad(job, table, conn.getRegionLocator(TableName.valueOf(tableName)))
val rdd = sc.textFile("v2120/a.txt")
.map(_.split(","))
.map(x => (DigestUtils.md5Hex(x(0)).substring(0, 3) + x(0), x(1), x(2)))
.sortBy(_._1)
.flatMap(x =>
{
val listBuffer = new ListBuffer[(ImmutableBytesWritable, KeyValue)]
val kv1: KeyValue = new KeyValue(Bytes.toBytes(x._1), Bytes.toBytes("cf1"), Bytes.toBytes("name"), Bytes.toBytes(x._2 + ""))
val kv2: KeyValue = new KeyValue(Bytes.toBytes(x._1), Bytes.toBytes("cf1"), Bytes.toBytes("age"), Bytes.toBytes(x._3 + ""))
listBuffer.append((new ImmutableBytesWritable, kv2))
listBuffer.append((new ImmutableBytesWritable, kv1))
listBuffer
}
)
//With multiple columns, the KeyValues of a row must be sorted by qualifier in alphabetical order, which is why "age" is appended before "name" above
isFileExist("hdfs://node1:9000/test", sc)
rdd.saveAsNewAPIHadoopFile("hdfs://node1:9000/test", classOf[ImmutableBytesWritable], classOf[KeyValue], classOf[HFileOutputFormat2], job.getConfiguration)
val bulkLoader = new LoadIncrementalHFiles(hbaseConf)
bulkLoader.doBulkLoad(new Path("hdfs://node1:9000/test"), admin, table, conn.getRegionLocator(TableName.valueOf(tableName)))
}
/**
* Check whether a path exists on HDFS and delete it if it does
*/
def isFileExist(filePath: String, sc: SparkContext): Unit ={
val output = new Path(filePath)
val hdfs = FileSystem.get(new URI(filePath), new Configuration)
if (hdfs.exists(output)){
hdfs.delete(output, true)
}
}
2.3 Reading and writing HBase from Spark via Phoenix
With Phoenix it works just like a relational database such as MySQL: you go through JDBC.
def readFromHBaseWithPhoenix: Unit ={
//Suppress noisy Spark logging on the console
Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
val sparkSession = SparkSession.builder().appName("SparkHBaseDataFrame").master("local[4]").getOrCreate()
//A lower-case table name must be double-quoted, otherwise an error is thrown
val dbTable = "\"test\""
//First way for Spark to read Phoenix into a DataFrame
val rdf = sparkSession.read
.format("jdbc")
.option("driver", "org.apache.phoenix.jdbc.PhoenixDriver")
.option("url", "jdbc:phoenix:192.168.187.201:2181")
.option("dbtable", dbTable)
.load()
val rdfList = rdf.collect()
for (i <- rdfList){
println(i.getString(0) + " " + i.getString(1) + " " + i.getString(2))
}
rdf.printSchema()
//Second way for Spark to read Phoenix into a DataFrame
val df = sparkSession.read
.format("org.apache.phoenix.spark")
.options(Map("table" -> dbTable, "zkUrl" -> "192.168.187.201:2181"))
.load()
df.printSchema()
val dfList = df.collect()
for (i <- dfList){
println(i.getString(0) + " " + i.getString(1) + " " + i.getString(2))
}
//Writing a Spark DataFrame to Phoenix; the target table has to be created first
/*df.write
.format("org.apache.phoenix.spark")
.mode(SaveMode.Overwrite)
.options(Map("table" -> "PHOENIXTESTCOPY", "zkUrl" -> "192.168.187.201:2181"))
.save()
*/
sparkSession.stop()
}
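The commented-out write above notes that the target Phoenix table must exist beforehand. Phoenix tables are created through Phoenix itself (for example with sqlline.py or plain JDBC) rather than the HBase shell; a hypothetical sketch over JDBC, where the schema and column names are purely illustrative assumptions and not the actual table used above:
import java.sql.DriverManager

object CreatePhoenixTable {
  def main(args: Array[String]): Unit = {
    // Phoenix JDBC URL: jdbc:phoenix:<zookeeper quorum>:<port>
    val conn = DriverManager.getConnection("jdbc:phoenix:192.168.187.201:2181")
    val stmt = conn.createStatement()
    // Lower-case identifiers are double-quoted, as in the read example above; columns are assumed
    stmt.executeUpdate(
      """CREATE TABLE IF NOT EXISTS "test" (
        |  "id"   VARCHAR PRIMARY KEY,
        |  "name" VARCHAR,
        |  "age"  VARCHAR
        |)""".stripMargin)
    stmt.close()
    conn.close()
  }
}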

