在第一次建立Hbase表的時候,我們可能需要往里面一次性導入大量的初始化數據。我們很自然地想到將數據一條條插入到Hbase中,或者通過MR方式等。但是這些方式不是慢就是在導入的過程的占用Region資源導致效率低下,所以很不適合一次性導入大量數據。本文將針對這個問題介紹如何通過Hbase的BulkLoad方法來快速將海量數據導入到Hbase中。
總的來說,使用 Bulk Load 方式由於利用了 HBase 的數據信息是按照特定格式存儲在 HDFS 里的這一特性,直接在 HDFS 中生成持久化的 HFile 數據格式文件,然后完成巨量數據快速入庫的操作,配合 MapReduce 完成這樣的操作,不占用 Region 資源,不會產生巨量的寫入 I/O,所以需要較少的 CPU 和網絡資源。Bulk Load 的實現原理是通過一個 MapReduce Job 來實現的,通過 Job 直接生成一個 HBase 的內部 HFile 格式文件,用來形成一個特殊的 HBase 數據表,然后直接將數據文件加載到運行的集群中。與使用HBase API相比,使用Bulkload導入數據占用更少的CPU和網絡資源。
實現原理
Bulkload過程主要包括三部分:
1、從數據源(通常是文本文件或其他的數據庫)提取數據並上傳到HDFS。抽取數據到HDFS和Hbase並沒有關系,所以大家可以選用自己擅長的方式進行,本文就不介紹了。
2、利用MapReduce作業處理實現准備的數據 。這一步需要一個MapReduce作業,並且大多數情況下還需要我們自己編寫Map函數,而Reduce函數不需要我們考慮,由HBase提供。該作業需要使用rowkey(行鍵)作為輸出Key;KeyValue、Put或者Delete作為輸出Value。MapReduce作業需要使用HFileOutputFormat2
來生成HBase數據文件。為了有效的導入數據,需要配置HFileOutputFormat2
使得每一個輸出文件都在一個合適的區域中。為了達到這個目的,MapReduce作業會使用Hadoop的TotalOrderPartitioner
類根據表的key值將輸出分割開來。HFileOutputFormat2
的方法configureIncrementalLoad()
會自動的完成上面的工作。
3、告訴RegionServers數據的位置並導入數據。這一步是最簡單的,通常需要使用LoadIncrementalHFiles
(更為人所熟知是completebulkload
工具),將文件在HDFS上的位置傳遞給它,它就會利用RegionServer將數據導入到相應的區域。
整個過程圖如下:
代碼實現
上面我們已經介紹了Hbase的BulkLoad方法的原理,我們需要寫個Mapper和驅動程序,實現如下:
使用MapReduce生成HFile文件
public
class
IteblogBulkLoadMapper
extends
Mapper<LongWritable, Text, ImmutableBytesWritable, Put>{
protected
void
map(LongWritable key, Text value, Context context)
throws
IOException, InterruptedException {
String line = value.toString();
String[] items = line.split(
"\t"
);
ImmutableBytesWritable rowKey =
new
ImmutableBytesWritable(items[
0
].getBytes());
Put put =
new
Put(Bytes.toBytes(items[
0
]));
//ROWKEY
put.addColumn(
"f1"
.getBytes(),
"url"
.getBytes(), items[
1
].getBytes());
put.addColumn(
"f1"
.getBytes(),
"name"
.getBytes(), items[
2
].getBytes());
context.write(rowkey, put);
}
}
驅動程序
public
class
IteblogBulkLoadDriver {
public
static
void
main(String[] args)
throws
IOException, ClassNotFoundException, InterruptedException {
final
String SRC_PATH=
"hdfs://iteblog:9000/user/iteblog/input"
;
final
String DESC_PATH=
"hdfs://iteblog:9000/user/iteblog/output"
;
Configuration conf = HBaseConfiguration.create();
Job job=Job.getInstance(conf);
job.setJarByClass(IteblogBulkLoadDriver.
class
);
job.setMapperClass(IteblogBulkLoadMapper.
class
);
job.setMapOutputKeyClass(ImmutableBytesWritable.
class
);
job.setMapOutputValueClass(Put.
class
);
job.setOutputFormatClass(HFileOutputFormat2.
class
);
HTable table =
new
HTable(conf,
"blog_info"
);
HFileOutputFormat2.configureIncrementalLoad(job,table,table.getRegionLocator());
FileInputFormat.addInputPath(job,
new
Path(SRC_PATH));
FileOutputFormat.setOutputPath(job,
new
Path(DESC_PATH));
System.exit(job.waitForCompletion(
true
)?
0
:
1
);
}
}
通過BlukLoad方式加載HFile文件
public
class
LoadIncrementalHFileToHBase {
public
static
void
main(String[] args)
throws
Exception {
Configuration conf = HBaseConfiguration.create();
Connection connection = ConnectionFactory.createConnection(configuration);
LoadIncrementalHFiles loder =
new
LoadIncrementalHFiles(configuration);
loder.doBulkLoad(
new
Path(
"hdfs://iteblog:9000/user/iteblog/output"
),
new
HTable(conf,
"blog_info"
));
}
}
由於Hbase的BulkLoad方式是繞過了Write to WAL,Write to MemStore及Flush to disk的過程,所以並不能通過WAL來進行一些復制數據的操作。后面我將會再介紹如何通過Spark來使用Hbase的BulkLoad方式來初始化數據。
BulkLoad的使用案例
1、首次將原始數據集載入 HBase- 您的初始數據集可能很大,繞過 HBase 寫入路徑可以顯著加速此進程。
2、遞增負載 - 要定期加載新數據,請使用 BulkLoad 並按照自己的理想時間間隔分批次導入數據。這可以緩解延遲問題,並且有助於您實現服務級別協議 (SLA)。但是,壓縮觸發器就是 RegionServer 上的 HFile 數目。因此,頻繁導入大量 HFile 可能會導致更頻繁地發生大型壓縮,從而對性能產生負面影響。您可以通過以下方法緩解此問題:調整壓縮設置,確保不觸發壓縮即可存在的最大 HFile 文件數很高,並依賴於其他因素,如 Memstore 的大小 觸發壓縮。
3、數據需要源於其他位置 - 如果當前系統捕獲了您想在 HBase 中包含的數據,且因業務原因需要保持活動狀態,您可從系統中將數據定期批量加載到 HBase 中,以便可以在不影響系統的前提下對其執行操作。
生成HFile程序說明:
①. 最終輸出結果,無論是map還是reduce,輸出部分key和value的類型必須是: < ImmutableBytesWritable, KeyValue>或者< ImmutableBytesWritable, Put>。
②. 最終輸出部分,Value類型是KeyValue 或Put,對應的Sorter分別是KeyValueSortReducer或PutSortReducer。
③. MR例子中job.setOutputFormatClass(HFileOutputFormat.class); HFileOutputFormat只適合一次對單列族組織成HFile文件。好像最新的版本可以多個列族.
④. MR例子中HFileOutputFormat.configureIncrementalLoad(job, table);自動對job進行配置。TotalOrderPartitioner是需要先對key進行整體排序,然后划分到每個reduce中,保證每一個reducer中的的key最小最大值區間范圍,是不會有交集的。因為入庫到HBase的時候,作為一個整體的Region,key是絕對有序的。
⑤. MR例子中最后生成HFile存儲在HDFS上,輸出路徑下的子目錄是各個列族。如果對HFile進行入庫HBase,相當於move HFile到HBase的Region中,HFile子目錄的列族內容沒有了。
說明: 因為在創建HBase表的時候,默認只有一個Region,只有等到這個Region的大小超過一定的閾值之后,才會進行split, 所以為了利用完全分布式加快生成HFile和導入HBase中以及數據負載均衡,所以需要在創建表的時候預先創建分區,可以查閱相關資料(關於HBase調優的資料), 而進行分區時要利用startKey與endKey進行rowKey區間划分(因為導入HBase中,需要rowKey整體有序),所以在導入之前,自己先寫一個MapReduce的Job求最小與最大的rowKey, 即startKey與endKey.
3、說明與注意事項:
(1)HFile方式在所有的加載方案里面是最快的,不過有個前提——數據是第一次導入,表是空的。如果表中已經有了數據。HFile再導入到hbase的表中會觸發split操作。
(2)最終輸出結果,無論是map還是reduce,輸出部分key和value的類型必須是: < ImmutableBytesWritable, KeyValue>或者< ImmutableBytesWritable, Put>。
否則報這樣的錯誤:
java.lang.IllegalArgumentException: Can't read partitions file ... Caused by: java.io.IOException: wrong key class: org.apache.hadoop.io.*** is not class org.apache.hadoop.hbase.io.ImmutableBytesWritable
(3)最終輸出部分,Value類型是KeyValue 或Put,對應的Sorter分別是KeyValueSortReducer或PutSortReducer,這個 SorterReducer 可以不指定,因為源碼中已經做了判斷:
if (KeyValue.class.equals(job.getMapOutputValueClass())) { job.setReducerClass(KeyValueSortReducer.class); } else if (Put.class.equals(job.getMapOutputValueClass())) { job.setReducerClass(PutSortReducer.class); } else { LOG.warn("Unknown map output value type:" + job.getMapOutputValueClass()); }
(4) MR例子中job.setOutputFormatClass(HFileOutputFormat.class); HFileOutputFormat只適合一次對單列族組織成HFile文件,多列簇需要起多個 job,不過新版本的 Hbase 已經解決了這個限制。
(5) MR例子中最后生成HFile存儲在HDFS上,輸出路徑下的子目錄是各個列族。如果對HFile進行入庫HBase,相當於move HFile到HBase的Region中,HFile子目錄的列族內容沒有了。
(6)最后一個 Reduce 沒有 setNumReduceTasks 是因為,該設置由框架根據region個數自動配置的。
(7)下邊配置部分,注釋掉的其實寫不寫都無所謂,因為看源碼就知道configureIncrementalLoad方法已經把固定的配置全配置完了,不固定的部分才需要手動配置。
public class HFileOutput { //job 配置 public static Job configureJob(Configuration conf) throws IOException { Job job = new Job(configuration, "countUnite1"); job.setJarByClass(HFileOutput.class); //job.setNumReduceTasks(2); //job.setOutputKeyClass(ImmutableBytesWritable.class); //job.setOutputValueClass(KeyValue.class); //job.setOutputFormatClass(HFileOutputFormat.class); Scan scan = new Scan(); scan.setCaching(10); scan.addFamily(INPUT_FAMILY); TableMapReduceUtil.initTableMapperJob(inputTable, scan, HFileOutputMapper.class, ImmutableBytesWritable.class, LongWritable.class, job); //這里如果不定義reducer部分,會自動識別定義成KeyValueSortReducer.class 和PutSortReducer.class job.setReducerClass(HFileOutputRedcuer.class); //job.setOutputFormatClass(HFileOutputFormat.class); HFileOutputFormat.configureIncrementalLoad(job, new HTable( configuration, outputTable)); HFileOutputFormat.setOutputPath(job, new Path()); //FileOutputFormat.setOutputPath(job, new Path()); //等同上句 return job; } public static class HFileOutputMapper extends TableMapper<ImmutableBytesWritable, LongWritable> { public void map(ImmutableBytesWritable key, Result values, Context context) throws IOException, InterruptedException { //mapper邏輯部分 context.write(new ImmutableBytesWritable(Bytes()), LongWritable()); } } public static class HFileOutputRedcuer extends Reducer<ImmutableBytesWritable, LongWritable, ImmutableBytesWritable, KeyValue> { public void reduce(ImmutableBytesWritable key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException { //reducer邏輯部分 KeyValue kv = new KeyValue(row, OUTPUT_FAMILY, tmp[1].getBytes(), Bytes.toBytes(count)); context.write(key, kv); } } }
4、Refer:
1、Hbase幾種數據入庫(load)方式比較
http://blog.csdn.net/kirayuan/article/details/6371635
2、MapReduce生成HFile入庫到HBase及源碼分析
http://blog.pureisle.net/archives/1950.html
3、MapReduce生成HFile入庫到HBase