1.文档编写目的
在项目中有需求将数据存储在HBase中。但是原有的方式是通过HBase的API接口批量的将数据写入HBase,但是这种方式的效率并不高,如果数据量过大,可能耗时会比较严重或者占用HBase集群资源较多(如磁盘IO、HBase Handler数等)。Hbase BulkLoad实现的方式有多种,一是使用MapReduce将数据源转换位Hfile数据应格式,二是通过hive的内置函数加载。今天这篇博客笔者将为大家分享使用spark-HBase BulkLoad的方式来进行海量数据批量写入到HBase集群。
在使用BulkLoad之前,我们先来了解一下HBase的存储机制。HBase存储数据其底层使用的是HDFS来作为存储介质,HBase的每一张表对应的HDFS目录上的一个文件夹,文件夹名以HBase表进行命名(如果没有使用命名空间,则默认在default目录下),在表文件夹下存放在若干个Region命名的文件夹,Region文件夹中的每个列簇也是用文件夹进行存储的,每个列簇中存储就是实际的数据,以HFile的形式存在。
2.版本信息
spark 2.6.0
hbase 1.2.0
maven 依赖
<dependencies>
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
<version>2.11.8</version>
</dependency>
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-compiler</artifactId>
<version>2.11.8</version>
</dependency>
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-reflect</artifactId>
<version>2.11.8</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.11</artifactId>
<version>2.1.1</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.11</artifactId>
<version>2.1.1</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-hive_2.11</artifactId>
<version>2.1.1</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-client</artifactId>
<version>1.2.0</version>
</dependency>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-server</artifactId>
<version>1.2.0</version>
</dependency>
</dependencies>
实现代码
package hbase
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.hbase.{HBaseConfiguration, HColumnDescriptor, HTableDescriptor, KeyValue, TableName}
import org.apache.hadoop.hbase.client.{ConnectionFactory, Result}
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.{HFileOutputFormat2, LoadIncrementalHFiles, TableOutputFormat}
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.mapreduce.Job
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
/**
* Author: wl
* Description:Hbase批量加载 同一列族多列
* Create: 2020/12/31 14:14
*/
object BulkLoads {
val zookeeperQuorum = "hadoop01:2181,hadoop02:2181,hadoop03:2181"//zookeeper信息
val dataSourcePath = "hdfs://hadoop01:9000/tmp/2.txt"//源文件
val hFilePath = "hdfs://hadoop01:9000/tmp/result"//hfile的存储路径
val hdfsRootPath = "hdfs://hadoop01:9000/tmp/"//根路径
val tableName = "person"//表名
val familyName = "basic"//列族
val arr = Array("tmp","name", "age")//列的名字集合
def main(args: Array[String]): Unit = {
//获取content
val sparkConf = new SparkConf()
.setAppName(s"${this.getClass.getSimpleName}")
.setMaster("local")
//指定序列化格式,默认是java序列化
.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
//告知哪些类型需要序列化
.registerKryoClasses(Array(classOf[ImmutableBytesWritable], classOf[Result]))
val sc = new SparkContext(sparkConf)
//hadoop配置
val hadoopConf = new Configuration()
hadoopConf.set("fs.defaultFS", hdfsRootPath)
//获取输出路径
val fileSystem = FileSystem.get(hadoopConf)
//获取hbase配置
val hconf = HBaseConfiguration.create()
//设置zookeeper集群
hconf.set("hbase.zookeeper.quorum", zookeeperQuorum)
//设置端口
hconf.set("hbase.zookeeper.property.clientPort", "2181");
//设置hfile最大个数
hconf.set("hbase.mapreduce.bulkload.max.hfiles.perRegion.perFamily","3200")
//设置hfile的大小
hconf.set("hbase.hregion.max.filesize","10737418240")
hconf.set(TableOutputFormat.OUTPUT_TABLE, tableName)
//获取hbase连接
val hbaseConn = ConnectionFactory.createConnection(hconf)
val admin = hbaseConn.getAdmin
/** * 保存生成的HFile文件 * 注:bulk load 生成的HFile文件需要落地 * 然后再通过LoadIncrementalHFiles类load进Hbase * 此处关于 sortBy 操作详解: * 0. Hbase查询是根据rowkey进行查询的,并且rowkey是有序, * 某种程度上来说rowkey就是一个索引,这是Hbase查询高效的一个原因, * 这就要求我们在插入数据的时候,要插在rowkey该在的位置。 * 1. Put方式插入数据,会有WAL,同时在插入Hbase的时候会根据RowKey的值选择合适的位置,此方式本身就可以保证RowKey有序 * 2. bulk load 方式没有WAL,它更像是hive通过load方式直接将底层文件HFile移动到制定的Hbase路径下,所以,在不东HFile的情况下,要保证本身有序才行 * 之前写的时候只要rowkey有序即可,但是2.0.2版本的时候发现clounm也要有序,所以会有sortBy(x => (x._1, x._2.getKeyString), true) * * @param hfileRDD */
// 0. 准备程序运行的环境 // 如果 HBase 表不存在,就创建一个新表 if (!admin.tableExists(TableName.valueOf(tableName))) { val desc = new HTableDescriptor(TableName.valueOf(tableName)) val hcd = new HColumnDescriptor(familyName) desc.addFamily(hcd) admin.createTable(desc) print("创建了一个新表") } // 如果存放 HFile文件的路径已经存在,就删除掉 if(fileSystem.exists(new Path(hFilePath))) { fileSystem.delete(new Path(hFilePath), true) print("删除hdfs上存在的路径") } // 1. 清洗需要存放到 HFile 中的数据,rowKey 一定要排序,否则会报错: // java.io.IOException: Added a key not lexically larger than previous. val data: RDD[(ImmutableBytesWritable, Seq[KeyValue])] = sc.textFile(dataSourcePath) .map(row => { // 处理数据的逻辑 val arrs = row.split(" ") var kvlist: Seq[KeyValue] = List()//存储多个列 var rowkey: Array[Byte] = null var cn: Array[Byte] = null var v: Array[Byte] = null var kv: KeyValue = null val cf = familyName.getBytes //列族 rowkey = Bytes.toBytes(arrs(0)) //key for (i <- 1 to (arrs.length - 1)) { cn = arr(i).getBytes() //列的名称 v = Bytes.toBytes(arrs(i)) //列的值 //将rdd转换成HFile需要的格式,我们上面定义了Hfile的key是ImmutableBytesWritable,那么我们定义的RDD也是要以ImmutableBytesWritable的实例为key kv = new KeyValue(rowkey, cf, cn, v) //封装一下 rowkey, cf, clounmVale, value kvlist = kvlist :+ kv //将新的kv加在kvlist后面(不能反 需要整体有序) } (new ImmutableBytesWritable(rowkey), kvlist) }) val hfileRDD: RDD[(ImmutableBytesWritable, KeyValue)] = data .flatMapValues(_.iterator) // 2. Save Hfiles on HDFS val table = hbaseConn.getTable(TableName.valueOf(tableName)) val job = Job.getInstance(hconf) job.setMapOutputKeyClass(classOf[ImmutableBytesWritable]) job.setMapOutputValueClass(classOf[KeyValue]) HFileOutputFormat2.configureIncrementalLoadMap(job, table) hfileRDD .sortBy(x => (x._1, x._2.getKeyString), true) //要保持 整体有序 .saveAsNewAPIHadoopFile(hFilePath, classOf[ImmutableBytesWritable], classOf[KeyValue], classOf[HFileOutputFormat2], hconf) print("成功生成HFILE") hbaseConn.close() sc.stop() } }
然后,将上述代码编译打包成jar,上传到Spark集群进行执行,执行命令如下:
spark-submit --master yarn --conf spark.default.parallelism=60 \ --deploy-mode client --driver-memory 1G --executor-memory 2G \ --num-executors 5 --executor-cores 1 \ --class hbase.BulkLoads test.jar
使用BulkLoad导入到HBase
然后,在使用BulkLoad的方式将生成的HFile文件导入到HBase集群中,这里有2种方式。一种是写代码实现导入,另一种是使用HBase命令进行导入。
代码实现导入
// 3. Bulk load Hfiles to Hbase
val bulkLoader = new LoadIncrementalHFiles(hconf)
val regionLocator = hbaseConn.getRegionLocator(TableName.valueOf(tableName))
bulkLoader.doBulkLoad(new Path(hFilePath), admin, table, regionLocator)
使用hbase命令方式导入hfile
hbase org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles /tmp/result person
注:已有Hbase表“outPutTable”,想要查看hbase数据除了hbase shell 还可以关联hive表,
参考 https://www.cnblogs.com/lillcol/p/11542618.html
问题及异常
Hbase查询是根据rowkey进行查询的,并且rowkey是有序,某种程度上来说rowkey就是一个索引,这是Hbase查询高效的一个原因。
一开始代码中只是对key排序,在旧的版本测试没问题,但是2.0.2出问题了。
此处报错的意思是当前列CN_TAG 比 上一列FIRST_DT小,
猜测同一个key下clounm也需要有序,
于是对key,clounm排序解决了这个问题。
解决方法:
hfileRDD
.sortBy(x => (x._1, x._2.getKeyString), true) //要保持 整体有序
.saveAsNewAPIHadoopFile(savePath,
classOf[ImmutableBytesWritable],
classOf[KeyValue],
classOf[HFileOutputFormat2],
conf)
HBase 根目录不存在
java.util.concurrent.ExecutionException: org.apache.phoenix.shaded.org.apache.zookeeper.KeeperException$NoNodeException: KeeperErrorCode = NoNode for /hbase/hbaseid
at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895)
at org.apache.hadoop.hbase.client.ConnectionImplementation.retrieveClusterId(ConnectionImplementation.java:549)
at org.apache.hadoop.hbase.client.ConnectionImplementation.<init>(ConnectionImplementation.java:287)
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
at org.apache.hadoop.hbase.client.ConnectionFactory.createConnection(ConnectionFactory.java:219)
at org.apache.hadoop.hbase.client.ConnectionFactory.createConnection(ConnectionFactory.java:114)
at com.aaa.TestHbase$.main(TestHbase.scala:99)
at com.aaa.TestHbase.main(TestHbase.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:904)
at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:198)
at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:228)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:137)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
默认为:/hbase
如果修改了需要指定,否则找不到该路径
修改方式有两个:
1 修改配置文件bhase-site.xml
<configuration>
<property>
<name>hbase.unsafe.stream.capability.enforce</name>
<value>false</value>
</property>
<property>
<name>zookeeper.znode.parent</name>
<value>/hbase</value>
</property>
</configuration>
2 代码中设置参数
代码中执行要使用此方法
conf.set("hbase.unsafe.stream.capability.enforce", "false") //hbase 根目录设定
conf.set("zookeeper.znode.parent", "/hbase") //设置成真实的值
一个family下超过了默认的32个hfile
Exception in thread "main" java.io.IOException: Trying to load more than 32 hfiles to one family of one region
at org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles.doBulkLoad(LoadIncrementalHFiles.java:288)
at org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles.run(LoadIncrementalHFiles.java:842)
at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:70)
at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:84)
at org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles.main(LoadIncrementalHFiles.java:847)
解决办法有两个:
- 修改配置文件
bhase-site.xml
<property>
<name>hbase.mapreduce.bulkload.max.hfiles.perRegion.perFamily</name>
<value>400</value>
</property>
- 代码中设置参数
代码中执行要使用此方法
conf.set("hbase.mapreduce.bulkload.max.hfiles.perRegion.perFamily", "400")
