在Spark上通過BulkLoad快速將海量數據導入到Hbase


我們在《通過BulkLoad快速將海量數據導入到Hbase[Hadoop篇]》文中介紹了一種快速將海量數據導入Hbase的一種方法,而本文將介紹如何在Spark上使用Scala編寫快速導入數據到Hbase中的方法。這里將介紹兩種方式:第一種使用Put普通的方法來倒數;第二種使用Bulk Load API。關於為啥需要使用Bulk Load本文就不介紹,更多的請參見《通過BulkLoad快速將海量數據導入到Hbase[Hadoop篇]》


如果想及時了解 Spark、Hadoop或者Hbase相關的文章,歡迎關注微信公共帳號:iteblog_hadoop

使用org.apache.hadoop.hbase.client.Put來寫數據

使用 org.apache.hadoop.hbase.client.Put 將數據一條一條寫入Hbase中,但是和Bulk加載相比效率低下,僅僅作為對比。

import org.apache.spark. _
import org.apache.spark.rdd.NewHadoopRDD
import org.apache.hadoop.hbase.{HBaseConfiguration, HTableDescriptor}
import org.apache.hadoop.hbase.client.HBaseAdmin
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HColumnDescriptor
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.HTable;
  
val conf = HBaseConfiguration.create()
val tableName = "/iteblog"
conf.set(TableInputFormat.INPUT _ TABLE, tableName)
  
val myTable = new HTable(conf, tableName);
var p = new Put();
p = new Put( new String( "row999" ).getBytes());
p.add( "cf" .getBytes(), "column_name" .getBytes(), new String( "value999" ).getBytes());
myTable.put(p);
myTable.flushCommits();

批量導數據到Hbase

批量導數據到Hbase又可以分為兩種:(1)、生成Hfiles,然后批量導數據;
(2)、直接將數據批量導入到Hbase中。

批量將Hfiles導入Hbase

現在我們來介紹如何批量將數據寫入到Hbase中,主要分為兩步:
(1)、先生成Hfiles;
(2)、使用 org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles 將事先生成Hfiles導入到Hbase中。
實現的代碼如下:

import org.apache.spark. _
import org.apache.spark.rdd.NewHadoopRDD
import org.apache.hadoop.hbase.{HBaseConfiguration, HTableDescriptor}
import org.apache.hadoop.hbase.client.HBaseAdmin
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HColumnDescriptor
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.mapred.TableOutputFormat
import org.apache.hadoop.mapred.JobConf
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.mapreduce.Job
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat
import org.apache.hadoop.hbase.KeyValue
import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat
import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles
  
val conf = HBaseConfiguration.create()
val tableName = "iteblog"
val table = new HTable(conf, tableName)
  
conf.set(TableOutputFormat.OUTPUT _ TABLE, tableName)
val job = Job.getInstance(conf)
job.setMapOutputKeyClass (classOf[ImmutableBytesWritable])
job.setMapOutputValueClass (classOf[KeyValue])
HFileOutputFormat.configureIncrementalLoad (job, table)
  
// Generate 10 sample data:
val num = sc.parallelize( 1 to 10 )
val rdd = num.map(x = >{
     val kv : KeyValue = new KeyValue(Bytes.toBytes(x), "cf" .getBytes(), "c1" .getBytes(), "value_xxx" .getBytes() )
     ( new ImmutableBytesWritable(Bytes.toBytes(x)), kv)
})
  
// Save Hfiles on HDFS
rdd.saveAsNewAPIHadoopFile( "/tmp/iteblog" , classOf[ImmutableBytesWritable], classOf[KeyValue], classOf[HFileOutputFormat], conf)
  
//Bulk load Hfiles to Hbase
val bulkLoader = new LoadIncrementalHFiles(conf)
bulkLoader.doBulkLoad( new Path( "/tmp/iteblog" ), table)

運行完上面的代碼之后,我們可以看到Hbase中的iteblog表已經生成了10條數據,如下:

hbase(main):020:0> scan 'iteblog'
ROW                                                 COLUMN+CELL
  \x00\x00\x00\x01                                   column=cf:c1, timestamp=1425128075586, value=value_xxx
  \x00\x00\x00\x02                                   column=cf:c1, timestamp=1425128075586, value=value_xxx
  \x00\x00\x00\x03                                   column=cf:c1, timestamp=1425128075586, value=value_xxx
  \x00\x00\x00\x04                                   column=cf:c1, timestamp=1425128075586, value=value_xxx
  \x00\x00\x00\x05                                   column=cf:c1, timestamp=1425128075586, value=value_xxx
  \x00\x00\x00\x06                                   column=cf:c1, timestamp=1425128075675, value=value_xxx
  \x00\x00\x00\x07                                   column=cf:c1, timestamp=1425128075675, value=value_xxx
  \x00\x00\x00\x08                                   column=cf:c1, timestamp=1425128075675, value=value_xxx
  \x00\x00\x00\x09                                   column=cf:c1, timestamp=1425128075675, value=value_xxx
  \x00\x00\x00\x0A                                   column=cf:c1, timestamp=1425128075675, value=value_xxx

直接Bulk Load數據到Hbase

這種方法不需要事先在HDFS上生成Hfiles,而是直接將數據批量導入到Hbase中。與上面的例子相比只有微小的差別,具體如下:

rdd.saveAsNewAPIHadoopFile( "/tmp/iteblog" , classOf[ImmutableBytesWritable], classOf[KeyValue], classOf[HFileOutputFormat], conf) 

修改成:

rdd.saveAsNewAPIHadoopFile( "/tmp/iteblog" , classOf[ImmutableBytesWritable], classOf[KeyValue], classOf[HFileOutputFormat], job.getConfiguration())

完整的實現如下:

import org.apache.spark. _
import org.apache.spark.rdd.NewHadoopRDD
import org.apache.hadoop.hbase.{HBaseConfiguration, HTableDescriptor}
import org.apache.hadoop.hbase.client.HBaseAdmin
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HColumnDescriptor
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.mapred.TableOutputFormat
import org.apache.hadoop.mapred.JobConf
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.mapreduce.Job
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat
import org.apache.hadoop.hbase.KeyValue
import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat
import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles
  
val conf = HBaseConfiguration.create()
val tableName = "iteblog"
val table = new HTable(conf, tableName)
  
conf.set(TableOutputFormat.OUTPUT _ TABLE, tableName)
val job = Job.getInstance(conf)
job.setMapOutputKeyClass (classOf[ImmutableBytesWritable])
job.setMapOutputValueClass (classOf[KeyValue])
HFileOutputFormat.configureIncrementalLoad (job, table)
  
// Generate 10 sample data:
val num = sc.parallelize( 1 to 10 )
val rdd = num.map(x = >{
     val kv : KeyValue = new KeyValue(Bytes.toBytes(x), "cf" .getBytes(), "c1" .getBytes(), "value_xxx" .getBytes() )
     ( new ImmutableBytesWritable(Bytes.toBytes(x)), kv)
})
  
// Directly bulk load to Hbase/MapRDB tables.
rdd.saveAsNewAPIHadoopFile( "/tmp/iteblog" , classOf[ImmutableBytesWritable], classOf[KeyValue], classOf[HFileOutputFormat], job.getConfiguration())

其他

在上面的例子中我們使用了 saveAsNewAPIHadoopFile API來將數據寫到HBase中;事實上,我們還可以通過使用 saveAsNewAPIHadoopDataset API來實現同樣的目標,我們僅僅需要將下面代碼

rdd.saveAsNewAPIHadoopFile( "/tmp/iteblog" , classOf[ImmutableBytesWritable], classOf[KeyValue], classOf[HFileOutputFormat], job.getConfiguration())

修改成

job.getConfiguration.set( "mapred.output.dir" , "/tmp/iteblog" )
rdd.saveAsNewAPIHadoopDataset(job.getConfiguration)

剩下的和和之前完全一致。

 

彭佳君已瀏覽......


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM