spark實現BulkLoad批量加載方式導入Hbase數據


1.文檔編寫目的

在項目中有需求將數據存儲在HBase中。但是原有的方式是通過HBase的API接口批量的將數據寫入HBase,但是這種方式的效率並不高,如果數據量過大,可能耗時會比較嚴重或者占用HBase集群資源較多(如磁盤IO、HBase Handler數等)。Hbase BulkLoad實現的方式有多種,一是使用MapReduce將數據源轉換位Hfile數據應格式,二是通過hive的內置函數加載。今天這篇博客筆者將為大家分享使用spark-HBase BulkLoad的方式來進行海量數據批量寫入到HBase集群。

在使用BulkLoad之前,我們先來了解一下HBase的存儲機制。HBase存儲數據其底層使用的是HDFS來作為存儲介質,HBase的每一張表對應的HDFS目錄上的一個文件夾,文件夾名以HBase表進行命名(如果沒有使用命名空間,則默認在default目錄下),在表文件夾下存放在若干個Region命名的文件夾,Region文件夾中的每個列簇也是用文件夾進行存儲的,每個列簇中存儲就是實際的數據,以HFile的形式存在。

2.版本信息

spark 2.6.0

hbase 1.2.0

maven 依賴

<dependencies>
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>2.11.8</version>
        </dependency>
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-compiler</artifactId>
            <version>2.11.8</version>
        </dependency>
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-reflect</artifactId>
            <version>2.11.8</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.11</artifactId>
            <version>2.1.1</version>
            <scope>provided</scope>
        </dependency>

        <dependency>
          <groupId>org.apache.spark</groupId>
          <artifactId>spark-sql_2.11</artifactId>
          <version>2.1.1</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
          <groupId>org.apache.spark</groupId>
          <artifactId>spark-hive_2.11</artifactId>
          <version>2.1.1</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-client</artifactId>
        <version>1.2.0</version>
       </dependency>
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-server</artifactId>
            <version>1.2.0</version>
        </dependency>
    </dependencies>

 實現代碼

package hbase


import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.hbase.{HBaseConfiguration, HColumnDescriptor, HTableDescriptor, KeyValue, TableName}
import org.apache.hadoop.hbase.client.{ConnectionFactory, Result}
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.{HFileOutputFormat2, LoadIncrementalHFiles, TableOutputFormat}
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.mapreduce.Job
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

/**
  * Author: wl
  * Description:Hbase批量加載   同一列族多列
  * Create: 2020/12/31 14:14
  */

object BulkLoads {
  val zookeeperQuorum = "hadoop01:2181,hadoop02:2181,hadoop03:2181"//zookeeper信息
  val dataSourcePath = "hdfs://hadoop01:9000/tmp/2.txt"//源文件
  val hFilePath = "hdfs://hadoop01:9000/tmp/result"//hfile的存儲路徑
  val hdfsRootPath = "hdfs://hadoop01:9000/tmp/"//根路徑
  val tableName = "person"//表名
  val familyName = "basic"//列族
  val arr = Array("tmp","name", "age")//列的名字集合
  def main(args: Array[String]): Unit = {
    //獲取content
    val sparkConf = new SparkConf()
      .setAppName(s"${this.getClass.getSimpleName}")
      .setMaster("local")
      //指定序列化格式,默認是java序列化
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      //告知哪些類型需要序列化
      .registerKryoClasses(Array(classOf[ImmutableBytesWritable], classOf[Result]))

    val sc = new SparkContext(sparkConf)

    //hadoop配置
    val hadoopConf = new Configuration()
    hadoopConf.set("fs.defaultFS", hdfsRootPath)
    //獲取輸出路徑
    val fileSystem = FileSystem.get(hadoopConf)
    //獲取hbase配置
    val hconf = HBaseConfiguration.create()
    //設置zookeeper集群
    hconf.set("hbase.zookeeper.quorum", zookeeperQuorum)
    //設置端口
    hconf.set("hbase.zookeeper.property.clientPort", "2181");
    //設置hfile最大個數
    hconf.set("hbase.mapreduce.bulkload.max.hfiles.perRegion.perFamily","3200")
    //設置hfile的大小
    hconf.set("hbase.hregion.max.filesize","10737418240")

    hconf.set(TableOutputFormat.OUTPUT_TABLE, tableName)
    //獲取hbase連接
    val hbaseConn = ConnectionFactory.createConnection(hconf)
    val admin = hbaseConn.getAdmin
 /** * 保存生成的HFile文件 * 注:bulk load 生成的HFile文件需要落地 * 然后再通過LoadIncrementalHFiles類load進Hbase * 此處關於 sortBy 操作詳解: * 0. Hbase查詢是根據rowkey進行查詢的,並且rowkey是有序, * 某種程度上來說rowkey就是一個索引,這是Hbase查詢高效的一個原因, * 這就要求我們在插入數據的時候,要插在rowkey該在的位置。 * 1. Put方式插入數據,會有WAL,同時在插入Hbase的時候會根據RowKey的值選擇合適的位置,此方式本身就可以保證RowKey有序 * 2. bulk load 方式沒有WAL,它更像是hive通過load方式直接將底層文件HFile移動到制定的Hbase路徑下,所以,在不東HFile的情況下,要保證本身有序才行 * 之前寫的時候只要rowkey有序即可,但是2.0.2版本的時候發現clounm也要有序,所以會有sortBy(x => (x._1, x._2.getKeyString), true) * * @param hfileRDD */

// 0. 准備程序運行的環境 // 如果 HBase 表不存在,就創建一個新表 if (!admin.tableExists(TableName.valueOf(tableName))) { val desc = new HTableDescriptor(TableName.valueOf(tableName)) val hcd = new HColumnDescriptor(familyName) desc.addFamily(hcd) admin.createTable(desc) print("創建了一個新表") } // 如果存放 HFile文件的路徑已經存在,就刪除掉 if(fileSystem.exists(new Path(hFilePath))) { fileSystem.delete(new Path(hFilePath), true) print("刪除hdfs上存在的路徑") } // 1. 清洗需要存放到 HFile 中的數據,rowKey 一定要排序,否則會報錯: // java.io.IOException: Added a key not lexically larger than previous. val data: RDD[(ImmutableBytesWritable, Seq[KeyValue])] = sc.textFile(dataSourcePath) .map(row => { // 處理數據的邏輯 val arrs = row.split(" ") var kvlist: Seq[KeyValue] = List()//存儲多個列 var rowkey: Array[Byte] = null var cn: Array[Byte] = null var v: Array[Byte] = null var kv: KeyValue = null val cf = familyName.getBytes //列族 rowkey = Bytes.toBytes(arrs(0)) //key for (i <- 1 to (arrs.length - 1)) { cn = arr(i).getBytes() //列的名稱 v = Bytes.toBytes(arrs(i)) //列的值 //將rdd轉換成HFile需要的格式,我們上面定義了Hfile的key是ImmutableBytesWritable,那么我們定義的RDD也是要以ImmutableBytesWritable的實例為key kv = new KeyValue(rowkey, cf, cn, v) //封裝一下 rowkey, cf, clounmVale, value kvlist = kvlist :+ kv //將新的kv加在kvlist后面(不能反 需要整體有序) } (new ImmutableBytesWritable(rowkey), kvlist) }) val hfileRDD: RDD[(ImmutableBytesWritable, KeyValue)] = data .flatMapValues(_.iterator) // 2. Save Hfiles on HDFS val table = hbaseConn.getTable(TableName.valueOf(tableName)) val job = Job.getInstance(hconf) job.setMapOutputKeyClass(classOf[ImmutableBytesWritable]) job.setMapOutputValueClass(classOf[KeyValue]) HFileOutputFormat2.configureIncrementalLoadMap(job, table) hfileRDD .sortBy(x => (x._1, x._2.getKeyString), true) //要保持 整體有序 .saveAsNewAPIHadoopFile(hFilePath, classOf[ImmutableBytesWritable], classOf[KeyValue], classOf[HFileOutputFormat2], hconf) print("成功生成HFILE") hbaseConn.close() sc.stop() } }

 然后,將上述代碼編譯打包成jar,上傳到Spark集群進行執行,執行命令如下:

spark-submit --master yarn --conf spark.default.parallelism=60 \
--deploy-mode client --driver-memory 1G  --executor-memory 2G  \
--num-executors 5  --executor-cores 1 \
--class hbase.BulkLoads test.jar

 使用BulkLoad導入到HBase

然后,在使用BulkLoad的方式將生成的HFile文件導入到HBase集群中,這里有2種方式。一種是寫代碼實現導入,另一種是使用HBase命令進行導入。

代碼實現導入

//  3. Bulk load Hfiles to Hbase
    val bulkLoader = new LoadIncrementalHFiles(hconf)
    val regionLocator = hbaseConn.getRegionLocator(TableName.valueOf(tableName))
    bulkLoader.doBulkLoad(new Path(hFilePath), admin, table, regionLocator)

 使用hbase命令方式導入hfile

hbase org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles /tmp/result person

 注:已有Hbase表“outPutTable”,想要查看hbase數據除了hbase shell 還可以關聯hive表,

參考 https://www.cnblogs.com/lillcol/p/11542618.html

問題及異常

Hbase查詢是根據rowkey進行查詢的,並且rowkey是有序,某種程度上來說rowkey就是一個索引,這是Hbase查詢高效的一個原因。
一開始代碼中只是對key排序,在舊的版本測試沒問題,但是2.0.2出問題了。
此處報錯的意思是當前列CN_TAG 比 上一列FIRST_DT小,
猜測同一個key下clounm也需要有序,
於是對key,clounm排序解決了這個問題。

解決方法:

hfileRDD
      .sortBy(x => (x._1, x._2.getKeyString), true) //要保持 整體有序
      .saveAsNewAPIHadoopFile(savePath,
        classOf[ImmutableBytesWritable],
        classOf[KeyValue],
        classOf[HFileOutputFormat2],
        conf)

 HBase 根目錄不存在

java.util.concurrent.ExecutionException: org.apache.phoenix.shaded.org.apache.zookeeper.KeeperException$NoNodeException: KeeperErrorCode = NoNode for /hbase/hbaseid
        at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
        at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895)
        at org.apache.hadoop.hbase.client.ConnectionImplementation.retrieveClusterId(ConnectionImplementation.java:549)
        at org.apache.hadoop.hbase.client.ConnectionImplementation.<init>(ConnectionImplementation.java:287)
        at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
        at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
        at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
        at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
        at org.apache.hadoop.hbase.client.ConnectionFactory.createConnection(ConnectionFactory.java:219)
        at org.apache.hadoop.hbase.client.ConnectionFactory.createConnection(ConnectionFactory.java:114)
        at com.aaa.TestHbase$.main(TestHbase.scala:99)
        at com.aaa.TestHbase.main(TestHbase.scala)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
        at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:904)
        at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:198)
        at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:228)
        at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:137)
        at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

 

默認為:/hbase
如果修改了需要指定,否則找不到該路徑

修改方式有兩個:

1 修改配置文件bhase-site.xml

<configuration>
  <property>
    <name>hbase.unsafe.stream.capability.enforce</name>
    <value>false</value>
  </property>

  <property>
    <name>zookeeper.znode.parent</name>
    <value>/hbase</value>
  </property>
</configuration>

 2 代碼中設置參數
代碼中執行要使用此方法

conf.set("hbase.unsafe.stream.capability.enforce", "false") //hbase  根目錄設定
conf.set("zookeeper.znode.parent", "/hbase") //設置成真實的值

 一個family下超過了默認的32個hfile

Exception in thread "main" java.io.IOException: Trying to load more than 32 hfiles to one family of one region
        at org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles.doBulkLoad(LoadIncrementalHFiles.java:288)
        at org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles.run(LoadIncrementalHFiles.java:842)
        at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:70)
        at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:84)
        at org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles.main(LoadIncrementalHFiles.java:847)

 

解決辦法有兩個:

  • 修改配置文件bhase-site.xml
<property>
    <name>hbase.mapreduce.bulkload.max.hfiles.perRegion.perFamily</name>
    <value>400</value>
  </property>
  • 代碼中設置參數
    代碼中執行要使用此方法
conf.set("hbase.mapreduce.bulkload.max.hfiles.perRegion.perFamily", "400")

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM