一、概述
HBase官方提供了基於Mapreduce的批量數據導入工具:Bulk load和ImportTsv。關於Bulk load大家能夠看下我還有一篇博文。
通常HBase用戶會使用HBase API導數,可是假設一次性導入大批量數據,可能占用大量Regionserver資源,影響存儲在該Regionserver上其它表的查詢,本文將會從源代碼上解析ImportTsv數據導入工具。探究怎樣高效導入數據到HBase。
二、ImportTsv介紹
ImportTsv是Hbase提供的一個命令行工具。能夠將存儲在HDFS上的自己定義分隔符(默認\t)的數據文件。通過一條命令方便的導入到HBase表中,對於大數據量導入很有用,當中包括兩種方式將數據導入到HBase表中:
第一種是使用TableOutputformat在reduce中插入數據;
另外一種是先生成HFile格式的文件,再運行一個叫做CompleteBulkLoad的命令,將文件move到HBase表空間文件夾下。同一時候提供給client查詢。
三、源代碼解析
本文基於CDH5 HBase0.98.1,ImportTsv的入口類是org.apache.hadoop.hbase.mapreduce.ImportTsv
String hfileOutPath = conf.get(BULK_OUTPUT_CONF_KEY);
String columns[] = conf.getStrings(COLUMNS_CONF_KEY);
if (hfileOutPath != null) {
if (!admin.tableExists(tableName)) {
LOG.warn(format("Table '%s' does not exist.", tableName));
// TODO: this is backwards. Instead of depending on the existence of a table,
// create a sane splits file for HFileOutputFormat based on data sampling.
createTable(admin, tableName, columns);
}
HTable table = new HTable(conf, tableName);
job.setReducerClass(PutSortReducer.class);
Path outputDir = new Path(hfileOutPath);
FileOutputFormat.setOutputPath(job, outputDir);
job.setMapOutputKeyClass(ImmutableBytesWritable.class);
if (mapperClass.equals(TsvImporterTextMapper.class)) {
job.setMapOutputValueClass(Text.class);
job.setReducerClass(TextSortReducer.class);
} else {
job.setMapOutputValueClass(Put.class);
job.setCombinerClass(PutCombiner.class);
}
HFileOutputFormat.configureIncrementalLoad(job, table);
} else {
if (mapperClass.equals(TsvImporterTextMapper.class)) {
usage(TsvImporterTextMapper.class.toString()
+ " should not be used for non bulkloading case. use "
+ TsvImporterMapper.class.toString()
+ " or custom mapper whose value type is Put.");
System.exit(-1);
}
// No reducers. Just write straight to table. Call initTableReducerJob
// to set up the TableOutputFormat.
TableMapReduceUtil.initTableReducerJob(tableName, null, job);
job.setNumReduceTasks(0);
}
從ImportTsv.createSubmittableJob方法中推斷參數BULK_OUTPUT_CONF_KEY開始。這步直接影響ImportTsv的Mapreduce作業終於以哪種方式入HBase庫
假設不為空而且用戶沒有自己定義Mapper實現類(參數importtsv.mapper.class)時,則使用PutSortReducer,當中會對Put排序,假設每行記錄有非常多column,則會占用Reducer大量的內存資源進行排序。
Configuration conf = job.getConfiguration(); HBaseConfiguration.merge(conf, HBaseConfiguration.create(conf)); job.setOutputFormatClass(TableOutputFormat.class);假設為空,調用TableMapReduceUtil.initTableReducerJob初始化TableOutputformat的Reducer輸出。此方式不須要使用Reducer,由於直接在mapper的Outputformat中會批量的調用Put API將數據提交到Regionserver上(相當於並行的運行HBase Put API)
四、實戰
1、使用TableOutputformat的Put API上傳數據,非bulk-loading
$ bin/hbase org.apache.hadoop.hbase.mapreduce.ImportTsv -Dimporttsv.columns=a,b,c <tablename> <hdfs-inputdir>2、使用bulk-loading生成StoreFiles(HFile)
step1、生成Hfile
$ bin/hbase org.apache.hadoop.hbase.mapreduce.ImportTsv -Dimporttsv.columns=a,b,c -Dimporttsv.bulk.output=hdfs://storefile-outputdir <tablename> <hdfs-data-inputdir>step2、完畢導入
$ bin/hbase org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles <hdfs://storefileoutput> <tablename>
五、總結
在使用ImportTsv時。一定要注意參數importtsv.bulk.output的配置,通常來說使用Bulk output的方式對Regionserver來說更加友好一些,這樣的方式載入數據差點兒不占用Regionserver的計算資源。由於僅僅是在HDFS上移動了HFile文件,然后通知HMaster將該Regionserver的一個或多個region上線。
