我們在《通過BulkLoad快速將海量數據導入到Hbase[Hadoop篇]》文中介紹了一種快速將海量數據導入Hbase的一種方法,而本文將介紹如何在Spark上使用Scala編寫快速導入數據到Hbase中的方法。這里將介紹兩種方式:第一種使用Put普通的方法來倒數;第二種使用Bulk Load API。關於為啥需要使用Bulk Load本文就不介紹,更多的請參見《通過BulkLoad快速將海量數據導入到Hbase[Hadoop篇]》。
文章目錄
使用org.apache.hadoop.hbase.client.Put來寫數據
使用 org.apache.hadoop.hbase.client.Put 將數據一條一條寫入Hbase中,但是和Bulk加載相比效率低下,僅僅作為對比。
import
org.apache.spark.
_
import
org.apache.spark.rdd.NewHadoopRDD
import
org.apache.hadoop.hbase.{HBaseConfiguration, HTableDescriptor}
import
org.apache.hadoop.hbase.client.HBaseAdmin
import
org.apache.hadoop.hbase.mapreduce.TableInputFormat
import
org.apache.hadoop.fs.Path;
import
org.apache.hadoop.hbase.HColumnDescriptor
import
org.apache.hadoop.hbase.util.Bytes
import
org.apache.hadoop.hbase.client.Put;
import
org.apache.hadoop.hbase.client.HTable;
val
conf
=
HBaseConfiguration.create()
val
tableName
=
"/iteblog"
conf.set(TableInputFormat.INPUT
_
TABLE, tableName)
val
myTable
=
new
HTable(conf, tableName);
var
p
=
new
Put();
p
=
new
Put(
new
String(
"row999"
).getBytes());
p.add(
"cf"
.getBytes(),
"column_name"
.getBytes(),
new
String(
"value999"
).getBytes());
myTable.put(p);
myTable.flushCommits();
|
批量導數據到Hbase
批量導數據到Hbase又可以分為兩種:(1)、生成Hfiles,然后批量導數據;
(2)、直接將數據批量導入到Hbase中。
批量將Hfiles導入Hbase
現在我們來介紹如何批量將數據寫入到Hbase中,主要分為兩步:
(1)、先生成Hfiles;
(2)、使用 org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles 將事先生成Hfiles導入到Hbase中。
實現的代碼如下:
import
org.apache.spark.
_
import
org.apache.spark.rdd.NewHadoopRDD
import
org.apache.hadoop.hbase.{HBaseConfiguration, HTableDescriptor}
import
org.apache.hadoop.hbase.client.HBaseAdmin
import
org.apache.hadoop.hbase.mapreduce.TableInputFormat
import
org.apache.hadoop.fs.Path;
import
org.apache.hadoop.hbase.HColumnDescriptor
import
org.apache.hadoop.hbase.util.Bytes
import
org.apache.hadoop.hbase.client.Put;
import
org.apache.hadoop.hbase.client.HTable;
import
org.apache.hadoop.hbase.mapred.TableOutputFormat
import
org.apache.hadoop.mapred.JobConf
import
org.apache.hadoop.hbase.io.ImmutableBytesWritable
import
org.apache.hadoop.mapreduce.Job
import
org.apache.hadoop.mapreduce.lib.input.FileInputFormat
import
org.apache.hadoop.mapreduce.lib.output.FileOutputFormat
import
org.apache.hadoop.hbase.KeyValue
import
org.apache.hadoop.hbase.mapreduce.HFileOutputFormat
import
org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles
val
conf
=
HBaseConfiguration.create()
val
tableName
=
"iteblog"
val
table
=
new
HTable(conf, tableName)
conf.set(TableOutputFormat.OUTPUT
_
TABLE, tableName)
val
job
=
Job.getInstance(conf)
job.setMapOutputKeyClass (classOf[ImmutableBytesWritable])
job.setMapOutputValueClass (classOf[KeyValue])
HFileOutputFormat.configureIncrementalLoad (job, table)
// Generate 10 sample data:
val
num
=
sc.parallelize(
1
to
10
)
val
rdd
=
num.map(x
=
>{
val
kv
:
KeyValue
=
new
KeyValue(Bytes.toBytes(x),
"cf"
.getBytes(),
"c1"
.getBytes(),
"value_xxx"
.getBytes() )
(
new
ImmutableBytesWritable(Bytes.toBytes(x)), kv)
})
// Save Hfiles on HDFS
rdd.saveAsNewAPIHadoopFile(
"/tmp/iteblog"
, classOf[ImmutableBytesWritable], classOf[KeyValue], classOf[HFileOutputFormat], conf)
//Bulk load Hfiles to Hbase
val
bulkLoader
=
new
LoadIncrementalHFiles(conf)
bulkLoader.doBulkLoad(
new
Path(
"/tmp/iteblog"
), table)
|
運行完上面的代碼之后,我們可以看到Hbase中的iteblog表已經生成了10條數據,如下:
hbase(main):020:0> scan
'iteblog'
ROW COLUMN+CELL
\x00\x00\x00\x01 column=cf:c1, timestamp=1425128075586, value=value_xxx
\x00\x00\x00\x02 column=cf:c1, timestamp=1425128075586, value=value_xxx
\x00\x00\x00\x03 column=cf:c1, timestamp=1425128075586, value=value_xxx
\x00\x00\x00\x04 column=cf:c1, timestamp=1425128075586, value=value_xxx
\x00\x00\x00\x05 column=cf:c1, timestamp=1425128075586, value=value_xxx
\x00\x00\x00\x06 column=cf:c1, timestamp=1425128075675, value=value_xxx
\x00\x00\x00\x07 column=cf:c1, timestamp=1425128075675, value=value_xxx
\x00\x00\x00\x08 column=cf:c1, timestamp=1425128075675, value=value_xxx
\x00\x00\x00\x09 column=cf:c1, timestamp=1425128075675, value=value_xxx
\x00\x00\x00\x0A column=cf:c1, timestamp=1425128075675, value=value_xxx
|
直接Bulk Load數據到Hbase
這種方法不需要事先在HDFS上生成Hfiles,而是直接將數據批量導入到Hbase中。與上面的例子相比只有微小的差別,具體如下:
將
rdd.saveAsNewAPIHadoopFile(
"/tmp/iteblog"
, classOf[ImmutableBytesWritable], classOf[KeyValue], classOf[HFileOutputFormat], conf)
|
修改成:
rdd.saveAsNewAPIHadoopFile(
"/tmp/iteblog"
, classOf[ImmutableBytesWritable], classOf[KeyValue], classOf[HFileOutputFormat], job.getConfiguration())
|
完整的實現如下:
import
org.apache.spark.
_
import
org.apache.spark.rdd.NewHadoopRDD
import
org.apache.hadoop.hbase.{HBaseConfiguration, HTableDescriptor}
import
org.apache.hadoop.hbase.client.HBaseAdmin
import
org.apache.hadoop.hbase.mapreduce.TableInputFormat
import
org.apache.hadoop.fs.Path;
import
org.apache.hadoop.hbase.HColumnDescriptor
import
org.apache.hadoop.hbase.util.Bytes
import
org.apache.hadoop.hbase.client.Put;
import
org.apache.hadoop.hbase.client.HTable;
import
org.apache.hadoop.hbase.mapred.TableOutputFormat
import
org.apache.hadoop.mapred.JobConf
import
org.apache.hadoop.hbase.io.ImmutableBytesWritable
import
org.apache.hadoop.mapreduce.Job
import
org.apache.hadoop.mapreduce.lib.input.FileInputFormat
import
org.apache.hadoop.mapreduce.lib.output.FileOutputFormat
import
org.apache.hadoop.hbase.KeyValue
import
org.apache.hadoop.hbase.mapreduce.HFileOutputFormat
import
org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles
val
conf
=
HBaseConfiguration.create()
val
tableName
=
"iteblog"
val
table
=
new
HTable(conf, tableName)
conf.set(TableOutputFormat.OUTPUT
_
TABLE, tableName)
val
job
=
Job.getInstance(conf)
job.setMapOutputKeyClass (classOf[ImmutableBytesWritable])
job.setMapOutputValueClass (classOf[KeyValue])
HFileOutputFormat.configureIncrementalLoad (job, table)
// Generate 10 sample data:
val
num
=
sc.parallelize(
1
to
10
)
val
rdd
=
num.map(x
=
>{
val
kv
:
KeyValue
=
new
KeyValue(Bytes.toBytes(x),
"cf"
.getBytes(),
"c1"
.getBytes(),
"value_xxx"
.getBytes() )
(
new
ImmutableBytesWritable(Bytes.toBytes(x)), kv)
})
// Directly bulk load to Hbase/MapRDB tables.
rdd.saveAsNewAPIHadoopFile(
"/tmp/iteblog"
, classOf[ImmutableBytesWritable], classOf[KeyValue], classOf[HFileOutputFormat], job.getConfiguration())
|
其他
在上面的例子中我們使用了 saveAsNewAPIHadoopFile API來將數據寫到HBase中;事實上,我們還可以通過使用 saveAsNewAPIHadoopDataset API來實現同樣的目標,我們僅僅需要將下面代碼
rdd.saveAsNewAPIHadoopFile(
"/tmp/iteblog"
, classOf[ImmutableBytesWritable], classOf[KeyValue], classOf[HFileOutputFormat], job.getConfiguration())
|
修改成
job.getConfiguration.set(
"mapred.output.dir"
,
"/tmp/iteblog"
)
rdd.saveAsNewAPIHadoopDataset(job.getConfiguration)
|
剩下的和和之前完全一致。
彭佳君已瀏覽......

