hbase-spark bulk load (Part 2)


Overview

I previously wrote up a case of bulk loading HBase from Spark (Spark, BulkLoad HBase, single column, multiple columns), which implemented the multi-column case. The whole process involves sorting, splitting, and similar steps and is relatively complex.

Recently, while reading the official documentation, I found that there are two methods:
The Bulk Loading in section 73 is the method I implemented previously.
The Bulk Load in section 111 is the method that ships with hbase-spark.

However, during testing I found that the example on the official site is missing some key code and cannot be run directly, so I spent a bit of time implementing it myself and comparing the efficiency of the two methods.


Dependencies

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <scala.version>2.11</scala.version>
        <spark.version>2.3.2.3.1.0.0-78</spark.version>
        <hbase.version>2.0.2</hbase.version>
        <hadoop.version>3.1.1</hadoop.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>2.11.7</version>
        </dependency>

        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.11</artifactId>
            <version>${spark.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.11</artifactId>
            <version>${spark.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming_2.11</artifactId>
            <version>${spark.version}</version>
        </dependency>
        
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>${hadoop.version}</version>
        </dependency>

        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.45</version>
        </dependency>

        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-it</artifactId>
            <version>${hbase.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-spark</artifactId>
            <version>2.0.2.3.1.0.0-78</version>
        </dependency>

        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-spark-it</artifactId>
            <version>2.0.2.3.1.0.0-78</version>
        </dependency>

        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>5.1.38</version>
        </dependency>
    </dependencies>

Implementation code

import java.io.IOException
import java.util.UUID

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.hbase.client._
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2
import org.apache.hadoop.hbase.spark.{ByteArrayWrapper, FamiliesQualifiersValues, FamilyHFileWriteOptions, HBaseContext}
import org.apache.hadoop.hbase.tool.LoadIncrementalHFiles
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.hbase.{HBaseConfiguration, HConstants, KeyValue, TableName}
import org.apache.hadoop.mapreduce.Job
import org.apache.spark.SparkContext
import org.apache.spark.sql.{DataFrame, SparkSession}

object BulkLoadDemo2 {
  var sourceTable: String = "HFlileOut"
  val spark: SparkSession = SparkSession
    .builder()
    .master("local")
    .appName("ExportToHBase")
    .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    .config("spark.shuffle.file.buffer", "2048k")
    .config("spark.executor.cores", "2")
    .getOrCreate()
  val sc: SparkContext = spark.sparkContext

  val conf: Configuration = HBaseConfiguration.create()
  conf.set("hbase.zookeeper.quorum", "node1:2181,node2:2181,node3:2181")
  conf.setInt("zookeeper.recovery.retry", 0)
  conf.setInt("hbase.client.retries.number", 0)
  val hbaseContext = new HBaseContext(sc, conf)

  var savePath: Path = new Path(s"/warehouse/data/tmp/hbase/HFlileOut")
  val cf: String = "cf"
  val conn: Connection = ConnectionFactory.createConnection(conf)
  val tableName: TableName = createTable(conn, sourceTable, cf)

  def main(args: Array[String]): Unit = {
    val start = System.currentTimeMillis()
    // generate the HFiles
    generateHFiles
    // load the HFiles into HBase
    loadHFileToHbase
    val end = System.currentTimeMillis()
    println("Elapsed: " + (end - start) / (1000 * 60).toDouble + " min...")
  }

  /**
   * Generate a rowkey for the HBase table.
   * A random UUID segment is appended to the base value.
   *
   * @param baseInfo the base component of the rowkey
   * @return the generated rowkey
   */
  def generateRowKey(baseInfo: String): String = {
    val uuid = UUID.randomUUID.toString
    val idx = uuid.lastIndexOf("-")
    new StringBuffer(baseInfo).append("_").append(uuid.substring(idx + 1)).toString
  }

  /**
   * Read the source data and generate HFiles.
   */
  def generateHFiles(): Unit = {
    // delete the output directory if it already exists
    delete_hdfspath(savePath.toUri.getPath)
    // this import is required in order to use hbaseBulkLoadThinRows
    import org.apache.hadoop.hbase.spark.HBaseRDDFunctions.GenericHBaseRDDFunctions
    // obtain the source data (the actual source is omitted here)
    val sourceDataFrame: DataFrame = ...
    val columnsName: Array[String] = sourceDataFrame.columns // all column names
    sourceDataFrame
      .rdd
      .map(row => {
        val familyQualifiersValues: FamiliesQualifiersValues = new FamiliesQualifiersValues
        val rowkey: String = generateRowKey(row.getAs[Int]("UID") + "")
        // add each column (note: this loop stops before the last column)
        for (i <- 0 until columnsName.length - 1) {
          try {
            familyQualifiersValues += (Bytes.toBytes(cf), Bytes.toBytes(columnsName(i)), Bytes.toBytes(row.getAs[String](columnsName(i)) + ""))
          } catch {
            case e: ClassCastException =>
              familyQualifiersValues += (Bytes.toBytes(cf), Bytes.toBytes(columnsName(i)), Bytes.toBytes(row.getAs[BigInt](columnsName(i)) + ""))
            case e: Exception =>
              e.printStackTrace()
          }
        }
        (new ByteArrayWrapper(Bytes.toBytes(rowkey)), familyQualifiersValues)
      }).hbaseBulkLoadThinRows(hbaseContext, tableName, t => t, savePath.toUri.getPath, new java.util.HashMap[Array[Byte], FamilyHFileWriteOptions], compactionExclude = false, HConstants.DEFAULT_MAX_FILE_SIZE)
  }

  /**
   * Create the HBase table if it does not already exist.
   *
   * @param conn      HBase connection
   * @param tableName name of the table to create
   * @param cf        column family name
   * @return the resolved TableName
   */
  def createTable(conn: Connection, tableName: String, cf: String): TableName = {
    val tn = TableName.valueOf(tableName)
    var admin: Admin = null
    try {
      admin = conn.getAdmin
      val flag = admin.tableExists(tn)
      if (!flag) { // the table does not exist, so create it
        val builder = TableDescriptorBuilder.newBuilder(tn)
        val cfd: ColumnFamilyDescriptor = ColumnFamilyDescriptorBuilder.of(cf)
        builder.setColumnFamily(cfd)
        admin.createTable(builder.build)
      }
    } catch {
      case e: IOException =>
        e.printStackTrace()
    }
    tn
  }

  /**
   * Load the HFiles into HBase; essentially this moves the HFiles into HBase's directory.
   */
  def loadHFileToHbase() = {
    // import the HFiles into HBase; from here on it is all plain HBase API
    val load: LoadIncrementalHFiles = new LoadIncrementalHFiles(conf)

    // create an HBase connection; the default configuration supplies the HBase master address
    val conn: Connection = ConnectionFactory.createConnection(conf)

    // look up the table by name
    val table: Table = conn.getTable(TableName.valueOf(sourceTable))

    // get the region distribution of the HBase table
    val regionLocator: RegionLocator = conn.getRegionLocator(TableName.valueOf(sourceTable))

    // create a Hadoop MapReduce job
    val job: Job = Job.getInstance(conf)

    // set the job name
    job.setJobName(s"$sourceTable LoadIncrementalHFiles")

    // most important part: since we are producing HFiles, the map output key must be ImmutableBytesWritable
    job.setMapOutputKeyClass(classOf[ImmutableBytesWritable])

    // the map output value is KeyValue
    job.setMapOutputValueClass(classOf[KeyValue])

    // configure HFileOutputFormat2 for the incremental load
    HFileOutputFormat2.configureIncrementalLoad(job, table, regionLocator)

    // start the bulk load
    load.doBulkLoad(savePath, conn.getAdmin, table, regionLocator)
    spark.stop()
  }

  /**
   * Delete a path on HDFS if it exists.
   *
   * @param url the path to delete
   */
  def delete_hdfspath(url: String) {
    val hdfs: FileSystem = FileSystem.get(new Configuration)
    val path: Path = new Path(url)
    if (hdfs.exists(path)) {
      hdfs.delete(path, true)
    }
  }
}
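
The source DataFrame in generateHFiles is deliberately left as ... above; the bulk-load logic does not depend on where the data comes from. Purely as a hypothetical illustration (the URL, table name, and credentials below are placeholders, not taken from the original job), it could for example be read from MySQL over JDBC, which is presumably why mysql-connector-java appears among the dependencies:

// Hypothetical way to build sourceDataFrame with the SparkSession defined above;
// every option value here is a placeholder.
val sourceDataFrame: DataFrame = spark.read
  .format("jdbc")
  .option("url", "jdbc:mysql://mysql-host:3306/source_db") // placeholder host and database
  .option("driver", "com.mysql.jdbc.Driver")
  .option("dbtable", "source_table")                       // placeholder table
  .option("user", "user")                                  // placeholder credentials
  .option("password", "password")
  .load()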

Efficiency comparison

Comparing the two methods, the actual load into HBase is essentially the same; the difference lies in how the HFiles are generated.
This method does not require sorting the rowkeys manually, because that step is already handled internally, so the overall development effort drops considerably.
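
For comparison, here is a minimal sketch (an illustrative reconstruction, not the exact code from the earlier post; sourceRdd is made-up sample data, and sc and cf are the fields defined in the object above) of the manual sorting that the first method has to do before writing KeyValues with HFileOutputFormat2:

import org.apache.hadoop.hbase.KeyValue
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.util.Bytes

// Made-up sample input: (rowkey, qualifier, value) triples.
val sourceRdd = sc.parallelize(Seq(
  ("10001_a1b2c3", "name", "alice"),
  ("10002_d4e5f6", "name", "bob")
))

val sortedKvRdd = sourceRdd
  .map { case (rowkey, qualifier, value) =>
    // composite key so the output is sorted by rowkey, then qualifier
    ((rowkey, qualifier), value)
  }
  .sortByKey() // the manual sort step that hbaseBulkLoadThinRows handles internally
  .map { case ((rowkey, qualifier), value) =>
    val kv = new KeyValue(Bytes.toBytes(rowkey), Bytes.toBytes(cf),
      Bytes.toBytes(qualifier), Bytes.toBytes(value))
    (new ImmutableBytesWritable(Bytes.toBytes(rowkey)), kv)
  }

// sortedKvRdd is then written with saveAsNewAPIHadoopFile(..., classOf[HFileOutputFormat2],
// job.getConfiguration) and bulk-loaded exactly as in loadHFileToHbase above.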

Development got easier, but with the same resources and the same data volume the execution times were:
Method one: generating the HFiles plus loading into HBase took 53 min
Method two: generating the HFiles alone took 63 min

I have not run many tests yet and have not found the cause of the slower run; I will keep tracking it.

Problems encountered

Could not access type Logging in package org.apache.spark

Error:scalac: missing or invalid dependency detected while loading class file 'HBaseContext.class'.
Could not access type Logging in package org.apache.spark,
because it (or its dependencies) are missing. Check your build definition for
missing or conflicting dependencies. (Re-run with `-Ylog-classpath` to see the problematic classpath.)
A full rebuild may help if 'HBaseContext.class' was compiled against an incompatible version of org.apache.spark.

Looking through a JIRA on the HBase site, I found the cause of the problem.

The JIRA describes it as follows:
When HBaseContext is referenced in the code, compiling the Spark application with Spark 2 fails, because the HBaseContext module references org.apache.spark.Logging.
In Spark 2, Logging was moved into a private package (org.apache.spark.internal), which is what causes this.

Workaround:

  1. Create an org.apache.spark package in your own project, and under that package create a Logging.scala file containing a trait named Logging.

  2. Copy the contents of the org.apache.spark.internal.Logging class from the spark-core project into the org.apache.spark.Logging trait created in your project (a rough sketch of the result is shown after this list).
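
As a rough illustration only (a minimal sketch, not the complete class copied from spark-core, and assuming SLF4J is available on the classpath via Spark), the trait placed under org/apache/spark/Logging.scala in your own source tree looks roughly like this:

package org.apache.spark

import org.slf4j.{Logger, LoggerFactory}

// Minimal stand-in for the Logging trait that hbase-spark expects at
// org.apache.spark.Logging; in practice, copy the full body of
// org.apache.spark.internal.Logging from spark-core here.
trait Logging {

  @transient private var log_ : Logger = null

  protected def logName: String = this.getClass.getName.stripSuffix("$")

  protected def log: Logger = {
    if (log_ == null) {
      log_ = LoggerFactory.getLogger(logName)
    }
    log_
  }

  protected def logInfo(msg: => String): Unit = if (log.isInfoEnabled) log.info(msg)
  protected def logDebug(msg: => String): Unit = if (log.isDebugEnabled) log.debug(msg)
  protected def logWarning(msg: => String): Unit = if (log.isWarnEnabled) log.warn(msg)
  protected def logError(msg: => String): Unit = if (log.isErrorEnabled) log.error(msg)
}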


Could not access term streaming in package org.apache.spark
After the steps above, running again produced a new error:

Error:scalac: missing or invalid dependency detected while loading class file 'HBaseContext.class'.
Could not access term streaming in package org.apache.spark,
because it (or its dependencies) are missing. Check your build definition for
missing or conflicting dependencies. (Re-run with `-Ylog-classpath` to see the problematic classpath.)
A full rebuild may help if 'HBaseContext.class' was compiled against an incompatible version of org.apache.spark.

The problem is similar, but this time it is not Logging that cannot be found. After some searching I found a workaround in "Issue when trying to build #38".

Following that hint, add the spark-streaming_2.11 dependency:

<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming_2.11</artifactId>
    <version>${spark.version}</version>
</dependency>

I have not yet worked out exactly why spark-streaming_2.11 has to be added, but it does solve the problem. Most likely it is because HBaseContext also exposes streaming helpers that operate on DStreams, so the Spark Streaming classes have to be on the compile classpath even when they are never used.

