Spark bulkload errors and how to fix them


Requirement

Parse the data on HDFS and bulk-load it into HBase via HFiles (multiple columns need to be written).

The key API for writing the data:

rdd.saveAsNewAPIHadoopFile(
        stagingFolder,
        classOf[ImmutableBytesWritable],
        classOf[KeyValue],
        classOf[HFileOutputFormat2],
        job.getConfiguration)

Tricky points:

1):

The first warning when writing HFiles ⚠️:

Does it contain files in subdirectories that correspond to column family names

There are roughly three causes:

A: a problem in the code

B: a problem in the data source

C: the class set with setMapOutputKeyClass does not match the class passed to saveAsNewAPIHadoopFile (see the sketch below)

(Mine was a data source problem.)
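
For cause C, a minimal sketch of what "consistent" means, using the same classes as the full code further down: the key/value classes set on the Job must be exactly the ones passed to saveAsNewAPIHadoopFile:

val job = Job.getInstance(config)
job.setMapOutputKeyClass(classOf[ImmutableBytesWritable]) // must match the key class below
job.setMapOutputValueClass(classOf[KeyValue])             // must match the value class below

rdd.saveAsNewAPIHadoopFile(
  stagingFolder,
  classOf[ImmutableBytesWritable], // same key class
  classOf[KeyValue],               // same value class
  classOf[HFileOutputFormat2],
  job.getConfiguration)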

2):

With normal put operations the server side sorts the cells for you, so put never runs into this error:

Added a key not lexically larger than previous

But when writing HFiles, if you hit the error:

Added a key not lexically larger than previous

you will usually assume the rowkeys were not sorted properly. I naively went and checked, and the rowkeys were indeed sorted.

The real reason:

When Spark writes HFiles, cells are ordered by rowkey + column family + column qualifier, so the data you write must be sorted as a whole, not just by rowkey.
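
A minimal sketch of what "sorted as a whole" means (the cellTuples RDD and its field names are made up for illustration): sort by the composite of rowkey, column family and column qualifier before building the (ImmutableBytesWritable, KeyValue) pairs:

// cellTuples: RDD[(String, String, String, String)] = (rowkey, family, qualifier, value) -- hypothetical
val sortedCells: RDD[(ImmutableBytesWritable, KeyValue)] = cellTuples
  .sortBy { case (rowkey, family, qualifier, _) => (rowkey, family, qualifier) }
  .map { case (rowkey, family, qualifier, value) =>
    (new ImmutableBytesWritable(Bytes.toBytes(rowkey)),
      new KeyValue(Bytes.toBytes(rowkey), Bytes.toBytes(family), Bytes.toBytes(qualifier), Bytes.toBytes(value)))
  }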

(The story doesn't end there.)

3):

Since multiple columns need to be written, the cleanest options are either using reflection to get the column names and values dynamically, or reading them off a DataFrame (df.columns).

Reflection approach:

val listData: RDD[(ImmutableBytesWritable, ListBuffer[KeyValue])] = rdd.map {
        line =>
          val rowkey = line.vintime
          // collect the bean's field names via reflection
          val clazz = Class.forName(XXXXXXXXXXXXXXXX)
          val fields = clazz.getDeclaredFields
          val list = new ListBuffer[String]()
          val kvlist = new ListBuffer[KeyValue]()
          if (fields != null && fields.size > 0) {
            for (field <- fields) {
              field.setAccessible(true)
              val column = field.getName
              list.append(column)
            }
          }

          // sort the column names so the qualifiers are written in lexical order within the row
          val newList = list.sortWith(_ < _)
          val ik = new ImmutableBytesWritable(Bytes.toBytes(rowkey))
          for(column <- newList){
            val declaredField: Field = line.getClass.getDeclaredField(column)
            declaredField.setAccessible(true)
            val value = declaredField.get(line).toString
            val kv: KeyValue = new KeyValue(
              Bytes.toBytes(rowkey),
              Bytes.toBytes(columnFamily),
              Bytes.toBytes(column),
              Bytes.toBytes(value))
            kvlist.append(kv)
          }
          (ik, kvlist)
    }

DataFrame approach:

val tmpData: RDD[(ImmutableBytesWritable, util.LinkedList[KeyValue])] = df.rdd.map(
      line =>{
        val rowkey = line.getAs[String]("vintime")
        val ik = new ImmutableBytesWritable(Bytes.toBytes(rowkey))
        val linkedList = new util.LinkedList[KeyValue]()
        // one KeyValue per DataFrame column
        for (column <- columns) {
          val kv: KeyValue = new KeyValue(
            Bytes.toBytes(rowkey),
            Bytes.toBytes(columnFamily),
            Bytes.toBytes(column),
            Bytes.toBytes(line.getAs[String](column)))
          linkedList.add(kv)
        }
        (ik, linkedList)
      })

    // flatten to (rowkey, KeyValue) pairs and sort by rowkey
    val result: RDD[(ImmutableBytesWritable, KeyValue)] = tmpData.flatMapValues(
      s => {
        val values: Iterator[KeyValue] = JavaConverters.asScalaIteratorConverter(s.iterator()).asScala
        values
      }
    ).sortBy(x =>x._1 , true)

If you look closely, both approaches already sort the data, yet even after getting past step (1) the job still failed with:

Added a key not lexically larger than previous

Now recall the requirement for writing HFiles:

rowkey + column family + column qualifier must all be ordered, and if the data contains duplicates, it no longer counts as ordered either!

So, deduplicate the data first:

val key: RDD[(String, TransferTime)] = data.reduceByKey((x, y) => y) // keep one record per rowkey
val unitData: RDD[TransferTime] = key.map(line => line._2)

Sure enough, that fixed the "Added a key not lexically larger than previous" exception.

But then another exception showed up:

Kryo serialization failed: Buffer overflow

This happens because, when some classes are serialized with Kryo, the data exceeds the default buffer size; just raise the settings:

sparkConf.set("spark.kryoserializer.buffer.max" , "256m")
sparkConf.set("spark.kryoserializer.buffer" , "64m")

Full code:

/**
  * Created by angel
  */
object WriteTransferTime extends WriteToHbase{
  /**
    * @param data         the data to insert
    * @param tableName    the target table name
    * @param columnFamily the column family to write to
    **/
  override def bulkLoadData(data: RDD[Any], tableName: String , columnFamily:String): Unit = {

    // deduplicate by rowkey (vintime), then sort by rowkey
    val bean: RDD[TransferTime] = data.map(line => line.asInstanceOf[TransferTime])
    val map: RDD[(String, TransferTime)] = bean.map(line => (line.vintime , line))
    val key: RDD[(String, TransferTime)] = map.reduceByKey((x, y) => y)
    val map1: RDD[TransferTime] = key.map(line => line._2)
    val by1: RDD[TransferTime] = map1.sortBy(f => f.vintime)
    val listData: RDD[(ImmutableBytesWritable, ListBuffer[KeyValue])] = by1.map {
      line =>
        val rowkey = line.vintime
        val clazz = Class.forName("com.dongfeng.code.Bean.message.TransferTime")
        val fields = clazz.getDeclaredFields
        val list = new ListBuffer[String]()
        val kvlist = new ListBuffer[KeyValue]()
        if (fields != null && fields.size > 0) {
          for (field <- fields) {
            field.setAccessible(true)
            val column = field.getName
            list.append(column)
          }
        }

        val newList = list.sortWith(_ < _)
        val ik = new ImmutableBytesWritable(Bytes.toBytes(rowkey))
        for(column <- newList){
          val declaredField: Field = line.getClass.getDeclaredField(column)
          declaredField.setAccessible(true)
          val value = declaredField.get(line).toString
          val kv: KeyValue = new KeyValue(
            Bytes.toBytes(rowkey),
            Bytes.toBytes(columnFamily),
            Bytes.toBytes(column),
            Bytes.toBytes(value))
          kvlist.append(kv)
        }
        (ik, kvlist)
    }
    val result: RDD[(ImmutableBytesWritable, KeyValue)] = listData.flatMapValues(
      s => {
        val values: Iterator[KeyValue] = s.iterator
        values
      }
    )
    val resultDD: RDD[(ImmutableBytesWritable, KeyValue)] = result.sortBy(x => x._1, true)
    // load the sorted RDD, not the unsorted one
    WriteToHbaseDB.hfile_load(resultDD, TableName.valueOf(tableName), columnFamily)
  }
}
And the hfile_load method in WriteToHbaseDB, which does the actual load into HBase:
 def hfile_load(rdd:RDD[(ImmutableBytesWritable , KeyValue)] , tableName: TableName , columnFamily:String): Unit ={
    //declare the table handle
    var table: Table = null
    try{
      val startTime = System.currentTimeMillis()
      println(s"Start time: -------->${startTime}")
      //temporary HDFS path where the generated HFiles are staged
      val stagingFolder = "hdfs://cdh1:9000/hfile/"+tableName+new Date().getTime

      table = connection.getTable(tableName)
      //create the table if it does not exist
      if(!admin.tableExists(tableName)){
        createTable(tableName , columnFamily)
      }

      //start the import
      val job = Job.getInstance(config)
      job.setJobName("DumpFile")
      job.setMapOutputKeyClass(classOf[ImmutableBytesWritable])
      job.setMapOutputValueClass(classOf[KeyValue])

      rdd.sortBy(x => x._1, true).saveAsNewAPIHadoopFile(
        stagingFolder,
        classOf[ImmutableBytesWritable],
        classOf[KeyValue],
        classOf[HFileOutputFormat2],
        job.getConfiguration)



      val load = new LoadIncrementalHFiles(config)
      val regionLocator = connection.getRegionLocator(tableName)
      HFileOutputFormat2.configureIncrementalLoad(job, table, regionLocator)
      load.doBulkLoad(new Path(stagingFolder), table.asInstanceOf[HTable])
//      load.doBulkLoad(new Path(stagingFolder) , connection.getAdmin , table , regionLocator)

      val endTime = System.currentTimeMillis()
      println(s"End time: -------->${endTime}")
      println(s"Elapsed time: ----------------->${(endTime - startTime)}ms")
    }catch{
      case e:IOException =>
        e.printStackTrace()
    }finally {
      if (table != null) {
        try {
          // close the HTable
          table.close()
        } catch {
          case e: IOException =>
            e.printStackTrace();
        }
      }
      if (connection != null) {
        try { //close the HBase connection
          connection.close();
        } catch {
          case e: IOException =>
            e.printStackTrace();
        }
      }
    }
  }
}