1.簡介
將數據插入HBase表中的方法很多,我們可以通過TableOutputFormat以Mapreduce on HBase的方式將數據插入,也可以單純的使用客戶端API將數據插入。但是以上方法效率並不高。
而使用BulkLoad特性能夠利用MR計算框架將源數據直接生成內部的hfile格式,然后可以在不重啟HBase集群的場景下數據load到對應表中。
BulkLoad方法能夠將數據快速的load到HBase中,打一個“生動”的比方:
使用API就好比將飯一口一口喂給HBase,而使用BulkLoad就相當於切開HBase的肚子直接將食物放到胃中。(重口味的比方)
2.限制
1. Bulkload方式由於並不是通過API來插入數據而是直接生成HFile文件所以並不會記錄WAL日志。如果集群直接是通過Replication機制來備份的話(Replication機制是通過讀取WAL日志來備份數據的),那么另外一個集群上就不會有Bulkload的數據。
3.前准備
1. 在HBase客戶端建立一張目標表:
hbase(main):003:0> create 'bulkload_text','f' 0 row(s) in 0.5230 seconds
2. 准備數據源,在hdfs上建立一個文本文件,假設取名為bulkload_reouse_file.txt內容如下:
rowKey1|a_1|b_1 rowKey2|a_2|b_2 rowKey3|a_3|b_3 rowKey4|a_4|b_4 rowKey5|a_5|b_5
並將其保存在hdfs上,作為bulkload的數據源:
[hadoop@xufeng-3 bulkload]$ hadoop fs -ls /testdata/bulkload 16/07/30 16:33:55 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable Found 1 items -rw-r--r-- 1 hadoop supergroup 80 2016-07-30 16:33 /testdata/bulkload/bulkload_reouse_file.txt
4. 實現步驟
使用Bulkload需要經過兩個大步驟。
1.通過MR計算框架進行HFile文件的生成。
2.加載HFile文件到集群(表)
1.通過MR計算框架進行HFile文件的生成
命令格式:
HADOOP_CLASSPATH=`$HBASE_HOME/bin/hbase classpath` hadoop jar $HBASE_HOME/lib/hbase-server-version.jar importtsv -Dimporttsv.bulk.output=<輸出文件夾路徑> -Dimporttsv.separator=<分割符> -Dimporttsv.columns=<key和列映射> <目標表> <數據源路徑>
如本例中結合自身接群可以寫成:
HADOOP_CLASSPATH=`/opt/hadoop/hbase/bin/hbase classpath` hadoop jar /opt/hadoop/hbase/lib/hbase-server-1.0.0-cdh5.4.2.jar importtsv -Dimporttsv.bulk.output=hdfs://ns1/testdata/bulkload/result -Dimporttsv.separator='|' -Dimporttsv.columns=HBASE_ROW_KEY,f:a,f:b bulkload_text hdfs://ns1/testdata/bulkload/bulkload_reouse_file.txt
其中 -Dimporttsv.columns=HBASE_ROW_KEY,f:a,f:b的意思是通過'|'分隔符號分割的第一個元素作為rowkey,第二個元素作為f:a列值,第三個元素作為f:b值。
結果:
在目標路徑文件夾下生成了f列的hife文件:
[hadoop@xufeng-3 bulkload]$ hadoop fs -ls /testdata/bulkload/result 16/07/30 16:56:00 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable Found 2 items -rw-r--r-- 1 hadoop supergroup 0 2016-07-30 16:46 /testdata/bulkload/result/_SUCCESS drwxr-xr-x - hadoop supergroup 0 2016-07-30 16:53 /testdata/bulkload/result/f
2.加載HFile文件到集群(表)
生成的HFile必須盡快的去load到表中,在第一個步驟中HFile生成的規則是一個region一個文件,如果不盡快加載一旦線上的region發生分裂就會造成加載的性能下降。
命令格式:
HADOOP_CLASSPATH=`$HBASE_HOME/bin/hbase classpath` hadoop jar $HBASE_HOME/lib/hbase-server-version.jar completebulkload <生成的HFile路徑> <目標表名稱>
如本例中結合自身接群可以寫成:
HADOOP_CLASSPATH=`/opt/hadoop/hbase/bin/hbase classpath` hadoop jar /opt/hadoop/hbase/lib/hbase-server-1.0.0-cdh5.4.2.jar completebulkload hdfs://ns1/testdata/bulkload/result bulkload_text
結果:
文件中信息按照key和列的映射關系load到了表中
hbase(main):002:0> scan 'bulkload_text' ROW COLUMN+CELL rowKey1 column=f:a, timestamp=1469911544908, value=a_1 rowKey1 column=f:b, timestamp=1469911544908, value=b_1 rowKey2 column=f:a, timestamp=1469911544908, value=a_2 rowKey2 column=f:b, timestamp=1469911544908, value=b_2 rowKey3 column=f:a, timestamp=1469911544908, value=a_3 rowKey3 column=f:b, timestamp=1469911544908, value=b_3 rowKey4 column=f:a, timestamp=1469911544908, value=a_4 rowKey4 column=f:b, timestamp=1469911544908, value=b_4 rowKey5 column=f:a, timestamp=1469911544908, value=a_5 rowKey5 column=f:b, timestamp=1469911544908, value=b_5 5 row(s) in 0.3520 seconds
5. 源碼中bulkload注釋信息:
Usage: importtsv -Dimporttsv.columns=a,b,c <tablename> <inputdir> Imports the given input directory of TSV data into the specified table. The column names of the TSV data must be specified using the -Dimporttsv.columns option. This option takes the form of comma-separated column names, where each column name is either a simple column family, or a columnfamily:qualifier. The special column name HBASE_ROW_KEY is used to designate that this column should be used as the row key for each imported record. You must specify exactly one column to be the row key, and you must specify a column name for every column that exists in the input data. Another special column HBASE_TS_KEY designates that this column should be used as timestamp for each record. Unlike HBASE_ROW_KEY, HBASE_TS_KEY is optional. You must specify atmost one column as timestamp key for each imported record. Record with invalid timestamps (blank, non-numeric) will be treated as bad record. Note: if you use this option, then 'importtsv.timestamp' option will be ignored. By default importtsv will load data directly into HBase. To instead generate HFiles of data to prepare for a bulk data load, pass the option: -Dimporttsv.bulk.output=/path/for/output Note: if you do not use this option, then the target table must already exist in HBase Other options that may be specified with -D include: -Dimporttsv.skip.bad.lines=false - fail if encountering an invalid line '-Dimporttsv.separator=|' - eg separate on pipes instead of tabs -Dimporttsv.timestamp=currentTimeAsLong - use the specified timestamp for the import -Dimporttsv.mapper.class=my.Mapper - A user-defined Mapper to use instead of TsvImporterMapper For performance consider the following options: -Dmapred.map.tasks.speculative.execution=false -Dmapred.reduce.tasks.speculative.execution=false
6. 總結
bulkload提供一種快速的數據加載方法,使得外部數據以資源最小化的方式加載的HBase中。
當你在加載數據的時候遇到如下情況,那么使用bulkload或許是一個不錯的選擇:
- You needed to tweak your MemStores to use most of the memory.
- You needed to either use bigger WALs or bypass them .
- Your compaction and flush queues are in the hundreds.
- Your GC is out of control because your inserts range in the MBs.
- Your latency goes out of your SLA when you import data.
7. 參考
http://blog.cloudera.com/blog/2013/09/how-to-use-hbase-bulk-loading-and-why/