Parse the data on HDFS and bulk-load it into HBase via HFiles (multiple columns need to be written).
The key API for writing the data:
rdd.saveAsNewAPIHadoopFile(
stagingFolder,
classOf[ImmutableBytesWritable],
classOf[KeyValue],
classOf[HFileOutputFormat2],
job.getConfiguration)
The tricky parts:
1):
If the bulk load reports an error like:
Does it contain files in subdirectories that correspond to column family names
the usual causes are:
A: a problem in the code
B: a problem with the data source
C: the classes set with setMapOutputKeyClass do not match the classes passed to saveAsNewAPIHadoopFile (see the sketch below)
(In my case it was a data-source problem.)
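For cause C in particular, the key and value classes registered on the Hadoop Job must be the same pair that is passed to saveAsNewAPIHadoopFile. The following is only a minimal sketch of a consistent setup, assuming stagingFolder, config and an rdd of (ImmutableBytesWritable, KeyValue) pairs already exist; the full version appears in the complete code at the end of this post.

// Sketch only: keep the Job's map output classes and the classes passed to
// saveAsNewAPIHadoopFile identical, otherwise the staging directory ends up unusable.
import org.apache.hadoop.mapreduce.Job
import org.apache.hadoop.hbase.KeyValue
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2

val job = Job.getInstance(config)
job.setMapOutputKeyClass(classOf[ImmutableBytesWritable])   // must match the key class below
job.setMapOutputValueClass(classOf[KeyValue])               // must match the value class below

rdd.saveAsNewAPIHadoopFile(
  stagingFolder,
  classOf[ImmutableBytesWritable],
  classOf[KeyValue],
  classOf[HFileOutputFormat2],
  job.getConfiguration)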
2):
When writing with normal Put operations, the server sorts the data for you, so you never run into this error:
Added a key not lexically larger than previous
But when writing HFiles directly, you may hit the error:
Added a key not lexically larger than previous
The usual assumption is that the rowkeys are not sorted. I naively went and checked, and the rowkeys were indeed sorted.
When Spark writes HFiles, the cells must be ordered by rowkey + column family + column qualifier, so the data has to be ordered as a whole when it is written (one way to do this is sketched below).
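One way to guarantee that overall ordering is to sort on a composite key of (rowkey, column family, qualifier) before building the KeyValues. The sketch below is only an illustration: the cells RDD and its (rowkey, family, qualifier, value) tuple shape are assumptions, not the exact shape used later in this post, and it assumes ASCII keys so that String ordering matches HBase's byte ordering.

// Sketch only: sort cells by (rowkey, family, qualifier) so the KeyValues reach
// HFileOutputFormat2 in the order HFiles require.
import org.apache.spark.rdd.RDD
import org.apache.hadoop.hbase.KeyValue
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.util.Bytes

// `cells` is a hypothetical RDD[(String, String, String, String)] of
// (rowkey, family, qualifier, value) tuples.
val sortedCells: RDD[(ImmutableBytesWritable, KeyValue)] = cells
  .sortBy { case (rowkey, family, qualifier, _) => (rowkey, family, qualifier) }
  .map { case (rowkey, family, qualifier, value) =>
    val kv = new KeyValue(
      Bytes.toBytes(rowkey),
      Bytes.toBytes(family),
      Bytes.toBytes(qualifier),
      Bytes.toBytes(value))
    (new ImmutableBytesWritable(Bytes.toBytes(rowkey)), kv)
  }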
(The story does not end there.)
3):
Because multiple columns need to be written, the best options are either to use reflection to obtain the column names and values dynamically, or to get them from a DataFrame (df.columns).
Reflection approach:
val listData: RDD[(ImmutableBytesWritable, ListBuffer[KeyValue])] = rdd.map { line =>
  val rowkey = line.vintime
  // collect the bean's field names via reflection; they become the column qualifiers
  val clazz = Class.forName("XXXXXXXXXXXXXXXX")  // fully qualified bean class name
  val fields = clazz.getDeclaredFields
  var list = new ListBuffer[String]()
  var kvlist = new ListBuffer[KeyValue]()
  if (fields != null && fields.size > 0) {
    for (field <- fields) {
      field.setAccessible(true)
      val column = field.getName
      list.append(column)
    }
  }
  // qualifiers must be emitted in lexical order
  val newList = list.sortWith(_ < _)
  val ik = new ImmutableBytesWritable(Bytes.toBytes(rowkey))
  for (column <- newList) {
    val declaredField: Field = line.getClass.getDeclaredField(column)
    declaredField.setAccessible(true)
    val value = declaredField.get(line).toString
    val kv: KeyValue = new KeyValue(
      Bytes.toBytes(rowkey),
      Bytes.toBytes(columnFamily),
      Bytes.toBytes(column),
      Bytes.toBytes(value))
    kvlist.append(kv)
  }
  (ik, kvlist)
}
DataFrame approach:
val tmpData: RDD[(ImmutableBytesWritable, util.LinkedList[KeyValue])] = df.rdd.map(line => {
  val rowkey = line.getAs[String]("vintime")
  val ik = new ImmutableBytesWritable(Bytes.toBytes(rowkey))
  var linkedList = new util.LinkedList[KeyValue]()
  // `columns` comes from df.columns; every column becomes one KeyValue
  for (column <- columns) {
    val kv: KeyValue = new KeyValue(
      Bytes.toBytes(rowkey),
      Bytes.toBytes(columnFamily),
      Bytes.toBytes(column),
      Bytes.toBytes(line.getAs[String](column)))
    linkedList.add(kv)
  }
  (ik, linkedList)
})

val result: RDD[(ImmutableBytesWritable, KeyValue)] = tmpData.flatMapValues(s => {
  // flatten each row's KeyValue list, then sort by rowkey
  val values: Iterator[KeyValue] = JavaConverters.asScalaIteratorConverter(s.iterator()).asScala
  values
}).sortBy(x => x._1, true)
Looking closely, both variants already sort the data, yet even with that sorting in place the job still fails with:
Added a key not lexically larger than previous
Now recall the requirement for writing HFiles:
rowkey + column family + column qualifier must all be ordered, and if the data contains duplicate keys it no longer counts as ordered!
So deduplicate the data first:
val key: RDD[(String, TransferTime)] = data.reduceByKey((x, y) => y)
val unitData: RDD[TransferTime] = key.map(line => line._2)
Sure enough, this got rid of the Added a key not lexically larger than previous exception.
But then another exception appears:
Kryo serialization failed: Buffer overflow
This happens because, when Kryo serializes some of the classes, the data exceeds the default serialization buffer size; adjusting the buffer settings fixes it:
sparkConf.set("spark.kryoserializer.buffer.max" , "256m")
sparkConf.set("spark.kryoserializer.buffer" , "64m")
Full code:

/**
  * Created by angel
  */
object WriteTransferTime extends WriteToHbase {
  /**
    * @param data         the data to insert
    * @param tableName    table name
    * @param columnFamily column family
    **/
  override def bulkLoadData(data: RDD[Any], tableName: String, columnFamily: String): Unit = {
    val bean: RDD[TransferTime] = data.map(line => line.asInstanceOf[TransferTime])
    // key by rowkey, deduplicate, then sort by rowkey
    val map: RDD[(String, TransferTime)] = bean.map(line => (line.vintime, line))
    val key: RDD[(String, TransferTime)] = map.reduceByKey((x, y) => y)
    val map1: RDD[TransferTime] = key.map(line => line._2)
    val by1: RDD[TransferTime] = map1.sortBy(f => f.vintime)
    // build (rowkey, KeyValue list) pairs via reflection, with qualifiers in lexical order
    val listData: RDD[(ImmutableBytesWritable, ListBuffer[KeyValue])] = by1.map { line =>
      val rowkey = line.vintime
      val clazz = Class.forName("com.dongfeng.code.Bean.message.TransferTime")
      val fields = clazz.getDeclaredFields
      var list = new ListBuffer[String]()
      var kvlist = new ListBuffer[KeyValue]()
      if (fields != null && fields.size > 0) {
        for (field <- fields) {
          field.setAccessible(true)
          val column = field.getName
          list.append(column)
        }
      }
      val newList = list.sortWith(_ < _)
      val ik = new ImmutableBytesWritable(Bytes.toBytes(rowkey))
      for (column <- newList) {
        val declaredField: Field = line.getClass.getDeclaredField(column)
        declaredField.setAccessible(true)
        val value = declaredField.get(line).toString
        val kv: KeyValue = new KeyValue(
          Bytes.toBytes(rowkey),
          Bytes.toBytes(columnFamily),
          Bytes.toBytes(column),
          Bytes.toBytes(value))
        kvlist.append(kv)
      }
      (ik, kvlist)
    }
    // flatten the KeyValue lists and sort globally by rowkey
    val result: RDD[(ImmutableBytesWritable, KeyValue)] = listData.flatMapValues(s => {
      val values: Iterator[KeyValue] = s.iterator
      values
    })
    val resultDD: RDD[(ImmutableBytesWritable, KeyValue)] = result.sortBy(x => x._1, true)
    WriteToHbaseDB.hfile_load(resultDD, TableName.valueOf(tableName), columnFamily)
  }
}

def hfile_load(rdd: RDD[(ImmutableBytesWritable, KeyValue)], tableName: TableName, columnFamily: String): Unit = {
  // table handle
  var table: Table = null
  try {
    val startTime = System.currentTimeMillis()
    println(s"Start time:-------->${startTime}")
    // temporary HDFS path where the generated HFiles are stored
    val stagingFolder = "hdfs://cdh1:9000/hfile/" + tableName + new Date().getTime
    table = connection.getTable(tableName)
    // create the table if it does not exist yet
    if (!admin.tableExists(tableName)) {
      createTable(tableName, columnFamily)
    }
    // set up the job and write the HFiles
    val job = Job.getInstance(config)
    job.setJobName("DumpFile")
    job.setMapOutputKeyClass(classOf[ImmutableBytesWritable])
    job.setMapOutputValueClass(classOf[KeyValue])
    val regionLocator = connection.getRegionLocator(tableName)
    // configure the job (partitioner, compression, etc.) before the HFiles are written
    HFileOutputFormat2.configureIncrementalLoad(job, table, regionLocator)
    rdd.sortBy(x => x._1, true).saveAsNewAPIHadoopFile(
      stagingFolder,
      classOf[ImmutableBytesWritable],
      classOf[KeyValue],
      classOf[HFileOutputFormat2],
      job.getConfiguration)
    // bulk load the generated HFiles into the table
    val load = new LoadIncrementalHFiles(config)
    load.doBulkLoad(new Path(stagingFolder), table.asInstanceOf[HTable])
    // load.doBulkLoad(new Path(stagingFolder), connection.getAdmin, table, regionLocator)
    val endTime = System.currentTimeMillis()
    println(s"End time:-------->${endTime}")
    println(s"Elapsed time:----------------->${(endTime - startTime)}ms")
  } catch {
    case e: IOException => e.printStackTrace()
  } finally {
    if (table != null) {
      try {
        // close the Table
        table.close()
      } catch {
        case e: IOException => e.printStackTrace()
      }
    }
    if (connection != null) {
      try {
        // close the HBase connection
        connection.close()
      } catch {
        case e: IOException => e.printStackTrace()
      }
    }
  }
}
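Finally, a hedged usage sketch of the entry point above; transferTimes, the table name and the column family are made-up placeholders, and the ascription to Any matches bulkLoadData's RDD[Any] signature.

// Hypothetical driver-side call: `transferTimes` is assumed to be an
// RDD[TransferTime] parsed from the HDFS source.
val anyRdd: RDD[Any] = transferTimes.map(t => t: Any)
WriteTransferTime.bulkLoadData(anyRdd, "transfer_time", "f1")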